In [1]:
import pandas as pd
import geopandas as gpd
import numpy as np 
import os 
import matplotlib.pyplot as plt

In [2]:
# Sample connection
from pyspark.sql import SparkSession

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("Cleaning connection")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

22/08/17 22:02:01 WARN Utils: Your hostname, DESKTOP-L9KIK4G resolves to a loopback address: 127.0.1.1; using 172.19.228.94 instead (on interface eth0)
22/08/17 22:02:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/17 22:02:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Preparing weather data

In [178]:
# Looking at the weather data and keeping columns with data
df = pd.read_csv('../data/laguardia.csv')
df.rename(str.lower, axis='columns', inplace=True)
df = df[df.report_type.isin(['FM-12', 'FM-15', 'FM-16'])]
df.drop('report_type', axis=1, inplace=True)
df.head()

  df = pd.read_csv('../data/laguardia.csv')


Unnamed: 0,station,date,source,latitude,longitude,elevation,name,call_sign,quality_control,wnd,...,oc1,od1,oe1,oe2,oe3,rh1,rh2,rh3,rem,eqd
0,72503014732,2019-01-01T00:00:00,4,40.77944,-73.88035,3.4,"LAGUARDIA AIRPORT, NY US",99999,V020,"130,1,N,0041,1",...,,,,,,,,,SYN09872503 11358 81308 10067 20050 30189 4020...,
1,72503014732,2019-01-01T00:49:00,6,40.77944,-73.88035,3.4,"LAGUARDIA AIRPORT, NY US",KLGA,V030,"140,5,N,0067,5",...,1035.0,,,,,,,,MET13312/31/18 19:49:03 SPECI KLGA 010049Z 140...,
2,72503014732,2019-01-01T00:51:00,7,40.77944,-73.88035,3.4,"LAGUARDIA AIRPORT, NY US",KLGA,V030,"130,5,N,0067,5",...,1035.0,,,,,,,,MET13912/31/18 19:51:01 METAR KLGA 010051Z 130...,
3,72503014732,2019-01-01T01:15:00,7,40.77944,-73.88035,3.4,"LAGUARDIA AIRPORT, NY US",KLGA,V030,"140,5,N,0046,5",...,885.0,,,,,,,,MET11212/31/18 20:15:01 SPECI KLGA 010115Z 140...,
4,72503014732,2019-01-01T01:25:00,7,40.77944,-73.88035,3.4,"LAGUARDIA AIRPORT, NY US",KLGA,V030,"130,5,N,0051,5",...,,,,,,,,,MET11912/31/18 20:25:01 SPECI KLGA 010125Z 130...,


In [179]:
# keeping potentially relevant columns
df = df[['date', 'wnd', 'vis', 'tmp', 'dew', 'slp', 'ga1',]]

# loosely transcribed meaning of columns:

# wnd = wind angle clockwise,  quality code, wind obs type, WIND-OBSERVATION speed rate (x10), quality # 9999 missing

# vis = visibility distance, quality, variability, obs qual  # 999999=missing

# tmp = temp (x10), #+9999=missing
# dew = dewpoint(x10), #+9999=missing [dew point temperature The temperature to which a given parcel of air must be cooled at constant pressure and water vapor content in order for saturation to occur

# slp = atm pressure (x10), # 99999 = missing           [seal level pressure]
# ga1 = cloud cover (scale 0-8, 9&10=obstructed), quality, base height, quality, cloud type, quality


# Maybe interesting columns:
# mw1 = weather name, quality
# oc1 = kui suured tuulepuhangud on (x10), quality

# df.drop(['source', 'station', 'latitude', 'longitude', 'name', 'elevation', 'call_sign', 'quality_control', 'cig', 'aa1', 'aa2', 'aa3', 'ab1', 'ad1', 'ae1', 'ah1', 'ah2', 'ah3', 'ah4', 'ah5', 'ah6', 'ai1', 'ai2', 'ai3', 'ai4', 'ai5', 'ai6', 'aj1', 'ak1', 'al1', 'am1', 'an1', 'at1', 'at2', 'at3', 'at4', 'at5', 'at6', 'at7', 'at8', 'au1', 'au2', 'au3', 'au4', 'aw1', 'aw2', 'aw3', 'aw4', 'aw5','ax1', 'ax2', 'ax3', 'ax4', 'ed1', 'ga2', 'ga3', 'gd1', 'gd2', 'gd3', 'gd4', 'ge1', 'gf1', 'ka1', 'ka2', 'kb1','kb2','kb3','kc1','kc2','kd1','kd2','ke1','kg1','kg2','ma1','md1', 'mf1','mg1','mh1','mk1','mv1','mw1','mw2','mw3','oc1','od1','oe1','oe2','oe3','rh1','rh2','rh3','rem','eqd'], axis=1)
df

Unnamed: 0,date,wnd,vis,tmp,dew,slp,ga1
0,2019-01-01T00:00:00,"130,1,N,0041,1",008000199,+00671,+00501,102011,"99,9,+00250,1,99,9"
1,2019-01-01T00:49:00,"140,5,N,0067,5","003219,5,N,5",+00705,+00505,999999,"07,5,+00244,5,99,9"
2,2019-01-01T00:51:00,"130,5,N,0067,5","003219,5,N,5",+00675,+00505,101815,"07,5,+00244,5,99,9"
3,2019-01-01T01:15:00,"140,5,N,0046,5","011265,5,N,5",+00725,+00565,999999,"07,5,+00244,5,99,9"
4,2019-01-01T01:25:00,"130,5,N,0051,5","002414,5,N,5",+00725,+00565,999999,"07,5,+00244,5,99,9"
...,...,...,...,...,...,...,...
14076,2019-12-31T20:51:00,"240,5,N,0046,5","016093,5,N,5",+00725,+00115,100285,"04,5,+01036,5,99,9"
14077,2019-12-31T21:00:00,"240,1,N,0046,1",016000199,+00721,+00111,100281,"99,9,+01250,1,99,9"
14078,2019-12-31T21:51:00,"250,5,N,0062,5","016093,5,N,5",+00725,+00005,100335,"02,5,+00914,5,99,9"
14079,2019-12-31T22:51:00,"240,5,N,0046,5","016093,5,N,5",+00725,+00065,100365,"07,5,+00914,5,99,9"


In [180]:
# spliting out the relevant information
df['date'] = df['date'] = pd.to_datetime(df['date'])
df['wnd'] = df['wnd'].str.split(',', expand=True)[3].astype(np.uint32)      # unit: m/s, scaling factor:10,     missing: 9999
df['vis'] = df['vis'].str.split(',', expand=True)[0].astype(np.uint32)      # unit: m,                          
df['tmp'] = df['tmp'].str.split(',', expand=True)[0].astype(np.int32)       # unit: C, scaling factor:10,       missing: 9999
df['dew'] = df['dew'].str.split(',', expand=True)[0].astype(np.int32)       # unit: C, scaling factor:10,       missing: 9999
df['atm'] = df['slp'].str.split(',', expand=True)[0].astype(np.uint32)      # unit: hP, scaling factor:10,      missing: 99999
df['ga1'].fillna('99,x', inplace=True)
df['cc'] = df['ga1'].str.split(',', expand=True)[0].astype(np.uint32).astype('category') # cloud coverage: unit: okta (0 clear -> 8 covered), 99 missing
df.drop(['slp', 'ga1'], axis=1, inplace=True)

In [184]:
df[df.isnull().any(axis=1)] # no NaN left

Unnamed: 0,date,wnd,vis,tmp,dew,atm,cc


In [187]:
print(df.dtypes)
df.describe()

date    datetime64[ns]
wnd             uint32
vis             uint32
tmp              int32
dew              int32
atm             uint32
cc            category
dtype: object


Unnamed: 0,wnd,vis,tmp,dew,atm
count,13691.0,13691.0,13691.0,13691.0,13691.0
mean,49.218538,14097.872982,134.312979,62.898693,23881.401505
std,88.596347,9506.640887,176.095325,247.540883,32309.299094
min,0.0,0.0,-156.0,-272.0,9858.0
25%,31.0,16000.0,50.0,-17.0,10125.0
50%,46.0,16093.0,133.0,72.0,10186.0
75%,62.0,16093.0,217.0,144.0,10264.0
max,9999.0,999999.0,9999.0,9999.0,99999.0


In [202]:
print(df.shape)
df = df[(df['wnd'] != 9999)]
df = df[(df['vis'] != 999999)]
df = df[(df['tmp'] != 9999)] 
df = df[(df['dew'] != 9999)] 
df = df[(df['atm'] != 99999)]
print(df.shape)

(11598, 7)
(11598, 7)


In [203]:
df.describe()

Unnamed: 0,wnd,vis,tmp,dew,atm
count,11598.0,11598.0,11598.0,11598.0,11598.0
mean,48.211847,14861.065097,133.691499,50.988533,10168.289446
std,24.796421,3461.1155,100.612244,106.230097,81.400765
min,0.0,0.0,-156.0,-272.0,9858.0
25%,31.0,16000.0,50.0,-33.0,10114.0
50%,46.0,16093.0,133.0,61.0,10166.0
75%,62.0,16093.0,222.0,139.0,10225.0
max,175.0,16093.0,372.0,239.0,10408.0


In [220]:
day_count = 0

for group,df_group in df.groupby(df['date'].dt.date):
    day_count += 1
    h_count = 0
    for h, h_group in df_group.groupby(df_group['date'].dt.hour):
        h_count += 1
    if h_count != 24:
        print(f"Problem on date {group}, missing {24-h_count}")
if day_count != (df['date'].iloc[-1] - df['date'].iloc[0]).days:
    print("Some days are missing from dataset")


Problem on date 2019-02-08, missing 1
Problem on date 2019-03-01, missing 1
Problem on date 2019-03-04, missing 10
Problem on date 2019-03-13, missing 1
Problem on date 2019-03-16, missing 2
Problem on date 2019-05-10, missing 1
Problem on date 2019-05-31, missing 1
Problem on date 2019-09-20, missing 5
Problem on date 2019-09-21, missing 16
Problem on date 2019-09-27, missing 2
Problem on date 2019-10-02, missing 1
Problem on date 2019-10-25, missing 1
Problem on date 2019-11-22, missing 1
Some days are missing from dataset


In [222]:
df.to_csv('../data/curated/laguardia.csv')
# TODO missing values like 9999.. should be accounted for better than just removing rn

In [45]:
df['rh3'].value_counts()
#df[df.al1.notna()].drop(['source', 'station', 'latitude', 'longitude', 'name', 'elevation', 'call_sign', 'quality_control', 'cig', 'aa1', 'aa2', 'aa3', 'ab1', 'ad1', 'ae1', 'ah1','ah2', 'ah3','ah4','ah5','ah6', 'ai1','ai2', 'ai3','ai4','ai5','ai6'] , axis=1)


Series([], Name: rh3, dtype: int64)

In [223]:
df.describe()

Unnamed: 0,wnd,vis,tmp,dew,atm
count,11598.0,11598.0,11598.0,11598.0,11598.0
mean,48.211847,14861.065097,133.691499,50.988533,10168.289446
std,24.796421,3461.1155,100.612244,106.230097,81.400765
min,0.0,0.0,-156.0,-272.0,9858.0
25%,31.0,16000.0,50.0,-33.0,10114.0
50%,46.0,16093.0,133.0,61.0,10166.0
75%,62.0,16093.0,222.0,139.0,10225.0
max,175.0,16093.0,372.0,239.0,10408.0


# Taxi data cleaning


In [3]:
from pyspark.sql.types import IntegerType, DoubleType, TimestampType
from pyspark.sql import functions as F

In [249]:
sdf_yellow_1 = spark.read.parquet('../data/raw/yellow/2017_06.parquet')
sdf_yellow_2 = spark.read.parquet('../data/raw/yellow/2018_06.parquet')
sdf_yellow_3 = spark.read.parquet('../data/raw/yellow/2019_06.parquet')

In [250]:
sdf_yellow_1.printSchema() 
# congestion_surcharge: int->double, 
# passenger_count long->double, 
# ratecodeID long->double

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)
 |-- airport_fee: integer (nullable = true)



In [51]:
sdf_yellow = spark.read.parquet('../data/raw/yellow/')
#problematic = ['congestion_surcharge', 'passenger_count', 'ratecodeID']

# type had changed over the years
sdf_yellow = sdf_yellow.withColumn('passenger_count', sdf_yellow['passenger_count'].cast(IntegerType())) \
    .withColumn('congestion_surcharge', sdf_yellow['congestion_surcharge'].cast(DoubleType())) \
    .drop('ratecodeID')

sdf_yellow.printSchema()
sdf_yellow.show()


root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+------------------+------------+------------+------------+-----------+-----+-------+----------+

In [64]:
sdf_green = spark.read.parquet('../data/raw/green/')

# type had changed over the years and is not relevant
sdf_green = sdf_green.drop('ratecodeID', 'ehail_fee',  'congestion_surcharge', 'trip_type') \
    .withColumn('passenger_count', sdf_green['passenger_count'].cast(IntegerType())) 
sdf_green.show(5)


+--------+--------------------+---------------------+------------------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|
+--------+--------------------+---------------------+------------------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+
|       2| 2017-06-01 00:28:16|  2017-06-01 00:35:25|                 N|          65|          40|              1|         1.75|        7.5|  0.5|    0.5|      1.76|         0.0|                  0.3|       10.56|           1|
|       1| 2017-06-01 00:08:33|  2017-06-01 00:19:25|                 N|         116|       

In [66]:
sdf_green.count()

6284068

In [8]:
sdf_yellow.summary()

                                                                                

summary,VendorID,trip_distance,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,airport_fee
count,70443818.0,70443818.0,70340090,70443818.0,70443818.0,70443818.0,70443818.0,70443818.0,70443818.0,70443818.0,70443818.0,70443818.0,70443818.0,0.0
mean,1.5928169026840653,3.041164661319338,,161.751087653994,159.87422637995004,1.324177971159939,13.357372785076802,0.5667025036888226,0.4961995071022421,1.939768762395335,0.3682168029267528,0.2992284266917364,17.429006456879375,
stddev,0.4968503627576927,7.225769602213949,,66.19801033388443,70.29977836720634,0.4938591183768903,195.1709253716353,1.8773658880245656,0.0636242724268265,2.7277300679029115,2.0035671751718245,0.020473872596737,195.3789271147781,
min,1.0,0.0,N,1.0,1.0,0.0,-1856.0,-50.56,-0.5,-221.0,-39.74,-0.3,-1871.8,
max,5.0,45977.22,Y,265.0,265.0,5.0,907070.24,14000.0,212.42,1624.64,5743.51,1.26,907071.04,


In [12]:
sdf_yellow.describe(['trip_distance', 'fare_amount']).show()



+-------+-----------------+------------------+
|summary|    trip_distance|       fare_amount|
+-------+-----------------+------------------+
|  count|         70443818|          70443818|
|   mean|3.041164661319338|13.357372785076802|
| stddev|7.225769602213949| 195.1709253716353|
|    min|              0.0|           -1856.0|
|    max|         45977.22|         907070.24|
+-------+-----------------+------------------+



                                                                                

In [7]:
# FHV data, not working yet

# sdf_fhv = spark.read.parquet('../data/raw/fhv/')
# #problematic = ['congestion_surcharge', 'passenger_count', 'ratecodeID']

# #sdf_fhv = sdf_fhv.drop('ratecodeID', 'passenger_count', 'ehail_fee', 'payment_type', 'congestion_surcharge', 'trip_type')   
# new = sdf_fhv.withColumn("PUlocationID",F.col("PUlocationID").cast(DoubleType()))

# #new.limit(5)
# new.dtypes
# new.printSchema()
# new.limit(5)

[Stage 15:>                 (0 + 4) / 9][Stage 16:>                 (0 + 0) / 1]

22/08/17 15:14:06 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1247229 ms exceeds timeout 120000 ms
22/08/17 15:14:07 WARN SparkContext: Killing executors is not supported by current scheduler.


ERROR:root:Exception while sending command.age 16:>                 (0 + 0) / 1]
Traceback (most recent call last):
  File "/home/toomas/ADS/adsenv/lib/python3.8/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/toomas/ADS/adsenv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/toomas/ADS/adsenv/lib/python3.8/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Py4JError: An error occurred while calling o96.parquet

In [71]:
def make_easier_to_live_with_life(sdf):

    sdf_new = sdf \
            .drop('VendorID', 'store_and_fwd_flag', 'payment_type', 'fate_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'congestion_surcharge','airport_fee') \
            .dropna(how='any') \
            .filter((F.col('trip_distance') > 0) & (F.col('trip_distance') < 150)) \
            .filter((F.col('PULocationID').isin([1,263])) & (F.col('DOLocationID').isin([1, 263]))) \
            .withColumn('PUDayofweek', (F.dayofweek(F.col('tpep_pickup_datetime')))) \
            .withColumn('PUHour', F.hour(F.col('tpep_pickup_datetime')))
            #.withColumn('passenger_count', F.col('passenger_count').cast('INT')) \
            #.filter(F.col('passenger_count') > 0) \
    return sdf_new

sdf = make_easier_to_live_with_life(sdf_yellow)
sdf.show(100)



+--------------------+---------------------+---------------+-------------+------------+------------+-----------+------------+-----------+------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|total_amount|PUDayofweek|PUHour|
+--------------------+---------------------+---------------+-------------+------------+------------+-----------+------------+-----------+------+
| 2017-06-01 00:03:01|  2017-06-01 00:05:40|              1|          0.4|         263|         263|        3.5|         4.8|          5|     0|
| 2017-06-01 00:22:04|  2017-06-01 00:26:31|              1|          1.1|         263|         263|        5.5|        8.15|          5|     0|
| 2017-06-01 00:30:19|  2017-06-01 00:32:21|              1|         0.52|         263|         263|        4.0|        6.36|          5|     0|
| 2017-06-01 00:19:09|  2017-06-01 00:20:14|              3|         0.35|         263|         263|        3.0|         5.3|     

In [252]:
sdf_yellow_3.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)



In [246]:
print("Total trip count", sdf_yellow.count())
sdf_yellow.printSchema()

Total trip count 19355336
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)



In [248]:
ints = ['DOlocationID', 'PUlocationID']
doubles = ['trip_distance', 'total_amount']
dtimes = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'Pickup_DateTime', 'DropOff_datetime', 'tpep_pickup_datetime', 'tpep_dropoff_datetime')
unwanted = []
dtypes = {column: IntegerType() for column in ints}
dtypes.update({column: DoubleType() for column in doubles})
dtypes.update({column: StringType() for column in strings})
dtypes.update({column: TimestampType() for column in dtimes})

for column in sdf.columns:
    sdf = sdf.withColumn(column, sdf[column].cast(dtypes[column]))


                                                                                

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2019-06-01 00:55:13,2019-06-01 00:56:17,1.0,0.0,1.0,N,145,145,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
1,2019-06-01 00:06:31,2019-06-01 00:06:52,1.0,0.0,1.0,N,262,263,2,2.5,3.0,0.5,0.0,0.0,0.3,6.3,2.5,
1,2019-06-01 00:17:05,2019-06-01 00:36:38,1.0,4.4,1.0,N,74,7,2,17.5,0.5,0.5,0.0,0.0,0.3,18.8,0.0,
1,2019-06-01 00:59:02,2019-06-01 00:59:12,0.0,0.8,1.0,N,145,145,2,2.5,1.0,0.5,0.0,0.0,0.3,4.3,0.0,
1,2019-06-01 00:03:25,2019-06-01 00:15:42,1.0,1.7,1.0,N,113,148,1,9.5,3.0,0.5,2.65,0.0,0.3,15.95,2.5,


In [83]:
#sdf_green = spark.read.parquet('../data/raw/green/').options(inferSchema=False)
# type had changed over the years and is not relevant
#mergedDF = spark.read.option("mergeSchema", "true").parquet("../data/raw/green/")

#sdf_green = sdf_green.drop('ratecodeID', 'ehail_fee',  'congestion_surcharge', 'trip_type') \
#    .withColumn('passenger_count', sdf_green['passenger_count'].cast(IntegerType())) 

#sdf_green.show(5)
#mergedDF.write.mode('overwrite').parquet("../data/curated/green201720")
for file in os.listdir('../data/raw/yellow/'):
    print(file)
    sdf = spark.read.parquet('../data/raw/yellow/'+file)
    sdf = sdf.withColumn('passenger_count', sdf['passenger_count'].cast(IntegerType())) 
    sdf.write.mode('overwrite').parquet('..data/curated/yellow/'+file)


    # congestion_surcharge: int->double, 
    # passenger_count long->double, 
    # ratecodeID long->double

2019_06.parquet


                                                                                

2017_06.parquet


