# DAG Project

In [211]:
import pandas as pd 
from functools import reduce
import datetime as dt
import csv

In [212]:
# Create a function to extract the data.
def extract_data(file_path, chunksize = 10**5):
    for chunk in pd.read_csv(file_path, chunksize=chunksize):
        yield chunk

In [213]:
# Create a function to calaulate the trip duration in minutes.
def calculate_trip_duration_in_minutes(chunks):
    for chunk in chunks:
        chunk = chunk.dropna()
        chunk['trip_duration_minutes'] = chunk['trip_duration'] / 60
        yield chunk

In [214]:
# Create a function to create a rush hour column.
def create_rush_hour_feature(chunks):
    start_time = 7
    end_time = 9
    for chunk in chunks:
        chunk = chunk.loc[(chunk['pickup_datetime']!=0)&(chunk['dropoff_datetime']!=0)]
        rush_hour_start = pd.to_datetime(start_time)
        rush_hour_end = pd.to_datetime(end_time)
        chunk['is_rush_hour'] = (pd.to_datetime(chunk['pickup_datetime'] )>= rush_hour_start) & (pd.to_datetime(chunk['dropoff_datetime'] )<= rush_hour_end)
        yield chunk

In [215]:
# Create a function to extract the day of the week.
def extract_day_of_week(chunks):
    for chunk in chunks:
        chunk['day_of_week'] = pd.to_datetime(chunk['pickup_datetime']).dt.weekday
        yield chunk

In [216]:
# Create a function to filter out unrealistic trips.
def filter_out_unrealistic_trips(chunks):
    for chunk in chunks:
        min_trip_duration = 1
        max_trip_duration = 24 * 60
        chunk = chunk[(chunk['trip_duration'] >= min_trip_duration) & (chunk['trip_duration'] <= max_trip_duration)]
        yield chunk

In [217]:
# Create a function to one hot encode the day of week column.
def one_hot_encode_day_of_week(chunks):
    for chunk in chunks:
        chunk = pd.get_dummies(chunk, columns = ['day_of_week'], prefix='day_of_week')
        yield chunk

In [218]:
# Create a function to load the transformed data into a new file.
def load_data(chunks, destination='transformed_data.json'):
    for chunk in chunks:
        if destination.endswith('.csv'):
            chunk.to_csv(destination)
        elif destination.endswith('.db'):
            chunk.to_sql('trips',destination)
        elif destination.endswith('.json'):
            chunk.to_json(destination)
        else:
            raise ValueError("Invalid destination file type")

In [219]:
# Create a compose function
def compose(*functions):
    return reduce(lambda f, g: lambda x: g(f(x)), functions)

In [220]:
# Execute the compose.
composed = compose(extract_data, calculate_trip_duration_in_minutes, create_rush_hour_feature, extract_day_of_week, filter_out_unrealistic_trips, one_hot_encode_day_of_week, load_data)
composed('train.csv')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  chunk['trip_duration_minutes'] = chunk['trip_duration'] / 60
