In [1]:
# !pip install duckdb  # uncomment and run if not installed

In [2]:
# !pip install dbt-core  # uncomment and run if not installed

In [3]:
# !pip install dbt-duckdb  # uncomment and run if not installed

In [4]:
# !pip install pyyaml  # uncomment and run if not installed

In [5]:
import duckdb
import os
import yaml

In [6]:
# file name of the local duckdb database, plus its full location inside the
# current working directory
# note: despite its name, db_directory is the path to the database *file*, not a folder
db_name = "my_db.duckdb"
db_directory = os.path.join(os.getcwd(), db_name)

In [7]:
# change db path in profiles.yml so the dbt project writes to the same duckdb file
# that this notebook loads the raw tables into
# note: the file is parsed and re-serialized, so comments and custom formatting in
# profiles.yml are not preserved when it is rewritten
profiles_path = os.path.join(os.getcwd(), 'customers_transactions', 'profiles.yml')
with open(profiles_path, 'r') as file:
    dbt_profiles = yaml.safe_load(file)

dev_output = dbt_profiles['main_db']['outputs']['dev']

# only touch the file when the configured path actually differs, so re-running
# the notebook does not rewrite an already-correct profile
if dev_output.get('path') != db_directory:
    dev_output['path'] = db_directory

    with open(profiles_path, 'w') as file:
        # sort_keys=False keeps the profile's existing key order; the default
        # would alphabetize every mapping in the file
        yaml.dump(dbt_profiles, file, default_flow_style=False, sort_keys=False)

In [8]:
# open a connection to the local duckdb file at db_directory; the load cells below use it
connection = duckdb.connect(db_directory)

In [9]:
# load customers data set into db:
# drop any previous copy of the table, recreate it with explicit column types,
# then bulk-load customers.csv (first row is the header)
customers_load_sql = """
    drop table if exists customers;
    create table if not exists customers 
    (
        customer_id int
        ,created_at timestamp
    );
    copy customers from 'customers.csv' (header true)
"""
connection.execute(customers_load_sql)

<duckdb.duckdb.DuckDBPyConnection at 0x1078d92b0>

In [10]:
# load subscription_state_changes data set into db:
# drop any previous copy of the table, recreate it with explicit column types,
# then bulk-load subscription_changes.csv (first row is the header)
subscription_changes_load_sql = """
    drop table if exists subscription_state_changes;
    create table if not exists subscription_state_changes 
    (
        subscription_change_id varchar
        ,subscription_id varchar
        ,customer_id int
        ,subscription_change_timestamp timestamp -- changed column name for readability
        ,start_state varchar
        ,end_state varchar
    );
    copy subscription_state_changes from 'subscription_changes.csv' (header true)
"""
connection.execute(subscription_changes_load_sql)

<duckdb.duckdb.DuckDBPyConnection at 0x1078d92b0>

In [11]:
# load transactions data set into db:
# drop any previous copy of the table, recreate it with explicit column types,
# then bulk-load transactions.csv (first row is the header)
transactions_load_sql = """
    drop table if exists transactions;
    create table if not exists transactions 
    (
        transaction_id varchar
        ,subscription_id varchar
        ,customer_id int
        ,transaction_timestamp timestamp
        ,net_revenue decimal
        ,category varchar
        ,category_type_id int -- added column to add detail
    );
    copy transactions from 'transactions.csv' (header true)
"""
connection.execute(transactions_load_sql)


<duckdb.duckdb.DuckDBPyConnection at 0x1078d92b0>

In [12]:
# check for existing locks on db (connections)
!lsof my_db.duckdb

COMMAND     PID USER   FD   TYPE DEVICE SIZE/OFF    NODE NAME
python3.1 27415 coop   75u   REG   1,17  2371584 7057965 my_db.duckdb


In [13]:
# close the notebook's connection before dbt runs; the lsof check in the next cell
# confirms the database file is no longer held open
connection.close()

In [14]:
# confirm locks are removed
!lsof my_db.duckdb

In [15]:
# run dbt workflow from inside the project folder (the folder whose profiles.yml
# was pointed at the local duckdb file earlier in this notebook)
!cd customers_transactions && dbt run

[0m18:23:45  Running with dbt=1.9.3
[0m18:23:45  Registered adapter: duckdb=1.9.2
[0m18:23:46  Unable to do partial parsing because a project config has changed
[0m18:23:46  Found 2 models, 3 sources, 426 macros
[0m18:23:46  
[0m18:23:46  Concurrency: 2 threads (target='dev')
[0m18:23:46  
[0m18:23:47  1 of 2 START sql table model main.dim_customer_acquisition ..................... [RUN]
[0m18:23:47  1 of 2 OK created sql table model main.dim_customer_acquisition ................ [[32mOK[0m in 0.07s]
[0m18:23:47  2 of 2 START sql table model main.cube_customer_transaction_behavior ........... [RUN]
[0m18:23:47  2 of 2 OK created sql table model main.cube_customer_transaction_behavior ...... [[32mOK[0m in 0.02s]
[0m18:23:47  
[0m18:23:47  Finished running 2 table models in 0 hours 0 minutes and 0.19 seconds (0.19s).
[0m18:23:47  
[0m18:23:47  [32mCompleted successfully[0m
[0m18:23:47  
[0m18:23:47  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2


In [16]:
# reopen the database so the tables produced by the dbt run can be queried
connection = duckdb.connect(db_directory)

In [17]:
# display the dim_customer_acquisition model built by the dbt run as a DataFrame
acquisition_query = """
    select * from dim_customer_acquisition
"""
connection.execute(acquisition_query).fetchdf()

Unnamed: 0,customer_id,customer_created_timestamp,acquisition_timestamp,acquisition_type,subscription_id,subscription_start_timestamp,subscription_end_timestamp,prior_subscription_id
0,7406,2024-06-21 04:14:25,2024-06-21 04:14:25,gift,sub_35054,2024-06-26 04:14:25,2024-11-22 00:16:39,
1,7841,2025-03-07 22:34:26,2025-03-07 22:34:26,subscription,sub_74635,2025-03-07 22:34:26,2025-03-10 20:35:32,
2,8233,2025-02-01 19:54:44,2025-02-01 19:54:44,alacart,sub_63639,2025-02-07 19:54:44,2025-03-04 19:38:48,
3,8233,2025-02-01 19:54:44,2025-02-01 19:54:44,alacart,sub_88361,2025-03-04 19:38:49,NaT,sub_63639
4,1896,2024-03-22 01:40:10,2024-03-22 01:40:10,subscription,sub_74860,2024-03-22 01:40:10,2024-10-27 14:51:01,
5,1896,2024-03-22 01:40:10,2024-03-22 01:40:10,subscription,sub_16352,2024-10-27 14:51:02,NaT,sub_74860
6,7079,2024-08-20 05:55:14,2024-08-20 05:55:14,gift,sub_16622,2024-09-10 05:55:14,2024-11-29 02:09:50,
7,1926,2024-07-30 01:44:57,2024-07-30 01:44:57,gift,sub_20975,2024-08-25 01:44:57,2024-11-22 02:52:19,
8,9453,2024-12-03 14:42:11,2024-12-09 14:42:11,subscription,,NaT,NaT,
9,1715,2024-05-01 10:33:11,2024-05-01 10:33:11,gift,,NaT,NaT,


In [18]:
# display the cube_customer_transaction_behavior model built by the dbt run as a DataFrame
behavior_query = """
    select * from cube_customer_transaction_behavior
"""
connection.execute(behavior_query).fetchdf()

Unnamed: 0,transaction_year,transaction_month,acquisition_type,acquisition_year,acquisition_month,customer_type,category,total_customers,total_transactions,total_net_revenue
0,2025,3,subscription,2025,3,new,subscription,1,2,63.0
1,2025,3,alacart,2025,2,existing,subscription,1,2,71.4
2,2025,2,alacart,2025,2,new,alacart,1,1,23.89
3,2025,2,alacart,2025,2,new,subscription,1,1,31.5
4,2025,2,subscription,2024,12,existing,gift,1,1,17.32
5,2025,1,subscription,2024,6,existing,alacart,1,1,10.35
6,2024,12,subscription,2024,12,new,subscription,1,1,39.9
7,2024,11,gift,2024,7,existing,subscription,1,1,39.9
8,2024,11,gift,2024,8,existing,subscription,1,1,39.9
9,2024,11,gift,2024,6,existing,subscription,1,1,39.9


In [19]:
# close the connection now that the dbt outputs have been inspected
connection.close()