In [29]:
import os
import re
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

# Load connection settings from a local .env file into the process environment.
load_dotenv()

# Prefer PGUSERNAME for the database user. USERNAME is a standard Windows
# variable holding the OS login name, and load_dotenv() does not override
# variables that already exist, so a USERNAME entry in .env would be ignored
# there. USERNAME is still used as a fallback for backward compatibility.
USERNAME = os.getenv("PGUSERNAME") or os.getenv("USERNAME")
PASSWORD = os.getenv("PGPASSWORD")
HOST = os.getenv("HOST")
PORT = os.getenv("PORT")
DATABASE = os.getenv("DATABASE")
URL = f"postgresql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}"

# Shared SQLAlchemy engine used by the metadata queries below.
ENGINE = create_engine(URL)

# Fetching the database constraints

In [None]:
# One row per constraint defined outside the PostgreSQL system schemas.
# - The schema filter uses pg_namespace instead of a name pattern: in LIKE,
#   "_" matches any single character, so 'pg_%' also hid user tables such as
#   "pgbench_accounts" while letting information_schema through.
# - referenced_table is only filled for foreign keys ('f'); other constraint
#   types have no referenced relation.
# - constraint_columns / referenced_columns are the raw conkey / confkey
#   column-number arrays.
query = """
SELECT
    con.conname AS constraint_name,
    con.contype AS constraint_type,
    con.conrelid::regclass AS table_name,
    CASE
        WHEN con.contype = 'f' THEN con.confrelid::regclass::text
        ELSE NULL
    END AS referenced_table,
    con.conkey AS constraint_columns,
    con.confkey AS referenced_columns
FROM
    pg_constraint AS con
JOIN
    pg_namespace AS ns
    ON ns.oid = con.connamespace
WHERE
    ns.nspname NOT IN ('information_schema', 'pg_catalog')
ORDER BY
    con.conrelid::regclass::text, con.conname
"""

with ENGINE.connect() as conn:
    result = conn.execute(text(query))
    # Name the columns explicitly so the frame keeps its schema even when the
    # query returns no rows (later cells index it by column name).
    constraints = pd.DataFrame(result.fetchall(), columns=list(result.keys()))

constraints

# Fetching the column list of each table

In [None]:
# One row per base table outside information_schema / pg_catalog.
# `column_list` is a ", "-separated string of "name (type)" entries ordered by
# ordinal position.
query = """
SELECT
    c.table_schema,
    c.table_name,
    string_agg(c.column_name || ' (' || c.data_type || ')', ', ' ORDER BY c.ordinal_position) AS column_list
FROM
    information_schema.columns AS c
JOIN
    information_schema.tables AS t
    ON c.table_name = t.table_name
    AND c.table_schema = t.table_schema
WHERE
    t.table_type = 'BASE TABLE'
    AND t.table_schema NOT IN ('information_schema', 'pg_catalog')
GROUP BY
    c.table_schema,
    c.table_name
ORDER BY
    c.table_schema,
    c.table_name;
"""

with ENGINE.connect() as connection:
    result = connection.execute(text(query))
    columns = pd.DataFrame(result)

# Write "<table>\t<column list>" lines, without header or index.
table_columns = columns[['table_name', 'column_list']]
table_columns.to_csv("../metadata/tables.txt", index=False, sep="\t", header=False)
columns

# Saving the database constraints

In [32]:
def _split_column_list(table_name):
    """Return the "name (type)" entries recorded for `table_name`.

    Looks the table up in the `columns` frame and splits its `column_list`
    string on ", ". Returns None when the table is not present in `columns`.
    """
    matches = columns.loc[columns['table_name'] == table_name, 'column_list']
    if matches.empty:
        return None
    return matches.iloc[0].split(', ')


def _single_column(column_list, column_numbers):
    """Resolve a one-column constraint to its "name (type)" entry.

    `column_numbers` is treated as a sequence of 1-based positions into
    `column_list`. Returns None for multi-column constraints, for an unknown
    table (`column_list` is None) and for positions outside the list.
    """
    if column_list is None or column_numbers is None or len(column_numbers) != 1:
        return None
    position = int(column_numbers[0]) - 1
    if not 0 <= position < len(column_list):
        return None
    return column_list[position]


# Describe every single-column primary key and foreign key in plain English.
# The file is opened in write mode so that re-running the cell regenerates it
# instead of appending a duplicate copy of every line.
# NOTE(review): constraint column numbers are used as positions in the
# ordinal-ordered column list; this assumes the numbering has no gaps (e.g.
# from dropped columns) — confirm for the target database.
with open("../metadata/constraints.txt", "w") as f:
    for table_name in columns['table_name']:
        column_list = _split_column_list(table_name)
        table_constraints = constraints[constraints['table_name'] == table_name]
        for _, constraint in table_constraints.iterrows():
            constraint_type = constraint['constraint_type']
            if constraint_type not in ('p', 'f'):
                continue

            # Multi-column (composite) keys are skipped.
            column_name = _single_column(column_list, constraint['constraint_columns'])
            if column_name is None:
                continue

            if constraint_type == 'p':
                f.write(f"Column {column_name} is a primary key of the table {table_name}\n")
                continue

            referenced_table = constraint['referenced_table']
            referenced_column_name = _single_column(
                _split_column_list(referenced_table),
                constraint['referenced_columns'],
            )
            # Skip foreign keys whose target table/column cannot be resolved
            # from `columns` rather than aborting the whole export.
            if referenced_column_name is None:
                continue
            f.write(f"Column {column_name} is a foreign key of the table {table_name} and references column {referenced_column_name} of the table {referenced_table}\n")


# Executing the PowerShell script

In [None]:
import subprocess

# Run the PowerShell script and show whatever it wrote to stderr.
script_path = '../scripts/postgres.ps1'
command = ['powershell.exe', '-ExecutionPolicy', 'Unrestricted', '-File', script_path]

result = subprocess.run(command, capture_output=True, text=True)

print(result.stderr)

# Cleaning `metadata.sql` (keeping only the `CREATE TABLE` statements)

In [None]:
# Load the SQL dump and print it without leading/trailing whitespace.
with open("../metadata/metadata.sql", "r") as sql_file:
    metadata = sql_file.read()
print(metadata.strip())

In [35]:
import re

def extract_create_table_statements(file_path, output_path):
    """Copy only the CREATE TABLE statements of a SQL file to another file.

    Each statement is taken from the keywords ``CREATE TABLE`` up to the first
    following semicolon and written on its own, followed by a newline.

    Args:
        file_path: Path of the SQL file to read.
        output_path: Path of the file to write. It may be the same as
            ``file_path``: the input is read completely before the output is
            opened for writing.

    Returns:
        None. The result is written to ``output_path``.
    """
    with open(file_path, 'r') as file:
        content = file.read()

    # Word boundaries keep "CREATE TABLESPACE ..." from being picked up as a
    # table definition. [\s\S] lets the non-greedy match span several lines.
    pattern = re.compile(r'\bCREATE TABLE\b[\s\S]*?;')
    matches = pattern.findall(content)

    with open(output_path, 'w') as file:
        for match in matches:
            file.write(match + '\n')

# Run the extraction in place: the dump is replaced by its CREATE TABLE statements.
metadata_sql_path = '../metadata/metadata.sql'
extract_create_table_statements(metadata_sql_path, metadata_sql_path)


In [None]:
import os
import pandas as pd
from dotenv import load_dotenv
import matplotlib.pyplot as plt
from sqlalchemy import create_engine

# Read the connection settings from the environment (populated from .env).
load_dotenv()

USERNAME, PASSWORD = os.getenv('PGUSERNAME'), os.getenv('PGPASSWORD')
HOST, PORT = os.getenv('HOST'), os.getenv('PORT')
DATABASE = os.getenv('DATABASE')
URL = f'postgresql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}'

engine = create_engine(URL)

# Number of films per category name, largest first.
query = (
    'SELECT category.name, COUNT(film_category.film_id) AS film_count '
    'FROM film_category '
    'JOIN category ON film_category.category_id = category.category_id '
    'GROUP BY category.name '
    'ORDER BY film_count DESC NULLS LAST'
)

df = pd.read_sql(query, engine)

import matplotlib.pyplot as plt
import seaborn as sns

plt.style.use('ggplot')

category, film_count = df['name'], df['film_count']
colors = sns.color_palette()

# Pie chart with one slice per category; saved to plot.png and then displayed.
plt.pie(film_count, labels=category, colors=colors)
plt.title("Films by category")
plt.axis('equal')
plt.tight_layout()
plt.savefig('plot.png', format='png')
plt.show()

In [None]:
# NOTE(review): `data` is not assigned in any cell visible here, so on a fresh
# kernel this line is expected to raise NameError — confirm whether this is a
# leftover scratch cell.
data[1]