# Get Data

In [1]:
# Enable IPython's autoreload extension; mode 2 reloads imported modules
# before each cell is executed, so edits to local source files are picked up
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import io
import os
import sys
from datetime import datetime
from multiprocessing import cpu_count
from typing import List
from urllib.request import urlretrieve

import boto3
import pandas as pd
from contexttimer import Timer
from joblib import Parallel, delayed

In [3]:
# Make the project's src directory importable from this notebook.
# NOTE: the path is relative, so it resolves against the kernel's working
# directory.
PROJ_ROOT = os.pardir
src_dir = os.path.join(PROJ_ROOT, "src")
# guard so that re-running this cell does not add duplicate sys.path entries
if src_dir not in sys.path:
    sys.path.append(src_dir)

In [4]:
# pandas_utils is imported here, after the sys.path update, rather than with
# the other imports - presumably because it lives in the project's src
# directory (TODO confirm).
%aimport pandas_utils
import pandas_utils as pu

## About

Get data using a notebook running inside a dev container.

## User Inputs

In [5]:
# --- dataset ---
# common URL prefix shared by all monthly data files
base_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata"
# year-month (YYYY-MM) identifiers of the monthly files to load
ym_list = ["2023-01", "2023-02", "2023-03"]
# subset of columns to read from each file
cols_to_load = ["lpep_pickup_datetime", "RatecodeID", "trip_distance"]

# --- AWS ---
# name of the private S3 bucket
s3_bucket_name = "oss-shared-scratchp"
# profile name as listed in .aws/credentials
aws_profile_name = "default"

In [6]:
# one parquet file URL per requested year-month
urls = ["_".join([base_url, ym]) + ".parquet" for ym in ym_list]

## Use `boto3`

Verify the following:

1. the `~/.aws` folder exists inside the container
2. the AWS profile specified in the **User Inputs** section is found in the `~/.aws/credentials` file

In [7]:
# verify .aws folder exists
assert '.aws' in os.listdir(os.path.expanduser("~"))
# verify required profile is found in .aws/credentials
assert aws_profile_name in boto3.session.Session().available_profiles

Define S3 client

In [8]:
# session bound to the selected credentials profile
session = boto3.session.Session(
    profile_name=aws_profile_name,
)
# S3 client created from that session
s3_client = session.client(service_name='s3')

Verify S3 buckets can be accessed

In [9]:
# list the buckets once and reuse the result for both checks, instead of
# making a second identical request to S3
bucket_names = [b['Name'] for b in s3_client.list_buckets()['Buckets']]
assert len(bucket_names) >= 1, "No S3 buckets are visible to this profile"
assert s3_bucket_name in bucket_names, (
    f"Bucket {s3_bucket_name!r} was not found among the visible S3 buckets"
)

## Get Data

### Run ETL Pipeline to Process Data

In [10]:
def run_etl_process(
    url: str, columns: List[str], s3_bucket_name: str
) -> pd.DataFrame:
    """Run ETL pipeline to get one month of taxi trips data.

    Reads the parquet file at ``url`` (only ``columns``, and only rows whose
    ``RatecodeID`` is 2, 3, 4 or 5), passes the result through
    ``pu.convert_dtypes_auto`` and uploads it as a gzip-compressed parquet
    object under the ``raw/`` prefix of the S3 bucket. A one-line summary
    with row counts and elapsed time is printed.

    Relies on the notebook-level names ``s3_client`` and ``pu`` being
    defined before it is called.

    Parameters
    ----------
    url : str
        Location of the monthly parquet file to read.
    columns : List[str]
        Columns to read from the file.
    s3_bucket_name : str
        Name of the S3 bucket that receives the processed file.

    Returns
    -------
    pd.DataFrame
        The transformed data that was uploaded.

    Raises
    ------
    AssertionError
        If the upload response does not report HTTP status 200.
    """
    with Timer() as t:
        # extract
        df_raw = pd.read_parquet(
            url,
            columns=columns,
            filters=[('RatecodeID', 'in', [2, 3, 4, 5])],
        )

        # transform
        df = df_raw.pipe(pu.convert_dtypes_auto)

        # load
        curr_dtime = datetime.now().strftime("%Y%m%d_%H%M%S")
        fname_stem = (
            os.path.basename(url).replace('.parquet', '').replace('-', '_')
        )
        proc_fname = f"{fname_stem}_{curr_dtime}.parquet.gzip"
        # S3 keys always use "/" as the delimiter, so build the key
        # explicitly rather than with the OS-specific os.path.join
        s3_key = f"raw/{proc_fname}"
        # with no path given, to_parquet returns the serialized file as
        # bytes, which can be passed directly as the request body
        response = s3_client.put_object(
            Bucket=s3_bucket_name,
            Key=s3_key,
            Body=df.to_parquet(
                engine="pyarrow", index=False, compression="gzip"
            ),
        )
        status_code = response['ResponseMetadata']['HTTPStatusCode']
        assert status_code == 200, (
            f"Upload of {s3_key} returned HTTP status {status_code}"
        )
    duration = t.elapsed

    print(
        f"Loaded {len(df_raw):,} rows and exported {len(df):,} rows of "
        f"data to {proc_fname} in {duration:.3f}s\n"
    )
    return df

In [11]:
%%time
executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
tasks = (
    delayed(run_etl_process)(
        f, cols_to_load, s3_bucket_name
    )
    for f in urls
)
df = pd.concat(executor(tasks), ignore_index=True)
print(f"Loaded {len(df):,} rows of data")
pu.show_df(df)
df.info()

Loaded 1,774 rows and exported 1,774 rows of data to green_tripdata_2023_01_20231106_205413.parquet.gzip in 0.766s

Loaded 2,071 rows and exported 2,071 rows of data to green_tripdata_2023_02_20231106_205413.parquet.gzip in 0.779s

Loaded 2,479 rows and exported 2,479 rows of data to green_tripdata_2023_03_20231106_205413.parquet.gzip in 0.803s

Loaded 6,324 rows of data


column,lpep_pickup_datetime,RatecodeID,trip_distance
dtype,datetime64[us],Int64,Float64
missing,0,0,0
0,2023-01-01 00:10:45,5,1.3
1,2023-01-01 00:46:48,4,10.44
2,2023-01-01 00:40:58,5,6.78
3,2023-01-01 00:16:10,5,6.19
4,2023-01-01 00:39:52,5,2.08
...,...,...,...
6319,2023-03-31 22:15:25,5,2.7
6320,2023-03-31 22:35:53,5,3.61
6321,2023-03-31 23:38:29,5,7.03
6322,2023-03-31 23:02:54,5,2.76


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6324 entries, 0 to 6323
Data columns (total 3 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   lpep_pickup_datetime  6324 non-null   datetime64[us]
 1   RatecodeID            6324 non-null   Int64         
 2   trip_distance         6324 non-null   Float64       
dtypes: Float64(1), Int64(1), datetime64[us](1)
memory usage: 160.7 KB
CPU times: user 70.3 ms, sys: 70.9 ms, total: 141 ms
Wall time: 945 ms
