<a href="https://colab.research.google.com/github/Niraj-Khatri/Pyspark-AWS/blob/master/Electronics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup Spark and Import Dataset

In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [1 InRelease 14.2 kB/88.7                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [1 InRelease 28.7 kB/88.70% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://ppa.launchpad.net/

In [None]:
# Connect to Postgres
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-09-02 19:01:27--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.2’


2021-09-02 19:01:27 (10.4 MB/s) - ‘postgresql-42.2.9.jar.2’ saved [914037/914037]



In [None]:
# Start Spark session
from pyspark.sql import SparkSession

# Build (or reuse) the session, putting the downloaded Postgres JDBC driver on
# the driver's classpath so DataFrame.write.jdbc can load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("AmazonReviews")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

# Import Amazon Data File From AWS


In [None]:
# Import struct fields
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, TimestampType

## Electronics Data

In [23]:
# Read in data from url
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Electronics_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
data = spark.read.csv(SparkFiles.get("amazon_reviews_us_Electronics_v1_00.tsv.gz"), sep='\t', header=True)

# Clean the data
data = data.withColumn('review_date', data['review_date'].cast(TimestampType()))
data = data.withColumn('customer_id', data['customer_id'].cast(IntegerType()))
data = data.withColumn('product_parent', data['product_parent'].cast(IntegerType()))
data = data.withColumn('star_rating', data['star_rating'].cast(IntegerType()))
data = data.withColumn('helpful_votes', data['helpful_votes'].cast(IntegerType()))
data = data.withColumn('total_votes', data['total_votes'].cast(IntegerType()))
electronics_df = data.dropna()


## Video Game Data

In [24]:
# Read in data from url 
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
data = spark.read.csv(SparkFiles.get("amazon_reviews_us_Video_Games_v1_00.tsv.gz"), sep='\t', header=True)

# Clean the data
data = data.withColumn('review_date', data['review_date'].cast(TimestampType()))
data = data.withColumn('customer_id', data['customer_id'].cast(IntegerType()))
data = data.withColumn('product_parent', data['product_parent'].cast(IntegerType()))
data = data.withColumn('star_rating', data['star_rating'].cast(IntegerType()))
data = data.withColumn('helpful_votes', data['helpful_votes'].cast(IntegerType()))
data = data.withColumn('total_votes', data['total_votes'].cast(IntegerType()))
videogames_df = data.dropna()

# Create Tables

## Customers Table

In [35]:
# Define datasets to merge
electronics_customers = electronics_df.select("customer_id")
videogames_customers = videogames_df.select("customer_id")

# Merge
customers = electronics_customers.union(videogames_customers)

# Create final table
customers = df.groupBy("customer_id").count()
customers = customers.withColumnRenamed('count','customer_count')
customers = customers.withColumn('customer_count', customers['customer_count'].cast(IntegerType()))

## Products Table


In [45]:
# Define datasets to merge
electronics_products = electronics_df.select("product_id", "product_title")
videogames_products = videogames_df.select("product_id", "product_title")

# Merge
products = electronics_products.union(videogames_products)

# Create final table
products = df.groupby("product_id", "product_title").count().select("product_id", "product_title")

## Reviews Table


In [46]:
# Define datasets to merge
electronics_reviews = electronics_df.select("review_id", "customer_id", "product_id", "product_parent", "review_date")
videogames_reviews = videogames_df.select("review_id", "customer_id", "product_id", "product_parent", "review_date")

# Merge
reviews = electronics_reviews.union(videogames_reviews)

# Create final table
reviews = df.groupby("review_id", "customer_id", "product_id", "product_parent", "review_date").count().select("review_id", "customer_id", "product_id", "product_parent", "review_date")

## Vines Table

In [47]:
# Define datasets to merge
electronics_vines = electronics_df.select("review_id", "star_rating", "helpful_votes", "total_votes", "vine")
videogames_vines = videogames_df.select("review_id", "star_rating", "helpful_votes", "total_votes", "vine")

# Merge
vines = electronics_vines.union(videogames_vines)

# The review number, rating, helpful votes, total votes, and vine 
vines = vines.select("review_id", "star_rating", "helpful_votes", "total_votes", "vine")

# Connect To RDS

In [48]:
# Configure settings for RDS
mode = "overwrite"
jdbc_url="jdbc:postgresql://amazonreviews.c5bki1uyc90h.us-east-2.rds.amazonaws.com:5432/AmazonReviews"
config = {"user":"postgres", 
          "password": "Goldensun123!", 
          "driver":"org.postgresql.Driver"}

In [49]:
# Write tables to postgres in RDS
# Write tables to postgres in RDS, in the same order: target table name -> DataFrame
tables_to_write = {
    "customers": customers,
    "products": products,
    "review_id_table": reviews,
    "vine_table": vines,
}
for table_name, table_df in tables_to_write.items():
    table_df.write.jdbc(url=jdbc_url, table=table_name, mode=mode, properties=config)
