In [26]:
"""
Moduł zawiera funkcje przeznaczone do pracy z bazą danych PostgreSQL.
"""

import json
import pandas as pd
import sql
import sqlalchemy as sa
import subprocess
import psycopg2
import os

def get_connection_string(config_file = "database_creds.json"):
    """
    Zwraca connection string do połączenia z bazą PostgreSQL na podstawie pliku konfiguracyjnego.
    Używana w dalszych funkcjach.
    """
    with open(config_file, encoding="utf-8") as db_con_file:
        creds = json.load(db_con_file)

    return "postgresql+psycopg2://{user}:{password}@{host}:{port}/{db_name}".format(
        user=creds['user_name'],
        password=creds['password'],
        host=creds['host_name'],
        port=creds['port_number'],
        db_name=creds['db_name']
    )

def connect_to_db(config_file = "database_creds.json"):
    """
    Łączy się z bazą danych PostgreSQL za pośrednictwem danych zawartych w pliku database_creds.json.
    """
    #%load_ext sql

    with open(config_file, encoding = "utf-8") as db_con_file:
        creds = json.load(db_con_file)

    connection_string = get_connection_string(config_file)
    
    print("Połączono z bazą.")
    #%sql $connection_string

def table_to_csv(table, csv_file, config_file = "database_creds.json"):
    """
    Przepisuje zawartość tabeli z bazy danych PostgreSQL do pliku CSV.
    """
    with open(config_file, encoding = "utf-8") as db_con_file:
        creds = json.load(db_con_file)

    connection_string = get_connection_string(config_file)
    
    engine = sa.create_engine(connection_string)

    query = f"SELECT * FROM {table}"
    df = pd.read_sql(query, engine)

    df.to_csv(csv_file, index = False, encoding = 'utf-8')

    print(f"Wyeksportowano dane z tabeli '{table}' do pliku '{csv_file}'.")
    return df.head()

def csv_to_table(table, csv_file, config_file = "database_creds.json"):
    """
    Prepisuje zawartość pliku CSV do tabeli w bazie PostgreSQL.
    Jeśli tabela o podanej nazwie już istnieje, zostanie zamieniona.
    """
    with open(config_file, encoding = "utf-8") as db_con_file:
        creds = json.load(db_con_file)

    connection_string = get_connection_string(config_file)
    
    df = pd.read_csv(csv_file, dtype = {"PESEL": str})  # Zastosowanie typu string ma na celu zachowanie zer występujących na początku.

    engine = sa.create_engine(connection_string)

    df.to_sql(table, con=engine, if_exists = 'replace', index=False)

    print(f"Zaimportowano dane do tabeli '{table}'.")

def create_backup(backup_file = "backup.bak", config_file = "database_creds.json"):
    """
    Tworzy kopię bazy danych.
    """
    with open(config_file, encoding = "utf-8") as db_con_file:
        creds = json.load(db_con_file)
    
    command = [
        "pg_dump",
        "-h", creds["host_name"],
        "-p", str(creds["port_number"]),
        "-U", creds["user_name"],
        "-F", "c",
        "-b",
        "-f", backup_file,
        creds["db_name"]
    ]
    print("Tworzę backup PostgreSQL...")
    
    env = os.environ.copy()
    env["PGPASSWORD"] = creds["password"]
    
    result = subprocess.run(command, env=env)
    
    if result.returncode == 0:
        print(f"Kopia zapisana do pliku '{backup_file}'.")
    else:
        print("Błąd podczas tworzenia kopii zapasowej.")

def clear_db(tables = None, config_file = "database_creds.json"):
    """
    Czyści zawartość bazy danych.
    """
    with open(config_file, encoding = "utf-8") as db_con_file:
        creds = json.load(db_con_file)

    connection_string = get_connection_string(config_file)
    
    engine = sa.create_engine(connection_string)
    meta = sa.MetaData()
    meta.reflect(bind = engine)

    with engine.begin() as conn:
        if tables == None:
            for table in reversed(meta.sorted_tables):
                conn.execute(sa.text(f"DROP TABLE IF EXISTS {table} CASCADE;"))
                print("Baza danych została wyczyszczona.")
        else:
            for table in tables:
                conn.execute(sa.text(f"DROP TABLE IF EXISTS {table} CASCADE;"))
                print(f"Usunięto tabelę {table}.")
