# Scaling Outside the Warehouse with DuckDB + Python
* PyData Global 2024 demo
* DuckDB blog (great resources): https://duckdb.org/news/
* coiled.io: https://docs.coiled.io/user_guide/index.html
* GitHub link: **https://github.com/AdarshNamala/PyDataGlobal_2024**
* slides: https://docs.google.com/presentation/d/1q-i1sU_WaL-Fzm6dYwwS5e57DUOwV6YTpTvTvn77_hM/edit#slide=id.p1

In [None]:
import numpy as np
import pandas as pd
import duckdb 
import os
from pathlib import Path
import seaborn as sns
import matplotlib.pyplot as plt
import psutil
import textwrap
from datetime import datetime,UTC
import sys

import boto3
import json
from functools import partial
import re
import s3fs
from time import sleep

import coiled

In [None]:
print("version")
# (label, value) pairs printed one per line
for _label, _value in (
    ("duckdb: ", duckdb.__version__),
    ("pandas: ", pd.__version__),
    ("python: ", sys.version),
):
    print(_label, _value)

In [None]:
def memory_used():
    """Print the unique set size (USS) of the current process in MBs."""
    uss_bytes = psutil.Process().memory_full_info().uss
    _mbs = uss_bytes / (1024 ** 2)
    print(f'Memory used: {_mbs:,.0f} MBs')

    
def get_months(start: str, end: str) -> list[str]:
    """
    Return the months between ``start`` and ``end`` (inclusive) as a list.

    params:
        start (str): first month, e.g. "2024-01"
        end (str): last month, e.g. "2024-09"
    returns:
        list[str]: one "YYYY-MM" string per month start in the range;
        empty when ``end`` is before ``start``.
    """
    # freq='MS' (month start) yields exactly one timestamp per month;
    # .tolist() returns the annotated list instead of a pandas Index
    return pd.date_range(start, end, freq='MS').strftime('%Y-%m').tolist()

def get_s3_files_info(files, fs=None):
    """
    Get file metadata for one or more S3 objects as a DataFrame.

    params:
        files (str | list[str]): a single S3 path, or an iterable of paths
        fs: object with an ``info(path) -> dict`` method (e.g. an s3fs
            filesystem). Defaults to the module-level ``_s3fs`` created in
            the AWS Lambda section, which must exist before calling this
            without ``fs``.
    returns:
        pd.DataFrame: one row per file, columns are the keys returned by ``info``
    """
    fs = _s3fs if fs is None else fs
    # wrap a single path; iterating a bare string would yield characters
    _files = [files] if isinstance(files, str) else list(files)
    return pd.DataFrame([fs.info(_file) for _file in _files])

# other functions
%run utility.ipynb

In [None]:
memory_used()

## DuckDB 
* in-memory and OLAP
* Natively reads PyArrow, pandas, R, Polars, etc.
* DuckDB WASM!
* Can read
    * Parquet, CSV, JSON, Iceberg, Delta (experimental), etc.
    * from: local disk, HTTPS, AWS, GCP, Azure, etc.
* Connection: 
    * There are various ways to create the connection object
        * in-memory only (RAM only). Does not support out of core operations 
        * in-memory with temporary storage:  allows out of core operation. When the connection is closed the temp storage is not persisted
        * persisted database:  in-memory + out of core operation + persistent db that can be re-used. 

* YouTube videos on DuckDB:
    * https://www.youtube.com/@duckdb/videos (DuckDB official channel)
    * https://www.youtube.com/@motherduckdb/playlists (MotherDuck channel)
    * https://www.youtube.com/watch?v=fZj6kTwXN1U&list=PLw2SS5iImhEThtiGNPiNenOr2tVvLj6H7 (Learn Data with Mark)


    

In [None]:
# persistent DuckDB database file; expand "~" in Python rather than relying
# on DuckDB's own path handling
_persistent_db = str(Path("~/Desktop/PyDataGlobal2024/storage.ddb").expanduser())
# make sure the folder exists so DuckDB can create the database file in it
Path(_persistent_db).parent.mkdir(parents=True, exist_ok=True)
con = duckdb.connect(_persistent_db)

# other options to create connection:
# duckdb.connect(":memory:")

In [None]:
# With "~/.aws/credentials" set up, the CREDENTIAL_CHAIN provider lets DuckDB
# pick up the AWS keys for S3 access (no placeholders, so a plain string suffices)
_qry = """
INSTALL AWS; LOAD AWS;
CREATE or replace SECRET secret2 (TYPE S3, PROVIDER CREDENTIAL_CHAIN);
"""
con.sql(_qry)

In [None]:
def show_db_size():
    """Display the result of DuckDB's ``pragma_database_size()`` for the global ``con``."""
    size_df = con.sql("call pragma_database_size()").df()
    display(size_df)


## Load the Data
* We will use the popular NYC Taxi Data: 
* read parquet files via https

In [None]:
def get_nytaxi_data_path(month:str|list) -> str:
    """ 
    Funciton to get the month path for NYC taxi
    https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
    params
        month(str): yyyy-mm format
    """
    # You can get the base path by copying the link address from the website    
    _base_path = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata"

    return f"{_base_path}_{month}.parquet"

# get 2024 months. Get Jan - Sep 2024 files
months = get_months("2024-01","2024-09")
nytaxi_files = [get_nytaxi_data_path(month) for month in months]
nytaxi_files[:3]


### Modular SQL pipeline with Python

**Read parquet**

In [None]:
# define a fucntion that you can reuse
def ddb_read_parquet(files:str|list[str],
                     columns:list[str]=[]) -> str:
    """ 
    Function to read parquet file into duckdb
    params:
        * files(str | list): file path of list of file_path
        * cus
    """
    _cols = ',\n'.join(columns) or "*"
    _files = files if isinstance(files,list) else [files]
    # placeholder to add more params
    _read_params = "filename=true"
    
    _qry = f""" 
    select 
        {_cols}
    from read_parquet({_files},{_read_params})
    """
    
    return _qry

In [None]:
_read_qry = ddb_read_parquet(nytaxi_files[:2])
print(_read_qry)

In [None]:
# execute the query
_read_qry = ddb_read_parquet(nytaxi_files[:1])
con.sql(_read_qry).limit(10)

**Create table**

In [None]:
if 1==0:
    _qry = ddb_read_parquet(files=nytaxi_files)
    _cr_qry = f""" 
    create or replace table ny_taxi as {_qry}
    """
    con.sql(_cr_qry)

In [None]:
con.sql("show tables")

In [None]:
show_db_size()

In [None]:
con.sql('select * from ny_taxi').shape

**Aggregate: daily, monthly, etc.**

In [None]:
def get_avg_fare_qry(input_tb: str, index_col:str) -> str:
    """
    Build an aggregate query over ``input_tb`` grouped by ``index_col``.

    params:
        input_tb (str): table / CTE name to read from
        index_col (str): grouping expression, optionally aliased
            (e.g. "tpep_pickup_datetime::date as pickup_date")
    returns:
        str: query yielding tot_amt, tot_dist, avg_fare (tot_amt/tot_dist,
        rounded to 2 decimals) and counts per group
    """
    # avg_fare reuses the tot_amt / tot_dist aliases defined in the same select list
    return f""" 
        select 
            {index_col},
            total_amount.sum().round()::int as tot_amt,
            sum(trip_distance).round()::int as tot_dist,
            (tot_amt/tot_dist).round(2) as avg_fare,
            count(*) as counts
        from {input_tb}
        group by all
    """


def get_daily_avg_fare_qry(input_tb: str) -> str:
    """Return the avg-fare aggregate query for ``input_tb``, grouped by pickup date."""
    return get_avg_fare_qry(
        input_tb=input_tb,
        index_col="tpep_pickup_datetime::date as pickup_date",
    )


def get_monthly_avg_fare_qry(input_tb: str) -> str:
    """Return the avg-fare aggregate query for ``input_tb``, grouped by pickup month (YYYY-MM)."""
    return get_avg_fare_qry(
        input_tb=input_tb,
        index_col="tpep_pickup_datetime.strftime('%Y-%m') as pickup_month",
    )

In [None]:
_read_qry = ddb_read_parquet(nytaxi_files)
_agg_qry = get_daily_avg_fare_qry(input_tb='base')

if 1==0:_read_qry = "select * from ny_taxi" # incase the https returns 403 

daily_agg = f""" 
with base AS (
    {_read_qry}
)
{_agg_qry}
"""
print(daily_agg)

In [None]:
con.sql(daily_agg).limit(10)

In [None]:
_agg_qry = get_monthly_avg_fare_qry(input_tb='base')

monthly_agg = f""" 
with base AS (
    {_read_qry}
),
agg_tb AS (
    {_agg_qry}
)
select 
    *
from agg_tb
where pickup_month between '2024-01' and '2024-09'
order by pickup_month
"""
print(monthly_agg)

In [None]:
con.sql(monthly_agg)

## Profiling Statements
* Describe: gets the schema of the output query
* Explain: Query plan without executing it
* Explain Analyze: Query plan with execution time.

* reference: https://duckdb.org/docs/sql/statements/profiling.html 

In [None]:
# check the schema of the output query
print("Check the schema using the DESCRIBE statement")
_desc_qry = f"DESCRIBE {_read_qry}"
con.sql(_desc_qry)

In [None]:
# # helper function.
# ddb_describe = lambda _qry:con.sql(f"DESCRIBE {_qry}")
# ddb_sql = lambda _qry:con.sql(f"{_qry}")
# ddb_get_reltb = lambda _tbname: con.sql(f"from {_tbname}") 
# ddb_explain = lambda _qry:print(con.sql(f"EXPLAIN {_qry}").fetchall()[0][1])

## Friendlier SQL
* Great blog posts by Alex Monahan
    * https://duckdb.org/2022/05/04/friendlier-sql.html
    * https://duckdb.org/2023/08/23/even-friendlier-sql.html
* highlights
    * select * Exclude 
    * select * Replace 
    * COLUMNS 


## Natively reads Pandas DF
* No more serialization, de-serialization and socket data transfer! 
* DuckDB can read a pandas DataFrame without serialization and de-serialization, so we can run SQL on a pandas DataFrame without significant overhead!
* same for arrow and polars
* certain joins, such as range joins (`value BETWEEN a AND b`), are more efficient in SQL

In [None]:
# load iris dataset from seaborn
iris_df = sns.load_dataset('iris')
iris_df.head()

In [None]:
type(iris_df)

In [None]:
# for each species get the max value of each column
# we use the COLUMNS(* EXCLUDE ..) expression to select all columns except species;
# MAX(...) is then applied to each of them (the commented-out line is the long form)
# iris_df is a pandas DataFrame, referenced in SQL directly by its Python variable name
_qry = """ 
select
    species,
    MAX(COLUMNS(* EXCLUDE species))
    -- max(sepal_length), max(sepal_width), max(petal_length), max(petal_width)
from iris_df
group by all
order by species
"""
con.sql(_qry)

## Python API
* lazy execution
* can chain multiple operations
* Reference:
    * https://duckdb.org/docs/api/python/overview
    * Relational API: https://duckdb.org/docs/api/python/relational_api
    * Functional API: https://duckdb.org/docs/api/python/function

In [None]:
# create a DuckDB relation for the ny_taxi table (lazy: rows are fetched only when requested)
ny_taxi_tb = con.sql("select * from ny_taxi")

# DuckDB also supports FROM-first syntax (`from tbl`) in addition to the traditional `select cols from tbl`
if 1==0: ny_taxi_tb = con.sql("from ny_taxi")

In [None]:
type(ny_taxi_tb)

In [None]:
# a DuckDB relation can be converted to arrow/pandas/polars/etc.
# reference: https://duckdb.org/docs/api/python/overview
# to pandas (limit first so only 10 rows are materialized)
display(ny_taxi_tb.limit(10).df())

# to arrow
if 1==0: ny_taxi_tb.limit(10).arrow()

# to polars
if 1==0: ny_taxi_tb.limit(10).pl()

In [None]:
# relations support chaining operations
(
    ny_taxi_tb
    # variable.func1().func2()... as opposed to func2(func1(variable))
    .aggregate("tpep_pickup_datetime.min(),tpep_pickup_datetime.max()")
)

In [None]:
_st_time = datetime.now()
_=plt.figure(figsize=(10,7))
_=\
(
    ny_taxi_tb
    # half-open range: `between ... and '2024-9-30'` on a timestamp stops at
    # 2024-09-30 00:00:00 and drops nearly every trip on Sep 30
    .filter("tpep_pickup_datetime >= '2024-01-01' and tpep_pickup_datetime < '2024-10-01'")
    # daily avg_fare; avg_fare reuses the tot_amt / tot_dist aliases
    .aggregate("""
               tpep_pickup_datetime::date as pickup_date,
               total_amount.sum().round()::int as tot_amt,
               sum(trip_distance).round()::int as tot_dist,
               (tot_amt/tot_dist).round(2) as avg_fare,
               count(1) as counts
               """
            )
    .order('pickup_date')
    .df()
    .pipe(sns.lineplot,x='pickup_date',y='avg_fare')
)

_=plt.title("Avg Fare by Pickup Date")
_=get_elapsed_time(_st_time)

In [None]:
# Not the best way to measure max memory used by duckdb. 
# to highlight the memory usage between duckdb and pandas 
memory_used()

In [None]:
# convert output to Pandas or Arrow
# ~30M rows. ~4GB in Pandas
ny_taxi_df = ny_taxi_tb.df()

In [None]:
ny_taxi_df.info()

In [None]:
memory_used()

In [None]:
# using pandas
_st_time = datetime.now()
_=plt.figure(figsize=(10,7))
_=\
(
    ny_taxi_df
    # half-open range: `<= '2024-9-30'` compares against 2024-09-30 00:00:00
    # and would drop nearly every trip on Sep 30
    .query("tpep_pickup_datetime >= '2024-01-01' and tpep_pickup_datetime < '2024-10-01'")
    .assign(pickup_date=lambda x:x['tpep_pickup_datetime'].dt.date)
    .groupby('pickup_date')
    .agg(
        tot_amt=('total_amount','sum'),
        tot_dist = ('trip_distance','sum'),
    )
    .assign(avg_fare=lambda df:df['tot_amt']/df['tot_dist'])
    .sort_index()
    .reset_index()
    .pipe(sns.lineplot,x='pickup_date',y='avg_fare')
)
_=plt.title("Avg Fare by Pickup Date")
_=get_elapsed_time(_st_time)

In [None]:
memory_used()

## Get the best of Python + SQL
* Better code organization using function and classes, for loops, etc.,
* wider Python ecosystem for plotting, ML, etc.
* Awesome for using db storage for saving all tables related to ML projects - raw data, features, etc., Makes it easy to update and share. 

## Scaling outside the Warehouse using AWS Lambda
* Lambda function with required tagging. 
* invoke lambda function -> request_id
* get CloudWatch Logs
* track status of each request_id : started, success/error

In [None]:
lambda_client = boto3.client('lambda')
log_client = boto3.client('logs')

# create s3fs object
# pass the session token as well: temporary credentials (SSO / assumed role)
# are rejected without it; it is None for long-lived access keys
creds = boto3.Session().get_credentials().get_frozen_credentials()
_s3fs = s3fs.S3FileSystem(
    key=creds.access_key,
    secret=creds.secret_key,
    token=creds.token,
    skip_instance_cache=True,
)

def get_s3_file_info(files: list):
    """ Function to get file info of s3 files"""
    _files = files if isinstance(files,list) else [files]
    return pd.DataFrame([_s3fs.info(file) for file in _files])

In [None]:
def get_aggregate_qry(input_path: str|list,output_path: str=None) -> str:
    """ 
    Function to get daily aggregate
    input: https path
    """
    # if input path is str (signle file) convert to list
    input_path = input_path if isinstance(input_path,list) else [input_path]
    
    _agg_qry = f""" 
        select 
            tpep_pickup_datetime::date as pickup_date,
            total_amount.sum().round()::int as tot_amt,
            sum(trip_distance).round()::int as tot_dist,
            (tot_amt/tot_dist).round(2) as avg_fare,
            count(*) as counts
        from read_parquet({input_path})
        group by all
    """
    
    if output_path:
        _final_qry = f""" 
        COPY (
            {_agg_qry}
        ) 
        TO '{output_path}' (FORMAT PARQUET,OVERWRITE true)
        """
    else:
        _final_qry = _agg_qry
    
    return _final_qry
    

In [None]:
def generate_nytaxi_tasks(months:list,s3_output_base) -> dict:
    """
    Build one aggregation task per month.

    params:
        months (list): months in yyyy-mm format
        s3_output_base (str): output prefix; trailing "/" characters are stripped
    returns:
        dict: month -> {'qry': COPY query that aggregates that month's NYC
        taxi parquet and writes it to output_path, 'month': month,
        'output_path': "<base>/agg_<month>.parquet"}
    """
    print(f"Generating {len(months)} tasks")
    tasks = {}
    for month in months:
        # rstrip avoids a double "/" when the base already ends with one
        output_path = f"{s3_output_base.rstrip('/')}/agg_{month}.parquet"
        tasks[month] = {
            'qry': get_aggregate_qry(
                input_path=get_nytaxi_data_path(month),
                output_path=output_path,
            ),
            'month': month,
            'output_path': output_path,
        }
    return tasks

In [None]:
# months = get_months("2020-01","2024-09")
months = get_months('2024-01','2024-09')
output_base = "s3://adarshnamala/pydata_demo/ny_taxi/lambda/"
tasks = generate_nytaxi_tasks(months,s3_output_base=output_base)

In [None]:
tasks.keys()

In [None]:
tasks[months[0]]

In [None]:
print(tasks[months[0]]['qry'])

In [None]:
print(datetime.now(UTC))

In [None]:
lambda_tasks = DuckdbLambda_Tasks(
    tasks=tasks,
    lambda_client=lambda_client,
    log_client=log_client
)


In [None]:
# invoke the tasks
lambda_tasks.event_invoke_tasks(qry_key='qry')

In [None]:
lambda_tasks.wait_until_tasks_complete(total_wait_time_secs=60,interval_check_time_sec=3)

In [None]:
# get the last modified date of the output files
_files = [task['output_path'] for task in tasks.values()]
(
    get_s3_files_info(_files)
    ['LastModified'].agg(['min','max','size'])
)

**Read daily_agg parquet files from S3**

In [None]:
_outfiles = [_['output_path'] for _ in tasks.values()]
_read_qry = ddb_read_parquet(_outfiles)

con.sql(_read_qry).limit(20)

**Run for 60 months**

In [None]:
# months = get_months("2020-01","2024-09")
months = get_months('2019-10','2024-09')
output_base = "s3://adarshnamala/pydata_demo/ny_taxi/lambda/"
tasks = generate_nytaxi_tasks(months,s3_output_base=output_base)

lambda_tasks = DuckdbLambda_Tasks(tasks=tasks,
                      lambda_client=lambda_client,
                      log_client=log_client)

In [None]:
print(datetime.now(UTC))

In [None]:
lambda_tasks.event_invoke_tasks(qry_key='qry')

In [None]:
lambda_tasks.wait_until_tasks_complete(total_wait_time_secs=60,interval_check_time_sec=3)

In [None]:
# get the last modified date of the output files
_files = [task['output_path'] for task in tasks.values()]
(
    get_s3_files_info(_files)
    ['LastModified'].agg(['min','max','size'])
)

## Scaling using Coiled.io
* challenges in managing cluster:
    * start/stop
    * install packages
    * copy code
    
* notebooks: https://docs.coiled.io/user_guide/notebooks.html
* serverless: https://docs.coiled.io/user_guide/functions.html
* price: https://www.coiled.io/pricing
* build vs buy: https://www.coiled.io/build-vs-buy

In [None]:
coiled.__version__

In [None]:
# baseline settings for coiled Function params
ip_address = '170.85.72.183'  # passed as allow_ingress_from below
tags = {'Application ID': 'RSH'}
coiled_function_params = {
    'region': 'us-east-1',
    'arm': True,
    'idle_timeout': '30 seconds',
    'spot_policy': 'spot',
    'memory': '8 GB',
    'threads_per_worker': 1,
    'name': 'pydata_demo',
    'allow_ingress_from': ip_address,
    'tags': tags,
}


def setup_ddb_con():
    """
    Open a new in-memory DuckDB connection with the AWS extension loaded and
    an S3 secret created from the AWS credential chain.

    returns:
        the configured DuckDB connection (closing it is left to the caller)
    """
    _aws_creds = """ 
    INSTALL AWS; LOAD AWS;
    CREATE SECRET secret2 (TYPE S3, PROVIDER CREDENTIAL_CHAIN);
    """
    ddb_con = duckdb.connect(":memory:")
    ddb_con.sql(_aws_creds)
    return ddb_con
    

@coiled.function(**coiled_function_params)
def run_coiled_tasks(input: dict):
    """
    Run one task's DuckDB query on a Coiled serverless worker.

    params:
        input (dict): task with at least 'qry' (SQL to execute) and 'month'
            (label used in log messages), as built by generate_nytaxi_tasks
    returns:
        bool: True if the query ran without raising, False otherwise. Errors
        are printed instead of raised so that one failing task does not abort
        the whole `.map(...)` run.
    """
    _st = datetime.now()
    qry = input['qry']
    name = input['month']
    print(f"Running for task:{name}")
    con = None
    try:
        # initiate duckdb connection
        con = setup_ddb_con()
        _ = con.sql(qry)
        # total_seconds() keeps the fractional part; .seconds is a truncated int
        print(f"completed Task: {name} in {(datetime.now() - _st).total_seconds():.2f} seconds")
        val = True
    except Exception as e:
        print(f"Error in Task: {name}")
        print(e)
        val = False
    finally:
        # release the DuckDB connection even when the query fails
        if con is not None:
            con.close()

    return val
    

In [None]:
months = get_months('2024-01','2024-09')
output_base = "s3://adarshnamala/pydata_demo/ny_taxi/coiled/"
tasks = generate_nytaxi_tasks(months,s3_output_base=output_base)

In [None]:
tasks[months[0]]

In [None]:
if 1==1:
    run_coiled_tasks.cluster.adapt(minimum=len(tasks),maximum=len(tasks))
    results = run_coiled_tasks.map(list(tasks.values()))

In [None]:
task_status = list(results)
print(f"Completed {sum(task_status)}/{len(task_status)} tasks")


## Coiled with Jupyter notebooks

In [None]:
import pandas as pd
import numpy as np

In [None]:
# np.random.rand(10)