In [43]:
import os
import pyspark
import requests
from pyspark.sql import SparkSession, types
from pyspark.sql.functions import col, year, month, dayofmonth
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
import re
from datetime import date, datetime, timedelta

In [3]:
# Build the Spark configuration needed to reach Google Cloud Storage.

# the user's home directory anchors both the connector jar and the key file
home_dir = os.path.expanduser('~')

# Google Cloud Storage connector jar shipped alongside the local Spark install
gcs_connector_jar = os.path.join(home_dir, "spark", "jars", "gcs-connector-hadoop3-2.2.22.jar")

# service account JSON key file used for authentication
credentials_location = os.path.join(home_dir, '.google', 'service_acct_creds.json')

# settings layered on top of the master URL and application name
gcs_settings = {
    "spark.jars": gcs_connector_jar,
    "spark.hadoop.google.cloud.auth.service.account.enable": "true",
    "spark.hadoop.google.cloud.auth.service.account.json.keyfile": credentials_location,
}

# local master using every available core
conf = SparkConf().setMaster('local[*]').setAppName('test')
for setting_name, setting_value in gcs_settings.items():
    conf.set(setting_name, setting_value)

In [4]:
# create (or reuse) the Spark context
# getOrCreate keeps this cell re-runnable: building a second SparkContext in the
# same kernel raises an error, whereas getOrCreate hands back the running one
# (in that case `conf` is not applied again).
sc = SparkContext.getOrCreate(conf=conf)

# register the GCS connector classes for the gs:// scheme and point the
# connector at the service account key file
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

24/05/08 06:48:57 WARN Utils: Your hostname, thinkpad resolves to a loopback address: 127.0.1.1; using 192.168.1.162 instead (on interface wlp3s0)
24/05/08 06:48:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/05/08 06:48:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
# build the Spark session from the configuration of the existing context
session_builder = SparkSession.builder.config(conf=sc.getConf())
spark = session_builder.getOrCreate()

In [16]:
# # test writing local CSV to GCS parquet - success
# zones_path = os.path.join(home_dir, 'taxi_zone_lookup.csv')
# taxi_zone_lookup = spark.read.csv(zones_path, header=True)
# taxi_zone_lookup.write.parquet('gs://ny_taxi_storage_413811/zones.parquet')

In [22]:
# did not work: spark.read.parquet could not load the file directly from the HTTPS URL
# test = spark.read.parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2020-01.parquet')


In [23]:
# csv_test = requests.get('https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2020-01.parquet')

In [1]:
# import pandas as pd
# check = pd.read_parquet('green_tripdata_2020-01.parquet')
# pd.concat([check.head(), check.tail()])

In [6]:
# download the green taxi parquet file from the TLC site into the working directory
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2020-01.parquet'

# the context manager releases the streamed connection even if the write fails;
# the timeout stops the cell from hanging forever on a stalled connection
with requests.get(url, stream=True, timeout=60) as response:
    # fail loudly on 4xx/5xx instead of saving an error page as a ".parquet" file
    response.raise_for_status()

    # stream to disk in 10 KiB chunks so the whole file is never held in memory
    with open("green_tripdata_2020-01.parquet", mode="wb") as file:
        for chunk in response.iter_content(chunk_size=10 * 1024):
            file.write(chunk)

In [7]:
# Declared schema for the green taxi trip data: (column name, Spark type class),
# listed in the order the fields appear in the resulting StructType.
# NOTE(review): ehail_fee is declared as an integer while the other amount
# columns are doubles - confirm against the source file.
green_columns = [
    ('VendorID', types.LongType),
    ('lpep_pickup_datetime', types.TimestampNTZType),
    ('lpep_dropoff_datetime', types.TimestampNTZType),
    ('store_and_fwd_flag', types.StringType),
    ('RatecodeID', types.DoubleType),
    ('PULocationID', types.LongType),
    ('DOLocationID', types.LongType),
    ('passenger_count', types.DoubleType),
    ('trip_distance', types.DoubleType),
    ('fare_amount', types.DoubleType),
    ('extra', types.DoubleType),
    ('mta_tax', types.DoubleType),
    ('tip_amount', types.DoubleType),
    ('tolls_amount', types.DoubleType),
    ('ehail_fee', types.IntegerType),
    ('improvement_surcharge', types.DoubleType),
    ('total_amount', types.DoubleType),
    ('payment_type', types.DoubleType),
    ('trip_type', types.DoubleType),
    ('congestion_surcharge', types.DoubleType),
]

# every column is nullable
green_schema = types.StructType([
    types.StructField(column_name, column_type(), True)
    for column_name, column_type in green_columns
])

In [72]:
# load the downloaded file into a Spark DataFrame using the declared schema
# DataFrameReader.parquet() accepts paths plus a fixed set of reader options;
# `schema` is not one of them, so passing it as a keyword leaves the schema
# unapplied. Setting it on the reader itself makes Spark actually use it.
green_trips = spark.read.schema(green_schema).parquet('green_tripdata_2020-01.parquet')

In [102]:
# function to clean up column names
def column_cleanup(col_name):
    """Return ``col_name`` with its ID/PU/DO markers split off by underscores, in lower case.

    An underscore is inserted before an upper-case ``ID`` and after an
    upper-case ``PU`` or ``DO``, in each case only when the marker is not
    already preceded by an underscore. The result is then lower-cased,
    e.g. ``PULocationID`` becomes ``pu_location_id``.
    """
    # (pattern, replacement) pairs, applied in this order
    substitutions = (
        (r'(?<!_)ID', r'_ID'),
        (r'(?<!_)PU', r'PU_'),
        (r'(?<!_)DO', r'DO_'),
    )
    cleaned = col_name
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.lower()

# rename every green trips column through the cleanup function
renamed_columns = [col(name).alias(column_cleanup(name)) for name in green_trips.columns]
green_trips = green_trips.select(*renamed_columns)

# remove out-of-range dates: keep pickups from 2020-01-01 up to (not including) 2020-02-01
pickup_ts = col('lpep_pickup_datetime')
window_start = date(2020, 1, 1)
window_end = date(2020, 2, 1)
green_trips = green_trips.filter((pickup_ts >= window_start) & (pickup_ts < window_end))

## Alternate method of filtering - use SQL
# green_trips.createOrReplaceTempView('trips_df')
# spark.sql('''
# SELECT *
# FROM trips_df
# WHERE lpep_pickup_datetime >= '2020-01-01' AND lpep_pickup_datetime < '2020-02-01'
# ''')


# replace missing values for location IDs with "unknown" ID
unknown_location_id = 264
green_trips = green_trips.na.fill({
    'pu_location_id': unknown_location_id,
    'do_location_id': unknown_location_id,
})

In [103]:
# derive year / month / day from the pickup timestamp; these columns drive the
# partitioning of the written output
pickup_ts = col("lpep_pickup_datetime")

green_trips = (
    green_trips
    .withColumn("year", year(pickup_ts))
    .withColumn("month", month(pickup_ts))
    .withColumn("day", dayofmonth(pickup_ts))
)

# green_trips.select(["lpep_pickup_datetime","year","month","day"]).show(10)

+--------------------+----+-----+---+
|lpep_pickup_datetime|year|month|day|
+--------------------+----+-----+---+
| 2020-01-20 00:02:59|2020|    1| 20|
| 2020-01-20 00:00:27|2020|    1| 20|
| 2020-01-20 00:00:46|2020|    1| 20|
| 2020-01-20 00:00:52|2020|    1| 20|
| 2020-01-20 00:07:16|2020|    1| 20|
| 2020-01-20 00:00:12|2020|    1| 20|
| 2020-01-20 00:01:45|2020|    1| 20|
| 2020-01-20 00:49:32|2020|    1| 20|
| 2020-01-20 00:24:23|2020|    1| 20|
| 2020-01-20 00:01:24|2020|    1| 20|
+--------------------+----+-----+---+
only showing top 10 rows



In [105]:
# write the trips to cloud storage as parquet, one directory level per date part
output_path = 'gs://ny_taxi_storage_413811/green2/'
partitioned_writer = green_trips.write.format('parquet').partitionBy('year', 'month', 'day')
partitioned_writer.save(output_path)

                                                                                

In [None]:
# check that Spark can read back the parquet files written to GCS
# NOTE(review): the '*/*' glob points below the first two partition directory
# levels, so the year/month columns may be missing from read_test - confirm.
written_glob = 'gs://ny_taxi_storage_413811/green2/*/*'
read_test = spark.read.parquet(written_glob)
read_test.show(10)

In [19]:
# Project checklist kept as a plain string; nothing in the visible cells reads it.
todo = '''

RECREATE BACKFILL (green only ok)

1) Data fetch:
[x] download files (done requests.get)
[ ] clean data (translate pandas -> Spark)
    [x] filter for out-of-range dates
    [x] column name cleanup
    [x] create date cast column for repartitioning
    [x] fix missing/invalid values
[x] repartition
[x] write to GCS parquet

2) Transfer to BQ:
[ ] read from GCS (basically done)
[ ] clean data (mostly schemas)
[ ] Connect to BQ
[ ] write datasets to tables
    [ ] partitioning/clustering
    [ ] schema
    
3) Batch
[ ] Parametrize (month / year / taxi-type)
[ ] Run with dataproc
'''