 # Opis notatnika
 W tym notatniku tworzymy dedykowaną bazę danych wraz ze strukturą tabel, która zostanie uzupełniona w nastęnym notatniku.

##### Importujemy potrzebne biblioteki

In [3]:
import psycopg2
from dotenv import load_dotenv
import os

 ## Połączenie z bazą danych

In [5]:
# Ładujemy zmienne środowiskowe z pliku .env
load_dotenv('DB_pass.env')

True

In [6]:
# Używamy zmiennych środowiskowych do połączenia się z istniejącą bazą danych
connection = psycopg2.connect(
    dbname=os.getenv("O_DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    host=os.getenv("DB_HOST"),
    port=os.getenv("DB_PORT")
)

connection.autocommit = True
cursor = connection.cursor()

# Tworzymy nową bazę danych
cursor.execute("CREATE DATABASE orange;")
connection.close()

In [7]:
# Używamy zmiennych środowiskowych do połączenia się z nowo utworzoną bazą danych
connection = psycopg2.connect(
    dbname=os.getenv("N_DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    host=os.getenv("DB_HOST"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

 ## Wczytanie pliku `database_schema.sql`
 Z katalogu `sql` wczytujemy plik `database_schema.sql`

In [9]:
# Wczytujemy strukturę bazy danych z pliku .sql
with open('../sql/database_schema.sql', 'r') as file:
    schema_sql = file.read()

In [10]:
print(schema_sql)

DROP TABLE IF EXISTS public.baza1 CASCADE;
CREATE TABLE IF NOT EXISTS public.baza1
(
    ID BIGINT,
    NUMBER CHAR(9),
    END_DT TIMESTAMP,
    SEGMENT VARCHAR(10)
)
;

CREATE OR REPLACE VIEW public.baza1_blocker AS
SELECT 1 FROM public.baza1 LIMIT 1;

DROP TABLE IF EXISTS public.baza2 CASCADE;
CREATE TABLE IF NOT EXISTS public.baza2
(
    ID BIGINT,
    NUMBER CHAR(9),
    PLAN VARCHAR(3)
)
;

CREATE OR REPLACE VIEW public.baza2_blocker AS
SELECT 1 FROM public.baza2 LIMIT 1


In [11]:
# Dzielimy zawartość pliku na poszczególne komendy zakończone średnikiem
sql_commands = schema_sql.split(';')

In [12]:
# Dodajemy średnik na końcu każdej komendy, czyścimy tekst z niepotrzbnych znaków białych
sql_commands = [command.strip() + ';' for command in sql_commands]

In [13]:
sql_commands

['DROP TABLE IF EXISTS public.baza1 CASCADE;',
 'CREATE TABLE IF NOT EXISTS public.baza1\n(\n    ID BIGINT,\n    NUMBER CHAR(9),\n    END_DT TIMESTAMP,\n    SEGMENT VARCHAR(10)\n);',
 'CREATE OR REPLACE VIEW public.baza1_blocker AS\nSELECT 1 FROM public.baza1 LIMIT 1;',
 'DROP TABLE IF EXISTS public.baza2 CASCADE;',
 'CREATE TABLE IF NOT EXISTS public.baza2\n(\n    ID BIGINT,\n    NUMBER CHAR(9),\n    PLAN VARCHAR(3)\n);',
 'CREATE OR REPLACE VIEW public.baza2_blocker AS\nSELECT 1 FROM public.baza2 LIMIT 1;']

##### W tym miejscu wczytujemy każdą z kwerend, aby zainicjować strukturę bazy danych

In [15]:
try:
    # Iterujemy przez każdą komendę SQL
    for command in sql_commands:
        cursor.execute(command)

    # Jeśli wszystkie komendy zakończą się sukcesem, zatwierdzamy transakcję
    connection.commit()
    print("Wszystkie operacje zakończyły się sukcesem, commit wykonany.")

except Exception as e:
    # W przypadku błędu, wycofujemy wszystkie zmiany
    connection.rollback()
    print(f"Wystąpił błąd: {e}. Wykonano rollback.")

finally:
    # Zamykamy kursor i połączenie w tym miejscu, aby w przypadku wystąpienia błędu połączenie nie zostało otwarte
    cursor.close()
    connection.close()

Wszystkie operacje zakończyły się sukcesem, commit wykonany.


 ### Sprawdzenie
 Uruchamiamy poniższy kod, aby sprawdzić, czy faktycznie ta baza danych została zainicjowana prawidłowo.

In [17]:
# Używamy zmiennych środowiskowych do połączenia się z nowo utworzoną bazą danych
connection = psycopg2.connect(
    dbname=os.getenv("N_DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    host=os.getenv("DB_HOST"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

In [18]:
def check_if_table_exists(table_name):
    msg = f"Sprawdzam czy istnieje tabela {table_name}"
    print(msg)

    query = f"select 1 from {table_name}"
    # jeżeli tabela nie istnieje, ten krok zwróci wyjątek
    cursor.execute(query)
    print('OK!')

In [19]:
tables_to_test = [
    'baza1',
    'baza2'
]

In [20]:
for table in tables_to_test:
    check_if_table_exists(table)

Sprawdzam czy istnieje tabela baza1
OK!
Sprawdzam czy istnieje tabela baza2
OK!


In [21]:
connection.close()
msg = "Wszystko wygląda OK :) Przechodzimy do kolejnego zadania."
print(msg)

Wszystko wygląda OK :) Przechodzimy do kolejnego zadania.
