# Librerías



In [None]:
import os
import sys
from contextlib import closing
from pathlib import Path

import pandas as pd
import psycopg2
from psycopg2.extras import RealDictCursor

# Make the project root importable. The notebook is assumed to be run from a
# direct subdirectory of the project (its parent directory is appended to
# sys.path), so this must happen before the project-local import below.
notebook_dir = os.getcwd() 
project_root = os.path.abspath(os.path.join(notebook_dir, '..'))
sys.path.append(project_root)

# Project-local helper that returns the database connection settings
# (passed as keyword arguments to psycopg2.connect by the functions below).
from config.config import get_db_config

# Show every column when displaying query results as DataFrames.
pd.set_option('display.max_columns', None)

# Constantes Globales

In [5]:
# Leading SQL keywords that `execute_commands` accepts as the first word of a
# command (compared after upper-casing).
DDL_COMMANDS = {'CREATE', 'DROP', 'ALTER'}  # data-definition statements
DML_COMMANDS = {'INSERT', 'UPDATE', 'DELETE'}  # data-manipulation statements

# Funciones

In [31]:
def get_create_table_as(query: str, schema: str, table_name: str) -> str:
    """
    Wrap a query in a ``CREATE TABLE <schema>.<table_name> AS (...)`` statement.

    Every semicolon in ``query`` is removed so that the wrapped query does not
    terminate the statement early. The original lines of ``query`` are kept,
    placed between the opening ``CREATE TABLE ... AS (`` line and a closing
    ``);`` line.

    Args:
        query (str): The base query whose result populates the new table.
        schema (str): Schema in which the new table is created.
        table_name (str): Name of the new table.

    Returns:
        str: The complete, newline-separated CREATE TABLE AS statement.
    """
    header = f'CREATE TABLE {schema}.{table_name} AS ('
    body_lines = query.replace(';', '').splitlines()
    return '\n'.join([header, *body_lines, ');'])



def execute_commands(commands: list[str], port: int | None = None) -> None:
    """
    Execute a list of SQL DDL/DML commands against the database.

    All commands are validated before any connection is opened: the first
    keyword of each one must belong to ``DDL_COMMANDS`` or ``DML_COMMANDS``.
    The commands are then executed sequentially on a single connection.

    The connection's context manager commits the transaction if every command
    succeeds and rolls it back otherwise. The connection is always closed
    afterwards.

    Args:
        commands (list[str]): SQL commands to execute. Each command must start
                              with a valid DDL or DML keyword.
        port (int | None): Port to use instead of the one returned by
                           ``get_db_config()`` (the .env one).

    Raises:
        AssertionError: If any command is empty or is not a valid DDL/DML command.

    Note:
        Errors raised while connecting or executing (including
        ``psycopg2.DatabaseError``) are caught and printed, not propagated.
        This is best-effort behavior for interactive notebook use.

    Returns:
        None
    """
    allowed_commands = DDL_COMMANDS | DML_COMMANDS

    # `or ['']` makes empty / whitespace-only commands count as invalid
    # instead of raising IndexError.
    not_valid_commands = sum(
        1
        for command in commands
        if (command.split() or [''])[0].upper() not in allowed_commands
    )

    if not_valid_commands:
        # Raised explicitly (not via `assert`) so validation still runs under `python -O`.
        raise AssertionError(
            f'The commands list contains {not_valid_commands} invalid DDL/DML commands.'
        )

    # Copy so that overriding the port never mutates the dict returned by get_db_config().
    config = dict(get_db_config())
    if port is not None:
        config['port'] = port

    try:
        # psycopg2's `with connection` only ends the transaction (commit/rollback);
        # `closing` is what actually releases the connection.
        with closing(psycopg2.connect(**config)) as connection:
            with connection:
                with connection.cursor() as cursor:
                    for command in commands:
                        cursor.execute(command)
                        print(f'\nThe following command was executed sucessfully:\n{command}')

    except Exception as e:
        print(e)



def execute_query(query: str, port: int = None) -> list[dict] | None:
    """
    Executes a SQL SELECT query and returns the results as a list of dictionaries.

    Args:
        query (str): The SQL SELECT query to execute.

    Returns:
        list[dict] | None: A list of dictionaries representing the query results,
                           or None if the query fails.
        port (int): An integer to force a different port that the .env one.

    Raises:
        AssertionError: If the query is not a SELECT command.
        psycopg2.DatabaseError: If there is a database error.
        Exception: For any other exceptions that occur.
    """
    assert query.strip().upper().split()[0] in ('SELECT', 'WITH'), 'The query must be a `SELECT` command.'

    query_results = None
    config = get_db_config()
    if port:
        config['port'] = port

    try:
        with psycopg2.connect(**config) as connection:
            with connection.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(query)
                query_results = cursor.fetchall()

    except (psycopg2.DatabaseError, Exception) as e:
        print(e)

    return query_results


# Creación de esquemas

## Esquema de Tabla Plana (Flat Table)

Para crear este esquema, utilizaremos el script almacenado en el fichero `/data/database/postgres/reduced_schemas/flat_table/flat_table.sql`. Adicionalmente, debemos ejecutar algunos comandos previos para asegurarnos de la existencia de un nuevo esquema en blanco.

In [None]:
FLAT_TABLE_SCHEMA = 'flat_table'
FLAT_TABLE_NAME = 'full_sales_data'
FLAT_TABLE_QUERY_PATH = '../data/database/postgres/reduced_schemas/flat_table/flat_table.sql'

# Drop and recreate the schema so the table is always built in a blank schema.
commands_flat_table = [
    f'DROP SCHEMA IF EXISTS {FLAT_TABLE_SCHEMA} CASCADE;',
    f'CREATE SCHEMA {FLAT_TABLE_SCHEMA};',
]

# Read the script explicitly as UTF-8 rather than with the platform-dependent
# default encoding (which differs on e.g. Windows).
flat_table_query = Path(FLAT_TABLE_QUERY_PATH).read_text(encoding='utf-8')
commands_flat_table.append(get_create_table_as(flat_table_query, FLAT_TABLE_SCHEMA, FLAT_TABLE_NAME))

execute_commands(commands_flat_table)

# Preview the first rows of the newly created table.
query_resulst = execute_query(f'SELECT * FROM {FLAT_TABLE_SCHEMA}.{FLAT_TABLE_NAME} LIMIT 10')

display(pd.DataFrame(query_resulst))


The following command was executed sucessfully:
DROP SCHEMA IF EXISTS flat_table CASCADE;

The following command was executed sucessfully:
CREATE SCHEMA flat_table;

The following command was executed sucessfully:
CREATE TABLE flat_table.full_sales_data AS (
WITH
    all_sales AS (
        (
            SELECT
                product_key,
                -- order_date_key,
                -- due_date_key,
                -- ship_date_key,
                NULL AS reseller_key,
                NULL AS employee_key,
                customer_key,
                promotion_key,
                -- currency_key,
                sales_territory_key,
                sales_order_number,
                sales_order_line_number,
                -- revision_number,
                order_quantity,
                unit_price,
                extended_amount,
                unit_price_discount_pct,
                discount_amount,
                product_standard_cost,
                total_product_

Unnamed: 0,sale_source,sales_order_number,sales_order_line_number,order_quantity,unit_price,extended_amount,unit_price_discount_pct,discount_amount,product_standard_cost,total_product_cost,...,reseller_business_type,reseller_name,reseller_product_line,employee_key,employee_full_name,sales_territory_city,sales_territory_state_province,sales_territory_region,sales_territory_country,sales_territory_group
0,reseller_sales,SO44124,14,1,20.1865,20.1865,0.0,0.0,12.0278,12.0278,...,Warehouse,Advanced Bike Components,Road,283,Jillian Carson,Irving,Texas,United States - Southwest,Estados Unidos,North America
1,reseller_sales,SO45568,11,3,20.1865,60.5595,0.0,0.0,12.0278,36.0834,...,Warehouse,Advanced Bike Components,Road,283,Jillian Carson,Irving,Texas,United States - Southwest,Estados Unidos,North America
2,reseller_sales,SO46377,2,1,20.1865,20.1865,0.0,0.0,12.0278,12.0278,...,Warehouse,Advanced Bike Components,Road,283,Jillian Carson,Irving,Texas,United States - Southwest,Estados Unidos,North America
3,reseller_sales,SO43913,10,5,20.1865,100.9325,0.0,0.0,12.0278,60.139,...,Value Added Reseller,Journey Sporting Goods,Mountain,283,Jillian Carson,Laredo,Texas,United States - Southwest,Estados Unidos,North America
4,reseller_sales,SO44566,13,2,20.1865,40.373,0.0,0.0,12.0278,24.0556,...,Value Added Reseller,Journey Sporting Goods,Mountain,283,Jillian Carson,Laredo,Texas,United States - Southwest,Estados Unidos,North America
5,reseller_sales,SO46103,1,2,20.1865,40.373,0.0,0.0,12.0278,24.0556,...,Value Added Reseller,Journey Sporting Goods,Mountain,283,Jillian Carson,Laredo,Texas,United States - Southwest,Estados Unidos,North America
6,reseller_sales,SO46099,13,2,20.1865,40.373,0.0,0.0,12.0278,24.0556,...,Warehouse,Every Bike Shop,Road,285,Tsvi Reiter,La Vergne,Tennessee,United States - Southeast,Estados Unidos,North America
7,reseller_sales,SO44129,16,2,20.1865,40.373,0.0,0.0,12.0278,24.0556,...,Warehouse,Larger Cycle Shop,Road,281,Michael Blythe,Melville,New York,United States - Northeast,Estados Unidos,North America
8,reseller_sales,SO44797,32,3,20.1865,60.5595,0.0,0.0,12.0278,36.0834,...,Warehouse,Larger Cycle Shop,Road,281,Michael Blythe,Melville,New York,United States - Northeast,Estados Unidos,North America
9,reseller_sales,SO45575,17,2,20.1865,40.373,0.0,0.0,12.0278,24.0556,...,Warehouse,Larger Cycle Shop,Road,281,Michael Blythe,Melville,New York,United States - Northeast,Estados Unidos,North America


## Esquema de Ventas (Sales)

Para crear este esquema, nos valdremos de las definiciones de las tablas que tenemos en los scripts almacenados en la ruta `/data/database/postgres/reduced_schemas/sales/` (los ficheros cuyo nombre empieza por `fact_` o `dim_`). Nuevamente, ejecutaremos algunos comandos previos para asegurar la existencia del nuevo esquema en blanco y, posteriormente, ejecutaremos las sentencias de los scripts restantes de esa ruta para definir claves primarias y relaciones entre tablas.

In [29]:
SALES_SCHEMA = 'sales'
SALES_SCHEMA_TABLES_PATH = '../data/database/postgres/reduced_schemas/sales/'
SALES_SCHEMA_TABLES_PREFIXES = ['fact_', 'dim_']

# Print (without executing) the statements of the non-table scripts, i.e. the
# `.sql` files whose names do not start with a table prefix. Each non-empty line
# is one statement (e.g. ALTER TABLE ... ADD PRIMARY KEY / FOREIGN KEY), with
# the `[SCHEMA]` placeholder replaced by the target schema.
for file in Path(SALES_SCHEMA_TABLES_PATH).iterdir():
    is_sql_script = file.is_file() and file.suffix == '.sql'
    if not is_sql_script or file.name.startswith(tuple(SALES_SCHEMA_TABLES_PREFIXES)):
        continue

    # Explicit UTF-8 instead of the platform-dependent default encoding.
    with open(file, 'r', encoding='utf-8') as sql_file:
        for line in sql_file:
            line = line.strip()
            # Skip blank lines and full-line SQL comments; a comment would otherwise
            # become a bogus `-- ...;` command.
            if not line or line.startswith('--'):
                continue

            command = line.replace('[SCHEMA]', SALES_SCHEMA)
            if not command.endswith(';'):
                command += ';'

            print(command)

ALTER TABLE sales.dim_customer ADD PRIMARY KEY (customer_key);
ALTER TABLE sales.dim_sales_person ADD PRIMARY KEY (employee_key);
ALTER TABLE sales.dim_geography ADD PRIMARY KEY (geography_key);
ALTER TABLE sales.dim_product ADD PRIMARY KEY (product_key);
ALTER TABLE sales.dim_promotion ADD PRIMARY KEY (promotion_key);
ALTER TABLE sales.dim_reseller ADD PRIMARY KEY (reseller_key);
ALTER TABLE sales.dim_sales_reason ADD PRIMARY KEY (sales_reason_key);
ALTER TABLE sales.dim_sales_territory ADD PRIMARY KEY (sales_territory_key);
ALTER TABLE sales.fact_sales ADD PRIMARY KEY (sales_order_number, sales_order_line_number);
ALTER TABLE sales.fact_internet_sales_reason ADD PRIMARY KEY (sales_order_number, sales_order_line_number, sales_reason_key);
ALTER TABLE sales.dim_customer ADD CONSTRAINT fk_dim_customer_geography FOREIGN KEY (geography_key) REFERENCES sales.dim_geography (geography_key);
ALTER TABLE sales.dim_sales_person ADD CONSTRAINT fk_dim_sales_person_sales_territory FOREIGN KEY (sal

In [None]:
SALES_SCHEMA = 'sales'
SALES_SCHEMA_TABLES_PATH = '../data/database/postgres/reduced_schemas/sales/'
SALES_SCHEMA_TABLES_PREFIXES = ['fact_', 'dim_']

# Drop and recreate the schema so the tables are always built in a blank schema.
commands_sales_schema = [
    f'DROP SCHEMA IF EXISTS {SALES_SCHEMA} CASCADE;',
    f'CREATE SCHEMA {SALES_SCHEMA};',
]

table_prefixes = tuple(SALES_SCHEMA_TABLES_PREFIXES)

# List the directory once. Both passes below keep the listing order, so all
# table-creation commands still precede the line-by-line statements.
sql_files = [
    file
    for file in Path(SALES_SCHEMA_TABLES_PATH).iterdir()
    if file.is_file() and file.suffix == '.sql'
]

# 1) Table scripts (`fact_*` / `dim_*`): one CREATE TABLE AS per file, named after the file.
for file in sql_files:
    if file.name.startswith(table_prefixes):
        table_query = file.read_text(encoding='utf-8')
        commands_sales_schema.append(get_create_table_as(table_query, SALES_SCHEMA, file.stem))

# 2) Remaining scripts: one statement per non-empty line (e.g. primary keys and
#    foreign keys), with the `[SCHEMA]` placeholder replaced by the target schema.
for file in sql_files:
    if file.name.startswith(table_prefixes):
        continue

    with open(file, 'r', encoding='utf-8') as sql_file:
        for line in sql_file:
            line = line.strip()
            # Skip blank lines and full-line SQL comments; a comment would otherwise
            # be rejected by execute_commands as an invalid command.
            if not line or line.startswith('--'):
                continue

            command = line.replace('[SCHEMA]', SALES_SCHEMA)
            if not command.endswith(';'):
                command += ';'

            commands_sales_schema.append(command)

# NOTE(review): port 5432 is hard-coded here, overriding the configured port — confirm intended.
execute_commands(commands_sales_schema, port=5432)

query_resulst = execute_query(
    f'SELECT * FROM {SALES_SCHEMA}.dim_geography',
    port=5432,
)

display(pd.DataFrame(query_resulst))


The following command was executed sucessfully:
DROP SCHEMA IF EXISTS sales CASCADE;

The following command was executed sucessfully:
CREATE SCHEMA sales;

The following command was executed sucessfully:
CREATE TABLE sales.dim_customer AS (
WITH
    years_dif AS (
        SELECT
            (EXTRACT(YEAR FROM CURRENT_DATE) - EXTRACT(YEAR FROM MAX(order_date))) + 1 AS years_difference

        FROM (
            SELECT order_date FROM adventure_works.fact_internet_sales
            UNION
            SELECT order_date FROM adventure_works.fact_reseller_sales
        )
    )


SELECT
    customer_key,
    geography_key,
    -- customer_alternate_key,
    -- title,
    -- first_name,
    -- middle_name,
    -- last_name,
    -- name_style,
    CONCAT(first_name, ' ', last_name) AS customer_full_name,
    (birth_date + ((SELECT years_difference FROM years_dif)::TEXT || ' years')::INTERVAL)::DATE AS birth_date,
    marital_status,
    -- suffix,
    gender,
    -- email_address,
    yearly_

Unnamed: 0,sales_territory_key,sales_territory_region,sales_territory_country,sales_territory_group
0,1,Northwest,United States,North America
1,2,Northeast,United States,North America
2,3,Central,United States,North America
3,4,Southwest,United States,North America
4,5,Southeast,United States,North America
5,6,Canada,Canada,North America
6,7,France,France,Europe
7,8,Germany,Germany,Europe
8,9,Australia,Australia,Pacific
9,10,United Kingdom,United Kingdom,Europe
