In [1]:
import os
import zipfile

import pandas as pd
import geopandas as gpd
import numpy as np

import matplotlib.pyplot as plt
import contextily as cx

from dotenv import dotenv_values
from pprint import pprint
from google.cloud import storage

from keys.KEYS import *


CONFIG = dotenv_values("config.env")
GCP_ST_CLIENT = storage.Client()

LOCAL_ST = os.path.join(os.getcwd(), 'sample_data')




· CLOUD SETUP: working with google cloud

We are using tempprary credentials from Qwiklabs; 
each 2h of use we need to fetch new one and restablish connection:

```
gcloud init
gcloud auth application-default login
```

In [None]:

for blob in GCP_ST_CLIENT.list_blobs('data_eng_test'):
    print(blob)

In [19]:
# !gcloud storage ls --recursive gs://data_eng_test/
import io
c = 0
d = {}
print(f"NUM FILES : {GCP_ST_CLIENT.list_blobs('data_eng_test').num_results}")

for blob in GCP_ST_CLIENT.list_blobs('data_eng_test'):
    c += 1
    d[blob.name] = np.nan

    if blob.name.endswith('zip'):
        zbytes = io.BytesIO(blob.download_as_string())

        if zipfile.is_zipfile(zbytes):
            with zipfile.ZipFile(zbytes, 'r') as z:
                d[blob.name] = len(z.namelist())
                
    if c == 2:
        break
d
    

NUM FILES : 0


{'data_dictionary_trip_records_yellow.pdf': nan,
 'yellow_tripdata_2015-01_00.csv.zip': 1,
 'yellow_tripdata_2015-01_01.csv.zip': 1,
 'yellow_tripdata_2015-01_02.csv.zip': 1,
 'yellow_tripdata_2015-01_03.csv.zip': 1,
 'yellow_tripdata_2015-01_04.csv.zip': 1,
 'yellow_tripdata_2015-01_05.csv.zip': 1,
 'yellow_tripdata_2015-01_06.csv.zip': 1,
 'yellow_tripdata_2015-01_07.csv.zip': 1,
 'yellow_tripdata_2015-01_08.csv.zip': 1,
 'yellow_tripdata_2015-01_09.csv.zip': 1,
 'yellow_tripdata_2015-01_10.csv.zip': 1,
 'yellow_tripdata_2015-01_11.csv.zip': 1,
 'yellow_tripdata_2015-01_12.csv.zip': 1,
 'yellow_tripdata_2015-01_13.csv.zip': 1,
 'yellow_tripdata_2015-01_14.csv.zip': 1,
 'yellow_tripdata_2015-01_15.csv.zip': 1,
 'yellow_tripdata_2015-01_16.csv.zip': 1,
 'yellow_tripdata_2015-01_17.csv.zip': 1,
 'yellow_tripdata_2015-01_18.csv.zip': 1}

Let's download the pdf and a portion of the files to see the head:

In [38]:
# _blobs = {
#     'pdf': 'data_dictionary_trip_records_yellow.pdf',
#     'csv' : 'yellow_tripdata_2015-01_00.csv.zip'
# }

constr = 'yellow_tripdata_2015-04_{part}.csv.zip'
_blobs = list(map(lambda n: constr.format(part=str(n).zfill(2)), range(23)))


bucket = GCP_ST_CLIENT.bucket(CONFIG['BUCKET_NAME'])

for blob_name in _blobs:
    download_path = os.path.join(LOCAL_ST, blob_name)

    if not os.path.exists(download_path):
        bucket.blob(blob_name).download_to_filename(download_path)
    

In [30]:
from datetime import datetime

SOURCE_NAME = 'yellow_tripdata'
NOWUTC = datetime.utcnow().replace(year=2015, month=2)  
SOURCE_CONSTR = "{base_name}_{year}-{month}_{part}"

SOURCE_CONSTR.format(
    base_name = SOURCE_NAME,
    year = NOWUTC.year,
    month = NOWUTC.month,
    part = 1
)


'yellow_tripdata_2015-2_1'

Lets see a bit the data: 

In [22]:
df = pd.read_csv(os.path.join(LOCAL_ST, _blobs['csv']), sep ='\t')

print(df.shape)
df.head()

(499999, 19)


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,2,2015-01-15 19:05:39,2015-01-15 19:23:42,1,1.59,-73.993896,40.750111,1,N,-73.974785,40.750618,1,12.0,1.0,0.5,3.25,0.0,0.3,17.05
1,1,2015-01-10 20:33:38,2015-01-10 20:53:28,1,3.3,-74.001648,40.724243,1,N,-73.994415,40.759109,1,14.5,0.5,0.5,2.0,0.0,0.3,17.8
2,1,2015-01-10 20:33:38,2015-01-10 20:43:41,1,1.8,-73.963341,40.802788,1,N,-73.95182,40.824413,2,9.5,0.5,0.5,0.0,0.0,0.3,10.8
3,1,2015-01-10 20:33:39,2015-01-10 20:35:31,1,0.5,-74.009087,40.713818,1,N,-74.004326,40.719986,2,3.5,0.5,0.5,0.0,0.0,0.3,4.8
4,1,2015-01-10 20:33:39,2015-01-10 20:52:58,1,3.0,-73.971176,40.762428,1,N,-74.004181,40.742653,2,15.0,0.5,0.5,0.0,0.0,0.3,16.3


In [167]:
print(df.columns.tolist())

['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'RateCodeID', 'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']


### geometry validity

There are two types of geometry here: **pickup points** and **dropoff points**.

Validation: 
* Points need to be in NYC (plot to verify)
* Verify projection (gdf.crs). The only projection 

In [168]:
_cols_pickup = ['pickup_longitude', 'pickup_latitude']
gdf_pickup = gpd.GeoDataFrame(
    df[_cols_pickup] ,
    geometry=gpd.points_from_xy(*df[_cols_pickup].values.T), 
    crs=CONFIG['CRS']
)
print(gdf_pickup.describe().loc[['min', 'max'], :].to_dict())

_cols_dropoff = ['dropoff_longitude','dropoff_latitude']
gdf_dropoff = gpd.GeoDataFrame(
    df[_cols_dropoff] ,
    geometry=gpd.points_from_xy(*df[_cols_dropoff].values.T), 
    crs=CONFIG['CRS']
)
print(gdf_dropoff.describe().loc[['min', 'max'], :].to_dict())

{'pickup_longitude': {'min': -87.45187377929686, 'max': 0.0}, 'pickup_latitude': {'min': 0.0, 'max': 42.29415512084961}}
{'dropoff_longitude': {'min': -86.73170471191406, 'max': 0.0}, 'dropoff_latitude': {'min': 0.0, 'max': 49.19465637207031}}


Hay ceros en la geometría y algunos valores incorrectos, por lo que filtramos por distribución.

In [169]:
def filter_by_quantiles(ser:pd.Series, quantile_values:tuple):
    min_quantile = ser.quantile(quantile_values[0])
    max_quantile = ser.quantile(quantile_values[1])
    return ser[(ser>=min_quantile) & (ser<=max_quantile)].index

iqr = (0.02, 0.98)

for _col in _cols_pickup:
    gdf_pickup = gdf_pickup.loc[filter_by_quantiles(gdf_pickup[_col], iqr), :]

for _col in _cols_dropoff:
    gdf_dropoff = gdf_dropoff.loc[filter_by_quantiles(gdf_dropoff[_col], iqr), :]

correct_geom_indexes = list(set(gdf_dropoff.index).intersection(set(gdf_pickup.index)))

gdf_pickup = gdf_pickup.loc[correct_geom_indexes, :].copy(deep=True)
gdf_dropoff = gdf_dropoff.loc[correct_geom_indexes, :].copy(deep=True)

In [63]:
# comprobación mediante visualización

# ax = gdf_dropoff.plot(color = 'red', markersize = 1 )
# cx.add_basemap(ax, crs=gdf_dropoff.crs.to_string())
# plt.show()

Se filtra aprox el 13% de los datos

In [170]:
df = df.loc[correct_geom_indexes, :].copy(deep=True)
df.shape

(436690, 19)

### datetime validity: 

Hay un tiempo de recogida y un tiempo de salida. 

Validación:
* El tiempo de recogida tiene que ser anterior al de bajada
* Los tiempos tienen que ser razonables (menos de 1 día)

In [171]:
dte_cols = df.select_dtypes(include = 'object').columns[:2].to_list()
print(dte_cols)

dtformat = '%Y-%m-%d %H:%M:%S'

for _col in dte_cols:
    df[_col] = pd.to_datetime(df[_col], format = dtformat, errors = 'coerce')

df.shape

['tpep_pickup_datetime', 'tpep_dropoff_datetime']


(436690, 19)

In [172]:
_filter_condition = np.logical_and.reduce([
    (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() > 1,
    (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.days < 1
])

df = df[_filter_condition].copy(deep=True)
df.shape


(436658, 19)

Se filtra un 100-99.99267 % 

### ID validity 

Hay 3 columnas que son IDs o ordinales.

Validación:
* No deberían ser nan
* Mapear en sus valores (ya que son taan cortos)

In [175]:
_id_cols = ['VendorID', 'RateCodeID', 'payment_type']

df['payment_type'] = df['payment_type'].map(PAYMENT_TYPE)
df['VendorID'] = df['VendorID'].map(VENDORID)
df['RateCodeID'] = df['RateCodeID'].map(RATECODEID)

df[_id_cols].isna().sum()

VendorID         0
RateCodeID      11
payment_type     0
dtype: int64

In [176]:
df = df.dropna(axis = 0, how = 'any', subset = _id_cols).copy(deep=True)
df.shape

(436647, 19)

### INT Validation

Todas las columnas que son valores nmericos

Validation:
* Tienen que ser distinto de cero (tiene que haberse desplazado y pagar algo)
* Tienen que ser distinto de nulo

In [177]:
_num_cols = df.select_dtypes(include=['integer', 'float']).columns.to_list()
_num_cols = [_ for _ in _num_cols if not any(_.endswith(w) for w in ['latitude', 'longitude', 'ID'])]
_num_cols_notzero = [_ for _ in _num_cols if not any(_.startswith(w) for w in ['tip', 'tolls', 'extra', 'improvement', 'mta'])]

print(df[_num_cols].isna().sum().to_dict())
print(df[_num_cols_notzero].mask(df == 0).isna().sum().to_dict())

{'passenger_count': 0, 'trip_distance': 0, 'fare_amount': 0, 'extra': 0, 'mta_tax': 0, 'tip_amount': 0, 'tolls_amount': 0, 'improvement_surcharge': 0, 'total_amount': 0}
{'passenger_count': 225, 'trip_distance': 1506, 'fare_amount': 70, 'total_amount': 34}


In [179]:
df.loc[:, _num_cols_notzero] = df.loc[:, _num_cols_notzero].mask(df == 0)
df = df.dropna(axis = 0, how = 'any', subset = _num_cols_notzero).copy(deep=True)
df.shape

(434892, 19)

434892 desde 499999: se filtra un 13% de datos (que son incorrectos por alguna razón) 

## Boolean

Hay un campo booleano

In [182]:
_bool_col = ['store_and_fwd_flag']
df[_bool_col].value_counts()

store_and_fwd_flag
N                     431227
Y                       3665
Name: count, dtype: int64

In [None]:
text_to_bool = {
    'Y' : True,
    'N' : False
}