In [None]:
# Activate Spark in our Colab notebook.
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.2'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()



Hit:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  Release
Hit:5 http://security.ubuntu.com/ubuntu focal-security InRelease
Hit:6 http://archive.ubuntu.com/ubuntu focal InRelease
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Hit:9 http://archive.ubuntu.com/ubuntu focal-updates InRelease
Hit:10 http://archive.ubuntu.com/ubuntu focal-backports InRelease
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu focal InRelease
Hit:14 http://ppa.launchpad.net/ubuntugis/ppa/ubuntu focal InRelease
Reading package list

In [None]:
# Get postgresql package
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2023-03-02 01:23:18--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2023-03-02 01:23:20 (1.22 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [None]:
# Build (or reuse) the SparkSession, with the PostgreSQL JDBC jar on the driver classpath
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("BigData-Part2")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

# Extract the Amazon Data into Spark DataFrame

In [None]:
# Read in the data from an S3 Bucket
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Outdoors_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Datetime pattern letters are case-sensitive: "MM" is month-of-year while
# "mm" is minute-of-hour. With "yyyy-mm-dd" the month digits were parsed as
# minutes, so every review_date came out in January with the real month in
# the minutes field (e.g. 2015-01-31 00:08:00 for a review written in August).
df = spark.read.option('header', 'true').csv(
    SparkFiles.get("amazon_reviews_us_Outdoors_v1_00.tsv.gz"),
    inferSchema=True,   # let Spark infer integer / timestamp column types
    sep='\t',           # the file is tab-separated
    timestampFormat="yyyy-MM-dd",
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   18446823|R35T75OLUGHL5C|B000NV6H94|     110804376|Stearns Youth Boa...|        Outdoors|          4|            0|          0|   N|                Y|          Four Stars|          GOOD VALUE|2015-01-31 00:08:00|
|         US|   13724367|R2BV735O46BN33|B000IN0W3Y|     624096774|Primal Wear Men's...| 

In [None]:
# Get the number of rows in the DataFrame.
# count() is a Spark action: it runs a job and returns the row count.
df.count()

2302401

In [None]:
# Discard every row that has a null in at least one column, then re-count the rows
df = df.na.drop()
df.count()

2302174

# Transform the Data

In [None]:
# Inspect the column names and the data types Spark inferred while reading the file.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [None]:
# Change datatypes
# Source Link: https://stackoverflow.com/questions/32284620/how-to-change-a-dataframe-column-from-string-type-to-double-type-in-pyspark

from pyspark.sql.types import TimestampType, IntegerType

# The chain is wrapped in parentheses instead of using backslash continuations.
# The original ended with a dangling "\" that only worked because a blank line
# happened to follow it.
# Note: the schema printed before this cell already shows these types, so the
# casts act as an explicit guarantee rather than a conversion.
df = (
    df.withColumn("customer_id", df["customer_id"].cast(IntegerType()))
    .withColumn("product_parent", df["product_parent"].cast(IntegerType()))
    .withColumn("review_date", df["review_date"].cast(TimestampType()))
    .withColumn("star_rating", df["star_rating"].cast(IntegerType()))
    .withColumn("helpful_votes", df["helpful_votes"].cast(IntegerType()))
    .withColumn("total_votes", df["total_votes"].cast(IntegerType()))
)

# Print schema
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



### Level 2
## All customer reviews.

In [None]:
# Keep only the columns used in the analysis: review date, customer id,
# verified purchase, product title, star rating, helpful votes, total votes
# and the Vine flag.

analysis_df = df.select(
    "review_date",
    "customer_id",
    "verified_purchase",
    "product_title",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
)
analysis_df.show()

+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2015-01-31 00:08:00|   18446823|                Y|Stearns Youth Boa...|          4|            0|          0|   N|
|2015-01-31 00:08:00|   13724367|                Y|Primal Wear Men's...|          5|            0|          0|   N|
|2015-01-31 00:08:00|   51001958|                Y|Osprey Hydraulics...|          4|            0|          0|   N|
|2015-01-31 00:08:00|   32866903|                Y|CamelBak eddy .75...|          3|            1|          1|   N|
|2015-01-31 00:08:00|   30907790|                Y|Children Black Re...|          1|            0|          0|   N|
|2015-01-31 00:08:00|   20232229|                Y|Ibera Bicycle Tri...|

In [None]:
# Summary statistics (count, mean, stddev, min, max) of the rating and vote columns for ALL CUSTOMERS
summary_analysis_df = analysis_df.describe("star_rating", "helpful_votes", "total_votes")

print("Summary statistics for All Customers")
summary_analysis_df.show()

Summary statistics for All Customers
+-------+------------------+------------------+------------------+
|summary|       star_rating|     helpful_votes|       total_votes|
+-------+------------------+------------------+------------------+
|  count|           2302174|           2302174|           2302174|
|   mean| 4.240025732199217|1.8469689953930502|2.2948195922636603|
| stddev|1.2101686846950972|12.770681186888938|13.628677045693843|
|    min|                 1|                 0|                 0|
|    max|                 5|              2703|              2751|
+-------+------------------+------------------+------------------+



In [None]:
# Order every review from the highest star rating (5) down to the lowest

from pyspark.sql.functions import desc

star_rating_df = analysis_df.sort(desc("star_rating"))
star_rating_df.show()

+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2015-01-31 00:08:00|   36542249|                Y|RAD Cycle Product...|          5|            1|          1|   N|
|2015-01-31 00:08:00|   33495535|                Y|20L/33L- Most Dur...|          5|            0|          0|   N|
|2015-01-31 00:08:00|   16687947|                Y|XLC Quick Release...|          5|            0|          0|   N|
|2015-01-31 00:08:00|   13724367|                Y|Primal Wear Men's...|          5|            0|          0|   N|
|2015-01-31 00:08:00|   11373767|                Y|Timbuk2 Aviator T...|          5|            0|          1|   N|
|2015-01-31 00:08:00|   32421041|                Y|K2 Skate Men's F....|

## Viewing Vine Customers

In [None]:
# col() refers to a column by name inside filter expressions
from pyspark.sql.functions import col

# Keep only the reviews flagged vine == "Y"
vine_analysis_df = analysis_df.where(col("vine") == "Y")
vine_analysis_df.show()

+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2015-01-31 00:08:00|   37388532|                N|The Alter Ego Per...|          5|            2|          5|   Y|
|2015-01-31 00:08:00|   44451381|                N|Slumberjack Bound...|          5|            0|          0|   Y|
|2015-01-31 00:08:00|   17304105|                N|Thule EnRoute Blu...|          5|            3|          4|   Y|
|2015-01-31 00:08:00|   51016139|                N|Slumberjack Bound...|          4|            0|          0|   Y|
|2015-01-31 00:08:00|   31691563|                N|Timberjill 20 Deg...|          5|            0|          0|   Y|
|2015-01-31 00:08:00|   50046574|                N|The Alter Ego Per...|

In [None]:
# Summary statistics (count, mean, stddev, min, max) of the rating and vote columns for VINE CUSTOMERS

vine_summary_analysis_df = vine_analysis_df.describe("star_rating", "helpful_votes", "total_votes")

print("Summary statistics for VINE CUSTOMERS")
vine_summary_analysis_df.show()

Summary statistics for VINE CUSTOMERS
+-------+------------------+------------------+------------------+
|summary|       star_rating|     helpful_votes|       total_votes|
+-------+------------------+------------------+------------------+
|  count|              3137|              3137|              3137|
|   mean| 4.372967803634046| 3.058335989799171|3.9190309212623524|
| stddev|0.8186341579032859|12.566739239470186|13.562137725035525|
|    min|                 1|                 0|                 0|
|    max|                 5|               326|               340|
+-------+------------------+------------------+------------------+



In [None]:
# Five-star reviews by Vine customers, ordered by helpful votes (highest first)

from pyspark.sql.functions import desc

vine_helpful_votes_df = (
    vine_analysis_df
    .orderBy(desc("helpful_votes"))
    .filter('star_rating = 5')
)

print("Top 20 rated products with the Top helpful votes for VINE CUSTOMERS")
# show() without an argument prints the first 20 rows
vine_helpful_votes_df.show()

Top 20 rated products with the Top helpful votes for VINE CUSTOMERS
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2014-01-08 00:08:00|   51999404|                N|Thule Urban Glide...|          5|          326|        340|   Y|
|2012-01-01 00:08:00|   49805190|                N|Zippo Flex Neck U...|          5|          170|        181|   Y|
|2010-01-19 00:09:00|   51859664|                N|  YBIKE Balance Bike|          5|          140|        153|   Y|
|2015-01-08 00:05:00|   13722260|                N|Thule Vertex XT H...|          5|          123|        128|   Y|
|2014-01-21 00:06:00|   51037469|                N|Mountain House Ju...|          5|          118|        126|   Y|
|201

In [None]:
# Convert to a pandas DataFrame to read the product titles that show() abbreviates.
# limit(3) is applied in Spark before toPandas(), so only the three rows being
# displayed are collected to the driver instead of the whole DataFrame.
# Note: pandas may still shorten very long titles according to its
# display.max_colwidth option.
vine_helpful_votes_df.select("product_title").limit(3).toPandas()

Unnamed: 0,product_title
0,Thule Urban Glide - Jogging Stroller
1,Zippo Flex Neck Utility Lighter
2,YBIKE Balance Bike


In [None]:
# One-star reviews by VINE CUSTOMERS, ordered by helpful votes (highest first)

from pyspark.sql.functions import desc

low_vine_helpful_votes_df = (
    vine_analysis_df
    .orderBy(desc("helpful_votes"))
    .filter('star_rating = 1')
)

print("Least rated products with the Top helpful votes for VINE CUSTOMERS")
low_vine_helpful_votes_df.show()

Least rated products with the Top helpful votes for VINE CUSTOMERS
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2013-01-31 00:05:00|   53090839|                N|Runtastic Bike Ca...|          1|           18|         18|   Y|
|2013-01-04 00:12:00|   51036953|                N|Allen Sports Prem...|          1|           17|         20|   Y|
|2014-01-08 00:01:00|   38056064|                N|Wahoo Balance Blu...|          1|           11|         12|   Y|
|2013-01-02 00:12:00|   52761853|                N|KOR Nava BPA Free...|          1|            7|         15|   Y|
|2012-01-27 00:07:00|   51002192|                N|Energizer Weather...|          1|            6|          9|   Y|
|2011

In [None]:
# Convert to a pandas DataFrame to read the product titles that show() abbreviates.
# limit(5) is applied in Spark before toPandas(), so only the five rows being
# displayed are collected to the driver instead of the whole DataFrame.
# Note: pandas may still shorten very long titles according to its
# display.max_colwidth option.
low_vine_helpful_votes_df.select("product_title").limit(5).toPandas()

Unnamed: 0,product_title
0,"Runtastic Bike Case for iPhone 4, 4S, 5 (Black)"
1,Allen Sports Premier 4-Bike Trunk Rack
2,Wahoo Balance Bluetooth Smart Scale for iPhone...
3,KOR Nava BPA Free 650ml Filter Water Bottle
4,Energizer Weather Ready Rechargeable LED Flash...


# Non Vine Customers

In [None]:
# NON-VINE CUSTOMERS: keep only the reviews flagged vine == "N"
non_vine_analysis_df = analysis_df.where(col("vine") == "N")
non_vine_analysis_df.show()

+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2015-01-31 00:08:00|   18446823|                Y|Stearns Youth Boa...|          4|            0|          0|   N|
|2015-01-31 00:08:00|   13724367|                Y|Primal Wear Men's...|          5|            0|          0|   N|
|2015-01-31 00:08:00|   51001958|                Y|Osprey Hydraulics...|          4|            0|          0|   N|
|2015-01-31 00:08:00|   32866903|                Y|CamelBak eddy .75...|          3|            1|          1|   N|
|2015-01-31 00:08:00|   30907790|                Y|Children Black Re...|          1|            0|          0|   N|
|2015-01-31 00:08:00|   20232229|                Y|Ibera Bicycle Tri...|

In [None]:
# Summary statistics (count, mean, stddev, min, max) of the rating and vote columns for NON-VINE CUSTOMERS

non_vine_summary_analysis_df = non_vine_analysis_df.describe("star_rating", "helpful_votes", "total_votes")

print("Summary statistics for NON-VINE CUSTOMERS")
non_vine_summary_analysis_df.show()

Summary statistics for NON-VINE CUSTOMERS
+-------+------------------+------------------+------------------+
|summary|       star_rating|     helpful_votes|       total_votes|
+-------+------------------+------------------+------------------+
|  count|           2299037|           2299037|           2299037|
|   mean| 4.239844334823667|1.8453161040905388| 2.292603381328791|
| stddev|1.2106065679246594|12.770881428703447|13.628638311192573|
|    min|                 1|                 0|                 0|
|    max|                 5|              2703|              2751|
+-------+------------------+------------------+------------------+



In [None]:
# Five-star reviews by NON-VINE CUSTOMERS, ordered by helpful votes (highest first); display the first 10

from pyspark.sql.functions import desc

non_vine_helpful_votes_df = (
    non_vine_analysis_df
    .orderBy(desc("helpful_votes"))
    .filter('star_rating = 5')
)

print("Top 10 rated products with the Top helpful votes for NON-VINE CUSTOMERS")
non_vine_helpful_votes_df.show(10)

Top 10 rated products with the Top helpful votes for NON-VINE CUSTOMERS
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2011-01-13 00:10:00|   16316106|                Y|Magnet Steel Bike...|          5|         2478|       2600|   N|
|2014-01-19 00:04:00|   34490348|                Y|LifeStraw Persona...|          5|         2362|       2496|   N|
|2013-01-04 00:10:00|   30530118|                Y|SoundAsleep Dream...|          5|         2350|       2420|   N|
|2013-01-18 00:05:00|   35554264|                Y|LifeStraw Persona...|          5|         1864|       1961|   N|
|2013-01-24 00:10:00|    6262999|                Y|SoundAsleep Dream...|          5|         1722|       1792|   N|


In [None]:
# Convert to a pandas DataFrame to read the product titles that show() abbreviates.
# limit(5) is applied in Spark before toPandas(). Without it, every five-star
# non-Vine review would be collected to the driver just to display five rows.
# Note: pandas may still shorten very long titles according to its
# display.max_colwidth option.
non_vine_helpful_votes_df.select("product_title").limit(5).toPandas()

Unnamed: 0,product_title
0,Magnet Steel Bike Bicycle Indoor Exercise Trai...
1,"LifeStraw Personal Water Filter for Hiking, Ca..."
2,SoundAsleep Dream Series Air Mattress with Com...
3,"LifeStraw Personal Water Filter for Hiking, Ca..."
4,SoundAsleep Dream Series Air Mattress with Com...


# Top-rated review count with the most helpful votes for Vine customers

In [None]:
#  Customer: 51999404

# customer_id is an integer column, so compare it with an integer literal
# rather than relying on Spark to implicitly cast the string "51999404".
# vine_helpful_votes_df holds only five-star Vine reviews, so this lists that
# customer's five-star reviews.
view_count_topvine_customer_df = vine_helpful_votes_df.filter(col('customer_id') == 51999404)
view_count_topvine_customer_df.show()

+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2014-01-08 00:08:00|   51999404|                N|Thule Urban Glide...|          5|          326|        340|   Y|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+



In [None]:
#  Customer: 49805190

# customer_id is an integer column, so compare it with an integer literal
# rather than relying on Spark to implicitly cast the string "49805190".
# vine_helpful_votes_df holds only five-star Vine reviews, so this lists that
# customer's five-star reviews.
view_count_2topvine_customer_df = vine_helpful_votes_df.filter(col('customer_id') == 49805190)
view_count_2topvine_customer_df.show()

+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2012-01-01 00:08:00|   49805190|                N|Zippo Flex Neck U...|          5|          170|        181|   Y|
|2010-01-22 00:04:00|   49805190|                N|Optic Nerve Eyequ...|          5|            2|          2|   Y|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+



In [None]:
#  Customer: 51859664

# customer_id is an integer column, so compare it with an integer literal
# rather than relying on Spark to implicitly cast the string "51859664".
# vine_helpful_votes_df holds only five-star Vine reviews, so this lists that
# customer's five-star reviews.
view_count_3topvine_customer_df = vine_helpful_votes_df.filter(col('customer_id') == 51859664)
view_count_3topvine_customer_df.show()

+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2010-01-19 00:09:00|   51859664|                N|  YBIKE Balance Bike|          5|          140|        153|   Y|
|2014-01-03 00:02:00|   51859664|                N|Thule  Cougar One...|          5|            0|          0|   Y|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+



Review type by customer with the highest review count

In [None]:
from pyspark.sql.functions import desc

# Number of reviews per customer, most prolific reviewers first
customers_df = (
    df.groupby("customer_id")
    .agg({"customer_id": "count"})
    .withColumnRenamed("count(customer_id)", "customer_count")
    .orderBy(desc("customer_count"))
)
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   13355404|           291|
|   47355039|           205|
|   51155788|           197|
|   20433453|           129|
|   40944345|           128|
|    3247563|           128|
|   27550730|           125|
|   30636778|           115|
|   10796373|           113|
|   40364490|           112|
|   16255502|           112|
|   38137849|           111|
|   52988261|           109|
|   36205928|           108|
|   52196389|           104|
|   18466687|           104|
|   44142946|           102|
|   49782074|           100|
|   45014035|           100|
|   31807575|            98|
+-----------+--------------+
only showing top 20 rows



In [None]:
#  Customer with Non Verified Purchase

# customer_id is an integer column, so compare it with an integer literal
# rather than relying on Spark to implicitly cast the string "13355404".
# non_vine_helpful_votes_df holds only five-star non-Vine reviews ordered by
# helpful votes, so this shows that customer's four most helpful five-star reviews.
customer_review_count = non_vine_helpful_votes_df.filter(col('customer_id') == 13355404)
customer_review_count.show(4)

+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2014-01-18 00:04:00|   13355404|                N|Bushbox Outdoor P...|          5|          183|        189|   N|
|2015-01-19 00:06:00|   13355404|                N|OutdoorMaster Hik...|          5|          129|        137|   N|
|2014-01-14 00:04:00|   13355404|                N|Bushbox Titanium ...|          5|           84|         86|   N|
|2014-01-09 00:04:00|   13355404|                N|Coghlan's Backpac...|          5|           83|         85|   N|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
only showing top 4 rows



In [None]:
#  Customer with verified purchase

# customer_id is an integer column, so compare it with an integer literal
# rather than relying on Spark to implicitly cast the string "51155788".
# non_vine_helpful_votes_df holds only five-star non-Vine reviews ordered by
# helpful votes, so this shows that customer's three most helpful five-star reviews.
customer2_review_count = non_vine_helpful_votes_df.filter(col('customer_id') == 51155788)
customer2_review_count.show(3)

+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2014-01-12 00:02:00|   51155788|                Y|SRAM Supercork Bi...|          5|           32|         34|   N|
|2014-01-03 00:03:00|   51155788|                Y|Selle Royal Respi...|          5|           25|         27|   N|
|2013-01-19 00:06:00|   51155788|                Y|Kenda Tube Bicycl...|          5|           19|         22|   N|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
only showing top 3 rows

