In [2]:
!pip install py4j
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.0'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Collecting py4j
[?25l  Downloading https://files.pythonhosted.org/packages/30/42/25ad191f311fcdb38b750d49de167abd535e37a144e730a80d7c439d1751/py4j-0.10.9.1-py2.py3-none-any.whl (198kB)
[K     |█▋                              | 10kB 9.1MB/s eta 0:00:01[K     |███▎                            | 20kB 12.7MB/s eta 0:00:01[K     |█████                           | 30kB 15.6MB/s eta 0:00:01[K     |██████▋                         | 40kB 10.3MB/s eta 0:00:01[K     |████████▎                       | 51kB 10.7MB/s eta 0:00:01[K     |██████████                      | 61kB 9.9MB/s eta 0:00:01[K     |███████████▌                    | 71kB 8.7MB/s eta 0:00:01[K     |█████████████▏                  | 81kB 9.2MB/s eta 0:00:01[K     |██████████████▉                 | 92kB 10.0MB/s eta 0:00:01[K     |████████████████▌               | 102kB 9.4MB/s eta 0:00:01[K     |██████████████████▏             | 112kB 9.4MB/s eta 0:00:01[K     |███████████████████▉            | 122kB 9.4MB/s e

In [3]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2021-01-31 20:14:46--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2021-01-31 20:14:46 (4.77 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [6]:
#Bring in the DataFrame for Video Game Review Data

from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   12039526| RTIS3L2M1F5SM|B001CXYMFS|     737716809|Thrustmaster T-Fl...|     Video Games|          5|            0|          0|   N|                Y|an amazing joysti...|Used this for Eli...| 2015-08-31|
|         US|    9636577| R1ZV7R40OLHKD|B00M920ND6|     569686175|Tonsee 6 buttons ...|     Video Games|          5|    

In [7]:
# Create the vine_table. DataFrame
vine_df = df.select(['review_id', 'star_rating', 'helpful_votes','total_votes','vine','verified_purchase'])
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RTIS3L2M1F5SM|          5|            0|          0|   N|                Y|
| R1ZV7R40OLHKD|          5|            0|          0|   N|                Y|
|R3BH071QLH8QMC|          1|            0|          1|   N|                Y|
|R127K9NTSXA2YH|          3|            0|          0|   N|                Y|
|R32ZWUXDJPW27Q|          4|            0|          0|   N|                Y|
|R3AQQ4YUKJWBA6|          1|            0|          0|   N|                Y|
|R2F0POU5K6F73F|          5|            0|          0|   N|                Y|
|R3VNR804HYSMR6|          5|            0|          0|   N|                Y|
| R3GZTM72WA2QH|          5|            0|          0|   N|                Y|
| RNQOY62705W1K|          4|            0|          0|   N|     

In [8]:
#Filtering down to where products received 20 or more reviews.
vine_df_filtered = vine_df.filter("total_votes>=20")
vine_df_filtered.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R4PKAZRQJJX14|          1|           21|         34|   N|                N|
|R2CI0Y288CC7E2|          1|           21|         35|   N|                Y|
|R127WEQY2FM1T3|          1|          147|        175|   N|                Y|
|R3EZ0EPYLDA34S|          1|           14|         31|   N|                Y|
|R2FJ94555FZH32|          2|           55|         60|   N|                N|
|R1U3AR67RE273L|          1|           51|         65|   N|                Y|
|R3PZOXA5X1U8KW|          4|           31|         36|   N|                N|
| R6KTC1OPIOIIG|          2|           19|         34|   N|                Y|
|R36O341WWXXKNP|          5|           28|         31|   N|                N|
|R3GSK9MM8DNOYI|          1|            4|         32|   N|     

In [9]:
#Taking the prior DataFrame and further filtering to those that have a better proportion of 'helpful' reviews within the product's set of total reviews.
vine_df_helpful = vine_df_filtered.filter("(helpful_votes/total_votes)>=0.5")
vine_df_helpful.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R4PKAZRQJJX14|          1|           21|         34|   N|                N|
|R2CI0Y288CC7E2|          1|           21|         35|   N|                Y|
|R127WEQY2FM1T3|          1|          147|        175|   N|                Y|
|R2FJ94555FZH32|          2|           55|         60|   N|                N|
|R1U3AR67RE273L|          1|           51|         65|   N|                Y|
|R3PZOXA5X1U8KW|          4|           31|         36|   N|                N|
| R6KTC1OPIOIIG|          2|           19|         34|   N|                Y|
|R36O341WWXXKNP|          5|           28|         31|   N|                N|
|R10LZVBLQHBVJ0|          2|          151|        198|   N|                N|
|R1VR5GLGY1GE7N|          1|           49|         51|   N|     

In [10]:
#Set of filtered reviews that had a good distribution of helpful reviews and was part of the Vine program
vine_df_paid = vine_df_helpful.filter("vine='Y'")
vine_df_paid.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3KKUSGFZWSUIY|          5|           56|         63|   Y|                N|
|R10FO5UKKVZBK2|          3|           23|         23|   Y|                N|
| RM4KSGEOR7MU1|          5|           19|         24|   Y|                N|
| RG7VRMYLEXD23|          4|           22|         26|   Y|                N|
|R11O4YSCPSNL6L|          3|           20|         26|   Y|                N|
|R286MFBAJ8NPD6|          5|           46|         51|   Y|                N|
|R1JRR530H4COA2|          5|           22|         28|   Y|                N|
| RQ5WD90PUNBU9|          5|           21|         24|   Y|                N|
|R12648VHCQWUV9|          4|           21|         28|   Y|                N|
|R3KAW29CJ8L6DQ|          5|           17|         20|   Y|     

In [11]:
#Set of filtered reviews that had a good distribution of helpful reviews and was part of the Vine program
vine_df_unpaid = vine_df_helpful.filter("vine='N'")
vine_df_unpaid.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R4PKAZRQJJX14|          1|           21|         34|   N|                N|
|R2CI0Y288CC7E2|          1|           21|         35|   N|                Y|
|R127WEQY2FM1T3|          1|          147|        175|   N|                Y|
|R2FJ94555FZH32|          2|           55|         60|   N|                N|
|R1U3AR67RE273L|          1|           51|         65|   N|                Y|
|R3PZOXA5X1U8KW|          4|           31|         36|   N|                N|
| R6KTC1OPIOIIG|          2|           19|         34|   N|                Y|
|R36O341WWXXKNP|          5|           28|         31|   N|                N|
|R10LZVBLQHBVJ0|          2|          151|        198|   N|                N|
|R1VR5GLGY1GE7N|          1|           49|         51|   N|     

In [12]:
#Look at the review splits between Vine and Non-Vine members in terms of number of reviews, number of 5-star reviews, and percentage of 5-star reviews
# Total votes cast based on paid/unpaid membership status

vine_splits_total_votes = vine_df.groupby(vine_df["vine"]).agg({"total_votes":"count"}).withColumnRenamed("count(total_votes)", "total_votes")
vine_splits_total_votes.show()

+----+-----------+
|vine|total_votes|
+----+-----------+
|   Y|       4291|
|   N|    1781706|
+----+-----------+



In [39]:
# Number of 5 Star reviews based on paid/unpaid membership status

num_5_star = vine_df.filter("star_rating=5")
vine_splits_num_5_star = num_5_star.groupby(num_5_star["vine"]).agg({"star_rating":"count"}).withColumnRenamed("count(star_rating)", "num_5_star")
vine_splits_num_5_star.show()

+----+----------+
|vine|num_5_star|
+----+----------+
|   Y|      1607|
|   N|   1025317|
+----+----------+



In [47]:
# Let's give these two tables an alias!

ta = vine_splits_total_votes.alias('ta')
tb = vine_splits_num_5_star.alias('tb')

In [49]:
# Join the two together!

vine_inner_join = ta.join(tb, ta.vine == tb.vine)

# Give it a nickname
vij = vine_inner_join.alias('vij')
vij.show()

+----+-----------+----+----------+
|vine|total_votes|vine|num_5_star|
+----+-----------+----+----------+
|   Y|       4291|   Y|      1607|
|   N|    1781706|   N|   1025317|
+----+-----------+----+----------+



In [50]:
#Come up with the percent of 5 star reviews in relation to total reviews within each bucket of paid/unpaid memberships

vine_ij_percentage = vij.withColumn('percent_5_star', vij["num_5_star"]/vij["total_votes"])
vine_ij_percentage.show()

+----+-----------+----+----------+------------------+
|vine|total_votes|vine|num_5_star|    percent_5_star|
+----+-----------+----+----------+------------------+
|   Y|       4291|   Y|      1607|0.3745047774411559|
|   N|    1781706|   N|   1025317|0.5754692412777417|
+----+-----------+----+----------+------------------+

