In [1]:
import boto3
import pandas as pd
import numpy as np
import psycopg2
import configparser
import pymysql

## LEEMOS ARCHIVOS 

In [30]:
config = configparser.ConfigParser()
config.read('config.cfg')

['config.cfg']

In [3]:
config.get('IAM','ACCES_KEY')

'AKIATCKAOU3HWISGUXTG'

## Nos identificamos con AWS

In [4]:
aws_rds_conn = boto3.client('rds', aws_access_key_id=config.get('IAM','ACCES_KEY'),
                            aws_secret_access_key=config.get('IAM','SECRETE_ACCES_KEY'),
                            region_name='us-east-1')


## Verificamos instancias de AWS disponibles en el usuario 

In [5]:
rds_instances_ids =[]
aws_response= aws_rds_conn.describe_db_instances()

for response in aws_response ['DBInstances']:
    rds_instances_ids.append(response['DBInstanceIdentifier'])
print(f" Instancias Disponilbes:{rds_instances_ids}")

 Instancias Disponilbes:['dw-transactional', 'venta-transactional']


In [6]:
config.get('TRANSACC','DB_INSTANCE_ID')

'dw-transactional'

In [7]:
config.get('TRANSACC','DB_USER')

'mysql_admin'

In [8]:
try:
    response = aws_rds_conn.create_db_instance(
        DBInstanceIdentifier=config.get('TRANSACC', 'DB_INSTANCE_ID'),
        DBName=config.get('TRANSACC', 'DB_NAME'),
        DBInstanceClass='db.t3.micro',
        Engine='mysql',
        MasterUsername=config.get('TRANSACC', 'DB_USER'),
        MasterUserPassword=config.get('TRANSACC', 'DB_PASSWORD'),
        Port=int(config.get('TRANSACC', 'DB_PORT')),
        PubliclyAccessible=True,
        VpcSecurityGroupIds=[config.get('VPC', 'SECURITY_GROUP')],
        AllocatedStorage=15
    )
    print(response)
except aws_rds_conn.exceptions.DBInstanceAlreadyExistsFault as ex:
    print("La instancia ya existe")
except Exception as ex:
    print("Error!!!", ex)


La instancia ya existe


## Obtenemos el hostname de la instancias

In [9]:
try:
    instance = aws_rds_conn.describe_db_instances(DBInstanceIdentifier=config.get('TRANSACC', 'DB_INSTANCE_ID'))
    RDS_HOSTNAME = instance.get('DBInstances')[0].get('Endpoint').get('Address')
    print(RDS_HOSTNAME)
except Exception as ex:
    print("Error!!!", ex)


dw-transactional.c1i8ws84e2em.us-east-1.rds.amazonaws.com


## Nos conectamos a la base de datos desde Python

In [10]:
import sql_queries
sql_queries.DDL_QUERY

'\nCREATE TABLE dim_articulo (\n    articulo_id INT AUTO_INCREMENT PRIMARY KEY,\n    codigo VARCHAR(50),\n    nombre VARCHAR(100),\n    precio_venta DECIMAL(11, 2),\n    stock INT,\n    descripcion VARCHAR(255),\n    imagen VARCHAR(20)\n);\n\n-- Dimension Cliente\nCREATE TABLE dim_cliente (\n    cliente_id INT AUTO_INCREMENT PRIMARY KEY,\n    tipo_persona VARCHAR(20),\n    nombre VARCHAR(100),\n    tipo_documento VARCHAR(20),\n    num_documento VARCHAR(20),\n    direccion VARCHAR(70),\n    telefono VARCHAR(20),\n    email VARCHAR(50)\n);\n\n-- Dimension Usuario\nCREATE TABLE dim_usuario (\n    usuario_id INT AUTO_INCREMENT PRIMARY KEY,\n    nombre VARCHAR(100),\n    tipo_documento VARCHAR(20),\n    num_documento VARCHAR(20),\n    direccion VARCHAR(70),\n    telefono VARCHAR(20),\n    email VARCHAR(50),\n    rol_id INT\n    -- Aquí puedes agregar una clave foránea a dim_rol si tienes una tabla de roles\n    -- FOREIGN KEY (rol_id) REFERENCES dim_rol(rol_id)\n);\n\n-- Dimension Categoria

In [11]:
try:
    db_pg_conn = pymysql.connect(
        host=RDS_HOSTNAME,
        port=int(config.get('TRANSACC', 'DB_PORT')),
        user=config.get('TRANSACC', 'DB_USER'),
        password=config.get('TRANSACC', 'DB_PASSWORD'),
        database=config.get('TRANSACC', 'DB_NAME'),
        cursorclass=pymysql.cursors.DictCursor
    )

    with db_pg_conn.cursor() as cursor:
        ddl_statements = sql_queries.DDL_QUERY.strip().split(';')
        for statement in ddl_statements:
            if statement.strip():  
                cursor.execute(statement)
        db_pg_conn.commit()
    print("Base de Datos Creada Exitosamente")
except Exception as ex:
    print("Error!!!", ex)


Base de Datos Creada Exitosamente


## Insertamos datos en la BD

In [15]:
def insertData2SQL(data_dict,table_name, driver):
    df_data = pd.DataFrame.from_records(data_dict)
    try:
        response = df_data.to_sql(table_name, driver, index=False, if_exists='append')
        print(f"Se han insertado {response} nuevos registros")
    except Exception as ex:
        print(ex)

In [17]:
driver = f"""mysql+pymysql://{config.get('TRANSACC', 'DB_USER')}:{config.get('TRANSACC', 'DB_PASSWORD')}@{RDS_HOSTNAME}:{config.get('TRANSACC', 'DB_PORT')}/{config.get('TRANSACC', 'DB_NAME')}"""
driver

'mysql+pymysql://mysql_admin:W8GB7F4Tks63-gjl@dw-transactional.c1i8ws84e2em.us-east-1.rds.amazonaws.com:3306/modelodw'

In [18]:
data_tipo_transacciones = [
    {'id_tipo_transac': 85095, 'tipo_transaccion': 'Depósito'},
    {'id_tipo_transac': 85098, 'tipo_transaccion': 'Retiro'},
    {'id_tipo_transac': 85194, 'tipo_transaccion': 'Transferencia'},
    {'id_tipo_transac': 85133, 'tipo_transaccion': 'Pago Prestamo'}
]

# insertamos data en tabla tipo_transaccione
insertData2SQL(data_tipo_transacciones, 'tipo_transacciones', driver)


Se han insertado 4 nuevos registros


# ETL

## Conexion Transaccional

In [36]:
def get_trans_hostname():
    instance = aws_rds_conn.describe_db_instances(DBInstanceIdentifier=config.get('OG', 'DB_INSTANCE_ID'))
    TRANS_HOSTNAME = instance.get('DBInstances')[0].get('Endpoint').get('Address')
    return TRANS_HOSTNAME
    
TRANS_HOSTNAME = get_trans_hostname()
TRANS_HOSTNAME

'venta-transactional.c1i8ws84e2em.us-east-1.rds.amazonaws.com'

## Dimensiones

In [37]:
trans_driver = f"""postgresql://{config.get('OG', 'DB_USER')}:{config.get('OG', 'DB_PASSWORD')}@{TRANS_HOSTNAME}:{config.get('OG', 'DB_PORT')}/{config.get('OG', 'DB_NAME')}"""  
trans_driver

'postgresql://postgres_admin:W8GB7F4Tks63-gjl@venta-transactional.c1i8ws84e2em.us-east-1.rds.amazonaws.com:5432/venta'

### Articulo

In [39]:
sql_query = 'SELECT * FROM articulo;'
dim_articulo = pd.read_sql(sql_query, trans_driver)
dim_articulo.head()

Unnamed: 0,idarticulo,idcategoria,codigo,nombre,precio_venta,stock,descripcion,imagen,estado
0,1,8,EJML6D1A82,Económico Pan Integral,95.82,110,El complemento perfecto para cualquier comida.,img00001.jpg,True
1,2,3,RBIRFY8PF2,Calidad Pan Integral,83.72,51,El favorito de todos a un precio accesible.,img00002.jpg,True
2,3,9,B0URP6ECZ7,Fresco Chocolate,87.91,107,Disfruta de la mejor calidad en cada bocado.,img00003.jpg,True
3,4,1,I2J0WQTJJZ,Importado Chocolate,76.7,6,Cultivado localmente con prácticas sostenibles.,img00004.jpg,True
4,5,1,QWI4BZ23VX,Delicioso Vino,5.52,198,Empacado con nutrientes y sabor.,img00005.jpg,True


### Cliente

In [41]:
sql_query = 'SELECT * FROM persona;'
dim_cliente = pd.read_sql(sql_query, trans_driver)
dim_cliente.head()

Unnamed: 0,idpersona,tipo_persona,nombre,tipo_documento,num_documento,direccion,telefono,email
0,1,Natural,Sarah Little,DNI,5094558868,6653 Williams Ridges Apt. 363 East Stephanieto...,+502 6208 7689,griffinjared@garcia.com
1,2,Jurídica,Riley Sawyer,RUC,6294099776,"19116 Hale Parks Lake Carl, VA 72820",+502 6160 9808,aliciadudley@lopez.org
2,3,Jurídica,Kimberly Hamilton,DNI,754055998,"0747 Evans Shoals Suite 029 East Joseph, WI 73168",+502 6733 6206,phillipskathryn@hotmail.com
3,4,Natural,Leon Hernandez,DNI,8797802153,"19049 Katherine Passage North Johnside, CO 95119",+502 6880 2601,shortmichael@gmail.com
4,5,Natural,Jose Sullivan,RUC,1691304141,"2730 Vincent River Apt. 900 South Louistown, I...",+502 6216 2838,blake67@hotmail.com


### usuario

In [42]:
sql_query = 'SELECT * FROM usuario;'
dim_usuario = pd.read_sql(sql_query, trans_driver)
dim_usuario.head()

Unnamed: 0,idusuario,idrol,nombre,tipo_documento,num_documento,direccion,telefono,email,clave,estado
0,1,1,Steven Myers,PAS,88604995,"51327 Ramirez IslandsHollowayport, KS 87700",+502 5082 6571,pricecrystal@gmail.com,"[b'$', b'g', b'5', b'W', b'8', b'I', b'7', b'h...",True
1,2,4,Darren Phelps,PAS,14293769,"34305 Vega Junction Jeremyburgh, AZ 32844",+502 3152 2884,daniel73@gmail.com,"[b'*', b'8', b'7', b'r', b'Q', b'&', b'l', b'@...",True
2,3,4,Lisa Walker,DNI,79275202,"38323 Poole Isle Apt. 496 Blackburnshire, SC 0...",+502 5440 6174,mahoneymatthew@leonard.net,"[b'3', b'C', b'8', b'Z', b'e', b'e', b'f', b'3...",True
3,4,3,Kathryn Brown,DNI,66854544,"08148 Robert Lodge Suite 112 Morenoton, MO 68236",+502 5262 3162,ahoffman@yahoo.com,"[b'F', b'k', b'n', b'f', b'P', b'b', b'J', b'e...",True
4,5,1,Ryan Barton,PAS,96713205,"09778 Morgan Club Apt. 651 North Bryan, UT 89281",+502 4078 4623,kpeterson@thompson.com,"[b'+', b'j', b'l', b'4', b'!', b'Q', b'^', b'N...",True


### Categoria

In [43]:
sql_query = 'SELECT * FROM categoria;'
dim_categoria = pd.read_sql(sql_query, trans_driver)
dim_categoria.head()

Unnamed: 0,idcategoria,nombre,descripcion,estado
0,1,Frutas,Frutas frescas y secas,True
1,2,Verduras,Verduras y hortalizas frescas,True
2,3,Carnes,Carnes rojas y blancas,True
3,4,Pescados,Pescados y mariscos,True
4,5,Lácteos,"Leche, yogures y quesos",True


### Tiempo

In [45]:
sql_query = 'SELECT * FROM venta;'
pd_venta = pd.read_sql(sql_query, trans_driver)
pd_venta.head()

dim_fecha = pd_venta["fecha"].unique()
dim_fecha


<DatetimeArray>
['2016-07-10 00:00:00', '2000-03-20 00:00:00', '2001-11-25 00:00:00',
 '2017-08-10 00:00:00', '2005-03-16 00:00:00', '2012-09-05 00:00:00',
 '2012-03-11 00:00:00', '2022-01-15 00:00:00', '2001-06-10 00:00:00',
 '2023-01-17 00:00:00',
 ...
 '2017-11-05 00:00:00', '2011-10-25 00:00:00', '2023-01-10 00:00:00',
 '2000-09-16 00:00:00', '2003-09-22 00:00:00', '2014-07-02 00:00:00',
 '2020-07-15 00:00:00', '2020-04-19 00:00:00', '2012-06-15 00:00:00',
 '2014-09-01 00:00:00']
Length: 476, dtype: datetime64[ns]

## Hechos Detalle_Venta

##

In [65]:
sql_query = 'SELECT * FROM detalle_venta;'
pd_detalle = pd.read_sql(sql_query, trans_driver)
pd_detalle.head()

Unnamed: 0,iddetalle_venta,idventa,idarticulo,cantidad,precio,descuento
0,500,1,41,3,77.87,0.0
1,501,1,30,2,21.84,0.0
2,502,1,312,1,30.03,0.0
3,503,1,307,2,76.15,0.0
4,504,1,326,2,61.4,0.0


#### unir fecha, cliente y usuario

In [87]:
pd_venta_reducido = pd_venta[["idventa","fecha","idcliente","idusuario","impuesto"]]

hechos = pd.merge(pd_detalle,pd_venta_reducido, on="idventa",how="inner")
hechos

Unnamed: 0,iddetalle_venta,idventa,idarticulo,cantidad,precio,descuento,fecha,idcliente,idusuario,impuesto
0,500,1,41,3,77.87,0.0,2016-07-10,50,3,12.0
1,501,1,30,2,21.84,0.0,2016-07-10,50,3,12.0
2,502,1,312,1,30.03,0.0,2016-07-10,50,3,12.0
3,503,1,307,2,76.15,0.0,2016-07-10,50,3,12.0
4,504,1,326,2,61.40,0.0,2016-07-10,50,3,12.0
...,...,...,...,...,...,...,...,...,...,...
12107,249529,499,17,3,49.45,0.0,2014-09-01,20,39,12.0
12108,249530,499,160,3,90.39,0.0,2014-09-01,20,39,12.0
12109,249531,499,265,3,9.22,0.0,2014-09-01,20,39,12.0
12110,249532,499,207,1,79.54,0.0,2014-09-01,20,39,12.0


#### unir categoria y producto

In [88]:
dim_cat_prod = dim_articulo[["idarticulo","idcategoria"]]
hechos = pd.merge(hechos,dim_cat_prod, on="idarticulo",how="inner")
hechos


Unnamed: 0,iddetalle_venta,idventa,idarticulo,cantidad,precio,descuento,fecha,idcliente,idusuario,impuesto,idcategoria
0,500,1,41,3,77.87,0.0,2016-07-10,50,3,12.0,4
1,4009,8,41,4,77.87,0.0,2022-01-15,45,41,12.0,4
2,4502,9,41,5,77.87,0.0,2001-06-10,44,32,12.0,4
3,17008,34,41,5,77.87,0.0,2013-05-18,16,20,12.0,4
4,32007,64,41,4,77.87,0.0,2002-05-03,25,50,12.0,4
...,...,...,...,...,...,...,...,...,...,...,...
12107,192521,385,259,4,30.40,0.0,2017-06-25,29,8,12.0,8
12108,208032,416,259,4,30.40,0.0,2004-01-21,47,7,12.0,8
12109,214021,428,259,5,30.40,0.0,2003-10-24,12,45,12.0,8
12110,214035,428,259,4,30.40,0.0,2003-10-24,12,45,12.0,8


#### Limpiar columnas no necesarios

In [89]:
hechos.drop('idventa', axis=1, inplace=True)
hechos

Unnamed: 0,iddetalle_venta,idarticulo,cantidad,precio,descuento,fecha,idcliente,idusuario,impuesto,idcategoria
0,500,41,3,77.87,0.0,2016-07-10,50,3,12.0,4
1,4009,41,4,77.87,0.0,2022-01-15,45,41,12.0,4
2,4502,41,5,77.87,0.0,2001-06-10,44,32,12.0,4
3,17008,41,5,77.87,0.0,2013-05-18,16,20,12.0,4
4,32007,41,4,77.87,0.0,2002-05-03,25,50,12.0,4
...,...,...,...,...,...,...,...,...,...,...
12107,192521,259,4,30.40,0.0,2017-06-25,29,8,12.0,8
12108,208032,259,4,30.40,0.0,2004-01-21,47,7,12.0,8
12109,214021,259,5,30.40,0.0,2003-10-24,12,45,12.0,8
12110,214035,259,4,30.40,0.0,2003-10-24,12,45,12.0,8


#### Renombrar columnas

In [91]:
hechos_actualizado = hechos.rename(columns={
    'iddetalle_venta': 'fact_venta_id',
    'precio': 'precio_unitario',
    'idarticulo':'articulo_id',
    'idcliente':'cliente_id',
    'idusuario':'usuario_id',
    'idcategoria':'categoria_id',
})
hechos_actualizado

Unnamed: 0,fact_venta_id,articulo_id,cantidad,precio_unitario,descuento,fecha,cliente_id,usuario_id,impuesto,categoria_id
0,500,41,3,77.87,0.0,2016-07-10,50,3,12.0,4
1,4009,41,4,77.87,0.0,2022-01-15,45,41,12.0,4
2,4502,41,5,77.87,0.0,2001-06-10,44,32,12.0,4
3,17008,41,5,77.87,0.0,2013-05-18,16,20,12.0,4
4,32007,41,4,77.87,0.0,2002-05-03,25,50,12.0,4
...,...,...,...,...,...,...,...,...,...,...
12107,192521,259,4,30.40,0.0,2017-06-25,29,8,12.0,8
12108,208032,259,4,30.40,0.0,2004-01-21,47,7,12.0,8
12109,214021,259,5,30.40,0.0,2003-10-24,12,45,12.0,8
12110,214035,259,4,30.40,0.0,2003-10-24,12,45,12.0,8


#### Agregar total

In [93]:
hechos_actualizado['total'] = hechos_actualizado['cantidad'] * hechos_actualizado['precio_unitario']
hechos_actualizado

Unnamed: 0,fact_venta_id,articulo_id,cantidad,precio_unitario,descuento,fecha,cliente_id,usuario_id,impuesto,categoria_id,total
0,500,41,3,77.87,0.0,2016-07-10,50,3,12.0,4,233.61
1,4009,41,4,77.87,0.0,2022-01-15,45,41,12.0,4,311.48
2,4502,41,5,77.87,0.0,2001-06-10,44,32,12.0,4,389.35
3,17008,41,5,77.87,0.0,2013-05-18,16,20,12.0,4,389.35
4,32007,41,4,77.87,0.0,2002-05-03,25,50,12.0,4,311.48
...,...,...,...,...,...,...,...,...,...,...,...
12107,192521,259,4,30.40,0.0,2017-06-25,29,8,12.0,8,121.60
12108,208032,259,4,30.40,0.0,2004-01-21,47,7,12.0,8,121.60
12109,214021,259,5,30.40,0.0,2003-10-24,12,45,12.0,8,152.00
12110,214035,259,4,30.40,0.0,2003-10-24,12,45,12.0,8,121.60


### Carga al DW