In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Hit:9 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-HW-2").getOrCreate()

# Load Amazon Data into Spark DataFrame

In [10]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
video_games_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Video_Games_v1_00.tsv.gz"), sep="\t", header=True,inferSchema=True)
video_games_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   12039526| RTIS3L2M1F5SM|B001CXYMFS|     737716809|Thrustmaster T-Fl...|     Video Games|          5|            0|          0|   N|                Y|an amazing joysti...|Used this for Eli...| 2015-08-31|
|         US|    9636577| R1ZV7R40OLHKD|B00M920ND6|     569686175|Tonsee 6 buttons ...|     Video Games|          5|    

# Filter by Votes

In [11]:
filtered_video_games_df=video_games_df.select("star_rating","helpful_votes","total_votes","vine","verified_purchase").filter("verified_purchase=='Y'")
filtered_video_games_df.show(10)

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          1|            0|          1|   N|                Y|
|          3|            0|          0|   N|                Y|
|          4|            0|          0|   N|                Y|
|          1|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          4|            0|          0|   N|                Y|
+-----------+-------------+-----------+----+-----------------+
only showing top 10 rows



In [12]:
# Filter for greater than 20 total votes
total_votes_df=video_games_df.select("star_rating","helpful_votes","total_votes","vine","verified_purchase").filter("total_votes>20")
total_votes_df.show(10)

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          1|           21|         34|   N|                N|
|          1|           21|         35|   N|                Y|
|          1|          147|        175|   N|                Y|
|          1|           14|         31|   N|                Y|
|          2|           55|         60|   N|                N|
|          1|           51|         65|   N|                Y|
|          4|           31|         36|   N|                N|
|          2|           19|         34|   N|                Y|
|          5|           28|         31|   N|                N|
|          1|            4|         32|   N|                N|
+-----------+-------------+-----------+----+-----------------+
only showing top 10 rows



In [13]:
video_games_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [17]:
from pyspark.sql.functions import col,when

In [19]:
# Filter for greater than 50% helpful vote percentage
helpful_votes_df= total_votes_df.withColumn("percent_helpful_votes",(col("helpful_votes")/col("total_votes"))*100)
helpful_votes_df= helpful_votes_df.filter("percent_helpful_votes>50").select("star_rating","helpful_votes","total_votes","vine","verified_purchase")
helpful_votes_df.show(10)

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          1|           21|         34|   N|                N|
|          1|           21|         35|   N|                Y|
|          1|          147|        175|   N|                Y|
|          2|           55|         60|   N|                N|
|          1|           51|         65|   N|                Y|
|          4|           31|         36|   N|                N|
|          2|           19|         34|   N|                Y|
|          5|           28|         31|   N|                N|
|          2|          151|        198|   N|                N|
|          1|           49|         51|   N|                Y|
+-----------+-------------+-----------+----+-----------------+
only showing top 10 rows



# Describe Stats

In [33]:
# Paid reviews
paid_df=video_games_df.select("star_rating","helpful_votes","total_votes","vine","verified_purchase").filter("vine=='Y'")
paid_df.describe().show(10)

+-------+------------------+-----------------+------------------+----+-----------------+
|summary|       star_rating|    helpful_votes|       total_votes|vine|verified_purchase|
+-------+------------------+-----------------+------------------+----+-----------------+
|  count|              4291|             4291|              4291|4291|             4291|
|   mean| 4.074807737124213|2.348403635516197|3.2780237706828244|null|             null|
| stddev|0.9182159041910316|12.57900104398382|13.768374606698877|null|             null|
|    min|                 1|                0|                 0|   Y|                N|
|    max|                 5|              347|               362|   Y|                Y|
+-------+------------------+-----------------+------------------+----+-----------------+



In [32]:
# Paid helpful reviews that are unverified
helpful_votes_df.filter("vine=='Y'").filter("verified_purchase=='N'").describe().show()

+-------+------------------+-----------------+------------------+----+-----------------+
|summary|       star_rating|    helpful_votes|       total_votes|vine|verified_purchase|
+-------+------------------+-----------------+------------------+----+-----------------+
|  count|                90|               90|                90|  90|               90|
|   mean| 4.166666666666667|56.27777777777778|63.644444444444446|null|             null|
| stddev|0.9858550158920685|66.20465847653779|  69.8570468260785|null|             null|
|    min|                 1|               16|                21|   Y|                N|
|    max|                 5|              347|               362|   Y|                N|
+-------+------------------+-----------------+------------------+----+-----------------+



In [34]:
# Unpaid reviews
unpaid_df=video_games_df.select("star_rating","helpful_votes","total_votes","vine","verified_purchase").filter("vine=='N'")
unpaid_df.describe().show()

+-------+------------------+-----------------+------------------+-------+-----------------+
|summary|       star_rating|    helpful_votes|       total_votes|   vine|verified_purchase|
+-------+------------------+-----------------+------------------+-------+-----------------+
|  count|           1781706|          1781706|           1781706|1781706|          1781706|
|   mean| 4.059856676690767|2.259134784302236| 3.758523011091617|   null|             null|
| stddev|1.3566952586970766| 19.1844293543374|22.471248446467634|   null|             null|
|    min|                 1|                0|                 0|      N|                N|
|    max|                 5|            10498|             10780|      N|                Y|
+-------+------------------+-----------------+------------------+-------+-----------------+



# Determine the percentage of five-star review among vine reviews

In [37]:
paid_number=paid_df.count()
paid_five_star_number=paid_df.filter("star_rating==5").count()
percentage_five_star_vine=paid_five_star_number/paid_number
print(f'Number of paid reviews {paid_number}')
print(f'Number of paid five star reviews {paid_five_star_number}')
print(f'Percantage of paid reviews that are five stars {percentage_five_star_vine * 100}%')

Number of paid reviews 4291
Number of paid five star reviews 1607
Percantage of paid reviews that are five stars 37.45047774411559%


# Determine the percentage of five-star review among non-Vine reviews

In [38]:
unpaid_number=unpaid_df.count()
unpaid_five_star_number=unpaid_df.filter("star_rating==5").count()
percentage_five_star_non_vine=unpaid_five_star_number/unpaid_number
print(f'Number of unpaid reviews {unpaid_number}')
print(f'Number of unpaid five star reviews {unpaid_five_star_number}')
print(f'Percantage of paid reviews that are five stars {percentage_five_star_non_vine * 100}%')

Number of unpaid reviews 1781706
Number of unpaid five star reviews 1025317
Percantage of paid reviews that are five stars 57.54692412777417%
