# Carga de las tablas desde MySQL a las distintas fuentes (MongoDB y ADLS)

In [1]:
import os
import pandas as pd
from dotenv import load_dotenv

# Load environment variables from the .env file so later cells can read them
# with os.getenv(). The trailing ";" suppresses the cell's displayed output.
load_dotenv();

## MySQL

### Conexión 

In [2]:
from sqlalchemy import create_engine, text

In [3]:
# MySQL connection.
# The SQLAlchemy URL is taken from the MYSQL_URI environment variable (e.g. set
# in .env) so credentials do not have to live in the notebook. When the
# variable is not set, fall back to the local development default so existing
# setups keep working unchanged.
mysql_url = os.getenv("MYSQL_URI", "mysql+pymysql://root:root@mysql:3306/retail_db")
engine = create_engine(mysql_url)

# A single connection is opened here and reused by every read cell below.
conn = engine.connect()

### Lectura de los datos

In [4]:
# Read the full `categories` table from MySQL into a DataFrame
categories_query = text("SELECT * FROM categories")
categories_df = pd.read_sql_query(sql=categories_query, con=conn)

# Preview the first rows
categories_df.head()

Unnamed: 0,category_id,category_department_id,category_name
0,1,2,Football
1,2,2,Soccer
2,3,2,Baseball & Softball
3,4,2,Basketball
4,5,2,Lacrosse


In [5]:
# Read the full `customers` table from MySQL into a DataFrame
customers_query = text("SELECT * FROM customers")
customers_df = pd.read_sql_query(sql=customers_query, con=conn)

# Preview the first rows
customers_df.head()

Unnamed: 0,customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,customer_zipcode
0,1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
1,2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
2,3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,725
3,4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069
4,5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,10 Crystal River Mall,Caguas,PR,725


In [6]:
# Read the full `departments` table from MySQL into a DataFrame
departments_query = text("SELECT * FROM departments")
departments_df = pd.read_sql_query(sql=departments_query, con=conn)

# Preview the first rows
departments_df.head()

Unnamed: 0,department_id,department_name
0,2,Fitness
1,3,Footwear
2,4,Apparel
3,5,Golf
4,6,Outdoors


In [7]:
# Read the full `order_items` table from MySQL into a DataFrame
order_items_query = text("SELECT * FROM order_items")
order_items_df = pd.read_sql_query(sql=order_items_query, con=conn)

# Preview the first rows
order_items_df.head()

Unnamed: 0,order_item_id,order_item_order_id,order_item_product_id,order_item_quantity,order_item_subtotal,order_item_product_price
0,1,1,957,1,299.98,299.98
1,2,2,1073,1,199.99,199.99
2,3,2,502,5,250.0,50.0
3,4,2,403,1,129.99,129.99
4,5,4,897,2,49.98,24.99


In [8]:
# Read the full `orders` table from MySQL into a DataFrame
orders_query = text("SELECT * FROM orders")
orders_df = pd.read_sql_query(sql=orders_query, con=conn)

# Preview the first rows
orders_df.head()

Unnamed: 0,order_id,order_date,order_customer_id,order_status
0,1,2013-07-25,11599,CLOSED
1,2,2013-07-25,256,PENDING_PAYMENT
2,3,2013-07-25,12111,COMPLETE
3,4,2013-07-25,8827,CLOSED
4,5,2013-07-25,11318,COMPLETE


In [9]:
# Read the full `products` table from MySQL into a DataFrame
products_query = text("SELECT * FROM products")
products_df = pd.read_sql_query(sql=products_query, con=conn)

# Preview the first rows
products_df.head()

Unnamed: 0,product_id,product_category_id,product_name,product_description,product_price,product_image
0,1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+F...
1,2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+M...
2,3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+M...
3,4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+M...
4,5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+...


## MongoDB

### Conexión

In [10]:
from pymongo import MongoClient

In [11]:
# Lazily created MongoClient shared by every get_database() call, so the
# notebook does not open a new client for each table it loads.
_mongo_client = None


def get_database():
    """Return a handle to the "retail_db" MongoDB database.

    The connection string is read from the MONGO_URI environment variable.
    The MongoClient is created on the first call and reused afterwards.

    Returns:
        The "retail_db" database object obtained from the shared client.
    """
    global _mongo_client

    if _mongo_client is None:
        connection_string = os.getenv("MONGO_URI")
        if not connection_string:
            # Keep the previous behaviour (the value is passed to MongoClient
            # as-is) but make the likely misconfiguration visible.
            import warnings

            warnings.warn(
                "MONGO_URI is not set; MongoClient will use its default host.",
                stacklevel=2,
            )
        _mongo_client = MongoClient(connection_string)

    return _mongo_client["retail_db"]

### Carga de los datos

In [12]:
# Load the categories table into MongoDB

# Convert the DataFrame into a list of dicts, one document per row
category_docs = categories_df.reset_index(drop=True).to_dict(orient="records")

# Insert the documents into the "categories" collection
# NOTE(review): insert_many appends; re-running this cell appears to insert
# the same rows again unless the collection is cleared first - confirm.
categories_collection = get_database()["categories"]
result = categories_collection.insert_many(category_docs)
print(f"Se insertaron {len(result.inserted_ids)} documentos en la colección 'categories'.")

Se insertaron 58 documentos en la colección 'categories'.


In [13]:
# Load the customers table into MongoDB

# Convert the DataFrame into a list of dicts, one document per row
customer_docs = customers_df.reset_index(drop=True).to_dict(orient="records")

# Insert the documents into the "customers" collection
# NOTE(review): insert_many appends; re-running this cell appears to insert
# the same rows again unless the collection is cleared first - confirm.
customers_collection = get_database()["customers"]
result = customers_collection.insert_many(customer_docs)
print(f"Se insertaron {len(result.inserted_ids)} documentos en la colección 'customers'.")

Se insertaron 12435 documentos en la colección 'customers'.


In [14]:
# Load the departments table into MongoDB

# Convert the DataFrame into a list of dicts, one document per row
department_docs = departments_df.reset_index(drop=True).to_dict(orient="records")

# Insert the documents into the "departments" collection
# NOTE(review): insert_many appends; re-running this cell appears to insert
# the same rows again unless the collection is cleared first - confirm.
departments_collection = get_database()["departments"]
result = departments_collection.insert_many(department_docs)
print(f"Se insertaron {len(result.inserted_ids)} documentos en la colección 'departments'.")

Se insertaron 6 documentos en la colección 'departments'.


In [15]:
# Load the order_items table into MongoDB

# Convert the DataFrame into a list of dicts, one document per row
order_item_docs = order_items_df.reset_index(drop=True).to_dict(orient="records")

# Insert the documents into the "order_items" collection
# NOTE(review): insert_many appends; re-running this cell appears to insert
# the same rows again unless the collection is cleared first - confirm.
order_items_collection = get_database()["order_items"]
result = order_items_collection.insert_many(order_item_docs)
print(f"Se insertaron {len(result.inserted_ids)} documentos en la colección 'order_items'.")

Se insertaron 172198 documentos en la colección 'order_items'.


In [16]:
# Load the orders table into MongoDB

# Convert the DataFrame into a list of dicts, one document per row
order_docs = orders_df.reset_index(drop=True).to_dict(orient="records")

# Insert the documents into the "orders" collection
# NOTE(review): insert_many appends; re-running this cell appears to insert
# the same rows again unless the collection is cleared first - confirm.
orders_collection = get_database()["orders"]
result = orders_collection.insert_many(order_docs)
print(f"Se insertaron {len(result.inserted_ids)} documentos en la colección 'orders'.")

Se insertaron 68883 documentos en la colección 'orders'.


In [17]:
# Load the products table into MongoDB

# Convert the DataFrame into a list of dicts, one document per row
product_docs = products_df.reset_index(drop=True).to_dict(orient="records")

# Insert the documents into the "products" collection
# NOTE(review): insert_many appends; re-running this cell appears to insert
# the same rows again unless the collection is cleared first - confirm.
products_collection = get_database()["products"]
result = products_collection.insert_many(product_docs)
print(f"Se insertaron {len(result.inserted_ids)} documentos en la colección 'products'.")

Se insertaron 1345 documentos en la colección 'products'.


## ADLS

### Conexión

In [18]:
import io
from azure.storage.blob import ContainerClient

In [19]:
# Read the storage credentials from the environment (loaded from .env)
conn_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
if not conn_str:
    # Fail here with an actionable message instead of letting the SDK fail
    # on a missing/blank connection string.
    raise RuntimeError(
        "AZURE_STORAGE_CONNECTION_STRING is not set; define it in the "
        "environment or in the .env file before running this cell."
    )

# Target container for the uploaded CSV files
container = "retail-source"

# Client bound to that container; reused by every upload cell below
container_client = ContainerClient.from_connection_string(conn_str, container_name=container)


### Carga de los datos 

In [20]:
# Upload the categories table to ADLS as a CSV blob

# to_csv() without a target returns the CSV as a str. Encode it explicitly:
# pandas does not apply `encoding` when writing to a text buffer, so passing
# bytes makes the UTF-8 encoding of the uploaded blob explicit.
csv_bytes = categories_df.to_csv(index=False).encode("utf-8")

# Upload under a name that carries the .csv extension, replacing any previous copy
blob_name = "categories.csv"
container_client.upload_blob(blob_name, csv_bytes, overwrite=True)

<azure.storage.blob._blob_client.BlobClient at 0x7f4c51f79d10>

In [21]:
# Upload the customers table to ADLS as a CSV blob

# to_csv() without a target returns the CSV as a str. Encode it explicitly:
# pandas does not apply `encoding` when writing to a text buffer, so passing
# bytes makes the UTF-8 encoding of the uploaded blob explicit.
csv_bytes = customers_df.to_csv(index=False).encode("utf-8")

# Upload under a name that carries the .csv extension, replacing any previous copy
blob_name = "customers.csv"
container_client.upload_blob(blob_name, csv_bytes, overwrite=True)

<azure.storage.blob._blob_client.BlobClient at 0x7f4c51f7a410>

In [22]:
# Upload the departments table to ADLS as a CSV blob

# to_csv() without a target returns the CSV as a str. Encode it explicitly:
# pandas does not apply `encoding` when writing to a text buffer, so passing
# bytes makes the UTF-8 encoding of the uploaded blob explicit.
csv_bytes = departments_df.to_csv(index=False).encode("utf-8")

# Upload under a name that carries the .csv extension, replacing any previous copy
blob_name = "departments.csv"
container_client.upload_blob(blob_name, csv_bytes, overwrite=True)

<azure.storage.blob._blob_client.BlobClient at 0x7f4c51f7ac50>

In [23]:
# Upload the order_items table to ADLS as a CSV blob

# to_csv() without a target returns the CSV as a str. Encode it explicitly:
# pandas does not apply `encoding` when writing to a text buffer, so passing
# bytes makes the UTF-8 encoding of the uploaded blob explicit.
csv_bytes = order_items_df.to_csv(index=False).encode("utf-8")

# Upload under a name that carries the .csv extension, replacing any previous copy
blob_name = "order_items.csv"
container_client.upload_blob(blob_name, csv_bytes, overwrite=True)

<azure.storage.blob._blob_client.BlobClient at 0x7f4c175bc190>

In [24]:
# Upload the orders table to ADLS as a CSV blob

# to_csv() without a target returns the CSV as a str. Encode it explicitly:
# pandas does not apply `encoding` when writing to a text buffer, so passing
# bytes makes the UTF-8 encoding of the uploaded blob explicit.
csv_bytes = orders_df.to_csv(index=False).encode("utf-8")

# Upload under a name that carries the .csv extension, replacing any previous copy
blob_name = "orders.csv"
container_client.upload_blob(blob_name, csv_bytes, overwrite=True)

<azure.storage.blob._blob_client.BlobClient at 0x7f4c51d00190>

In [25]:
# Upload the products table to ADLS as a CSV blob

# to_csv() without a target returns the CSV as a str. Encode it explicitly:
# pandas does not apply `encoding` when writing to a text buffer, so passing
# bytes makes the UTF-8 encoding of the uploaded blob explicit.
csv_bytes = products_df.to_csv(index=False).encode("utf-8")

# Upload under a name that carries the .csv extension, replacing any previous copy
blob_name = "products.csv"
container_client.upload_blob(blob_name, csv_bytes, overwrite=True)

<azure.storage.blob._blob_client.BlobClient at 0x7f4c177d7f90>