In [1]:
# Activate Spark in our Colab notebook.
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.2'
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [1 In0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to                                                                               Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 http://archive.ubuntu.com/ubuntu b

In [2]:
# Get postgresql package
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-12-06 14:09:18--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-12-06 14:09:18 (5.02 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
# Import Spark and create a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-HW-1").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

**bold text**# Extract the Amazon Data into Spark DataFrame

In [4]:
# Read in the data from an S3 Bucket
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

df_watch = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv.gz"), inferSchema=True, sep='\t', timestampFormat="yyyy-mm-dd")
df_watch.show()


+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...|2015-01-31 00:08:00|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...| 

In [5]:
# Get the number of rows in the DataFrame.
df_watch.count()

960872

In [6]:
# Drop null values
df_watch = df_watch.dropna()
print(df_watch.count())


960679


In [7]:
# Removed duplicate rows
df_watch = df_watch.dropDuplicates()
df_watch.count()

960679

# Transform the Data

In [8]:
# Print schema
df_watch.printSchema()


root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [9]:
# Changing datatypes
from pyspark.sql.types import TimestampType, IntegerType

df_watch = df_watch.withColumn("customer_id", df_watch["customer_id"].cast(IntegerType()))\
       .withColumn("product_parent", df_watch["product_parent"].cast(IntegerType()))\
       .withColumn("review_date", df_watch["review_date"].cast(TimestampType()))\
       .withColumn("star_rating", df_watch["star_rating"].cast(IntegerType()))\
       .withColumn("helpful_votes", df_watch["helpful_votes"].cast(IntegerType()))\
      .withColumn("total_votes", df_watch["total_votes"].cast(IntegerType()))\

df_watch.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



#Analysis of the Data

###All Reviews

In [10]:

df_analysis = df_watch.select(["review_date","customer_id", "verified_purchase", "product_title","star_rating","helpful_votes","total_votes", "vine"])
df_analysis.show()

+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2014-01-22 00:05:00|   15688375|                Y|Rip Curl Men's A1...|          2|            0|          1|   N|
|2014-01-16 00:06:00|   21739087|                Y|Casio Men's Sport...|          5|            0|          0|   N|
|2015-01-26 00:02:00|    2501262|                Y|Invicta Men's 901...|          5|            0|          0|   N|
|2014-01-30 00:12:00|   16842563|                Y|U.S. Polo Assn. C...|          5|            0|          0|   N|
|2009-01-27 00:01:00|   52130705|                N|Invicta Men's 524...|          4|            0|          0|   N|
|2014-01-27 00:11:00|   41518340|                Y|Rudiger Men's R20...|

In [11]:
summary_df_analysis = df_analysis.select(["star_rating","helpful_votes","total_votes"]).describe()

print("Summary statistics for all")
summary_df_analysis.show()


Summary statistics for all
+-------+------------------+------------------+-----------------+
|summary|       star_rating|     helpful_votes|      total_votes|
+-------+------------------+------------------+-----------------+
|  count|            960679|            960679|           960679|
|   mean|4.1382480516384765|1.1959103925452728|1.560124661827728|
| stddev|1.2932883558454442| 8.160239197449105|9.067823957832323|
|    min|                 1|                 0|                0|
|    max|                 5|              4004|             4249|
+-------+------------------+------------------+-----------------+



In [24]:
five_star_only = df_analysis[df_analysis['star_rating'] == 5].count()
five_star_only

571493

In [26]:
all_votes = df_analysis.count()
all_votes

960679

In [27]:
# Pecentage of five-star reviews
print((five_star_only/all_votes)*100)

59.488445151814496


###Vine Members Only

In [13]:
from pyspark.sql.functions import col

vine_df_analysis = df_analysis.filter(col("vine")  == "Y")
vine_df_analysis.show()


+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2015-01-14 00:04:00|   16436797|                N|kate spade new yo...|          4|            0|          1|   Y|
|2015-01-03 00:04:00|   51553681|                N|kate spade new yo...|          4|            0|          1|   Y|
|2014-01-10 00:06:00|   18995994|                N|Casio Men's STB-1...|          4|            0|          0|   Y|
|2013-01-27 00:06:00|   38548391|                N|Invicta Men's 143...|          4|            4|          6|   Y|
|2012-01-12 00:01:00|   52752630|                N|Invicta Men's 146...|          5|            0|          0|   Y|
|2011-01-28 00:10:00|   38226380|                N|Invicta Men's 124...|

In [14]:
summary_vine_df_analysis = vine_df_analysis.select(["star_rating","helpful_votes","total_votes"]).describe()

print("Summary statistics for vine customers")
summary_vine_df_analysis.show()

Summary statistics for vine customers
+-------+------------------+------------------+-----------------+
|summary|       star_rating|     helpful_votes|      total_votes|
+-------+------------------+------------------+-----------------+
|  count|              1747|              1747|             1747|
|   mean| 4.034344590726961|2.8580423583285635|3.712077847738981|
| stddev|0.9366959934006919|11.039313131908232|12.01083357759813|
|    min|                 1|                 0|                0|
|    max|                 5|               349|              370|
+-------+------------------+------------------+-----------------+



In [15]:
vine_five_star_only = vine_df_analysis[vine_df_analysis['star_rating'] == 5].count()
vine_five_star_only

605

In [16]:
vine_df_analysis_all = vine_df_analysis.count()
vine_df_analysis_all

1747

In [17]:
# Pecentage of five-star reviews in Vine
print((vine_five_star_only/vine_df_analysis_all)*100)

34.630795649685176


### Non Vine Members

In [18]:
notvine_df_analysis = df_analysis.filter(col("vine")  == "N")
notvine_df_analysis.show()

+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|        review_date|customer_id|verified_purchase|       product_title|star_rating|helpful_votes|total_votes|vine|
+-------------------+-----------+-----------------+--------------------+-----------+-------------+-----------+----+
|2014-01-22 00:05:00|   15688375|                Y|Rip Curl Men's A1...|          2|            0|          1|   N|
|2014-01-16 00:06:00|   21739087|                Y|Casio Men's Sport...|          5|            0|          0|   N|
|2015-01-26 00:02:00|    2501262|                Y|Invicta Men's 901...|          5|            0|          0|   N|
|2014-01-30 00:12:00|   16842563|                Y|U.S. Polo Assn. C...|          5|            0|          0|   N|
|2009-01-27 00:01:00|   52130705|                N|Invicta Men's 524...|          4|            0|          0|   N|
|2014-01-27 00:11:00|   41518340|                Y|Rudiger Men's R20...|

In [19]:
summary_notvine_df_analysis = notvine_df_analysis.select(["star_rating","helpful_votes","total_votes"]).describe()

print("Summary statistics for not-vine customers")
summary_notvine_df_analysis.show()

Summary statistics for not-vine customers
+-------+------------------+------------------+-----------------+
|summary|       star_rating|     helpful_votes|      total_votes|
+-------+------------------+------------------+-----------------+
|  count|            958932|            958932|           958932|
|   mean| 4.138437344879512|1.1928822898808258|1.556204193832305|
| stddev|1.2938410587847071| 8.153765015489014|9.061132029872722|
|    min|                 1|                 0|                0|
|    max|                 5|              4004|             4249|
+-------+------------------+------------------+-----------------+



In [20]:
notvine_five_star_only = notvine_df_analysis[notvine_df_analysis['star_rating'] == 5].count()
notvine_five_star_only

570888

In [21]:
notvine_df_analysis_all = notvine_df_analysis.count()
notvine_df_analysis_all

958932

In [22]:
# Pecentage of five-star reviews in Not - Vine
print((notvine_five_star_only/notvine_df_analysis_all)*100)

59.5337312760446
