In [4]:
# import the goodies:
import os
# Walk /kaggle/input recursively and print the full path of every file found,
# so the available input files are visible in the cell output.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))
import dask
import dask.dataframe as dd
import gc
import subprocess
import pandas as pd
import numpy as np
!pip install meteostat
from meteostat import Point, Daily
import calendar
from datetime import datetime

In [5]:
# we'll import the taxi data inside a function to save memory:
def imports2(year,months_number,sample_part,random_state=None):
    """Build one dask dataframe of sampled yellow-taxi trips for a year.

    For each month from January up to ``months_number`` the monthly CSV is
    read from the public ``nyc-tlc`` S3 bucket, cleaned, restricted to that
    calendar month, down-sampled, and finally all months are concatenated.

    Parameters
    ----------
    year : int
        Calendar year of the trip files to read.
    months_number : int
        Number of months to load, counted from January (1-12).
    sample_part : float
        Fraction of each month's rows to keep (passed to ``sample(frac=...)``).
    random_state : int or None, optional
        Seed forwarded to ``sample`` so a run can be reproduced. The default
        ``None`` keeps the original, unseeded behaviour.

    Returns
    -------
    dask dataframe
        The cleaned and sampled trips with the extra columns ``pickup_date``
        and ``trip_length`` (whole minutes, as float).

    Raises
    ------
    ValueError
        If ``months_number`` is not between 1 and 12.
    """
    # Fail early with a clear message instead of an IndexError on an empty list
    # (months_number < 1) or a missing-file error for a non-existent month.
    if not 1 <= months_number <= 12:
        raise ValueError('months_number must be between 1 and 12, got {}'.format(months_number))

    monthly_frames = []
    for el in range(1, months_number + 1):
        month = str(el).zfill(2)
        link = 's3://nyc-tlc/trip data/yellow_tripdata_'+str(year)+'-'+month+'.csv'
        df = dd.read_csv(
            link,
            dtype={'tolls_amount': 'float64', 'RatecodeID':'float64',
                   'trip_distance':'float64','store_and_fwd_flag':'category', 'fare_amount':'float64'},
            storage_options={'anon': True, 'use_ssl': False})
        # 1. remove nulls and insanely long trips:
        df = df[~df['payment_type'].isnull()]
        df = df[df['trip_distance']<500]
        # 2. create a pickup date column, modify column dtypes:
        #    (the date part is sliced from the raw string before it is parsed)
        df['pickup_date'] = df['tpep_pickup_datetime'].str[:11]
        df['tpep_pickup_datetime'] = dd.to_datetime(df['tpep_pickup_datetime'])
        df['tpep_dropoff_datetime'] = dd.to_datetime(df['tpep_dropoff_datetime'])
        df['pickup_date'] = dd.to_datetime(df['pickup_date'])
        # 3. keep only rows whose pickup date falls inside this calendar month:
        first_day = str(year)+'-'+month+'-01'
        last_day = str(year)+'-'+month+'-'+str(calendar.monthrange(year, el)[1])
        df = df[(df['pickup_date']>=first_day) & (df['pickup_date']<=last_day)].copy()
        # 4. sample only some % of the month (seeded when random_state is given):
        df = df.sample(frac=sample_part, random_state=random_state)

        # 5. trip duration in whole minutes, stored as float.
        #    astype('timedelta64[m]') is rejected by pandas >= 2.0 (only s/ms/us/ns
        #    resolutions are supported), so floor-divide the seconds instead.
        df['trip_length'] = df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']
        df['trip_length'] = df['trip_length'].dt.total_seconds() // 60

        # 6. change a few columns to integers to save memory:
        for col in ['passenger_count','VendorID','RatecodeID', 'payment_type']:
            df[col] = df[col].astype(int)

        monthly_frames.append(df)

    # A single month needs no concatenation; otherwise concatenate once instead of
    # re-concatenating the running result on every iteration.
    if len(monthly_frames) == 1:
        return monthly_frames[0]
    return dd.concat(monthly_frames, axis=0)

In [6]:
def get_weather(start, end):
    """Fetch daily weather observations for New York City.

    Parameters
    ----------
    start, end : datetime-like
        First and last day of the requested period; both are passed straight
        to meteostat's ``Daily``.

    Returns
    -------
    DataFrame
        The fetched daily data without the ``wpgt`` and ``tsun`` columns, with
        the frame's index copied into a ``pickup_date`` column and missing
        numeric values replaced by their column mean.
    """
    # Create Point for NYC
    location = Point(40.785091,-73.968285)

    # Get daily data for the requested period
    data = Daily(location, start, end).fetch()
    # errors='ignore' keeps this working if the source does not return these columns
    data = data.drop(columns=['wpgt','tsun'], errors='ignore')
    # Copy the index into a regular column so callers can merge on it
    # (presumably the index holds the observation date - verify against meteostat).
    data['pickup_date'] = data.index
    # Fill gaps with each column's mean; numeric_only=True keeps the datetime
    # pickup_date column out of the mean computation.
    data = data.fillna(data.mean(numeric_only=True))
    return data

In [7]:
def double_import(year, months, sample_size):
    """Load the sampled taxi trips for a year and attach daily weather to them.

    The weather window runs from the start of ``year`` to the last day of
    month number ``months``; trips and weather are left-joined on
    ``pickup_date`` so every trip row is kept.
    """
    # Weather window: the bare year string parses to the first day of the year.
    final_day = calendar.monthrange(year, months)[1]
    start = pd.to_datetime(str(year))
    end = pd.to_datetime('{}-{}-{}'.format(year, months, final_day))

    trips = imports2(year, months, sample_size)
    weather = get_weather(start, end)
    trips_with_weather = trips.merge(weather, how='left', on='pickup_date')

    # Release the inputs before handing back the merged frame.
    del trips
    del weather
    gc.collect()
    return trips_with_weather
# df2 = double_import(year,months, sample_size)    

In [8]:
# removing outliers in the dataset:
def iqring(df, col):
    """Return ``df`` without the rows whose ``col`` value is an IQR outlier.

    A value is an outlier when it lies more than 1.5 * IQR below the first
    quartile or above the third quartile. Rows that fail both comparisons
    (e.g. missing values) are kept.
    """
    values = df[col]
    lower_quartile = values.quantile(0.25)
    upper_quartile = values.quantile(0.75)
    spread = upper_quartile - lower_quartile

    low_fence = lower_quartile - 1.5 * spread
    high_fence = upper_quartile + 1.5 * spread

    is_outlier = (values < low_fence) | (values > high_fence)
    return df[~is_outlier]


In [9]:
year = 2017
months = 12
sample_size = 0.05

# Sampled trips joined with weather, then trimmed of total_amount outliers.
trips_with_weather = double_import(year, months, sample_size)
trips_trimmed = iqring(trips_with_weather, 'total_amount')
del trips_with_weather
gc.collect()

# Write the result out, then drop the last reference to free memory.
trips_trimmed.to_csv('my_first_dask_only{}.csv'.format(year))
del trips_trimmed
gc.collect()

In [9]:
year = 2018
months = 12
# sample_size is not reassigned here: the value set in the 2017 cell is reused.

# Sampled trips joined with weather, then trimmed of total_amount outliers.
trips_with_weather = double_import(year, months, sample_size)
trips_trimmed = iqring(trips_with_weather, 'total_amount')
del trips_with_weather
gc.collect()

# Write the result out, then drop the last reference to free memory.
trips_trimmed.to_csv('my_first_dask_only{}.csv'.format(year))
del trips_trimmed
gc.collect()