### Start Spark Session

In [None]:
import os

# Find the latest versions of
#   Spark & Hadoop:  https://www.apache.org/dist/spark/
#                    https://spark.apache.org/downloads.html
#   Postgres driver: https://jdbc.postgresql.org/
os.environ['HADOOP_VERSION']   = hadoop_version   = 'hadoop3'
os.environ['SPARK_VERSION']    = spark_version    = 'spark-3.3.0'
os.environ['POSTGRES_VERSION'] = postgres_version = 'postgresql-42.5.0'

! apt update
! apt full-upgrade
! apt autoremove

# Install Java
! apt install openjdk-11-jdk-headless > /dev/null
os.environ['JAVA_HOME']  = '/usr/lib/jvm/java-11-openjdk-amd64'

# Install Spark
! wget https://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-$HADOOP_VERSION.tgz
! tar xf $SPARK_VERSION-bin-$HADOOP_VERSION.tgz
os.environ['SPARK_HOME'] = f'/content/{spark_version}-bin-{hadoop_version}'
! pip install findspark

# Install Postgres driver
! wget https://jdbc.postgresql.org/download/$POSTGRES_VERSION.jar

# Install AWS's Boto3
! pip install boto3

! pwd && ls -al

# Start a SparkSession
import findspark
findspark.init()

!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

from   pyspark.sql import SparkSession
import boto3

# <NAME THE APP>
spark = SparkSession.builder \
  .appName('Vine-Review-Analysis') \
  .config('spark.driver.extraClassPath', f'/content/{postgres_version}.jar') \
  .getOrCreate()
  
spark

### Load Amazon Data into Spark DataFrame

In [None]:
from pyspark import SparkFiles

# Register the remote TSV with Spark, then read it back from the local copy
# that SparkFiles resolves by file name.
url = "https://csbrew172-bucket.s3.amazonaws.com/outdoors_reviews.tsv"
spark.sparkContext.addFile(url)

reviews_path = SparkFiles.get("outdoors_reviews.tsv")
df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(reviews_path, sep="\t", header=True, inferSchema=True)
)
df.show(10)

### Create DataFrames to match tables

In [4]:
# Project the review data down to the columns of the vine_table
vine_columns = [
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
]
vine_df = df.select(vine_columns)
vine_df.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R35T75OLUGHL5C|          4|            0|          0|   N|                Y|
|R2BV735O46BN33|          5|            0|          0|   N|                Y|
|R2NBEUGPQQGXP1|          4|            0|          0|   N|                Y|
|R17LLAOJ8ITK0S|          3|            1|          1|   N|                Y|
|R39PEQBT5ISEF4|          1|            0|          0|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



### Perform Vine Analysis

Filtering Vine data

In [9]:
from pyspark.sql.functions import col

# Step 1: keep reviews that received more than 20 votes in total
votes_df = vine_df.where(col("total_votes") > 20)

# Step 2: of those, keep reviews where at least 50% of the votes were helpful
helpful_ratio = col("helpful_votes") / col("total_votes")
helpful_df = votes_df.where(helpful_ratio >= 0.5)

# Step 3: split the remaining reviews by Vine membership
in_vine_df = helpful_df.where(col("vine") == "Y")    # written in the Vine program
not_vine_df = helpful_df.where(col("vine") == "N")   # written outside the Vine program

Total Review Count: Paid vs Unpaid

In [14]:
# Size of the Vine group: rows of in_vine_df, i.e. reviews that passed the
# vote filters and have vine == "Y"
paid_total = in_vine_df.count()
print(paid_total)

103


In [15]:
# Size of the non-Vine group: rows of not_vine_df, i.e. reviews that passed
# the vote filters and have vine == "N"
unpaid_total = not_vine_df.count()
print(unpaid_total)

37441


5-Star Review Count: Paid vs Unpaid

In [16]:
# Count the Vine reviews whose star_rating is exactly 5
is_five_star = col("star_rating") == 5
paid_5_stars = in_vine_df.where(is_five_star).count()
print(paid_5_stars)

55


In [17]:
# Count the non-Vine reviews whose star_rating is exactly 5
is_five_star = col("star_rating") == 5
unpaid_5_stars = not_vine_df.where(is_five_star).count()
print(unpaid_5_stars)

19736


5-Star Review Percentage: Paid vs Unpaid

In [22]:
# Fraction of Vine reviews that are 5-star (paid_5_stars / paid_total).
# If no Vine reviews passed the filters the ratio is undefined, so fall back
# to NaN (printed as "nan%") instead of raising ZeroDivisionError.
paid_percent = paid_5_stars / paid_total if paid_total else float("nan")
print("{0:.1%}".format(paid_percent))

53.4%


In [23]:
# Fraction of non-Vine reviews that are 5-star (unpaid_5_stars / unpaid_total).
# If no non-Vine reviews passed the filters the ratio is undefined, so fall
# back to NaN (printed as "nan%") instead of raising ZeroDivisionError.
unpaid_percent = unpaid_5_stars / unpaid_total if unpaid_total else float("nan")
print("{0:.1%}".format(unpaid_percent))

52.7%
