# Local Data Engineering Workflow

## Objective
This notebook demonstrates a complete local data engineering workflow using:
- **dlt** for loading data into DuckDB
- **pandas** for reading and transforming the CSV input
- **DuckDB** for embedded analytics
- **Jupyter** for interactive development

## Environment Setup
Make sure your virtual environment is activated and all dependencies from `requirements.txt` are installed.
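A quick way to confirm the environment is ready is to check that each package can be found before running any cell. This is a minimal sketch using only the standard library. The module list is an assumption taken from the import cell below (note that the `python-dotenv` package is imported as `dotenv`), and the helper name `missing_modules` is hypothetical.

```python
import importlib.util

# Import names used by this notebook (assumed from the import cell below).
REQUIRED_MODULES = ("dlt", "duckdb", "pandas", "numpy", "dotenv")


def missing_modules(names=REQUIRED_MODULES):
    """Return the import names that cannot be found in the active environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


missing = missing_modules()
if missing:
    print(f"❌ Missing packages: {missing}. Run `pip install -r requirements.txt`.")
else:
    print("✅ All required packages are installed")
```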


In [9]:
# Import required libraries
import dlt
import duckdb
import pandas as pd
import numpy as np
from datetime import datetime
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

print("✅ All libraries imported successfully!")
print(f"Current working directory: {os.getcwd()}")


✅ All libraries imported successfully!
Current working directory: /Users/adrianosanges/personal/local-data-engineering-environment/notebooks


In [10]:
# Initialize dlt pipeline with DuckDB destination
pipeline = dlt.pipeline(
    pipeline_name="local_data",
    destination="duckdb",
    dataset_name="my_data"
)

print("✅ dlt pipeline initialized successfully!")
print(f"Pipeline name: {pipeline.pipeline_name}")
print(f"Destination: {pipeline.destination}")
print(f"Dataset: {pipeline.dataset_name}")


✅ dlt pipeline initialized successfully!
Pipeline name: local_data
Destination: <dlt.destinations.duckdb(destination_type='duckdb', staging_dataset_name_layout='%s_staging', enable_dataset_name_normalization=True, info_tables_query_threshold=1000, truncate_tables_on_staging_destination_before_load=True, local_dir='/Users/adrianosanges/personal/local-data-engineering-environment/notebooks', pipeline_name='local_data', pipeline_working_dir='/Users/adrianosanges/.dlt/pipelines/local_data', create_indexes=False)>
Dataset: my_data


In [11]:
def load_csv_with_transformations():
    """
    Load CSV data with basic transformations and data quality checks.
    """
    # Read the CSV file
    df = pd.read_csv("../data/sample.csv")
    
    print(f"📊 Loaded {len(df)} records from CSV")
    print(f"📋 Columns: {list(df.columns)}")
    
    # Data quality checks
    print("\n🔍 Data Quality Checks:")
    print(f"- Null values: {df.isnull().sum().sum()}")
    print(f"- Duplicate rows: {df.duplicated().sum()}")
    print(f"- Date range: {df['date'].min()} to {df['date'].max()}")
    
    # Basic transformations
    df['date'] = pd.to_datetime(df['date'])
    df['total_revenue'] = df['quantity'] * df['price']
    df['month'] = df['date'].dt.month
    df['day_of_week'] = df['date'].dt.day_name()
    
    # Add data quality assertions
    assert df['quantity'].min() > 0, "Quantity should be positive"
    assert df['price'].min() > 0, "Price should be positive"
    assert df['total_revenue'].min() > 0, "Total revenue should be positive"
    
    print("✅ Data transformations completed successfully!")
    
    # Yield records for dlt
    for record in df.to_dict(orient="records"):
        yield record

# Load data into dlt pipeline
info = pipeline.run(
    load_csv_with_transformations(),
    table_name="sample_data",
    write_disposition="replace"
)

print(f"\n✅ Data loaded successfully!")
print(f"Load info: {info}")
print(f"Load info type: {type(info)}")

# Get the database path and connect directly to DuckDB
db_path = pipeline.sql_client().credentials.database
print(f"\n🔗 Database path: {db_path}")

# Connect to DuckDB and check tables with proper schema handling
con = duckdb.connect(db_path)

# Get all tables from all schemas
all_tables = con.execute("""
    SELECT table_schema, table_name 
    FROM information_schema.tables 
    WHERE table_type = 'BASE TABLE' AND table_schema != 'information_schema'
""").fetchdf()

print(f"\n📋 Available tables:")
print(all_tables)

# Find our data table (exclude dlt internal tables)
data_tables = all_tables[~all_tables['table_name'].str.startswith('_dlt')]
if not data_tables.empty:
    table_schema = data_tables.iloc[0]['table_schema']
    table_name = data_tables.iloc[0]['table_name']
    full_table_name = f"{table_schema}.{table_name}"
    
    row_count = con.execute(f"SELECT COUNT(*) FROM {full_table_name}").fetchone()[0]
    print(f"\n📊 Records in table '{full_table_name}': {row_count}")
    
    # Show sample data
    sample = con.execute(f"SELECT * FROM {full_table_name} LIMIT 3").fetchdf()
    print(f"\n📋 Sample data:")
    print(sample)
else:
    print("\n❌ No data tables found in the database")

con.close()


📊 Loaded 10 records from CSV
📋 Columns: ['date', 'product', 'category', 'quantity', 'price', 'region']

🔍 Data Quality Checks:
- Null values: 0
- Duplicate rows: 0
- Date range: 2024-01-01 to 2024-01-05
✅ Data transformations completed successfully!

✅ Data loaded successfully!
Load info: Pipeline local_data load step completed in 0.04 seconds
1 load package(s) were loaded to destination duckdb and into dataset my_data
The duckdb destination used duckdb:////Users/adrianosanges/personal/local-data-engineering-environment/notebooks/local_data.duckdb location to store data
Load package 1750598657.8910341 is LOADED and contains no failed jobs
Load info type: <class 'dlt.common.pipeline.LoadInfo'>

🔗 Database path: /Users/adrianosanges/personal/local-data-engineering-environment/notebooks/local_data.duckdb

📋 Available tables:
  table_schema           table_name
0      my_data          sample_data
1      my_data           _dlt_loads
2      my_data  _dlt_pipeline_state
3      my_data         _dlt_version
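The `assert` statements in `load_csv_with_transformations` stop at the first failing rule, and Python skips `assert` entirely when run with the `-O` flag. A minimal alternative, sketched below with only the standard library, collects every violation so they can be reported together. The function name `validate_records` is hypothetical; its rules mirror the three positivity checks above.

```python
def validate_records(records, positive_fields=("quantity", "price", "total_revenue")):
    """Return a list of human-readable violations; an empty list means every row passed."""
    errors = []
    for i, record in enumerate(records):
        for field in positive_fields:
            value = record.get(field)
            # Missing values (None, or NaN from pandas, where NaN != NaN) count as violations.
            if value is None or value != value or value <= 0:
                errors.append(f"row {i}: {field}={value!r} should be positive")
    return errors


rows = [
    {"quantity": 2, "price": 10.0, "total_revenue": 20.0},
    {"quantity": 0, "price": 5.0, "total_revenue": 0.0},  # hypothetical bad row
]
problems = validate_records(rows)
if problems:
    print("\n".join(problems))
```

In the loading cell, this could run on `df.to_dict(orient="records")` after the transformations and raise `ValueError("\n".join(problems))` when the list is non-empty, so the load stops with the full list of bad rows.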

In [12]:
# Connect to DuckDB and query the loaded data
# Get the database path from the pipeline
db_path = pipeline.sql_client().credentials.database
print(f"🔗 Connected to DuckDB at: {db_path}")

# Connect to DuckDB
con = duckdb.connect(db_path)

# Get available tables with proper schema handling
all_tables = con.execute("""
    SELECT table_schema, table_name 
    FROM information_schema.tables 
    WHERE table_type = 'BASE TABLE' AND table_schema != 'information_schema'
""").fetchdf()

print(f"\n📋 Available tables:")
print(all_tables)

# Find our data table (exclude dlt internal tables)
data_tables = all_tables[~all_tables['table_name'].str.startswith('_dlt')]

if data_tables.empty:
    print("❌ No data tables found. Please run the data loading cell first.")
else:
    table_schema = data_tables.iloc[0]['table_schema']
    table_name = data_tables.iloc[0]['table_name']
    full_table_name = f"{table_schema}.{table_name}"
    
    print(f"\n📊 Using table: {full_table_name}")
    
    # Query 1: Basic summary statistics
    print("\n📈 Summary Statistics:")
    summary_query = f"""
    SELECT 
        COUNT(*) as total_records,
        SUM(quantity) as total_quantity,
        SUM(total_revenue) as total_revenue,
        AVG(price) as avg_price,
        MIN(date) as earliest_date,
        MAX(date) as latest_date
    FROM {full_table_name}
    """
    
    summary_result = con.execute(summary_query).fetchdf()
    print(summary_result.to_string(index=False))
    
    # Query 2: Sales by category
    print("\n🏷️ Sales by Category:")
    category_query = f"""
    SELECT 
        category,
        COUNT(*) as record_count,
        SUM(quantity) as total_quantity,
        SUM(total_revenue) as total_revenue,
        AVG(price) as avg_price
    FROM {full_table_name}
    GROUP BY category
    ORDER BY total_revenue DESC
    """
    
    category_result = con.execute(category_query).fetchdf()
    print(category_result.to_string(index=False))
    
    # Query 3: Sales by region
    print("\n🌍 Sales by Region:")
    region_query = f"""
    SELECT 
        region,
        COUNT(*) as record_count,
        SUM(quantity) as total_quantity,
        SUM(total_revenue) as total_revenue,
        ROUND(AVG(price), 2) as avg_price
    FROM {full_table_name}
    GROUP BY region
    ORDER BY total_revenue DESC
    """
    
    region_result = con.execute(region_query).fetchdf()
    print(region_result.to_string(index=False))

con.close()
print("\n✅ DuckDB connection closed")


🔗 Connected to DuckDB at: /Users/adrianosanges/personal/local-data-engineering-environment/notebooks/local_data.duckdb

📋 Available tables:
  table_schema           table_name
0      my_data          sample_data
1      my_data           _dlt_loads
2      my_data  _dlt_pipeline_state
3      my_data         _dlt_version

📊 Using table: my_data.sample_data

📈 Summary Statistics:
 total_records  total_quantity  total_revenue  avg_price             earliest_date               latest_date
            10            65.0        21125.0      372.5 2024-01-01 01:00:00+01:00 2024-01-05 01:00:00+01:00

🏷️ Sales by Category:
   category  record_count  total_quantity  total_revenue  avg_price
Electronics             5            46.0        17350.0      530.0
  Furniture             5            19.0         3775.0      215.0

🌍 Sales by Region:
region  record_count  total_quantity  total_revenue  avg_price
 North             3            26.0         9750.0     583.33
  East             2          
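`AVG(price)` is an unweighted mean over rows, so it is not the same as revenue per unit sold. In the category output above, Electronics has an `avg_price` of 530.0, while its revenue per unit is 17350.0 / 46 ≈ 377.17. The sketch below computes both figures per group in plain Python to make the difference concrete; the rows and the function name are hypothetical. In SQL, the weighted figure would be `SUM(total_revenue) / SUM(quantity)`.

```python
from collections import defaultdict


def price_stats_by_group(rows, key):
    """Per group: unweighted mean price (like AVG(price)) and revenue per unit sold."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)

    stats = {}
    for name, members in groups.items():
        unweighted = sum(r["price"] for r in members) / len(members)
        revenue = sum(r["quantity"] * r["price"] for r in members)
        units = sum(r["quantity"] for r in members)
        stats[name] = {"avg_price": unweighted, "revenue_per_unit": revenue / units}
    return stats


# Hypothetical rows: one expensive item sold once, one cheap item sold nine times.
rows = [
    {"category": "A", "quantity": 1, "price": 1000.0},
    {"category": "A", "quantity": 9, "price": 100.0},
]
result = price_stats_by_group(rows, "category")
print(result)  # {'A': {'avg_price': 550.0, 'revenue_per_unit': 190.0}}
```

Which figure is "right" depends on the question: the unweighted mean describes the price list, while revenue per unit describes what customers actually paid on average.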

In [13]:
# Create output directory if it doesn't exist
os.makedirs("../output", exist_ok=True)

# Get database path and connect
db_path = pipeline.sql_client().credentials.database
con = duckdb.connect(db_path)

# Get table name with proper schema handling
all_tables = con.execute("""
    SELECT table_schema, table_name 
    FROM information_schema.tables 
    WHERE table_type = 'BASE TABLE' AND table_schema != 'information_schema'
""").fetchdf()

data_tables = all_tables[~all_tables['table_name'].str.startswith('_dlt')]

if not data_tables.empty:
    table_schema = data_tables.iloc[0]['table_schema']
    table_name = data_tables.iloc[0]['table_name']
    full_table_name = f"{table_schema}.{table_name}"
    
    # Export summary results to CSV
    summary_query = f"""
    SELECT 
        COUNT(*) as total_records,
        SUM(quantity) as total_quantity,
        SUM(total_revenue) as total_revenue,
        AVG(price) as avg_price
    FROM {full_table_name}
    """
    summary_result = con.execute(summary_query).fetchdf()
    summary_result.to_csv("../output/summary_statistics.csv", index=False)
    
    category_query = f"""
    SELECT 
        category,
        COUNT(*) as record_count,
        SUM(total_revenue) as total_revenue
    FROM {full_table_name}
    GROUP BY category
    ORDER BY total_revenue DESC
    """
    category_result = con.execute(category_query).fetchdf()
    category_result.to_csv("../output/sales_by_category.csv", index=False)
    
    region_query = f"""
    SELECT 
        region,
        COUNT(*) as record_count,
        SUM(total_revenue) as total_revenue
    FROM {full_table_name}
    GROUP BY region
    ORDER BY total_revenue DESC
    """
    region_result = con.execute(region_query).fetchdf()
    region_result.to_csv("../output/sales_by_region.csv", index=False)
    
    print("✅ Results exported to CSV files in ../output/ directory")
    print("📁 Files created:")
    print("  - summary_statistics.csv")
    print("  - sales_by_category.csv")
    print("  - sales_by_region.csv")
else:
    print("❌ No data tables found. Please run the data loading cell first.")

con.close()


✅ Results exported to CSV files in ../output/ directory
📁 Files created:
  - summary_statistics.csv
  - sales_by_category.csv
  - sales_by_region.csv


## Summary and Next Steps

### What we accomplished:
1. ✅ Set up dlt pipeline with DuckDB destination
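2. ✅ Loaded the sample CSV and derived `total_revenue`, `month`, and `day_of_week`
3. ✅ Ran summary, category, and region aggregations in SQL
4. ✅ Exported results to CSV files
5. ✅ Added data quality checks (null and duplicate counts, plus assertions that quantity, price, and revenue are positive)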

### Next steps for enhancement:
- Add more data sources (APIs, databases)
- Implement incremental loading
- Add data validation schemas
- Create automated data quality alerts
- Build interactive dashboards
- Set up scheduled data processing
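As a sketch of what incremental loading means here: keep the latest `date` already loaded (a high-water mark) and only pass newer rows to the pipeline on the next run. dlt provides `dlt.sources.incremental` for this, which tracks the cursor value in the pipeline state between runs; the standalone function below only illustrates the idea, and its name and state handling are hypothetical.

```python
def new_records(records, last_loaded=None, cursor="date"):
    """Return the records newer than the high-water mark, plus the updated mark.

    Assumes the cursor holds ISO-8601 date strings (YYYY-MM-DD), which sort
    chronologically when compared as plain strings.
    """
    fresh = [r for r in records if last_loaded is None or r[cursor] > last_loaded]
    # Keep the old mark when nothing new arrived.
    new_mark = max((r[cursor] for r in fresh), default=last_loaded)
    return fresh, new_mark


# Hypothetical batches; a real run would read ../data/sample.csv and persist `mark`.
batch = [{"date": "2024-01-01"}, {"date": "2024-01-05"}]
first, mark = new_records(batch)  # first run: everything is new
second, mark = new_records(batch + [{"date": "2024-01-06"}], mark)
print(len(first), len(second), mark)  # 2 1 2024-01-06
```

With an incremental load, the pipeline's `write_disposition` would typically change from `"replace"` to `"append"` (or `"merge"` with a primary key), since each run now delivers only part of the data.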

### Key metrics from this analysis:
- Total records processed: 10
- Total revenue: 21125.0
- Top performing category: Electronics (17350.0 in revenue)
- Most active region: North by revenue (9750.0 across 3 records)
