In [1]:
import os
import time

import pandas as pd
import psycopg2

In [2]:
# Re-encode the MD_LEDGER_ACCOUNT_S and MD_CURRENCY_D CSV files from cp866 to
# UTF-8, writing each result next to the original with a "_1" suffix.
recode_jobs = (
    ("data/md_ledger_account_s.csv", "data/md_ledger_account_s_1.csv"),
    ("data/md_currency_d.csv", "data/md_currency_d_1.csv"),
)
for cp866_path, utf8_path in recode_jobs:
    pd.read_csv(cp866_path, delimiter=";", encoding="cp866").to_csv(
        utf8_path, sep=";", encoding="utf-8", index=False
    )
# Nothing from this cell is needed later, so drop the temporaries.
del recode_jobs, cp866_path, utf8_path

In [3]:
# Database connection parameters (passed as keyword arguments to psycopg2.connect).
# Each value is taken from an environment variable when it is set, so real
# credentials never have to be written into the notebook; the literals below
# are only fallbacks and keep the previous defaults working unchanged.
conn_params = {
    "user": os.environ.get("POSTGRES_USER", "your_username"),
    "password": os.environ.get("POSTGRES_PASSWORD", "your_password"),
    "host": os.environ.get("POSTGRES_HOST", "postgres"),
    "port": os.environ.get("POSTGRES_PORT", "5432"),
    "database": os.environ.get("POSTGRES_DB", "docker_postgres_db"),
}

In [4]:
def load_data_to_source(table_name, file_path, conn_params, truncate=False):
    """
    Copy raw CSV data into a SOURCE-schema table and record the attempt in LOGS.LOGS.

    The load (optional TRUNCATE followed by COPY) runs in one READ COMMITTED
    transaction that is rolled back on any error. Afterwards a single row
    describing the attempt is inserted into LOGS.LOGS and committed separately.
    Errors raised while loading are printed and logged, not re-raised. If the
    connection itself cannot be opened, nothing can be logged and the error is
    only printed.

    Parameters:
    - table_name (str): Target table for the raw data. It is interpolated into
      the SQL text as-is, so it must come from trusted code.
    - file_path (str): Path to the ';'-delimited CSV file. The header is read
      locally to build the column list, and the same path is handed to the
      server-side COPY ... FROM, so it must also be readable by the PostgreSQL
      server process.
    - conn_params (dict): Keyword arguments for psycopg2.connect.
    - truncate (bool): If True, empty the target table before copying.

    Returns:
    None
    """
    conn = None
    cursor = None
    # Pessimistic defaults: the log row stays well-defined even if the try block
    # is left by something other than Exception (e.g. KeyboardInterrupt).
    status = 'Loading failed'
    rows_count = None
    error_message = None
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')
    try:
        conn = psycopg2.connect(**conn_params)
        # psycopg2 opens a transaction implicitly before the first statement, so
        # an explicit "BEGIN ..." would be redundant and only draw a server
        # warning. The isolation level is configured on the session instead.
        conn.set_session(isolation_level='READ COMMITTED')
        cursor = conn.cursor()

        # Only the header is needed to build the column list; nrows=0 avoids
        # parsing the whole file. Reading it inside the try block means a
        # missing or malformed CSV is logged like any other failure.
        column_names = pd.read_csv(file_path, delimiter=';', nrows=0).columns.tolist()
        # The first column is always loaded into the table's ID column,
        # whatever its header in the file is.
        column_names[0] = 'ID'
        columns = ','.join(column_names)
        query = f"COPY {table_name}({columns}) FROM '{file_path}' DELIMITER ';' CSV HEADER;"

        if truncate:
            cursor.execute(f"TRUNCATE {table_name};")

        cursor.execute(query)
        rows_count = cursor.rowcount
        conn.commit()

        # Pause kept from the original implementation; it makes end_time differ
        # visibly from start_time in the log - presumably intentional, TODO confirm.
        time.sleep(5)

        status = 'Loading finished'

    except Exception as error:
        print(f"Error: {error}")
        if conn is not None:
            conn.rollback()

        status = 'Loading failed'
        rows_count = None
        error_message = str(error)

    finally:
        end_time = time.strftime('%Y-%m-%d %H:%M:%S')
        if conn is not None:
            try:
                if cursor is None:
                    # conn.cursor() itself failed above; try once more for the log row.
                    cursor = conn.cursor()
                cursor.execute(
                    "INSERT INTO LOGS.LOGS (table_name, start_time, end_time, status, rows_count, file_path, error_message) "
                    "VALUES (%s, %s, %s, %s, %s, %s, %s)",
                    (table_name, start_time, end_time, status, rows_count, file_path, error_message)
                )
                conn.commit()
            finally:
                # Release the connection even when writing the log row fails.
                if cursor is not None:
                    cursor.close()
                conn.close()

In [5]:
def load_data_to_ds(table_name, file_path, conn_params):
    """
    Run a SQL script that fills a DS-schema table and record the attempt in LOGS.LOGS.

    The script is executed in one READ COMMITTED transaction that is rolled back
    on any error (including a missing script file). Afterwards a single row
    describing the attempt is inserted into LOGS.LOGS and committed separately.
    Errors are printed and logged, not re-raised. If the connection itself
    cannot be opened, nothing can be logged and the error is only printed.

    Parameters:
    - table_name (str): DS-schema table the script populates. It is used only as
      a label in the log row; the script decides what is actually written.
    - file_path (str): Path to the SQL file (cleans the raw SOURCE data, converts
      formats, and writes in insert or replace mode).
    - conn_params (dict): Keyword arguments for psycopg2.connect.

    Returns:
    None
    """
    conn = None
    cursor = None
    # Pessimistic defaults: the log row stays well-defined even if the try block
    # is left by something other than Exception (e.g. KeyboardInterrupt).
    status = 'Update failed'
    rows_count = None
    error_message = None
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')
    try:
        conn = psycopg2.connect(**conn_params)
        # psycopg2 opens a transaction implicitly before the first statement, so
        # an explicit "BEGIN ..." would be redundant and only draw a server
        # warning. The isolation level is configured on the session instead.
        conn.set_session(isolation_level='READ COMMITTED')
        cursor = conn.cursor()

        with open(file_path, 'r') as file:
            query = file.read()

        cursor.execute(query)
        # Row count reported by the driver for the executed script; for a
        # multi-statement script this presumably reflects only the last
        # statement - verify before relying on it.
        rows_count = cursor.rowcount
        conn.commit()

        status = 'Update finished'

    except Exception as error:
        print(f"Error: {error}")
        if conn is not None:
            conn.rollback()

        status = 'Update failed'
        rows_count = None
        error_message = str(error)

    finally:
        end_time = time.strftime('%Y-%m-%d %H:%M:%S')
        if conn is not None:
            try:
                if cursor is None:
                    # conn.cursor() itself failed above; try once more for the log row.
                    cursor = conn.cursor()
                cursor.execute(
                    "INSERT INTO LOGS.LOGS (table_name, start_time, end_time, status, rows_count, file_path, error_message) "
                    "VALUES (%s, %s, %s, %s, %s, %s, %s)",
                    (table_name, start_time, end_time, status, rows_count, file_path, error_message)
                )
                conn.commit()
            finally:
                # Release the connection even when writing the log row fails.
                if cursor is not None:
                    cursor.close()
                conn.close()

In [6]:
# Load the raw CSV files into the SOURCE schema, then populate the DS schema.
# Entries marked "exception check" deliberately point at a table or script that
# does not exist, to exercise the failure-logging path.
source_loads = [
    ('SOURCE.FT_BALANCE_F', 'data/ft_balance_f.csv'),
    ('SOURCE.FT_POSTING_F', 'data/ft_posting_f.csv'),
    ('SOURCE.MD_ACCOUNT_D', 'data/md_account_d.csv'),
    ('SOURCE.MD_CURRENCY_D_1', 'data/md_currency_d_1.csv'),  # exception check
    ('SOURCE.MD_CURRENCY_D', 'data/md_currency_d_1.csv'),
    ('SOURCE.MD_EXCHANGE_RATE_D', 'data/md_exchange_rate_d.csv'),
    ('SOURCE.MD_LEDGER_ACCOUNT_S_1', 'data/md_ledger_account_s_1.csv'),  # exception check
    ('SOURCE.MD_LEDGER_ACCOUNT_S', 'data/md_ledger_account_s_1.csv'),
]
for source_table, csv_path in source_loads:
    load_data_to_source(source_table, csv_path, conn_params)

ds_loads = [
    ('DS.FT_BALANCE_F', 'sql_script/DS.FT_BALANCE_F.sql'),
    ('DS.FT_POSTING_F', 'sql_script/DS.FT_POSTING_F.sql'),
    ('DS.MD_ACCOUNT_D', 'sql_script/DS.MD_ACCOUNT_D.sql'),
    ('DS.MD_CURRENCY_D', 'sql_script/DS.MD_CURRENCY_D1.sql'),  # exception check
    ('DS.MD_CURRENCY_D', 'sql_script/DS.MD_CURRENCY_D.sql'),
    ('DS.MD_EXCHANGE_RATE_D', 'sql_script/DS.MD_EXCHANGE_RATE_D.sql'),
    ('DS.MD_LEDGER_ACCOUNT_S', 'sql_script/DS.MD_LEDGER_ACCOUNT_S1.sql'),  # exception check
    ('DS.MD_LEDGER_ACCOUNT_S', 'sql_script/DS.MD_LEDGER_ACCOUNT_S.sql'),
]
for ds_table, sql_path in ds_loads:
    load_data_to_ds(ds_table, sql_path, conn_params)

Error: relation "source.md_currency_d_1" does not exist

Error: relation "source.md_ledger_account_s_1" does not exist

Error: [Errno 2] No such file or directory: 'sql_script/DS.MD_CURRENCY_D1.sql'
Error: [Errno 2] No such file or directory: 'sql_script/DS.MD_LEDGER_ACCOUNT_S1.sql'


In [7]:
# Check the insert-or-replace behaviour for DS.FT_BALANCE_F: reload the SOURCE
# table from scratch (truncate first), then apply the DS script a second time.
load_data_to_source(
    table_name='SOURCE.FT_BALANCE_F',
    file_path='data/ft_balance_f.csv',
    conn_params=conn_params,
    truncate=True,
)
load_data_to_ds(
    table_name='DS.FT_BALANCE_F',
    file_path='sql_script/DS.FT_BALANCE_F.sql',
    conn_params=conn_params,
)