In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to cloud.r-project.o                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
                                                                               Hit:5 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [3 InRelease 15.6 kB/88.7 kB 18%] [4 InRelease 48.9 kB/88.7 kB 55%] [Connect0% [1 InRelease gpgv 15.9 kB] [3 InRelease 15.6 kB/88.7 kB 18%] [4 InRelease 540% [1 InRelease gpgv 15.9 kB] [3 InRelease 33.0 kB/88.7 kB

In [2]:
from pyspark.sql import SparkSession

# Obtain the session for this notebook under the given application name
# (getOrCreate returns an existing session if there is one).
spark = (
    SparkSession.builder
    .appName("BigData-Challenge")
    .getOrCreate()
)

In [3]:
from pyspark import SparkFiles

# Amazon Watches review dump (gzip-compressed, tab-separated).
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz"

# Register the remote file with the Spark context so it is fetched locally.
spark.sparkContext.addFile(url)

# NOTE(review): SparkFiles.get("") is given an empty file name, so it presumably
# resolves to the SparkFiles root directory rather than to the single file added
# above - confirm that no other files have been added to this context.
df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...| 2015-08-31|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...|         Watches|          5|    

In [4]:
# Remove every row that has a null in at least one column
# (how="any" is the default, spelled out here for clarity).
cleaned_df = df.dropna(how="any")
cleaned_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...| 2015-08-31|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...|         Watches|          5|    

In [5]:
# Build the vine_table DataFrame: keep only the columns used by the Vine analysis.
vine_df = cleaned_df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3O9SGZBVQBV76|          5|            0|          0|   N|                Y|
| RKH8BNC3L5DLF|          5|            0|          0|   N|                Y|
|R2HLE8WKZSU3NL|          2|            1|          1|   N|                Y|
|R31U3UH5AZ42LL|          5|            0|          0|   N|                Y|
|R2SV659OUJ945Y|          4|            0|          0|   N|                Y|
| RA51CP8TR5A2L|          5|            0|          0|   N|                Y|
| RB2Q7DLDN6TH6|          5|            1|          1|   N|                Y|
|R2RHFJV0UYBK3Y|          1|            1|          5|   N|                N|
|R2Z6JOQ94LFHEP|          5|            1|          2|   N|                Y|
| RX27XIIWY5JPB|          4|            0|          0|   N|     

In [6]:
# Step 1: keep only reviews with at least 20 total votes.
# `col` is imported here and is also used by the filtering cells that follow.
from pyspark.sql.functions import col

total_votes_df = vine_df.filter(
    col("total_votes") >= 20
)
total_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R14W2VCHHK5V7W|          5|           19|         20|   N|                Y|
|R1S3T57O3OYT5S|          5|           19|         20|   N|                Y|
|R1BTWIBLYYVOV7|          5|           30|         30|   N|                Y|
| R6F9VY91ADPLA|          1|            8|         30|   N|                N|
|R3PXNV89DFIXKV|          5|           35|         37|   N|                Y|
|R2ZF9NYVT3J7D6|          5|           19|         22|   N|                Y|
|R20NYA5V0UF9NE|          5|           27|         28|   N|                Y|
|R2X8FZRUOS8R8C|          4|           25|         26|   N|                Y|
|R2D8IMBVX3XCLF|          1|           14|         20|   N|                Y|
|R25UD9TA63L3Q8|          5|           25|         27|   N|     

In [7]:
# Step 2: keep reviews whose helpful_votes / total_votes ratio is at least 50%.
# The ratio is added as a `percent_votes` column and then used for the filter.
# (A DataFrame-level .alias('percent_votes') was dropped: it renamed the frame,
# not the column, and had no effect on the result.)
percent_votes_df = (
    total_votes_df
    .withColumn('percent_votes', col('helpful_votes') / col('total_votes'))
    .filter(col("percent_votes") >= 0.5)
)
percent_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     percent_votes|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R14W2VCHHK5V7W|          5|           19|         20|   N|                Y|              0.95|
|R1S3T57O3OYT5S|          5|           19|         20|   N|                Y|              0.95|
|R1BTWIBLYYVOV7|          5|           30|         30|   N|                Y|               1.0|
|R3PXNV89DFIXKV|          5|           35|         37|   N|                Y|0.9459459459459459|
|R2ZF9NYVT3J7D6|          5|           19|         22|   N|                Y|0.8636363636363636|
|R20NYA5V0UF9NE|          5|           27|         28|   N|                Y|0.9642857142857143|
|R2X8FZRUOS8R8C|          4|           25|         26|   N|                Y|0.9615384615384616|
|R2D8IMBVX3XCLF|          1|  

In [13]:
# Step 3: reviews written as part of the Vine program (paid), i.e. vine == "Y".
paid_vine_df = percent_votes_df.filter(
    percent_votes_df["vine"] == "Y"
)
paid_vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     percent_votes|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R1B7M0OP3UNP6O|          5|           49|         52|   Y|                N|0.9423076923076923|
|R2UUV4UGGYMQG8|          5|           34|         39|   Y|                N|0.8717948717948718|
| R9K0LZV2BK9YY|          4|           37|         39|   Y|                N|0.9487179487179487|
|R2OVFLNEUEGTJM|          3|           18|         25|   Y|                N|              0.72|
| RBE09ELJ77LQ0|          5|           44|         45|   Y|                N|0.9777777777777777|
|R3867T8AIJJHM6|          5|           26|         27|   Y|                N|0.9629629629629629|
|R1FNVUXPU63WOZ|          4|           43|         48|   Y|                N|0.8958333333333334|
|R25XGG2G12SE1Z|          4|  

In [14]:
# Step 4: reviews that were not part of the Vine program (unpaid), i.e. vine == "N".
non_paid_df = percent_votes_df.filter(
    percent_votes_df["vine"] == "N"
)
non_paid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     percent_votes|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R14W2VCHHK5V7W|          5|           19|         20|   N|                Y|              0.95|
|R1S3T57O3OYT5S|          5|           19|         20|   N|                Y|              0.95|
|R1BTWIBLYYVOV7|          5|           30|         30|   N|                Y|               1.0|
|R3PXNV89DFIXKV|          5|           35|         37|   N|                Y|0.9459459459459459|
|R2ZF9NYVT3J7D6|          5|           19|         22|   N|                Y|0.8636363636363636|
|R20NYA5V0UF9NE|          5|           27|         28|   N|                Y|0.9642857142857143|
|R2X8FZRUOS8R8C|          4|           25|         26|   N|                Y|0.9615384615384616|
|R2D8IMBVX3XCLF|          1|  

In [12]:
# Step 5: per Vine group (Y / N), compute the total number of reviews, the number
# of 5-star reviews, and the percentage of reviews that are 5-star.
from pyspark.sql.functions import col,when,count,lit

# Keep the aggregated DataFrame in ratings_total_df. show() only prints and
# returns None, so it must not be chained onto the assignment.
ratings_total_df = percent_votes_df.groupBy("vine").agg(
    count(col("vine")).alias("Total_Reviews"),
    # when() without otherwise() yields null for non-5-star rows, and count()
    # skips nulls, so this counts only the 5-star reviews.
    count(when(col("star_rating") == 5, True)).alias("Total_5_Star_Reviews"),
    (count(when(col("star_rating") == 5, True))/count(col("vine"))*100).alias("%_5_Star_To_Total"))
ratings_total_df.show()

+----+-------------+--------------------+------------------+
|vine|Total_Reviews|Total_5_Star_Reviews| %_5_Star_To_Total|
+----+-------------+--------------------+------------------+
|   Y|           47|                  15|31.914893617021278|
|   N|         8343|                4318| 51.75596308282392|
+----+-------------+--------------------+------------------+

