# Coiled Notebooks

In [1]:
import os

import boto3
import polars as pl
import requests
import s3fs
from IPython.display import display, HTML

In [2]:
# Give JupyterLab output areas (`.jp-OutputArea-output`) a flexbox layout.
flex_output_css = "<style>.jp-OutputArea-output {display:flex}</style>"
display(HTML(flex_output_css))

## About

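A demo of Coiled notebooks, adapted from [this Coiled webinar](https://youtu.be/s4Ge6QICz98?si=NCOYt27i_rOkQpFD&t=508). It reads one month of 2023 NYC yellow-taxi trip data from the `nyc-tlc` S3 bucket with Polars, normalises column names and dtypes, drops trips with a zero tip, and writes the result back to S3 as gzip-compressed Parquet.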

## User Inputs

In [3]:
# S3 prefix holding the NYC TLC monthly trip-record Parquet files
# (listed with s3fs in "Get Data Filepaths" below).
url = "s3://nyc-tlc/trip data/"

In [4]:
# Cast expressions applied during the ETL step, one per column, in a fixed order.
# Keys are expected to match the lower-cased, '.'->'_' column names produced
# by the rename in the ETL cell — TODO confirm if the source schema changes.
dtypes = [
    pl.col(column).cast(target)
    for column, target in {
        'vendorid': pl.Int64,
        'passenger_count': pl.Int64,
        'trip_distance': pl.Float64,
        'ratecodeid': pl.Int64,
        'pulocationid': pl.Int64,
        'dolocationid': pl.Int64,
        'payment_type': pl.Int64,
        'fare_amount': pl.Float64,
        'extra': pl.Float64,
        'mta_tax': pl.Float64,
        'tip_amount': pl.Float64,
        'tolls_amount': pl.Float64,
        'improvement_surcharge': pl.Float64,
        'total_amount': pl.Float64,
        'congestion_surcharge': pl.Float64,
        'airport_fee': pl.Float64,
    }.items()
]

Show parent directory

In [5]:
# Absolute path of the directory one level above the kernel's working directory.
os.path.dirname(os.getcwd())

'/scratch/synced'

List buckets

In [6]:
# Show which S3 buckets the current AWS credentials can see; the ETL step
# below writes its output into one of them.
s3_client = boto3.client("s3")
s3_client.list_buckets()["Buckets"]

[{'Name': 'oss-shared-scratchp',
  'CreationDate': datetime.datetime(2023, 9, 28, 3, 41, 18, tzinfo=tzlocal())}]

## Get Data

### Get Data Filepaths

In [7]:
# Filesystem-style handle on S3, used below to list the trip-data objects.
s3 = s3fs.S3FileSystem()

In [8]:
# Every key under the trip-data prefix (bucket-relative, no "s3://" scheme);
# preview just the first few entries.
filenames_raw = s3.ls(url)
filenames_raw[:5]

['nyc-tlc/trip data/',
 'nyc-tlc/trip data/fhv_tripdata_2015-01.parquet',
 'nyc-tlc/trip data/fhv_tripdata_2015-02.parquet',
 'nyc-tlc/trip data/fhv_tripdata_2015-03.parquet',
 'nyc-tlc/trip data/fhv_tripdata_2015-04.parquet']

In [9]:
# Number of matching monthly files to process; the demo keeps it at 1 so the
# ETL cell runs quickly. Set to None to process every 2023 yellow-taxi file.
N_FILES = 1

# Keep only the 2023 yellow-taxi files and turn the bucket-relative keys from
# s3.ls() into full S3 URIs. Names are zero-padded by month
# (e.g. yellow_tripdata_2023-01.parquet), so sorting is chronological.
filenames = sorted(
    f"s3://{path}" for path in filenames_raw
    if "yellow_tripdata_2023" in path
)
if not filenames:
    raise FileNotFoundError(f"No yellow_tripdata_2023 files found under {url}")
filenames = filenames[:N_FILES]
filenames

['s3://nyc-tlc/trip data/yellow_tripdata_2023-01.parquet']

### Run ETL Pipeline

In [10]:
%%time
for filename in filenames:
    df = (
        # extract
        pl.read_parquet(filename)
        # transform
        .select(
            pl.all()
            .map_alias(
                lambda col_name: col_name.replace('.', '_').lower()
            )
        )
        .with_columns([*dtypes])
        .filter((pl.col('tip_amount') != 0))
    )

    m = os.path.splitext(filename)[0].split('-', -1)[-1]
    fpath_out = (
        os.path.join(
            "s3://oss-shared-scratchp/raw",
            f"{filename.split('/')[-1].replace('-', '_')}.gzip"
        )
    )

    # load
    df.write_parquet(fpath_out, compression='gzip', use_pyarrow=True)
    print(f"Exported {len(df):,} rows from {os.path.basename(filename)} to {fpath_out}")

Exported 2,372,009 rows from yellow_tripdata_2023-01.parquet to s3://oss-shared-scratchp/raw/yellow_tripdata_2023_01.parquet.gzip
CPU times: user 6.97 s, sys: 727 ms, total: 7.7 s
Wall time: 7.42 s
