In [1]:
import os
# Spark release to install, written as 'spark-X.Y.Z'. Pick a version that is
# listed under http://www.apache.org/dist/spark/ (the download below builds its
# URL from this value).
# Example of the expected format: 'spark-3.0.3'
spark_version = 'spark-3.2.1'
# Exported so the `!` shell lines below can expand $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version

# Install Java 8, download and unpack the Spark tarball, and install findspark.
# NOTE(review): `wget -q` is silent, so a failed download only surfaces when the
# following `tar` cannot find the archive - confirm the URL still serves this version.
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Point JAVA_HOME at the JDK installed above and SPARK_HOME at the unpacked
# tarball (the /content prefix assumes the archive was extracted there).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialise findspark so the Spark install can be used from Python.
# This does not start a SparkSession; the session is built in a later cell.
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 14.2 kB/88.                                                                               Hit:2 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
0% [Waiting for headers] [1 InRelease 75.0 kB/88.7 kB 85%] [Connected to cloud.                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [4 InRelease 15.6 kB/88.7 kB 18%] [1 InRelease 75.0 kB/88.7 kB 85%] [Connect                                  

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-05-22 05:03:52--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.3’


2022-05-22 05:03:54 (1.54 MB/s) - ‘postgresql-42.2.9.jar.3’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the "CloudETL" session. The extra driver classpath entry
# points at the PostgreSQL JDBC jar under /content.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [4]:
from pyspark import SparkFiles

# Register the gzipped Amazon "Major Appliances" reviews TSV with Spark, then
# read the fetched copy into a DataFrame (tab-separated, header row, inferred types).
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Major_Appliances_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# NOTE(review): in Spark datetime patterns `mm` is minute-of-hour (month is `MM`),
# and the dates shown in the output look like 2015-08-31, so this pattern
# presumably never matches and review_date stays a string. Left unchanged here
# because fixing it would alter the inferred schema - confirm the intent.
df = (
    spark.read
    .option('header', 'true')
    .csv(
        SparkFiles.get("amazon_reviews_us_Major_Appliances_v1_00.tsv.gz"),
        inferSchema=True,
        sep='\t',
        timestampFormat="mm/dd/yy",
    )
)
df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   16199106|R203HPW78Z7N4K|B0067WNSZY|     633038551|FGGF3032MW Galler...|Major Appliances|          5|            0|          0|   N|                Y|If you need a new...|What a great stov...| 2015-08-31|
|         US|   16374060|R2EAIGVLEALSP3|B002QSXK60|     811766671|Best Hand Clothes...|Major Appliances|          5|    

## Drop duplicates and incomplete rows

In [5]:
# Report the row count before cleaning and again after each cleaning step.
rows_loaded = df.count()
print(rows_loaded)

# Remove rows that contain null values.
df = df.dropna()
rows_complete = df.count()
print(rows_complete)

# Remove duplicate rows.
df = df.dropDuplicates()
rows_unique = df.count()
print(rows_unique)

96901
96888
96888


## Examine the schema

In [6]:
# Print the column names and inferred types of the cleaned DataFrame.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [21]:

# Keep only the reviews whose `vine` column is 'N'.
unpaid_df = df.filter("vine == 'N'")




In [22]:
# Preview the first rows of the vine == 'N' reviews.
unpaid_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   26514959|R354ZG794Q5KJQ|B00E66SE2U|     199871830|Part # 3390719 - ...|Major Appliances|          3|            0|          0|   N|                Y|         Three Stars|not an oem part b...| 2015-08-30|
|         US|   48250444| RNZ36HB1YSQ6G|B00SNKU7VQ|     197365398|Whirlpool W102953...|Major Appliances|          5|    

In [27]:
# Total number of vine == 'N' reviews.
print(unpaid_df.count())

96640


In [23]:
# Restrict the vine == 'N' reviews to those rated 5 stars.
star5unpaid_df = unpaid_df.filter("star_rating == '5'")

In [31]:
# Restrict the vine == 'N' reviews to those rated 1 star.
star1unpaid_df = unpaid_df.filter("star_rating == '1'")

In [32]:
# Preview the first rows of the 1-star, vine == 'N' reviews.
star1unpaid_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|    3420426| R2MHXTICGUDVE|B00NN166DS|     546911168|GE GFE28HSHSS 27....|Major Appliances|          1|            3|          3|   N|                N|Worst customer se...|Worst customer se...| 2015-08-29|
|         US|   13016134|R1SUJJFCOKC7HI|B00GXHRC6E|     348942012|LG H/E Super Capa...|Major Appliances|          1|    

In [33]:
# Number of 1-star reviews among the vine == 'N' reviews.
print(star1unpaid_df.count())

19835


In [24]:
# Preview the first rows of the 5-star, vine == 'N' reviews.
star5unpaid_df.show()


+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   48250444| RNZ36HB1YSQ6G|B00SNKU7VQ|     197365398|Whirlpool W102953...|Major Appliances|          5|            1|          1|   N|                Y|          Five Stars|        Works great!| 2015-08-29|
|         US|     539114|R172DDSWTL0P9K|B00AW04ZMM|     962476921|Panda Small Compa...|Major Appliances|          5|    

In [26]:
# Number of 5-star reviews among the vine == 'N' reviews.
print(star5unpaid_df.count())

49588


In [10]:
# Keep only the reviews whose `vine` column is 'Y'.
paid_df = df.filter("vine == 'Y'")

In [11]:
# Total number of vine == 'Y' reviews.
print(paid_df.count())



248


In [12]:
# Preview the first rows of the vine == 'Y' reviews.
paid_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   27249597|R348AGJRTJ4RGR|B00LUGCU0G|     596157190|Whynter BR-091WS ...|Major Appliances|          4|            4|          5|   Y|                N|A sleek looking b...|This is a neat li...| 2014-10-28|
|         US|   42644737|R36LKAXN60PHW1|B00M7GMEYK|     350114849|             Whynter|Major Appliances|          5|    

In [28]:
# Restrict the vine == 'Y' reviews to those rated 5 stars.
star5paid_df = paid_df.filter("star_rating == '5'")

In [29]:
# Preview the first rows of the 5-star, vine == 'Y' reviews.
star5paid_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   42644737|R36LKAXN60PHW1|B00M7GMEYK|     350114849|             Whynter|Major Appliances|          5|            2|          3|   Y|                N|Perfect Size For ...|This freezer is p...| 2014-10-12|
|         US|   52801489|R1B3OGO36CJKDA|B005XAM44W|     920442298|NewAir AWC-270E 2...|Major Appliances|          5|    

In [30]:
# Number of 5-star reviews among the vine == 'Y' reviews.
print(star5paid_df.count())

112


In [35]:
# Restrict the vine == 'Y' reviews to those rated 1 star.
star1paid_df = paid_df.filter("star_rating == '1'")

In [38]:
# Preview the first rows of the 1-star, vine == 'Y' reviews.
star1paid_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   30092914|R28B5FVG088F11|B00J23C5PO|     201017666|Coca-Cola Soccer ...|Major Appliances|          1|            0|          0|   Y|                N|Wow- really? Who ...|Other than if you...| 2014-06-17|
|         US|   24685789|R36XE8CKUMYXN2|B008RL4OCY|     726642495|NewAir AI-215SS P...|Major Appliances|          1|    

In [36]:
# Number of 1-star reviews among the vine == 'Y' reviews.
print(star1paid_df.count())

3


## Rename columns

## Create a new DataFrame for employee passwords

## Write DataFrame to RDS