In [0]:
import io
import requests
from io import StringIO
import csv

In [0]:
url = 'https://storage.googleapis.com/uber-data-engineering-project/uber_data.csv'
response = requests.get(url)

In [0]:
# convert The response data into a Rdd
rdd = spark.sparkContext.parallelize(response.text.splitlines())

In [0]:
# rdd to dataframe
df = spark.read.csv(rdd, header=True, inferSchema=True)

In [0]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window



In [0]:
%sql
create DATABASE IF NOT EXISTS uber_dataset;

In [0]:
df.createOrReplaceTempView("UberDf")

In [0]:
%sql
DROP VIEW IF EXISTS UberDf_with_Trip_id;
create TEMPORARY view   UberDf_with_Trip_id as 
Select   *,(row_number() over (ORDER BY 1))-1 Trip_id from UberDf ;

In [0]:
%sql
--- Create table uber_dataset.datetime_dim
DROP  TABLE IF  EXISTS uber_dataset.datetime_dim;
CREATE TABLE IF NOT EXISTS uber_dataset.datetime_dim as 
select row_number() over (order by 1 ) datetime_id, 
      TO_DATE(tpep_pickup_datetime) tpep_pickup_datetime,
      hour(tpep_pickup_datetime) pick_hour,
      day(tpep_pickup_datetime) pick_day,
      month(tpep_pickup_datetime) pick_month,
      year(tpep_pickup_datetime) pick_year,
      weekday(tpep_pickup_datetime) pick_weekday,
      TO_DATE(tpep_dropoff_datetime)    		tpep_dropoff_datetime,
      hour(tpep_dropoff_datetime) 		drop_hour,
      day(tpep_dropoff_datetime) 		drop_day,
      month(tpep_dropoff_datetime) 		drop_month,
      year(tpep_dropoff_datetime) 		drop_year,
      weekday(tpep_dropoff_datetime) 	drop_weekday
 from UberDf ;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DROP  TABLE IF  EXISTS uber_dataset.passenger_count_dim;
CREATE TABLE IF NOT EXISTS uber_dataset.passenger_count_dim as 
select row_number() over (order by 1 ) passenger_count_id,
       passenger_count  
from UberDf ;

DROP  TABLE IF  EXISTS uber_dataset.trip_distance_dim;
CREATE TABLE IF NOT EXISTS uber_dataset.trip_distance_dim as 
select row_number() over (order by 1 ) trip_distance_id,
        trip_distance  
from UberDf ;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DROP  TABLE IF  EXISTS uber_dataset.rate_code_dim;
CREATE TABLE IF NOT EXISTS uber_dataset.rate_code_dim as 
select 
row_number() over (order by 1 ) rate_code_id,
RatecodeID,
  	CASE WHEN RatecodeID=1 THEN "Standard rate" 
		 WHEN RatecodeID=2 THEN "JFK"
		 WHEN RatecodeID=3 THEN "Newark"
		 WHEN RatecodeID=4 THEN "Nassau or Westchester"
		 WHEN RatecodeID=5 THEN "Negotiated fare"
		 WHEN RatecodeID=6 THEN "Group ride"
		 else null 
		 End As rate_code_name
from UberDf ;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DROP  TABLE IF  EXISTS uber_dataset.pickup_location_dim;
CREATE TABLE IF NOT EXISTS uber_dataset.pickup_location_dim as 
select row_number() over (order by 1 ) pickup_location_id,
        pickup_latitude ,
        pickup_longitude
from UberDf ;
DROP  TABLE IF  EXISTS uber_dataset.dropoff_location_dim;
CREATE TABLE IF NOT EXISTS uber_dataset.dropoff_location_dim as 
select row_number() over (order by 1 ) dropoff_location_id,
        dropoff_latitude ,
        dropoff_longitude
from UberDf ;

num_affected_rows,num_inserted_rows


In [0]:
%sql
DROP  TABLE IF  EXISTS uber_dataset.payment_type_dim;
CREATE TABLE IF NOT EXISTS uber_dataset.payment_type_dim as 
select 
row_number() over (order by 1 ) payment_type_id,
payment_type,
CASE when payment_type=1 THEN "Credit card"
	 when payment_type=2 THEN "Cash"
	 when payment_type=3 THEN "No charge"
	 when payment_type=4 THEN "Dispute"
	 when payment_type=5 THEN "Unknown"
	 when payment_type=6 THEN "Voided trip"
		else null 
		end payment_type_name 
from UberDf ;

num_affected_rows,num_inserted_rows


In [0]:
%sql
use uber_dataset;
show tables;

database,tableName,isTemporary
uber_dataset,datetime_dim,False
uber_dataset,dropoff_location_dim,False
uber_dataset,fact_table,False
uber_dataset,passenger_count_dim,False
uber_dataset,payment_type_dim,False
uber_dataset,pickup_location_dim,False
uber_dataset,rate_code_dim,False
uber_dataset,tbl_analysis_report,False
uber_dataset,trip_distance_dim,False
,uberdf,True


In [0]:
%sql
use uber_dataset;
DROP TABLE IF EXISTS FACT_TABLE;
CREATE TABLE IF NOT EXISTS FACT_TABLE
select trip_id,VendorID, datetime_id, passenger_count_id,trip_distance_id, rate_code_id, store_and_fwd_flag, pickup_location_id, dropoff_location_id,payment_type_id, fare_amount, extra, mta_tax, tip_amount, tolls_amount,improvement_surcharge, total_amount
  from UberDf_with_Trip_id u
left join passenger_count_dim p   on  u.trip_id =   p.passenger_count_id
left join trip_distance_dim td    on  u.trip_id =   td.trip_distance_id
left join rate_code_dim r   on  u.trip_id =   r.rate_code_id
left join pickup_location_dim pic   on  u.trip_id =   pic.pickup_location_id
left join dropoff_location_dim dro   on  u.trip_id =   dro.dropoff_location_id
left join datetime_dim dt   on  u.trip_id =   dt.datetime_id
left join payment_type_dim pa   on  u.trip_id =   pa.payment_type_id

;

num_affected_rows,num_inserted_rows


In [0]:
%sql
select 
*
 FROM
		fact_table f limit 10;

trip_id,VendorID,datetime_id,passenger_count_id,trip_distance_id,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type_id,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,1,,,,,N,,,,9.0,0.5,0.5,2.05,0.0,0.3,12.35
1,1,1.0,1.0,1.0,1.0,N,1.0,1.0,1.0,11.0,0.5,0.5,3.05,0.0,0.3,15.35
2,2,2.0,2.0,2.0,2.0,N,2.0,2.0,2.0,54.5,0.5,0.5,8.0,0.0,0.3,63.8
3,2,3.0,3.0,3.0,3.0,N,3.0,3.0,3.0,31.5,0.0,0.5,3.78,5.54,0.3,41.62
4,2,4.0,4.0,4.0,4.0,N,4.0,4.0,4.0,98.0,0.0,0.0,0.0,15.5,0.3,113.8
5,2,5.0,5.0,5.0,5.0,N,5.0,5.0,5.0,23.5,1.0,0.5,5.06,0.0,0.3,30.36
6,2,6.0,6.0,6.0,6.0,N,6.0,6.0,6.0,23.0,0.5,0.5,0.0,0.0,0.3,24.3
7,1,7.0,7.0,7.0,7.0,N,7.0,7.0,7.0,20.5,0.5,0.5,0.0,0.0,0.3,21.8
8,1,8.0,8.0,8.0,8.0,N,8.0,8.0,8.0,5.5,0.5,0.5,2.0,0.0,0.3,8.8
9,2,9.0,9.0,9.0,9.0,N,9.0,9.0,9.0,23.5,0.5,0.5,3.2,0.0,0.3,28.0


In [0]:
%sql

use database uber_dataset;
DROP TABLE IF EXISTS uber_dataset.tbl_analysis_report;
 CREATE TABLE IF NOT EXISTS uber_dataset.tbl_analysis_report AS 
 SELECT
  f.VendorID,
  dt.tpep_pickup_datetime,
  dt.tpep_dropoff_datetime,
  p.passenger_count,
  td.trip_distance,
   rc.RatecodeID,
f.store_and_fwd_flag,
   pl.pickup_latitude,
   pl.pickup_longitude,
   dl.dropoff_latitude,
   dl.dropoff_longitude,
   pt.payment_type,
   f.fare_amount,
   f.extra,
   f.mta_tax,
   f.tip_amount,
   f.tolls_amount,
   f.improvement_surcharge,
   f.total_amount
 FROM
		fact_table f
  join datetime_dim dt   on  f.datetime_id =   dt.datetime_id
   JOIN passenger_count_dim p ON f.passenger_count_id = p.passenger_count_id
   JOIN trip_distance_dim td ON f.trip_distance_id = td.trip_distance_id
   JOIN rate_code_dim rc ON f.rate_code_id = rc.rate_code_id
   JOIN pickup_location_dim pl ON f.pickup_location_id = pl.pickup_location_id
   JOIN dropoff_location_dim dl ON f.dropoff_location_id = dl.dropoff_location_id
   JOIN payment_type_dim pt ON f.payment_type_id = pt.payment_type_id
   ;

num_affected_rows,num_inserted_rows


In [0]:
%sql
select * from  uber_dataset.tbl_analysis_report limit 10;

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
1,2016-03-01,2016-03-01,1,2.5,1,N,40.765151977539055,-73.97674560546875,40.74612808227539,-74.00426483154298,1,11.0,0.5,0.5,3.05,0.0,0.3,15.35
2,2016-03-01,2016-03-01,1,2.9,1,N,40.767925262451165,-73.98348236083984,40.7331657409668,-74.00594329833984,1,54.5,0.5,0.5,8.0,0.0,0.3,63.8
2,2016-03-01,2016-03-01,2,19.98,1,N,40.64480972290039,-73.78202056884764,40.6757698059082,-73.97454071044923,1,31.5,0.0,0.5,3.78,5.54,0.3,41.62
2,2016-03-01,2016-03-01,3,10.78,1,N,40.769813537597656,-73.86341857910156,40.757766723632805,-73.96965026855469,1,98.0,0.0,0.0,0.0,15.5,0.3,113.8
2,2016-03-01,2016-03-01,5,30.43,3,N,40.79218292236328,-73.97174072265625,40.69505310058594,-74.17716979980467,1,23.5,1.0,0.5,5.06,0.0,0.3,30.36
2,2016-03-01,2016-03-01,5,5.92,1,N,40.70538330078125,-74.01719665527344,40.75578689575195,-73.97807312011719,1,23.0,0.5,0.5,0.0,0.0,0.3,24.3
1,2016-03-01,2016-03-01,6,5.72,1,N,40.72784805297852,-73.99458312988281,0.0,0.0,2,20.5,0.5,0.5,0.0,0.0,0.3,21.8
1,2016-03-01,2016-03-01,1,6.2,1,N,40.64775848388672,-73.78877258300781,40.712345123291016,-73.82920837402342,3,5.5,0.5,0.5,2.0,0.0,0.3,8.8
2,2016-03-01,2016-03-01,1,0.7,1,N,40.76464080810546,-73.95822143554686,40.76290130615234,-73.9678955078125,1,23.5,0.5,0.5,3.2,0.0,0.3,28.0
2,2016-03-01,2016-03-01,3,7.18,1,N,40.74119186401367,-73.98577880859375,40.79787826538086,-73.94635009765625,1,4.0,0.5,0.5,0.0,0.0,0.3,5.3
