In [2]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()
import pandas as pd

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Connected to cloud.r-pr0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com (185.125.190.36                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:10 http://archive.ubuntu

In [3]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-06-05 20:55:38--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-06-05 20:55:38 (6.42 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [4]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession. The Postgres JDBC jar is placed on the
# driver classpath so the "org.postgresql.Driver" used for the RDS write can load.
spark = (
    SparkSession.builder
    .appName("M16-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [30]:
from pyspark import SparkFiles

# Load Whole_Collection.csv from S3 into a DataFrame: addFile() fetches the URL
# for this Spark session and SparkFiles.get() returns the local path to read.
url = "https://my-team3-tmfinalproject2022.s3.amazonaws.com/Whole_Collection.csv"
spark.sparkContext.addFile(url)

# Spark datetime patterns are case-sensitive: MM = month, mm = minute,
# HH = hour of day (0-23), hh = 12-hour clock hour.
# NOTE(review): assumes timestamps look like "2021-03-14 17:05:09" — confirm against the CSV.
whole_collection_df = spark.read.option('header', 'true').csv(
    SparkFiles.get("Whole_Collection.csv"),
    inferSchema=True,
    sep=',',
    timestampFormat="yyyy-MM-dd HH:mm:ss",
)
whole_collection_df.show(10)

+-------------------+-----------+------------------+------------+--------+--------------------+------------------+------------+-------------+------------+-----------+-------------+------------------+-------------------+------------------+-----------------+-------------------+--------------+--------+------------+-----------+----------+-----------+------+------+------+------+------+------+------+------+
|       date_created|customer_ID|external_reference|operation_id|  status|       status_detail|transaction_amount|installments| payment_type|hour_created|DOW_created|month_created|claim_date_created|claim_status_detail|claim_hour_created|claim_DOW_created|claim_month_created|   shp_Carrier|Shipping|bill_zipcode|shp_zipcode|fraud_flag|     Item_1|Item_2|Item_3|Item_4|Item_5|Item_6|Item_7|Item_8|Item_9|
+-------------------+-----------+------------------+------------+--------+--------------------+------------------+------------+-------------+------------+-----------+-------------+----------

In [35]:
# Cast the date_created column to a Spark timestamp (default parsing, no explicit format)
from pyspark.sql.functions import *

whole_collection_df = whole_collection_df.withColumn(
    "date_created",
    to_timestamp(col("date_created")),
)

In [22]:
# Load CPs_Geometry.csv (postal code -> state, municipality, coordinates) from S3
url = "https://my-team3-tmfinalproject2022.s3.amazonaws.com/CPs_Geometry.csv"
spark.sparkContext.addFile(url)

cps_geometry_df = spark.read.csv(
    SparkFiles.get("CPs_Geometry.csv"),
    header=True,
    inferSchema=True,
    sep=',',
)
cps_geometry_df.show(10)

+-----------+------+------------+-----------------+----------------+
|postal_code| state|municipality|        longitude|        latitude|
+-----------+------+------------+-----------------+----------------+
|      28089|Colima|      Colima|-103.734926980881|19.2228453654363|
|      28656|Colima|      Colima|-103.565674051469|18.9133416441602|
|      28000|Colima|      Colima|-103.725402479529|19.2419258678293|
|      28010|Colima|      Colima|-103.714608687286|19.2501391841229|
|      28600|Colima|      Colima|-103.686323332837|19.1369780674865|
|      28078|Colima|      Colima|-103.751415170745|19.2349653118884|
|      28635|Colima|      Colima|-103.595117083654|19.1957429869816|
|      28067|Colima|      Colima|-103.706534032834|19.2330755727729|
|      28080|Colima|      Colima|-103.731627465412|19.2271745326631|
|      28634|Colima|      Colima|-103.578108549626|19.1748737080013|
+-----------+------+------------+-----------------+----------------+
only showing top 10 rows



In [36]:
# Align the join key name with whole_collection_df (postal_code -> shp_zipcode)
cps_geometry_df = cps_geometry_df.withColumnRenamed(existing='postal_code', new='shp_zipcode')

In [37]:
# Attach geometry to each transaction by shipping postal code.
# Inner join: transactions whose shp_zipcode has no match in CPs_Geometry are dropped.
whole_collection_geom_df = whole_collection_df.join(cps_geometry_df, 'shp_zipcode', 'inner')
whole_collection_geom_df.show()

+-----------+-------------------+-----------+------------------+------------+--------+--------------------+------------------+------------+-------------+------------+-----------+-------------+------------------+-------------------+------------------+-----------------+-------------------+--------------+--------+------------+----------+-------------------+------+------+------+------+------+------+------+------+--------------------+--------------------+-----------------+----------------+
|shp_zipcode|       date_created|customer_ID|external_reference|operation_id|  status|       status_detail|transaction_amount|installments| payment_type|hour_created|DOW_created|month_created|claim_date_created|claim_status_detail|claim_hour_created|claim_DOW_created|claim_month_created|   shp_Carrier|Shipping|bill_zipcode|fraud_flag|             Item_1|Item_2|Item_3|Item_4|Item_5|Item_6|Item_7|Item_8|Item_9|               state|        municipality|        longitude|        latitude|
+-----------+-------

In [53]:
# Drop rows (not columns) whose municipality is null
whole_collection_geom_df = whole_collection_geom_df.na.drop(how='any', subset=['municipality'])

In [61]:
# Connection settings for writing to the PostgreSQL database on RDS via JDBC.
import os
from getpass import getpass

# Never read the password with input(): it echoes the typed text into the saved
# cell output. Prefer an RDS_PASSWORD environment variable (non-interactive runs),
# otherwise prompt with getpass(), which does not echo.
password = os.environ.get("RDS_PASSWORD") or getpass("RDS password: ")
mode = 'append'  # add rows to the table if it already exists
jdbc_url = 'jdbc:postgresql://mypostgresfinalprojectdb.c83cfcogcthm.us-east-2.rds.amazonaws.com:5432/my_data_finalproject'
config = {"user": "root",
          "password": password,
          "driver": "org.postgresql.Driver"}

[password redacted — it was echoed by input()]


In [62]:
# Write the joined DataFrame to the Whole_Collection_Geom table in RDS, using the
# save mode, JDBC URL and connection properties configured above.
(
    whole_collection_geom_df.write
    .mode(mode)
    .jdbc(url=jdbc_url, table='Whole_Collection_Geom', properties=config)
)