In [40]:
# Importing standard libraries
import os

# Importing third party libraries
import yaml
import pandas as pd

# Importing custom libraries
from tools.sql_tools import write_to_database
from tools.logs import log_wrap

In [132]:
# Rename map applied to the aggregated weather frame before loading
FINAL_NAMES_WEATHER = {
    'date': 'year',
    'temperature': 'avg_temperature',
    'relative_humidity': 'avg_relative_humidity',
    'precipitation_rate': 'avg_precipitation_rate',
    'wind_speed': 'avg_wind_speed',
}

# Columns (and column order) selected for the sales fact frame.
# 'cusomer_type' keeps the spelling used by the source data.
FINAL_COLS_SALES = [
    'sales_id',
    'zipcode_id',
    'lead_id',
    'financing_type',
    'current_phase',
    'phase_pre_ko',
    'is_modified',
    'offer_sent_date',
    'contract_1_dispatch_date',
    'contract_2_dispatch_date',
    'contract_1_signature_date',
    'contract_2_signature_date',
    'most_recent_contract_signature',
    'visit_date',
    'technical_review_date',
    'project_validation_date',
    'sale_dismissal_date',
    'ko_date',
    'visiting_company',
    'ko_reason',
    'installation_peak_power_kw',
    'installation_price',
    'n_panels',
    'cusomer_type',
]

# Columns (and column order) selected for the weather dimension frame
FINAL_COLS_WEATHER = [
    'weather_id',
    'zipcode_id',
    'year',
    'avg_temperature',
    'avg_relative_humidity',
    'avg_precipitation_rate',
    'avg_wind_speed',
]

In [45]:
# Importing standard libraries
import os

# Importing third party libraries
import yaml
import pandas as pd
import mysql.connector
from sqlalchemy import create_engine

In [46]:
# Path of the credentials YAML, resolved against the current working directory
FILENAME = os.path.join(
    os.getcwd(),
    'creds.yaml',
)

In [47]:
# Load the database credentials from the YAML file located by FILENAME.
# The cell previously opened FILENAME_creds, a name that is never assigned in
# this notebook (the path cell defines FILENAME), so it raised NameError on a
# fresh kernel.
with open(FILENAME, "r") as file:
    creds = yaml.safe_load(file)

In [None]:
# NOTE: this cell held an unfinished `with open()` statement (no arguments, no
# body). That is a SyntaxError and the cell was never executed (In [None]), so
# it is kept only as a comment to let the notebook survive "Restart & Run All".

In [125]:
# DDL for the zipcode dimension table; executed by create_table().
# No primary key is declared here — the ALTER TABLE statement for zipcode_dim
# further down the notebook promotes zipcode_id to an AUTO_INCREMENT primary key.
QUERY_CREATE_ZIPCODE_DIM_TABLE ='''
CREATE TABLE IF NOT EXISTS zipcode_dim (
  zipcode_id int NOT NULL,
  zipcode varchar(50) NOT NULL,
  zc_latitude float,
  zc_longitude float,
  autonomous_community varchar(50),
  autonomous_community_nk varchar(50),
  province varchar(50)
);
'''

In [126]:
# DDL for the weather dimension table; executed by create_table().
# Columns mirror FINAL_COLS_WEATHER. Key constraints are not declared here;
# the ALTER TABLE statement for weather_dim adds the primary key on weather_id
# and the foreign key from zipcode_id to zipcode_dim.
QUERY_CREATE_WEATHER_DIM_TABLE = '''
CREATE TABLE IF NOT EXISTS weather_dim (
  weather_id int NOT NULL,
  zipcode_id int NOT NULL,
  year int NOT NULL,
  avg_temperature float NOT NULL,
  avg_relative_humidity float NOT NULL,
  avg_precipitation_rate float NOT NULL,
  avg_wind_speed float NOT NULL
);
'''

In [170]:
# DDL for the sales fact table; executed by create_table().
# Column names and order match FINAL_COLS_SALES, including the 'cusomer_type'
# spelling. Only zipcode_id, lead_id, is_modified and n_panels are nullable.
# NOTE(review): every date column (e.g. ko_date, sale_dismissal_date) is
# declared NOT NULL — presumably not every sale has all of these dates, which
# would make an 'append' load fail; confirm against the data.
QUERY_CREATE_SALES_FT_TABLE = '''
CREATE TABLE IF NOT EXISTS sales_fact (
  sales_id int NOT NULL,
  zipcode_id int,
  lead_id varchar(50),
  financing_type varchar(50) NOT NULL,
  current_phase varchar(50) NOT NULL,
  phase_pre_ko varchar(50) NOT NULL,
  is_modified int,
  offer_sent_date date NOT NULL,
  contract_1_dispatch_date date NOT NULL,
  contract_2_dispatch_date date NOT NULL,
  contract_1_signature_date date NOT NULL,
  contract_2_signature_date date NOT NULL,
  most_recent_contract_signature date NOT NULL,
  visit_date date NOT NULL,
  technical_review_date date NOT NULL,
  project_validation_date date NOT NULL,
  sale_dismissal_date date NOT NULL,
  ko_date date NOT NULL,
  visiting_company varchar(50) NOT NULL,
  ko_reason varchar(50) NOT NULL,
  installation_peak_power_kw float NOT NULL,
  installation_price float NOT NULL,
  n_panels smallint,
  cusomer_type varchar(50) NOT NULL
);
'''	

In [51]:
def create_table():
    """Create the zipcode_dim, weather_dim and sales_fact tables if missing.

    Connects to MySQL with the values under ``creds['mysql-db']``, executes the
    three ``QUERY_CREATE_*`` statements in order, commits, and prints a
    confirmation message.

    The cursor and the connection are closed in ``finally`` blocks so that a
    failing statement no longer leaves an open connection behind; the original
    exception still propagates to the caller.
    """
    db_creds = creds['mysql-db']
    connection = mysql.connector.connect(
        user=db_creds['username'],
        password=db_creds['password'],
        host=db_creds['host'],
        database=db_creds['database'],
    )
    try:
        cursor = connection.cursor()
        try:
            # Dimensions first, fact table last
            cursor.execute(QUERY_CREATE_ZIPCODE_DIM_TABLE)
            cursor.execute(QUERY_CREATE_WEATHER_DIM_TABLE)
            cursor.execute(QUERY_CREATE_SALES_FT_TABLE)
            connection.commit()
            print("Table structures created successfully.")
        finally:
            cursor.close()
    finally:
        connection.close()

In [37]:
def write_to_database(dfs_dict, if_exists='append'):
    """
    Write one or more DataFrames into MySQL tables.

    Args:
        dfs_dict (dict): Maps a target table name to the DataFrame to insert.
            Values that are not DataFrames are skipped with a printed notice.
        if_exists (str): Forwarded to ``DataFrame.to_sql``. Default 'append'.
    """
    # Credentials live under the 'mysql-db' section, exactly as create_table()
    # reads them; the flat creds['username'] lookup used before would raise
    # KeyError with that layout.
    db_creds = creds['mysql-db']
    _db_user = db_creds['username']
    _db_password = db_creds['password']
    _db_host = db_creds['host']
    _db_name = db_creds['database']

    engine = create_engine(f"mysql+pymysql://{_db_user}:{_db_password}@{_db_host}:3306/{_db_name}")
    try:
        with engine.connect() as connection:
            for table_name, df in dfs_dict.items():
                if isinstance(df, pd.DataFrame):
                    df.to_sql(table_name, con=connection, if_exists=if_exists, index=False)
                    print(f"Data successfully inserted into {table_name}")
                else:
                    print(f"Skipping {table_name}: Not a valid DataFrame")
    finally:
        # Release the pooled connections held by this short-lived engine
        engine.dispose()

In [38]:
# Script-style entry point: create the tables, then load the data.
# NOTE(review): the recorded output of this cell shows create_table() succeeding
# and then "NameError: name 'dfs_dict' is not defined" — dfs_dict is only
# assigned further down the notebook, so this cell cannot run on a fresh kernel.
if __name__ == "__main__":
    create_table()
    write_to_database(dfs_dict)

Table structures created successfully.


NameError: name 'dfs_dict' is not defined

In [226]:
#-----------FOR THE BOIS----------------------

#--------LIBRARIES FOR THE BOIS (THE ONES I USED, THE USUAL ONES)-----------

#COOL LIBRARIES
import pandas as pd
import numpy as np
import os
from tools.logs import log_wrap
import logging



# Root logging setup: INFO and above, with timestamp / logger name / level prefix
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)

# Module-level logger used by the cleaning helpers defined below
logger = logging.getLogger(__name__)



# Importing SQL Queries
def _read_sql_file(path):
    """Return the full text of the SQL script stored at ``path``."""
    with open(path, 'r') as sql_file:
        return sql_file.read()


# Directory holding one extension-less file per SQL statement
SQL_DIR = os.path.join(os.getcwd(), 'SQL')

# CREATE TABLE statements
FILENAME_CREATE_ZIPCODE_DIM = os.path.join(SQL_DIR, 'CREATE_ZIPCODE_DIM')
CREATE_ZIPCODE_DIM = _read_sql_file(FILENAME_CREATE_ZIPCODE_DIM)

FILENAME_CREATE_WEATHER_DIM = os.path.join(SQL_DIR, 'CREATE_WEATHER_DIM')
CREATE_WEATHER_DIM = _read_sql_file(FILENAME_CREATE_WEATHER_DIM)

FILENAME_CREATE_SALES_FACT = os.path.join(SQL_DIR, 'CREATE_SALES_FACT')
CREATE_SALES_FACT = _read_sql_file(FILENAME_CREATE_SALES_FACT)

# ALTER TABLE statements (executed by create_relationships)
FILENAME_ALTER_ZIPCODE_DIM = os.path.join(SQL_DIR, 'ALTER_ZIPCODE_DIM')
ALTER_ZIPCODE_DIM = _read_sql_file(FILENAME_ALTER_ZIPCODE_DIM)

FILENAME_ALTER_WEATHER_DIM = os.path.join(SQL_DIR, 'ALTER_WEATHER_DIM')
ALTER_WEATHER_DIM = _read_sql_file(FILENAME_ALTER_WEATHER_DIM)

FILENAME_ALTER_SALES_FACT_1 = os.path.join(SQL_DIR, 'ALTER_SALES_FACT_1')
ALTER_SALES_FACT_1 = _read_sql_file(FILENAME_ALTER_SALES_FACT_1)

FILENAME_ALTER_SALES_FACT_2 = os.path.join(SQL_DIR, 'ALTER_SALES_FACT_2')
ALTER_SALES_FACT_2 = _read_sql_file(FILENAME_ALTER_SALES_FACT_2)

# Locations of the three source CSV files, relative to the working directory
DATA_DIR = os.path.join(os.getcwd(), 'data')
FILENAME_sales_phases_funnel_df = os.path.join(DATA_DIR, 'sale_phases_funnel.csv')
FILENAME_zipcode_df = os.path.join(DATA_DIR, 'zipcode_eae.csv')
FILENAME_meteo_df = os.path.join(DATA_DIR, 'meteo_eae.csv')

# Column dtypes handed to pd.read_csv, one mapping per source file

# Sales funnel
SALES_TYPES = {
    'LEAD_ID': 'str',
    'FINANCING_TYPE': 'str',
    'CURRENT_PHASE': 'str',
    'PHASE_PRE_KO': 'str',
    'IS_MODIFIED': 'bool',
    'ZIPCODE': 'str',
    'VISITING_COMPANY': 'str',
    'KO_REASON': 'str',
    'INSTALLATION_PEAK_POWER_KW': 'float64',
    'INSTALLATION_PRICE': 'float',
    'N_PANELS': 'int',
    'CUSOMER_TYPE': 'str',
}

# Zipcodes
ZIPCODE_TYPES = {
    'ZIPCODE': 'str',
    'ZC_LATITUDE': 'float64',
    'ZC_LONGITUDE': 'float64',
    'AUTONOMOUS_COMMUNITY': 'str',
    'AUTONOMOUS_COMMUNITY_NK': 'str',
    'PROVINCE': 'str',
}

# Meteo
METEO_TYPES = {
    'temperature': 'float',
    'relative_humidity': 'float',
    'precipitation_rate': 'float',
    'wind_speed': 'float',
    'zipcode': 'str',
}





#----------FUNCTIONS FOR THE BOIS-----------

#CREATE 3 COOL DATAFRAMES FUNCTION
#Creates 3 super cool dataframes from the CSVs with the data types set from the start.
def dataFrameCreate():
    """Read the sales funnel, zipcode and meteo CSVs into typed DataFrames.

    Column dtypes come from the module-level SALES_TYPES, ZIPCODE_TYPES and
    METEO_TYPES mappings; date columns are parsed while reading.

    Returns:
        list: [sales_phases_funnel_df, zipcode_df, meteo_df], in that order.
    """
    # Sales funnel: ';'-separated, date columns parsed day-first
    sales_date_columns = [
        'OFFER_SENT_DATE',
        'CONTRACT_1_DISPATCH_DATE',
        'CONTRACT_2_DISPATCH_DATE',
        'CONTRACT_1_SIGNATURE_DATE',
        'CONTRACT_2_SIGNATURE_DATE',
        'VISIT_DATE',
        'TECHNICAL_REVIEW_DATE',
        'PROJECT_VALIDATION_DATE',
        'SALE_DISMISSAL_DATE',
        'KO_DATE',
    ]
    sales_frame = pd.read_csv(
        FILENAME_sales_phases_funnel_df,
        delimiter=';',
        dtype=SALES_TYPES,
        parse_dates=sales_date_columns,
        dayfirst=True,
    )
    logger.info('sales_phases_funnel_df created')

    # Zipcodes: ','-separated, no date columns
    zipcode_frame = pd.read_csv(FILENAME_zipcode_df, delimiter=',', dtype=ZIPCODE_TYPES)
    logger.info('zipcodedf created')

    # Meteo: ';'-separated, one 'date' column parsed with an explicit format
    meteo_read_options = dict(
        delimiter=';',
        dtype=METEO_TYPES,
        parse_dates=['date'],
        date_format='%Y/%m/%d %H:%M:%S.%f',
    )
    meteo_frame = pd.read_csv(FILENAME_meteo_df, **meteo_read_options)
    logger.info('meteo_df created')

    return [sales_frame, zipcode_frame, meteo_frame]
    



# Load the raw CSVs; the returned list is ordered [sales funnel, zipcode, meteo].
list_of_dfs = dataFrameCreate()




#--GLOBAL CLEANING FUNCTION--


#DROPPING DUPLICATES FOR ALL DATAFRAMES

#creating the drop duplicate function
def dropDupli(dfs):
    """Drop fully duplicated rows, in place, from the first three frames of ``dfs``.

    The duplicate count of each frame is logged before and after the clean-up.

    Args:
        dfs: list whose items 0, 1 and 2 are the sales funnel, zipcode and
            meteo DataFrames.

    Returns:
        The same list object that was passed in.
    """
    frame_labels = ('sales_funnel_df', 'zipcode_df', 'meteo_df')

    # Report duplicates for every frame before touching any of them
    for position, label in enumerate(frame_labels):
        logger.info(f'There are {dfs[position].duplicated().sum()} duplicate rows in {label} before duplicate cleaning')

    # In-place removal, so the DataFrame objects held by the caller are modified
    for position in range(len(frame_labels)):
        dfs[position].drop_duplicates(inplace=True)

    # Report again once all frames are cleaned
    for position, label in enumerate(frame_labels):
        logger.info(f'There are {dfs[position].duplicated().sum()} duplicate rows in {label} after duplicate cleaning')

    return dfs



# Drop exact duplicate rows from the three frames; dropDupli returns the list it was given.
list_of_dfs = dropDupli(list_of_dfs)



# --SALES FUNNEL DATAFRAME CLEANING FUNCTIONS--


#DELETE UNUSABLE LEADS FUNCTION 

# Drop rows where KO_REASON is "Unreachable"
def delete_unreachable_leads(dfs):
    """Remove sales rows that are in phase 'KO' with reason 'Unreachable'.

    Args:
        dfs: list whose first item is the sales funnel DataFrame, containing
            the columns ``CURRENT_PHASE`` and ``KO_REASON``.

    Returns:
        The same list, with ``dfs[0]`` replaced by the filtered frame
        (index renumbered from 0).
    """
    sales = dfs[0]
    is_ko = sales['CURRENT_PHASE'] == 'KO'
    is_unreachable = sales['KO_REASON'] == 'Unreachable'

    # A row is dropped only when BOTH conditions hold
    remaining = sales[~(is_ko & is_unreachable)]
    remaining.reset_index(drop=True, inplace=True)
    dfs[0] = remaining

    logger.info('Unreachable leads deleted')
    return dfs

# Filter the sales frame (index 0): rows in phase 'KO' with reason 'Unreachable' are excluded.
list_of_dfs = delete_unreachable_leads(list_of_dfs)




# REMOVE OUTLIERS FUNCTION


def delete_outliers(dfs):
    """Remove INSTALLATION_PRICE outliers from the sales frame (1.5 * IQR rule).

    A row is an outlier when its price is below ``Q1 - 1.5 * IQR`` or above
    ``Q3 + 1.5 * IQR``. Rows with a missing price are not flagged and stay in
    the sales frame.

    Args:
        dfs: list whose first item is the sales funnel DataFrame, containing
            an ``INSTALLATION_PRICE`` column.

    Returns:
        The same list object, with ``dfs[0]`` replaced by the outlier-free
        sales frame and the outlier rows appended as a new last item.
    """
    prices = dfs[0]['INSTALLATION_PRICE']

    # Interquartile range and the resulting fences
    q1 = prices.quantile(0.25)
    q3 = prices.quantile(0.75)
    iqr = q3 - q1
    is_outlier = (prices < (q1 - 1.5 * iqr)) | (prices > (q3 + 1.5 * iqr))

    outliers_df = dfs[0][is_outlier]
    logger.info('outliers_df dataframe created')
    logger.info(f'Number of outliers: {len(outliers_df)}')

    # Store the filtered frame back into the list. Previously it was assigned
    # to a local variable and discarded, so the outliers were never removed.
    # .copy() gives later steps an independent frame rather than a slice.
    dfs[0] = dfs[0][~is_outlier].copy()
    logger.info('outliers removed from sales_phases_funnel_df')

    dfs.append(outliers_df)
    logger.info('outliers_df dataframe added to list_of_dfs')
    return dfs
    


# Run the IQR outlier step on INSTALLATION_PRICE; the outlier rows are appended
# to the list as a fourth element.
list_of_dfs = delete_outliers(list_of_dfs)


# This is how you would use it in the final ETL:
'''
from tools.cleaning import dataFrameCreate, dropDupli, delete_unreachable_leads, delete_outliers



list_of_dfs = dataFrameCreate()
list_of_dfs = dropDupli(list_of_dfs)
list_of_dfs = delete_unreachable_leads(list_of_dfs) #this one we may exclude
list_of_dfs = delete_outliers(list_of_dfs)

print(list_of_dfs)
'''




@log_wrap
def transform_data(data: list, logger) -> list:
    '''
    Build the load-ready zipcode, weather and sales frames from the raw frames.

    Args:
        data: list of DataFrames; only the first three items are read:
            [0] = Sales
            [1] = Zipcode
            [2] = Weather
        logger: logger used for progress messages. Callers in this notebook do
            not pass it — presumably it is injected by @log_wrap (confirm in
            tools.logs).

    Returns:
        list: [zipcode_dim_df, weather_dim_df, sales_fact_df]

    Raises:
        Exception: any error raised during the transformation is logged with
            its traceback and re-raised.

    The input frames are not modified: every step works on new frames, so the
    function can be re-run on the same input. Previously the inputs were
    mutated in place and a second run failed when 'zipcode_id' was inserted
    again.
    '''
    try:
        logger.info('Reading Dataframes...')
        sales_source = data[0]
        logger.info(f'Sales Data has {len(sales_source)} records.')
        zipcode_source = data[1]
        logger.info(f'Zipcode Data has {len(zipcode_source)} records.')
        weather_source = data[2]
        logger.info(f'Weather Data has {len(weather_source)} records.')

        logger.info('Processing Transformations...')
        # rename() returns new frames, which keeps the caller's data untouched
        sales_fact_df_raw = sales_source.rename(columns=str.lower)
        zipcode_dim_df = zipcode_source.rename(columns=str.lower)
        weather_dim_df_raw = weather_source.rename(columns=str.lower)

        logger.info('Creating a PK in zipcode_dim_df_raw...')
        zipcode_dim_df.insert(0, 'zipcode_id', range(1, len(zipcode_dim_df) + 1))
        zipcode_dim_df['zipcode_id'] = zipcode_dim_df['zipcode_id'].astype('int32')

        logger.info('Grouping weather_dim_df_raw...')
        # Reduce each timestamp to its year, then average per (year, zipcode)
        weather_dim_df_raw['date'] = weather_dim_df_raw['date'].dt.year
        weather_dim_df_raw = weather_dim_df_raw.groupby(['date', 'zipcode']).mean().reset_index()

        logger.info('Adding FK zipcode_id in weather table...')
        weather_dim_df = pd.merge(weather_dim_df_raw, zipcode_dim_df, on='zipcode', how='left')

        logger.info('Dropping null zipcode_id from weather table...')
        # NOTE(review): dropna() without `subset` removes rows with a null in ANY
        # column (including the merged zipcode attributes), not only rows with a
        # null zipcode_id — confirm this is intended.
        weather_dim_df = weather_dim_df.dropna()
        weather_dim_df['zipcode_id'] = weather_dim_df['zipcode_id'].astype('int32')

        logger.info('Creating a PK in weather_dim_df_raw...')
        weather_dim_df.insert(0, 'weather_id', range(1, len(weather_dim_df) + 1))
        weather_dim_df['weather_id'] = weather_dim_df['weather_id'].astype('int32')

        logger.info('Creating a PK in sales_fact_df_raw...')
        sales_fact_df_raw.insert(0, 'sales_id', range(1, len(sales_fact_df_raw) + 1))
        sales_fact_df_raw['sales_id'] = sales_fact_df_raw['sales_id'].astype('int32')

        logger.info('Adding calculated column most_recent_contract_signature to sales_fact_df...')
        # Latest of the two signature dates. The column position is irrelevant
        # because FINAL_COLS_SALES fixes the final order, so no index is needed.
        sales_fact_df_raw['most_recent_contract_signature'] = (
            sales_fact_df_raw[['contract_1_signature_date', 'contract_2_signature_date']].max(axis=1)
        )

        logger.info('Adding FK zipcode_id in sales table...')
        sales_fact_df = pd.merge(sales_fact_df_raw, zipcode_dim_df, on='zipcode', how='left')

        logger.info('Handling column types, names and selection...')
        weather_dim_df = weather_dim_df.rename(columns=FINAL_NAMES_WEATHER)
        weather_dim_df = weather_dim_df[FINAL_COLS_WEATHER]
        sales_fact_df = sales_fact_df[FINAL_COLS_SALES]

        logger.info('Packing data for loading...')
        list_of_transformed_dfs = [zipcode_dim_df, weather_dim_df, sales_fact_df]

        return list_of_transformed_dfs

    except Exception as e:
        logger.error(f'Transformation error: {e}', exc_info=True)
        raise


# Transform the cleaned frames; the result is ordered [zipcode_dim, weather_dim, sales_fact].
# `logger` is not passed here — presumably supplied by the @log_wrap decorator (confirm in tools.logs).
list_of_transformed_dfs = transform_data(list_of_dfs)

@log_wrap
def create_table(logger):
    """Create the zipcode_dim, weather_dim and sales_fact tables.

    Connects to MySQL with the values under ``creds['mysql-db']``, executes the
    CREATE_ZIPCODE_DIM, CREATE_WEATHER_DIM and CREATE_SALES_FACT statements
    (read from the SQL directory) in that order, and commits.

    Args:
        logger: logger used for the success message. Callers in this notebook
            do not pass it — presumably it is injected by @log_wrap (confirm
            in tools.logs).

    The cursor and the connection are closed in ``finally`` blocks so that a
    failing statement no longer leaves an open connection behind; the original
    exception still propagates.
    """
    db_creds = creds['mysql-db']
    connection = mysql.connector.connect(
        user=db_creds['username'],
        password=db_creds['password'],
        host=db_creds['host'],
        database=db_creds['database'],
    )
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(CREATE_ZIPCODE_DIM)
            cursor.execute(CREATE_WEATHER_DIM)
            cursor.execute(CREATE_SALES_FACT)
            connection.commit()
            logger.info("Table structures created successfully.")
        finally:
            cursor.close()
    finally:
        connection.close()


# Target table name -> transformed DataFrame, in the order returned by
# transform_data ([zipcode_dim, weather_dim, sales_fact]).
dfs_dict = {
        "zipcode_dim": list_of_transformed_dfs[0],
        "weather_dim": list_of_transformed_dfs[1],
        "sales_fact": list_of_transformed_dfs[2]
    }

@log_wrap
def write_to_database(dfs_dict, logger, if_exists='replace'):
    """
    Write one or more DataFrames into MySQL tables.

    Args:
        dfs_dict (dict): Maps a target table name to the DataFrame to insert.
            Values that are not DataFrames are skipped with a log message.
        logger: logger used for progress messages. Callers in this notebook do
            not pass it — presumably it is injected by @log_wrap (confirm in
            tools.logs).
        if_exists (str): Forwarded to ``DataFrame.to_sql``. Default 'replace'.
            NOTE(review): per the pandas docs, 'replace' drops an existing
            table before inserting, so a schema created beforehand would
            presumably be discarded — confirm this is intended.
    """
    db_creds = creds['mysql-db']
    _db_user = db_creds['username']
    _db_password = db_creds['password']
    _db_host = db_creds['host']
    _db_name = db_creds['database']

    engine = create_engine(f"mysql+pymysql://{_db_user}:{_db_password}@{_db_host}:3306/{_db_name}")
    try:
        with engine.connect() as connection:
            for table_name, df in dfs_dict.items():
                if isinstance(df, pd.DataFrame):
                    df.to_sql(table_name, con=connection, if_exists=if_exists, index=False)
                    logger.info(f"Data successfully inserted into {table_name}")
                else:
                    logger.info(f"Skipping {table_name}: Not a valid DataFrame")
    finally:
        # Release the pooled connections held by this short-lived engine
        engine.dispose()


# Create the table structures, then load the transformed frames.
# NOTE(review): write_to_database is defined above with if_exists='replace' as
# its default; per the pandas docs that drops an existing table before
# inserting, so the schemas created by create_table() would presumably be
# discarded — confirm this is intended.
create_table()
write_to_database(dfs_dict)

@log_wrap
def create_relationships(logger):
    """Apply the ALTER TABLE statements to the loaded tables.

    Connects to MySQL with the values under ``creds['mysql-db']``, executes
    ALTER_ZIPCODE_DIM, ALTER_WEATHER_DIM, ALTER_SALES_FACT_1 and
    ALTER_SALES_FACT_2 (read from the SQL directory) in that order, and commits.

    Args:
        logger: logger used for the success message. Callers in this notebook
            do not pass it — presumably it is injected by @log_wrap (confirm
            in tools.logs).

    The cursor and the connection are now closed in ``finally`` blocks; before,
    neither was ever closed, on success or on failure.
    """
    # Establish database connection
    db_creds = creds['mysql-db']
    connection = mysql.connector.connect(
        user=db_creds['username'],
        password=db_creds['password'],
        host=db_creds['host'],
        database=db_creds['database'],
    )
    try:
        cursor = connection.cursor()
        try:
            # zipcode_dim first, then weather_dim, then the two sales_fact statements
            cursor.execute(ALTER_ZIPCODE_DIM)
            cursor.execute(ALTER_WEATHER_DIM)
            cursor.execute(ALTER_SALES_FACT_1)
            cursor.execute(ALTER_SALES_FACT_2)

            connection.commit()
            logger.info("ALTER TABLE query executed successfully.")
        finally:
            cursor.close()
    finally:
        connection.close()

# Run the ALTER TABLE statements; placed after the load step above.
create_relationships()


2025-03-08 16:37:59,707 - __main__ - INFO - sales_phases_funnel_df created
2025-03-08 16:37:59,716 - __main__ - INFO - zipcodedf created
2025-03-08 16:38:01,836 - __main__ - INFO - meteo_df created
2025-03-08 16:38:01,945 - __main__ - INFO - There are 0 duplicate rows in sales_funnel_df before duplicate cleaning
2025-03-08 16:38:01,952 - __main__ - INFO - There are 0 duplicate rows in zipcode_df before duplicate cleaning
2025-03-08 16:38:03,100 - __main__ - INFO - There are 0 duplicate rows in meteo_df before duplicate cleaning
2025-03-08 16:38:04,196 - __main__ - INFO - There are 0 duplicate rows in sales_funnel_df after duplicate cleaning
2025-03-08 16:38:04,199 - __main__ - INFO - There are 0 duplicate rows in zipcode_df after duplicate cleaning
2025-03-08 16:38:05,516 - __main__ - INFO - There are 0 duplicate rows in meteo_df after duplicate cleaning
2025-03-08 16:38:05,548 - __main__ - INFO - Unreachable leads deleted
2025-03-08 16:38:05,556 - __main__ - INFO - outliers_df datafra

In [216]:
# Promotes zipcode_dim.zipcode_id to an AUTO_INCREMENT primary key.
# NOTE(review): create_relationships() executes ALTER_ZIPCODE_DIM (read from
# the SQL directory), not this constant — presumably the same statement; confirm.
ALTER_ZIPCODE_TABLE = '''
ALTER TABLE zipcode_dim
MODIFY COLUMN zipcode_id INT AUTO_INCREMENT PRIMARY KEY
'''

In [217]:
# Promotes weather_dim.weather_id to an AUTO_INCREMENT primary key and adds the
# foreign key fk_constraint_zipcode_id from zipcode_id to zipcode_dim(zipcode_id).
# NOTE(review): create_relationships() executes ALTER_WEATHER_DIM (read from
# the SQL directory), not this constant — presumably the same statement; confirm.
ALTER_WEATHER_TABLE = '''
ALTER TABLE weather_dim
MODIFY COLUMN weather_id INT AUTO_INCREMENT PRIMARY KEY,
ADD CONSTRAINT fk_constraint_zipcode_id FOREIGN KEY (zipcode_id) REFERENCES zipcode_dim(zipcode_id)
'''


In [218]:
# Redefines sales_fact.zipcode_id as INT. This is a separate statement from
# the one that adds the foreign key on the same column (ALTER_SALES_TABLE_2).
# NOTE(review): create_relationships() executes ALTER_SALES_FACT_1 (read from
# the SQL directory), not this constant — presumably the same statement; confirm.
ALTER_SALES_TABLE_1 = '''
ALTER TABLE sales_fact
MODIFY COLUMN zipcode_id INT
'''

In [219]:
# Promotes sales_fact.sales_id to an AUTO_INCREMENT primary key and adds the
# foreign key fk_constraint_zipcode_id_ from zipcode_id to zipcode_dim(zipcode_id).
# The trailing underscore keeps the constraint name distinct from the one
# used on weather_dim.
# NOTE(review): create_relationships() executes ALTER_SALES_FACT_2 (read from
# the SQL directory), not this constant — presumably the same statement; confirm.
ALTER_SALES_TABLE_2 = '''
ALTER TABLE sales_fact
MODIFY COLUMN sales_id INT AUTO_INCREMENT PRIMARY KEY,
ADD CONSTRAINT fk_constraint_zipcode_id_ FOREIGN KEY (zipcode_id) REFERENCES zipcode_dim(zipcode_id)
'''