In [1]:
import duckdb
import os

def env(name, default=None):
    v = os.environ.get(name)
    return v if v not in (None, "") else default

def install_extensions(conn: duckdb.DuckDBPyConnection) -> None:
    conn.execute("""
        INSTALL ducklake;
        INSTALL postgres_scanner;
        INSTALL httpfs;
    """)

def chech_extensions(conn: duckdb.DuckDBPyConnection) -> None:
    print("Extensions:")
    conn.sql("""
        select extension_name, loaded, installed, description, aliases
        from duckdb_extensions()
        where 1 = 1
            and extension_name in ('ducklake', 'httpfs', 'postgres_scanner')
    """).show()


def create_secrets(conn: duckdb.DuckDBPyConnection) -> None:
    conn.execute(f"""
    CREATE OR REPLACE SECRET minio_storage (
        TYPE S3,
        KEY_ID '{env("MINIO_ROOT_USER")}',
        SECRET '{env("MINIO_ROOT_PASSWORD")}',
        ENDPOINT '{env("MINIO_ENDPOINT")}',
        REGION 'eu-central-1',
        USE_SSL false,
        URL_STYLE 'path',
        SCOPE 's3://{env("MINIO_BUCKET")}/'
    );
    """)

    conn.execute(f"""
    CREATE OR REPLACE SECRET pg_meta (
        TYPE POSTGRES,
        HOST '{env("POSTGRES_HOST")}',
        PORT {env("POSTGRES_PORT")},
        DATABASE '{env("POSTGRES_DB")}',
        USER '{env("POSTGRES_USER")}',
        PASSWORD '{env("POSTGRES_PASSWORD")}'
    );
    """)

    conn.execute(f"""
    ATTACH 'ducklake:postgres:' AS lake (
        META_SECRET pg_meta,
        DATA_PATH 's3://{env("MINIO_BUCKET")}/'
    );
    """)
    
    conn.execute("USE lake;")

with duckdb.connect() as conn:
    install_extensions(conn)
    # chech_extensions(conn)
    create_secrets(conn)

    print("Available secrets:")
    conn.sql("select name, type, scope FROM duckdb_secrets()").show()
    
    print("Database tables:")
    conn.sql("show tables").show()
    
    print("Schemas in lake:")
    conn.sql("from duckdb_schemas()").show()

Available secrets:
┌───────────────┬──────────┬────────────────────┐
│     name      │   type   │       scope        │
│    varchar    │ varchar  │     varchar[]      │
├───────────────┼──────────┼────────────────────┤
│ minio_storage │ s3       │ ['s3://ducklake/'] │
│ pg_meta       │ postgres │ []                 │
└───────────────┴──────────┴────────────────────┘

Database tables:
┌─────────┐
│  name   │
│ varchar │
├─────────┤
│ 0 rows  │
└─────────┘

Schemas in lake:
┌───────┬──────────────────────────┬──────────────┬────────────────────┬─────────┬───────────────────────┬──────────┬─────────┐
│  oid  │      database_name       │ database_oid │    schema_name     │ comment │         tags          │ internal │   sql   │
│ int64 │         varchar          │    int64     │      varchar       │ varchar │ map(varchar, varchar) │ boolean  │ varchar │
├───────┼──────────────────────────┼──────────────┼────────────────────┼─────────┼───────────────────────┼──────────┼─────────┤
│  2059 │ _

In [2]:
with duckdb.connect() as conn:
    create_secrets(conn)
    
    conn.sql("SELECT * FROM __ducklake_metadata_lake.information_schema.schemata;").show()

┌──────────────────┬────────────────────┬───────────────────┬───────────────────────────────┬──────────────────────────────┬────────────────────────────┬──────────┐
│   catalog_name   │    schema_name     │   schema_owner    │ default_character_set_catalog │ default_character_set_schema │ default_character_set_name │ sql_path │
│     varchar      │      varchar       │      varchar      │            varchar            │           varchar            │          varchar           │ varchar  │
├──────────────────┼────────────────────┼───────────────────┼───────────────────────────────┼──────────────────────────────┼────────────────────────────┼──────────┤
│ ducklake_catalog │ public             │ pg_database_owner │ NULL                          │ NULL                         │ NULL                       │ NULL     │
│ ducklake_catalog │ pg_toast_temp_7    │ ducklake          │ NULL                          │ NULL                         │ NULL                       │ NULL     │
│ ducklake

In [15]:
with duckdb.connect() as conn:
    create_secrets(conn)
    # conn.sql("from duckdb_schemas()").show()
    # conn.execute("use raw")
    conn.sql("""
    SELECT 
        be.event_id,
        be.click_id,
        be.event_timestamp,
        be.event_type,
        be.browser_name,
        de.os_name,
        de.device_type,
        de.user_custom_id,
        ge.geo_country,
        ge.geo_region_name,
        le.page_url_path,
        le.referer_medium,
        le.utm_source,
        le.utm_campaign,
        le.utm_medium
    FROM raw.browser_events be
    LEFT JOIN raw.device_events de ON be.click_id = de.click_id
    LEFT JOIN raw.geo_events ge ON be.click_id = ge.click_id
    LEFT JOIN raw.location_events le ON be.event_id = le.event_id
    """).show()

┌──────────────────────────────────────┬──────────────────────────────────────┬─────────────────────────┬────────────┬──────────────────┬─────────┬─────────────┬─────────────────────────────────┬─────────────┬──────────────────────────────────┬───────────────────────────────────┬────────────────┬────────────┬──────────────┬────────────┐
│               event_id               │               click_id               │     event_timestamp     │ event_type │   browser_name   │ os_name │ device_type │         user_custom_id          │ geo_country │         geo_region_name          │           page_url_path           │ referer_medium │ utm_source │ utm_campaign │ utm_medium │
│                 uuid                 │                 uuid                 │        timestamp        │  varchar   │     varchar      │ varchar │   varchar   │             varchar             │   varchar   │             varchar              │              varchar              │    varchar     │  varchar   │   varchar  

In [11]:
import time

def chech_extensions(conn: duckdb.DuckDBPyConnection) -> None:
    print("Extensions:")
    conn.sql("""
        select extension_name, loaded, installed, description, aliases
        from duckdb_extensions()
        where 1 = 1
            and extension_name in ('ducklake', 'httpfs', 'postgres_scanner')
    """).show()

with duckdb.connect() as conn:
    create_secrets(conn)
    # chech_extensions(conn)
    start_time = time.time()
    
    conn.sql("""
        select 'browser_events' as source, count(*) as cnt 
        from read_json('s3://ducklake/data/browser_events/*.jsonl')
            union all
        select 'device_events' as source, count(*) as cnt 
        from read_json('s3://ducklake/data/device_events/*.jsonl')
            union all
        select 'geo_events' as source, count(*) as cnt 
        from read_json('s3://ducklake/data/geo_events/*.jsonl')
            union all
        select 'location_events' as source, count(*) as cnt 
        from read_json('s3://ducklake/data/location_events/*.jsonl')
    """).show()

    end_time = time.time()

    elapsed_time = end_time - start_time
    print(f"The task took {elapsed_time:.2f} seconds to complete.")

┌─────────────────┬─────────┐
│     source      │   cnt   │
│     varchar     │  int64  │
├─────────────────┼─────────┤
│ browser_events  │ 3478779 │
│ device_events   │ 3478779 │
│ geo_events      │ 3478779 │
│ location_events │ 3478779 │
└─────────────────┴─────────┘

The task took 773.87 seconds to complete.
