In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m12.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=3c19190f0da749da11bd1e4b4829627669bcfd22bec040d53ece4dab27a2c397
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2023-01-28 13:20:28--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2023-01-28 13:20:29 (6.31 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
# Build (or reuse) the Spark session named "M17-Amazon-Challenge", putting the Postgres
# JDBC driver jar on the driver's classpath.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("M17-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### Load Amazon Data into Spark DataFrame

In [4]:
from pyspark import SparkFiles

# Fetch the Books reviews file into Spark's file area, then read it as a tab-separated file
# with a header row.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Books_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Resolve the path of *this* file. SparkFiles.get("") is the SparkFiles root directory, and
# reading that would also ingest any other file ever added to the context.
file_name = url.rsplit("/", 1)[-1]
df = spark.read.option("encoding", "UTF-8").csv(
    SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   25933450| RJOVP071AVAJO|0439873800|      84656342|There Was an Old ...|           Books|          5|            0|          0|   N|                Y|          Five Stars|I love it and so ...|2015-08-31 00:00:00|
|         US|    1801372|R1ORGBETCDW3AI|1623953553|     729938122|      I Saw a Friend| 

In [5]:
from pyspark.sql.functions import col
from pyspark.sql.functions import count

In [6]:
# Create the vine_table DataFrame: keep only the columns needed to compare Vine vs non-Vine reviews.
vine_table = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_table.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RJOVP071AVAJO|          5|            0|          0|   N|                Y|
|R1ORGBETCDW3AI|          5|            0|          0|   N|                Y|
| R7TNRFQAOUTX5|          5|            0|          0|   N|                Y|
|R2GANXKDIFZ6OI|          5|            0|          0|   N|                N|
|R2NYB6C3R8LVN6|          5|            2|          2|   N|                Y|
|R13U5PBJI1H94K|          2|            1|          1|   N|                N|
|R1H8UVH990F8VE|          5|            2|          2|   N|                N|
|R2MC0N30WZMRQ5|          5|            0|          0|   N|                Y|
| R2NO2HXK16Y4J|          5|            0|          0|   N|                Y|
|R245YIAVJK82ZL|          5|            0|          0|   N|     

In [7]:
# Add an integer-typed copy of star_rating so later comparisons (e.g. == 5) are numeric.
vine_table = vine_table.withColumn("star_rating_int", vine_table["star_rating"].cast("int"))

In [8]:
# Replace star_rating with its integer cast and restore the original column order.
# Casting star_rating directly, instead of selecting the star_rating_int helper and renaming
# it, keeps this cell re-runnable: after a first run star_rating_int no longer exists, so
# selecting it again would fail. The select also drops star_rating_int if it is present.
vine_table = vine_table.withColumn("star_rating", col("star_rating").cast("int")).select(
    ["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"]
)
vine_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)



In [9]:
# Deliverable 2
# Step 1
# Keep only reviews with at least 20 total votes. These are more likely to be helpful, and a
# strictly positive total_votes rules out division by zero in the helpful-vote ratio later on.
vine_table_over_20 = vine_table.where(col("total_votes") >= 20)

In [10]:
# Step 2
# From the Step 1 rows, keep those where helpful_votes / total_votes is at least 50%.
helpful_ratio = col("helpful_votes") / col("total_votes")
vine_table_helpful_votes_over_50 = vine_table_over_20.where(helpful_ratio >= 0.5)

In [11]:
# Preview the first rows (20, show()'s default) of the helpful-review subset.
vine_table_helpful_votes_over_50.show(n=20)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3T6LR4H11XRAN|          5|           26|         26|   N|                Y|
|R3QFNFMFWOQIBO|          3|           15|         25|   N|                Y|
|R24S2SV0ZIFIMV|          1|           41|         78|   N|                N|
|R3OCONKYE1A047|          5|           33|         38|   N|                N|
|R2U7YNJZ5JHWVD|          5|           23|         25|   N|                Y|
|R1XVYF74WCJKO3|          1|           47|         49|   N|                N|
|R3KKWNG5VABDYC|          5|           31|         32|   Y|                N|
|R3KK9MHFLY2BVW|          5|           21|         22|   N|                N|
|R2OEP8MGX5K370|          5|           30|         34|   N|                Y|
| RTX63ENBKWCXF|          1|           28|         35|   N|     

In [12]:
# Step 3
# From the Step 2 rows, keep the reviews written through the Vine program (paid): vine == 'Y'.
vine_table_vine_y = vine_table_helpful_votes_over_50.where(col("vine") == "Y")
vine_table_vine_y.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3KKWNG5VABDYC|          5|           31|         32|   Y|                N|
|R397PIJYYVJ7PD|          4|           31|         32|   Y|                N|
| RP6Y2CTIFUYD0|          2|           31|         32|   Y|                N|
|R1FYRNM5U0S2VZ|          3|           20|         24|   Y|                N|
| R1ME19UY4UNAO|          5|           53|         57|   Y|                N|
|R1JW7ZHT2VLT1E|          3|           40|         45|   Y|                N|
| R98UYXY43TURV|          4|           46|         50|   Y|                N|
|R34VXM3EJDS78Z|          3|          374|        410|   Y|                N|
|R1QX8UOW5JO6L0|          4|           20|         21|   Y|                N|
| R5Y5SXCRQ2GNH|          3|           34|         37|   Y|     

In [13]:
# Step 4
# Same as Step 3, but for reviews outside the Vine program (unpaid): vine == 'N'.
vine_table_vine_n = vine_table_helpful_votes_over_50.where(col("vine") == "N")
vine_table_vine_n.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3T6LR4H11XRAN|          5|           26|         26|   N|                Y|
|R3QFNFMFWOQIBO|          3|           15|         25|   N|                Y|
|R24S2SV0ZIFIMV|          1|           41|         78|   N|                N|
|R3OCONKYE1A047|          5|           33|         38|   N|                N|
|R2U7YNJZ5JHWVD|          5|           23|         25|   N|                Y|
|R1XVYF74WCJKO3|          1|           47|         49|   N|                N|
|R3KK9MHFLY2BVW|          5|           21|         22|   N|                N|
|R2OEP8MGX5K370|          5|           30|         34|   N|                Y|
| RTX63ENBKWCXF|          1|           28|         35|   N|                N|
|R38I0D0J8O6LB6|          5|          101|        102|   N|     

In [14]:
# Step 5
# Determine the total number of reviews, the number of 5-star reviews, and the percentage of
# 5-star reviews for the two types of review (paid vs unpaid).

# Total number of paid (Vine, vine == 'Y') reviews that passed the vote filters.
y_count = vine_table_vine_y.count()
print(f"Total number of paid reviews: {y_count}")

Total number of paid reviews: 5012


In [15]:
# Total number of unpaid (vine == 'N') reviews that passed the vote filters.
n_count = vine_table_vine_n.count()
print(f"Total number of unpaid reviews: {n_count}")

Total number of unpaid reviews: 109297


In [16]:
# Count the paid reviews that gave 5 stars.
paid_five_star = vine_table_vine_y.where(col("star_rating") == 5)
five_y_count = paid_five_star.count()
print(f"Number of paid 5-star reviews: {five_y_count}")

Number of paid 5-star reviews: 2031


In [17]:
# Count the unpaid reviews that gave 5 stars.
unpaid_five_star = vine_table_vine_n.where(col("star_rating") == 5)
five_n_count = unpaid_five_star.count()
print(f"Number of unpaid 5-star reviews: {five_n_count}")

Number of unpaid 5-star reviews: 49967


In [18]:
# Share of paid reviews that are 5-star, as a percentage. Guard the division: the filters can
# leave no paid reviews at all (e.g. on another category's dataset).
if y_count:
    percent_y = five_y_count / y_count * 100
    print(f"Percentage of paid five-star reviews: {percent_y:.2f}%")
else:
    percent_y = None
    print("No paid reviews after filtering; percentage of paid five-star reviews is undefined.")

Percentage of paid five-star reviews: 40.52274541101357%


In [19]:
# Share of unpaid reviews that are 5-star, as a percentage. Guard the division in case the
# filters leave no unpaid reviews.
if n_count:
    percent_n = five_n_count / n_count * 100
    print(f"Percentage of unpaid five-star reviews: {percent_n:.2f}%")
else:
    percent_n = None
    print("No unpaid reviews after filtering; percentage of unpaid five-star reviews is undefined.")

Percentage of unpaid five-star reviews: 45.716716835777746%
