In [21]:
import pandas as pd
import os
from dotenv import load_dotenv
from sqlalchemy import inspect, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import text
import json
from datetime import datetime
import time

In [22]:
# CSV source tables (file name without extension) mapped to the encoding used
# to read each file. Insertion order is the order the tables are processed in.
# NOTE: 'mbcs' and 'oem' are Windows-only codecs, so those two files can only
# be decoded when the notebook runs on Windows.
table_names = dict([
    ('ft_balance_f', 'utf-8'),
    ('ft_posting_f', 'utf-8'),
    ('md_account_d', 'utf-8'),
    ('md_currency_d', 'mbcs'),
    ('md_exchange_rate_d', 'utf-8'),
    ('md_ledger_account_s', 'oem'),
])

# Load variables from the local .env file into the process environment.
load_dotenv(".env")

True

In [16]:
def connect_to_db(db_name, db_user, db_password, db_host, db_port):
    """Create a SQLAlchemy engine for a PostgreSQL database (psycopg2 driver).

        The connection URL is assembled with ``URL.create`` so that special
        characters in the user name or password (``@``, ``:``, ``/`` ...) are
        escaped instead of corrupting the URL.

        ``create_engine`` is lazy: no connection is opened here, so an
        unreachable server or wrong credentials surface on the first use of
        the engine, not in this function.

        :param db_name: database name
        :type db_name: str

        :param db_user: user name used to connect to the database
        :type db_user: str

        :param db_password: password of that user
        :type db_password: str

        :param db_host: database server address
        :type db_host: str

        :param db_port: connection port (numeric string or int)
        :type db_port: str

        :rtype: Engine
        :return: engine responsible for talking to the database

        :raises ValueError: if any setting is None (e.g. an unset environment
            variable) or the port is not numeric
    """
    # Local import: URL is not among the names imported at the top of the notebook.
    from sqlalchemy.engine import URL

    settings = {
        'db_name': db_name,
        'db_user': db_user,
        'db_password': db_password,
        'db_host': db_host,
        'db_port': db_port,
    }
    missing = [name for name, value in settings.items() if value is None]
    if missing:
        # Fail loudly instead of connecting with the literal text "None".
        raise ValueError(
            f'Missing database connection settings: {", ".join(missing)}')

    url = URL.create(
        drivername='postgresql+psycopg2',
        username=db_user,
        password=db_password,
        host=db_host,
        port=int(db_port),
        database=db_name,
    )
    return create_engine(url)

In [4]:
def extract_data(path_file, code):
    """Read a semicolon-separated CSV file into a DataFrame.

        NA detection is switched off (``na_filter=False``), so empty cells are
        not converted to NaN.

        :param path_file: path to the CSV file
        :type path_file: str

        :param code: encoding of the file
        :type code: str

        :rtype: DataFrame
        :return: data read from the CSV file
    """
    read_options = {'sep': ';', 'encoding': code, 'na_filter': False}
    return pd.read_csv(path_file, **read_options)

In [5]:
def transform_data(data, table_name):
    """Normalize an extracted frame in place before loading.

        Steps, all applied to ``data`` itself (nothing is returned):

        1. column names are lower-cased;
        2. the first column is dropped;
        3. for ``md_currency_d`` only, rows whose ``currency_code`` is 999 get
           ``code_iso_char`` set to ``'XXX'``.

        :param data: data extracted from the CSV file (modified in place)
        :type data: DataFrame

        :param table_name: name of the table / source file without extension
        :type table_name: str
    """
    data.rename(columns=str.lower, inplace=True)
    data.drop(columns=data.columns[0], inplace=True)

    if table_name == 'md_currency_d':
        # read_csv infers an integer dtype for an all-numeric column, and an
        # integer never equals the string '999'. Compare on the string form so
        # the rule applies whether the codes arrive as numbers or as text.
        is_placeholder = data['currency_code'].astype(str).str.strip() == '999'
        data.loc[is_placeholder, 'code_iso_char'] = 'XXX'

In [6]:
def load_to_db(data, table_name, engine):
    """Upsert the rows of ``data`` into table ``ds.<table_name>``.

        Rows are inserted; when a row collides with the table's primary key,
        the non-key columns of the existing row are overwritten with the new
        values. If every column belongs to the key the conflicting row is left
        untouched, and if the table has no primary key a plain INSERT is
        issued. An empty frame is a no-op.

        The whole batch runs in one transaction: it is committed on success
        and rolled back if any row fails.

        Table and column names are interpolated into the SQL text, so they
        must come from trusted sources.

        :param data: transformed data to load; column names must match the
            table's column names
        :type data: DataFrame

        :param table_name: name of the target table in schema ``ds``
        :type table_name: str

        :param engine: engine responsible for talking to the database
        :type engine: Engine

        :raises IntegrityError: re-raised after being reported, so the caller
            can record the failure instead of treating the load as successful
    """
    if data.empty:
        # Nothing to send; executing the statement without rows would fail.
        return

    column_names = list(data.columns)
    primary_key_columns = inspect(engine).get_pk_constraint(
        table_name=table_name, schema='ds')['constrained_columns']

    placeholders = ', '.join(f':{column}' for column in column_names)
    statement = (
        f"INSERT INTO ds.{table_name} ({', '.join(column_names)}) "
        f"VALUES ({placeholders})"
    )

    if primary_key_columns:
        conflict_target = ', '.join(primary_key_columns)
        updatable = [c for c in column_names if c not in primary_key_columns]
        if updatable:
            # EXCLUDED refers to the row that was proposed for insertion.
            assignments = ', '.join(f'{c} = EXCLUDED.{c}' for c in updatable)
            statement += (
                f" ON CONFLICT ({conflict_target}) DO UPDATE SET {assignments}")
        else:
            statement += f" ON CONFLICT ({conflict_target}) DO NOTHING"

    records = data.to_dict(orient='records')

    try:
        # engine.begin() commits on success and rolls back on any exception.
        with engine.begin() as conn:
            conn.execute(text(statement), records)
    except IntegrityError as error:
        print(f'Error adding/updating records: {error}')
        raise

In [25]:
# Connection settings come from the process environment; a variable that is
# not set yields None.
database_name, user_name, password, host, port = (
    os.getenv(variable)
    for variable in ('DB_NAME', 'DB_USER', 'DB_PASSWORD', 'DB_HOST', 'DB_PORT')
)

# Engine shared by the cells that follow.
engine = connect_to_db(database_name, user_name, password, host, port)

In [26]:
def _log_etl_event(conn, start_time, message, level, error=None):
    """Write one record through logs.insert_etl_logs and commit it.

        Values are sent as bound parameters, so quotes, ``$$`` or ``:name``
        sequences inside an error text cannot break the statement.

        :param conn: open SQLAlchemy connection
        :param start_time: moment the table's processing started
        :param message: human-readable description of the event
        :param level: log level label, e.g. 'ERROR' or 'INFO'
        :param error: exception to attach as JSON details, or None
    """
    details = None
    if error is not None:
        details = json.dumps({"error_message": str(error)})

    conn.execute(
        text("CALL logs.insert_etl_logs(:start_time, :message, :level, :details)"),
        {
            # Sent as text, exactly as the timestamp was previously embedded.
            'start_time': str(start_time),
            'message': message,
            'level': level,
            'details': details,
        },
    )
    conn.commit()


def etl_process():
    """Run extract -> transform -> load for every table in ``table_names``.

        For each table the CSV ``csv_file/<table>.csv`` is read, transformed
        and upserted into the database, and the outcome is recorded through
        the ``logs.insert_etl_logs`` procedure.

        The run stops at the first failing step: the error is logged with
        level 'ERROR' and the remaining tables are not processed.

        Relies on the module-level ``engine`` and ``table_names``.
    """
    for table_name, code in table_names.items():
        start_time = datetime.now()

        with engine.connect() as conn:

            try:
                data = extract_data(f'csv_file/{table_name}.csv', code)
            except Exception as error:
                _log_etl_event(
                    conn, start_time,
                    f'Error while extracting data from csv file {table_name}.csv',
                    'ERROR', error)
                return

            try:
                transform_data(data, table_name)
            except Exception as error:
                _log_etl_event(
                    conn, start_time,
                    f'Error while processing data from file {table_name}.csv',
                    'ERROR', error)
                return

            try:
                load_to_db(data, table_name, engine)
            except Exception as error:
                _log_etl_event(
                    conn, start_time,
                    f'Error loading data into database {table_name} table',
                    'ERROR', error)
                return

            # NOTE(review): the reason for the 5 second pause is not visible
            # here - presumably it separates start and end times in the log;
            # confirm before removing.
            time.sleep(5)
            _log_etl_event(
                conn, start_time,
                f'ETL process table {table_name} completed successfully',
                'INFO')

In [28]:
# Run the ETL pipeline over the tables listed in table_names.
etl_process()