Authenticate with Google Cloud CLI


In [None]:
# Initialise the gcloud CLI; the command is interactive, so follow its prompts.
!gcloud init

Create credentials file

In [None]:
# Log in and create the Application Default Credentials file.
!gcloud auth application-default login

Create resources on Google Cloud

In [None]:
# Cloud Storage bucket that holds the raw source files (the data lake).
# Replace <name> with your own suffix before running; left as-is, the shell
# would interpret the angle brackets as redirection.
!gsutil mb gs://data_lake_<name>

In [None]:
# Cloud Storage bucket that holds the PySpark job file submitted to Dataproc.
# Replace <name> with your own suffix before running.
!gsutil mb gs://dataproc_job_<name>

In [None]:
# Destination dataset (data warehouse): "nyctaxi", created in the US location.
# Replace <project_id> with your Google Cloud project ID.
!bq --location=US mk --dataset "<project_id>:nyctaxi"

In [None]:
# Destination table nyctaxi.yellow_tripdata.
# The last argument is the inline schema as comma-separated name:TYPE pairs; the
# column names and order match the DataFrame that is written to BigQuery later.
# Replace <project_id> with your Google Cloud project ID.
!bq mk \
--table \
<project_id>:nyctaxi.yellow_tripdata \
vendor_name:STRING,pickup_datetime:STRING,pickup_datetime_day:INTEGER,pickup_datetime_month:INTEGER,pickup_datetime_year:INTEGER,dropoff_datetime:STRING,dropoff_datetime_day:INTEGER,dropoff_datetime_month:INTEGER,dropoff_datetime_year:INTEGER,passenger_count:INT64,trip_distance:FLOAT64,RatecodeID:STRING,store_and_fwd_flag:STRING,PULocationID:INTEGER,DOLocationID:INTEGER,payment_type:STRING,fare_amount:FLOAT64,extra:FLOAT64,mta_tax:FLOAT64,tip_amount:FLOAT64,tolls_amount:FLOAT64,improvement_surcharge:FLOAT64,total_amount:FLOAT64,congestion_surcharge:FLOAT64,airport_fee:FLOAT64

Upload the raw data file to the data lake

In [None]:
# Copy the local Parquet file into the data lake bucket.
# Replace <date> and <name> with the real file date and bucket suffix.
!gsutil cp "yellow_tripdata_<date>.parquet" gs://data_lake_<name>

To run Apache Spark jobs, we need to create a Dataproc cluster.

In [None]:
# Replace the upper-case placeholders (CLUSTER_NAME, REGION, NUM_WORKERS and the
# two machine types) with real values before running. The job is later submitted
# to this cluster, so the cluster name and region must match the submit command.
!gcloud dataproc clusters create CLUSTER_NAME \
  --region=REGION \
  --num-workers=NUM_WORKERS \
  --master-machine-type=MASTER_MACHINE_TYPE \
  --worker-machine-type=WORKER_MACHINE_TYPE

Install Spark for local development on Google Colab

In [None]:
# JDK 8 for Spark (install output is discarded).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark distribution over HTTPS instead of plain HTTP;
# -nc skips the download when the archive is already present (cell re-runs).
!wget -q -nc https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
# %pip installs into the environment of the running kernel.
%pip install -q findspark

Set environment variables

In [None]:
import os

# Point JAVA_HOME at the JDK and SPARK_HOME at the unpacked Spark distribution.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
    }
)

Import libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
from pyspark.sql.functions import col, when, dayofmonth, month, year

Create a Spark session for your production job

In [None]:
# Session for the production job: only the application name is set here.
spark = (
    SparkSession.builder
    .appName("NYC Taxi")
    .getOrCreate()
)

Create a Spark session for development on Google Colab

In [None]:
import findspark

# Run findspark.init() before the pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# Local-mode session for development.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Property used to format output tables better.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

Read the file and create a DataFrame

In [None]:
# Load the raw trip file from the data lake bucket into a Spark DataFrame.
# Replace <name> and <date> so the path matches the file uploaded earlier.
df = spark.read.parquet("gs://data_lake_<name>/yellow_tripdata_<date>.parquet")

Add columns for day, month, year using datetime field

In [None]:
# For each timestamp column, append <column>_day, <column>_month and <column>_year
# (in that order, pickup columns first).
date_parts = (("day", dayofmonth), ("month", month), ("year", year))
for timestamp_column in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
    for suffix, extract_part in date_parts:
        df = df.withColumn(
            f"{timestamp_column}_{suffix}",
            extract_part(col(timestamp_column)),
        )

Replace the coded values in these columns with their descriptive labels. The mapping is provided in the data dictionary of this dataset.

In [None]:
def decode_column(frame, column_name, labels):
    """Replace the codes in one column with their descriptive labels.

    Args:
        frame: Spark DataFrame that contains ``column_name``.
        column_name: Column to decode; it is overwritten under the same name.
        labels: Mapping of raw code -> label. Conditions are checked in the
            mapping's iteration order.

    Returns:
        A new DataFrame in which every code listed in ``labels`` is replaced by
        its label and every other value is kept, cast to a string.
    """
    raw = col(column_name)
    # Cast the fallback explicitly so every branch of the expression is a
    # string, instead of relying on implicit numeric-to-string coercion.
    fallback = raw.cast("string")

    decoded = None
    for code, label in labels.items():
        matches = raw == code
        decoded = when(matches, label) if decoded is None else decoded.when(matches, label)

    if decoded is None:
        # Empty mapping: nothing to translate, keep the values as strings.
        return frame.withColumn(column_name, fallback)
    return frame.withColumn(column_name, decoded.otherwise(fallback))


# Code -> label tables. Column names use the same spelling as the column list
# used when reordering columns and as the destination table schema, so the
# lookups do not depend on case-insensitive column resolution.
code_labels = {
    "VendorID": {
        1: "Creative Mobile Technologies",
        2: "VeriFone Inc",
    },
    "RatecodeID": {
        1: "Standard rate",
        2: "JFK",
        3: "Newark",
        4: "Nassau or Westchester",
        5: "Negotiated fare",
        6: "Group ride",
    },
    "store_and_fwd_flag": {
        "Y": "store and forward trip",
        "N": "not a store and forward trip",
    },
    "payment_type": {
        1: "Credit card",
        2: "Cash",
        3: "No charge",
        4: "Dispute",
        5: "Unknown",
        6: "Voided trip",
    },
}

for coded_column, labels in code_labels.items():
    df = decode_column(df, coded_column, labels)

Change the data types of columns. The default data types inferred by Spark can be inaccurate.

In [None]:
# Target Spark SQL type for each column.
# - The timestamps become strings because the destination table stores them as
#   STRING; the day/month/year columns were already derived from them above.
# - Distances and amounts use "double" (64-bit) to match the FLOAT64 columns of
#   the destination table; a 32-bit float would round the stored values.
# - "airport_fee" is spelled as in the column list used when reordering columns.
column_types = {
    "tpep_pickup_datetime": "string",
    "tpep_dropoff_datetime": "string",
    "passenger_count": "int",
    "trip_distance": "double",
    "fare_amount": "double",
    "extra": "double",
    "mta_tax": "double",
    "tip_amount": "double",
    "tolls_amount": "double",
    "improvement_surcharge": "double",
    "total_amount": "double",
    "congestion_surcharge": "double",
    "airport_fee": "double",
}

for cast_column, type_name in column_types.items():
    df = df.withColumn(cast_column, col(cast_column).cast(type_name))

Rename columns

In [None]:
# Old column name -> name used by the destination table.
renamed_columns = {
    "VendorID": "vendor_name",
    "tpep_pickup_datetime": "pickup_datetime",
    "tpep_pickup_datetime_day": "pickup_datetime_day",
    "tpep_pickup_datetime_month": "pickup_datetime_month",
    "tpep_pickup_datetime_year": "pickup_datetime_year",
    "tpep_dropoff_datetime": "dropoff_datetime",
    "tpep_dropoff_datetime_day": "dropoff_datetime_day",
    "tpep_dropoff_datetime_month": "dropoff_datetime_month",
    "tpep_dropoff_datetime_year": "dropoff_datetime_year",
}

for old_name, new_name in renamed_columns.items():
    df = df.withColumnRenamed(old_name, new_name)

In [None]:
# Check the column names and types after the renames.
df.printSchema()

Change order of columns

In [None]:
# Column order of the destination table; select() also drops any column that
# is not listed here.
new_column_order = [
    "vendor_name",
    "pickup_datetime",
    "pickup_datetime_day",
    "pickup_datetime_month",
    "pickup_datetime_year",
    "dropoff_datetime",
    "dropoff_datetime_day",
    "dropoff_datetime_month",
    "dropoff_datetime_year",
    "passenger_count",
    "trip_distance",
    "RatecodeID",
    "store_and_fwd_flag",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "airport_fee",
]
df = df.select(new_column_order)

In [None]:
# Check the final column order and types before loading into BigQuery.
df.printSchema()

Load data into BigQuery table


In [None]:
# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
# This is the data lake bucket created with `gsutil mb`; replace <name> with the
# same suffix that was used there.
bucket = "data_lake_<name>"
spark.conf.set('temporaryGcsBucket', bucket)

# Append to the destination table created with `bq mk` (nyctaxi.yellow_tripdata).
df.write.format('bigquery') \
  .option('table', 'nyctaxi.yellow_tripdata') \
  .mode('append') \
  .save()

Create a Python file (`dataproc-job.py`) and add all of the PySpark code to it. Then upload the file to the Dataproc job bucket.

In [None]:
# Upload the job script to the job bucket created with `gsutil mb`
# (replace <name> with the same suffix that was used there).
!gsutil cp dataproc-job.py gs://dataproc_job_<name>


Submit job to Dataproc cluster

In [None]:
# Submit the uploaded script to the Dataproc cluster.
# CLUSTER_NAME and REGION must be the values used with
# `gcloud dataproc clusters create`; replace <name> with the job bucket suffix.
!gcloud dataproc jobs submit pyspark \
    gs://dataproc_job_<name>/dataproc-job.py \
    --cluster=CLUSTER_NAME \
    --region=REGION