In [1]:
import pandas as pd


sales_df = pd.read_csv("seeds/sales_raw.csv", sep=';')
print(sales_df.columns)

zipcode_df = pd.read_csv("seeds/zipcode_raw.csv", sep=',', dtype={"ZIPCODE": "str"})
print(zipcode_df.columns)

distinct_zipcodes = zipcode_df['ZIPCODE'].unique().tolist()

meteo_df = pd.read_csv("seeds/meteo_raw.csv", sep=';', dtype={"zipcode": "str", "date": "str"})
#print(meteo_df.shape[0])
print(meteo_df.columns)

meteo_df['date'] = pd.to_datetime(meteo_df['date'], errors='coerce').dt.date


filtered_meteo_df = meteo_df.loc[
    (meteo_df['zipcode'].isin(distinct_zipcodes)) & (meteo_df['date'] >= pd.to_datetime('2024-12-01').date())
]

filtered_meteo_df.to_csv("seeds/meteo_raw.csv", sep=';', index=False)


# Now you can work with the filtered DataFrame
#print(filtered_meteo_df.shape[0])



Index(['LEAD_ID', 'FINANCING_TYPE', 'CURRENT_PHASE', 'PHASE_PRE_KO',
       'IS_MODIFIED', 'OFFER_SENT_DATE', 'CONTRACT_1_DISPATCH_DATE',
       'CONTRACT_2_DISPATCH_DATE', 'CONTRACT_1_SIGNATURE_DATE',
       'CONTRACT_2_SIGNATURE_DATE', 'VISIT_DATE', 'TECHNICAL_REVIEW_DATE',
       'PROJECT_VALIDATION_DATE', 'SALE_DISMISSAL_DATE', 'KO_DATE', 'ZIPCODE',
       'VISITING_COMPANY', 'KO_REASON', 'INSTALLATION_PEAK_POWER_KW',
       'INSTALLATION_PRICE', 'N_PANELS', 'CUSOMER_TYPE'],
      dtype='object')
Index(['ZIPCODE', 'ZC_LATITUDE', 'ZC_LONGITUDE', 'AUTONOMOUS_COMMUNITY',
       'AUTONOMOUS_COMMUNITY_NK', 'PROVINCE'],
      dtype='object')
Index(['date', 'temperature', 'relative_humidity', 'precipitation_rate',
       'wind_speed', 'zipcode'],
      dtype='object')


In [13]:
import subprocess
import sys

# Function to run dbt command with error handling
def run_dbt_command(command):
    try:
        result = subprocess.run(
            command, 
            capture_output=True, 
            text=True, 
            check=True  # Raises CalledProcessError if the command fails
        )
        return result.stdout, result.stderr, result.returncode
    except subprocess.CalledProcessError as e:
        # Handle the error by printing it and exiting gracefully
        print(f"Error occurred while running command: {' '.join(command)}")
        print("STDOUT:", e.stdout)
        print("STDERR:", e.stderr)


# Run dbt seed to create sales_raw, meteo_raw, and zipcode_raw tables in mysql
raw_layer_stdout, raw_layer_stderr, raw_layer_code = run_dbt_command(
    ["dbt", "seed", "--select", 
     "sales_raw", 
     "zipcode_raw", 
     "meteo_raw"]
)

# Run the dbt model for curated layer
curated_layer = subprocess.run(
    ["dbt", "run", "--select", 
     "dim_customer_type", 
     "dim_date", 
     "dim_finance_type", 
     "dim_ko_reason", 
     "dim_phases", 
     "dim_zipcode", 
     "fact_sales", 
     "fact_weather"]  # Closing the list here
)

# Run the dbt model for KPI layer
kpi_layer = subprocess.run(
    ["dbt", "run", "--select", 
     "kpi_top_ko_reasons", 
     "kpi_top_sales_month_cash", 
     "kpi_avg_installation_price", 
     "kpi_leads_province_analysis", 
     "kpi_sales_temp_correlation"]
)

# Print the command output for raw layer
print("Raw Layer - STDOUT:", raw_layer_stdout)
print("Raw Layer - STDERR:", raw_layer_stderr)

# Print the command output for curated layer
print("Curated Layer - STDOUT:", curated_layer)
print("Curated Layer - STDERR:", curated_layer)

# Print the command output for KPI layer
print("KPI Layer - STDOUT:", kpi_layer)
print("KPI Layer - STDERR:", kpi_layer)



Raw Layer - STDOUT: [0m05:15:26  Running with dbt=1.7.9
[0m05:15:26  Registered adapter: mysql=1.7.0
[0m05:15:27  Found 15 models, 3 seeds, 4 tests, 0 sources, 0 exposures, 0 metrics, 375 macros, 0 groups, 0 semantic models
[0m05:15:27  
[0m05:15:27  Concurrency: 1 threads (target='dev')
[0m05:15:27  
[0m05:15:27  1 of 3 START seed file dmbi_fa.meteo_raw ....................................... [RUN]
[0m05:22:50  1 of 3 OK loaded seed file dmbi_fa.meteo_raw ................................... [[32mINSERT 334180[0m in 443.71s]
[0m05:22:50  2 of 3 START seed file dmbi_fa.sales_raw ....................................... [RUN]
[0m05:25:26  2 of 3 OK loaded seed file dmbi_fa.sales_raw ................................... [[32mINSERT 35158[0m in 155.63s]
[0m05:25:26  3 of 3 START seed file dmbi_fa.zipcode_raw ..................................... [RUN]
[0m05:25:40  3 of 3 OK loaded seed file dmbi_fa.zipcode_raw ................................. [[32mINSERT 11407[0m in 13.84s