In [1]:
import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [W                                                                               Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease [1,581 B]
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Waiting for headers] [W                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  Release
Get:5 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Hit:6 http://archive.ubuntu.com/ubuntu focal InRelease
Get:7 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubunt

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M17-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [3]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   10140119|R3LI5TRP3YIDQL|B00TXH4OLC|     384427924|Whatever's for Us...|           Music|          5|            0|          0|   N|                Y|          Five Stars|Love this CD alon...| 2015-08-31|
|         US|   27664622|R3LGC3EKEG84PX|B00B6QXN6U|     831769051|Same Trailer Diff...|           Music|          5|    

In [4]:
from pyspark.sql.functions import to_date

In [25]:
# Create the vine_table. DataFrame
# vine_df = df.select([])
vine_df = df.select(["review_id", "star_rating","helpful_votes","total_votes","vine","verified_purchase"])



In [26]:
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3LI5TRP3YIDQL|          5|            0|          0|   N|                Y|
|R3LGC3EKEG84PX|          5|            0|          0|   N|                Y|
| R9PYL3OYH55QY|          5|            0|          1|   N|                Y|
|R3PWBAWUS4NT0Q|          3|            0|          0|   N|                Y|
|R15LYP3O51UU9E|          5|            0|          0|   N|                Y|
|R1AD7L0CC3DSRI|          5|            0|          0|   N|                Y|
|R32FE8Y45QV434|          5|            0|          0|   N|                Y|
|R3NM4MZ4XWL43Q|          5|            1|          2|   N|                Y|
|R3H4FXX6Q7I37D|          4|            0|          0|   N|                Y|
|R30L5PET7LFFDC|          5|            1|          1|   N|     

In [27]:
# Filter the data to only include reviews with total_votes >= 20
filtered_df = vine_df.filter(vine_df["helpful_votes"] >= 20)


In [28]:
filtered_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2SHXRL6SL1GC9|          3|           25|         26|   N|                Y|
|R2ZC033X86YOY8|          5|           25|         26|   N|                N|
|R2P4PJJ2ROTPBM|          5|           46|         48|   N|                N|
| RO8RAEGB66BKR|          4|           46|         46|   N|                N|
| RRFZ7QZTRJC59|          5|          292|        300|   N|                N|
| RFN4PNRUD1UW2|          4|           21|         22|   N|                N|
| RO7EBJEP7IHIX|          5|           26|         26|   N|                N|
| REIWYIGFMEKEV|          5|           26|         30|   N|                Y|
|R2FV27WWRWROZU|          4|           26|         27|   N|                N|
|R16QHOYRY9QLQU|          5|           36|         39|   N|     

In [29]:
# Calculate the helpful_ratio column
from pyspark.sql.functions import col

In [30]:
ratio_df = filtered_df.withColumn("helpful_ratio", col("helpful_votes") / col("total_votes"))


In [31]:
ratio_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     helpful_ratio|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R2SHXRL6SL1GC9|          3|           25|         26|   N|                Y|0.9615384615384616|
|R2ZC033X86YOY8|          5|           25|         26|   N|                N|0.9615384615384616|
|R2P4PJJ2ROTPBM|          5|           46|         48|   N|                N|0.9583333333333334|
| RO8RAEGB66BKR|          4|           46|         46|   N|                N|               1.0|
| RRFZ7QZTRJC59|          5|          292|        300|   N|                N|0.9733333333333334|
| RFN4PNRUD1UW2|          4|           21|         22|   N|                N|0.9545454545454546|
| RO7EBJEP7IHIX|          5|           26|         26|   N|                N|               1.0|
| REIWYIGFMEKEV|          5|  

Filter the DataFrame or table created in Step 2, and create a new DataFrame or table that retrieves all the rows where a review was written as part of the Vine program (paid), vine == 'Y'.


In [32]:
# Step. 3 
vine_paid_df = ratio_df.filter(ratio_df.vine == 'Y')

In [33]:
vine_paid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     helpful_ratio|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
| R4V3ICFDTIDIF|          4|           20|         21|   Y|                N|0.9523809523809523|
|R1JZ0JAPW83WFS|          4|           54|         58|   Y|                N|0.9310344827586207|
| RSPA592OKHQMZ|          2|           28|         78|   Y|                N| 0.358974358974359|
|R1XH7EA97FAVP7|          3|           35|         44|   Y|                N|0.7954545454545454|
|R1GGYGVTHP84DG|          4|           25|         27|   Y|                N|0.9259259259259259|
| RXGU9DSKZJSP0|          3|           21|         22|   Y|                N|0.9545454545454546|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+



Repeat Step 3, but this time retrieve all the rows where the review was not part of the Vine program (unpaid), vine == 'N'.

In [34]:
# Step 4: 
vine_unpaid_df = ratio_df.filter(ratio_df.vine == 'N')


In [35]:
vine_unpaid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     helpful_ratio|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R2SHXRL6SL1GC9|          3|           25|         26|   N|                Y|0.9615384615384616|
|R2ZC033X86YOY8|          5|           25|         26|   N|                N|0.9615384615384616|
|R2P4PJJ2ROTPBM|          5|           46|         48|   N|                N|0.9583333333333334|
| RO8RAEGB66BKR|          4|           46|         46|   N|                N|               1.0|
| RRFZ7QZTRJC59|          5|          292|        300|   N|                N|0.9733333333333334|
| RFN4PNRUD1UW2|          4|           21|         22|   N|                N|0.9545454545454546|
| RO7EBJEP7IHIX|          5|           26|         26|   N|                N|               1.0|
| REIWYIGFMEKEV|          5|  


Determine the total number of reviews, the number of 5-star reviews, and the percentage of 5-star reviews for the two types of review (paid vs unpaid).

In [37]:
from pyspark.sql.functions import count

In [36]:
# paid reviews:

paid_reviews_count = ratio_df.count()



In [42]:
print(paid_reviews_count)

85827


In [40]:
# 5 stars
paid_5_star_reviews_count = vine_paid_df.filter(vine_paid_df.star_rating == 5).count()

In [43]:
print(paid_5_star_reviews_count)

0


In [41]:
# Percentage

paid_5_star_reviews_percentage = (paid_5_star_reviews_count / paid_reviews_count) * 100


In [44]:
print(paid_5_star_reviews_percentage)

0.0


In [45]:
# Unpaid reviews
unpaid_reviews_count = vine_unpaid_df.count()

In [46]:
print(unpaid_reviews_count)

85821


In [47]:
# 5-star reviews unpaid
unpaid_5_star_reviews_count = vine_unpaid_df.filter(vine_unpaid_df.star_rating == 5).count()

In [48]:
print(unpaid_5_star_reviews_count )

58315


In [49]:
# Percentage
unpaid_5_star_reviews_percentage = (unpaid_5_star_reviews_count / unpaid_reviews_count) * 100


In [50]:
print(unpaid_5_star_reviews_percentage)

67.94956945269806
