## 0.0 Imports

In [20]:
import pandas as pd
import pyodbc 
import os


from sqlalchemy import create_engine, inspect

### 0.1 Functions

In [21]:
def extract(server, database_name , path):
    # create pyodbc connection
    conn = pyodbc.connect('Driver={SQL Server};'
                          f'Server={server};'
                          'Trusted_Connection=yes;')

        
    # check connection
    if conn:
        print( "Successfull Connection" )
    else:
        print( "Connection Failded" )

    # create a cursor
    cursor = conn.cursor()

    # check if a database already exists
    database_name = database_name
    check_db = f'''
        IF NOT EXISTS ( SELECT name FROM sys.databases WHERE name = '{database_name}' )
            BEGIN
            CREATE DATABASE {database_name}
            END
    '''
    # execute check_db query
    cursor.execute( check_db )

    # create sql alchemy connection
    connection_string = f'mssql+pyodbc://{server}/{database_name}?driver=ODBC+Driver+17+for+SQL+Server'

    # create engine
    engine = create_engine( connection_string )

    # for each file in folder load csv files into database
    path = os.listdir(path)
    for file in path:
            
        file_name = os.path.join(path, file)

        table_name = os.path.basename( file_name )

        if file.endswith( '.csv' ):
            
            df = pd.read_csv( table_name, encoding = 'UTF-8' )
            
            df.to_sql( name = table_name , con = engine , if_exists= 'replace', index = False)
            
            print( f'{ table_name } has been loaded into database' )

        else:
            "File not found"

    # closing cursor and connection
    cursor.close()
    conn.close()

def transform( server_name , database_name ):

    SERVER = server_name
    # create sql alchemy connection
    connection_string = f'mssql+pyodbc://{SERVER}/{database_name}?driver=ODBC+Driver+17+for+SQL+Server'
    
    # create engine
    engine = create_engine( connection_string )

    # inspect database schema
    inspector = inspect( engine )

    # get the list of tables in the database
    tables = inspector.get_table_names()

    # create empty dict
    tables_dict = {}
    df_Lan_final = pd.DataFrame()
    
    for file in tables:
        

        if file == 'dEstruturaDRE.csv':
            # read data
            query = 'SELECT * FROM [dbo].[dEstruturaDRE.csv]'
            df_dEstruturaDre = pd.read_sql( query , engine )

            # drop na
            df_dEstruturaDre = df_dEstruturaDre.dropna(axis = 0)

            # change dtypes
            df_dEstruturaDre['id']                = df_dEstruturaDre['id'].astype( 'str' )
            df_dEstruturaDre['index']             = df_dEstruturaDre['index'].astype( 'int64' )
            df_dEstruturaDre['contaGerencial']    = df_dEstruturaDre['contaGerencial'].astype( 'str' )
            df_dEstruturaDre['subtotal']          = df_dEstruturaDre['subtotal'].astype( 'int64' )
            df_dEstruturaDre['empresa']           = df_dEstruturaDre['empresa'].astype( 'str' )


            # rename columns 
            df_dEstruturaDre = df_dEstruturaDre.rename(columns = 
                                    {'contaGerencial' : 'ManagementAccount', 
                                      'empresa' : 'Branch', 
                                      'subtotal' : 'Subtotal', 
                                      'index' : 'Index'}  )

            tables_dict[file] = df_dEstruturaDre
            print( f'{file} has been cleaned. It has {df_dEstruturaDre.shape[0]} rows ' )


        elif file == 'dPlanoConta.csv':
            
            # read data
            query = 'SELECT * FROM [dbo].[dPlanoConta.csv]'
            df_dPlanoConta = pd.read_sql( query , engine )

            # drop na
            df_dPlanoConta = df_dPlanoConta.dropna(axis = 0)

            # change dtypes
            df_dPlanoConta['mascaraDRE_id'] = df_dPlanoConta['mascaraDRE_id'].astype( 'str' )

            # rename columns
            columns = {"descricaoN1"   : "DescriptionLevel1", 
             "descricaoN2"   : "DescriptionLevel2", 
             "detalharN2"    : "DetailLevel2?",
             "mascaraDRE_id" : "IncomeStatementTemplate_id",
            "tipoLancamento" : "EntryType",
            "index"          : "Index"}

            df_dPlanoConta = df_dPlanoConta.rename( columns = columns )   

            tables_dict[file] = df_dPlanoConta
            print( f'{file} has been cleaned. It has {df_dPlanoConta.shape[0]} rows ' )


        elif file == 'fOrcamento.csv':
            
            # read data
            query = 'SELECT * FROM [dbo].[fOrcamento.csv]'
            df_fOrcamento = pd.read_sql( query , engine )
            
            # drop na
            df_fOrcamento = df_fOrcamento.dropna(axis = 0)

            # change dtypes
            df_fOrcamento['competencia_data'] = pd.to_datetime( df_fOrcamento['competencia_data'] ) 
            
            # formatting integers numbers as .00
            df_fOrcamento['valor'] = df_fOrcamento['valor'].apply(lambda x: f"{x}.00" if '.' not in x else x)
            
            # replace the last . for #
            df_fOrcamento['valor'] = df_fOrcamento['valor'].apply(lambda x: '#'.join( x.rpartition('.') [::2] ))
            
            # replace the last . for nothing
            df_fOrcamento['valor'] = df_fOrcamento['valor'].apply( lambda x: x.replace('.', '') )
            
            # replace # for .
            df_fOrcamento['valor'] = df_fOrcamento['valor'].apply( lambda x: x.replace('#', '.') )
            
            # convert value
            df_fOrcamento['valor'] = df_fOrcamento['valor'].astype( 'float64' )

            # rename columns
            columns = {"competencia_data" : "AccrualDate",
             "planoContas_id"   : "ChartOfAccounts_id",
             "valor"            : "Amount"}
            df_fOrcamento = df_fOrcamento.rename( columns =  columns )

            tables_dict[file] = df_fOrcamento
            print( f'{file} has been cleaned. It has {df_fOrcamento.shape[0]} rows ' )

        elif file == 'fPrevisao.csv':
    
            # read data
            query = 'SELECT * FROM [dbo].[fPrevisao.csv]'
            df_fPrevisao = pd.read_sql( query , engine )

            # drop na
            df_fPrevisao = df_fPrevisao.dropna(axis = 0)

            # change dtypes
            df_fPrevisao['competencia_data'] = pd.to_datetime( df_fPrevisao['competencia_data'] ) 
            
            # formatting integers numbers as .00
            df_fPrevisao['valor'] = df_fPrevisao['valor'].apply(lambda x: f"{x}.00" if '.' not in x else x)
            
            # replace the last . for #
            df_fPrevisao['valor'] = df_fPrevisao['valor'].apply(lambda x: '#'.join( x.rpartition('.') [::2] ))
            
            # replace the last . for nothing
            df_fPrevisao['valor'] = df_fPrevisao['valor'].apply( lambda x: x.replace('.', '') )
            
            # replace # for .
            df_fPrevisao['valor'] = df_fPrevisao['valor'].apply( lambda x: x.replace('#', '.') )
            
            # convert value
            df_fPrevisao['valor'] = df_fPrevisao['valor'].astype( 'float64' )
            
            # rename columns
            columns = {
                "competencia_data": "AccrualDate",
                 "planoContas_id"  : "ChartOfAccounts_id",
                 "valor"           : "Amount"}
            
            df_fPrevisao = df_fPrevisao.rename( columns = columns )

            tables_dict[file] = df_fPrevisao
            print( f'{file} has been cleaned. It has {df_fPrevisao.shape[0]} rows ' )

        elif file.startswith( 'fLan' ):
            
            # read data
            query = f'SELECT * FROM [dbo].[{file}]'
            df_fLancamento = pd.read_sql( query , engine )
            
            # drop na
            df_fLancamento = df_fLancamento.dropna(axis = 0)

            # change dtypes
            df_fLancamento['competencia_data'] = pd.to_datetime( df_fLancamento['competencia_data'] )

            # formatting integers numbers as .00
            df_fLancamento['valor'] = df_fLancamento['valor'].apply(lambda x: f"{x}.00" if '.' not in x else x)
            
            # replace the last . for #
            df_fLancamento['valor'] = df_fLancamento['valor'].apply(lambda x: '#'.join( x.rpartition('.') [::2] ))
            
            # replace the last . for nothing
            df_fLancamento['valor'] = df_fLancamento['valor'].apply( lambda x: x.replace('.', '') )
            
            # replace # for .
            df_fLancamento['valor'] = df_fLancamento['valor'].apply( lambda x: x.replace('#', '.') )
            
            # convert value
            df_fLancamento['valor'] = df_fLancamento['valor'].astype( 'float64' )
            
            # rename columns
            columns = {
                "competencia_data":"AccrualDate",
                "planoContas_id"  : "ChartOfAccounts_id",
                "valor"           : "Amount"
            }

            df_fLancamento = df_fLancamento.rename( columns = columns )

            
            
            df_Lan_final = pd.concat( [df_Lan_final , df_fLancamento] , ignore_index=True )

            print( f'Processing : {file}' )
    print( f'df_Lan_final has been cleaned. It has {df_Lan_final.shape[0]} rows.' )

    tables_dict['dfLancamentos'] = df_Lan_final


        
    return tables_dict

def load( server, origin_database , destination_database ):

    # db connections
    conn = pyodbc.connect('Driver={SQL Server};'
                      f'Server={server};'
                      'Trusted_Connection=yes;')

    print("="*100)
    print( 'Connecting Database:' )

    if conn:
        print( "Successfull Connection" )
    else:
        print( "Connection Failded" )
    
    # create a cursor
    cursor = conn.cursor()
    
    print("="*100)
    print( 'Data Cleaning:' )

    # cleaning data
    tables = transform( server , origin_database )

    print("="*100)
    print( 'Creating Destination Database:' )
 
    # check if a database already exists
    destination_database = 'DRE_Cleaned_Data' # database where cleaned data will be stored.
    check_db = f'''
        IF NOT EXISTS ( SELECT name FROM sys.databases WHERE name = '{destination_database}' )
            BEGIN
            CREATE DATABASE {destination_database}
            END
    '''
    # execute query
    cursor.execute( check_db )
    cursor.close()

    print( f'{destination_database} database has been created.' )

    print("="*100)
    print( 'Data Loading:' )

    # loading data
    for table_name, df in tables.items():

        df.to_sql( table_name , engine_db , if_exists= 'replace', index = False)
        print( f'{table_name} loaded into {database_name}.' )
    print("="*100)
        
    return None 


## 0.2 Class

In [22]:
# ========================================================================================================#
# IMPORTS
# ========================================================================================================#

import pandas as pd
import pyodbc 
import os

from sqlalchemy import create_engine, inspect

# ========================================================================================================#
# ETL CLASS
# ========================================================================================================#
class ETL:

    def __init__( self, server , file_dir , staging_database_name , destination_database_name ):

        print("="*100)
        print( 'STEP 1 :EXTRACT' )
        
        # create sql alchemy connection
        self.connection_string_sql_alchemy                = f'mssql+pyodbc://{server}/{ staging_database_name }?driver=ODBC+Driver+17+for+SQL+Server'
        self.connection_string_sql_alchemy_db_destination = f'mssql+pyodbc://{server}/{ destination_database_name }?driver=ODBC+Driver+17+for+SQL+Server'

        # connection string SQL Server
        self.connection_string_sql_server = 'Driver={SQL Server};'f'Server={server};' 'Trusted_Connection=yes;'
        
        self.server                     = server
        self.file_dir                   = file_dir
        self.staging_database_name      = staging_database_name
        self.engine                     = create_engine( self.connection_string_sql_alchemy )
        self.engine_db_destination      = create_engine( self.connection_string_sql_alchemy_db_destination )
        self.conn                       = pyodbc.connect(  self.connection_string_sql_server )
        self.destination_database_name  = destination_database_name
        self.cursor                     = self.conn.cursor()
        print("="*100)
        print( 'Connecting Database:' )

        # check connection
        if self.conn:
            print( "Successfull Connection" )
        else:
            print( "Connection Failded" )
            
        self.cursor.close()
        self.conn.close()

    
    def extract( self ):

        print("="*100)
        print( 'Checking Existing Database:' )

        # temporary connection
        conn_temp = pyodbc.connect( self.connection_string_sql_server )
        cursor_temp = conn_temp.cursor()

        # check if a database already exists
        check_db = f'''
            IF NOT EXISTS ( SELECT name FROM sys.databases WHERE name = '{self.staging_database_name}' )
                
                CREATE DATABASE {self.staging_database_name}
                
        '''
        # execute check_db query
        cursor_temp.execute(check_db)
        cursor_temp.close()
        conn_temp.close()

        print( 'Database checked' )

        # for each file in folder load csv files into database
       
        print("="*100)
        print( 'Loading Data:' )
        
        for file in os.listdir( self.file_dir ):

            file = os.path.join( self.file_dir , file ) 
            
            table_name = os.path.basename( file )
                
            if file.endswith( '.csv' ):
                
                df = pd.read_csv( file, encoding = 'UTF-8' )
                
                df.to_sql( name = table_name , con = self.engine , if_exists= 'replace', index = False)
                
                print( f'{ table_name } has been loaded into database' )
        
        # close engine connection
        self.engine.dispose()
        

        return None
        

    def transform( self ):

        print("="*100)
        print( 'STEP 2 :CLEANING DATA' )
    
        # inspect database schema
        inspector = inspect( self.engine )

        # get the list of tables in the database
        tables = inspector.get_table_names()

        # create empty dict
        tables_dict = {}
        df_Lan_final = pd.DataFrame()
        
        for file in tables:
            
            if file == 'dEstruturaDRE.csv':
                # read data
                query = 'SELECT * FROM [dbo].[dEstruturaDRE.csv]'
                df_dEstruturaDre = pd.read_sql( query , self.engine )
    
                # drop na
                df_dEstruturaDre = df_dEstruturaDre.dropna( axis = 0 )
    
                # change dtypes
                df_dEstruturaDre['id']                = df_dEstruturaDre['id'].astype( 'str' )
                df_dEstruturaDre['index']             = df_dEstruturaDre['index'].astype( 'int64' )
                df_dEstruturaDre['contaGerencial']    = df_dEstruturaDre['contaGerencial'].astype( 'str' )
                df_dEstruturaDre['subtotal']          = df_dEstruturaDre['subtotal'].astype( 'int64' )
                df_dEstruturaDre['empresa']           = df_dEstruturaDre['empresa'].astype( 'str' )
    
                # rename columns 
                df_dEstruturaDre = df_dEstruturaDre.rename(columns = 
                                        {'contaGerencial' : 'ManagementAccount', 
                                          'empresa' : 'Branch', 
                                          'subtotal' : 'Subtotal', 
                                          'index' : 'Index'}  )
    
                tables_dict[file] = df_dEstruturaDre
                print( f'{file} has been cleaned. It has {df_dEstruturaDre.shape[0]} rows ' )

            elif file == 'dPlanoConta.csv':
                
                # read data
                query = 'SELECT * FROM [dbo].[dPlanoConta.csv]'
                df_dPlanoConta = pd.read_sql( query , self.engine )
    
                # drop na
                df_dPlanoConta = df_dPlanoConta.dropna(axis = 0)
    
                # change dtypes
                df_dPlanoConta['mascaraDRE_id'] = df_dPlanoConta['mascaraDRE_id'].astype( 'str' )
    
                # rename columns
                columns = {"descricaoN1"   : "DescriptionLevel1", 
                 "descricaoN2"   : "DescriptionLevel2", 
                 "detalharN2"    : "DetailLevel2?",
                 "mascaraDRE_id" : "IncomeStatementTemplate_id",
                "tipoLancamento" : "EntryType",
                "index"          : "Index"}
    
                df_dPlanoConta = df_dPlanoConta.rename( columns = columns )   
    
                tables_dict[file] = df_dPlanoConta
                print( f'{file} has been cleaned. It has {df_dPlanoConta.shape[0]} rows ' )

            elif file == 'fOrcamento.csv':
                
                # read data
                query = 'SELECT * FROM [dbo].[fOrcamento.csv]'
                df_fOrcamento = pd.read_sql( query , self.engine )
                
                # drop na
                df_fOrcamento = df_fOrcamento.dropna(axis = 0)
    
                # change dtypes
                df_fOrcamento['competencia_data'] = pd.to_datetime( df_fOrcamento['competencia_data'] ) 
                
                # formatting integers numbers as .00
                df_fOrcamento['valor'] = df_fOrcamento['valor'].apply(lambda x: f"{x}.00" if '.' not in x else x)
                
                # replace the last . for #
                df_fOrcamento['valor'] = df_fOrcamento['valor'].apply(lambda x: '#'.join( x.rpartition('.') [::2] ))
                
                # replace the last . for nothing
                df_fOrcamento['valor'] = df_fOrcamento['valor'].apply( lambda x: x.replace('.', '') )
                
                # replace # for .
                df_fOrcamento['valor'] = df_fOrcamento['valor'].apply( lambda x: x.replace('#', '.') )
                
                # convert value
                df_fOrcamento['valor'] = df_fOrcamento['valor'].astype( 'float64' )
    
                # rename columns
                columns = {"competencia_data" : "AccrualDate",
                 "planoContas_id"   : "ChartOfAccounts_id",
                 "valor"            : "Amount"}
                df_fOrcamento = df_fOrcamento.rename( columns =  columns )
    
                tables_dict[file] = df_fOrcamento
                print( f'{file} has been cleaned. It has {df_fOrcamento.shape[0]} rows ' )

            elif file == 'fPrevisao.csv':
                            # read data
                query = 'SELECT * FROM [dbo].[fPrevisao.csv]'
                df_fPrevisao = pd.read_sql( query , self.engine )
    
                # drop na
                df_fPrevisao = df_fPrevisao.dropna(axis = 0)
    
                # change dtypes
                df_fPrevisao['competencia_data'] = pd.to_datetime( df_fPrevisao['competencia_data'] ) 
                
                # formatting integers numbers as .00
                df_fPrevisao['valor'] = df_fPrevisao['valor'].apply(lambda x: f"{x}.00" if '.' not in x else x)
                
                # replace the last . for #
                df_fPrevisao['valor'] = df_fPrevisao['valor'].apply(lambda x: '#'.join( x.rpartition('.') [::2] ))
                
                # replace the last . for nothing
                df_fPrevisao['valor'] = df_fPrevisao['valor'].apply( lambda x: x.replace('.', '') )
                
                # replace # for .
                df_fPrevisao['valor'] = df_fPrevisao['valor'].apply( lambda x: x.replace('#', '.') )
                
                # convert value
                df_fPrevisao['valor'] = df_fPrevisao['valor'].astype( 'float64' )
                
                # rename columns
                columns = {
                    "competencia_data": "AccrualDate",
                     "planoContas_id"  : "ChartOfAccounts_id",
                     "valor"           : "Amount"}
                
                df_fPrevisao = df_fPrevisao.rename( columns = columns )
    
                tables_dict[file] = df_fPrevisao
                print( f'{file} has been cleaned. It has {df_fPrevisao.shape[0]} rows ' )

            elif file.startswith( 'fLan' ):
                
                # read data
                query = f'SELECT * FROM [dbo].[{file}]'
                df_fLancamento = pd.read_sql( query , self.engine )
                
                # drop na
                df_fLancamento = df_fLancamento.dropna(axis = 0)
    
                # change dtypes
                df_fLancamento['competencia_data'] = pd.to_datetime( df_fLancamento['competencia_data'] )
    
                # formatting integers numbers as .00
                df_fLancamento['valor'] = df_fLancamento['valor'].apply(lambda x: f"{x}.00" if '.' not in x else x)
                
                # replace the last . for #
                df_fLancamento['valor'] = df_fLancamento['valor'].apply(lambda x: '#'.join( x.rpartition('.') [::2] ))
                
                # replace the last . for nothing
                df_fLancamento['valor'] = df_fLancamento['valor'].apply( lambda x: x.replace('.', '') )
                
                # replace # for .
                df_fLancamento['valor'] = df_fLancamento['valor'].apply( lambda x: x.replace('#', '.') )
                
                # convert value
                df_fLancamento['valor'] = df_fLancamento['valor'].astype( 'float64' )
                
                # rename columns
                columns = {
                    "competencia_data":"AccrualDate",
                    "planoContas_id"  : "ChartOfAccounts_id",
                    "valor"           : "Amount"
                }
    
                df_fLancamento = df_fLancamento.rename( columns = columns )
                
                df_Lan_final = pd.concat( [df_Lan_final , df_fLancamento] , ignore_index=True )
    
                print( f'Processing : {file}' )
        print( f'df_Lan_final has been cleaned. It has {df_Lan_final.shape[0]} rows.' )
    
        tables_dict['dfLancamentos'] = df_Lan_final

        return tables_dict

    def load( self , tables_dict ):

        print("="*100)
        print( 'STPE 3:LOAD' )

        print("="*100)
        print( 'Creating Destination Database:' )

        conn_temp = pyodbc.connect(f'Driver={{SQL Server}};Server={self.server};Trusted_Connection=yes;')
        cursor_temp = conn_temp.cursor()
 
        # check if a database already exists
        check_db = f'''
            IF NOT EXISTS ( SELECT name FROM sys.databases WHERE name = '{self.destination_database_name}' )
                BEGIN
                CREATE DATABASE {self.destination_database_name}
                END
        '''
        # execute query
        cursor_temp.execute( check_db )
        cursor_temp.close()
        conn_temp.close()
        

        print( f'{destination_database} database has been created.' )

        print("="*100)
        print( 'Data Loading:' )

        # loading data
        for table_name, df in tables_dict.items():
    
            df.to_sql( name = table_name , con = self.engine_db_destination , if_exists= 'replace', index = False)
            print( f'{table_name} loaded into {self.destination_database_name}.' )
            
        print("="*100)
        print( 'ETL STATUS: COMPLETED' )
        print("="*100)

        # close engine connection
        self.engine_db_destination.dispose()
            
        return None 


In [23]:
server               = 'DESKTOP-U9M4TSR'
file_dir             = 'D:/repos/ETL/data'
staging_database     = 'DRE'
destination_database = 'DRE_Cleaned'

Test = ETL( server , file_dir , staging_database , destination_database )
Test.extract()
cleaned_tables = Test.transform()
Test.load( cleaned_tables )

STEP 1 :EXTRACT
Connecting Database:
Successfull Connection
Checking Existing Database:
Database checked
Loading Data:
dEstruturaDRE.csv has been loaded into database
dPlanoConta.csv has been loaded into database
fLancamento1_ano1.csv has been loaded into database
fLancamento1_ano2.csv has been loaded into database
fLancamento1_ano3.csv has been loaded into database
fLancamento2_ano1.csv has been loaded into database
fLancamento2_ano2.csv has been loaded into database
fLancamento2_ano3.csv has been loaded into database
fLancamento3_ano1.csv has been loaded into database
fLancamento3_ano2.csv has been loaded into database
fLancamento3_ano3.csv has been loaded into database
fOrcamento.csv has been loaded into database
fPrevisao.csv has been loaded into database
STEP 2 :CLEANING DATA
dEstruturaDRE.csv has been cleaned. It has 39 rows 
dPlanoConta.csv has been cleaned. It has 129 rows 
Processing : fLancamento1_ano1.csv
Processing : fLancamento1_ano2.csv
Processing : fLancamento1_ano3.csv


## 1.0 Connecting to SQL Server

In [34]:
conn = pyodbc.connect('Driver={SQL Server};'
                      'Server=DESKTOP-U9M4TSR;'
                      'Trusted_Connection=yes;')

if conn:
    print( "Successfull Connection" )
else:
    print( "Connection Failded" )

# create a cursor
cursor = conn.cursor()

# # closing cursor and connection
# cursor.close()
# conn.close()


Successfull Connection


### 1.3 Check if a database exists

In [35]:
# check if a database already exists
database_name = 'DRE'
check_db = f'''
    IF NOT EXISTS ( SELECT name FROM sys.databases WHERE name = '{database_name}' )
        BEGIN
        CREATE DATABASE {database_name}
        END
'''
# execute query
cursor.execute( check_db )


<pyodbc.Cursor at 0x1a65a199530>

### 1.4 Loading Tables into SQL Server Database

In [31]:
SERVER = 'DESKTOP-U9M4TSR'
# create sql alchemy connection
connection_string = f'mssql+pyodbc://{SERVER}/{database_name}?driver=ODBC+Driver+17+for+SQL+Server'

# create engine
engine = create_engine( connection_string )

# for each file in folder load csv files into database
path = os.listdir()
for file in path:
    
    file_name = file

    if file.endswith( '.csv' ):
        df = pd.read_csv( file_name, encoding = 'UTF-8' )
        df.to_sql( name = file_name , con = engine , if_exists= 'replace', index = False)
        print( f'{ file_name } has been loaded into database' )

# closing cursor and connection
cursor.close()
conn.close()




ProgrammingError: Attempt to use a closed cursor.

## 2 Transform Data

### dEstruturaDRE

In [8]:
# read data
query = 'SELECT * FROM [dbo].[dEstruturaDRE.csv]'
df_dEstruturaDre = pd.read_sql( query , engine )

# check na
print( 'Check na' )
display( df_dEstruturaDre.isna().sum() )

print("="*50)

# drop na
df_dEstruturaDre = df_dEstruturaDre.dropna(axis = 0)

# check type
print( 'Check dtypes' )
display( df_dEstruturaDre.dtypes )

# change dtypes
df_dEstruturaDre['id']                = df_dEstruturaDre['id'].astype( 'str' )
df_dEstruturaDre['index']             = df_dEstruturaDre['index'].astype( 'int64' )
df_dEstruturaDre['contaGerencial']    = df_dEstruturaDre['contaGerencial'].astype( 'str' )
df_dEstruturaDre['subtotal']          = df_dEstruturaDre['subtotal'].astype( 'int64' )
df_dEstruturaDre['empresa']           = df_dEstruturaDre['empresa'].astype( 'str' )

print("="*50)

# check data type after transformations
print( 'Check dtypes after changing dtypes' )
display( df_dEstruturaDre.dtypes )

# rename columns 
df_dEstruturaDre = df_dEstruturaDre.rename(columns = 
                        {'contaGerencial' : 'ManagementAccount', 
                          'empresa' : 'Branch', 
                          'subtotal' : 'Subtotal', 
                          'index' : 'Index'}  )

Check na


id                955
index             955
contaGerencial    955
subtotal          955
empresa           955
dtype: int64

Check dtypes


id                float64
index             float64
contaGerencial     object
subtotal          float64
empresa            object
dtype: object

Check dtypes after changing dtypes


id                object
index              int64
contaGerencial    object
subtotal           int64
empresa           object
dtype: object

### dPlanoConta

In [9]:
# read data
query = 'SELECT * FROM [dbo].[dPlanoConta.csv]'
df_dPlanoConta = pd.read_sql( query , engine )

# check na
print( 'Check na' )
display( df_dPlanoConta.isna().sum() )

print("="*50)

# drop na
df_dPlanoConta = df_dPlanoConta.dropna(axis = 0)

# check type
print( 'Check dtypes' )
display( df_dPlanoConta.dtypes )

# change dtypes
df_dPlanoConta['mascaraDRE_id'] = df_dPlanoConta['mascaraDRE_id'].astype( 'str' )

print("="*50)

# check data type after transformations
print( 'Check dtypes after changing dtypes' )
display( df_dPlanoConta.dtypes )

# rename columns
columns = {"descricaoN1"   : "DescriptionLevel1", 
 "descricaoN2"   : "DescriptionLevel2", 
 "detalharN2"    : "DetailLevel2?",
 "mascaraDRE_id" : "IncomeStatementTemplate_id",
"tipoLancamento" : "EntryType",
"index"          : "Index"}

df_dPlanoConta = df_dPlanoConta.rename( columns = columns )   


Check na


id                0
index             0
descricaoN1       0
descricaoN2       0
detalharN2        0
mascaraDRE_id     0
tipoLancamento    0
dtype: int64

Check dtypes


id                 object
index               int64
descricaoN1        object
descricaoN2        object
detalharN2          int64
mascaraDRE_id     float64
tipoLancamento      int64
dtype: object

Check dtypes after changing dtypes


id                object
index              int64
descricaoN1       object
descricaoN2       object
detalharN2         int64
mascaraDRE_id     object
tipoLancamento     int64
dtype: object

### fOrcamento

In [10]:
# read data
query = 'SELECT * FROM [dbo].[fOrcamento.csv]'
df_fOrcamento = pd.read_sql( query , engine )

# check na
print( 'Check na' )
display( df_fOrcamento.isna().sum() )

# drop na
df_fOrcamento = df_fOrcamento.dropna(axis = 0)

print("="*50)

# check type
print( 'Check dtypes' )
display( df_fOrcamento.dtypes )

# change dtypes
df_fOrcamento['competencia_data'] = pd.to_datetime( df_fOrcamento['competencia_data'] ) 

# formatting integers numbers as .00
df_fOrcamento['valor'] = df_fOrcamento['valor'].apply(lambda x: f"{x}.00" if '.' not in x else x)

# replace the last . for #
df_fOrcamento['valor'] = df_fOrcamento['valor'].apply(lambda x: '#'.join( x.rpartition('.') [::2] ))

# replace the last . for nothing
df_fOrcamento['valor'] = df_fOrcamento['valor'].apply( lambda x: x.replace('.', '') )

# replace # for .
df_fOrcamento['valor'] = df_fOrcamento['valor'].apply( lambda x: x.replace('#', '.') )

# convert value
df_fOrcamento['valor'] = df_fOrcamento['valor'].astype( 'float64' )


print("="*50)

# check data type after transformations
print( 'Check dtypes after changing dtypes' )
display( df_fOrcamento.dtypes )

# rename columns
columns = {"competencia_data" : "AccrualDate",
 "planoContas_id"   : "ChartOfAccounts_id",
 "valor"            : "Amount"}
df_fOrcamento = df_fOrcamento.rename( columns =  columns )

Check na


competencia_data    0
planoContas_id      0
valor               0
dtype: int64

Check dtypes


competencia_data    object
planoContas_id      object
valor               object
dtype: object

Check dtypes after changing dtypes


competencia_data    datetime64[ns]
planoContas_id              object
valor                      float64
dtype: object

### fPrevisao

In [11]:
# read data
query = 'SELECT * FROM [dbo].[fPrevisao.csv]'
df_fPrevisao = pd.read_sql( query , engine )

# check na
print( 'Check na' )
display( df_fPrevisao.isna().sum() )

# drop na
df_fPrevisao = df_fPrevisao.dropna(axis = 0)

print("="*50)

# check type
print( 'Check dtypes' )
display( df_fPrevisao.dtypes )

# change dtypes
df_fPrevisao['competencia_data'] = pd.to_datetime( df_fPrevisao['competencia_data'] ) 

# formatting integers numbers as .00
df_fPrevisao['valor'] = df_fPrevisao['valor'].apply(lambda x: f"{x}.00" if '.' not in x else x)

# replace the last . for #
df_fPrevisao['valor'] = df_fPrevisao['valor'].apply(lambda x: '#'.join( x.rpartition('.') [::2] ))

# replace the last . for nothing
df_fPrevisao['valor'] = df_fPrevisao['valor'].apply( lambda x: x.replace('.', '') )

# replace # for .
df_fPrevisao['valor'] = df_fPrevisao['valor'].apply( lambda x: x.replace('#', '.') )

# convert value
df_fPrevisao['valor'] = df_fPrevisao['valor'].astype( 'float64' )

print("="*50)

# check data type after transformations
print( 'Check dtypes after changing dtypes' )
display( df_fPrevisao.dtypes )

# rename columns
columns = {
    "competencia_data": "AccrualDate",
     "planoContas_id"  : "ChartOfAccounts_id",
     "valor"           : "Amount"}

df_fPrevisao = df_fPrevisao.rename( columns = columns )


Check na


competencia_data    0
planoContas_id      0
valor               0
dtype: int64

Check dtypes


competencia_data    object
planoContas_id      object
valor               object
dtype: object

Check dtypes after changing dtypes


competencia_data    datetime64[ns]
planoContas_id              object
valor                      float64
dtype: object

### fLancamentos

In [12]:
# read data
query = 'SELECT * FROM [dbo].[fLancamento1_ano1.csv]'
df_fLancamento1_ano1 = pd.read_sql( query , engine )

# check na
print( 'Check na' )
display( df_fLancamento1_ano1.isna().sum() )

# drop na
df_fLancamento1_ano1 = df_fLancamento1_ano1.dropna(axis = 0)

print("="*50)

# check type
print( 'Check dtypes' )
display( df_fLancamento1_ano1.dtypes )

# change dtypes
df_fLancamento1_ano1['competencia_data'] = pd.to_datetime( df_fLancamento1_ano1['competencia_data'] )

# formatting integers numbers as .00
df_fLancamento1_ano1['valor'] = df_fLancamento1_ano1['valor'].apply(lambda x: f"{x}.00" if '.' not in x else x)

# replace the last . for #
df_fLancamento1_ano1['valor'] = df_fLancamento1_ano1['valor'].apply(lambda x: '#'.join( x.rpartition('.') [::2] ))

# replace the last . for nothing
df_fLancamento1_ano1['valor'] = df_fLancamento1_ano1['valor'].apply( lambda x: x.replace('.', '') )

# replace # for .
df_fLancamento1_ano1['valor'] = df_fLancamento1_ano1['valor'].apply( lambda x: x.replace('#', '.') )

# convert value
df_fLancamento1_ano1['valor'] = df_fLancamento1_ano1['valor'].astype( 'float64' )

print("="*50)

# check data type after transformations
print( 'Check dtypes after changing dtypes' )
display( df_fLancamento1_ano1.dtypes )

# rename columns
columns = {
    "competencia_data":"AccrualDate",
    "planoContas_id"  : "ChartOfAccounts_id",
    "valor"           : "Amount"
}

df_fLancamento1_ano1 = df_fLancamento1_ano1.rename( columns = columns )


Check na


competencia_data    0
planoContas_id      0
valor               0
dtype: int64

Check dtypes


competencia_data    object
planoContas_id      object
valor               object
dtype: object

Check dtypes after changing dtypes


competencia_data    datetime64[ns]
planoContas_id              object
valor                      float64
dtype: object

## 3 Load 

### 3.1 Connecting to SQL Server

In [13]:
server = 'DESKTOP-U9M4TSR'
conn = pyodbc.connect('Driver={SQL Server};'
                      f'Server={server};'
                      'Trusted_Connection=yes;')

if conn:
    print( "Successfull Connection" )
else:
    print( "Connection Failded" )

# create a cursor
cursor = conn.cursor()

# # closing cursor and connection
# cursor.close()
# conn.close()


Successfull Connection


### 3.2 Cleaning and transforming Data

In [14]:
server = 'DESKTOP-U9M4TSR' # server containing the database where raw data is stored
database_name = 'DRE'      # database where raw data is stored.


tables = transform( server , database_name )

dEstruturaDRE.csv has been cleaned. It has 39 rows 
dPlanoConta.csv has been cleaned. It has 129 rows 
Processing : fLancamento1_ano1.csv
Processing : fLancamento1_ano2.csv
Processing : fLancamento1_ano3.csv
Processing : fLancamento2_ano1.csv
Processing : fLancamento2_ano2.csv
Processing : fLancamento2_ano3.csv
Processing : fLancamento3_ano1.csv
Processing : fLancamento3_ano2.csv
Processing : fLancamento3_ano3.csv
fOrcamento.csv has been cleaned. It has 4644 rows 
fPrevisao.csv has been cleaned. It has 4644 rows 
df_Lan_final has been cleaned. It has 3741 rows.


### 3.3 Creating New DataBase

In [15]:
# check if a database already exists
db_destiantion = 'DRE_Cleaned' # database where cleaned data will be stored.
check_db = f'''
    IF NOT EXISTS ( SELECT name FROM sys.databases WHERE name = '{database_name}' )
        BEGIN
        CREATE DATABASE {database_name}
        END
'''
# execute query
cursor.execute( check_db )

<pyodbc.Cursor at 0x27e182a7e30>

### 3.4 Loading Data

In [16]:
conn_string_db_destination = f'mssql+pyodbc://{SERVER}/{ db_destiantion }?driver=ODBC+Driver+17+for+SQL+Server'

# create engine
engine_destination = create_engine( conn_string_db_destination )
for table_name, df in tables.items():

    df.to_sql( table_name , engine_destination , if_exists= 'replace', index = False)
    print( f'{table_name} loaded into { db_destiantion}.' )
    

dEstruturaDRE.csv loaded into DRE_Cleaned_Data.
dPlanoConta.csv loaded into DRE_Cleaned_Data.
fOrcamento.csv loaded into DRE_Cleaned_Data.
fPrevisao.csv loaded into DRE_Cleaned_Data.
dfLancamentos loaded into DRE_Cleaned_Data.


In [17]:
def load( server, origin_database , destination_database ):

    # db connections
    conn = pyodbc.connect('Driver={SQL Server};'
                      f'Server={server};'
                      'Trusted_Connection=yes;')
    conn_string_db_destination = f'mssql+pyodbc://{SERVER}/{ db_destiantion }?driver=ODBC+Driver+17+for+SQL+Server'
    # create engine
    engine_destination = create_engine( conn_string_db_destination )

    print("="*100)
    print( 'Connecting Database:' )

    if conn:
        print( "Successfull Connection" )
    else:
        print( "Connection Failded" )
    
    # create a cursor
    cursor = conn.cursor()
    
    print("="*100)
    print( 'Data Cleaning:' )

    # cleaning data
    tables = transform( server , origin_database )

    print("="*100)
    print( 'Creating Destination Database:' )
 
    # check if a database already exists
    destination_database = 'DRE_Cleaned_Data' # database where cleaned data will be stored.
    check_db = f'''
        IF NOT EXISTS ( SELECT name FROM sys.databases WHERE name = '{destination_database}' )
            BEGIN
            CREATE DATABASE {destination_database}
            END
    '''
    # execute query
    cursor.execute( check_db )
    cursor.close()

    print( f'{destination_database} database has been created.' )

    print("="*100)
    print( 'Data Loading:' )

    # loading data
    for table_name, df in tables.items():

        df.to_sql( table_name , engine_destination , if_exists= 'replace', index = False)
        print( f'{table_name} loaded into {database_name}.' )
    print("="*100)
        
    return None    

In [18]:
server = 'DESKTOP-U9M4TSR'
origin_database = 'DRE'
destination_database = 'DRE_Cleaned_Data'
load( server , origin_database , destination_database )

Connecting Database:
Successfull Connection
Data Cleaning:
dEstruturaDRE.csv has been cleaned. It has 39 rows 
dPlanoConta.csv has been cleaned. It has 129 rows 
Processing : fLancamento1_ano1.csv
Processing : fLancamento1_ano2.csv
Processing : fLancamento1_ano3.csv
Processing : fLancamento2_ano1.csv
Processing : fLancamento2_ano2.csv
Processing : fLancamento2_ano3.csv
Processing : fLancamento3_ano1.csv
Processing : fLancamento3_ano2.csv
Processing : fLancamento3_ano3.csv
fOrcamento.csv has been cleaned. It has 4644 rows 
fPrevisao.csv has been cleaned. It has 4644 rows 
df_Lan_final has been cleaned. It has 3741 rows.
Creating Destination Database:
DRE_Cleaned_Data database has been created.
Data Loading:
dEstruturaDRE.csv loaded into DRE.
dPlanoConta.csv loaded into DRE.
fOrcamento.csv loaded into DRE.
fPrevisao.csv loaded into DRE.
dfLancamentos loaded into DRE.
