In [1]:
import pandas as pd
from sqlalchemy import create_engine, text
import numpy as np
import chardet
import time

def upsert_md_account_d(engine, csv_file_path):
    """Upsert the rows of a ``;``-separated CSV file into ``ds.md_account_d``.

    Rows are inserted with ``ON CONFLICT (data_actual_date, account_rk)
    DO UPDATE``, and one row describing the run is written to
    ``logs.log_info``. Both writes happen in a single transaction that is
    committed on success and rolled back if anything raises.

    Args:
        engine: SQLAlchemy engine used to open the database connection.
        csv_file_path: Path to the CSV file to load.
    """
    # Timestamp: ready to start working.
    ready_to_start = time.strftime('%Y-%m-%d %H:%M:%S')

    # Fixed 5-second pause between the "ready" and "start" timestamps.
    time.sleep(5)

    # Timestamp: work on the table begins.
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')

    account_df = pd.read_csv(csv_file_path, sep=';')

    # Number of records in the CSV file (reported in the log row).
    row_count = len(account_df)

    # Normalise the frame so it can be sent to the database.
    account_df.columns = account_df.columns.str.lower()
    account_df['data_actual_date'] = pd.to_datetime(account_df['data_actual_date']).dt.date
    account_df['data_actual_end_date'] = pd.to_datetime(account_df['data_actual_end_date']).dt.date
    account_df['currency_code'] = account_df['currency_code'].astype(int)

    # One dict per row; missing values (NaN/NaT) become None so that they are
    # stored as SQL NULL instead of being passed to the driver as NaN/NaT.
    records = [
        {column: (None if pd.isna(value) else value) for column, value in row.items()}
        for row in account_df.to_dict('records')
    ]

    upsert_query = """
    INSERT INTO ds.md_account_d (data_actual_date, data_actual_end_date, account_rk, account_number, char_type, currency_rk, currency_code)
    VALUES (:data_actual_date, :data_actual_end_date, :account_rk, :account_number, :char_type, :currency_rk, :currency_code)
    ON CONFLICT (data_actual_date, account_rk)
    DO UPDATE SET
    data_actual_end_date = EXCLUDED.data_actual_end_date,
    account_number = EXCLUDED.account_number,
    char_type = EXCLUDED.char_type,
    currency_rk = EXCLUDED.currency_rk,
    currency_code = EXCLUDED.currency_code;
    """
    log_query = """
        INSERT INTO logs.log_info (table_or_function_name, ready_to_start, start_time, end_time, count_record)
        VALUES (:table_or_function_name, :ready_to_start, :start_time, :end_time, :count_record);
        """

    # engine.begin() commits on success and rolls back on error. A bare
    # engine.connect() is never committed under SQLAlchemy 2.x, so the
    # changes would be discarded when the connection closes.
    with engine.begin() as conn:
        conn.execute(text("SET search_path TO ds"))
        # An empty CSV yields no parameter sets; skip the upsert in that case.
        if records:
            conn.execute(text(upsert_query), records)
        # Timestamp: import of the records into the table finished.
        end_time = time.strftime('%Y-%m-%d %H:%M:%S')
        # Log the run in logs.log_info.
        log_params = {
            'table_or_function_name': 'ds.md_account_d',
            'ready_to_start': ready_to_start,
            'start_time': start_time,
            'end_time': end_time,
            'count_record': row_count
        }
        conn.execute(text(log_query), log_params)

    print('md_account_d upsert complete')

def upsert_ft_balance_f(engine, csv_file_path):
    """Upsert the rows of a ``;``-separated CSV file into ``ds.ft_balance_f``.

    Rows are inserted with ``ON CONFLICT (on_date, account_rk) DO UPDATE``,
    and one row describing the run is written to ``logs.log_info``. Both
    writes happen in a single transaction that is committed on success and
    rolled back if anything raises.

    Args:
        engine: SQLAlchemy engine used to open the database connection.
        csv_file_path: Path to the CSV file to load; ``on_date`` is parsed
            with the ``%d.%m.%Y`` format.
    """
    # Timestamp: ready to start working.
    ready_to_start = time.strftime('%Y-%m-%d %H:%M:%S')

    # Fixed 5-second pause between the "ready" and "start" timestamps.
    time.sleep(5)

    # Timestamp: work on the table begins.
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')

    balance_df = pd.read_csv(csv_file_path, sep=';')
    balance_df.columns = balance_df.columns.str.lower()

    # Number of records in the CSV file (reported in the log row).
    row_count = len(balance_df)

    balance_df['on_date'] = pd.to_datetime(balance_df['on_date'], format='%d.%m.%Y').dt.date

    # One dict per row; missing values (NaN/NaT) become None so that they are
    # stored as SQL NULL instead of being passed to the driver as NaN/NaT.
    records = [
        {column: (None if pd.isna(value) else value) for column, value in row.items()}
        for row in balance_df.to_dict('records')
    ]

    upsert_query = """
    INSERT INTO ds.ft_balance_f (on_date, account_rk, currency_rk, balance_out)
    VALUES (:on_date, :account_rk, :currency_rk, :balance_out)
    ON CONFLICT (on_date, account_rk)
    DO UPDATE SET
    currency_rk = EXCLUDED.currency_rk,
    balance_out = EXCLUDED.balance_out;
    """
    log_query = """
        INSERT INTO logs.log_info (table_or_function_name, ready_to_start, start_time, end_time, count_record)
        VALUES (:table_or_function_name, :ready_to_start, :start_time, :end_time, :count_record);
        """

    # engine.begin() commits on success and rolls back on error. A bare
    # engine.connect() is never committed under SQLAlchemy 2.x, so the
    # changes would be discarded when the connection closes.
    with engine.begin() as conn:
        conn.execute(text("SET search_path TO ds"))
        # An empty CSV yields no parameter sets; skip the upsert in that case.
        if records:
            conn.execute(text(upsert_query), records)
        # Timestamp: import of the records into the table finished.
        end_time = time.strftime('%Y-%m-%d %H:%M:%S')
        log_params = {
            'table_or_function_name': 'ds.ft_balance_f',
            'ready_to_start': ready_to_start,
            'start_time': start_time,
            'end_time': end_time,
            'count_record': row_count
        }
        conn.execute(text(log_query), log_params)

    print('ft_balance_f upsert complete')

def upsert_md_currency_d(engine, csv_file_path):
    """Upsert the rows of a ``;``-separated CSV file into ``ds.md_currency_d``.

    The file encoding is detected with ``chardet`` before parsing. Rows are
    inserted with ``ON CONFLICT (currency_rk, data_actual_date) DO UPDATE``,
    and one row describing the run is written to ``logs.log_info``. Both
    writes happen in a single transaction that is committed on success and
    rolled back if anything raises.

    Args:
        engine: SQLAlchemy engine used to open the database connection.
        csv_file_path: Path to the CSV file to load.
    """
    # Timestamp: ready to start working.
    ready_to_start = time.strftime('%Y-%m-%d %H:%M:%S')

    # Fixed 5-second pause between the "ready" and "start" timestamps.
    time.sleep(5)

    # Timestamp: work on the table begins.
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')

    # Detect the encoding from the raw bytes, then parse with that encoding.
    with open(csv_file_path, 'rb') as f:
        result = chardet.detect(f.read())
    currency_df = pd.read_csv(csv_file_path, encoding=result['encoding'], sep=';')

    currency_df.columns = currency_df.columns.str.lower()

    # Number of records in the CSV file (reported in the log row).
    row_count = len(currency_df)

    # '˜' is treated as "no value" for code_iso_char
    # (presumably a placeholder/garbage character in the source file).
    currency_df['code_iso_char'] = currency_df['code_iso_char'].replace('˜', np.nan)
    currency_df['data_actual_date'] = pd.to_datetime(currency_df['data_actual_date']).dt.date
    currency_df['data_actual_end_date'] = pd.to_datetime(currency_df['data_actual_end_date']).dt.date

    # One dict per row; missing values (NaN/NaT, including the placeholder
    # replaced above) become None so they are stored as SQL NULL rather than
    # being sent to the driver as a float NaN.
    records = [
        {column: (None if pd.isna(value) else value) for column, value in row.items()}
        for row in currency_df.to_dict('records')
    ]

    upsert_query = """
    INSERT INTO ds.md_currency_d (currency_rk, data_actual_date, data_actual_end_date, currency_code, code_iso_char)
    VALUES (:currency_rk, :data_actual_date, :data_actual_end_date, :currency_code, :code_iso_char)
    ON CONFLICT (currency_rk, data_actual_date)
    DO UPDATE SET
    data_actual_end_date = EXCLUDED.data_actual_end_date,
    currency_code = EXCLUDED.currency_code,
    code_iso_char = EXCLUDED.code_iso_char;
    """
    log_query = """
        INSERT INTO logs.log_info (table_or_function_name, ready_to_start, start_time, end_time, count_record)
        VALUES (:table_or_function_name, :ready_to_start, :start_time, :end_time, :count_record);
        """

    # engine.begin() commits on success and rolls back on error. A bare
    # engine.connect() is never committed under SQLAlchemy 2.x, so the
    # changes would be discarded when the connection closes.
    with engine.begin() as conn:
        conn.execute(text("SET search_path TO ds"))
        # An empty CSV yields no parameter sets; skip the upsert in that case.
        if records:
            conn.execute(text(upsert_query), records)
        # Timestamp: import of the records into the table finished.
        end_time = time.strftime('%Y-%m-%d %H:%M:%S')
        log_params = {
            'table_or_function_name': 'ds.md_currency_d',
            'ready_to_start': ready_to_start,
            'start_time': start_time,
            'end_time': end_time,
            'count_record': row_count
        }
        conn.execute(text(log_query), log_params)

    print('md_currency_d upsert complete')

def upsert_md_exchange_rate_d(engine, csv_file_path):
    """Upsert the rows of a ``;``-separated CSV file into ``ds.md_exchange_rate_d``.

    Fully duplicated rows are dropped before loading. Rows are inserted with
    ``ON CONFLICT (data_actual_date, currency_rk) DO UPDATE``, and one row
    describing the run is written to ``logs.log_info``. Both writes happen in
    a single transaction that is committed on success and rolled back if
    anything raises.

    Args:
        engine: SQLAlchemy engine used to open the database connection.
        csv_file_path: Path to the CSV file to load.
    """
    # Timestamp: ready to start working.
    ready_to_start = time.strftime('%Y-%m-%d %H:%M:%S')

    # Fixed 5-second pause between the "ready" and "start" timestamps.
    time.sleep(5)

    # Timestamp: work on the table begins.
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')

    exchange_df = pd.read_csv(csv_file_path, sep=';')

    # Remove rows that are exact duplicates of an earlier row.
    exchange_df.drop_duplicates(inplace=True)

    # Number of records after de-duplication (reported in the log row).
    row_count = len(exchange_df)

    exchange_df.columns = exchange_df.columns.str.lower()

    exchange_df['data_actual_date'] = pd.to_datetime(exchange_df['data_actual_date']).dt.date
    exchange_df['data_actual_end_date'] = pd.to_datetime(exchange_df['data_actual_end_date']).dt.date

    # One dict per row; missing values (NaN/NaT) become None so that they are
    # stored as SQL NULL instead of being passed to the driver as NaN/NaT.
    records = [
        {column: (None if pd.isna(value) else value) for column, value in row.items()}
        for row in exchange_df.to_dict('records')
    ]

    upsert_query = """
    INSERT INTO ds.md_exchange_rate_d (data_actual_date, data_actual_end_date, currency_rk, reduced_cource, code_iso_num)
    VALUES (:data_actual_date, :data_actual_end_date, :currency_rk, :reduced_cource, :code_iso_num)
    ON CONFLICT (data_actual_date, currency_rk)
    DO UPDATE SET
    data_actual_end_date = EXCLUDED.data_actual_end_date,
    reduced_cource = EXCLUDED.reduced_cource,
    code_iso_num = EXCLUDED.code_iso_num;
    """
    log_query = """
        INSERT INTO logs.log_info (table_or_function_name, ready_to_start, start_time, end_time, count_record)
        VALUES (:table_or_function_name, :ready_to_start, :start_time, :end_time, :count_record);
        """

    # engine.begin() commits on success and rolls back on error. A bare
    # engine.connect() is never committed under SQLAlchemy 2.x, so the
    # changes would be discarded when the connection closes.
    with engine.begin() as conn:
        conn.execute(text("SET search_path TO ds"))
        # An empty CSV yields no parameter sets; skip the upsert in that case.
        if records:
            conn.execute(text(upsert_query), records)
        # Timestamp: import of the records into the table finished.
        end_time = time.strftime('%Y-%m-%d %H:%M:%S')
        log_params = {
            'table_or_function_name': 'ds.md_exchange_rate_d',
            'ready_to_start': ready_to_start,
            'start_time': start_time,
            'end_time': end_time,
            'count_record': row_count
        }
        conn.execute(text(log_query), log_params)

    print('md_exchange_rate_d upsert complete')

def upsert_md_ledger_account_s(engine, csv_file_path):
    """Upsert the rows of a ``;``-separated CSV file into ``ds.md_ledger_account_s``.

    Rows are inserted with ``ON CONFLICT (ledger_account, start_date)
    DO UPDATE``, and one row describing the run is written to
    ``logs.log_info``. Both writes happen in a single transaction that is
    committed on success and rolled back if anything raises.

    Args:
        engine: SQLAlchemy engine used to open the database connection.
        csv_file_path: Path to the CSV file to load.
    """
    # Timestamp: ready to start working.
    ready_to_start = time.strftime('%Y-%m-%d %H:%M:%S')

    # Fixed 5-second pause between the "ready" and "start" timestamps.
    time.sleep(5)

    # Timestamp: work on the table begins.
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')

    ledger_df = pd.read_csv(csv_file_path, sep=';')

    # Number of records in the CSV file (reported in the log row).
    row_count = len(ledger_df)

    ledger_df.columns = ledger_df.columns.str.lower()

    ledger_df['start_date'] = pd.to_datetime(ledger_df['start_date']).dt.date
    ledger_df['end_date'] = pd.to_datetime(ledger_df['end_date']).dt.date

    # One dict per row; missing values (NaN/NaT) become None so that they are
    # stored as SQL NULL instead of being passed to the driver as NaN/NaT.
    records = [
        {column: (None if pd.isna(value) else value) for column, value in row.items()}
        for row in ledger_df.to_dict('records')
    ]

    upsert_query = """
    INSERT INTO ds.md_ledger_account_s (chapter, chapter_name, section_number, section_name, subsection_name, ledger1_account,
    ledger1_account_name, ledger_account, ledger_account_name, characteristic, start_date, end_date)
    VALUES (:chapter, :chapter_name, :section_number, :section_name, :subsection_name, :ledger1_account,
    :ledger1_account_name, :ledger_account, :ledger_account_name, :characteristic, :start_date, :end_date)
    ON CONFLICT (ledger_account, start_date)
    DO UPDATE SET
    chapter = EXCLUDED.chapter,
    chapter_name = EXCLUDED.chapter_name,
    section_number = EXCLUDED.section_number,
    section_name = EXCLUDED.section_name,
    subsection_name = EXCLUDED.subsection_name,
    ledger1_account = EXCLUDED.ledger1_account,
    ledger1_account_name = EXCLUDED.ledger1_account_name,
    ledger_account_name = EXCLUDED.ledger_account_name,
    characteristic = EXCLUDED.characteristic,
    end_date = EXCLUDED.end_date;
    """
    log_query = """
        INSERT INTO logs.log_info (table_or_function_name, ready_to_start, start_time, end_time, count_record)
        VALUES (:table_or_function_name, :ready_to_start, :start_time, :end_time, :count_record);
        """

    # engine.begin() commits on success and rolls back on error. A bare
    # engine.connect() is never committed under SQLAlchemy 2.x, so the
    # changes would be discarded when the connection closes.
    with engine.begin() as conn:
        conn.execute(text("SET search_path TO ds"))
        # An empty CSV yields no parameter sets; skip the upsert in that case.
        if records:
            conn.execute(text(upsert_query), records)
        # Timestamp: import of the records into the table finished.
        end_time = time.strftime('%Y-%m-%d %H:%M:%S')
        log_params = {
            'table_or_function_name': 'ds.md_ledger_account_s',
            'ready_to_start': ready_to_start,
            'start_time': start_time,
            'end_time': end_time,
            'count_record': row_count
        }
        conn.execute(text(log_query), log_params)

    print('md_ledger_account_s upsert complete')

def upsert_ft_posting_f(engine, csv_file_path):
    """Load a ``;``-separated CSV file into ``ds.ft_posting_f`` and log the run.

    The whole table is rewritten through ``DataFrame.to_sql`` with
    ``if_exists='replace'``; no ``ON CONFLICT`` handling is involved. One row
    describing the run is then written to ``logs.log_info`` in its own
    committed transaction.

    Args:
        engine: SQLAlchemy engine used for both the bulk load and the log row.
        csv_file_path: Path to the CSV file to load; ``oper_date`` is parsed
            with the ``%d-%m-%Y`` format.
    """
    # Timestamp: ready to start working.
    ready_to_start = time.strftime('%Y-%m-%d %H:%M:%S')

    # Fixed 5-second pause between the "ready" and "start" timestamps.
    time.sleep(5)

    # Timestamp: work on the table begins.
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')

    posting_df = pd.read_csv(csv_file_path, sep=';')

    # Number of records in the CSV file (reported in the log row).
    row_count = len(posting_df)
    posting_df.columns = posting_df.columns.str.lower()
    posting_df['oper_date'] = pd.to_datetime(posting_df['oper_date'], format='%d-%m-%Y').dt.date

    # NOTE(review): despite the function name this is a full reload, not an
    # upsert. if_exists='replace' drops the existing table before inserting,
    # so column types/constraints declared in the original DDL are presumably
    # lost - confirm this is intended.
    posting_df.to_sql('ft_posting_f', engine, schema='ds', if_exists='replace', index=False)

    # Timestamp: import of the records into the table finished.
    end_time = time.strftime('%Y-%m-%d %H:%M:%S')
    log_query = """
        INSERT INTO logs.log_info (table_or_function_name, ready_to_start, start_time, end_time, count_record)
        VALUES (:table_or_function_name, :ready_to_start, :start_time, :end_time, :count_record);
        """
    log_params = {
        'table_or_function_name': 'ds.ft_posting_f',
        'ready_to_start': ready_to_start,
        'start_time': start_time,
        'end_time': end_time,
        'count_record': row_count
    }

    # engine.begin() commits the log row on success. A bare engine.connect()
    # is never committed under SQLAlchemy 2.x, so the log row would be
    # discarded when the connection closes.
    with engine.begin() as conn:
        conn.execute(text(log_query), log_params)

    print('ft_posting_f upsert complete')

def run_all_upserts(import_dir='/Users/nikitos/Desktop/import'):
    """Create the database engine and run every table loader in sequence.

    Args:
        import_dir: Directory containing the six CSV files, each named after
            its target table. Defaults to the path previously hard-coded here,
            so calling ``run_all_upserts()`` with no arguments behaves as
            before.
    """
    import os  # local import: used only to build the CSV paths

    # NOTE(review): credentials are hard-coded in source; consider moving them
    # to configuration or environment variables.
    db_user = 'postgres'
    db_password = '1503'
    db_host = 'localhost'
    db_port = '5432'
    db_name = 'project'

    connection_string = f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}'
    engine = create_engine(connection_string)

    # Loaders in the order they must run, paired with their CSV file names.
    loaders = (
        (upsert_md_account_d, 'md_account_d.csv'),
        (upsert_ft_balance_f, 'ft_balance_f.csv'),
        (upsert_md_currency_d, 'md_currency_d.csv'),
        (upsert_md_exchange_rate_d, 'md_exchange_rate_d.csv'),
        (upsert_md_ledger_account_s, 'md_ledger_account_s.csv'),
        (upsert_ft_posting_f, 'ft_posting_f.csv'),
    )

    try:
        for loader, file_name in loaders:
            loader(engine, os.path.join(import_dir, file_name))
    finally:
        # Release pooled connections even if one of the loaders fails.
        engine.dispose()

# Run the full load immediately when this code is executed.
run_all_upserts()

md_account_d upsert complete
ft_balance_f upsert complete
md_currency_d upsert complete
md_exchange_rate_d upsert complete
md_ledger_account_s upsert complete
ft_posting_f upsert complete
