In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.39)] [Co                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Connecting to security.ubuntu.com (91.189.91.39)] [Connecting to cloud.r-pr                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
                                                                               Hit:5 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
                                                                               Get:6 http://ppa.launchpad.net/deadsnakes/ppa/ub

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-05-16 16:56:56--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-05-16 16:56:57 (1.53 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the SparkSession with the PostgreSQL JDBC driver on the driver
# classpath so DataFrames can later be written out with .write.jdbc().
spark = (
    SparkSession.builder
    .appName("BigDataChallengeMusicalInst")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [4]:
from pyspark import SparkFiles
# Load the Amazon US Musical Instruments reviews (gzipped, tab-separated) from S3 into a DataFrame.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Musical_Instruments_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Explicit schema, identical to the types inferSchema chose for this file.
# - Avoids the extra full pass over the (non-splittable) .gz that inferSchema needs.
# - Keeps review_date a plain 'yyyy-MM-dd' string regardless of Spark version; it is
#   converted with to_date when the review table is built.
# The old timestampFormat="mm/dd/yy" was dropped: 'mm' means minutes and the pattern
# never matched this data.
reviews_schema = (
    "marketplace STRING, customer_id INT, review_id STRING, product_id STRING, "
    "product_parent INT, product_title STRING, product_category STRING, star_rating INT, "
    "helpful_votes INT, total_votes INT, vine STRING, verified_purchase STRING, "
    "review_headline STRING, review_body STRING, review_date STRING"
)
df = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Musical_Instruments_v1_00.tsv.gz"),
    schema=reviews_schema,
    header=True,
    sep='\t',
)
df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|   product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   45610553| RMDCHWD0Y5OZ9|B00HH62VB6|     618218723|AGPtek® 10 Isolat...|Musical Instruments|          3|            0|          1|   N|                N|         Three Stars|Works very good, ...| 2015-08-31|
|         US|   14640079| RZSL0BALIYUNU|B003LRN53I|     986692292|Sennheiser HD203 ...|Musical Instruments| 

In [5]:
# Total number of reviews as loaded, before any cleaning.
rows = df.count()
rows


904765

In [6]:
# Remove incomplete rows, then exact duplicate rows; the three printed counts
# show how many rows each step removed.
print(df.count())          # as loaded
df = df.na.drop()          # same as dropna(): drop rows with a null in any column
print(df.count())
df = df.dropDuplicates()   # drop rows identical in every column
print(df.count())

904765
904663
904663


In [7]:
# Column names and types of the cleaned reviews DataFrame.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [8]:
# Number of reviews per customer. The dict-style agg names the result column
# "count(customer_id)"; it is renamed to customer_count afterwards.
# (A stray trailing "\" continuation was removed: it would silently join any code
# later typed on the next line into this statement.)
customers_df = df.groupBy("customer_id").agg({"customer_id": "count"})

customers_df.orderBy("count(customer_id)").show(truncate=False)

+-----------+------------------+
|customer_id|count(customer_id)|
+-----------+------------------+
|21793952   |1                 |
|7031078    |1                 |
|50147780   |1                 |
|13543136   |1                 |
|49659674   |1                 |
|32491208   |1                 |
|23256975   |1                 |
|45409008   |1                 |
|26995450   |1                 |
|2353351    |1                 |
|49932082   |1                 |
|44343789   |1                 |
|44570839   |1                 |
|47509250   |1                 |
|37764267   |1                 |
|9563903    |1                 |
|51133518   |1                 |
|52839998   |1                 |
|14140634   |1                 |
|17229741   |1                 |
+-----------+------------------+
only showing top 20 rows



In [9]:
from pyspark.sql.functions import desc

# Give the aggregate column a readable name for the `customers` table.
# customer_id already has the right name, so it needs no rename.
customers2_df = customers_df.withColumnRenamed("count(customer_id)", "customer_count")
customers2_df.orderBy(desc("customer_count")).show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   53037408|           577|
|   13431979|           161|
|   32772847|           157|
|   12337602|           143|
|   46042658|           132|
|    2429197|           132|
|   36979893|           131|
|   35156819|           123|
|   18810404|           123|
|   46796594|           119|
|   15509593|           114|
|   12677290|           113|
|   29902248|           109|
|   30700782|           108|
|   16671442|           101|
|   24639383|            97|
|   14897934|            95|
|   46953315|            94|
|   35648646|            91|
|   29830790|            88|
+-----------+--------------+
only showing top 20 rows



In [10]:
# Number of distinct customers (customers2_df has one row per customer_id).
customers2_df.count()

573084

In [11]:
# Product columns for the `products` table (still one row per review at this point).
product_df = df.select("product_id", "product_title")
product_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00B054NVG|Odyssey FZGSPIDDJ...|
|B004W4TYMA|BEHRINGER EUROPOW...|
|B00AM966CU|LEDJump Data Repe...|
|B006X735PQ|Snark SN-2 All In...|
|B00DY1F2CS|NEEWER Adjustable...|
|B00J0823FI|Hercules DG400BB ...|
|B0002H0FZ6|D'Addario Bronze ...|
|B005DBF0AG|Adjustable Artist...|
|B00558JGN4|gemini dj Firstmi...|
|B000VTMUQ2|Dunlop Tortex Sta...|
|B0002KZIV4|Fender Strat Knob...|
|B0016MNBAM|EMUSIC New Versio...|
|B001TYLS98|Black 3.5mm Male ...|
|B0002M6CVC|Ernie Ball slinky...|
|B00F20MIC6|Fretfunk® Acousti...|
|B0002IL3AQ|Django Reinhardt ...|
|B0014SP0GQ|Classic 'C' Flute...|
|B000AJIF4E|Sony MDR7506 Prof...|
|B001LTZZSG|Kala MK Makala Uk...|
|B004OK17QS|TC Electronic Hal...|
+----------+--------------------+
only showing top 20 rows



In [12]:
# Reduce product_df to one row per product_id for the `products` table,
# printing the row count before and after each step.
print(product_df.count())
product_df = product_df.dropna()
print(product_df.count())
# Deduplicate on product_id alone. Deduplicating on (product_id, product_title) would keep
# several rows for a product whose title differs between reviews, so product_id would not
# identify a single product row. NOTE: for such products, which title survives is arbitrary.
product_df = product_df.dropDuplicates(["product_id"])
print(product_df.count())

904663
904663
123318


In [13]:
# Configuration for RDS instance
import os
from getpass import getpass

# "overwrite" replaces any existing table with the same name on each write.
mode = "overwrite"
jdbc_url = "jdbc:postgresql://****.amazonaws.com:5432/bd_db2"
# Do not hard-code the database password in the notebook (it would end up in version
# control and shared copies). Take it from the RDS_PASSWORD environment variable if set,
# otherwise prompt for it without echoing.
config = {"user": "postgres",
          "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
          "driver": "org.postgresql.Driver"}

In [14]:
# Write the per-customer review counts to the `customers` table over JDBC.
customers2_df.write.mode(mode).jdbc(jdbc_url, 'customers', properties=config)

In [15]:
# Write the product rows to the `products` table over JDBC.
product_df.write.mode(mode).jdbc(jdbc_url, 'products', properties=config)

In [16]:
# Select the columns of the review_id_table schema, converting review_date from
# its 'yyyy-MM-dd' string form into a DATE column.
# (A stray, unused `to_date(...).alias()` expression that preceded this was removed.)
from pyspark.sql.functions import to_date

review_df = df.select([
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    to_date("review_date", 'yyyy-MM-dd').alias("review_date"),
])
review_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R100LMJJNWFDJK|   12078619|B00B054NVG|     963648561| 2014-10-24|
|R10106OQ9QP86T|   43421823|B004W4TYMA|     461512942| 2015-03-30|
|R101ER44F5J3EK|   49269789|B00AM966CU|     172551627| 2015-02-04|
|R1036AJM4DUM64|   17911630|B006X735PQ|     627151806| 2013-04-03|
|R103CWY3MPPRYB|   31063425|B00DY1F2CS|     554396934| 2015-08-02|
|R104VHJ7WLB4PP|    7146158|B00J0823FI|     647000350| 2015-06-11|
|R105A5ZRV1UHT1|   15499595|B0002H0FZ6|     497097576| 2014-10-18|
|R105RU48J2VL19|   17984606|B005DBF0AG|     990545730| 2015-06-15|
|R106LKOCWLJKP1|   14027720|B00558JGN4|     463070117| 2013-02-13|
|R106NRR8JSGUID|   20032109|B000VTMUQ2|     477030445| 2014-07-05|
|R106V7A3TQ944V|   41138518|B0002KZIV4|     741135212| 2012-12-10|
|R109H6GZEOXX1S|   25494035|B0016MNBAM|     288892337| 2011-09

In [17]:
# Write the review records to the `review_id_table` table over JDBC.
review_df.write.mode(mode).jdbc(jdbc_url, 'review_id_table', properties=config)