# DuckDB: Moc hurtowni danych w Twoim laptopie (PyData 2024)

### Connecting to DuckDB:

In [None]:
# Load the autotime extension so the execution time is shown under each cell.
%load_ext autotime

In [None]:
import duckdb

# In-memory connection.
duck = duckdb.connect()

# Persistent connection - the database file will be created.
# NOTE: this rebinds `duck`, so later uses of `duck` refer to the demo.db
# connection, not to the in-memory one opened above.
duck = duckdb.connect('demo.db')

### CSV reading performance test: pandas, Polars, Spark, DuckDB


In [None]:
import pandas as pd
# NOTE(review): polars is usually aliased as `pl`; the `rs` alias is kept here
# because the polars cell in this notebook calls `rs.read_csv`.
import polars as rs
from pyspark.sql import SparkSession
# Get the active Spark session or create one for the Spark CSV-reading test.
spark = SparkSession.builder.getOrCreate()

In [None]:
# pandas: read the CSV; header=None tells pandas the file has no header row.
pandas_df = pd.read_csv("data/transactions.csv", header=None)
# Last expression of the cell, so the notebook displays the DataFrame.
pandas_df

In [None]:
# polars (imported as `rs`): read the same CSV, again without a header row.
polars_df = rs.read_csv("data/transactions.csv", has_header=False)
# Last expression of the cell, so the notebook displays the DataFrame.
polars_df

In [None]:
# Spark: read the same CSV without a header row, asking Spark to infer the
# column types (inferSchema=True).
spark_df = spark.read.csv("data/transactions.csv", inferSchema=True, header=False)
# Print the first rows of the Spark DataFrame.
spark_df.show()

In [None]:
# DuckDB: query the CSV file directly. No options are passed, so the CSV
# dialect and column types are left to read_csv_auto.
duck.sql("""--sql

    SELECT * FROM read_csv_auto('data/transactions.csv')         

""")

In [None]:
# DuckDB: read the same CSV with explicit options: comma delimiter, no header
# row, and a declared name and type for each of the ten columns.
duck.sql("""--sql

    SELECT * FROM read_csv('data/transactions.csv',
        delim = ',',
        header = false,
        columns = {
            "product_id": 'INT',
            "created_at": "DATE",
            "has_discount": 'BOOL',
            "location": "VARCHAR",
            "brand": "VARCHAR",
            "seller": "VARCHAR",
            "amount": "DECIMAL",
            "shipping_costs": "DECIMAL",
            "wharehouse_location": "VARCHAR",
            "is_shipped": "BOOL"
        }
    )         

""")

### Create views and tables

In [None]:
# Create (or replace) the view `transactions_view` over the CSV file, using the
# explicit column names and types listed below.
# Queries against the view will still load the data directly from the file.

duck.sql("""--sql

    CREATE OR REPLACE VIEW transactions_view AS (
        SELECT * FROM read_csv('data/transactions.csv',
            delim = ',',
            header = false,
            columns = {
                "product_id": 'INT',
                "created_at": "DATE",
                "has_discount": 'BOOL',
                "location": "VARCHAR",
                "brand": "VARCHAR",
                "seller": "VARCHAR",
                "amount": "DECIMAL",
                "shipping_costs": "DECIMAL",
                "wharehouse_location": "VARCHAR",
                "is_shipped": "BOOL"
            }
        )
    )       

""")

In [None]:
# Simple query: count the rows exposed by transactions_view.
duck.sql("""--sql

    SELECT COUNT(*) FROM transactions_view

""")

In [None]:
# More complex query: for discounted transactions only, compute the total
# amount and the average shipping costs per location.
# Note: the data was generated randomly.
duck.sql("""--sql

    SELECT
        location, 
        SUM("amount") as amount,
        AVG("shipping_costs") as avg_shipping_costs,
    FROM transactions_view
    WHERE 
        has_discount = true
    GROUP BY
        location

""")

### Loading data into DuckDB

In [None]:
# Load the data into DuckDB persistent storage: materialize everything that
# transactions_view returns as the table `transactions`.
# This converts the CSV data into DuckDB's column-oriented format.
duck.sql("""--sql
   
   CREATE OR REPLACE TABLE transactions AS (
       SELECT * FROM transactions_view
   )
           
""")

In [None]:
# Same aggregation as on the view, now run against the `transactions` table
# stored in DuckDB's column-oriented format.
# Compression is about 6x: 1.2 GB -> 200 MB.
duck.sql("""--sql
   
    SELECT
        location, 
        SUM("amount") as amount,
        AVG("shipping_costs") as avg_shipping_costs,
    FROM transactions
    WHERE 
        has_discount = true
    GROUP BY
        location
           
""")

### Reading from parquet files

In [None]:
# Count the rows across all parquet files matching the glob (over 200M rows).
duck.sql("""--sql
   
    SELECT COUNT(*) FROM read_parquet("data/nyc_taxi/*.parquet")
           
""")

In [None]:
# Preview five rows of four selected columns from the parquet files.
# max_width is set very high, presumably so that show() does not cut off any
# columns - confirm against the DuckDB relation API.
duck.sql("""--sql
   
    SELECT
        hvfhs_license_num,
        pickup_datetime,
        trip_miles,
        trip_time
    FROM read_parquet("data/nyc_taxi/*.parquet")
    LIMIT 5
           
""").show(max_width=100000)

In [None]:
# Monthly report over all parquet files: total trip miles, average trip miles
# and average trip time, ordered by month.
# NOTE(review): trip_time is divided by 60 and labelled as minutes, which
# assumes the column is stored in seconds - TODO confirm.
duck.sql("""--sql
   
        SELECT 
            date_trunc('month', pickup_datetime) as year_month,
            ROUND(SUM(trip_miles)) as total_trip_miles, 
            ROUND(AVG(trip_miles), 2) avg_trip_miles,
            ROUND(AVG(trip_time)/60, 2) avg_trip_time_minutes
        
        FROM read_parquet("data/nyc_taxi/*.parquet")
        GROUP BY
            date_trunc('month', pickup_datetime)
        ORDER BY 
            date_trunc('month', pickup_datetime)
           
""")

# AWS Athena - 0.07 USD

### Export data to other formats

In [None]:
# Create a view - just for convenience - holding a per-carrier report:
# total trip miles, average trip miles and average trip time.
# The CASE maps license number HV0003 to 'Uber' and HV0005 to 'Lyft'; it has
# no ELSE branch, so any other license number gets a NULL carrier.
duck.sql("""--sql
   
    CREATE OR REPLACE VIEW nyc_taxi_report as (
   
        SELECT
            CASE 
                WHEN hvfhs_license_num='HV0003' THEN 'Uber'
                WHEN hvfhs_license_num='HV0005' THEN 'Lyft'
            END as carrier,
            ROUND(SUM(trip_miles)) as total_trip_miles, 
            ROUND(AVG(trip_miles), 2) avg_trip_miles,
            ROUND(AVG(trip_time)/60, 2) avg_trip_time_minutes
        
        FROM read_parquet("data/nyc_taxi/*.parquet")
        GROUP BY
            hvfhs_license_num
    
    )
           
""")

In [None]:
# Save the nyc_taxi_report view to parquet, JSON and CSV files.
# Only the parquet statement names its FORMAT explicitly.
# NOTE(review): the JSON and CSV statements presumably rely on DuckDB inferring
# the format from the file extension - confirm for the DuckDB version in use.
duck.sql("""--sql
   
    COPY nyc_taxi_report TO 'nyc_taxi_report.parquet' (FORMAT PARQUET);
    COPY nyc_taxi_report TO 'nyc_taxi_report.json';
    COPY nyc_taxi_report TO 'nyc_taxi_report.csv' (HEADER, DELIMITER ',');
           
""")

In [None]:
# Save the report to Excel using the spatial extension: the extension is
# installed and loaded first, then COPY writes the view through FORMAT GDAL
# with the xlsx driver.
duck.sql("""--sql
         
    -- install and load extentions
    INSTALL spatial;
    LOAD spatial;
   
    -- save as xlsx
    COPY nyc_taxi_report TO 'nyc_taxi_report.xlsx' WITH (FORMAT GDAL, DRIVER 'xlsx');
           
""")

### Integration with pandas

In [None]:
# Sample data: one list per column, all of the same length.
data = dict(
    Name=['Alice', 'Bob', 'Charlie', 'Diana'],
    Age=[25, 30, 35, 22],
    City=['New York', 'Los Angeles', 'Chicago', 'Houston'],
)

# Build the DataFrame from the column mapping; as the last expression of the
# cell it is displayed by the notebook.
my_df = pd.DataFrame(data=data)
my_df

In [None]:
# Query a pandas DataFrame from SQL: the FROM clause refers to the Python
# variable `my_df` by name.
duck.sql("""--sql
         
    SELECT * FROM my_df
           
""")

In [None]:
# Count trips per pickup day and carrier, pivot the result so each carrier
# becomes its own column, order by pickup date, and convert the result to a
# DataFrame with .df().
# The CASE has no ELSE branch, so license numbers other than HV0003 / HV0005
# get a NULL carrier.
trips_df = duck.sql("""--sql
  
  -- How many trips per day for each carrier.
  WITH trips__day_carrier as (
        SELECT
            date_trunc('day', Pickup_datetime) as pickup_date,
            CASE 
                WHEN hvfhs_license_num='HV0003' THEN 'Uber'
                WHEN hvfhs_license_num='HV0005' THEN 'Lyft'
            END as carrier,
            COUNT(*) as trips_count
        
        FROM read_parquet("data/nyc_taxi/*.parquet")
        GROUP BY
            date_trunc('day', Pickup_datetime), hvfhs_license_num
        ORDER BY
            date_trunc('day', Pickup_datetime), hvfhs_license_num
  ),
  pivoted as (
      PIVOT trips__day_carrier ON carrier USING SUM(trips_count)
  )
  SELECT * FROM pivoted
  ORDER BY pickup_date
           
""").df()

# Last expression of the cell, so the notebook displays the DataFrame.
trips_df

In [None]:
import plotly.express as px
# Line chart of the daily trip counts: pickup_date on the x axis and one line
# for each of the 'Uber' and 'Lyft' columns of trips_df.
px.line(trips_df, x='pickup_date', y=['Uber', 'Lyft'])

### Spark API

Drop-in PySpark replacement (experimental)

In [None]:
from duckdb.experimental.spark.sql import SparkSession as session
from duckdb.experimental.spark.sql.functions import lit, col

# The equivalent PySpark imports, kept for reference:
# from pyspark.sql import SparkSession as session
# from pyspark.sql.functions import lit, col

import pandas as pd

# Session object from whichever `session` class is imported above.
spark = session.builder.getOrCreate()

# Small pandas DataFrame used as the input of the Spark-style pipeline.
pandas_df = pd.DataFrame(
    data={
        'name': ['Joan', 'Peter', 'John', 'Bob'],
        'age': [34, 45, 23, 56],
    }
)

# One chained pipeline: wrap the pandas data, add `year` as 2024 minus age,
# then select the columns plus a constant `location` column.
df = (
    spark.createDataFrame(pandas_df)
    .withColumn('year', lit(2024) - col('age'))
    .select(
        col('name'),
        col('age'),
        col('year'),
        lit('New York').alias('location'),
    )
)

df.show()

### Connecting to S3

In [None]:
import os

# Install and load the httpfs extension before reading from an s3:// path.
duck.execute("INSTALL httpfs;")
duck.execute("LOAD httpfs;")

# Configure S3 access from environment variables; os.environ[...] raises
# KeyError if a variable is not set.
# NOTE(review): the values are interpolated into the SQL text unescaped, so a
# value containing a single quote would break the statement.
duck.execute(f"SET s3_region='{os.environ['AWS_REGION']}';")
duck.execute(f"SET s3_access_key_id='{os.environ['AWS_ACCESS_KEY_ID']}';")
duck.execute(f"SET s3_secret_access_key='{os.environ['AWS_SECRET_ACCESS_KEY']}';")


# Read a parquet file directly from the S3 bucket and print the result.
duck.sql("""--sql
         
    SELECT * FROM read_parquet("s3://skilzzz/sources/devitjobsuk/offers/parquet/devitjobsuk__offers.parquet")

""").show(max_width=10000)