In [29]:
import duckdb
import pandas as pd
from datetime import datetime

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.width', None)  # Largura automática
pd.set_option('display.max_colwidth', None)  # Sem limite para largura das colunas

## Step 1: Load Source Data and Create Joined View

In [30]:
# Connect to the persistent DuckDB database
conn = duckdb.connect('zillow_dwh.duckdb')

# Define CSV file paths
csv_paths = [
    r"C:\Users\JOHNV\Documents\CS\ED\main-project\ZILLOW_DATA_962c837a6ccefddddf190101e0bafdaf\ZILLOW_DATA_962c837a6ccefddddf190101e0bafdaf.csv",
    r"C:\Users\JOHNV\Documents\CS\ED\main-project\ZILLOW_INDICATORS_e93833a53d6c88463446a364cda611cc\ZILLOW_INDICATORS_e93833a53d6c88463446a364cda611cc.csv",
    r"C:\Users\JOHNV\Documents\CS\ED\main-project\ZILLOW_REGIONS_1a51d107db038a83ac171d604cb48d5b\ZILLOW_REGIONS_1a51d107db038a83ac171d604cb48d5b.csv"
]

names = ["staging_zillow_data", "staging_zillow_indicators", "staging_zillow_regions"]

# Load CSV files into staging tables
for i, csv_path in enumerate(csv_paths):
    conn.execute(f"DROP TABLE IF EXISTS {names[i]}")
    conn.execute(f"CREATE TABLE {names[i]} AS SELECT * FROM read_csv_auto('{csv_path.replace(chr(92), chr(92)*2)}')")
    count = conn.execute(f"SELECT COUNT(*) FROM {names[i]}").fetchone()[0]
    print(f"✓ Loaded {names[i]}: {count:,} rows")

print("\n✓ All source data loaded!")

# Create unified staging table with all joins
print("\nCreating unified staging table...")
conn.execute("DROP TABLE IF EXISTS staging_unified")
conn.execute("""
    CREATE TABLE staging_unified AS
    SELECT 
        d.*,
        i.*,
        r.*
    FROM staging_zillow_data d
    JOIN staging_zillow_indicators i ON d.indicator_id = i.indicator_id
    JOIN staging_zillow_regions r ON d.region_id = r.region_id
    LIMIT 1000
""")

unified_count = conn.execute("SELECT COUNT(*) FROM staging_unified").fetchone()[0]
print(f"✓ Created staging_unified table: {unified_count:,} rows")

# Drop the individual staging tables as we no longer need them
conn.execute("DROP TABLE IF EXISTS staging_zillow_data")
conn.execute("DROP TABLE IF EXISTS staging_zillow_indicators")
conn.execute("DROP TABLE IF EXISTS staging_zillow_regions")
print("✓ Individual staging tables dropped (no longer needed)")

✓ Loaded staging_zillow_data: 159,663,189 rows
✓ Loaded staging_zillow_indicators: 56 rows
✓ Loaded staging_zillow_regions: 89,305 rows

✓ All source data loaded!

Creating unified staging table...
✓ Created staging_unified table: 1,000 rows
✓ Individual staging tables dropped (no longer needed)


In [31]:
# Add after line 48 (after creating staging_unified)

# Filter staging_unified to keep only selected indicators
print("\nFiltering staging_unified to selected indicators only...")
selected_indicators = ['SAAW', 'SRAW', 'NRAW', 'IRAW', 'CRAW', 'LRAW', 'RSNA', 'RSSA', 'ZSFH', 'ZATT', 'ZABT', 'ZCON']

# Count before filtering
before_count = conn.execute("SELECT COUNT(*) FROM staging_unified").fetchone()[0]

# Delete rows that don't match selected indicators
placeholders = ','.join([f"'{ind}'" for ind in selected_indicators])
conn.execute(f"""
    DELETE FROM staging_unified
    WHERE indicator_id NOT IN ({placeholders})
""")

# Count after filtering
after_count = conn.execute("SELECT COUNT(*) FROM staging_unified").fetchone()[0]
print(f"✓ Before filter: {before_count:,} rows")
print(f"✓ After filter:  {after_count:,} rows")
print(f"✓ Removed:       {before_count - after_count:,} rows ({((before_count - after_count) / before_count * 100):.1f}%)")


Filtering staging_unified to selected indicators only...
✓ Before filter: 1,000 rows
✓ After filter:  999 rows
✓ Removed:       1 rows (0.1%)


In [32]:
# Sample one row per region type - show ALL fields
print("="*80)
print("SAMPLE DATA - ONE ROW PER REGION TYPE (ALL FIELDS)")
print("="*80)

region_types = conn.execute("SELECT DISTINCT region_type FROM staging_unified LIMIT 5").fetchdf()['region_type'].tolist()

for rt in region_types:
    print(f"\n{'='*80}")
    print(f"REGION TYPE: {rt}")
    print('='*80)
    sample = conn.execute(f"""
        SELECT * 
        FROM staging_unified
        WHERE region_type = '{rt}'
        LIMIT 1
    """).fetchdf()
    
    # Transpose the dataframe to show all fields vertically
    sample_t = sample.T
    sample_t.columns = ['Value']
    display(sample_t)

SAMPLE DATA - ONE ROW PER REGION TYPE (ALL FIELDS)

REGION TYPE: city


Unnamed: 0,Value
indicator_id,ZATT
region_id,11674
date,2002-05-31 00:00:00
value,278955.687718
indicator_id_1,ZATT
indicator,ZHVI All Homes- Top Tier Time Series ($)
category,Home values
region_id_1,11674
region_type,city
region,"Grandview Heights;OH;Columbus, OH;Franklin County"



REGION TYPE: county


Unnamed: 0,Value
indicator_id,ZATT
region_id,2678
date,2009-05-31 00:00:00
value,286653.683766
indicator_id_1,ZATT
indicator,ZHVI All Homes- Top Tier Time Series ($)
category,Home values
region_id_1,2678
region_type,county
region,"Del Norte County;CA;Crescent City, CA"


In [33]:
# Add this cell after the region type samples

# Sample one row per INDICATOR - show ALL fields
print("="*80)
print("SAMPLE DATA - ONE ROW PER INDICATOR (ALL FIELDS)")
print("="*80)

indicators = conn.execute("SELECT DISTINCT indicator_id FROM staging_unified ORDER BY indicator_id").fetchdf()['indicator_id'].tolist()

for ind in indicators:
    print(f"\n{'='*80}")
    print(f"INDICATOR: {ind}")
    print('='*80)
    sample = conn.execute(f"""
        SELECT * 
        FROM staging_unified
        WHERE indicator_id = '{ind}'
        LIMIT 1
    """).fetchdf()
    
    # Transpose the dataframe to show all fields vertically
    sample_t = sample.T
    sample_t.columns = ['Value']
    display(sample_t)

SAMPLE DATA - ONE ROW PER INDICATOR (ALL FIELDS)

INDICATOR: ZATT


Unnamed: 0,Value
indicator_id,ZATT
region_id,2678
date,2009-05-31 00:00:00
value,286653.683766
indicator_id_1,ZATT
indicator,ZHVI All Homes- Top Tier Time Series ($)
category,Home values
region_id_1,2678
region_type,county
region,"Del Norte County;CA;Crescent City, CA"


## Step 2: Explore Unified Staging Table

In [34]:
# Check the structure of staging_unified
print("STAGING_UNIFIED columns:")
cols = conn.execute("DESCRIBE staging_unified").fetchdf()
display(cols)

print("\n" + "="*80)
print("PREVIEW - First 10 rows")
print("="*80)
display(conn.execute("SELECT * FROM staging_unified LIMIT 10").fetchdf())

STAGING_UNIFIED columns:


Unnamed: 0,column_name,column_type,null,key,default,extra
0,indicator_id,VARCHAR,YES,,,
1,region_id,BIGINT,YES,,,
2,date,DATE,YES,,,
3,value,DOUBLE,YES,,,
4,indicator_id_1,VARCHAR,YES,,,
5,indicator,VARCHAR,YES,,,
6,category,VARCHAR,YES,,,
7,region_id_1,BIGINT,YES,,,
8,region_type,VARCHAR,YES,,,
9,region,VARCHAR,YES,,,



PREVIEW - First 10 rows


Unnamed: 0,indicator_id,region_id,date,value,indicator_id_1,indicator,category,region_id_1,region_type,region
0,ZATT,2678,2009-05-31,286653.683766,ZATT,ZHVI All Homes- Top Tier Time Series ($),Home values,2678,county,"Del Norte County;CA;Crescent City, CA"
1,ZATT,2358,2009-05-31,188791.69948,ZATT,ZHVI All Homes- Top Tier Time Series ($),Home values,2358,county,"Jennings County;IN;North Vernon, IN"
2,ZATT,559,2009-05-31,205644.892693,ZATT,ZHVI All Homes- Top Tier Time Series ($),Home values,559,county,Henry County;OH;nan
3,ZATT,2677,2009-05-31,657018.15177,ZATT,ZHVI All Homes- Top Tier Time Series ($),Home values,2677,county,"Currituck County;NC;Virginia Beach-Norfolk-Newport News, VA-NC"
4,ZATT,779,2009-05-31,169075.560045,ZATT,ZHVI All Homes- Top Tier Time Series ($),Home values,779,county,"Upson County;GA;Thomaston, GA"
5,ZATT,3133,2009-05-31,283365.49183,ZATT,ZHVI All Homes- Top Tier Time Series ($),Home values,3133,county,San Jacinto County;TX;nan
6,ZATT,608,2009-05-31,214938.853844,ZATT,ZHVI All Homes- Top Tier Time Series ($),Home values,608,county,Logan County;KY;nan
7,ZATT,1955,2009-05-31,316994.229973,ZATT,ZHVI All Homes- Top Tier Time Series ($),Home values,1955,county,Orleans County;VT;nan
8,ZATT,279,2009-05-31,192049.974439,ZATT,ZHVI All Homes- Top Tier Time Series ($),Home values,279,county,Pike County;OH;nan
9,ZATT,424,2009-05-31,144016.346727,ZATT,ZHVI All Homes- Top Tier Time Series ($),Home values,424,county,Caddo County;OK;nan


In [35]:
# Data quality checks
print("DATA QUALITY CHECKS")
print("="*80)

# Check for nulls in key fields
null_check = conn.execute("""
    SELECT 
        COUNT(*) as total_rows,
        SUM(CASE WHEN value IS NULL THEN 1 ELSE 0 END) as null_values,
        SUM(CASE WHEN date IS NULL THEN 1 ELSE 0 END) as null_dates,
        SUM(CASE WHEN region_id IS NULL THEN 1 ELSE 0 END) as null_regions,
        SUM(CASE WHEN indicator_id IS NULL THEN 1 ELSE 0 END) as null_indicators
    FROM staging_unified
""").fetchdf()
display(null_check)

DATA QUALITY CHECKS


Unnamed: 0,total_rows,null_values,null_dates,null_regions,null_indicators
0,999,0.0,0.0,0.0,0.0


In [36]:
# Breakdown statistics
print("="*80)
print("BREAKDOWN BY REGION TYPE")
print("="*80)
region_breakdown = conn.execute("""
    SELECT 
        region_type,
        COUNT(*) as num_records,
        COUNT(DISTINCT region_id) as unique_regions,
        COUNT(DISTINCT indicator_id) as unique_indicators,
        MIN(value) as min_value,
        MAX(value) as max_value,
        AVG(value) as avg_value
    FROM staging_unified
    WHERE value IS NOT NULL
    GROUP BY region_type
    ORDER BY num_records DESC
""").fetchdf()
display(region_breakdown)


BREAKDOWN BY REGION TYPE


Unnamed: 0,region_type,num_records,unique_regions,unique_indicators,min_value,max_value,avg_value
0,county,998,811,1,86568.462023,3953673.0,294712.76405
1,city,1,1,1,278955.687718,278955.7,278955.687718


## Step 3: Ingestão de Dados - Processamento por Linha

In [37]:
from enum import Enum


us_states_to_code = {
    'California': 'CA',
    'Connecticut': 'CT',
    'Texas': 'TX',
    'Utah': 'UT',
    'Montana': 'MT',
    'Florida': 'FL',
    'New York': 'NY',
    'Illinois': 'IL',
    'Ohio': 'OH',
    'North Carolina': 'NC',
    'Michigan': 'MI',
    'New Jersey': 'NJ',
    'Virginia': 'VA',
    'Indiana': 'IN',
    'Missouri': 'MO',
    'Colorado': 'CO',
    'Alabama': 'AL',
    'Louisiana': 'LA',
    'Kentucky': 'KY',
    'Oklahoma': 'OK',
    'North Dakota': 'ND',
    'Alaska': 'AK',
    'District of Columbia': 'DC',
    'South Carolina': 'SC',
    'Hawaii': 'HI',
    'New Hampshire': 'NH',
    'Pennsylvania': 'PA',
    'Washington': 'WA',
    'Arizona': 'AZ',
    'Massachusetts': 'MA',
    'Iowa': 'IA',
    'Arkansas': 'AR',
    'Mississippi': 'MS',
    'Kansas': 'KS',
    'New Mexico': 'NM',
    'Nebraska': 'NE',
    'Tennessee': 'TN',
    'Wyoming': 'WY',
    'Georgia': 'GA',
    'Delaware': 'DE',
    'Minnesota': 'MN',
    'Oregon': 'OR',
    'Maryland': 'MD',
    'Wisconsin': 'WI',
    'Idaho': 'ID',
    'Nevada': 'NV',
    'Maine': 'ME',
    'West Virginia': 'WV',
    'Vermont': 'VT',
    'Rhode Island': 'RI',
    'South Dakota': 'SD'
}

us_code_to_states = {
    'CA': 'California',
    'CT': 'Connecticut',
    'TX': 'Texas',
    'UT': 'Utah',
    'MT': 'Montana',
    'FL': 'Florida',
    'NY': 'New York',
    'IL': 'Illinois',
    'OH': 'Ohio',
    'NC': 'North Carolina',
    'MI': 'Michigan',
    'NJ': 'New Jersey',
    'VA': 'Virginia',
    'IN': 'Indiana',
    'MO': 'Missouri',
    'CO': 'Colorado',
    'AL': 'Alabama',
    'LA': 'Louisiana',
    'KY': 'Kentucky',
    'OK': 'Oklahoma',
    'ND': 'North Dakota',
    'AK': 'Alaska',
    'DC': 'District of Columbia',
    'SC': 'South Carolina',
    'HI': 'Hawaii',
    'NH': 'New Hampshire',
    'PA': 'Pennsylvania',
    'WA': 'Washington',
    'AZ': 'Arizona',
    'MA': 'Massachusetts',
    'IA': 'Iowa',
    'AR': 'Arkansas',
    'MS': 'Mississippi',
    'KS': 'Kansas',
    'NM': 'New Mexico',
    'NE': 'Nebraska',
    'TN': 'Tennessee',
    'WY': 'Wyoming',
    'GA': 'Georgia',
    'DE': 'Delaware',
    'MN': 'Minnesota',
    'OR': 'Oregon',
    'MD': 'Maryland',
    'WI': 'Wisconsin',
    'ID': 'Idaho',
    'NV': 'Nevada',
    'ME': 'Maine',
    'WV': 'West Virginia',
    'VT': 'Vermont',
    'RI': 'Rhode Island',
    'SD': 'South Dakota'
}

get_indicator_description = {
    'CRAW':'Percentage of listings with a price reduction (RAW, ALL HOMES, WEEKLY)',
    'IRAW':'Number of properties listed for sale (RAW, ALL HOMES, WEEKLY)',
    'LRAW':'Average list price (RAW, ALL HOMES, WEEKLY)',
    'NRAW':'Average days to properties enter pending status (RAW, ALL HOMES, WEEKLY)',
    'RSNA':'ZOOM rent index, rental trends (SMOOTHED, ALL HOMES + MULTI-FAMILY)',
    'RSSA':'ZOOM rent index, rental trends (SMOOTHED + SEASONALLY ADJUSTED, ALL HOMES + MULTI-FAMILY)',
    'SAAW':'Average sale price (SMOOTHED + SEASONALLY ADJUSTED, ALL HOMES, WEEKLY)',
    'SRAW':'Average sale price (RAW, ALL HOMES, WEEKLY)',
    'ZABT':'Typical home values - Segment: Bottom Tier',
    'ZATT':'Typical home values - Segment: Top Tier',
    'ZCON':'Typical home values - Segment: Condo/Co-op',
    'ZSFH':'Typical home values - Segment: Single Family Homes',
}

# ENUM('USD', 'PERCENTAGE', 'DAYS', 'UNITS')
class IndicatorUnit(Enum):
    PERCENTAGE = 'Percentage'
    USD = 'USD'
    DAYS = 'Days'
    UNITS = 'Units'

get_unit = {
    'CRAW': IndicatorUnit.PERCENTAGE,
    'IRAW': IndicatorUnit.UNITS,
    'LRAW': IndicatorUnit.USD,
    'NRAW': IndicatorUnit.DAYS,
    'RSNA': IndicatorUnit.USD,
    'RSSA': IndicatorUnit.USD,
    'SAAW': IndicatorUnit.USD,
    'SRAW': IndicatorUnit.USD,
    'ZABT': IndicatorUnit.USD,
    'ZATT': IndicatorUnit.USD,
    'ZCON': IndicatorUnit.USD,
    'ZSFH': IndicatorUnit.USD,
}

In [None]:
# Funções de processamento para cada campo

import mmh3

def hash_int32(text):
    """Cria um hash int32 unsigned usando MurmurHash3"""
    if text is None:
        return None
    # Retorna unsigned 32-bit (0 a 4294967295)
    return mmh3.hash(str(text), signed=True)

def metro_name_extractor(metro_aux):
    if ',' in metro_aux:
        metro_aux = metro_aux.rsplit(', ', 1)[0]
    return metro_aux if metro_aux != 'nan' else None

def process_region(row):
    """Processa o campo region e retorna informações estruturadas por tipo de região"""
    region_type = row['region_type']
    
    result = {
        'region_type': region_type,
        'country_name': 'United States',
        'country_key': 'USA',
        'state_name': None,
        'state_code': None,
        'metro_name': None,
        'county_name': None,
        'city_name': None,
        'zip_key': None,
        'neighborhood_name': None,
    }
    
    # Processamento específico por tipo de região

    # California
    if region_type == 'state':
        result['state_name'] = row['region']
        result['state_code'] = us_states_to_code.get(row['region'])
        
    # New Orleans, LA
    elif region_type == 'metro':
        if ',' in result['region']:
            metro_aux, state_aux = result['region'].rsplit(',')
            state_aux = state_aux.strip()
        else:
            metro_aux = result['region']
            state_aux = None

        result['metro_name'] = metro_aux
        if state_aux:
            result['state_code'] = state_aux
            result['state_name'] = us_code_to_states.get(state_aux)

    # Washington County;GA;nan
    elif region_type == 'county':
        county_aux, state_aux, metro_aux = row['region'].split(';')
        result['county_name'] = county_aux
        result['state_code'] = state_aux
        result['state_name'] = us_code_to_states.get(state_aux)

        result['metro_name'] = metro_name_extractor(metro_aux)
        
    # Pine Island;TX;Houston-The Woodlands-Sugar Land, TX;Waller County
    elif region_type == 'city':
        city_aux, state_aux, metro_aux, county_aux = row['region'].split(';')
        result['city_name'] = city_aux
        result['state_code'] = state_aux
        result['state_name'] = us_code_to_states.get(state_aux)

        result['metro_name'] = metro_name_extractor(metro_aux)

        result['county_name'] = county_aux
        
    # 46590;IN;Warsaw, IN;Winona Lake;Kosciusko County
    elif region_type == 'zip':
        zip_aux, state_aux, metro_aux, city_aux, county_aux = row['region'].split(';')
        result['zip_key'] = zip_aux if zip_aux != 'nan' else 000000
        result['state_code'] = state_aux
        result['state_name'] = us_code_to_states.get(state_aux)

        result['metro_name'] = metro_name_extractor(metro_aux)

        result['city_name'] = city_aux
        result['county_name'] = county_aux
        
    # Kendall;FL;Miami-Fort Lauderdale-Pompano Beach, FL;Miami;Miami-Dade County
    elif region_type == 'neigh':
        neigh_aux, state_aux, metro_aux, county_aux, city_aux = row['region'].split(';')
        
        result['neighborhood_name'] = neigh_aux
        result['state_code'] = state_aux
        result['state_name'] = us_code_to_states.get(state_aux)

        result['metro_name'] = metro_name_extractor(metro_aux)

        result['city_name'] = city_aux
        result['county_name'] = county_aux
        
    return result


def process_indicator(row):
    """Processa o campo indicator"""
    return {
        'realestate_indicator_key': row['indicator_id'],
        'indicator_name': row['indicator'],
        'realestate_indicator_description': get_indicator_description[row['indicator_id']], 
    }


def process_date(row):
    """Processa o campo date e extrai informações temporais"""
    date_obj = row.get('date')

    # print(type(date_str))
    
    result = {
        'year': None,
        'day_name': None, 
        'week_number': None,
        'month_number': None,
        'month_name': None,
    }

    if date_obj:
        try:
            # Parse da string de data
            # date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
            
            # Preencher o resultado
            result['year'] = date_obj.year
            result['day_name'] = date_obj.strftime('%A')  # Monday, Tuesday, etc.
            result['week_number'] = date_obj.isocalendar()[1]  # Número da semana no ano
            result['month_number'] = date_obj.month
            result['month_name'] = date_obj.strftime('%B')  # January, February, etc.
            
        except ValueError:
            # Se falhar o parse, mantém os valores None
            pass


    return result


def process_value(row):
    """Processa o campo value"""
    return {
        'value': row.get('value'),
        'unit': get_unit[row['indicator_id']]  
    }



def insert_into_dim_fact(row, processed_data):
    """Insere dados nas tabelas de dimensão e fato"""
    
    # Extract processed information
    region_info = processed_data['region_info']
    indicator_info = processed_data['indicator_info']
    date_info = processed_data['date_info']
    value_info = processed_data['value_info']
    # 2. INSERT INTO dim_time - ok
    date_key = None
    if row.get('date'):
        date_key = hash_int32(row['date'])
        conn.execute("""
            INSERT INTO dim_time (date_key, day_name, week_number, month_number, month_name)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT DO NOTHING
        """, [
            date_key,
            date_info['day_name'],
            date_info['week_number'],
            date_info['month_number'],
            date_info['month_name']
        ])
    
    # 1. INSERT INTO dim_year - ok
    if date_info['year']:
        conn.execute("""
            INSERT INTO dim_year (year_key, date_key)
            VALUES (?, ?)
            ON CONFLICT DO NOTHING
        """, [date_info['year'], date_key])
    

    # INSERT INTO dim_country - ok
    country_key = hash_int32(region_info['country_key'])
    conn.execute("""
        INSERT INTO dim_country (country_key, country_name)
        VALUES (?, ?)
        ON CONFLICT DO NOTHING
    """, [country_key, region_info['country_name']])
    
    # 3. INSERT/SELECT dim_state - ok
    state_key = None
    if region_info['state_name']:
        state_key = hash_int32(region_info['state_code'])
        conn.execute("""
            INSERT INTO dim_state (state_key, state_name, state_code, country_key)
            VALUES (?, ?, ?, ?)
            ON CONFLICT DO NOTHING
        """, [state_key, region_info['state_name'], region_info['state_code'], country_key])
        
        # Buscar a chave que está no banco (pode já existir)
        result = conn.execute("""
            SELECT state_key FROM dim_state 
            WHERE state_code = ?
        """, [region_info['state_code']]).fetchone()
        if result:
            state_key = result[0]
    
    # 4. INSERT/SELECT dim_metro - ok 
    metro_key = None
    if region_info['metro_name']:
        metro_key = hash_int32(f"{region_info['metro_name']}_{region_info['state_code']}")
        conn.execute("""
            INSERT INTO dim_metro (metro_key, metro_name, state_key)
            VALUES (?, ?, ?)
            ON CONFLICT DO NOTHING
        """, [metro_key, region_info['metro_name'], state_key])
        
        # Buscar a chave existente
        result = conn.execute("""
            SELECT metro_key FROM dim_metro 
            WHERE metro_name = ? AND state_key = ?
        """, [region_info['metro_name'], state_key]).fetchone()
        if result:
            metro_key = result[0]
    
    # 5. INSERT/SELECT dim_county - ok
    county_key = None
    if region_info['county_name']:
        county_key = hash_int32(f"{region_info['county_name']}_{region_info['state_code']}")
        conn.execute("""
            INSERT INTO dim_county (county_key, county_name, metro_key, state_key)
            VALUES (?, ?, ?, ?)
            ON CONFLICT DO NOTHING
        """, [county_key, region_info['county_name'], metro_key, state_key])
        
        # Buscar a chave existente
        result = conn.execute("""
            SELECT county_key FROM dim_county 
            WHERE county_name = ? AND state_key = ?
        """, [region_info['county_name'], state_key]).fetchone()
        if result:
            county_key = result[0]
    
    # 6. INSERT/SELECT dim_city - ok
    city_key = None
    if region_info['city_name']:
        city_key = hash_int32(f"{region_info['city_name']}_{region_info['county_name']}_{region_info['state_code']}")
        conn.execute("""
            INSERT INTO dim_city (city_key, city_name, county_key, metro_key)
            VALUES (?, ?, ?, ?)
            ON CONFLICT DO NOTHING
        """, [city_key, region_info['city_name'], county_key, metro_key])
        
        # Buscar a chave existente
        result = conn.execute("""
            SELECT city_key FROM dim_city 
            WHERE city_name = ? AND county_key IS NOT DISTINCT FROM ?
        """, [region_info['city_name'], county_key]).fetchone()
        if result:
            city_key = result[0]
    
    # 7. INSERT/SELECT dim_neighborhood - ok
    neighborhood_key = None
    if region_info['neighborhood_name']:
        neighborhood_key = hash_int32(f"{region_info['neighborhood_name']}_{region_info['city_name']}_{region_info['state_code']}")
        conn.execute("""
            INSERT INTO dim_neighborhood (neighborhood_key, neighborhood_name, city_key)
            VALUES (?, ?, ?)
            ON CONFLICT DO NOTHING
        """, [neighborhood_key, region_info['neighborhood_name'], city_key])
        
        # Buscar a chave existente
        result = conn.execute("""
            SELECT neighborhood_key FROM dim_neighborhood 
            WHERE neighborhood_name = ? AND city_key IS NOT DISTINCT FROM ?
        """, [region_info['neighborhood_name'], city_key]).fetchone()
        if result:
            neighborhood_key = result[0]
    
    # 8. INSERT/SELECT dim_zip - ok
    zip_key = None
    if region_info['zip_key']:
        zip_key = int(region_info['zip_key'])
        conn.execute("""
            INSERT INTO dim_zip (zip_key, city_key)
            VALUES (?, ?)
            ON CONFLICT DO NOTHING
        """, [zip_key, city_key])
        
        # Buscar a chave existente
        result = conn.execute("""
            SELECT zip_key FROM dim_zip 
            WHERE zip_key = ?
        """, [zip_key]).fetchone()
        if result:
            zip_key = result[0]
    
    # 9. INSERT INTO dim_region (consolidated) - ok
    region_key = hash_int32(row['region_id'])
    conn.execute("""
        INSERT INTO dim_region (
            region_key, region_type,
            zip_key, neighborhood_key, city_key, county_key, metro_key, state_key, country_key
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT DO NOTHING
    """, [
        region_key,
        region_info['region_type'],
        zip_key,
        neighborhood_key,
        city_key,
        county_key,
        metro_key,
        state_key,
        country_key
    ])
    
    # 10. INSERT INTO dim_realestate_indicator - ok
    conn.execute("""
        INSERT INTO dim_realestate_indicator (
            realestate_indicator_key, indicator_name, indicator_description, region_key
        )
        VALUES (?, ?, ?, ?)
        ON CONFLICT DO NOTHING
    """, [
        indicator_info['realestate_indicator_key'],
        indicator_info['indicator_name'],
        indicator_info['realestate_indicator_description'],
        region_key
    ])
    
    # 11. INSERT INTO dim_value - ok
    value_key = hash_int32(f"{value_info['value']}_{value_info['unit'].value}")
    conn.execute("""
        INSERT INTO dim_value (value_key, value, unit)
        VALUES (?, ?, ?)
        ON CONFLICT DO NOTHING
    """, [value_key, value_info['value'], value_info['unit'].value])
    
    # 12. INSERT INTO dim_asset (Real Estate) - diz que nao precisa 
    asset_key = 'REAL_ESTATE'
    conn.execute("""
        INSERT INTO dim_asset (asset_key, type)
        VALUES (?, 'REAL_ESTATE')
        ON CONFLICT DO NOTHING
    """, [asset_key])
    
    # 13. INSERT INTO fact_value - ok
    conn.execute("""
        INSERT INTO fact_value ( 
            value_key, asset_key, year_key, 
            socioeconomical_indicator_key, realestate_indicator_key, cryptostock_value_key
        )
        VALUES (?, ?, ?, NULL, ?, NULL)
    """, [
        value_key,
        asset_key,
        date_info['year'],
        indicator_info['realestate_indicator_key']
    ])


# Buscar dados da tabela unificada
print("Iniciando processamento de dados...")
result = conn.execute("SELECT * FROM staging_unified LIMIT 10").fetchdf()


# Processar cada linha
processed_count = 0
for idx, row in result.iterrows():
    # Processar a linha
    print(row)
    processed_row = {
        'region_info': process_region(row),
        'indicator_info': process_indicator(row),
        'date_info': process_date(row),
        'value_info': process_value(row)
    }
    
    insert_into_dim_fact(row, processed_row)
    
    processed_count += 1
    
    # Mostrar progresso
    if (processed_count) % 1000 == 0:
        print(f"Processadas {processed_count} linhas...")

print(f"\n✓ Processamento concluído: {processed_count} linhas")
print("✓ Dados inseridos nas tabelas do Data Warehouse")

Iniciando processamento de dados...
indicator_id                                          ZATT
region_id                                             2678
date                                   2009-05-31 00:00:00
value                                        286653.683766
indicator_id_1                                        ZATT
indicator         ZHVI All Homes- Top Tier Time Series ($)
category                                       Home values
region_id_1                                           2678
region_type                                         county
region               Del Norte County;CA;Crescent City, CA
Name: 0, dtype: object
indicator_id                                          ZATT
region_id                                             2358
date                                   2009-05-31 00:00:00
value                                         188791.69948
indicator_id_1                                        ZATT
indicator         ZHVI All Homes- Top Tier Time Series (

In [None]:
# Clean up staging table
conn.execute("DROP TABLE IF EXISTS staging_unified")
print("✓ Staging table cleaned up")

In [28]:
# Close connection
conn.close()
print("\n" + "="*60)
print("✓ ETL PROCESS COMPLETE!")
print("✓ Data successfully loaded into zillow_dwh.duckdb")
print("="*60)


✓ ETL PROCESS COMPLETE!
✓ Data successfully loaded into zillow_dwh.duckdb
