In [4]:
import os
# Size of the downloaded June 2021 FHV trip file in whole KiB (bytes floor-divided by 1024).
os.path.getsize('Homework/data/fhv_tripdata_2021-06.parquet') // 1024

12898

In [8]:
import pandas as pd
import requests
import pickle
import os
from datetime import datetime
from dateutil.relativedelta import relativedelta

from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error


from prefect import task, flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner


@task
def get_paths(date):
    """Download the FHV trip files for the two months preceding ``date``.

    Args:
        date: Any value ``pd.to_datetime`` accepts (for example ``"2021-08-15"``
            or a ``datetime``).

    Returns:
        A ``(older_path, newer_path)`` tuple of local parquet paths: the file
        for two months before ``date`` first, one month before ``date`` second.

    Raises:
        requests.HTTPError: If the server answers a download with a 4xx/5xx status.
    """
    logger = get_run_logger()

    reference = pd.to_datetime(date)
    # Zero-padded "YYYY-MM" strings sort chronologically, so sorting puts the
    # older month first.
    months = sorted((reference - relativedelta(months=i)).strftime("%Y-%m") for i in range(1, 3))

    data_dir = "Homework/data"
    # Create the target directory up front so the first write cannot fail on a
    # fresh checkout.
    os.makedirs(data_dir, exist_ok=True)

    file_paths = []

    for month in months:
        file_name = f"fhv_tripdata_{month}"

        result = requests.get(f"https://nyc-tlc.s3.amazonaws.com/trip+data/{file_name}.parquet")
        # Fail here on an error response instead of saving the error body as a
        # ".parquet" file that only breaks later when it is read.
        result.raise_for_status()

        file_path = f"{data_dir}/{file_name}.parquet"

        with open(file_path, 'wb') as file:
            file.write(result.content)

        logger.info(f'{file_name} has been successfully downloaded and placed in the file path- {file_path}')

        file_paths.append(file_path)

    return file_paths[0], file_paths[1]

@task
def read_data(path):
    """Load the parquet file at ``path`` and return it as a pandas DataFrame."""
    return pd.read_parquet(path)

@task
def prepare_features(df, categorical, train=True):
    """Add a trip ``duration`` column, keep 1-60 minute trips, and stringify IDs.

    Args:
        df: Frame with ``pickup_datetime`` and ``dropOff_datetime`` columns plus
            the columns named in ``categorical``. It is not modified.
        categorical: Column labels whose missing values become ``-1`` and whose
            values are then cast to ``int`` and finally to ``str``.
        train: Only selects the wording of the log line ("training" when true,
            "validation" otherwise).

    Returns:
        A new DataFrame holding the rows whose duration in minutes lies in
        ``[1, 60]``, with the ``duration`` column and the converted
        ``categorical`` columns.
    """
    logger = get_run_logger()

    # Compute the duration on the side instead of writing it into `df`, so the
    # caller's frame is left exactly as it was passed in.
    duration = (df.dropOff_datetime - df.pickup_datetime).dt.total_seconds() / 60
    in_range = (duration >= 1) & (duration <= 60)

    prepared = df[in_range].copy()
    # Assign raw values: `prepared` and `duration[in_range]` hold the same rows
    # in the same order, so no index alignment is needed.
    prepared['duration'] = duration[in_range].to_numpy()

    mean_duration = prepared.duration.mean()
    split_name = "training" if train else "validation"
    logger.info(f"The mean duration of {split_name} is {mean_duration}")

    prepared[categorical] = prepared[categorical].fillna(-1).astype('int').astype('str')
    return prepared

@task
def train_model(df, categorical):
    """Fit a ``DictVectorizer`` and a ``LinearRegression`` that predict ``duration``.

    Args:
        df: Frame containing the ``categorical`` columns and a ``duration`` column.
        categorical: Column labels turned into feature dictionaries.

    Returns:
        ``(lr, dv)``: the fitted ``LinearRegression`` and the fitted
        ``DictVectorizer``, in that order.
    """
    logger = get_run_logger()

    train_dicts = df[categorical].to_dict(orient='records')
    dv = DictVectorizer()
    X_train = dv.fit_transform(train_dicts)
    y_train = df.duration.values

    logger.info(f"The shape of X_train is {X_train.shape}")
    logger.info(f"The DictVectorizer has {len(dv.feature_names_)} features")

    lr = LinearRegression()
    lr.fit(X_train, y_train)
    y_pred = lr.predict(X_train)

    # The reported metric is the *root* mean squared error. Taking the square
    # root explicitly avoids the `squared=False` argument, which newer
    # scikit-learn releases deprecated and later removed.
    rmse = mean_squared_error(y_train, y_pred) ** 0.5
    logger.info(f"The RMSE of training is: {rmse}")
    return lr, dv

@task
def run_model(df, categorical, dv, lr):
    """Score a fitted model on ``df`` and log the root mean squared error.

    Args:
        df: Frame containing the ``categorical`` columns and a ``duration`` column.
        categorical: Column labels turned into feature dictionaries.
        dv: Already fitted ``DictVectorizer``; only ``transform`` is called on it.
        lr: Already fitted regressor; only ``predict`` is called on it.

    Returns:
        None. The metric is only written to the run logger.
    """
    logger = get_run_logger()

    val_dicts = df[categorical].to_dict(orient='records')
    X_val = dv.transform(val_dicts)
    y_pred = lr.predict(X_val)
    y_val = df.duration.values

    # The reported metric is the *root* mean squared error. Taking the square
    # root explicitly avoids the `squared=False` argument, which newer
    # scikit-learn releases deprecated and later removed.
    rmse = mean_squared_error(y_val, y_pred) ** 0.5
    logger.info(f"The RMSE of validation is: {rmse}")
    return

@flow(task_runner=SequentialTaskRunner())
def main(date : str = None):
    """Download two months of FHV data, train on the older, validate on the newer.

    The fitted model and vectorizer are pickled under ``Homework/models`` as
    ``model-{date}.bin`` and ``dv-{date}.b``, and the vectorizer's file size is
    logged.

    Args:
        date: Reference date such as ``"2021-08-15"``. The training and
            validation months are the two months before it. Defaults to today's
            date formatted as ``YYYY-MM-DD``.
    """
    logger = get_run_logger()

    if date is None:
        # Keep `date` a plain YYYY-MM-DD string: interpolating a raw datetime
        # would put spaces and colons into the artifact file names below.
        date = datetime.now().strftime("%Y-%m-%d")

    train_path, val_path = get_paths(date).result()

    categorical = ['PUlocationID', 'DOlocationID']

    df_train = read_data(train_path)
    df_train_processed = prepare_features(df_train, categorical)

    df_val = read_data(val_path)
    df_val_processed = prepare_features(df_val, categorical, False)

    # train the model
    lr, dv = train_model(df_train_processed, categorical).result()

    models_dir = 'Homework/models'
    # Make sure the output directory exists before the first artifact is written.
    os.makedirs(models_dir, exist_ok=True)

    with open(f'{models_dir}/model-{date}.bin', 'wb') as lin_reg:
        pickle.dump(lr, lin_reg)

    dv_path = f'{models_dir}/dv-{date}.b'

    with open(dv_path, 'wb') as dict_vect:
        pickle.dump(dv, dict_vect)

    dv_size = os.stat(dv_path).st_size

    logger.info(f"The Dictvectorizer size is -: {dv_size} bytes")

    run_model(df_val_processed, categorical, dv, lr)

# Run the flow for the reference date 2021-08-15; the captured log below shows
# the 2021-06 and 2021-07 files being downloaded for it.
main(date="2021-08-15")

18:28:44.127 | INFO    | prefect.engine - Created flow run 'convivial-serval' for flow 'main'
18:28:44.133 | INFO    | Flow run 'convivial-serval' - Using task runner 'SequentialTaskRunner'
18:28:46.981 | INFO    | Flow run 'convivial-serval' - Created task run 'get_paths-6e696e34-0' for task 'get_paths'
18:30:18.249 | INFO    | Task run 'get_paths-6e696e34-0' - fhv_tripdata_2021-06 has been successfully downloaded and placed in the file path- Homework/data/fhv_tripdata_2021-06.parquet
18:31:18.874 | INFO    | Task run 'get_paths-6e696e34-0' - fhv_tripdata_2021-07 has been successfully downloaded and placed in the file path- Homework/data/fhv_tripdata_2021-07.parquet
18:31:21.269 | INFO    | Task run 'get_paths-6e696e34-0' - Finished in state Completed()
18:31:22.816 | INFO    | Flow run 'convivial-serval' - Created task run 'read_data-4c7f9de4-0' for task 'read_data'
18:33:11.386 | INFO    | Task run 'read_data-4c7f9de4-0' - Finished in state Completed()
18:33:20.096 | INFO    | Flow 

In [None]:
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import os
import requests
import pandas

In [13]:
# Fetch the January 2021 FHV trip file and store it under Homework/data.
result = requests.get("https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-01.parquet")
# Stop on a 4xx/5xx answer instead of saving the error body as a ".parquet" file.
result.raise_for_status()
with open('Homework/data/fhv_tripdata_2021-01.parquet', 'wb') as file_name:
    file_name.write(result.content)

In [30]:
# Display the current local date and time (called without a tz argument).
datetime.now()

datetime.datetime(2022, 6, 13, 10, 42, 41, 292056)

In [26]:
def get_path(date):
    """Download the FHV trip files for the two months preceding ``date``.

    Args:
        date: Any value ``pandas.to_datetime`` accepts (for example
            ``"2021-08-15"`` or a ``datetime``).

    Returns:
        A ``(older_path, newer_path)`` tuple of local parquet paths: the file
        for two months before ``date`` first, one month before ``date`` second.

    Raises:
        requests.HTTPError: If the server answers a download with a 4xx/5xx status.
    """
    reference = pandas.to_datetime(date)
    # Zero-padded "YYYY-MM" strings sort chronologically, so sorting puts the
    # older month first.
    months = sorted((reference - relativedelta(months=i)).strftime("%Y-%m") for i in range(1, 3))

    data_dir = "Homework/data"
    # Create the target directory up front so the first write cannot fail on a
    # fresh checkout.
    os.makedirs(data_dir, exist_ok=True)

    paths = []
    for month in months:
        file_name = f"fhv_tripdata_{month}"
        result = requests.get(f"https://nyc-tlc.s3.amazonaws.com/trip+data/{file_name}.parquet")
        # Fail here on an error response instead of saving the error body as a
        # ".parquet" file that only breaks later when it is read.
        result.raise_for_status()
        file_path = f"{data_dir}/{file_name}.parquet"
        with open(file_path, 'wb') as file:
            file.write(result.content)
        print(f'{file_name} has been successfully downloaded')
        paths.append(file_path)
    return paths[0], paths[1]

def get_date_difference(date : str = None):
    """Resolve a missing ``date`` to the current time; currently returns nothing.

    NOTE(review): this looks like an unfinished stub. The resolved value is
    only bound to a local name and the function always returns ``None``.
    """
    date = datetime.now() if date is None else date





In [38]:
# Preview the two "YYYY-MM" month labels (older first) that precede today's
# date, built with the same expression get_path uses.
date = datetime.now()
date_list = sorted(
    (pandas.to_datetime(date) - relativedelta(months=months_back)).strftime("%Y-%m")
    for months_back in (1, 2)
)
print(date_list)

['2022-04', '2022-05']
