In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connecting to security.u                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connecting to security.u                                                                               Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connecting to security.u0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connecting to security.u                                                                               Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:5 https://

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
#!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

In [3]:
from pyspark.sql import SparkSession

# Create (or reuse) the session; the Postgres JDBC jar is added to the driver classpath.
spark = (
    SparkSession.builder
    .appName("Vine_Review_Analysis")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### Load Amazon Data into Spark DataFrame

In [4]:
from pyspark import SparkFiles

# Amazon US reviews for the Furniture category, as a gzipped tab-separated file.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Furniture_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# SparkFiles.get resolves the downloaded copy by its bare file name (last URL segment).
reviews_file_name = url.rsplit("/", 1)[-1]
reviews_local_path = SparkFiles.get(reviews_file_name)

df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(reviews_local_path, sep="\t", header=True, inferSchema=True)
)

In [5]:
#from pyspark.sql.types import StructType,StructField,StringType,ArrayType,MapType
#from pyspark.sql.functions import expr,when,col,lit,sum,avg,max
#spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

### Create DataFrames to match tables

In [6]:
from pyspark.sql import DataFrame, Row, column
from pyspark.sql.functions import to_date
from pyspark.sql.types import IntegerType
# NOTE(review): to_date, Row, DataFrame, column and IntegerType are not used by the cells shown.

# Preview the review DataFrame loaded above (first 20 rows, long values truncated).
df.show(n=20, truncate=True)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   24509695|R3VR960AHLFKDV|B004HB5E0E|     488241329|Shoal Creek Compu...|       Furniture|          4|            0|          0|   N|                Y|... desk is very ...|This desk is very...| 2015-08-31|
|         US|   34731776|R16LGVMFKIUT0G|B0042TNMMS|     205864445|Dorel Home Produc...|       Furniture|          5|    

In [7]:
# vine_table: the review id plus the rating, vote and Vine-program columns.
vine_df = df.select(
    'review_id',
    'star_rating',
    'helpful_votes',
    'total_votes',
    'vine',
    'verified_purchase',
)
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3VR960AHLFKDV|          4|            0|          0|   N|                Y|
|R16LGVMFKIUT0G|          5|            0|          0|   N|                Y|
|R1AIMEEPYHMOE4|          5|            1|          1|   N|                Y|
|R1892CCSZWZ9SR|          3|            0|          0|   N|                Y|
|R285P679YWVKD1|          3|            0|          0|   N|                N|
| RLB33HJBXHZHU|          5|            0|          0|   N|                Y|
|R1VGTZ94DBAD6A|          5|            2|          2|   N|                Y|
|R168KF82ICSOHD|          5|            0|          0|   N|                Y|
|R20DIYIJ0OCMOG|          5|            0|          0|   N|                Y|
| RD46RNVOHNZSC|          5|            0|          0|   N|     

In [8]:
vine_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)



# Step 1 - Reviews with at least 20 total votes

In [9]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DataType
from pyspark.sql.functions import udf

In [10]:
# Filter data & CREATE A NEW DataFrame to retrieve all the rows 
# where total_votes `count is equal to or greater than 20
# Step 1: keep only the reviews that received at least 20 total votes.
total_votes_df = vine_df.filter(vine_df.total_votes >= 20)
# The filtered frame already has exactly vine_df's columns, so no re-select is needed.
total_votes_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|RL8D0KJ0J9L0O |5          |152          |165        |N   |Y                |
|R1BEINAIQFBRJC|5          |21           |23         |N   |Y                |
|R2L59KIJH302P9|4          |26           |26         |N   |Y                |
|RR99CPG695T0I |5          |215          |248        |N   |N                |
|R1XQNKKUPCMWVO|5          |43           |44         |N   |Y                |
|R3JUXVCT1NSK2A|3          |25           |26         |N   |Y                |
|R3GNSIFV1J2Y2B|1          |15           |60         |N   |N                |
|RTCRZARYY4LXX |5          |52           |54         |N   |Y                |
|R3OFB4P7Y8WR27|1          |15           |26         |N   |Y                |
|R3MTAYGQM25N63|4          |58           |59         |N   |Y    

# Step 2 - Helpful reviews (helpful_votes / total_votes of at least 50%)

In [24]:
# Step 2: from the Step 1 reviews, keep those where helpful_votes / total_votes
# is at least 50%. Step 1 guarantees total_votes >= 20, so the ratio never divides by zero.
helpful_ratio = total_votes_df.helpful_votes / total_votes_df.total_votes
helpful__df = total_votes_df.filter(helpful_ratio >= 0.50)
helpful__df.show(truncate=False)
helpful__df.count()

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|RL8D0KJ0J9L0O |5          |152          |165        |N   |Y                |
|R1BEINAIQFBRJC|5          |21           |23         |N   |Y                |
|R2L59KIJH302P9|4          |26           |26         |N   |Y                |
|RR99CPG695T0I |5          |215          |248        |N   |N                |
|R1XQNKKUPCMWVO|5          |43           |44         |N   |Y                |
|R3JUXVCT1NSK2A|3          |25           |26         |N   |Y                |
|RTCRZARYY4LXX |5          |52           |54         |N   |Y                |
|R3OFB4P7Y8WR27|1          |15           |26         |N   |Y                |
|R3MTAYGQM25N63|4          |58           |59         |N   |Y                |
|RJNDSWES5ISZ7 |5          |78           |79         |N   |Y    

18155

In [20]:
# Step 3 (paid reviews): from the Step 2 helpful reviews, keep those written
# as part of the Vine program (vine == 'Y').
paid_vine_df = helpful__df.where(helpful__df['vine'] == 'Y')
paid_vine_df.show(truncate=False)
paid_vine_df.count()

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2BQOD1R0228FN|3          |17           |26         |Y   |N                |
|RC31RUPFOHBHQ |4          |102          |117        |Y   |N                |
|REN3N1WITLF1Y |5          |33           |37         |Y   |N                |
|R71RZQ9UZVG47 |4          |38           |47         |Y   |N                |
|R38NMQBH88HLM6|4          |18           |24         |Y   |N                |
|R33FGX9EE3QVR6|4          |26           |26         |Y   |N                |
|R1KIOIK6WEYE59|3          |19           |20         |Y   |N                |
|R25X9BMOB3FD0E|4          |32           |37         |Y   |N                |
|R3VCKFCX2377Q2|4          |95           |101        |Y   |N                |
|R1E0OUG63HMSM4|3          |58           |61         |Y   |N    

136

# Step 4 - Unpaid (non-Vine) reviews

In [19]:
# Step 4 (unpaid reviews): from the Step 2 helpful reviews, keep those that were
# NOT part of the Vine program (vine == 'N').
unpaid_vine_df = helpful__df.where(helpful__df['vine'] == 'N')
unpaid_vine_df.show(truncate=False)
unpaid_vine_df.count()

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|RL8D0KJ0J9L0O |5          |152          |165        |N   |Y                |
|R1BEINAIQFBRJC|5          |21           |23         |N   |Y                |
|R2L59KIJH302P9|4          |26           |26         |N   |Y                |
|RR99CPG695T0I |5          |215          |248        |N   |N                |
|R1XQNKKUPCMWVO|5          |43           |44         |N   |Y                |
|R3JUXVCT1NSK2A|3          |25           |26         |N   |Y                |
|RTCRZARYY4LXX |5          |52           |54         |N   |Y                |
|R3OFB4P7Y8WR27|1          |15           |26         |N   |Y                |
|R3MTAYGQM25N63|4          |58           |59         |N   |Y                |
|RJNDSWES5ISZ7 |5          |78           |79         |N   |Y    

18019

# Step 5 - Compare 5-star review rates for paid vs. unpaid reviews

In [14]:
# Determine the total number of reviews, the number of 5-star reviews, and the percentage of 5-star reviews for the two types of review (paid vs unpaid).

### Determine the total number of reviews

In [15]:
# Number of helpful reviews from Step 2, paid and unpaid combined.
total_reviews = helpful__df.count()
total_reviews

18155

### Determine the number of 5-star reviews

In [16]:
# Number of 5-star reviews among all helpful reviews.
total_five_stars = helpful__df.filter("star_rating = 5").count()
total_five_stars

8556

In [17]:
# Share of all helpful reviews that are 5-star, as a percentage.
(total_five_stars / total_reviews) * 100

47.12751308179565

### Percentage of PAID 5-star reviews 

In [22]:
# Number of helpful reviews written through the Vine program (paid).
total_paid_reviews = paid_vine_df.count()
total_paid_reviews

136

In [21]:
# Number of paid (Vine) 5-star reviews.
paid_five_stars = paid_vine_df.filter("star_rating = 5").count()
paid_five_stars

74

In [23]:
# Share of paid (Vine) reviews that are 5-star, as a percentage.
(paid_five_stars / total_paid_reviews) * 100

54.41176470588235

### Percentage of UNPAID 5-star reviews

In [25]:
# Number of helpful reviews that were not part of the Vine program (unpaid).
total_unpaid_reviews = unpaid_vine_df.count()
total_unpaid_reviews

18019

In [26]:
# Number of unpaid (non-Vine) 5-star reviews.
unpaid_five_stars = unpaid_vine_df.filter(unpaid_vine_df.star_rating == 5).count()
unpaid_five_stars

8482

In [27]:
# Share of unpaid (non-Vine) reviews that are 5-star, as a percentage.
(unpaid_five_stars / total_unpaid_reviews) * 100

47.072534546867196