In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as psf

import os

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Build (or reuse) a Hive-enabled Spark session named "DataOps" whose Hive
# metastore client factory is the AWS Glue Data Catalog implementation.
spark = (
    SparkSession.builder
    .appName("DataOps")
    .config(
        "hive.metastore.client.factory.class",
        "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
    )
    .enableHiveSupport()
    .getOrCreate()
)

# SQLContext's first positional parameter is a SparkContext, not a
# SparkSession. Pass the session's context explicitly and hand the session
# itself over as the second argument so the legacy handle wraps this session.
sqlContext = SQLContext(spark.sparkContext, spark)

# Only surface errors in the driver log.
spark.sparkContext.setLogLevel("ERROR")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Single bucket root shared by every layer, so pointing the notebook at a
# different bucket is a one-line change.
DATA_LAKE_ROOT = 's3://citi-bike-batch-data-con/'

# Layer prefixes keep their trailing slash: other cells append to them
# directly (bronze_path + '*', f"{gold_path}dim_station").
bronze_path = f'{DATA_LAKE_ROOT}bronze/'
silver_path = f'{DATA_LAKE_ROOT}silver/'
gold_path = f'{DATA_LAKE_ROOT}gold/'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Explicit schema for the raw trip-history data: one (column name, Spark type)
# pair per field. Only the duration and the id columns are integers; every
# other column, including timestamps and coordinates, is kept as a string.
TRIP_COLUMNS = [
    ('tripduration', IntegerType()),
    ('start_time', StringType()),
    ('stop_time', StringType()),
    ('start_station_id', IntegerType()),
    ('start_station_name', StringType()),
    ('start_station_latitude', StringType()),
    ('start_station_longitude', StringType()),
    ('end_station_id', IntegerType()),
    ('end_station_name', StringType()),
    ('end_station_latitude', StringType()),
    ('end_station_longitude', StringType()),
    ('bike_id', IntegerType()),
    ('user_type', StringType()),
    ('birth_year', StringType()),
    ('user_gender', StringType()),
]

# Fields use the StructField defaults (nullable, no metadata).
trip_schema = StructType(
    [StructField(column_name, column_type) for column_name, column_type in TRIP_COLUMNS]
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Define the bronze DataFrame over everything under the bronze prefix.
# The schema is supplied up front rather than inferred, so (per the original
# note) declaring the read does not trigger a Spark job.
bronze_all_csv = bronze_path + '*'

bronzeDF = (
    spark.read
    .schema(trip_schema)
    .option('header', True)  # first line of each file holds column names
    .csv(bronze_all_csv)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Remove exact duplicate rows, then sort with the latest start_time first.
# start_time is a string column in trip_schema, so the ordering is a string
# comparison of the timestamp text.
bronzeDF = bronzeDF.distinct().orderBy('start_time', ascending=False)

# Show the first row of the sorted result.
bronzeDF.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(tripduration=2459, start_time='2018-12-31 23:59:51.0850', stop_time='2019-01-01 00:40:50.8800', start_station_id=128, start_station_name='MacDougal St & Prince St', start_station_latitude='40.72710258', start_station_longitude='-74.00297088', end_station_id=402, end_station_name='Broadway & E 22 St', end_station_latitude='40.7403432', end_station_longitude='-73.98955109', bike_id=34775, user_type='Subscriber', birth_year='1996', user_gender='2')

In [9]:
# Persist the de-duplicated, sorted trips as Parquet under the silver prefix,
# replacing whatever was stored there before.
silver_writer = bronzeDF.write.mode('overwrite').format('parquet')
silver_writer.save(silver_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Load the Parquet data stored under the silver prefix back into a DataFrame.
slvrDF = spark.read.load(silver_path, format='parquet')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Make the silver data queryable from Spark SQL under the name "silver".
slvrDF.createOrReplaceTempView("silver")


def station_pairs(trips, id_col, name_col):
    """Return the distinct (station_id, station_name) pairs from two columns.

    trips    -- DataFrame holding the trip rows
    id_col   -- name of the column carrying the station id
    name_col -- name of the column carrying the station name

    The two columns are selected, de-duplicated, and renamed to the common
    names "station_id" and "station_name".
    """
    distinct_pairs = trips.select(id_col, name_col).distinct()
    return (
        distinct_pairs
        .withColumnRenamed(id_col, "station_id")
        .withColumnRenamed(name_col, "station_name")
    )


# Inputs for DIM_STATION: stations seen at the start of a trip ...
startDF = station_pairs(slvrDF, "start_station_id", "start_station_name")
# ... and stations seen at the end of a trip.
endDF = station_pairs(slvrDF, "end_station_id", "end_station_name")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Build the station dimension: stack the start-side and end-side station
# pairs, then drop the pairs that appear on both sides.
unionDF = startDF.union(endDF).distinct()

# Store the dimension as Parquet under the gold prefix, replacing any
# previous copy.
dim_station_path = f"{gold_path}dim_station"
unionDF.write.mode('overwrite').format('parquet').save(dim_station_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…