In [1]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-2.4.6'
spark_version = 'spark-2.4.7'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

/bin/sh: apt-get: command not found
/bin/sh: apt-get: command not found
/bin/sh: wget: command not found
tar: Error opening archive: Failed to open 'spark-2.4.7-bin-hadoop2.7.tgz'


Exception: Unable to find py4j, your SPARK_HOME may not be configured correctly

In [29]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-10-11 00:24:02--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.4’


2020-10-11 00:24:02 (4.96 MB/s) - ‘postgresql-42.2.9.jar.4’ saved [914037/914037]



In [30]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the "CloudETL" session. The PostgreSQL JDBC jar is put on
# the driver classpath so that DataFrameWriter.jdbc can load org.postgresql.Driver.
# NOTE(review): the /content path assumes Colab's working directory, and driver-only classpath
# presumably suffices because this runs in local mode — confirm if run on a cluster.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [31]:
# Read in data from the S3 bucket.
from pyspark import SparkFiles

url = "https://laurentvh-kickstarter.s3.us-east-2.amazonaws.com/latest_data.csv"
# addFile ships the file with the Spark job; SparkFiles.get resolves the local copy by its file name.
spark.sparkContext.addFile(url)
local_csv_path = SparkFiles.get("latest_data.csv")

# Comma-separated with a header row; column types are inferred from the data.
kick_df = spark.read.csv(local_csv_path, header=True, inferSchema=True, sep=",")

# Preview the first rows of the DataFrame.
kick_df.show()

+----------+-------------+------------+------------------------+-------------------+-------------------+-------+-------------------+-------+----------+----------+------------------+------------+------------+------------------+------------------+----------+------------+------------+--------------+-----------+-------------+
|Unnamed: 0|backers_count|    category|country_displayable_name|         created_at|           deadline|   goal|        launched_at|pledged|staff_pick|     state|       usd_pledged|sub_category|blurb_length|launch_to_deadline|creation_to_launch|launch_day|deadline_day|launch_month|deadline_month|launch_time|deadline_time|
+----------+-------------+------------+------------------------+-------------------+-------------------+-------+-------------------+-------+----------+----------+------------------+------------+------------+------------------+------------------+----------+------------+------------+--------------+-----------+-------------+
|         0|            3|  

In [32]:
# Inspect the inferred schema as (column name, Spark SQL type string) pairs.
[(field.name, field.dataType.simpleString()) for field in kick_df.schema.fields]

[('Unnamed: 0', 'int'),
 ('backers_count', 'int'),
 ('category', 'string'),
 ('country_displayable_name', 'string'),
 ('created_at', 'timestamp'),
 ('deadline', 'timestamp'),
 ('goal', 'double'),
 ('launched_at', 'timestamp'),
 ('pledged', 'double'),
 ('staff_pick', 'boolean'),
 ('state', 'string'),
 ('usd_pledged', 'double'),
 ('sub_category', 'string'),
 ('blurb_length', 'double'),
 ('launch_to_deadline', 'int'),
 ('creation_to_launch', 'int'),
 ('launch_day', 'string'),
 ('deadline_day', 'string'),
 ('launch_month', 'string'),
 ('deadline_month', 'string'),
 ('launch_time', 'string'),
 ('deadline_time', 'string')]

In [33]:
# Split into two tables. Both keep "Unnamed: 0" — presumably a row index exported into the CSV,
# and presumably the key used to join the launch and pledge tables back together (verify).
# This first table holds the location and launch/deadline timing columns.
kick_launch_df = kick_df.select(
    "Unnamed: 0",
    "country_displayable_name",
    "launched_at",
    "launch_to_deadline",
    "creation_to_launch",
    "launch_day",
    "deadline_day",
    "launch_month",
    "deadline_month",
    "launch_time",
    "deadline_time",
)
kick_launch_df.show()

+------------------------+-------------------+------------------+------------------+----------+------------+------------+--------------+-----------+-------------+
|country_displayable_name|        launched_at|launch_to_deadline|creation_to_launch|launch_day|deadline_day|launch_month|deadline_month|launch_time|deadline_time|
+------------------------+-------------------+------------------+------------------+----------+------------+------------+--------------+-----------+-------------+
|       the United States|2014-09-02 15:44:07|                30|                 3|   Tuesday|    Thursday|   September|       October|    2pm-4pm|      2pm-4pm|
|      the United Kingdom|2015-02-18 21:59:26|                29|                 0| Wednesday|      Friday|    February|         March|   8pm-10pm|     8pm-10pm|
|       the United States|2018-07-28 13:18:38|                30|                 0|  Saturday|      Monday|        July|        August|   12pm-2pm|     12pm-2pm|
|      the United King

In [34]:
# Second table: funding/outcome columns plus the shared "Unnamed: 0" column.
kick_pledge_df = kick_df.select(
    "Unnamed: 0",
    "backers_count",
    "category",
    "created_at",
    "deadline",
    "goal",
    "pledged",
    "staff_pick",
    "state",
    "usd_pledged",
    "sub_category",
    "blurb_length",
)
kick_pledge_df.show()

+------------------------+-------------+------------+-------------------+-------------------+-------+-------+----------+----------+------------------+------------+------------+
|country_displayable_name|backers_count|    category|         created_at|           deadline|   goal|pledged|staff_pick|     state|       usd_pledged|sub_category|blurb_length|
+------------------------+-------------+------------+-------------------+-------------------+-------+-------+----------+----------+------------------+------------+------------+
|       the United States|            3|        food|2014-08-30 01:10:21|2014-10-02 15:44:07|  400.0|   15.0|     false|    failed|              15.0| small batch|        28.0|
|      the United Kingdom|            6|  technology|2015-02-18 17:15:56|2015-03-20 20:59:26|  200.0|   28.0|     false|    failed|       43.04176128|    software|        22.0|
|       the United States|           90| photography|2018-07-28 02:07:21|2018-08-27 13:18:38| 2500.0| 4350.0|     f

In [35]:
# Steps to get our transformed raw data into RDS
# Configure settings for RDS
import os
from getpass import getpass

# NOTE: "append" inserts rows on every run, so re-running the write cells duplicates data in RDS.
mode = "append"
jdbc_url = "jdbc:postgresql://kickstarter.cp9rrwk96at4.us-east-1.rds.amazonaws.com:5432/postgres"
# Never hard-code the database password in the notebook (it ends up in version control).
# Read it from the RDS_PASSWORD environment variable, or prompt for it interactively.
rds_password = os.environ.get("RDS_PASSWORD") or getpass("RDS password: ")
config = {"user": "postgres",
          "password": rds_password,
          "driver": "org.postgresql.Driver"}

In [40]:
# Write the pledge DataFrame to the kickstarter_pledge table in RDS (using the mode/url/config above).
kick_pledge_df.write.jdbc(url=jdbc_url, table='kickstarter_pledge', mode=mode, properties=config)

In [41]:
# Write the launch DataFrame to the kickstarter_launch table in RDS.
kick_launch_df.write.jdbc(
    table='kickstarter_launch',
    url=jdbc_url,
    properties=config,
    mode=mode,
)