In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 40 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 60.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=9ed04869433df2a98e68f7ab2077015e38ee60f4317a347727881e0d1628b418
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-12-20 04:31:47--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-12-20 04:31:48 (4.89 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session, with the downloaded Postgres JDBC jar
# placed on the driver's extra classpath.
spark = (
    SparkSession.builder
    .appName("M17-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [4]:
from pyspark import SparkFiles

# Register the remote TSV with the Spark context; SparkFiles.get() below
# resolves the path of that added file by its file name.
url = "https://amazon-suraj.s3.us-east-2.amazonaws.com/amazon_reviews_us_Watches_v1_00.tsv"
spark.sparkContext.addFile(url)

# Read the tab-separated file: first row is the header, column types are inferred.
df = (
    spark.read
    .option("encoding", "UTF-8")
    .option("sep", "\t")
    .option("header", True)
    .option("inferSchema", True)
    .csv(SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv"))
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...|2015-08-31 00:00:00|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...| 

In [5]:
# Build the vine_table DataFrame: keep only the columns needed for the Vine analysis.
vine_df = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3O9SGZBVQBV76|          5|            0|          0|   N|                Y|
| RKH8BNC3L5DLF|          5|            0|          0|   N|                Y|
|R2HLE8WKZSU3NL|          2|            1|          1|   N|                Y|
|R31U3UH5AZ42LL|          5|            0|          0|   N|                Y|
|R2SV659OUJ945Y|          4|            0|          0|   N|                Y|
| RA51CP8TR5A2L|          5|            0|          0|   N|                Y|
| RB2Q7DLDN6TH6|          5|            1|          1|   N|                Y|
|R2RHFJV0UYBK3Y|          1|            1|          5|   N|                N|
|R2Z6JOQ94LFHEP|          5|            1|          2|   N|                Y|
| RX27XIIWY5JPB|          4|            0|          0|   N|     

In [6]:
# Keep only reviews that received at least 20 total votes.
from pyspark.sql.functions import col

total_votes = vine_df.where(col("total_votes") >= 20)
total_votes.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R14W2VCHHK5V7W|          5|           19|         20|   N|                Y|
|R1S3T57O3OYT5S|          5|           19|         20|   N|                Y|
|R1BTWIBLYYVOV7|          5|           30|         30|   N|                Y|
| R6F9VY91ADPLA|          1|            8|         30|   N|                N|
|R3PXNV89DFIXKV|          5|           35|         37|   N|                Y|
|R2ZF9NYVT3J7D6|          5|           19|         22|   N|                Y|
|R20NYA5V0UF9NE|          5|           27|         28|   N|                Y|
|R2X8FZRUOS8R8C|          4|           25|         26|   N|                Y|
|R2D8IMBVX3XCLF|          1|           14|         20|   N|                Y|
|R25UD9TA63L3Q8|          5|           25|         27|   N|     

In [9]:
# Keep the reviews where helpful_votes / total_votes is at least 50%.
# The ratio is added as a `votes_percentage` column (a fraction between 0 and 1, not 0-100)
# and then used as the filter criterion. The earlier DataFrame-level
# .alias('votes_percentage') was dropped: it renamed the DataFrame rather than the
# column and had no effect on the result.
helpful_df = (
    total_votes
    .withColumn("votes_percentage", col("helpful_votes") / col("total_votes"))
    .filter(col("votes_percentage") >= 0.5)
)
helpful_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|  votes_percentage|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R14W2VCHHK5V7W|          5|           19|         20|   N|                Y|              0.95|
|R1S3T57O3OYT5S|          5|           19|         20|   N|                Y|              0.95|
|R1BTWIBLYYVOV7|          5|           30|         30|   N|                Y|               1.0|
|R3PXNV89DFIXKV|          5|           35|         37|   N|                Y|0.9459459459459459|
|R2ZF9NYVT3J7D6|          5|           19|         22|   N|                Y|0.8636363636363636|
|R20NYA5V0UF9NE|          5|           27|         28|   N|                Y|0.9642857142857143|
|R2X8FZRUOS8R8C|          4|           25|         26|   N|                Y|0.9615384615384616|
|R2D8IMBVX3XCLF|          1|  

In [10]:
# Paid reviews: the helpful reviews written as part of the Vine program (vine == 'Y').
paid_df = helpful_df.where(col("vine") == "Y")
paid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|  votes_percentage|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R1B7M0OP3UNP6O|          5|           49|         52|   Y|                N|0.9423076923076923|
|R2UUV4UGGYMQG8|          5|           34|         39|   Y|                N|0.8717948717948718|
| R9K0LZV2BK9YY|          4|           37|         39|   Y|                N|0.9487179487179487|
|R2OVFLNEUEGTJM|          3|           18|         25|   Y|                N|              0.72|
| RBE09ELJ77LQ0|          5|           44|         45|   Y|                N|0.9777777777777777|
|R3867T8AIJJHM6|          5|           26|         27|   Y|                N|0.9629629629629629|
|R1FNVUXPU63WOZ|          4|           43|         48|   Y|                N|0.8958333333333334|
|R25XGG2G12SE1Z|          4|  

In [11]:
# Unpaid reviews: the helpful reviews that were not part of the Vine program (vine == 'N').
unpaid = helpful_df.where(col("vine") == "N")
unpaid.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|  votes_percentage|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R14W2VCHHK5V7W|          5|           19|         20|   N|                Y|              0.95|
|R1S3T57O3OYT5S|          5|           19|         20|   N|                Y|              0.95|
|R1BTWIBLYYVOV7|          5|           30|         30|   N|                Y|               1.0|
|R3PXNV89DFIXKV|          5|           35|         37|   N|                Y|0.9459459459459459|
|R2ZF9NYVT3J7D6|          5|           19|         22|   N|                Y|0.8636363636363636|
|R20NYA5V0UF9NE|          5|           27|         28|   N|                Y|0.9642857142857143|
|R2X8FZRUOS8R8C|          4|           25|         26|   N|                Y|0.9615384615384616|
|R2D8IMBVX3XCLF|          1|  

In [29]:
# Per review type (vine == 'Y' paid, vine == 'N' unpaid) compute the total number of
# reviews, the number of 5-star reviews, and the percentage of 5-star reviews.
from pyspark.sql.functions import when, count, col

# when() without otherwise() yields null for non-matching rows, and count() skips
# nulls, so counting this expression counts only the 5-star reviews.
is_five_star = when(col("star_rating") == 5, True)

# Keep the aggregated DataFrame in the variable; show() returns None, so chaining
# it onto the assignment would leave five_star_rating unusable afterwards.
five_star_rating = helpful_df.groupBy("vine").agg(
    count(col("vine")).alias("Total_reviews"),
    count(is_five_star).alias("Total_five_star_review"),
    (count(is_five_star) / count(col("vine")) * 100).alias("five_star_rating_percentage"),
)
five_star_rating.show()


+----+-------------+----------------------+---------------------------+
|vine|Total_reviews|Total_five_star_review|five_star_rating_percentage|
+----+-------------+----------------------+---------------------------+
|   Y|           47|                    15|         31.914893617021278|
|   N|         8362|                  4332|          51.80578808897393|
+----+-------------+----------------------+---------------------------+

