In [1]:
import glob
import time

import pandas as pd
import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
from deltalake.writer import write_deltalake
from tqdm.notebook import tqdm

# Iterating over trip data files

In [2]:
list_tripdata_files = sorted(glob.glob("../datasets/fhvhv_tripdata*.parquet"))

In [3]:
len(list_tripdata_files)

46

In [4]:
list_tripdata_files[:24]

['../datasets/fhvhv_tripdata_2019-02.parquet',
 '../datasets/fhvhv_tripdata_2019-03.parquet',
 '../datasets/fhvhv_tripdata_2019-04.parquet',
 '../datasets/fhvhv_tripdata_2019-05.parquet',
 '../datasets/fhvhv_tripdata_2019-06.parquet',
 '../datasets/fhvhv_tripdata_2019-07.parquet',
 '../datasets/fhvhv_tripdata_2019-08.parquet',
 '../datasets/fhvhv_tripdata_2019-09.parquet',
 '../datasets/fhvhv_tripdata_2019-10.parquet',
 '../datasets/fhvhv_tripdata_2019-11.parquet',
 '../datasets/fhvhv_tripdata_2019-12.parquet',
 '../datasets/fhvhv_tripdata_2020-01.parquet',
 '../datasets/fhvhv_tripdata_2020-02.parquet',
 '../datasets/fhvhv_tripdata_2020-03.parquet',
 '../datasets/fhvhv_tripdata_2020-04.parquet',
 '../datasets/fhvhv_tripdata_2020-05.parquet',
 '../datasets/fhvhv_tripdata_2020-06.parquet',
 '../datasets/fhvhv_tripdata_2020-07.parquet',
 '../datasets/fhvhv_tripdata_2020-08.parquet',
 '../datasets/fhvhv_tripdata_2020-09.parquet',
 '../datasets/fhvhv_tripdata_2020-10.parquet',
 '../datasets

## Splitting monthly parquet files into smaller daily parquet files

In [5]:
import os

In [6]:
dict_dtypes_tripdata = {
    "hvfhs_license_num": pl.Utf8,
    "dispatching_base_num": pl.Utf8,
    "originating_base_num": pl.Utf8,
    "request_datetime": pl.Datetime(time_unit="ns", time_zone="UTC"),
    "on_scene_datetime": pl.Datetime(time_unit="ns", time_zone="UTC"),
    "pickup_datetime": pl.Datetime(time_unit="ns", time_zone="UTC"),
    "dropoff_datetime": pl.Datetime(time_unit="ns", time_zone="UTC"),
    "PULocationID": pl.Int64,
    "DOLocationID": pl.Int64,
    "trip_miles": pl.Float64,
    "trip_time": pl.Int64,
    "base_passenger_fare": pl.Float64,
    "tolls": pl.Float64,
    "bcf": pl.Float64,
    "sales_tax": pl.Float64(),
    "congestion_surcharge": pl.Float64,
    "tips": pl.Float64,
    "driver_pay": pl.Float64,
    "shared_request_flag": pl.Utf8,
    "shared_match_flag": pl.Utf8,
    "access_a_ride_flag": pl.Utf8,
    "wav_request_flag": pl.Utf8,
    "created_at": pl.Date,
}

In [None]:
for parquet_file in tqdm(list_tripdata_files[1:24]):
    print(f"Processing parquet file : {parquet_file}")
    df_tripdata_polars = pl.read_parquet(parquet_file)
    df_tripdata_polars = df_tripdata_polars.drop(
        columns=["airport_fee", "wav_match_flag"]
    ).drop_nulls("request_datetime")
    df_tripdata_polars = df_tripdata_polars.with_columns(
        df_tripdata_polars["request_datetime"].dt.date().alias("created_at")
    )
    df_tripdata_polars = df_tripdata_polars.drop_nulls("created_at")

    # Define dtypes
    for column_name, dtype in dict_dtypes_tripdata.items():
        df_tripdata_polars = df_tripdata_polars.with_columns(
            df_tripdata_polars[column_name].cast(dtype)
        )

    list_dates_current_month = (
        df_tripdata_polars["created_at"].unique().sort().to_list()
    )
    breakpoint()
    print(f"{df_tripdata_polars.shape}")
    print(f"{df_tripdata_polars.columns}")
    print("Iterating over list of days ...")
    for day in list_dates_current_month:
        out_parquet_name = (
            f"../datasets/daily_files/fhvhv_tripdata_{str(day)}.parquet"
        )
        df_tripdata_polars_day = df_tripdata_polars.filter(
            pl.col("created_at") == day
        )
        # check if files exists
        if os.path.exists(out_parquet_name):
            print(
                f"Files {out_parquet_name} already exists",
                "Reading it to append with current daily file",
            )
            df_tripdata_day_processed = pl.read_parquet(out_parquet_name)
            print(
                set(df_tripdata_polars_day.columns)
                - set(df_tripdata_day_processed.columns)
            )
            print(
                set(df_tripdata_day_processed.columns)
                - set(df_tripdata_polars_day.columns)
            )
            df_tripdata_polars_day = pl.concat(
                [df_tripdata_polars_day, df_tripdata_day_processed]
            )
            df_tripdata_polars_day = df_tripdata_polars_day.unique()
            os.remove(out_parquet_name)

        print(f"Saving parquet file : {out_parquet_name}")

        df_tripdata_polars_day.write_parquet(
            out_parquet_name,
            use_pyarrow=True,
        )
    del df_tripdata_polars

# Loading the parquet files as Delta in S3

## Defining the credentials for accessing AWS S3

In [4]:
import boto3

In [5]:
session = boto3.Session()

In [6]:
credentials = session.get_credentials()

In [7]:
storage_options = {
    "AWS_REGION": "us-east-1",
    "AWS_ACCESS_KEY_ID": credentials.access_key,
    "AWS_SECRET_ACCESS_KEY": credentials.secret_key,
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

## Iterating over daily parquet files and uploading it as a Delta table on S3

In [14]:
list_tripdata_daily_files = sorted(
    glob.glob("../datasets/daily_files/*.parquet")
)

In [15]:
len(list_tripdata_daily_files)

733

In [18]:
import pyarrow as pa

In [39]:
dict_dtypes_pyarrow = {
    "hvfhs_license_num": pa.utf8(),
    "dispatching_base_num": pa.string(),
    "originating_base_num": pa.string(),
    "request_datetime": pa.timestamp(unit="s", tz=None),
    "on_scene_datetime": pa.timestamp(unit="s", tz=None),
    "pickup_datetime": pa.timestamp(unit="s", tz=None),
    "dropoff_datetime": pa.timestamp(unit="s", tz=None),
    "PULocationID": pa.int64(),
    "DOLocationID": pa.int64(),
    "trip_miles": pa.float32(),
    "trip_time": pa.int64(),
    "base_passenger_fare": pa.float32(),
    "tolls": pa.float32(),
    "bcf": pa.float32(),
    "sales_tax": pa.float32(),
    "congestion_surcharge": pa.float32(),
    "tips": pa.float32(),
    "driver_pay": pa.float32(),
    "shared_request_flag": pa.string(),
    "shared_match_flag": pa.string(),
    "access_a_ride_flag": pa.string(),
    "wav_request_flag": pa.string(),
    "created_at": pa.date32(),
}

In [40]:
list_tripdata_daily_files[:1]

['../datasets/daily_files/fhvhv_tripdata_2019-01-31.parquet']

In [57]:
assert current_schema == current_schema

In [68]:
from deltalake import DeltaTable

dt = DeltaTable(
    "s3://poor-mans-data-lakehouse/nyc_trip_data",
    storage_options=storage_options,
)

In [69]:
dt.schema().to_pyarrow()

hvfhs_license_num: string
dispatching_base_num: string
originating_base_num: string
request_datetime: timestamp[us]
on_scene_datetime: timestamp[us]
pickup_datetime: timestamp[us]
dropoff_datetime: timestamp[us]
PULocationID: int64
DOLocationID: int64
trip_miles: double
trip_time: int64
base_passenger_fare: double
tolls: double
bcf: double
sales_tax: double
congestion_surcharge: double
tips: double
driver_pay: double
shared_request_flag: string
shared_match_flag: string
access_a_ride_flag: string
wav_request_flag: string
created_at: date32[day]

In [70]:
dt.schema()

Schema([Field(hvfhs_license_num, PrimitiveType("string"), nullable=True), Field(dispatching_base_num, PrimitiveType("string"), nullable=True), Field(originating_base_num, PrimitiveType("string"), nullable=True), Field(request_datetime, PrimitiveType("timestamp"), nullable=True), Field(on_scene_datetime, PrimitiveType("timestamp"), nullable=True), Field(pickup_datetime, PrimitiveType("timestamp"), nullable=True), Field(dropoff_datetime, PrimitiveType("timestamp"), nullable=True), Field(PULocationID, PrimitiveType("long"), nullable=True), Field(DOLocationID, PrimitiveType("long"), nullable=True), Field(trip_miles, PrimitiveType("double"), nullable=True), Field(trip_time, PrimitiveType("long"), nullable=True), Field(base_passenger_fare, PrimitiveType("double"), nullable=True), Field(tolls, PrimitiveType("double"), nullable=True), Field(bcf, PrimitiveType("double"), nullable=True), Field(sales_tax, PrimitiveType("double"), nullable=True), Field(congestion_surcharge, PrimitiveType("double")

In [71]:
%%time
for file_path in list_tripdata_daily_files[:10]:
    start_time = time.time()
    print(f"Processing file : {file_path}")

    df_tripdata_arrow = pq.read_table(
        file_path
    )  # , schema=dict_dtypes_pyarrow)
    # print(df_tripdata_arrow.schema)
    if df_tripdata_arrow.schema != past_schema:
        print(f"Problems when matching schemas for {file_path}")
        print(f"{df_tripdata_arrow.schema}\n")
        print(f"{past_schema.schema}\n")
    past_schema = df_tripdata_arrow.schema

    write_deltalake(
        "s3://poor-mans-data-lakehouse/nyc_trip_data",
        data=df_tripdata_arrow,
        mode="append",
        schema=dt.schema().to_pyarrow(),
        # partition_by="created_at",
        name="nyc_trip_data",
        description="Kaggles Dataset from NYC FHV (Uber/Lyft) Trip Data Expanded (2019-2022)",
        # overwrite_schema=True,
        # max_open_files=15,
        # max_rows_per_group=1_000_000,
        # min_rows_per_group=10_000,
        storage_options=storage_options,
    )

    end_time = time.time()
    delta_time = round((end_time - start_time), 2)
    print(
        f"Time taken for iteration of df of size {df_tripdata_arrow.shape[0]} : {delta_time} seconds"
    )

Processing file : ../datasets/daily_files/fhvhv_tripdata_2019-01-31.parquet
Time taken for iteration of df of size 2233 : 6.62 seconds
Processing file : ../datasets/daily_files/fhvhv_tripdata_2019-02-01.parquet
Time taken for iteration of df of size 818826 : 17.95 seconds
Processing file : ../datasets/daily_files/fhvhv_tripdata_2019-02-02.parquet
Time taken for iteration of df of size 832719 : 9.92 seconds
Processing file : ../datasets/daily_files/fhvhv_tripdata_2019-02-03.parquet
Time taken for iteration of df of size 722934 : 12.26 seconds
Processing file : ../datasets/daily_files/fhvhv_tripdata_2019-02-04.parquet
Time taken for iteration of df of size 591393 : 19.09 seconds
Processing file : ../datasets/daily_files/fhvhv_tripdata_2019-02-05.parquet
Time taken for iteration of df of size 576581 : 13.82 seconds
Processing file : ../datasets/daily_files/fhvhv_tripdata_2019-02-06.parquet
Time taken for iteration of df of size 663824 : 12.95 seconds
Processing file : ../datasets/daily_fi

# Reading the Delta Table

In [1]:
import duckdb
import pyarrow.dataset as ds
from deltalake import DeltaTable

In [2]:
duckdb_con = duckdb.connect()

In [8]:
dt_nyc_tripdata = DeltaTable(
    "s3://poor-mans-data-lakehouse/nyc_trip_data",
    storage_options=storage_options,
)

In [9]:
dataset_arrow = dt_nyc_tripdata.to_pyarrow_dataset()

In [10]:
df_results = duckdb_con.execute(
    """
select count(1)
  from dataset_arrow;
    """
).df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [11]:
df_results

Unnamed: 0,count(1)
0,6554251


# Deleting the S3 folder

```bash
aws s3 rm s3://poor-mans-data-lakehouse/nyc_trip_data/ --recursive
```

In [9]:
%%time
df_tripdata_arrow = pq.read_table("../datasets/fhvhv_tripdata_2020-02.parquet")

CPU times: user 8.13 s, sys: 2.7 s, total: 10.8 s
Wall time: 928 ms


In [10]:
df_tripdata_arrow.schema

hvfhs_license_num: string
dispatching_base_num: string
originating_base_num: string
request_datetime: timestamp[us]
on_scene_datetime: timestamp[us]
pickup_datetime: timestamp[us]
dropoff_datetime: timestamp[us]
PULocationID: int64
DOLocationID: int64
trip_miles: double
trip_time: int64
base_passenger_fare: double
tolls: double
bcf: double
sales_tax: double
congestion_surcharge: double
airport_fee: double
tips: double
driver_pay: double
shared_request_flag: string
shared_match_flag: string
access_a_ride_flag: string
wav_request_flag: string
wav_match_flag: string
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 3168

In [11]:
df_tripdata_arrow[100:].shape

(21725000, 24)

In [12]:
%%time
write_deltalake(
    # Change this URI to your own unique URI
    "s3://poor-mans-data-lakehouse/nyc_trip_data2",
    data=df_tripdata_arrow[:10000],
    mode="append",
    overwrite_schema=True,
    storage_options={
        "AWS_REGION": "us-east-1",
        "AWS_ACCESS_KEY_ID": "AKIA37PHV2EK2YRIQ3WV",
        "AWS_SECRET_ACCESS_KEY": "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",
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    },
)

CPU times: user 93.4 ms, sys: 51.4 ms, total: 145 ms
Wall time: 5.97 s


# Exploratory data analysis Trip Data

In [5]:
%%time
df_tripdata = pl.read_parquet(
    "../datasets/fhvhv_tripdata_2019-02.parquet",
)

CPU times: user 8.31 s, sys: 1.72 s, total: 10 s
Wall time: 890 ms


In [6]:
df_tripdata.schema

{'hvfhs_license_num': Utf8,
 'dispatching_base_num': Utf8,
 'originating_base_num': Utf8,
 'request_datetime': Datetime(time_unit='ns', time_zone=None),
 'on_scene_datetime': Datetime(time_unit='ns', time_zone=None),
 'pickup_datetime': Datetime(time_unit='ns', time_zone=None),
 'dropoff_datetime': Datetime(time_unit='ns', time_zone=None),
 'PULocationID': Int64,
 'DOLocationID': Int64,
 'trip_miles': Float64,
 'trip_time': Int64,
 'base_passenger_fare': Float64,
 'tolls': Float64,
 'bcf': Float64,
 'sales_tax': Float64,
 'congestion_surcharge': Float64,
 'airport_fee': Null,
 'tips': Float64,
 'driver_pay': Float64,
 'shared_request_flag': Utf8,
 'shared_match_flag': Utf8,
 'access_a_ride_flag': Utf8,
 'wav_request_flag': Utf8,
 'wav_match_flag': Null}

In [7]:
for col, dtype in dict_dtypes_tripdata.items():
    df_tripdata.with_columns(pl.col(col).cast(dtype))

In [58]:
df_tripdata.schema

{'hvfhs_license_num': Utf8,
 'dispatching_base_num': Utf8,
 'originating_base_num': Utf8,
 'request_datetime': Datetime(time_unit='ns', time_zone=None),
 'on_scene_datetime': Datetime(time_unit='ns', time_zone=None),
 'pickup_datetime': Datetime(time_unit='ns', time_zone=None),
 'dropoff_datetime': Datetime(time_unit='ns', time_zone=None),
 'PULocationID': Int64,
 'DOLocationID': Int64,
 'trip_miles': Float64,
 'trip_time': Int64,
 'base_passenger_fare': Float64,
 'tolls': Float64,
 'bcf': Float64,
 'sales_tax': Float64,
 'congestion_surcharge': Float64,
 'airport_fee': Null,
 'tips': Float64,
 'driver_pay': Float64,
 'shared_request_flag': Utf8,
 'shared_match_flag': Utf8,
 'access_a_ride_flag': Utf8,
 'wav_request_flag': Utf8,
 'wav_match_flag': Null}

In [8]:
df_tripdata.shape

(20159102, 24)

In [3]:
df_tripdata.sample(3)

hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
str,str,str,datetime[ns],datetime[ns],datetime[ns],datetime[ns],i64,i64,f64,i64,f64,f64,f64,f64,f64,null,f64,f64,str,str,str,str,null
"""HV0003""","""B02765""","""B02765""",2019-02-25 17:55:36,2019-02-25 17:58:46,2019-02-25 18:00:31,2019-02-25 18:14:58,223,7,1.97,868,6.01,0.0,0.15,0.53,0.0,,0.0,6.65,"""Y""","""Y""","""N""","""N""",
"""HV0003""","""B02864""","""B02864""",2019-02-07 17:08:48,2019-02-07 17:33:39,2019-02-07 17:34:46,2019-02-07 17:55:40,115,156,5.09,1254,11.83,0.0,0.3,1.05,0.0,,1.0,13.61,"""Y""","""Y""","""N""","""N""",
"""HV0003""","""B02869""","""B02869""",2019-02-22 13:33:21,2019-02-22 13:34:47,2019-02-22 13:35:22,2019-02-22 13:51:07,175,56,7.08,945,15.4,0.0,0.39,1.37,0.0,,0.0,15.58,"""N""","""N""","""N""","""N""",


In [7]:
grouped = df_tripdata.groupby("hvfhs_license_num").agg(
    pl.count("hvfhs_license_num").alias("count")
)

In [8]:
grouped

hvfhs_license_num,count
str,u32
"""HV0003""",13504994
"""HV0002""",979266
"""HV0005""",4690916
"""HV0004""",983926


# Accessing my bucket

In [6]:
import boto3

In [7]:
session = boto3.Session()
s3 = session.client("s3")

In [8]:
response = s3.list_buckets()

In [9]:
# Print the bucket names
for bucket in response["Buckets"]:
    print(bucket["Name"])

aws-glue-assets-823508979989-us-east-1
databricks-workspace-stack-lambdazipsbucket-omswkmahi53h
datalake-bigdata-poc-2023
db-5f7ee1d6b8e3c9e51664eacd62190787-s3-root-bucket
gabrielsgoncalves-databricks-sandbox
gx-statics-validations
poor-mans-data-lakehouse
serverless-jobs-bucket
udemy-mastering-aws-serverless-analytics-tools


## Create e new bucket for your Data Lakehouse

In [14]:
bucket_name = "poor-mans-data-lakehouse"
region = "us-east-1"

In [18]:
from botocore.exceptions import ClientError

In [23]:
try:
    response = s3.create_bucket(Bucket=bucket_name)
    print(f"Bucket {bucket_name} created in region {region}")
except ClientError as e:
    print(e)

Bucket poor-mans-data-lakehouse created in region us-east-1


# Using Duckdb to query your Delta Table

In [19]:
import duckdb
import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import csv, json

In [20]:
duckdb.sql(
    "SELECT * from '../datasets/fhvhv_tripdata_2019-02.parquet' limit 100"
).show()

┌───────────────────┬──────────────────────┬──────────────────────┬───┬──────────────────┬────────────────┐
│ hvfhs_license_num │ dispatching_base_num │ originating_base_num │ … │ wav_request_flag │ wav_match_flag │
│      varchar      │       varchar        │       varchar        │   │     varchar      │     int32      │
├───────────────────┼──────────────────────┼──────────────────────┼───┼──────────────────┼────────────────┤
│ HV0003            │ B02867               │ B02867               │ … │ N                │           NULL │
│ HV0003            │ B02879               │ B02879               │ … │ N                │           NULL │
│ HV0005            │ B02510               │ NULL                 │ … │ N                │           NULL │
│ HV0005            │ B02510               │ NULL                 │ … │ N                │           NULL │
│ HV0005            │ B02510               │ NULL                 │ … │ N                │           NULL │
│ HV0005            │ B02510

In [18]:
df_arrow = csv.read_csv("../datasets/fhvhv_tripdata_2019-02.parquet")

In [18]:
df_arrow = csv.read_csv("../datasets/page.csv")

In [None]:
df_arrow = json.read_json("../datasets/link_annotated_text.jsonl")

In [21]:
import duckdb

In [22]:
con = duckdb.connect()

In [24]:
query = con.execute("select * from df_arrow limit 10").df()

In [25]:
query

Unnamed: 0,page_id,item_id,title,views
0,12,6199,Anarchism,31335
1,25,38404,Autism,49693
2,39,101038,Albedo,14573
3,290,9659,A,25859
4,303,173,Alabama,52765
5,305,41746,Achilles,35877
6,307,91,Abraham Lincoln,151008
7,308,868,Aristotle,74700
8,309,853997,An American in Paris,2156
9,316,277751,Academy Award for Best Production Design,3114


In [20]:
df_arrow.schema

page_id: int64
item_id: int64
title: string
views: int64

In [None]:
# Run same query again
nyc.filter("year > 2014 & passenger_count > 0 & trip_distance > 0.25 & fare_amount > 0")
    .aggregate("SELECT AVG(fare_amount), AVG(tip_amount), AVG(tip_amount / fare_amount) as tip_pct","passenger_count").arrow()

# Mage tutorial

In [22]:
import pandas as pd

df = pd.read_csv(
    "https://raw.githubusercontent.com/mage-ai/datasets/master/battle_history.csv",
)

In [23]:
df

Unnamed: 0,enemy_unit_type,odds_of_victory,planet,population,size_of_force,universe
0,tank,0.184663,Aiur,5.1,46000,Zendikar
1,negator,0.808818,Ravnica,7.3,88000,Dominaria
2,negator,0.220505,Korhal,6.7,46000,Zendikar
3,zealot,0.049344,Kamigawa,3.4,3000,Dominaria
4,tank,0.187231,Aiur,5.1,4000,Zendikar
...,...,...,...,...,...,...
9995,mecha,0.145340,Aiur,5.1,43000,Zendikar
9996,overlord,0.586653,Aiur,5.1,46000,Zendikar
9997,zealot,0.196953,Kamigawa,3.4,59000,Dominaria
9998,dreadnought,0.439807,Kamigawa,3.4,91000,Dominaria


In [1]:
storage_options = {
    "AWS_REGION": "us-east-1",
    "AWS_ACCESS_KEY_ID": "AKIA37PHV2EK2YRIQ3WV",
    "AWS_SECRET_ACCESS_KEY": "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",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

In [2]:
from deltalake.writer import write_deltalake

In [26]:
df_tripdata_filtered = df_tripdata.drop(
    columns=["airport_fee", "wav_match_flag"]
)

In [28]:
df_tripdata_filtered.sample(3)

hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag
str,str,str,datetime[ns],datetime[ns],datetime[ns],datetime[ns],i64,i64,f64,i64,f64,f64,f64,f64,f64,f64,f64,str,str,str,str
"""HV0004""","""B02800""",,2019-02-27 07:21:49,,2019-02-27 07:29:25,2019-02-27 07:42:58,50,163,1.7,814,3.3,0.0,0.0,0.29,0.75,0.0,0.0,"""Y""","""N""","""N""","""N"""
"""HV0003""","""B02875""","""B02875""",2019-02-26 21:31:47,2019-02-26 21:37:09,2019-02-26 21:40:48,2019-02-26 22:14:51,162,61,8.36,2043,33.29,0.0,0.84,2.96,2.75,0.0,26.09,"""N""","""N""","""N""","""N"""
"""HV0003""","""B02883""","""B02883""",2019-02-13 21:39:15,2019-02-13 21:43:47,2019-02-13 21:44:25,2019-02-13 21:58:50,149,155,3.12,866,7.8,0.0,0.19,0.69,0.0,0.0,6.05,"""Y""","""Y""","""N""","""N"""


In [23]:
df_tripdata.head(5)

hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
str,str,str,datetime[ns],datetime[ns],datetime[ns],datetime[ns],i64,i64,f64,i64,f64,f64,f64,f64,f64,null,f64,f64,str,str,str,str,null
"""HV0003""","""B02867""","""B02867""",2019-02-01 00:01:26,2019-02-01 00:02:55,2019-02-01 00:05:18,2019-02-01 00:14:57,245,251,2.45,579,9.35,0.0,0.23,0.83,0.0,,0.0,7.48,"""Y""","""N""","""N""","""N""",
"""HV0003""","""B02879""","""B02879""",2019-02-01 00:26:08,2019-02-01 00:41:29,2019-02-01 00:41:29,2019-02-01 00:49:39,216,197,1.71,490,7.91,0.0,0.2,0.7,0.0,,2.0,7.93,"""N""","""N""","""N""","""N""",
"""HV0005""","""B02510""",,2019-02-01 00:48:58,,2019-02-01 00:51:34,2019-02-01 01:28:29,261,234,5.01,2159,44.96,0.0,1.12,3.99,0.0,,0.0,35.97,"""N""","""Y""","""N""","""N""",
"""HV0005""","""B02510""",,2019-02-01 00:02:15,,2019-02-01 00:03:51,2019-02-01 00:07:16,87,87,0.34,179,7.19,0.0,0.18,0.64,0.0,,3.0,5.39,"""N""","""Y""","""N""","""N""",
"""HV0005""","""B02510""",,2019-02-01 00:06:17,,2019-02-01 00:09:44,2019-02-01 00:39:56,87,198,6.84,1799,24.25,0.11,0.61,2.16,0.0,,4.0,17.07,"""N""","""Y""","""N""","""N""",


In [29]:
df_tripdata_filtered[:1000].to_pandas().dtypes

hvfhs_license_num               object
dispatching_base_num            object
originating_base_num            object
request_datetime        datetime64[ns]
on_scene_datetime       datetime64[ns]
pickup_datetime         datetime64[ns]
dropoff_datetime        datetime64[ns]
PULocationID                     int64
DOLocationID                     int64
trip_miles                     float64
trip_time                        int64
base_passenger_fare            float64
tolls                          float64
bcf                            float64
sales_tax                      float64
congestion_surcharge           float64
tips                           float64
driver_pay                     float64
shared_request_flag             object
shared_match_flag               object
access_a_ride_flag              object
wav_request_flag                object
dtype: object

In [30]:
df_tripdata_filtered[:100].to_pandas().isnull().sum()

hvfhs_license_num        0
dispatching_base_num     0
originating_base_num    26
request_datetime         6
on_scene_datetime       31
pickup_datetime          0
dropoff_datetime         0
PULocationID             0
DOLocationID             0
trip_miles               0
trip_time                0
base_passenger_fare      0
tolls                    0
bcf                      0
sales_tax                0
congestion_surcharge     5
tips                     0
driver_pay               0
shared_request_flag      0
shared_match_flag        0
access_a_ride_flag       0
wav_request_flag         0
dtype: int64

In [50]:
%%time
write_deltalake(
    # Change this URI to your own unique URI
    "s3://poor-mans-data-lakehouse/nyc_trip_data",
    data=df_tripdata_filtered[100:1000].to_pandas(),
    mode="append",
    overwrite_schema=True,
    storage_options={
        "AWS_REGION": "us-east-1",
        "AWS_ACCESS_KEY_ID": "AKIA37PHV2EK2YRIQ3WV",
        "AWS_SECRET_ACCESS_KEY": "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",
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    },
)

CPU times: user 97.7 ms, sys: 38.4 ms, total: 136 ms
Wall time: 5.01 s


## Reading the table

In [68]:
from deltalake import DeltaTable

dt = DeltaTable(
    "s3://poor-mans-data-lakehouse/nyc_trip_data",
    storage_options=storage_options,
)

In [69]:
dt.schema().to_pyarrow()

hvfhs_license_num: string
dispatching_base_num: string
originating_base_num: string
request_datetime: timestamp[us]
on_scene_datetime: timestamp[us]
pickup_datetime: timestamp[us]
dropoff_datetime: timestamp[us]
PULocationID: int64
DOLocationID: int64
trip_miles: double
trip_time: int64
base_passenger_fare: double
tolls: double
bcf: double
sales_tax: double
congestion_surcharge: double
tips: double
driver_pay: double
shared_request_flag: string
shared_match_flag: string
access_a_ride_flag: string
wav_request_flag: string
created_at: date32[day]

In [11]:
dt.history()

[{'timestamp': 1691857087702,
  'operation': 'CREATE TABLE',
  'operationParameters': {'mode': 'ErrorIfExists',
   'metadata': '{"configuration":{},"created_time":1691857087023,"description":"Kaggles Dataset from NYC FHV (Uber/Lyft) Trip Data Expanded (2019-2022)","format":{"options":{},"provider":"parquet"},"id":"c348b5af-8abd-4d91-b062-383f95506fe0","name":"nyc_trip_data","partition_columns":[],"schema":{"fields":[{"metadata":{},"name":"hvfhs_license_num","nullable":true,"type":"string"},{"metadata":{},"name":"dispatching_base_num","nullable":true,"type":"string"},{"metadata":{},"name":"originating_base_num","nullable":true,"type":"string"},{"metadata":{},"name":"request_datetime","nullable":true,"type":"timestamp"},{"metadata":{},"name":"on_scene_datetime","nullable":true,"type":"timestamp"},{"metadata":{},"name":"pickup_datetime","nullable":true,"type":"timestamp"},{"metadata":{},"name":"dropoff_datetime","nullable":true,"type":"timestamp"},{"metadata":{},"name":"PULocationID","nul

In [13]:
dt.files()

['0-43874cee-2553-4168-8476-1bf486c99c82-0.parquet',
 '1-a7b37d33-9cc3-49f2-be81-6450cdc02e06-0.parquet',
 '2-ddc2a44d-3cf6-4897-91c2-1e0bb9f980c6-0.parquet',
 '3-83d663a7-7836-4c18-99b5-9c0eea092e81-0.parquet',
 '4-1027c4f1-70fd-4a8f-ae49-38f02187ee63-0.parquet',
 '5-a56f6f09-68c1-4188-a6da-cca5b33a3b92-0.parquet',
 '6-a767d561-e789-4672-a24a-ffb24afd4bc0-0.parquet',
 '7-d8cadacb-4c55-42eb-af48-7d603c0b4e3d-0.parquet',
 '8-55fa99de-b527-431b-916a-e4055f8668b4-0.parquet',
 '9-e0966848-f346-42d5-8c4a-e1238e8d0b4a-0.parquet',
 '10-2c8b8e82-56a6-4e4c-b0bf-c6183ac9b25a-0.parquet',
 '11-1525d964-0e34-48f3-bb13-01891556b544-0.parquet',
 '12-f66adbf1-4124-4bb1-8b43-6b9554e50b1d-0.parquet',
 '13-425e7989-46cf-42d5-847d-bf382c6a847b-0.parquet',
 '14-6aba3d25-b259-43bc-b04f-215ab8c736b3-0.parquet',
 '15-a399021c-7d1f-45f7-83c5-d08f5e7cbf87-0.parquet',
 '16-21a5ea08-0d67-4853-b07f-ef5ad8c5cd1e-0.parquet',
 '17-8c5f8d14-e3bf-48aa-8d6d-2ec326f61184-0.parquet',
 '18-ba5e3673-5241-492e-8ac9-9013b23b0

In [16]:
dt.schema().to_pyarrow()

hvfhs_license_num: string
dispatching_base_num: string
originating_base_num: string
request_datetime: timestamp[us]
on_scene_datetime: timestamp[us]
pickup_datetime: timestamp[us]
dropoff_datetime: timestamp[us]
PULocationID: int64
DOLocationID: int64
trip_miles: double
trip_time: int64
base_passenger_fare: double
tolls: double
bcf: double
sales_tax: double
congestion_surcharge: double
tips: double
driver_pay: double
shared_request_flag: string
shared_match_flag: string
access_a_ride_flag: string
wav_request_flag: string

In [15]:
dt.metadata()

Metadata()

## Reading only relevant columns



In [17]:
df_arrow_trip_info = dt.to_pyarrow_table(
    columns=["request_datetime", "trip_time", "trip_miles"]
)

In [None]:
## Reading using Pyarrow Datasets

# Table managing

In [17]:
dt.vacuum()

[]

In [18]:
%%time
dt.optimize.z_order(["pickup_datetime", "dropoff_datetime"])

CPU times: user 91.5 ms, sys: 68.2 ms, total: 160 ms
Wall time: 6.28 s


{'numFilesAdded': 1,
 'numFilesRemoved': 1,
 'filesAdded': {'min': 282110,
  'max': 282110,
  'avg': 282110.0,
  'totalFiles': 1,
  'totalSize': 282110},
 'filesRemoved': {'min': 356932,
  'max': 356932,
  'avg': 356932.0,
  'totalFiles': 1,
  'totalSize': 356932},
 'partitionsOptimized': 0,
 'numBatches': 1,
 'totalConsideredFiles': 1,
 'totalFilesSkipped': 0,
 'preserveInsertionOrder': True}

# AWS Wrangler

In [1]:
import awswrangler as wr

In [None]:
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")

## Glue Crawler

In [20]:
import boto3

# Create a Boto3 session
session = boto3.Session()

# Create a Glue client
glue = session.client("glue")

In [21]:
# Create a crawler definition
crawler_definition = {
    "Name": "MyCrawler",
    "Description": "A crawler to find Delta tables in S3",
    "DatabaseName": "MyDatabase",
    "Targets": [
        {
            "Path": "s3://poor-mans-data-lakehouse",
            "IncludePattern": "*.delta",
        }
    ],
}

In [22]:
# Create the crawler
crawler_response = glue.create_crawler(
    CrawlerName=crawler_definition["Name"], CrawlerDefinition=crawler_definition
)

ParamValidationError: Parameter validation failed:
Missing required parameter in input: "Name"
Missing required parameter in input: "Role"
Missing required parameter in input: "Targets"
Unknown parameter in input: "CrawlerName", must be one of: Name, Role, DatabaseName, Description, Targets, Schedule, Classifiers, TablePrefix, SchemaChangePolicy, RecrawlPolicy, LineageConfiguration, LakeFormationConfiguration, Configuration, CrawlerSecurityConfiguration, Tags
Unknown parameter in input: "CrawlerDefinition", must be one of: Name, Role, DatabaseName, Description, Targets, Schedule, Classifiers, TablePrefix, SchemaChangePolicy, RecrawlPolicy, LineageConfiguration, LakeFormationConfiguration, Configuration, CrawlerSecurityConfiguration, Tags

In [None]:
# Print the crawler ID
print(crawler_response["CrawlerId"])

# Pandas df to delta table

In [24]:
from deltalake import DeltaTable

In [28]:
from deltalake.writer import write_deltalake

In [27]:
df_combined_data.head()

Unnamed: 0,movie_id,score,review_date
0,1488844,3.0,2005-09-06
1,822109,5.0,2005-05-13
2,885013,4.0,2005-10-19
3,30878,4.0,2005-12-26
4,823519,3.0,2004-05-03


In [35]:
ls

2023-07-15_gsg_Poor_mans_datalakehouse_EDA.ipynb  [0m[01;34mdelta_tables[0m/


In [32]:
!mkdir delta_tables

In [36]:
write_deltalake("./delta_tables/test", df_combined_data.head())

In [4]:
import awswrangler as wr

In [45]:
wr.s3.to_parquet(
    df=df_combined_data.head(10),
    path="s3://poor-mans-data-lakehouse/test_table.parquet",
    index=False,
    boto3_session=session,
)

{'paths': ['s3://poor-mans-data-lakehouse/test_table.parquet'],
 'partitions_values': {}}

In [1]:
import pandas as pd
from deltalake.writer import write_deltalake

df = pd.DataFrame({"x": [1, 2, 3]})

In [2]:
df

Unnamed: 0,x
0,1
1,2
2,3


In [None]:
storage_options = {
    "AWS_DEFAULT_REGION": "us-east-1",
    # "AWS_ACCESS_KEY_ID": "xxx",
    # "AWS_SECRET_ACCESS_KEY": "xxx",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "false",
}

write_deltalake(
    "s3a://poor-mans-data-lakehouse/delta_table",
    df,
    mode="append",
    storage_options=storage_options,
)