# Data Ingestion

## Introduction

I have 7 files (five .csv and two .txt) that I need to ingest into my relational database.

Files:
- Weather.csv
- Titanic.csv
- Stations.csv
- Rides.csv
- NBA.csv
- Enderecos.txt
- Clientes.txt

### Import Libraries

In [2]:
!pip install SQLAlchemy
!pip install pymysql
!pip install python-dotenv



In [3]:
import os
import pymysql
import pandas as pd 
from dotenv import load_dotenv
from sqlalchemy import create_engine


### Global Variables

In [35]:
load_dotenv(dotenv_path='.env')  # Load the environment variables from the .env file

host = os.getenv('HOST')
port = os.getenv('PORT')
database = os.getenv('DATABASE')
username = os.getenv('DB_USERNAME')
password = os.getenv('PASSWORD')
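
Because `os.getenv` returns `None` for a variable that is not defined, a missing entry in `.env` only shows up later as a confusing connection error. A minimal sketch of an early check is shown below. The helper name `missing_env_vars` and the example environment are hypothetical; only the variable names come from the cell above.

```python
import os

# Variable names taken from the os.getenv() calls in the cell above
REQUIRED_VARS = ("HOST", "PORT", "DATABASE", "DB_USERNAME", "PASSWORD")

def missing_env_vars(names=REQUIRED_VARS, env=None):
    """Return the names that are unset or empty in the given environment mapping."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]

# Hypothetical, partially filled environment used only for illustration
example_env = {"HOST": "localhost", "PORT": "3306", "DATABASE": "demo"}
print(missing_env_vars(env=example_env))  # ['DB_USERNAME', 'PASSWORD']
```

Calling `missing_env_vars()` with no arguments right after `load_dotenv` checks the real environment, so the notebook could stop before attempting a connection if the list is not empty.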



### Establish DB Connection and Create Helper Functions

In [6]:
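def connect_database(host, port, username, password, database):
    """
    Create a SQLAlchemy engine and verify that the database is reachable.
    Returns
        engine: SQLAlchemy engine bound to the database, or None on failure
    """
    try:
        engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}')
        # create_engine() is lazy, so open a connection once to confirm it works
        with engine.connect():
            print('Connected to database')
        return engine
    except Exception as e:
        print('Connection failed! Exception: ', e)
        return None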

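def read_source(filename, delimiter):
    """
    Read a delimited text file into a pandas DataFrame and
    add a date_process column with the load timestamp.
    Returns None if the file cannot be read.
    """
    try:
        df = pd.read_csv(filename, delimiter=delimiter, encoding='utf-8')
        # One timestamp for the whole load; no need to call now() once per row
        df['date_process'] = pd.Timestamp.now()
        return df
    except Exception as e:
        print('File reading problem: ', e)
        return None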

def write_destination(df, table, engine):
    """
    Append the DataFrame to the given table (the table is created if it does not exist).
    """
    try:
        df.to_sql(name=table, con=engine, if_exists='append', index=False)
    except Exception as e:
        print('Unable to write to the table: ', e)
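
One detail worth noting about the connection string built in `connect_database`: if the username or password contains characters such as `@` or `/`, the URL is parsed incorrectly unless those characters are percent-encoded. The sketch below shows one way to do that with the standard library. The helper name `build_mysql_url` and the sample credentials are hypothetical.

```python
from urllib.parse import quote_plus

def build_mysql_url(host, port, username, password, database):
    """Build a mysql+pymysql URL with the credentials percent-encoded."""
    user = quote_plus(username)
    pwd = quote_plus(password)
    return f"mysql+pymysql://{user}:{pwd}@{host}:{port}/{database}"

# Sample values for illustration only
print(build_mysql_url("localhost", 3306, "root", "p@ss/word", "demo"))
# mysql+pymysql://root:p%40ss%2Fword@localhost:3306/demo
```

The resulting string can be passed to `create_engine` in place of the f-string used above.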

### Run the Ingestion Process

In [37]:
def main():
    """
    Executes the main process of ingesting data into a database.
    """
    try:
        # Define connection with destination
        engine = connect_database(host, port, username, password, database)
        if engine is None:
            return

        # Load the parameter table (one row per file to ingest)
        tb_parameters = os.path.abspath('parameters/tb_parameters.csv')
        df_parameters = pd.read_csv(tb_parameters, delimiter=',', encoding='utf-8')

        # Iterate over the parameter table
        for _, row in df_parameters.iterrows():
            path_to_read = row['path_read']
            delimiter = row['params']
            nm_table = row['table']

            print(f"Performing file ingest: {path_to_read}")
            df = read_source(path_to_read, delimiter)
            if df is None:
                # read_source already reported the problem; skip this file
                continue

            write_destination(df, nm_table, engine)
            print(f"Data ingestion in table {nm_table} has been finalized!\n\n")
    except Exception as e:
        print('Ingestion process failed: ', e)

if __name__ == '__main__':
    main()

Connected to database
Performing file ingest: ARQUIVOS_GENERICOS/NBA.csv
Data ingestion in table NBA has been finalized!


Performing file ingest: ARQUIVOS_GENERICOS/TITANIC.csv
Data ingestion in table TITANIC has been finalized!


Performing file ingest: ARQUIVOS_GENERICOS/ENDERECOS.txt
Data ingestion in table ENDERECOS has been finalized!


Performing file ingest: ARQUIVOS_GENERICOS/CLIENTES.txt
Data ingestion in table CLIENTES has been finalized!


Performing file ingest: ARQUIVOS_GENERICOS/STATIONS.csv
Data ingestion in table STATIONS has been finalized!


Performing file ingest: ARQUIVOS_GENERICOS/RIDES.csv
Data ingestion in table RIDES has been finalized!


Performing file ingest: ARQUIVOS_GENERICOS/WEATHER.csv
Data ingestion in table WEATHER has been finalized!
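
The notebook does not show `parameters/tb_parameters.csv`, so here is a sketch of what its layout might look like. The column names (`path_read`, `params`, `table`) and the paths come from `main()` and its output; the delimiter values are assumptions. It uses the standard library `csv` module instead of pandas so it runs without extra dependencies, and it shows that a comma delimiter has to be quoted inside a comma-separated parameter file.

```python
import csv
import io

# Hypothetical contents of parameters/tb_parameters.csv.
# Column names and paths follow main() and its output; delimiters are assumed.
SAMPLE_PARAMETERS = """path_read,params,table
ARQUIVOS_GENERICOS/NBA.csv,",",NBA
ARQUIVOS_GENERICOS/CLIENTES.txt,;,CLIENTES
"""

def load_parameters(text):
    """Parse the parameter table into a list of dicts, one per file to ingest."""
    return list(csv.DictReader(io.StringIO(text)))

for entry in load_parameters(SAMPLE_PARAMETERS):
    print(entry["path_read"], repr(entry["params"]), entry["table"])
```

Keeping the file path, delimiter, and target table in one row per source means a new file can be added to the ingestion by editing the parameter table rather than the code.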


