# Proyecto Final Big Data

## Conexión a PostgreSQL y carga de las tablas orders y products

In [6]:
# Importar las librerías necesarias
from dotenv import load_dotenv
import os
import pandas as pd
from sqlalchemy import create_engine

# Cargar el archivo .env
load_dotenv()

# Obtener las variables de entorno
POSTGRES_USER = os.getenv('POSTGRES_USER')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD')
POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'localhost')
POSTGRES_PORT = os.getenv('POSTGRES_PORT', '5432')
POSTGRES_DB = os.getenv('POSTGRES_DB')

# Crear el string de conexión usando SQLAlchemy
DATABASE_TYPE = 'postgresql'
DBAPI = 'psycopg2'

# URL de conexión usando las variables de entorno
engine = create_engine(f"{DATABASE_TYPE}+{DBAPI}://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}")

# Conectar a la base de datos y cargar las tablas orders y products
try:
    orders_df = pd.read_sql('SELECT * FROM orders', con=engine)
    products_df = pd.read_sql('SELECT * FROM products', con=engine)
    print("Tablas cargadas exitosamente")
except Exception as e:
    print(f"Error conectando a la base de datos o cargando las tablas: {e}")

# Mostrar las primeras filas de cada tabla
print(orders_df.shape)
print(products_df.shape)

Tablas cargadas exitosamente
(830, 14)
(77, 10)


In [3]:
# Verificar dataframe orders
orders_df

Unnamed: 0,order_id,customer_id,employee_id,order_date,required_date,shipped_date,ship_via,freight,ship_name,ship_address,ship_city,ship_region,ship_postal_code,ship_country
0,10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France
1,10249,TOMSP,6,1996-07-05,1996-08-16,1996-07-10,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany
2,10250,HANAR,4,1996-07-08,1996-08-05,1996-07-12,2,65.83,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil
3,10251,VICTE,3,1996-07-08,1996-08-05,1996-07-15,1,41.34,Victuailles en stock,"2, rue du Commerce",Lyon,,69004,France
4,10252,SUPRD,4,1996-07-09,1996-08-06,1996-07-11,2,51.30,Suprêmes délices,"Boulevard Tirou, 255",Charleroi,,B-6000,Belgium
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
825,11073,PERIC,2,1998-05-05,1998-06-02,,2,24.95,Pericles Comidas clásicas,Calle Dr. Jorge Cash 321,México D.F.,,05033,Mexico
826,11074,SIMOB,7,1998-05-06,1998-06-03,,2,18.44,Simons bistro,Vinbæltet 34,Kobenhavn,,1734,Denmark
827,11075,RICSU,8,1998-05-06,1998-06-03,,2,6.19,Richter Supermarkt,Starenweg 5,Genève,,1204,Switzerland
828,11076,BONAP,4,1998-05-06,1998-06-03,,2,38.28,Bon app',"12, rue des Bouchers",Marseille,,13008,France


In [4]:
# Verificar dataframe products
products_df

Unnamed: 0,product_id,product_name,supplier_id,category_id,quantity_per_unit,unit_price,units_in_stock,units_on_order,reorder_level,discontinued
0,1,Chai,8,1,10 boxes x 30 bags,18.00,39,0,10,1
1,2,Chang,1,1,24 - 12 oz bottles,19.00,17,40,25,1
2,3,Aniseed Syrup,1,2,12 - 550 ml bottles,10.00,13,70,25,0
3,4,Chef Anton's Cajun Seasoning,2,2,48 - 6 oz jars,22.00,53,0,0,0
4,5,Chef Anton's Gumbo Mix,2,2,36 boxes,21.35,0,0,0,1
...,...,...,...,...,...,...,...,...,...,...
72,73,Röd Kaviar,17,8,24 - 150 g jars,15.00,101,0,5,0
73,74,Longlife Tofu,4,7,5 kg pkg.,10.00,4,20,5,0
74,75,Rhönbräu Klosterbier,12,1,24 - 0.5 l bottles,7.75,125,0,25,0
75,76,Lakkalikööri,23,1,500 ml,18.00,57,0,20,0


## Cargar el archivo CSV order_details

In [8]:
# Cargar los datos del archivo CSV
order_details_df = pd.read_csv('data/order_details.csv', sep=';')

# Ver las primeras filas del CSV
order_details_df.head()

Unnamed: 0,order_id,product_id,unit_price,quantity,discount
0,11060,60,34.0,4,0.0
1,11060,77,13.0,10,0.0
2,11061,60,34.0,15,0.0
3,11062,53,32.8,10,0.2
4,11062,70,15.0,12,0.2


## Integración de los datos usando las llaves correspondientes
Vamos a unir las tablas mediante las llaves que las relacionan. En este caso:

* orders.order_id con order_details.order_id
* products.product_id con order_details.product_id

In [9]:
# Unión de las tablas
merged_df = pd.merge(order_details_df, orders_df, on='order_id', how='inner')
merged_df = pd.merge(merged_df, products_df, on='product_id', how='inner')

# Ver las primeras filas de la tabla resultante
merged_df.head()

Unnamed: 0,order_id,product_id,unit_price_x,quantity,discount,customer_id,employee_id,order_date,required_date,shipped_date,...,ship_country,product_name,supplier_id,category_id,quantity_per_unit,unit_price_y,units_in_stock,units_on_order,reorder_level,discontinued
0,11060,60,34.0,4,0.0,FRANS,2,1998-04-30,1998-05-28,1998-05-04,...,Italy,Camembert Pierrot,28,4,15 - 300 g rounds,34.0,19,0,0,0
1,11060,77,13.0,10,0.0,FRANS,2,1998-04-30,1998-05-28,1998-05-04,...,Italy,Original Frankfurter grüne Soße,12,2,12 boxes,13.0,32,0,15,0
2,11061,60,34.0,15,0.0,GREAL,4,1998-04-30,1998-06-11,,...,USA,Camembert Pierrot,28,4,15 - 300 g rounds,34.0,19,0,0,0
3,11062,53,32.8,10,0.2,REGGC,4,1998-04-30,1998-05-28,,...,Italy,Perth Pasties,24,6,48 pieces,32.8,0,0,0,1
4,11062,70,15.0,12,0.2,REGGC,4,1998-04-30,1998-05-28,,...,Italy,Outback Lager,7,1,24 - 355 ml bottles,15.0,15,10,30,0


## Guardar la tabla resultante en un CSV

In [10]:
# Guardar la tabla integrada en un nuevo archivo CSV
merged_df.to_csv('data/merged_orders.csv', index=False)

## Cargar el CSV resultante a S3

In [12]:
import boto3
from botocore.exceptions import NoCredentialsError

# Cargar las variables de entorno desde el archivo .env
load_dotenv()

# Obtener las variables de entorno para S3
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME')

# Verificar que todas las variables se han cargado correctamente
assert AWS_ACCESS_KEY_ID is not None, "La variable AWS_ACCESS_KEY_ID no está definida en el archivo .env"
assert AWS_SECRET_ACCESS_KEY is not None, "La variable AWS_SECRET_ACCESS_KEY no está definida en el archivo .env"
assert S3_BUCKET_NAME is not None, "La variable S3_BUCKET_NAME no está definida en el archivo .env"

# Conexión a S3 usando las credenciales de las variables de entorno
s3 = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

# Definir el bucket y la ruta dentro de la carpeta 'proyecto_final'
s3_file_name = 'proyecto_final/merged_orders.csv'

# Cargar el archivo a S3
try:
    s3.upload_file('data/merged_orders.csv', S3_BUCKET_NAME, s3_file_name)
    print(f'Archivo {s3_file_name} subido exitosamente a {S3_BUCKET_NAME}')
except FileNotFoundError:
    print('El archivo no fue encontrado')
except NoCredentialsError:
    print('Credenciales de AWS no disponibles')

Archivo proyecto_final/merged_orders.csv subido exitosamente a 2024-02-grupo4-acm


## Configurar Athena para consultas

### Crear Base de Datos en Athena

In [16]:
import boto3
from dotenv import load_dotenv
import os

# Cargar las variables de entorno desde el archivo .env
load_dotenv()

# Obtener las variables de entorno para Athena y S3
ATHENA_RESULT_BUCKET = os.getenv('ATHENA_RESULT_BUCKET')
S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME')

# Cliente de Athena
athena = boto3.client('athena', region_name='us-east-1')

# Crear base de datos (puedes cambiar el nombre 'proyecto_final_db' a uno adecuado)
database_name = 'proyecto_final_db'

# Query para crear la base de datos
create_database_query = f"""
CREATE DATABASE IF NOT EXISTS {database_name}
"""

# Ejecutar la consulta de creación de la base de datos
response = athena.start_query_execution(
    QueryString=create_database_query,
    ResultConfiguration={'OutputLocation': ATHENA_RESULT_BUCKET}
)

query_execution_id = response['QueryExecutionId']
status = athena.get_query_execution(QueryExecutionId=query_execution_id)
print(f"Estado de la consulta de creación de la base de datos: {status['QueryExecution']['Status']['State']}")

Estado de la consulta de creación de la base de datos: RUNNING


### Crear Tabla en Athena

In [19]:
# Crear tabla en Athena apuntando al archivo CSV en S3
table_name = 'integracion_ordenes'

create_table_query = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.{table_name} (
  order_id SMALLINT,
  product_id SMALLINT,
  unit_price DOUBLE,
  quantity SMALLINT,
  discount DOUBLE,
  customer_id STRING,
  employee_id SMALLINT,
  order_date STRING,
  required_date STRING,
  shipped_date STRING,
  ship_via SMALLINT,
  freight DOUBLE,
  ship_name STRING,
  ship_address STRING,
  ship_city STRING,
  ship_region STRING,
  ship_postal_code STRING,
  ship_country STRING,
  product_name STRING,
  supplier_id SMALLINT,
  category_id SMALLINT,
  quantity_per_unit STRING,
  product_unit_price DOUBLE,
  units_in_stock SMALLINT,
  units_on_order SMALLINT,
  reorder_level SMALLINT,
  discontinued INTEGER
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ','
)
LOCATION 's3://{S3_BUCKET_NAME}/proyecto_final/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
);
"""

# Ejecutar la consulta para crear la tabla
response = athena.start_query_execution(
    QueryString=create_table_query,
    QueryExecutionContext={'Database': database_name},
    ResultConfiguration={'OutputLocation': ATHENA_RESULT_BUCKET}
)

query_execution_id = response['QueryExecutionId']
status = athena.get_query_execution(QueryExecutionId=query_execution_id)
print(f"Estado de la consulta de creación de la tabla: {status['QueryExecution']['Status']['State']}")

Estado de la consulta de creación de la tabla: QUEUED
