In [11]:
import pandas as pd
import os
import pyarrow
import psycopg2
from dotenv import load_dotenv
from sqlalchemy import create_engine, text, MetaData, Table, Column, Integer, Float, BigInteger, String, Date

In [2]:
# Move the working directory one level up, presumably so that the relative
# paths used later (e.g. 'data/cleaned_data/') resolve from the parent folder.
# NOTE(review): the path is relative to the current directory, so every
# re-execution of this cell moves one more level up; run it once per kernel.
os.chdir('..')


In [7]:
# Load variables from a .env file into the process environment.
load_dotenv()

# Database connection settings read from the environment.
# os.getenv returns None for any variable that is not set.
db_user = os.getenv("DB_USER")
db_pass = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_name = os.getenv("DB_NAME")

# Создание engine для подключения к БД

In [10]:
# An empty PostgreSQL database named berka_db has been created beforehand.
# The port (5433) is hard-coded in the URL; user, password, host and database
# name come from the db_* variables.

engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_pass}@{db_host}:5433/{db_name}')

In [4]:
# MetaData object that will hold the structure (table definitions) of the database.

metadata = MetaData()

## Создание и заполнение таблицы `clients`

In [None]:
# Load the cleaned data.
# NOTE(review): the path passed to read_parquet is an empty string, so this cell
# cannot load a file as written — fill in the intended path.
# NOTE(review): the following cells use `account_clean`, which is not assigned
# here — TODO confirm where it is defined.

clients = pd.read_parquet("")

In [7]:
account_clean.head(3)

Unnamed: 0,account_id,district_id,frequency,date
0,576,55,monthly issuance,1993-01-01
1,3818,74,monthly issuance,1993-01-01
2,704,55,monthly issuance,1993-01-01


In [8]:
account_types = account_clean.dtypes
account_types

account_id              int64
district_id             int64
frequency              object
date           datetime64[ns]
dtype: object

In [None]:
def add_table_to_metadata(table_name: str, metadata, df_types: pd.Series):
    """
    Define a SQL table on `metadata` whose columns mirror a dataframe's dtypes.

    table_name - string, name of the SQL table
    metadata - sqlalchemy MetaData() object the table is attached to
    df_types - pandas Series with the dataframe's data types (e.g. `df.dtypes`).
            Index values are used as column names in the SQL table.
            Each value is converted with str() and looked up among the accepted labels:
            {'int64':Integer, 'BigInt':BigInteger, 'float64':Float, 'object':String, 'datetime64[ns]':Date}
            'BigInt' is a custom label for very large integers: to get a BigInteger
            column, pass an edited series holding the string 'BigInt' for that column.

    Returns the created sqlalchemy Table object.

    Raises
    TypeError - if df_types is not a pd.Series
    KeyError - if a dtype label has no mapping; the message names the column,
            the offending label and the accepted labels
    """
    if not isinstance(df_types, pd.Series):
        raise TypeError(f'df_types must be pd.Series object, but got {type(df_types)}')

    # mapping between dataframe dtype labels and sqlalchemy column types
    types_mapping = {'int64':Integer, 'BigInt':BigInteger, 'float64':Float, 'object':String, 'datetime64[ns]':Date}

    # build one Column per series entry, failing with a descriptive error
    # (instead of a bare KeyError) when a dtype label is not supported
    columns = []
    for col_name, dtype in df_types.items():
        label = str(dtype)
        if label not in types_mapping:
            raise KeyError(
                f'column {col_name!r} has unsupported dtype {label!r}; '
                f'accepted labels: {list(types_mapping)}'
            )
        columns.append(Column(col_name, types_mapping[label]))

    # creating the Table registers it in metadata
    return Table(table_name, metadata, *columns)

In [10]:
accounts_tab = add_table_to_metadata('accounts', metadata, account_types)

In [11]:
accounts_tab.create(engine)

In [12]:
# функция добавления данных из датафрейма в имеющиюся таблицу SQL

def append_df_to_sql(df, table_name, engine, if_exists='append', index=False, chunksize: int | None=None):
    if not isinstance(table_name, str):
        raise TypeError(f'table_name must be a string, but got {type(table_name)}')
        
    df.to_sql(table_name, engine, if_exists=if_exists, index=index)

In [13]:
# добавляем данные из датафрейма в таблицу accounts

append_df_to_sql(account_clean, 'accounts', engine)

In [14]:
# функция сверки количества строк в датафрейме и строк добавленных в БД
def count_rows(engine, table_name: str, df: pd.DataFrame | None=None):

    # проверка типа для table_name
    if not isinstance(table_name, str):
        raise TypeError(f'table_name must be a string, but got {type(table_name)}')
        
    with engine.connect() as con:
        query = con.execute(text(f'SELECT COUNT(*) FROM {table_name}'))
        row_count = query.scalar() # результат запроса в виде числа
    
    if df is not None:
        df_rows = df.shape[0]
        if df_rows != row_count:
            raise ValueError(f'''Dataframe row count is not equal to SQL table row count!
Dataframe: {df_rows}
SQL table: {row_count}''')
        else:
            print(f'Dataframe and SQL table row counts are equal.\n{row_count} rows')
    else:
        print(f'{row_count} rows in the SQL table')
        return row_count

In [15]:
# сверяем количество строк в датафрейме и в БД

count_rows(engine, 'accounts', account_clean)

Dataframe and SQL table row counts are equal.
4500 rows


## Создание и заполнение таблицы client

In [16]:
# Load the cleaned client data; `birth_date` is passed via parse_dates,
# presumably so it is read as a datetime column.
# NOTE(review): `df_from_csv` is not defined in the visible cells — TODO confirm
# it is defined or imported before this cell runs.
client_clean = df_from_csv('client_df_clean.csv', parse_dates=['birth_date'])

In [17]:
client_clean.head(3)

Unnamed: 0,client_id,district_id,birth_date,sex
0,1,18,1970-12-13,female
1,2,1,1945-02-04,male
2,3,1,1940-10-09,female


In [18]:
client_types = client_clean.dtypes
client_types

client_id               int64
district_id             int64
birth_date     datetime64[ns]
sex                    object
dtype: object

In [19]:
client_tab = add_table_to_metadata('clients', metadata, client_types)

In [20]:
client_tab.create(engine)

In [21]:
append_df_to_sql(client_clean, 'clients', engine)

In [22]:
# сверяем размеры датафрейма и БД таблицы

count_rows(engine, 'clients', client_clean)

Dataframe and SQL table row counts are equal.
5369 rows


## Создание и заполнение таблицы disposition

In [23]:
disp_clean = df_from_csv('disposition_df_clean.csv')

In [24]:
disp_clean.head(3)

Unnamed: 0,disp_id,client_id,account_id,type
0,1,1,1,OWNER
1,2,2,2,OWNER
2,3,3,2,DISPONENT


In [25]:
disp_types = disp_clean.dtypes
disp_types

disp_id        int64
client_id      int64
account_id     int64
type          object
dtype: object

In [26]:
disp_tab = add_table_to_metadata('disposition', metadata, disp_types)

In [27]:
disp_tab.create(engine)

In [28]:
# заполняем таблицу disposition

append_df_to_sql(disp_clean, 'disposition', engine)

In [29]:
# сверяем количество строк

count_rows(engine, 'disposition', disp_clean)

Dataframe and SQL table row counts are equal.
5369 rows


## Создание и заполнение таблицы permanent order

In [30]:
order_clean = df_from_csv('order_df_clean.csv')

In [31]:
order_clean.head(3)

Unnamed: 0,order_id,account_id,bank_to,account_to,amount,k_symbol
0,29401,1,YZ,87144583,2452.0,household
1,29402,2,ST,89597016,3372.7,loan payment
2,29403,2,QR,13943797,7266.0,household


In [32]:
order_types = order_clean.dtypes
order_types

order_id        int64
account_id      int64
bank_to        object
account_to      int64
amount        float64
k_symbol       object
dtype: object

In [33]:
# создаем таблицу в БД

order_tab = add_table_to_metadata('orders', metadata, order_types)

In [34]:
order_tab.create(engine)

In [35]:
# заполняем таблицу

append_df_to_sql(order_clean, 'orders', engine)

In [36]:
# проверяем целостность
count_rows(engine, 'orders', order_clean)

Dataframe and SQL table row counts are equal.
6471 rows


## Создание и заполнение таблицы transaction

In [37]:
trans_clean = df_from_csv('transaction_df_clean.csv', parse_dates=['date'], dtype={'bank':'str'})

In [38]:
trans_clean.head(3)

Unnamed: 0,trans_id,account_id,date,type,operation,amount,balance,k_symbol,bank,account
0,695247,2378,1993-01-01,credit,credit in cash,700.0,700.0,,,
1,171812,576,1993-01-01,credit,credit in cash,900.0,900.0,,,
2,207264,704,1993-01-01,credit,credit in cash,1000.0,1000.0,,,


In [39]:
trans_types = trans_clean.dtypes
trans_types

trans_id               int64
account_id             int64
date          datetime64[ns]
type                  object
operation             object
amount               float64
balance              float64
k_symbol              object
bank                  object
account              float64
dtype: object

In [40]:
trans_tab = add_table_to_metadata('transactions', metadata, trans_types)

In [41]:
trans_tab.create(engine)

In [42]:
# chunksize 1000: time taken - 68 s
append_df_to_sql(trans_clean, 'transactions', engine, chunksize=1000)

In [43]:
count_rows(engine, 'transactions', trans_clean)

Dataframe and SQL table row counts are equal.
1056320 rows


## Создание и заполнение таблицы loan

In [44]:
loan_clean = df_from_csv('loan_df_clean.csv', parse_dates=['date'])

In [45]:
loan_clean.head(3)

Unnamed: 0,loan_id,account_id,date,amount,duration,payments,status
0,5314,1787,1993-07-05,96396.0,12,8033.0,"contract finished, loan not paid"
1,5316,1801,1993-07-11,165960.0,36,4610.0,"contract finished, no problems"
2,6863,9188,1993-07-28,127080.0,60,2118.0,"contract finished, no problems"


In [46]:
loan_types = loan_clean.dtypes
loan_types

loan_id                int64
account_id             int64
date          datetime64[ns]
amount               float64
duration               int64
payments             float64
status                object
dtype: object

In [47]:
loan_tab = add_table_to_metadata('loan', metadata, loan_types)

In [48]:
loan_tab.create(engine)

In [49]:
append_df_to_sql(loan_clean, 'loan', engine)

In [50]:
count_rows(engine, 'loan', loan_clean)

Dataframe and SQL table row counts are equal.
682 rows


## Создание и заполнение таблицы credit card

In [51]:
card_clean = df_from_csv('card_df_clean.csv', parse_dates=['issued'])

In [52]:
card_clean.head(3)

Unnamed: 0,card_id,disp_id,type,issued
0,1005,9285,classic,1993-11-07
1,104,588,classic,1994-01-19
2,747,4915,classic,1994-02-05


In [53]:
card_types = card_clean.dtypes
card_types

card_id             int64
disp_id             int64
type               object
issued     datetime64[ns]
dtype: object

In [54]:
card_tab = add_table_to_metadata('cards', metadata, card_types)

In [55]:
card_tab.create(engine)

In [56]:
append_df_to_sql(card_clean, 'cards', engine)

In [57]:
count_rows(engine, 'cards', card_clean)

Dataframe and SQL table row counts are equal.
892 rows


## Создание и наполнение таблицы district (Demographic data)

In [58]:
district_clean = df_from_csv('district_df_clean.csv')

In [59]:
district_clean.head(3)

Unnamed: 0,district_code,district_name,region,population,no_of_mun_below_500,no_of_mun_between_500_1999,no_of_mun_between_2000_9999,no_of_mun_above_10000,no_of_cities,ratio_of_urban_population,avg_salary,unemployment_rate_95,unemployment_rate_96,enterpreneurs_per_1000,crimes_num_95,crimes_num_96
0,1,Hl.m. Praha,Prague,1204953,0,0,0,1,1,100.0,12541,0.29,0.43,167,85677.0,99107
1,2,Benesov,central Bohemia,88884,80,26,6,2,5,46.7,8507,1.67,1.85,132,2159.0,2674
2,3,Beroun,central Bohemia,75232,55,26,4,1,5,41.7,8980,1.95,2.21,111,2824.0,2813


In [60]:
district_types = district_clean.dtypes
district_types

district_code                    int64
district_name                   object
region                          object
population                       int64
no_of_mun_below_500              int64
no_of_mun_between_500_1999       int64
no_of_mun_between_2000_9999      int64
no_of_mun_above_10000            int64
no_of_cities                     int64
ratio_of_urban_population      float64
avg_salary                       int64
unemployment_rate_95           float64
unemployment_rate_96           float64
enterpreneurs_per_1000           int64
crimes_num_95                  float64
crimes_num_96                    int64
dtype: object

In [61]:
district_tab = add_table_to_metadata('district', metadata, district_types)

In [62]:
district_tab.create(engine)

In [63]:
append_df_to_sql(district_clean, 'district', engine)

In [64]:
count_rows(engine,'district',  district_clean)

Dataframe and SQL table row counts are equal.
77 rows


In [None]:
# создаем функцию подсчета файлов с определенным расширением в указанной директории
# она нам понадобится для проверки количества файлов с очищенными данными и созданных таблиц

def count_specific_files(dir='data/cleaned_data/', ext='.csv'):
    """
    Counts visible files with the specified extension in a directory.
    Skips hidden and temporary entries (names starting with '.' or '~')
    and anything that is not a file (e.g. a sub-directory named 'x.csv').

    dir - relative or absolute path to directory. Default is 'data/cleaned_data/'
    ext - files extension. Default is '.csv'

    Returns the number of matching files.

    Raises ValueError if dir is not an existing directory.
    """
    if not os.path.isdir(dir):
        if os.path.isabs(dir):
            raise ValueError(f'dir argument must be an existing directory! {dir} does not exist or is not a directory')
        else:
            raise ValueError(f'dir argument must be an existing directory! {dir} is a relative path. Check the current directory or if the path is correct')

    # scandir gives the entry type directly, so directories whose names
    # happen to end with `ext` are not counted as data files
    with os.scandir(dir) as entries:
        return sum(
            1
            for entry in entries
            if entry.is_file()
            and entry.name.endswith(ext)
            and not entry.name.startswith(('.', '~'))
        )

In [66]:
# определяем функцию сверки количества файлов с данными с количеством таблиц

def cleaned_and_tables_count(engine, files_dir='data/cleaned_data/', file_ext='.csv', schema='public'):
    """
    Counts tables in a database schema and visible files in the specified
    directory (temporary and hidden files are skipped), then compares the counts.
    ------------------------
    engine - database engine
    ------------------------
    files_dir - directory in which files to be counted. The default is 'data/cleaned_data/'
    file_ext - files extension. The default is '.csv'
    schema - database schema in which tables must be counted. The default is 'public'

    Prints a confirmation message when the counts match.
    Raises ValueError when the number of files differs from the number of tables.
    """

    # connect to the database and count the tables in the given schema;
    # the schema name is sent as a bound parameter instead of being pasted
    # into the SQL text, so quotes in it cannot alter the statement
    with engine.connect() as con:
        query = con.execute(
            text("""
        SELECT count(tablename)
        FROM pg_catalog.pg_tables
        WHERE schemaname = :schema"""),
            {'schema': schema},
        )
        tables_count = query.scalar()

    # count the files with the given extension via count_specific_files()
    files_count = count_specific_files(files_dir, file_ext)

    # compare the counts; raise an error if they differ
    if tables_count != files_count:
        raise ValueError(f"""Files count is not equal to tables count!
Files: {files_count}
Tables: {tables_count}""")

    # if they are equal, print a message with the count
    print(f'Files count is equal to tables count. The count is {tables_count}')
    

In [67]:
# проверяем количество файлов с очищенными данными и количество созданных таблиц в БД

cleaned_and_tables_count(engine)

Files count is equal to tables count. The count is 8
