# Goal

The goal of this notebook is to explore a cloud-based ETL process using the following tools:  
*  A large dataset of all 2019 rideshare trips is obtained from the Chicago Data Portal.  
*  PySpark in Google Colab is used to extract the data and transform it into a more manageably sized DataFrame.  
*  At the end of the process, determine whether the dataset needs to be added to Amazon RDS.  

# ETL Steps

1.  Data can be found on the [Chicago Data Portal](https://data.cityofchicago.org/Transportation/Transportation-Network-Providers-Trips-2019/iu3g-qa69)
1.  In Google Drive, create a Jupyter notebook.  
1.  Using largely boilerplate code, set up the PySpark environment
1.  The data portal has a Socrata (SODA) API whose JSON results can be loaded directly into PySpark
1.  Import the dataset
1.  Identify any interesting features about the dataset

# Questions to answer
1.  Given the dates and times of storms, does the frequency of rides go up?  Does the cost go up?  Does the frequency of shared rides go up?  Do drivers make more money with shared rides?  
1.  Given the dates, times, and locations of events, does the number of rides nearby change?  When do the frequency changes occur, and at what distance do they become negligible?  
1.  Can these questions be answered from this dataset, or does a web scrape or API of active trips need to be obtained to get high-precision data?
1.  Do electric cars or combustion-engine cars make better rideshare vehicles?
1.  Where are gas stations and charging stations relative to common pickup/dropoff locations?
1.  How do bus and train line locations affect rideshare frequencies?
1.  How has ridesharing changed with COVID?  Have pooled rides become less frequent?
1.  How much gas is used by the fleet?  How much electricity?  What effect does the change in energy usage have on gas and electricity?


# Other factors to study:
1.  Does gas price affect rideshare cost?
1.  Does weather affect rideshare frequency/cost?
1.  Do special events affect rideshare frequency/cost?
1.  Are there common pickup/dropoff locations?
1.  How many rides begin and end in the same district?
1.  How does traffic affect rideshare prices?

# Visualize:
1.  What does an hour of rideshare rides look like?


Ref:  
*  https://dev.socrata.com/foundry/data.cityofchicago.org/iu3g-qa69  




In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

# Environment Setup and Dependencies

In [None]:
# Import AWS RDS configuration (config.py defines username, password, rds_url)
from google.colab import files
src = list(files.upload().values())[0]
with open('config.py', 'wb') as f:
    f.write(src)
from config import username, password, rds_url

Saving config.py to config (1).py


In [1]:
# Dependencies
import os

# set spark version
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark


# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Make the installed Spark importable (findspark locates pyspark via SPARK_HOME)
import findspark
findspark.init()


In [2]:
# setup pyspark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("rideshareAnalysis").getOrCreate()

sc=spark.sparkContext
  

# Extract 
*  Connect to the Chicago Data Portal API and pull the trip records

In [3]:
import requests
import pandas as pd

In [27]:
# total number of rows

url = "https://data.cityofchicago.org/resource/iu3g-qa69.json"
response = requests.get(f'{url}?$select=count(*)').json()
row_total = int(response[0]['count'])
row_total


111850744
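The count cell above passes a SoQL `$select` clause as a query parameter. A minimal standard-library sketch of building such Socrata (SODA) query URLs; the helper name `soda_url` is hypothetical, while `$select`, `$limit`, `$offset`, and `$order` are SoQL parameters described in the Socrata reference linked at the top.

```python
from urllib.parse import urlencode

# Resource endpoint used throughout this notebook
BASE_URL = "https://data.cityofchicago.org/resource/iu3g-qa69.json"

def soda_url(base, **soql):
    """Build a SODA query URL; keyword 'select' becomes '$select', and so on.

    soda_url is a hypothetical helper for illustration, not part of any library.
    """
    params = {f"${key}": value for key, value in soql.items()}
    # Leave SoQL punctuation unescaped so the URL stays readable
    return f"{base}?{urlencode(params, safe='$(),*:')}"

# Same request as the row-count cell
count_url = soda_url(BASE_URL, select="count(*)")
print(count_url)
# https://data.cityofchicago.org/resource/iu3g-qa69.json?$select=count(*)

# One page of rows, ordered by the system :id field
page_url = soda_url(BASE_URL, limit=100000, offset=0, order=":id")
```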

In [33]:
import numpy as np

# Intended dtype for each column
# (np.str_ and bool replace the deprecated aliases np.unicode_ and np.bool)
{"trip_id": np.str_,
 "trip_start_timestamp": np.datetime64,
 "trip_end_timestamp": np.datetime64,
 "trip_seconds": np.int32,
 "trip_miles": np.float32,
 "pickup_census_tract": np.int64,
 "dropoff_census_tract": np.int64,
 "pickup_community_area": np.int64,
 "dropoff_community_area": np.int64,
 "fare": np.float32,
 "tip": np.float32,
 "additional_charges": np.float32,
 "trip_total": np.float32,
 "shared_trip_authorized": np.str_,
 "trips_pooled": bool,
 "pickup_centroid_latitude": np.float32,
 "pickup_centroid_longitude": np.float32,
 "pickup_centroid_location": np.str_,
 "dropoff_centroid_latitude": np.float32,
 "dropoff_centroid_longitude": np.float32,
 "dropoff_centroid_location": np.str_}



{'additional_charges': numpy.float32,
 'dropoff_census_tract': numpy.int64,
 'dropoff_centroid_latitude': numpy.float32,
 'dropoff_centroid_location': numpy.str_,
 'dropoff_centroid_longitude': numpy.float32,
 'dropoff_community_area': numpy.int64,
 'fare': numpy.float32,
 'pickup_census_tract': numpy.int64,
 'pickup_centroid_latitude': numpy.float32,
 'pickup_centroid_location': numpy.str_,
 'pickup_centroid_longitude': numpy.float32,
 'pickup_community_area': numpy.int64,
 'shared_trip_authorized': numpy.str_,
 'tip': numpy.float32,
 'trip_end_timestamp': numpy.datetime64,
 'trip_id': numpy.str_,
 'trip_miles': numpy.float32,
 'trip_seconds': numpy.int32,
 'trip_start_timestamp': numpy.datetime64,
 'trip_total': numpy.float32,
 'trips_pooled': bool}
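Socrata's JSON output encodes numeric fields as strings, which is consistent with reading every column as `StringType` in the schema below and casting afterwards. A minimal standard-library sketch of applying a type map like the one above to a single record; the converter choices (a subset of columns only) and the sample record are assumptions for illustration, not values taken from the dataset.

```python
from datetime import datetime

# Hypothetical converters for a subset of the notebook's columns
CONVERTERS = {
    "trip_start_timestamp": datetime.fromisoformat,
    "trip_end_timestamp": datetime.fromisoformat,
    "trip_seconds": int,
    "trip_miles": float,
    "fare": float,
    "tip": float,
    "trip_total": float,
}

def coerce_record(record):
    """Convert string values to typed values; unknown columns and None pass through."""
    typed = {}
    for column, value in record.items():
        convert = CONVERTERS.get(column)
        typed[column] = convert(value) if convert and value is not None else value
    return typed

# Made-up sample shaped like one API row (values are illustrative only)
sample = {
    "trip_id": "abc123",
    "trip_start_timestamp": "2019-01-01T00:00:00.000",
    "trip_end_timestamp": "2019-01-01T00:15:00.000",
    "trip_seconds": "812",
    "trip_miles": "3.4",
    "fare": "10",
    "tip": "0",
    "trip_total": "12.5",
}
row = coerce_record(sample)
```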

In [28]:
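# Socrata app token (app tokens reduce API throttling)
token = 'xdlmC6aKIr3S3xMCDsxKlRiWo'
header = {
    'Content-Type': 'application/json',
    'X-App-Token': token
}

# The SODA JSON API returns values as strings, so every column is read as
# StringType here and can be cast to its proper type after loading
from pyspark.sql.types import StructType, StructField, StringType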

schema = StructType([
   StructField("trip_id", StringType(), True),
   StructField("trip_start_timestamp", StringType(), True),
   StructField("trip_end_timestamp", StringType(), True),
   StructField("trip_seconds", StringType(), True),
   StructField("trip_miles", StringType(), True),
   StructField("pickup_census_tract", StringType(), True),
   StructField("dropoff_census_tract", StringType(), True),
   StructField("pickup_community_area", StringType(), True),
   StructField("dropoff_community_area", StringType(), True),
   StructField("fare", StringType(), True),
   StructField("tip", StringType(), True),
   StructField("additional_charges", StringType(), True),
   StructField("trip_total", StringType(), True),
   StructField("shared_trip_authorized", StringType(), True),
   StructField("trips_pooled", StringType(), True),
   StructField("pickup_centroid_latitude", StringType(), True),
   StructField("pickup_centroid_longitude", StringType(), True),
   StructField("pickup_centroid_location", StringType(), True),
   StructField("dropoff_centroid_latitude", StringType(), True),
   StructField("dropoff_centroid_longitude", StringType(), True),
   StructField("dropoff_centroid_location", StringType(), True)
   ])

In [29]:
# Page through the SODA API and union each page into a single Spark DataFrame
limit = 100000  # rows per request
df = None

for offset in range(0, row_total, limit):
  print(f"Records remaining: {row_total - offset}")

  # $order=:id keeps row order stable between requests so offsets neither skip nor repeat rows
  params = {"$limit": limit, "$offset": offset, "$order": ":id"}
  response = requests.get(url, params=params, headers=header)
  response.raise_for_status()

  page = spark.createDataFrame(response.json(), schema)
  if df is None:
    df = page
  else:
    df = df.union(page)
    print("Union complete")


Records remaining: 111850744
Records remaining: 111750744
Union complete
Records remaining: 111650744
Union complete
Records remaining: 111550744
Union complete
Records remaining: 111450744
Union complete
Records remaining: 111350744
Union complete
Records remaining: 111250744
Union complete
Records remaining: 111150744
Union complete
Records remaining: 111050744
Union complete
Records remaining: 110950744
Union complete
Records remaining: 110850744
Union complete
Records remaining: 110750744
Union complete
Records remaining: 110650744
Union complete
Records remaining: 110550744
Union complete
Records remaining: 110450744
Union complete
Records remaining: 110350744
Union complete
Records remaining: 110250744
Union complete
Records remaining: 110150744
Union complete
Records remaining: 110050744
Union complete
Records remaining: 109950744
Union complete
Records remaining: 109850744
Union complete
Records remaining: 109750744
Union complete
Records remaining: 109650744
Union complete
Rec

Py4JJavaError: ignored
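The paging loop above walks the table in fixed-size pages. A small worked check of that arithmetic using a hypothetical helper, `page_plan`: with 111,850,744 rows and a 100,000-row limit there are 1,118 full pages plus one final page of 50,744 rows, so 1,119 requests in total.

```python
def page_plan(total_rows, limit):
    """Return (offset, size) pairs covering total_rows in pages of at most `limit` rows."""
    return [(offset, min(limit, total_rows - offset))
            for offset in range(0, total_rows, limit)]

plan = page_plan(111_850_744, 100_000)
print(len(plan), plan[0], plan[-1])
# 1119 (0, 100000) (111800000, 50744)
```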

In [26]:
# Write one gzip-compressed CSV part file; overwrite output left by a previous run
df.repartition(1)\
  .write.format("csv")\
  .mode("overwrite")\
  .option("header", "true")\
  .option("compression", "gzip")\
  .save("/content/summary.csv")

AnalysisException: ignored

In [22]:
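import glob
from google.colab import files

# files.download does not expand wildcards, so resolve the part file name first
files.download(glob.glob('/content/summary.csv/part-*.csv.gz')[0])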

FileNotFoundError: ignored
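Spark's CSV writer produces a directory (here `/content/summary.csv/`) holding `part-*` files plus a `_SUCCESS` marker rather than a single file, so a wildcard path cannot be handed straight to a download function. A minimal standard-library sketch of locating the single gzip part file and copying it to an ordinary file name; `single_part_file` and `flatten_output` are hypothetical helpers, and the `.csv.gz` suffix assumes the gzip compression used above.

```python
import glob
import os
import shutil
import tempfile

def single_part_file(output_dir, suffix=".csv.gz"):
    """Return the only part file Spark wrote into output_dir (assumes repartition(1))."""
    parts = sorted(glob.glob(os.path.join(output_dir, f"part-*{suffix}")))
    if len(parts) != 1:
        raise FileNotFoundError(f"expected 1 part file in {output_dir}, found {len(parts)}")
    return parts[0]

def flatten_output(output_dir, destination):
    """Copy the single part file to an ordinary file name that can be downloaded."""
    shutil.copyfile(single_part_file(output_dir), destination)
    return destination

# Demo on a throwaway directory laid out like Spark's CSV output
demo_dir = tempfile.mkdtemp()
for name in ("_SUCCESS", "part-00000-demo-c000.csv.gz"):
    open(os.path.join(demo_dir, name), "wb").close()
copied = flatten_output(demo_dir, os.path.join(demo_dir, "summary.csv.gz"))

# In Colab, after the write cell succeeds (not run here):
# from google.colab import files
# files.download(flatten_output("/content/summary.csv", "/content/summary.csv.gz"))
```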

In [21]:
print(df.count())

200100


In [None]:
# add files to pyspark
from pyspark import SparkFiles

# Load file
#----------------------
# data source URL
# https://data.cityofchicago.org/Transportation/Transportation-Network-Providers-Trips-2019/iu3g-qa69 

# CSV export URL for the full dataset
url = "https://data.cityofchicago.org/api/views/iu3g-qa69/rows.csv?accessType=DOWNLOAD" 
filename = "iu3g-qa69.csv"
spark.sparkContext.addFile(url)

# read file (MM = month; mm would be parsed as minutes)
df = spark.read.csv(SparkFiles.get(filename), header=True, inferSchema=True, sep=',', timestampFormat="MM/dd/yyyy h:mm:ss a")
df.show(10)

+--------------------+--------------------+--------------------+------------+----------------+-------------------+--------------------+---------------------+----------------------+----+---+------------------+----------+----------------------+------------+------------------------+-------------------------+------------------------+-------------------------+--------------------------+-------------------------+
|             trip_id|trip_start_timestamp|  trip_end_timestamp|trip_seconds|      trip_miles|pickup_census_tract|dropoff_census_tract|pickup_community_area|dropoff_community_area|fare|tip|additional_charges|trip_total|shared_trip_authorized|trips_pooled|pickup_centroid_latitude|pickup_centroid_longitude|pickup_centroid_location|dropoff_centroid_latitude|dropoff_centroid_longitude|dropoff_centroid_location|
+--------------------+--------------------+--------------------+------------+----------------+-------------------+--------------------+---------------------+---------------------
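In Spark's datetime patterns `MM` is month-of-year while `mm` is minute-of-hour, so the date part of `timestampFormat` has to be written `MM/dd/yyyy`. A standard-library sketch of the same distinction using Python's `strptime` codes (`%m` month, `%M` minute, `%I` 12-hour clock, `%p` AM/PM); the sample layout `01/01/2019 12:15:00 AM` is an assumption about the portal's CSV export.

```python
from datetime import datetime

# Assumed CSV layout; the Spark pattern would start with MM/dd/yyyy
CSV_FORMAT = "%m/%d/%Y %I:%M:%S %p"

def parse_csv_timestamp(text):
    """Parse a 12-hour clock timestamp such as '01/01/2019 12:15:00 AM'."""
    return datetime.strptime(text, CSV_FORMAT)

# 12:15 AM is fifteen minutes after midnight
print(parse_csv_timestamp("01/01/2019 12:15:00 AM"))
# 2019-01-01 00:15:00
```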

In [None]:
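import glob
from google.colab import files

# Spark writes a directory of part files, so locate the gzip part file before downloading it
df.repartition(1).write.mode("overwrite").option("header", "true").option("compression", "gzip").csv("rideshare_data.csv")
files.download(glob.glob("rideshare_data.csv/part-*.csv.gz")[0])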

NameError: ignored

In [None]:
df.select('trip_end_timestamp').show(truncate=False)

+-----------------------+
|trip_end_timestamp     |
+-----------------------+
|2019-01-01T00:30:00.000|
|2019-01-01T00:15:00.000|
|2019-01-01T00:30:00.000|
|2019-01-01T00:15:00.000|
|2019-01-01T00:15:00.000|
|2019-01-01T00:15:00.000|
|2019-01-01T00:00:00.000|
|2019-01-01T00:30:00.000|
|2019-01-01T00:30:00.000|
|2019-01-01T00:30:00.000|
|2019-01-01T00:00:00.000|
|2019-01-01T00:15:00.000|
|2019-01-01T00:30:00.000|
|2019-01-01T00:30:00.000|
|2019-01-01T00:30:00.000|
|2019-01-01T00:15:00.000|
|2019-01-01T00:15:00.000|
|2019-01-01T00:15:00.000|
|2019-01-01T00:15:00.000|
|2019-01-01T00:15:00.000|
+-----------------------+
only showing top 20 rows
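The `trip_end_timestamp` values above all land on :00, :15, or :30, which suggests the portal rounds trip times to 15-minute intervals. That makes "what does an hour of rideshare rides look like?" a matter of counting trips per 15-minute bucket. A standard-library sketch using the 20 values shown above; in the notebook itself the equivalent would be a Spark `groupBy(...).count()` on the timestamp column.

```python
from collections import Counter
from datetime import datetime

# Minutes of the 20 trip_end_timestamp values shown above (all on 2019-01-01, hour 00)
minutes = [30, 15, 30, 15, 15, 15, 0, 30, 30, 30, 0, 15, 30, 30, 30, 15, 15, 15, 15, 15]
end_times = [f"2019-01-01T00:{m:02d}:00.000" for m in minutes]

def bucket_counts(timestamps, bucket_minutes=15):
    """Count timestamps per bucket, flooring each to a multiple of bucket_minutes."""
    counts = Counter()
    for text in timestamps:
        ts = datetime.fromisoformat(text)
        floored = ts.replace(minute=ts.minute - ts.minute % bucket_minutes,
                             second=0, microsecond=0)
        counts[floored] += 1
    return counts

counts = bucket_counts(end_times)
for bucket, n in sorted(counts.items()):
    print(bucket.strftime("%H:%M"), n)
# 00:00 2
# 00:15 10
# 00:30 8
```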



# Transform
*  Remove bad and duplicated records
*  Check number of records left
*  Convert column datatypes if needed
*  Create table data
*  Normalize data

In [None]:
# size of dataframe (rows)
print(df.count())


1000


In [None]:
# drop incomplete records
df = df.dropna()
print(df.count())

4056518


In [None]:
# drop duplicated records (if any; should be none)
df = df.dropDuplicates()
print(df.count())

4056518
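`dropna()` with its default `how="any"` drops a row if any column is null, and `dropDuplicates()` with no column subset compares entire rows. A standard-library sketch of the same two rules on a few hypothetical rows, which helps explain why `dropna()` can remove many rows from a wide table with optional columns. This sketch keeps the first occurrence of a duplicate; Spark makes no ordering promise about which copy survives.

```python
def drop_incomplete(rows):
    """Mimic DataFrame.dropna(how='any'): keep rows with no None values."""
    return [row for row in rows if all(value is not None for value in row.values())]

def drop_duplicates(rows):
    """Mimic DataFrame.dropDuplicates() with no subset: one copy of each full row."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(sorted(row.items()))
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique

# Hypothetical rows: one has a missing census tract, two are exact duplicates
rows = [
    {"trip_id": "a", "pickup_census_tract": "17031081500", "fare": "7.5"},
    {"trip_id": "b", "pickup_census_tract": None, "fare": "10"},
    {"trip_id": "a", "pickup_census_tract": "17031081500", "fare": "7.5"},
]
clean = drop_duplicates(drop_incomplete(rows))
print(len(clean))  # 1
```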


In [None]:
# check datatypes
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [None]:
# create table based on schema
review_id_table = df.select(["review_id", "customer_id", "product_id", "product_parent", "review_date"])
review_id_table.show(5)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1005KN8L3OP23|   51950426|B00COTH9VI|     956367867| 2015-04-07|
|R1008R0427X1FG|   42507369|B009KHHELW|      41559476| 2014-05-28|
|R100AJRT6FE05K|    2458036|B0048ZXXIO|     814772102| 2014-06-27|
|R100AOYGH18ZXK|   23459444|B00GBDWZDU|     936264488| 2015-05-20|
|R100BC7LPZKRNN|   38247406|B007SPQZMC|     192466294| 2013-03-13|
+--------------+-----------+----------+--------------+-----------+
only showing top 5 rows



In [None]:
# convert review-date to date format
from pyspark.sql.functions import to_date, col
review_id_table = review_id_table.withColumn("review_date", to_date(col("review_date"), "yyyy-MM-dd"))

In [None]:
# check change
review_id_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [None]:
# create table based on schema
products_table = df.select(["product_id", "product_title"])
products_table.show(5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00COTH9VI|Seeking Asian Female|
|B009KHHELW|Duck Dynasty Seas...|
|B0048ZXXIO|Team Umizoomi Sea...|
|B00GBDWZDU|     Christmas Crush|
|B007SPQZMC|Downton Abbey Sea...|
+----------+--------------------+
only showing top 5 rows



In [None]:
# reference table
# entries should be unique
products_table = products_table.select('product_id', 'product_title').distinct()
products_table.show(5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B006MYGL8S|   Deadwood Season 1|
|B005LLSZNM|Sons Of Anarchy S...|
|B00MQOFWLK|  Too Young The Hero|
|B00MQOXI8Y|   The Expendables 3|
|B009OQWQCQ|Absolutely Fabulo...|
+----------+--------------------+
only showing top 5 rows



In [None]:
# create table based on schema
vine_table = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
vine_table.show(5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1005KN8L3OP23|          5|            1|          1|   N|
|R1008R0427X1FG|          4|            0|          0|   N|
|R100AJRT6FE05K|          5|            0|          0|   N|
|R100AOYGH18ZXK|          3|            0|          0|   N|
|R100BC7LPZKRNN|          5|            0|          0|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [None]:
# count reviews per customer
customer_count = df.groupBy('customer_id').count()
customer_count.show(5)

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   36771518|    5|
|   23006345|    5|
|    8899358|    1|
|   45518338|    7|
|   31452416|    1|
+-----------+-----+
only showing top 5 rows



In [None]:
# rename column
customer_count_table = customer_count.withColumnRenamed("count","customer_count")

In [None]:
# check changes
customer_count_table.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: long (nullable = false)



# Load
*  Connect to AWS RDS
*  Insert DataFrames into the AWS RDS PostgreSQL database


In [None]:
# Configure settings for RDS
# use imported variables from config.py
mode = "append"
jdbc_url=f"jdbc:postgresql://{rds_url}"
config = {"user": username, 
          "password": password, 
          "driver":"org.postgresql.Driver"}
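Spark's JDBC writer expects a PostgreSQL URL of the form `jdbc:postgresql://host:port/database`, so `rds_url` in `config.py` is assumed to hold the `host:port/database` part. A minimal standard-library sketch for assembling and sanity-checking that string; the helper name `postgres_jdbc_url` and the example endpoint are hypothetical, and 5432 is PostgreSQL's default port.

```python
from urllib.parse import urlsplit

def postgres_jdbc_url(host, database, port=5432):
    """Build a PostgreSQL JDBC URL such as jdbc:postgresql://host:5432/dbname."""
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    # Sanity check: the part after 'jdbc:' should parse back into the same pieces
    parts = urlsplit(url[len("jdbc:"):])
    if parts.hostname != host.lower() or parts.port != port or parts.path != f"/{database}":
        raise ValueError(f"could not build a valid JDBC URL from {host!r}, {database!r}")
    return url

# Hypothetical RDS endpoint and database name
print(postgres_jdbc_url("example-db.abc123.us-east-1.rds.amazonaws.com", "rideshare"))
# jdbc:postgresql://example-db.abc123.us-east-1.rds.amazonaws.com:5432/rideshare
```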

In [None]:
# Write DataFrame to review_id_table in RDS

review_id_table.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [None]:
# Write dataframe to products table in RDS

products_table.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [None]:
# Write dataframe to customers table in RDS

customer_count_table.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [None]:
# Write dataframe to vine_table in RDS

vine_table.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)