# Trabajo Integrador: DuckDB

## Instalación

In [None]:
%pip install duckdb --upgrade

### Creando conexión

Nos permite establecer una conexión a una base de datos, por defecto, si no especificamos su nombre, la base de datos no persistirá y operará en memoria, por lo tanto no se almacenarán las tablas creadas. Trabajaremos en memoria ya que consideramos que es el fuerte de DUCKDB y su análisis es el pertinente de este trabajo de investigación. 

In [2]:
import duckdb as db

database = db.connect(database=":memory:")


##### Prueba con el dataset. En PostgreSQL almacenaremos archivos de órdenes y sus pagos. En AWS almacenaremos los productos y su categoría. El resto de archivos será almacenado de manera local, algunos en CSV y otros en PARQUET. Esta información se encuentra representada meidante un esquema en la documentación adjunta

In [None]:
%pip install -q kagglehub        
import kagglehub, shutil, pathlib

path = kagglehub.dataset_download("olistbr/brazilian-ecommerce")

local_csv_files = [
    "olist_customers_dataset.csv",
    "olist_geolocation_dataset.csv",
    "olist_order_items_dataset.csv",
    "olist_order_reviews_dataset.csv",
    "olist_sellers_dataset.csv",
    ## Estos archivos se almacenaran en PSQL 
    "olist_orders_dataset.csv",
    "olist_order_payments_dataset.csv", 
]

for file_name in local_csv_files:
    shutil.copy(f"{path}/{file_name}", f"dataset/{file_name}")

for file_name in local_csv_files:
    table = pathlib.Path(f"dataset/{file_name}").stem
    database.execute(f"""
        CREATE OR REPLACE TABLE {table} AS
        SELECT * FROM read_csv_auto('dataset/{file_name}');
    """)


#### Verificación básica de la creación de las tablas

In [None]:
for file_name in local_csv_files:
    table = pathlib.Path(f"dataset/{file_name}").stem
    schema = database.sql(f"DESCRIBE {table}")
    print(schema)

#### Obteniendo los datos de forma remota - Conexión con AWS S3

**NOTA:** Para utilizar AWS S3 es necesario setear las credenciales para acceder al bucket, para eso debemos crear un user en la IAM de AWS, asignarle permisos y finalmente crear las claves de acceso para este usuario. Luego, estas credenciales son obtenidas desde un .env.
Es importante que la región del bucket y del usuario sean la misma.

In [None]:
%pip install python-dotenv
from dotenv import load_dotenv
load_dotenv()  

import os

database.sql("INSTALL httpfs; LOAD httpfs;")

database.sql(f"""
SET s3_region='{os.getenv("AWS_REGION")}';
SET s3_access_key_id='{os.getenv("AWS_ACCESS_KEY_ID")}';
SET s3_secret_access_key='{os.getenv("AWS_SECRET_ACCESS_KEY")}';
""")

### Creando las tablas con los datos de forma remota

In [6]:
database.sql(f"""
 CREATE OR REPLACE TABLE olist_products_dataset AS
        SELECT * FROM read_csv_auto('s3://ti-quadrelli-ribarov/olist_products_dataset.csv');
""")

database.sql(f"""
 CREATE OR REPLACE TABLE product_category_name_translation AS
        SELECT * FROM read_csv_auto('s3://ti-quadrelli-ribarov/product_category_name_translation.csv');
""")


#### Validación básica de la creación de las tablas con los datos remotos


In [None]:
remote_csv_files  = [
    "olist_products_dataset.csv",
    "product_category_name_translation.csv",
]

for file_name in remote_csv_files:
    table = pathlib.Path(f"dataset/{file_name}").stem
    schema = database.sql(f"DESCRIBE {table}")
    print(schema)

#### Obteniendo los datos desde PostgreeSQL

#### Nota: este paso se podría haber hecho sin DuckDB, mismo desde un manejador de base de datos como DBeaver o con la librería Pandas, sin embargo, nuevamente debido al objetivo de este proyecto, se eligió realizarlo utilizando DuckDB

In [None]:

import os

# Esta celda utiliza duckdb para conectarse a PostgreSQL, crea la base de datos trabajo integrador
# en caso de que no exista 

database.execute("INSTALL postgres;")
database.execute("LOAD postgres;")

try:
    database.execute("DETACH pgadmin;")
except Exception:
    pass

conninfo = f"host={os.getenv('PG_HOST')} port={os.getenv('PG_PORT')} user={os.getenv('PG_USER')} password={os.getenv('PG_PASSWORD')} dbname=postgres"
database.execute(f"ATTACH '{conninfo}' AS pgadmin (TYPE postgres);")

exists = database.execute("""
    SELECT COUNT(*) > 0
    FROM postgres_query(
        'pgadmin',
        $$SELECT 1 FROM pg_database WHERE datname = 'trabajo_integrador'$$
    );
""").fetchone()[0]

if not exists:
    database.execute("""
        CALL postgres_execute(
            'pgadmin',
            $$CREATE DATABASE trabajo_integrador$$,
            use_transaction => false
        );
    """)

database.execute("DETACH pgadmin;")

In [None]:
import pathlib, os

orders_csv   = pathlib.Path("olist_orders_dataset.csv")
payments_dataset = pathlib.Path("olist_order_payments_dataset.csv")
   

conninfo = f"host={os.getenv("PG_HOST")} port={os.getenv("PG_PORT")} user={os.getenv("PG_USER")} password={os.getenv("PG_PASSWORD")} dbname={os.getenv("PG_DB")}"
database.execute(f"ATTACH '{conninfo}' AS pgdb (TYPE postgres);")

#### Creación de tablas

In [None]:
import pathlib

orders_csv = pathlib.Path(path) / "olist_orders_dataset.csv"
payments_dataset = pathlib.Path(path) / "olist_order_payments_dataset.csv"

database.execute(f"""
    DROP TABLE IF EXISTS pgdb.olist_orders;
    CREATE TABLE pgdb.olist_orders AS
    SELECT *
    FROM read_csv_auto('{orders_csv.as_posix()}', HEADER=TRUE);
""")

database.execute(f"""
    DROP TABLE IF EXISTS pgdb.olist_orders_payments;
    CREATE TABLE pgdb.olist_orders_payments AS
    SELECT *
    FROM read_csv_auto('{payments_dataset.as_posix()}', HEADER=TRUE);
""")

In [None]:
postgreee_csv_files  = [
   "olist_orders", 
   "olist_orders_payments"
]

for file_name in postgreee_csv_files:
    table = pathlib.Path(f"dataset/{file_name}").stem
    schema = database.sql(f"DESCRIBE pgdb.{table}")
    print(schema)

### Exploracion del dataset