# ETL PROCESS

Database (Snowflake DW)
This ingestion process uses the Snowflake data warehouse to store the external data.
“Snowflake is an analytics data warehouse delivered as software as a service (SaaS). Snowflake’s data warehouse does not rely on an existing database or “big data” software platform like Hadoop. Snowflake’s data warehouse uses a new SQL database engine with a unique architecture designed for the cloud.”
For more information: https://www.snowflake.com/workloads/data-warehouse-modernization/

## Definition of functions
The functions required by the ETL process are defined below.


## Warehouse Functions
Next, we define the functions that let Python interact with our warehouse.
Connection to the Snowflake DW
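Note: create a `.env` file that defines the Snowflake credentials (`SNOWFLAKE_USER`, `SNOWFLAKE_PASSWORD`, `SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_WAREHOUSE`). They are loaded with `load_dotenv()`, so secrets are never hard-coded in the notebook.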
This is a personal warehouse; the database and schemas have been isolated for this specific use case.

In [22]:
import snowflake.connector
import os
from dotenv import load_dotenv
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas

"""
Set up connection
"""
# Credentials come from environment variables. load_dotenv() loads them from a
# local .env file, so no secret is hard-coded in the notebook.
load_dotenv()
con = snowflake.connector.connect(
    user=os.getenv('SNOWFLAKE_USER'),
    password=os.getenv('SNOWFLAKE_PASSWORD'),
    account=os.getenv('SNOWFLAKE_ACCOUNT'),
    warehouse=os.getenv('SNOWFLAKE_WAREHOUSE'),
    # Target database/schema default to the isolated BELVO_ANALYTICS.RAW location.
    # They can be overridden through the environment without editing the notebook.
    database=os.getenv('SNOWFLAKE_DATABASE', 'BELVO_ANALYTICS'),
    schema=os.getenv('SNOWFLAKE_SCHEMA', 'RAW'),
)


def get_data_from_snowflake(sql: str, params=None) -> pd.DataFrame:
    """
    Run a query on the shared Snowflake connection and return the result set as a DataFrame.

    :param sql: Query string to execute.
    :param params: Optional bind parameters forwarded to ``cursor.execute``. Prefer them
        over formatting values into ``sql`` yourself. The connector's default paramstyle
        is pyformat (``%s`` / ``%(name)s``).
    :return: pandas DataFrame containing every row returned by the query.
    """
    # The cursor is used as a context manager, so it is closed even if execute/fetch
    # raises. The previous version never closed it.
    with con.cursor() as cursor:
        cursor.execute(sql, params)
        return cursor.fetch_pandas_all()


def map_cols_sf(df) -> str:
    """
    Build the column list of a Snowflake ``CREATE TABLE`` statement from a DataFrame's dtypes.

    Each column becomes ``<name> <type>``, and the entries are joined with ``",\\n"``.

    Type mapping:
    - integer dtypes of any width, signed or unsigned, including pandas nullable ``Int*``
      -> ``int``
    - float dtypes -> ``float8``
    - boolean dtypes -> ``boolean``
    - timezone-naive datetimes -> ``datetime``
    - everything else (object/string, categorical, tz-aware datetimes, ...) ->
      ``varchar(16777216)``

    :param df: Input DataFrame. Column names are emitted as-is, unquoted.
    :return: Column definitions; an empty string when the DataFrame has no columns.
    """
    def _sql_type(dtype) -> str:
        # dtype.kind groups every width of a family:
        #   'i'/'u' = signed/unsigned ints, 'f' = floats, 'b' = bools, 'M' = datetime64.
        kind = dtype.kind
        if kind in ("i", "u"):
            return "int"
        if kind == "f":
            return "float8"
        if kind == "b":
            return "boolean"
        if kind == "M" and getattr(dtype, "tz", None) is None:
            # Only naive datetimes map to DATETIME (an alias of TIMESTAMP_NTZ).
            # tz-aware columns keep the varchar fallback, as before.
            return "datetime"
        return "varchar(16777216)"

    # Iterate (name, dtype) pairs positionally through df.dtypes. Indexing df[column]
    # returns a DataFrame for a duplicated name, which broke both the dtype lookup
    # and the old "is this the last column?" separator check.
    return ",\n".join(f"{column} {_sql_type(dtype)}" for column, dtype in df.dtypes.items())


def send_data_to_snowflake(df, schema: str, name: str, hard_create=True, quote_identifiers=False):
    """
    Create a Snowflake table whose columns mirror a DataFrame's dtypes, then load the DataFrame.

    The DDL column list is derived with ``map_cols_sf``. Schema and table names are
    upper-cased.

    :param df: Input DataFrame. It is not modified; a copy is written.
    :param schema: Target schema name.
    :param name: Target table name.
    :param hard_create: If truthy (the default), drop and recreate the table with
        ``CREATE OR REPLACE TABLE``. If falsy, use ``CREATE TABLE IF NOT EXISTS``: an
        existing table is kept and ``write_pandas`` appends the rows to it.
    :param quote_identifiers: Forwarded to ``write_pandas``; use it for column names with
        spaces or special characters.
        NOTE(review): the DDL creates unquoted (upper-cased) column names, so quoted,
        case-sensitive names may not match them. Confirm before passing True.
    :return: The tuple returned by ``write_pandas``.
    """
    df_1 = df.copy()

    create_clause = "CREATE OR REPLACE TABLE" if hard_create else "CREATE TABLE IF NOT EXISTS"
    ddl_create = f"{create_clause} {schema.upper()}.{name.upper()} ({map_cols_sf(df_1)})"
    # Close the DDL cursor instead of leaking it.
    with con.cursor() as cursor:
        cursor.execute(ddl_create)

    return write_pandas(
        con,
        df_1,
        table_name=name.upper(),
        schema=schema.upper(),
        quote_identifiers=quote_identifiers,
    )


def put_data_in_snowflake(df, schema: str, table_name: str):
    """
    Load a DataFrame into ``<SCHEMA>.<TABLE_NAME>`` with ``write_pandas``.

    ``write_pandas`` is asked to create the table automatically if it does not exist.
    Schema and table names are upper-cased before use.

    :param df: DataFrame to upload. It is not modified; a copy is written.
    :param schema: Target schema name.
    :param table_name: Target table name.
    :return: Whatever ``write_pandas`` returns for the load.
    """
    payload = df.copy()
    target_table = table_name.upper()
    target_schema = schema.upper()
    return write_pandas(
        con,
        payload,
        table_name=target_table,
        schema=target_schema,
        auto_create_table=True,
    )

## File-handling functions

In [54]:

def csv_to_df(filename):
    """
    Parse a comma-separated, UTF-8 encoded file into a DataFrame.

    :param filename: Path of the CSV file to read.
    :return: DataFrame built from the file's contents.
    """
    reader_options = {'sep': ',', 'encoding': 'UTF8'}
    return pd.read_csv(filename, **reader_options)


def standardize_columns(df):
    """
    Build normalized, upper-case names for a DataFrame's columns.

    Each column name is transformed in these steps:
    - the characters ``/ ' . , : ¿ ? ( )`` are replaced by spaces;
    - the accented vowels ``ÁÉÍÓÚ`` / ``áéíóú`` become plain vowels;
    - surrounding whitespace is stripped;
    - every remaining space becomes an underscore (runs are not collapsed);
    - the result is upper-cased.

    Examples: ``"¿Teléfono?"`` -> ``"TELEFONO"``, ``"Deal (USD)"`` -> ``"DEAL__USD"``.

    :param df: Input DataFrame with string column names. It is not modified.
    :return: List of standardized column names, in the original column order.
    """
    # The translation table does not depend on the column, so build it once.
    table = str.maketrans("/'.,:¿?/()ÁÉÍÓÚáéíóú", '          aeiouaeiou')
    return [column.translate(table).strip().replace(" ", "_").upper() for column in df.columns]


def get_columns_duplicated(df):
    """
    List the column labels that occur more than once in a DataFrame.

    :param df: Input DataFrame.
    :return: Each repeated label listed once, in order of first appearance
        (empty list if all labels are unique).
    """
    columns = df.columns
    # keep=False flags every occurrence of a repeated label, and unique() keeps
    # first-appearance order. The old list(set(...)) could order labels differently
    # from run to run, and counted occurrences in O(n^2).
    return columns[columns.duplicated(keep=False)].unique().tolist()


def rename_duplicated_columns(df):
    """
    Make repeated column labels unique, in place, by appending a per-label counter.

    Every occurrence of a label that appears more than once gets ``_<n>`` appended. The
    counter n runs 1, 2, ... separately for each label, for example
    ``A, B, A, B`` -> ``A_1, B_1, A_2, B_2``. Labels that are already unique are left
    unchanged.

    NOTE(review): a generated name is not checked against existing labels. For example,
    a pre-existing ``A_1`` column would collide with a renamed ``A``.

    :param df: Input dataframe; its ``columns`` are replaced in place.
    :return: The same dataframe, for convenience.
    """
    is_duplicate = df.columns.duplicated(keep=False)
    if not is_duplicate.any():
        return df

    seen = {}
    new_columns = []
    # Build the new labels in one pass. The previous version appended to the same list
    # once per duplicated label without resetting it, so two or more distinct duplicated
    # labels raised a length-mismatch ValueError.
    for column, duplicated in zip(df.columns, is_duplicate):
        if duplicated:
            seen[column] = seen.get(column, 0) + 1
            new_columns.append(f'{column}_{seen[column]}')
        else:
            new_columns.append(column)
    df.columns = new_columns
    return df


## Let's run our pipeline :)

In [9]:
# Only CSV files in the sources folder are ingested. Other entries, such as hidden
# system files, would make the CSV reader fail further down.
file_names = [name for name in os.listdir('sources') if name.endswith('.csv')]
file_names

['contacts.csv',
 'customers.csv',
 'companies.csv',
 'owners.csv',
 'companies_deals_associations.csv',
 'contacts_deals_associations.csv',
 'deals.csv']

In [62]:

for file in file_names:
    # Skip anything that is not a CSV file (e.g. hidden files in the folder).
    if not file.endswith('.csv'):
        continue

    #1. Read the local file
    file_name = os.path.join('sources', file)
    df_raw = csv_to_df(file_name)

    #2. Normalize column names
    df_raw.columns = standardize_columns(df_raw)

    #3. Disambiguate duplicated column names (renames df_raw's columns in place)
    rename_duplicated_columns(df_raw)
    # splitext strips only the trailing extension; str.replace('.csv', '') removed
    # the substring anywhere in the file name.
    table_name = os.path.splitext(file)[0]

    #4. Send raw data to Snowflake; the first element of the write_pandas result
    # is the success flag.
    result = put_data_in_snowflake(df_raw, 'RAW', table_name)
    if not result[0]:
        raise RuntimeError(f"Loading {file_name} into RAW.{table_name.upper()} failed")
    print("Everything was ok for table", table_name, " !")



Everything was ok for  contacts  !
Everything was ok for  customers  !
Everything was ok for  companies  !
Everything was ok for  owners  !
Everything was ok for  companies_deals_associations  !
Everything was ok for  contacts_deals_associations  !
Everything was ok for  deals  !
