In [1]:
import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:5 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:7 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Packages [1,073 kB]
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ub

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.17.jar

--2022-12-08 23:48:02--  https://jdbc.postgresql.org/download/postgresql-42.2.17.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1004734 (981K) [application/java-archive]
Saving to: ‘postgresql-42.2.17.jar’


2022-12-08 23:48:02 (5.98 MB/s) - ‘postgresql-42.2.17.jar’ saved [1004734/1004734]



In [3]:
from pyspark.sql import SparkSession

# Session for the ETL; the PostgreSQL JDBC jar is added to the driver classpath.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.17.jar")
    .getOrCreate()
)

In [4]:
from pyspark import SparkFiles

# Amazon US reviews, Digital Ebook Purchase category (gzipped, tab-separated).
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Ebook_Purchase_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Read the downloaded file by its name. SparkFiles.get("") is the SparkFiles
# root directory, so reading it would also pull in any other file ever
# registered with addFile, not just this dataset.
file_name = url.rsplit("/", 1)[-1]
df = spark.read.option("encoding", "UTF-8").csv(
    SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True
)

In [5]:
from pyspark.sql.functions import to_date

In [6]:
# customers_table: number of reviews per customer_id, as column customer_count.
customers_df = (
    df.groupBy("customer_id")
    .agg({"customer_id": "count"})
    .withColumnRenamed("count(customer_id)", "customer_count")
)

In [7]:
# products_table: one row per product_id. If a product_id appears with several
# titles, which title is kept is not guaranteed.
products_df = (
    df.select("product_id", "product_title")
    .dropDuplicates(["product_id"])
)

In [8]:
# review_id_table: review / customer / product keys per input row, with
# review_date parsed from 'yyyy-MM-dd' text into a date column.
review_id_df = df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    to_date("review_date", "yyyy-MM-dd").alias("review_date"),
)

In [9]:
# vine_table DataFrame: rating, vote counts and Vine / verified-purchase flags per review.
vine_df = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)

In [10]:
# Keep only reviews that received at least 20 votes in total.
vine_20 = vine_df.where(vine_df["total_votes"] >= 20)

In [11]:
vine_20.show(n=5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2HQLKREFKG25D|          5|           21|         21|   N|                Y|
| RWIL1XU6YM0U4|          5|           20|         20|   N|                N|
|R28SB7Q1WBJRFM|          5|           25|         27|   N|                N|
|R3PVKAWMZW55U8|          5|          106|        119|   N|                Y|
|R21JS1HIQV1H7W|          3|           17|         22|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [12]:
# At least half of the votes were "helpful" (total_votes >= 20 here, so no division by zero).
vine_20_50 = vine_20.where((vine_20["helpful_votes"] / vine_20["total_votes"]) >= 0.5)

In [14]:
vine_20_50.show(n=5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2HQLKREFKG25D|          5|           21|         21|   N|                Y|
| RWIL1XU6YM0U4|          5|           20|         20|   N|                N|
|R28SB7Q1WBJRFM|          5|           25|         27|   N|                N|
|R3PVKAWMZW55U8|          5|          106|        119|   N|                Y|
|R21JS1HIQV1H7W|          3|           17|         22|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [16]:
# Helpful reviews written through the Vine programme (vine == 'Y').
vine_20_50_Y = vine_20_50.where(vine_20_50["vine"] == "Y")

In [17]:
vine_20_50_Y.show(n=5)

+---------+-----------+-------------+-----------+----+-----------------+
|review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+---------+-----------+-------------+-----------+----+-----------------+
+---------+-----------+-------------+-----------+----+-----------------+



In [18]:
# Helpful reviews written outside the Vine programme (vine == 'N').
vine_20_50_N = vine_20_50.where(vine_20_50["vine"] == "N")

In [19]:
vine_20_50_N.show(n=5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2HQLKREFKG25D|          5|           21|         21|   N|                Y|
| RWIL1XU6YM0U4|          5|           20|         20|   N|                N|
|R28SB7Q1WBJRFM|          5|           25|         27|   N|                N|
|R3PVKAWMZW55U8|          5|          106|        119|   N|                Y|
|R21JS1HIQV1H7W|          3|           17|         22|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [20]:
# 5-star reviews among the helpful Vine ("paid") reviews.
vine_star_paid = vine_20_50_Y.where(vine_20_50_Y["star_rating"] == 5)

In [21]:
# 5-star reviews among the helpful non-Vine ("unpaid") reviews.
# The column must come from vine_20_50_N itself: the original referenced
# vine_20_50_Y.star_rating, which only resolved because both frames share
# lineage from vine_20_50, and it would break if either frame's source changed.
vine_star_unpaid = vine_20_50_N.filter(vine_20_50_N["star_rating"] == 5)

In [22]:
vine_star_unpaid.show(n=5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2HQLKREFKG25D|          5|           21|         21|   N|                Y|
| RWIL1XU6YM0U4|          5|           20|         20|   N|                N|
|R28SB7Q1WBJRFM|          5|           25|         27|   N|                N|
|R3PVKAWMZW55U8|          5|          106|        119|   N|                Y|
|R23P75H9VX5RLM|          5|           32|         36|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [25]:
# Total number of reviews in the dataset (vine_df applies no row filter).
total_review = vine_df.count()

In [27]:
print(f"{total_review}")

12520722


In [28]:
# Number of 5-star reviews among helpful (>= 20 votes, >= 50% helpful) Vine reviews.
# Variable name kept as-is in case later cells reference it.
paid_5_start = vine_star_paid.count()
print(f"paid 5 star review count is {paid_5_start}")

paid 5 start review count is 0


In [29]:
# Number of 5-star reviews among helpful (>= 20 votes, >= 50% helpful) non-Vine reviews.
# Variable name kept as-is in case later cells reference it.
unpaid_5_start = vine_star_unpaid.count()
print(f"unpaid 5 star review count is {unpaid_5_start}")

unpaid 5 start review count is 20918


In [30]:
# Vine reviews in the whole dataset, before any vote filtering.
print(vine_df.where(vine_df["vine"] == "Y").count())

32


In [31]:
# Non-Vine reviews in the whole dataset, before any vote filtering.
print(vine_df.where(vine_df["vine"] == "N").count())

12520673
