In [None]:
# Azure storage access info for the NYC TLC yellow-taxi dataset
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
# No real SAS token is used for this dataset: register an empty token, as the
# Azure Open Datasets sample does, instead of a meaningless placeholder string.
blob_sas_token = r""

# Allow Spark to read from Blob storage remotely: build the wasbs:// URL and
# register the (empty) SAS token under the container/account-specific config key
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
  'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
  blob_sas_token)
print('Remote blob path: ' + wasbs_path)

# Spark reads parquet lazily, so no row data is loaded at this point
df = spark.read.parquet(wasbs_path)
print('Register the DataFrame as a SQL temporary view: nyc_tlc')
# Expose the DataFrame to Spark SQL; the queries below refer to it as nyc_tlc
df.createOrReplaceTempView('nyc_tlc')

# Enable the Databricks IO cache to avoid repeated remote reads when querying
# the full dataset
spark.conf.set("spark.databricks.io.cache.enabled", "true")

# Display top 10 rows
print('Displaying top 10 rows: ')
display(spark.sql('SELECT * FROM nyc_tlc LIMIT 10'))

Remote blob path: wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow
Register the DataFrame as a SQL temporary view: nyc_tlc
Displaying top 10 rows: 


vendorID,tpepPickupDateTime,tpepDropoffDateTime,passengerCount,tripDistance,puLocationId,doLocationId,startLon,startLat,endLon,endLat,rateCodeId,storeAndFwdFlag,paymentType,fareAmount,extra,mtaTax,improvementSurcharge,tipAmount,tollsAmount,totalAmount,puYear,puMonth
CMT,2012-02-29T23:53:14Z,2012-03-01T00:00:43Z,1,2.1,,,-73.980494,40.730601,-73.983532,40.752311,1,N,CSH,7.3,0.5,0.5,,0.0,0.0,8.3,2012,3
VTS,2012-03-17T08:01:00Z,2012-03-17T08:15:00Z,1,11.06,,,-73.986067,40.699862,-73.814838,40.737052,1,,CRD,24.5,0.0,0.5,,4.9,0.0,29.9,2012,3
CMT,2012-02-29T23:58:51Z,2012-03-01T00:15:48Z,1,3.4,,,-73.968967,40.754359,-73.957048,40.743289,1,N,CRD,12.5,0.5,0.5,,1.5,0.0,15.0,2012,3
CMT,2012-03-01T19:24:16Z,2012-03-01T19:31:22Z,1,1.3,,,-73.99374,40.75307,-74.005428,40.741118,1,N,CRD,6.1,1.0,0.5,,0.0,0.0,7.6,2012,3
CMT,2012-02-29T23:46:32Z,2012-03-01T00:05:18Z,3,2.0,,,-73.973723,40.752323,-73.948275,40.769413,1,N,CSH,11.7,0.5,0.5,,0.0,0.0,12.7,2012,3
VTS,2012-03-07T15:17:00Z,2012-03-07T15:26:00Z,5,1.87,,,-73.988237,40.75929,-73.97114,40.78275,1,,CSH,7.7,0.0,0.5,,0.0,0.0,8.2,2012,3
CMT,2012-02-29T23:41:58Z,2012-03-01T00:02:29Z,1,12.4,,,-73.954536,40.727742,-73.768994,40.760246,1,N,CSH,28.5,0.5,0.5,,0.0,0.0,29.5,2012,3
VTS,2012-03-18T15:21:00Z,2012-03-18T15:32:00Z,6,2.51,,,-74.001705,40.732345,-73.974888,40.750835,1,,CSH,8.9,0.0,0.5,,0.0,0.0,9.4,2012,3
CMT,2012-02-29T23:47:08Z,2012-03-01T00:06:42Z,4,6.3,,,-73.992319,40.724503,-73.923589,40.76113,1,N,CRD,16.5,0.5,0.5,,4.37,0.0,21.87,2012,3
VTS,2012-03-13T22:26:00Z,2012-03-13T22:37:00Z,1,1.34,,,-74.009907,40.706292,-74.000512,40.71733,1,,CSH,7.3,0.5,0.5,,0.0,0.0,8.3,2012,3


In [None]:
# Preview the columns that matter for the cost analysis (first 10 rows only).
# NOTE: paymentType looks tidy in this small sample, but the column is messy
# and inconsistent across the dataset (evident in the aggregate query that
# follows) and needs a transform before it can be grouped on reliably.
sql = '''
SELECT 
    vendorID,
    paymentType,
    puYear,
    puMonth,
    passengerCount,
    fareAmount,
    improvementSurcharge,
    extra,
    mtaTax,
    tollsAmount,
    tipAmount,
    totalAmount
FROM nyc_tlc
LIMIT 10;
'''

# Run the query against the nyc_tlc temp view and render the result
preview_df = spark.sql(sql)
display(preview_df)

vendorID,paymentType,puYear,puMonth,passengerCount,fareAmount,improvementSurcharge,extra,mtaTax,tollsAmount,tipAmount,totalAmount
CMT,CSH,2012,3,1,7.3,,0.5,0.5,0.0,0.0,8.3
VTS,CRD,2012,3,1,24.5,,0.0,0.5,0.0,4.9,29.9
CMT,CRD,2012,3,1,12.5,,0.5,0.5,0.0,1.5,15.0
CMT,CRD,2012,3,1,6.1,,1.0,0.5,0.0,0.0,7.6
CMT,CSH,2012,3,3,11.7,,0.5,0.5,0.0,0.0,12.7
VTS,CSH,2012,3,5,7.7,,0.0,0.5,0.0,0.0,8.2
CMT,CSH,2012,3,1,28.5,,0.5,0.5,0.0,0.0,29.5
VTS,CSH,2012,3,6,8.9,,0.0,0.5,0.0,0.0,9.4
CMT,CRD,2012,3,4,16.5,,0.5,0.5,0.0,4.37,21.87
VTS,CSH,2012,3,1,7.3,,0.5,0.5,0.0,0.0,8.3


In [None]:
# Mean and median of passenger count, fare and each charge component,
# grouped by payment type, pickup year and pickup month.
# Vendors are not separated here: all vendors are aggregated together.
# paymentType is used exactly as stored, with no normalisation and no year filter.
sql = '''
SELECT
    paymentType,
    puYear,
    puMonth,
    AVG(passengerCount) AS mean_passenger_count,
    MEDIAN(passengerCount) AS median_passenger_count,
    AVG(fareAmount) AS mean_cost,
    MEDIAN(fareAmount) AS median_cost,
    AVG(improvementSurcharge),
    MEDIAN(improvementSurcharge),
    AVG(extra),
    MEDIAN(extra),
    AVG(mtaTax),
    MEDIAN(mtaTax),
    AVG(tollsAmount),
    MEDIAN(tollsAmount),
    AVG(tipAmount),
    MEDIAN(tipAmount),
    AVG(totalAmount) AS mean_price,
    MEDIAN(totalAmount) AS median_price
FROM nyc_tlc
GROUP BY paymentType, puYear, puMonth
ORDER BY puYear, puMonth, paymentType;
'''

# Run the aggregation against the nyc_tlc temp view and render the result
monthly_stats_raw = spark.sql(sql)
display(monthly_stats_raw)

paymentType,puYear,puMonth,mean_passenger_count,median_passenger_count,mean_cost,median_cost,avg(improvementSurcharge),median(improvementSurcharge),avg(extra),median(extra),avg(mtaTax),median(mtaTax),avg(tollsAmount),median(tollsAmount),avg(tipAmount),median(tipAmount),mean_price,median_price
1,2001,1,1.0,1.0,3.5,3.5,0.3,0.3,0.5,0.5,0.5,0.5,0.0,0.0,0.0,0.0,4.8,4.8
2,2001,1,1.3076923076923077,1.0,7.8076923076923075,5.0,0.2999999999999999,0.3,0.1538461538461538,0.0,0.5,0.5,0.0,0.0,0.0,0.0,8.953846153846154,5.8
2,2001,2,1.0,1.0,2.5,2.5,0.3,0.3,0.5,0.5,0.5,0.5,0.0,0.0,0.0,0.0,3.8,3.8
1,2002,1,1.6666666666666667,1.0,10.5,2.75,0.1999999999999999,0.3,0.8333333333333334,0.0,0.3333333333333333,0.5,0.96,0.0,2.3116666666666665,0.0,15.138333333333334,3.8
2,2002,1,1.6666666666666667,1.0,13.5,11.0,0.3,0.3,0.3333333333333333,0.25,0.5,0.5,0.96,0.0,0.0,0.0,15.593333333333334,12.05
2,2002,12,1.6,1.0,12.65,8.0,0.3,0.3,0.1,0.0,0.5,0.5,0.0,0.0,0.0,0.0,13.55,8.8
1,2003,1,1.0,1.0,22.0,22.0,0.3,0.3,0.75,0.75,0.5,0.5,0.0,0.0,4.83,4.83,28.38,28.38
2,2003,1,1.6875,1.0,18.625,7.5,0.2625,0.3,0.0625,0.0,0.40625,0.5,1.3762499999999998,0.0,0.0,0.0,20.7325,8.3
2,2003,12,1.0,1.0,6.5,6.5,0.3,0.3,0.0,0.0,0.5,0.5,0.0,0.0,0.0,0.0,7.3,7.3
1,2008,1,1.5625,1.0,19.3359375,13.0,0.290625,0.3,0.375,0.5,0.484375,0.5,1.3565625,0.0,4.4409375,2.725,26.9475,17.46


In [None]:
# Mean and median of passenger count, fare and each charge component,
# grouped by payment type, pickup year and pickup month.
# paymentType is transformed for consistency and proper grouping: the various
# spellings and numeric codes are mapped to CRD / CSH / NOC / DIS / VDT, and
# anything unrecognised becomes UNK.
# The time frame is limited to the years called out in the documentation
# (puYear > 2008 and puYear < 2019).
# GROUP BY / ORDER BY use ordinals: 1 = paymentType, 2 = Year, 3 = Month.
sql = '''
SELECT
    CASE 
        WHEN UPPER(paymentType) IN ('CREDIT', 'CRE', '1', 'CRD') THEN 'CRD'
        WHEN UPPER(paymentType) IN ('CASH', 'CAS', '2', 'CSH') THEN 'CSH'
        WHEN UPPER(paymentType) IN ('NO CHARGE', 'NO', '3', 'NOC') THEN 'NOC'
        WHEN UPPER(paymentType) IN ('DISPUTE', '4', 'DIS') THEN 'DIS'
        WHEN UPPER(paymentType) IN ('VOIDED TRIP', '6') THEN 'VDT'
        ELSE 'UNK'
      END AS paymentType,
    puYear as Year,
    puMonth as Month,
    concat(string(puMonth),'/', string(puYear)) AS month_year,
    AVG(passengerCount) AS avg_passenger_count,
    MEDIAN(passengerCount) AS median_passenger_count,
    AVG(fareAmount) AS avg_fareAmount,
    MEDIAN(fareAmount) AS median_fareAmount,
    AVG(improvementSurcharge) AS avg_improvementSurcharge,
    MEDIAN(improvementSurcharge) median_improvementSurcharge,
    AVG(extra) AS avg_extra,
    MEDIAN(extra) AS median_extra,
    AVG(mtaTax) AS avg_mtaTax,
    MEDIAN(mtaTax) AS median_mtaTax,
    AVG(tollsAmount) AS avg_tollsAmount,
    MEDIAN(tollsAmount) AS median_tollsAmount,
    AVG(tipAmount) AS avg_tipAmount,
    MEDIAN(tipAmount) AS median_tipAmount,
    AVG(totalAmount) AS avg_totalAmount,
    MEDIAN(totalAmount) AS median_totalAmount
FROM nyc_tlc
WHERE puYear > 2008
AND puYear < 2019
GROUP BY 1,2,3
ORDER BY 2,3,1;
'''

# Run the aggregation against the nyc_tlc temp view and render the result
monthly_stats_by_payment = spark.sql(sql)
display(monthly_stats_by_payment)



paymentType,Year,Month,month_year,avg_passenger_count,median_passenger_count,avg_fareAmount,median_fareAmount,avg_improvementSurcharge,median_improvementSurcharge,avg_extra,median_extra,avg_mtaTax,median_mtaTax,avg_tollsAmount,median_tollsAmount,avg_tipAmount,median_tipAmount,avg_totalAmount,median_totalAmount
CRD,2009,1,1/2009,1.65816229667933,1.0,11.454249645930654,8.9,0.2952191235059763,0.3,0.189913594710107,0.0,0.4962686567164179,0.5,0.2209217490357222,0.0,2.158186914448176,1.92,14.02977266187383,10.69
CSH,2009,1,1/2009,1.7135672056311473,1.0,8.96202954615678,7.1,0.2972014925373121,0.3,0.1767882556810389,0.0,0.4907626717195641,0.5,0.0863069228303572,0.0,0.0007775540904687334,0.0,9.22757578175731,7.3
DIS,2009,1,1/2009,1.2743879706723005,1.0,11.599537715919,7.4,-0.3,-0.3,0.00018640487138063877,0.0,0.25,0.5,0.1760581583198704,0.0,0.0166285572262955,0.0,11.893022244314672,7.4
NOC,2009,1,1/2009,1.2134895898267049,1.0,10.208470016207398,6.5,-0.3,-0.3,7.480364044383494e-05,0.0,0.3571428571428571,0.5,0.1286946764742559,0.0,0.0076035407056476,0.0,10.405557661139431,6.6
CRD,2009,2,2/2009,1.6455503526338036,1.0,11.535709424126676,8.9,,,0.188811602373622,0.0,,,0.2274173708061613,0.0,2.149019923203615,1.89,14.107208788196337,10.8
CSH,2009,2,2/2009,1.6983248677398122,1.0,9.060377264193315,7.3,,,0.1789825851207975,0.0,,,0.0891274063978688,0.0,0.000882070320131244,0.0,9.331125581664995,7.4
DIS,2009,2,2/2009,1.208433596883236,1.0,12.360977426377918,8.1,,,0.0,0.0,,,0.207577632634353,0.0,0.0147095221725678,0.0,12.701620258966445,8.1
NOC,2009,2,2/2009,1.239495598382108,1.0,10.329285272424425,6.7,,,0.0,0.0,,,0.1315979062574355,0.0,0.0099764453961456,0.0,10.530414703782965,6.9
CRD,2009,3,3/2009,1.6401270761976885,1.0,11.83218248997868,8.9,,,0.1938050353626966,0.0,,,0.2533647035679565,0.0,2.1865139349822216,1.9,14.472881236502335,10.9
CSH,2009,3,3/2009,1.6952313159394772,1.0,9.260848945475626,7.3,,,0.1871413495601502,0.0,,,0.1028430077564462,0.0,0.0008250131913486845,0.0,9.553394833411032,7.5


Databricks visualization. Run in Databricks to view.