In [None]:
import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:10 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:14 http://security.ubuntu.com/ubuntu 

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2023-01-12 17:48:19--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2023-01-12 17:48:20 (6.20 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [None]:
# Create (or reuse) the SparkSession. The Postgres JDBC jar downloaded earlier is
# put on the driver classpath so Spark can interact with Postgres.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("M17-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [None]:
# Download the Amazon Tools reviews TSV through Spark and load it as a DataFrame.
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Tools_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# addFile stores the download under the SparkFiles root directory using the
# URL's file name. SparkFiles.get("") would return that whole directory (and read
# every file added so far), so ask for this file by name instead.
tsv_name = url.rsplit("/", 1)[-1]
df = spark.read.option("encoding", "UTF-8").csv(
    SparkFiles.get(tsv_name), sep="\t", header=True, inferSchema=True
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   15785389|R2UM5QMHBHC90Q|B00H5U9ZD6|     115362950|WallPeg 12 sq ft ...|           Tools|          5|            0|          0|   N|                Y|Great organizer, ...|Very nice. Will o...| 2015-08-31|
|         US|   47910848|  RF0D1LEIF6L7|B001TJGCS0|     570955425|Nite Ize Nite Daw...|           Tools|          4|    

In [18]:
# Spark SQL column functions made available for the analysis below.
# (This cell only imports; the review data is read in the next cell.)
from pyspark.sql.functions import col, count, to_date

In [12]:
# Load the vine review table. Without inferSchema, spark.read.csv reads every
# column as a string, and the numeric filters below (total_votes >= 20,
# helpful_votes / total_votes, star_rating == 5.0) would only work through
# Spark's implicit string-to-number coercion. inferSchema makes the vote/rating
# columns numeric, at the cost of one extra pass over the file.
# NOTE(review): assumes Google Drive is already mounted at /content/drive; no
# drive.mount() cell is visible in this notebook. TODO confirm.
vine_df2 = spark.read.csv("/content/drive/MyDrive/tesxt.csv", header=True, inferSchema=True)
vine_df2.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2UM5QMHBHC90Q|        5.0|          0.0|        0.0|   N|                Y|
|  RF0D1LEIF6L7|        4.0|          0.0|        0.0|   N|                Y|
|  RM6YKIWQVNSY|        1.0|          6.0|        6.0|   N|                Y|
|R1RL3L68ASPS36|        4.0|          0.0|        0.0|   N|                Y|
|R1U4XFBFAG34CY|        5.0|          0.0|        0.0|   N|                Y|
|R3KFIK8P0I91PL|        5.0|          0.0|        0.0|   N|                Y|
| RENOAY76PPK1O|        5.0|          0.0|        0.0|   N|                Y|
| RINV884I0NL5V|        1.0|          0.0|        0.0|   N|                Y|
| R5KJH6CXZH2PX|        5.0|          0.0|        0.0|   N|                Y|
| RO69JF6QWD0W1|        5.0|          0.0|        0.0|   N|     

In [13]:
# Step 1: keep only reviews with at least 20 total votes. These are more likely
# to be helpful, and the threshold keeps total_votes away from zero for the
# helpful_votes / total_votes ratio used in the next step.
min_votes_filter = "total_votes>=20"
twenty_df = vine_df2.where(min_votes_filter)
twenty_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RXAHWIC1584UQ|        5.0|         20.0|       23.0|   N|                Y|
| RSBELAIC899DO|        5.0|         46.0|       51.0|   N|                Y|
|R2HCC4CJ59D225|        4.0|         43.0|       43.0|   N|                Y|
|R39BVCCVPRV6F5|        5.0|         10.0|       22.0|   N|                Y|
| RL9VF9WXHEHKR|        5.0|         35.0|       36.0|   N|                Y|
|R1ZINLWUOLTZ46|        1.0|         39.0|       45.0|   N|                N|
|R1CQKM1K1CHOHS|        1.0|         21.0|       22.0|   N|                N|
|R1ZG19X1NOLCRF|        5.0|          5.0|       21.0|   N|                Y|
|R1YOX5Z4GF3ZSW|        1.0|          1.0|       22.0|   N|                Y|
| R2DHAM7J1KMWN|        5.0|         23.0|       24.0|   N|     

In [14]:
# Step 2: from the Step 1 result, keep reviews where at least half of the votes
# were helpful, i.e. helpful_votes / total_votes >= 0.5.
helpful_ratio_filter = "helpful_votes/total_votes>=0.5"
fifty_percent_df = twenty_df.where(helpful_ratio_filter)
fifty_percent_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RXAHWIC1584UQ|        5.0|         20.0|       23.0|   N|                Y|
| RSBELAIC899DO|        5.0|         46.0|       51.0|   N|                Y|
|R2HCC4CJ59D225|        4.0|         43.0|       43.0|   N|                Y|
| RL9VF9WXHEHKR|        5.0|         35.0|       36.0|   N|                Y|
|R1ZINLWUOLTZ46|        1.0|         39.0|       45.0|   N|                N|
|R1CQKM1K1CHOHS|        1.0|         21.0|       22.0|   N|                N|
| R2DHAM7J1KMWN|        5.0|         23.0|       24.0|   N|                N|
|R26EZ2INBETPU2|        1.0|        103.0|      128.0|   N|                N|
|R265ZMDO2ISN1O|        5.0|         20.0|       21.0|   N|                Y|
|R3NZ7JTOCPYE1S|        4.0|         32.0|       33.0|   N|     

In [15]:
# Step 3: from the Step 2 result, keep reviews written as part of the paid Vine
# program (vine == 'Y').
vine_paid_filter = "vine == 'Y'"
paid_df = fifty_percent_df.where(vine_paid_filter)
paid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3QIQRG107AP19|        5.0|         38.0|       41.0|   Y|                N|
|R1ZNBHGCYE8VN1|        5.0|        175.0|      181.0|   Y|                N|
|R1BNZJ9IKGRBC2|        4.0|         63.0|       70.0|   Y|                N|
|R2IXTN4PBRHBO2|        5.0|         19.0|       24.0|   Y|                N|
|R32AOW3L09SYMQ|        5.0|         29.0|       34.0|   Y|                N|
|R34Z4VTLF7PC6X|        4.0|         33.0|       36.0|   Y|                N|
|R25403NE9JCRZZ|        2.0|         36.0|       40.0|   Y|                N|
|R2QT2G3YRGC0PS|        5.0|         18.0|       20.0|   Y|                N|
| RDUP15OURNUSA|        5.0|         22.0|       23.0|   Y|                N|
|R1EI4NZTG7G14F|        5.0|         17.0|       23.0|   Y|     

In [17]:
# Step 4: same as Step 3, but keep reviews that were NOT part of the Vine
# program (unpaid, vine == 'N').
vine_unpaid_filter = "vine == 'N'"
unpaid_df = fifty_percent_df.where(vine_unpaid_filter)
unpaid_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RXAHWIC1584UQ|        5.0|         20.0|       23.0|   N|                Y|
| RSBELAIC899DO|        5.0|         46.0|       51.0|   N|                Y|
|R2HCC4CJ59D225|        4.0|         43.0|       43.0|   N|                Y|
| RL9VF9WXHEHKR|        5.0|         35.0|       36.0|   N|                Y|
|R1ZINLWUOLTZ46|        1.0|         39.0|       45.0|   N|                N|
|R1CQKM1K1CHOHS|        1.0|         21.0|       22.0|   N|                N|
| R2DHAM7J1KMWN|        5.0|         23.0|       24.0|   N|                N|
|R26EZ2INBETPU2|        1.0|        103.0|      128.0|   N|                N|
|R265ZMDO2ISN1O|        5.0|         20.0|       21.0|   N|                Y|
|R3NZ7JTOCPYE1S|        4.0|         32.0|       33.0|   N|     

In [21]:
# Step 5: for paid vs unpaid reviews, determine the total review count, the
# 5-star review count, and the percentage of reviews that are 5-star.

# Total number of paid reviews. DataFrame.show() only prints and returns None,
# so keep the aggregate DataFrame for display and read the number from it.
paid_review_count_df = paid_df.agg({'review_id': 'count'})
paid_review_count_df.show()
paid_review_count = paid_review_count_df.first()[0]

+----------------+
|count(review_id)|
+----------------+
|             285|
+----------------+



In [22]:
# Total number of unpaid reviews. show() returns None, so the count itself is
# read from the aggregate DataFrame after displaying it.
unpaid_review_count_df = unpaid_df.agg({'review_id': 'count'})
unpaid_review_count_df.show()
unpaid_review_count = unpaid_review_count_df.first()[0]

+----------------+
|count(review_id)|
+----------------+
|           31545|
+----------------+



In [43]:
# Paid reviews with a 5-star rating; only the star_rating column is kept.
paid_5star = paid_df.select('star_rating').where(paid_df.star_rating == 5.0)


In [40]:
# Number of 5-star paid reviews. show() prints the table but returns None, so
# the count is read from the aggregate DataFrame.
paid_5star_count_df = paid_5star.agg({'star_rating': 'count'})
paid_5star_count_df.show()
paid_5star_count = paid_5star_count_df.first()[0]

+------------------+
|count(star_rating)|
+------------------+
|               163|
+------------------+



In [44]:
# Unpaid reviews that received a 5-star rating; only star_rating is kept.
is_five_star = unpaid_df['star_rating'] == 5.0
unpaid_5star = unpaid_df.select('star_rating').where(is_five_star)


In [42]:
# Number of 5-star unpaid reviews. show() returns None, so the count is read
# from the aggregate DataFrame after displaying it.
unpaid_5star_count_df = unpaid_5star.agg({'star_rating': 'count'})
unpaid_5star_count_df.show()
unpaid_5star_count = unpaid_5star_count_df.first()[0]

+------------------+
|count(star_rating)|
+------------------+
|             14614|
+------------------+



In [None]:
# Share of 5-star ratings among paid (Vine) vs unpaid helpful reviews, computed
# from paid_df / unpaid_df so the numbers follow the data instead of being
# hard-coded. The run recorded above gave 163 / 285 = 57.2% for paid and
# 14614 / 31545 = 46.3% for unpaid reviews.
def _five_star_pct(reviews_df):
    """Return the percentage of rows in ``reviews_df`` whose star_rating == 5.0.

    Returns NaN for an empty DataFrame instead of raising ZeroDivisionError.
    """
    total = reviews_df.count()
    if total == 0:
        return float("nan")
    return 100 * reviews_df.filter(reviews_df.star_rating == 5.0).count() / total


paid_5star_pct = _five_star_pct(paid_df)
unpaid_5star_pct = _five_star_pct(unpaid_df)
print(f"paid reviews are {paid_5star_pct:.1f}% 5-star rating")
print(f"unpaid reviews are {unpaid_5star_pct:.1f}% 5-star rating")