In [3]:
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.transforms import *
from pyspark.sql.functions import * 
from pyspark.sql.types import * 
from pyspark.sql import SparkSession
from awsglue.utils import *
from awsglue.dynamicframe import DynamicFrame
import sys  
from datetime import datetime 
import pandas as pd
import boto3

In [4]:
sc = SparkContext()
glueContext=GlueContext(sc)
spark=glueContext.spark_session

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
import boto3 as b

load_flag = False 
s3_client  = b.client('s3')
response = s3_client.list_objects_v2(Bucket="raw-data-bucket", Prefix="nyc_taxi_datasets/lookups/")
if 'Contents' in response:
    load_flag = True 

In [8]:
dim_vendors_data = [{"vendor_id":"1","vendor_name":"Creative Mobile Technologies, LLC",},
{"vendor_id":"2","vendor_name":"Curb Mobility, LLC",},
{"vendor_id":"6","vendor_name":"Myle Technologies Inc",},
{"vendor_id":"7","vendor_name":"Helix"}]

In [10]:
spark.createDataFrame(dim_vendors_data).show(5,False)

+---------+---------------------------------+
|vendor_id|vendor_name                      |
+---------+---------------------------------+
|1        |Creative Mobile Technologies, LLC|
|2        |Curb Mobility, LLC               |
|6        |Myle Technologies Inc            |
|7        |Helix                            |
+---------+---------------------------------+



In [11]:
dim_ratecode_data = [{"rate_code_id":"1", "rate_code_description":"Standard rate"},
{"rate_code_id":"2", "rate_code_description":"JFK"},
{"rate_code_id":"3", "rate_code_description":"Newark"},
{"rate_code_id":"4", "rate_code_description":"Nassau or Westchester"},
{"rate_code_id":"5", "rate_code_description":"Negotiated fare"},
{"rate_code_id":"6", "rate_code_description":"Group ride"},
{"rate_code_id":"99", "rate_code_description":"Null/unknown"}
]
spark.createDataFrame(dim_ratecode_data).show(10,False)

+---------------------+------------+
|rate_code_description|rate_code_id|
+---------------------+------------+
|Standard rate        |1           |
|JFK                  |2           |
|Newark               |3           |
|Nassau or Westchester|4           |
|Negotiated fare      |5           |
|Group ride           |6           |
|Null/unknown         |99          |
+---------------------+------------+



In [12]:
dim_store_and_fwd_flag_data = [{"store_and_fwd_flag_id":"Y", "store_and_fwd_flag_description":"store and forward trip"},
{"store_and_fwd_flag_id":"N", "store_and_fwd_flag_description":"not a store and forward trip"}
]
spark.createDataFrame(dim_store_and_fwd_flag_data).show(10,False)

+------------------------------+---------------------+
|store_and_fwd_flag_description|store_and_fwd_flag_id|
+------------------------------+---------------------+
|store and forward trip        |Y                    |
|not a store and forward trip  |N                    |
+------------------------------+---------------------+



In [18]:
dim_payment_type_data = [{"payment_type_id":"0", "payment_type_description":"Flex Fare trip"},
{"payment_type_id":"1", "payment_type_description":"Credit card"},
{"payment_type_id":"2", "payment_type_description":"Cash"},
{"payment_type_id":"3", "payment_type_description":"No charge"},
{"payment_type_id":"4", "payment_type_description":"Dispute"},
{"payment_type_id":"5", "payment_type_description":"Unknown"},
{"payment_type_id":"6", "payment_type_description":"Voided trip"}
]
dim_payment_type = spark.createDataFrame(dim_payment_type_data)
dim_payment_type.show(10,False)

+------------------------+---------------+
|payment_type_description|payment_type_id|
+------------------------+---------------+
|Flex Fare trip          |0              |
|Credit card             |1              |
|Cash                    |2              |
|No charge               |3              |
|Dispute                 |4              |
|Unknown                 |5              |
|Voided trip             |6              |
+------------------------+---------------+



In [14]:
pdf_dates = pd.date_range('2000-01-01','2030-01-01').to_frame(index=0, name = 'date')
dim_date = spark.createDataFrame(pdf_dates)
dim_date.createOrReplaceTempView('dim_date')

In [10]:
dim_date = spark.sql('''
SELECT explode(sequence(to_date('2000-01-01'), to_date('2030-01-01'))) as date
''')

In [11]:
dim_date_cols = {
"date_id":date_format(dim_date.date,'yMMdd'),
"date":date_format(dim_date.date, 'd'),
"month":date_format(dim_date.date, 'M'),
"year":date_format(dim_date.date, 'y'),
"day_short":date_format(dim_date.date, "E"), 
"day_long":date_format(dim_date.date, "EEEE"), 
"month_short":date_format(dim_date.date, "LLL"), 
"month_long":date_format(dim_date.date, "LLLL"),
"is_weekend": when((date_format(dim_date.date, "EEE").isin('Sat','Sun')),'Yes').otherwise('No')
}
dim_date = dim_date.withColumns(dim_date_cols)
dim_date.show()

+----+--------+-----+----+---------+---------+-----------+----------+----------+
|date| date_id|month|year|day_short| day_long|month_short|month_long|is_weekend|
+----+--------+-----+----+---------+---------+-----------+----------+----------+
|   1|20000101|    1|2000|      Sat| Saturday|        Jan|   January|       Yes|
|   2|20000102|    1|2000|      Sun|   Sunday|        Jan|   January|       Yes|
|   3|20000103|    1|2000|      Mon|   Monday|        Jan|   January|        No|
|   4|20000104|    1|2000|      Tue|  Tuesday|        Jan|   January|        No|
|   5|20000105|    1|2000|      Wed|Wednesday|        Jan|   January|        No|
|   6|20000106|    1|2000|      Thu| Thursday|        Jan|   January|        No|
|   7|20000107|    1|2000|      Fri|   Friday|        Jan|   January|        No|
|   8|20000108|    1|2000|      Sat| Saturday|        Jan|   January|       Yes|
|   9|20000109|    1|2000|      Sun|   Sunday|        Jan|   January|       Yes|
|  10|20000110|    1|2000|  

In [19]:
dim_date = spark.sql('''select 
date_format(date,'yMMdd') date_id,
date_format(date, 'd') day,
date_format(date, 'M') month,
date_format(date, 'y') year,
date_format(date, "E") day_short, 
date_format(date, "EEEE") day_long, 
date_format(date, "LLL") month_short, 
date_format(date, "LLLL") month_long, 
case when date_format(date, "EEE") in ('Sat','Sun') then 'Yes' else 'No' end as is_weekend
from dim_date ''')

In [20]:
pdf_time = pd.date_range('2000-01-01 00:00:00','2000-01-01 23:59:59', freq='S', inclusive='both').to_frame(index=0, name = 'date')
dim_time = spark.createDataFrame(pdf_time)

  pdf_time = pd.date_range('2000-01-01 00:00:00','2000-01-01 23:59:59', freq='S', inclusive='both').to_frame(index=0, name = 'date')


In [24]:
dim_time = spark.sql('''
SELECT explode(sequence(to_timestamp('2000-01-01 00:00:00'), to_timestamp('2000-01-02 23:59:59'), interval 1 second)) as date
''')

In [25]:
dim_time.createOrReplaceTempView('dim_time')

In [26]:
dim_time = spark.sql('''select date_format(date,'HHmmss') time_id,
date_format(date, 'H') hour,
date_format(date, 'm') minute,
date_format(date, 's') second
from dim_time ''')

In [27]:
dim_time.show(20,False)

25/08/07 17:52:40 WARN DAGScheduler: Broadcasting large task binary with size 1388.9 KiB


+-------+----+------+------+
|time_id|hour|minute|second|
+-------+----+------+------+
|000000 |0   |0     |0     |
|000001 |0   |0     |1     |
|000002 |0   |0     |2     |
|000003 |0   |0     |3     |
|000004 |0   |0     |4     |
|000005 |0   |0     |5     |
|000006 |0   |0     |6     |
|000007 |0   |0     |7     |
|000008 |0   |0     |8     |
|000009 |0   |0     |9     |
|000010 |0   |0     |10    |
|000011 |0   |0     |11    |
|000012 |0   |0     |12    |
|000013 |0   |0     |13    |
|000014 |0   |0     |14    |
|000015 |0   |0     |15    |
|000016 |0   |0     |16    |
|000017 |0   |0     |17    |
|000018 |0   |0     |18    |
|000019 |0   |0     |19    |
+-------+----+------+------+
only showing top 20 rows



In [5]:
from pyspark import SparkFiles

In [6]:
spark.sparkContext.addFile('https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv')

In [7]:
path = SparkFiles.get('taxi_zone_lookup.csv')

In [8]:
path 

'/tmp/spark-6c3cd8ac-b7ec-43dd-b669-a73502f89f8e/userFiles-c11af852-448a-465f-a959-112bf335e0d2/taxi_zone_lookup.csv'

In [9]:
spark.read.csv('file:///'+path, header=True).show()

                                                                                

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly