In [2]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:2 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:10 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease [15.9 kB]
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Get:12 http://security.ubuntu.com/ubuntu bionic-security/restricted amd64 Packages [1,188 kB]
Get:

In [97]:
from pyspark.sql.functions import round, col
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [98]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   12039526| RTIS3L2M1F5SM|B001CXYMFS|     737716809|Thrustmaster T-Fl...|     Video Games|          5|            0|          0|   N|                Y|an amazing joysti...|Used this for Eli...|2015-08-31 00:00:00|
|         US|    9636577| R1ZV7R40OLHKD|B00M920ND6|     569686175|Tonsee 6 buttons ...| 

In [99]:
# Create the vine_table. DataFrame
vine_df = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])

vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RTIS3L2M1F5SM|          5|            0|          0|   N|                Y|
| R1ZV7R40OLHKD|          5|            0|          0|   N|                Y|
|R3BH071QLH8QMC|          1|            0|          1|   N|                Y|
|R127K9NTSXA2YH|          3|            0|          0|   N|                Y|
|R32ZWUXDJPW27Q|          4|            0|          0|   N|                Y|
|R3AQQ4YUKJWBA6|          1|            0|          0|   N|                Y|
|R2F0POU5K6F73F|          5|            0|          0|   N|                Y|
|R3VNR804HYSMR6|          5|            0|          0|   N|                Y|
| R3GZTM72WA2QH|          5|            0|          0|   N|                Y|
| RNQOY62705W1K|          4|            0|          0|   N|     

In [100]:
# 1) Filter out the data do that only reviews of or more is shown
filtered_df= vine_df.filter(df.total_votes >= 20)
filtered_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R4PKAZRQJJX14|          1|           21|         34|   N|                N|
|R2CI0Y288CC7E2|          1|           21|         35|   N|                Y|
|R127WEQY2FM1T3|          1|          147|        175|   N|                Y|
|R3EZ0EPYLDA34S|          1|           14|         31|   N|                Y|
|R2FJ94555FZH32|          2|           55|         60|   N|                N|
|R1U3AR67RE273L|          1|           51|         65|   N|                Y|
|R3PZOXA5X1U8KW|          4|           31|         36|   N|                N|
| R6KTC1OPIOIIG|          2|           19|         34|   N|                Y|
|R36O341WWXXKNP|          5|           28|         31|   N|                N|
|R3GSK9MM8DNOYI|          1|            4|         32|   N|     

In [101]:
# confirming data types
filtered_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)



In [102]:
# 2) Created a dataFrame that has the percent helpful votes column

percent_votes_df = filtered_df.withColumn("%_Helpful_Votes", (filtered_df["helpful_votes"]/ filtered_df["total_votes"])*100)
percent_votes_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|   %_Helpful_Votes|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
| R4PKAZRQJJX14|          1|           21|         34|   N|                N| 61.76470588235294|
|R2CI0Y288CC7E2|          1|           21|         35|   N|                Y|              60.0|
|R127WEQY2FM1T3|          1|          147|        175|   N|                Y|              84.0|
|R3EZ0EPYLDA34S|          1|           14|         31|   N|                Y| 45.16129032258064|
|R2FJ94555FZH32|          2|           55|         60|   N|                N| 91.66666666666666|
|R1U3AR67RE273L|          1|           51|         65|   N|                Y| 78.46153846153847|
|R3PZOXA5X1U8KW|          4|           31|         36|   N|                N| 86.11111111111111|
| R6KTC1OPIOIIG|          2|  

In [103]:
# 2) Filtering the df so that only %_Helpful_Votes is 50 or greater

filtered_votes_df = percent_votes_df.filter(percent_votes_df["%_Helpful_Votes"] >= 50)
filtered_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|  %_Helpful_Votes|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
| R4PKAZRQJJX14|          1|           21|         34|   N|                N|61.76470588235294|
|R2CI0Y288CC7E2|          1|           21|         35|   N|                Y|             60.0|
|R127WEQY2FM1T3|          1|          147|        175|   N|                Y|             84.0|
|R2FJ94555FZH32|          2|           55|         60|   N|                N|91.66666666666666|
|R1U3AR67RE273L|          1|           51|         65|   N|                Y|78.46153846153847|
|R3PZOXA5X1U8KW|          4|           31|         36|   N|                N|86.11111111111111|
| R6KTC1OPIOIIG|          2|           19|         34|   N|                Y|55.88235294117647|
|R36O341WWXXKNP|          5|           2

In [104]:
# 3) Filter so that vine column is "Y"

Y_vine_df = filtered_votes_df.filter(filtered_votes_df["vine"] == "Y")
Y_vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|  %_Helpful_Votes|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R3KKUSGFZWSUIY|          5|           56|         63|   Y|                N|88.88888888888889|
|R10FO5UKKVZBK2|          3|           23|         23|   Y|                N|            100.0|
| RM4KSGEOR7MU1|          5|           19|         24|   Y|                N|79.16666666666666|
| RG7VRMYLEXD23|          4|           22|         26|   Y|                N|84.61538461538461|
|R11O4YSCPSNL6L|          3|           20|         26|   Y|                N|76.92307692307693|
|R286MFBAJ8NPD6|          5|           46|         51|   Y|                N|90.19607843137256|
|R1JRR530H4COA2|          5|           22|         28|   Y|                N|78.57142857142857|
| RQ5WD90PUNBU9|          5|           2

In [106]:
# obtaing total vote count for vine == Y

Y_ratings_count = Y_vine_df.count()
Y_ratings_count

94

In [86]:
# Filtering N_vine_df for 5 stars and confirming it
Y_Fivestars = Y_vine_df.filter(Y_vine_df["star_rating"] == 5)
Y_Fivestars.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|  %_Helpful_Votes|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R3KKUSGFZWSUIY|          5|           56|         63|   Y|                N|88.88888888888889|
| RM4KSGEOR7MU1|          5|           19|         24|   Y|                N|79.16666666666666|
|R286MFBAJ8NPD6|          5|           46|         51|   Y|                N|90.19607843137256|
|R1JRR530H4COA2|          5|           22|         28|   Y|                N|78.57142857142857|
| RQ5WD90PUNBU9|          5|           21|         24|   Y|                N|             87.5|
|R3KAW29CJ8L6DQ|          5|           17|         20|   Y|                N|             85.0|
|R1OWK33OPI45KT|          5|           55|         58|   Y|                N|94.82758620689656|
|R2UP7VTED8O425|          5|           3

In [107]:
# Obtain count of vine == Y 5 star reviews

Y_fivestar_count = Y_Fivestars.count()
Y_fivestar_count

48

In [108]:
# Obtain the percent of 5star votes
Y_percent_fivestar_votes = (Y_fivestar_count/Y_ratings_count)*100
Y_percent_fivestar_votes

51.06382978723404

In [109]:
#Placeing info onto a single df
Data = [(Y_ratings_count, Y_fivestar_count, Y_percent_fivestar_votes)]
columns = ["Total_Ratings_Count", "Five_Star_Count", "Five_Star_Count%"]

In [110]:
# 5) created a df for all needed info for vine == Y
Y_df = spark.createDataFrame(Data, columns)
Y_df.show()

+-------------------+---------------+-----------------+
|Total_Ratings_Count|Five_Star_Count| Five_Star_Count%|
+-------------------+---------------+-----------------+
|                 94|             48|51.06382978723404|
+-------------------+---------------+-----------------+



In [111]:
# rounding the  Five_Star_Count% to the tenths place
Y_final_df = Y_df.select("*", round(col("Five_Star_Count%"), 1)).withColumnRenamed("round(Five_Star_Count%, 1)", "Percent_Five_Stars").drop("Five_Star_Count%")
Y_final_df.show()

+-------------------+---------------+------------------+
|Total_Ratings_Count|Five_Star_Count|Percent_Five_Stars|
+-------------------+---------------+------------------+
|                 94|             48|              51.1|
+-------------------+---------------+------------------+



In [112]:
# 4) Filter so that vine column is "N"

N_vine_df = filtered_votes_df.filter(filtered_votes_df["vine"] == "N")
N_vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|  %_Helpful_Votes|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
| R4PKAZRQJJX14|          1|           21|         34|   N|                N|61.76470588235294|
|R2CI0Y288CC7E2|          1|           21|         35|   N|                Y|             60.0|
|R127WEQY2FM1T3|          1|          147|        175|   N|                Y|             84.0|
|R2FJ94555FZH32|          2|           55|         60|   N|                N|91.66666666666666|
|R1U3AR67RE273L|          1|           51|         65|   N|                Y|78.46153846153847|
|R3PZOXA5X1U8KW|          4|           31|         36|   N|                N|86.11111111111111|
| R6KTC1OPIOIIG|          2|           19|         34|   N|                Y|55.88235294117647|
|R36O341WWXXKNP|          5|           2

In [113]:
# obtaing total vote count for vine == N
N_ratings_count = N_vine_df.count()
N_ratings_count

40471

In [114]:
# Filtering N_vine_df for 5 stars and confirming it
N_Fivestars = N_vine_df.filter(N_vine_df["star_rating"] == 5)
N_Fivestars.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|  %_Helpful_Votes|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R36O341WWXXKNP|          5|           28|         31|   N|                N|90.32258064516128|
|R29BOS5HMAY1LO|          5|           88|        110|   N|                N|             80.0|
| RDX2ZZ46AM343|          5|           32|         34|   N|                Y|94.11764705882352|
| R60GI4Z1CNGGV|          5|           17|         20|   N|                Y|             85.0|
|R2FAARI3JQO9XQ|          5|           29|         30|   N|                Y|96.66666666666667|
|R2WS53BH47WUMT|          5|           68|         84|   N|                N|80.95238095238095|
|R2BBH73BUJYBGK|          5|           18|         22|   N|                N|81.81818181818183|
|R18ZBSSAMJOV8I|          5|          45

In [115]:
# Obtain count of vine == N 5 star reviews

N_fivestar_count = N_Fivestars.count()
N_fivestar_count

15663

In [116]:
# Obtain the percent of 5star votes
percent_fivestar_votes = (N_fivestar_count/N_ratings_count)*100
percent_fivestar_votes

38.701786464381904

In [117]:
#Placeing info onto a single df
Data = [(N_ratings_count, N_fivestar_count, percent_fivestar_votes)]
columns = ["Total_Ratings_Count", "Five_Star_Count", "Five_Star_Count%"]

In [118]:
# 5) created a df for all needed info for vine == N
N_df = spark.createDataFrame(Data, columns)
N_df.show()

+-------------------+---------------+------------------+
|Total_Ratings_Count|Five_Star_Count|  Five_Star_Count%|
+-------------------+---------------+------------------+
|              40471|          15663|38.701786464381904|
+-------------------+---------------+------------------+



In [119]:
# rounding the  Five_Star_Count% to the tenths place
N_final_df = N_df.select("*", round(col("Five_Star_Count%"), 1)).withColumnRenamed("round(Five_Star_Count%, 1)", "Percent_Five_Stars").drop("Five_Star_Count%")
N_final_df.show()

+-------------------+---------------+------------------+
|Total_Ratings_Count|Five_Star_Count|Percent_Five_Stars|
+-------------------+---------------+------------------+
|              40471|          15663|              38.7|
+-------------------+---------------+------------------+

