# ETL

## Librerías

In [1]:
from sqlalchemy import create_engine
import socket
import pandas as pd
import sys

## Connection

In [2]:
# Connection settings for the MySQL database. The host is the IP address
# that this machine's own hostname resolves to.
DATABASE_CONFIG = dict(
    host=socket.gethostbyname(socket.gethostname()),
    port=3310,
    user="root",
    password="root",
    database="retail_db",
)

In [3]:
def create_db_engine(config):
    """
    Build a SQLAlchemy engine for the MySQL database described by ``config``.

    The user name and password are percent-encoded before being placed in the
    connection URL, so credentials containing characters such as ``@``, ``:``
    or ``/`` do not corrupt the URL.

    Args:
        config: Mapping with the keys ``user``, ``password``, ``host``,
            ``port`` and ``database``.

    Returns:
        The engine object returned by ``create_engine`` for the assembled URL.

    Raises:
        KeyError: If a required key is missing from ``config``.
        Exception: Any error raised by ``create_engine`` is propagated to the
            caller instead of being printed and replaced by a ``None`` result.
    """
    # Imported locally so the function does not depend on a new file-level
    # import.
    from urllib.parse import quote

    # safe="" forces every reserved character (including "/") to be encoded.
    user = quote(str(config['user']), safe="")
    password = quote(str(config['password']), safe="")

    url = f"mysql://{user}:{password}@{config['host']}:{config['port']}/{config['database']}"
    return create_engine(url)

## Métodos

In [38]:
def read_csv(file_path, columns):
    """
    Load a header-less, pipe-delimited file into a DataFrame.

    Args:
        file_path: Location of the file to read.
        columns: Column names assigned to the fields, in order.

    Returns:
        pandas.DataFrame whose columns are named after ``columns``.

    Raises:
        Exception: Whatever ``pandas.read_csv`` raises (for example
            ``FileNotFoundError``). The error is printed and then re-raised.
    """
    try:
        return pd.read_csv(file_path, header=None, sep='|', names=columns)
    except Exception as e:
        # Report the problem, then propagate it: returning None here would
        # only surface later as an unrelated failure in the caller.
        print(e)
        raise

In [39]:
def validate_ids(df_retail, df, id_retail, id_df):
    """
    Abort the run when ``df_retail`` references ids that are absent from ``df``.

    Args:
        df_retail: DataFrame holding the referencing column.
        df: DataFrame holding the referenced ids.
        id_retail: Name of the column in ``df_retail`` to check.
        id_df: Name of the column in ``df`` that lists the accepted ids.

    Returns:
        None when every value of ``df_retail[id_retail]`` is found in
        ``df[id_df]``.

    Raises:
        SystemExit: With exit code 1 when at least one value is not found.
    """
    valid_ids = set(df[id_df])

    if not df_retail[id_retail].isin(valid_ids).all():
        # Name the columns actually being compared: this helper is shared by
        # several validations, so a fixed message would point at the wrong
        # columns for most of them.
        print(f"Hay {id_retail} que no existen en {id_df}")
        sys.exit(1)

In [23]:
def transform_departments(df):
    """
    Check the departments DataFrame for repeated department names.

    Returns ``df`` unchanged when every ``department_name`` value is unique.
    If a name is repeated, a message is printed and the process exits with
    code 1. Any other exception is printed and ``None`` is returned.
    """
    try:
        has_repeated_names = df['department_name'].duplicated().any()
        if not has_repeated_names:
            return df
        print("Error: Filas Duplicadas")
        sys.exit(1)
    except Exception as err:
        print(err)

In [24]:
# Load the departments file and preview the first validated rows.
df_departments = read_csv(
    file_path=r'../data/departments',
    columns=['department_id', 'department_name'],
)
transform_departments(df=df_departments).head()

Unnamed: 0,department_id,department_name
0,2,Fitness
1,3,Footwear
2,4,Apparel
3,5,Golf
4,6,Outdoors


In [25]:
def transform_customers(df):
    """
    Validate and normalise the customers DataFrame.

    Exits with code 1, after printing a message, when ``customer_fname``,
    ``customer_lname`` or ``customer_email`` contains a null value. Otherwise
    the ``customer_email`` column is lower-cased in place and ``df`` is
    returned. Any other exception is printed and ``None`` is returned.
    """
    required_columns = ['customer_fname', 'customer_lname', 'customer_email']
    try:
        # Mandatory fields must not contain nulls.
        missing_per_column = df[required_columns].isnull().any()
        if missing_per_column.any():
            print("Existen Datos faltantes en el DataFrame de customers")
            sys.exit(1)

        # Normalise e-mail addresses to lower case.
        lowered_emails = df['customer_email'].str.lower()
        df['customer_email'] = lowered_emails
        return df
    except Exception as err:
        print(err)

In [26]:
# Load the customers file and preview the first validated rows.
df_customers = read_csv(
    file_path=r'../data/customers',
    columns=[
        "customer_id",
        "customer_fname",
        "customer_lname",
        "customer_email",
        "customer_password",
        "customer_street",
        "customer_city",
        "customer_state",
        "customer_zipcode",
    ],
)
transform_customers(df=df_customers).head()

Unnamed: 0,customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,customer_zipcode
0,1,Richard,Hernandez,xxxxxxxxx,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
1,2,Mary,Barrett,xxxxxxxxx,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
2,3,Ann,Smith,xxxxxxxxx,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,725
3,4,Mary,Jones,xxxxxxxxx,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069
4,5,Robert,Hudson,xxxxxxxxx,XXXXXXXXX,10 Crystal River Mall,Caguas,PR,725


In [41]:
def transform_products(df, df_categories):
    """
    Check the products DataFrame against the categories DataFrame.

    Delegates to ``validate_ids`` to confirm that the values of
    ``product_category_id`` in ``df`` appear in ``category_id`` of
    ``df_categories``, then returns ``df`` unchanged. Any exception raised
    along the way is printed and ``None`` is returned.
    """
    try:
        # Every product must point at an existing category.
        validate_ids(
            df_retail=df,
            df=df_categories,
            id_retail='product_category_id',
            id_df='category_id',
        )
    except Exception as err:
        print(err)
    else:
        return df

In [42]:
# Load the products file.
df_products = read_csv(
    file_path=r'../data/products',
    columns=[
        "product_id",
        "product_category_id",
        "product_name",
        "product_description",
        "product_price",
        "product_image",
    ],
)

# Load the categories file that products are validated against.
df_categories = read_csv(
    file_path=r'../data/categories',
    columns=["category_id", "category_department_id", "category_name"],
)

transform_products(df=df_products, df_categories=df_categories).head()

Unnamed: 0,product_id,product_category_id,product_name,product_description,product_price,product_image
0,1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+F...
1,2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+M...
2,3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+M...
3,4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+M...
4,5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+...


In [51]:
def transform_order_items(df, df_products, df_orders, tolerance=1e-6):
    """
    Validate the order items DataFrame and repair inconsistent subtotals.

    The function first delegates to ``validate_ids`` to check
    ``order_item_order_id`` against ``df_orders['order_id']`` and
    ``order_item_product_id`` against ``df_products['product_id']``. It then
    recomputes ``order_item_quantity * order_item_product_price`` and replaces
    ``order_item_subtotal`` only on the rows whose stored value differs from
    that product by more than ``tolerance``.

    Comparing with a tolerance instead of ``==`` keeps floating-point rounding
    noise from being treated as a data error, and replacing only the
    mismatching rows keeps the stored values of the rows that were already
    correct.

    Args:
        df: Order items DataFrame; its subtotal column is updated in place.
        df_products: DataFrame providing the ``product_id`` column.
        df_orders: DataFrame providing the ``order_id`` column.
        tolerance: Largest absolute difference still considered equal.

    Returns:
        ``df`` after validation and repair, or ``None`` when an exception was
        raised along the way (the exception is printed).
    """
    try:
        # Every order item must point at an existing order.
        validate_ids(df, df_orders, 'order_item_order_id', 'order_id')

        # Every order item must point at an existing product.
        validate_ids(df, df_products, 'order_item_product_id', 'product_id')

        # The subtotal must equal quantity times unit price.
        expected_subtotal = df['order_item_quantity'] * df['order_item_product_price']

        # Coerce so that a non-numeric stored value counts as a mismatch
        # instead of breaking the subtraction.
        stored_subtotal = pd.to_numeric(df['order_item_subtotal'], errors='coerce')
        difference = (stored_subtotal - expected_subtotal).abs()

        # "not (<= tolerance)" rather than "> tolerance" so that rows whose
        # difference is NaN (missing or unparsable subtotal) are repaired too.
        mismatch = ~(difference <= tolerance)
        if mismatch.any():
            df['order_item_subtotal'] = df['order_item_subtotal'].where(
                ~mismatch, expected_subtotal
            )

        return df
    except Exception as e:
        print(e)

In [52]:
# Load the order items file.
df_order_items = read_csv(
    file_path=r'../data/order_items',
    columns=[
        "order_item_id",
        "order_item_order_id",
        "order_item_product_id",
        "order_item_quantity",
        "order_item_subtotal",
        "order_item_product_price",
    ],
)

# Load the orders file that order items are validated against.
df_orders = read_csv(
    file_path=r'../data/orders',
    columns=["order_id", "order_date", "order_customer_id", "order_status"],
)

transform_order_items(
    df=df_order_items,
    df_products=df_products,
    df_orders=df_orders,
).head()

Unnamed: 0,order_item_id,order_item_order_id,order_item_product_id,order_item_quantity,order_item_subtotal,order_item_product_price
0,1,1,957,1,299.98,299.98
1,2,2,1073,1,199.99,199.99
2,3,2,502,5,250.0,50.0
3,4,2,403,1,129.99,129.99
4,5,4,897,2,49.98,24.99


In [53]:
def load_data(engine, table_name, df, if_exists='replace'):
    """
    Write a DataFrame to a database table.

    Args:
        engine: Connectable passed to ``DataFrame.to_sql`` as ``con``.
        table_name: Name of the destination table.
        df: DataFrame to write; its index is not written.
        if_exists: Passed through to ``DataFrame.to_sql``. Defaults to
            ``'replace'``, which is what this function always used before
            the parameter existed.

    Raises:
        Exception: Whatever ``DataFrame.to_sql`` raises. The error is printed
            and then re-raised so a failed load cannot pass for a success.
    """
    try:
        df.to_sql(name=table_name, con=engine, if_exists=if_exists, index=False)
    except Exception as e:
        print(e)
        raise

In [54]:
# Create the engine from the connection settings and load the categories
# DataFrame into the 'categories2' table.
engine = create_db_engine(config=DATABASE_CONFIG)
load_data(engine=engine, table_name='categories2', df=df_categories)