# Proyecto Final: Ciencia de Datos en Python

##### Nombre: Werner Omar Chanta Bautista  
##### Carne: 23000328
##### Curso: Ciencia de Datos en Python, Seccion V

Scope del proyecto:
El proyecto consiste en la implementación de un pipeline de ingeniería de datos utilizando Python, SQL y AWS. El objetivo del pipeline será procesar datos, transformarlos y cargarlos en un destino final, que esun Data Warehouse (DW). El proyecto contempla las siguientes etapas:

Exploración de la data: se realiza un análisis exploratorio de los datos para determinar la estructura de los mismos y definir el modelo de datos a utilizar, (los datos se crean como en las clases recibidas)

Modelo de datos: se define el modelo de datos a utilizar, un esquema de estrella un tipo de esquema de base de datos relacional que consta de una sola tabla de hechos central rodeada de tablas de dimensiones. (se adjunta la imagen) se implementa un Data Warehouse.

Procesamiento: se desarrollará un conjunto de scripts en Python que permiten extraer, transformar y cargar los datos en el destino final. Para ello, se utilizarán diversas herramientas de AWS, como Amazon RDS para bases de datos.

Analítica: se plantean 5 preguntas de análisis que puedan ser resueltas con la estructura definida. Estas preguntas se utilizarán como base para realizar el análisis de los datos una vez cargados en el destino final.

Descripción de fuentes de información:
Las fuentes de información utilizadas en este proyecto serán diversas, como: bases de datos relacionales, datos en tiempo real y otros servicios web.

In [50]:
import pandas as pd
import numpy as np
from faker import Faker
import random
import datetime
import boto3
import psycopg2
import configparser

# Iniciación de Variables

In [51]:
cantidad_clientes = np.random.randint(500, 1000) #cantidad de clientes a crear
#rdsIdentifier = 'banco-db-v1' #nombre de la instancia
rdsIdentifier = 'proyectof' #nombre de la instancia
fake = Faker() #inicialización para creación de data random

# Cargamos archivo de configuraciones

In [52]:
config = configparser.ConfigParser()
config.read('escec2.cfg')

['escec2.cfg']

# Creamos Instancia de RDS 

In [53]:
aws_conn = boto3.client('rds', aws_access_key_id=config.get('IAM', 'ACCESS_KEY'),
                    aws_secret_access_key=config.get('IAM', 'SECRET_ACCESS_KEY'),
                    region_name='us-east-1')

# Verificamos Instancias de RDS disponibles

In [54]:
rdsInstanceIds = []

response = aws_conn.describe_db_instances()
for resp in response['DBInstances']:
    rdsInstanceIds.append(resp['DBInstanceIdentifier'])
    db_instance_status = resp['DBInstanceStatus']

print(f"DBInstanceIds {rdsInstanceIds}")

DBInstanceIds ['banco-db-v1']


# Creación de Servicio RDS

In [55]:
try:
    response = aws_conn.create_db_instance(
            AllocatedStorage=10,
            DBName=config.get('RDS', 'DB_NAME'),
            DBInstanceIdentifier=rdsIdentifier,
            DBInstanceClass="db.t3.micro",
            Engine="postgres",
            MasterUsername=config.get('RDS', 'DB_USER'),
            MasterUserPassword=config.get('RDS', 'DB_PASSWORD'),
            Port=int(config.get('RDS', 'DB_PORT')),
            VpcSecurityGroupIds=[config.get('VPC', 'SECURITY_GROUP')],
            PubliclyAccessible=True
        )
    print(response)
except aws_conn.exceptions.DBInstanceAlreadyExistsFault as ex:
    print("La Instancia de Base de Datos ya Existe.")

{'DBInstance': {'DBInstanceIdentifier': 'proyectof', 'DBInstanceClass': 'db.t3.micro', 'Engine': 'postgres', 'DBInstanceStatus': 'creating', 'MasterUsername': 'postgres', 'DBName': 'pfinal', 'AllocatedStorage': 10, 'PreferredBackupWindow': '03:46-04:16', 'BackupRetentionPeriod': 1, 'DBSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-074d1584c8c8d3ed2', 'Status': 'active'}], 'DBParameterGroups': [{'DBParameterGroupName': 'default.postgres14', 'ParameterApplyStatus': 'in-sync'}], 'DBSubnetGroup': {'DBSubnetGroupName': 'default', 'DBSubnetGroupDescription': 'default', 'VpcId': 'vpc-092cd1fa1bdedff0c', 'SubnetGroupStatus': 'Complete', 'Subnets': [{'SubnetIdentifier': 'subnet-0a88fa338fb50a66f', 'SubnetAvailabilityZone': {'Name': 'us-east-1a'}, 'SubnetOutpost': {}, 'SubnetStatus': 'Active'}, {'SubnetIdentifier': 'subnet-05ded3f609ce1b10e', 'SubnetAvailabilityZone': {'Name': 'us-east-1c'}, 'SubnetOutpost': {}, 'SubnetStatus': 'Active'}, {'SubnetIdentifier': 'subnet-0ad91

##### Recordemos Esperar unos minutos para consultar la informaicón de la instancia.

# Obtenemos URL del Host

In [57]:
try:
     instances = aws_conn.describe_db_instances(DBInstanceIdentifier=rdsIdentifier)
     RDS_HOST = instances.get('DBInstances')[0].get('Endpoint').get('Address')
     print(RDS_HOST)
except Exception as ex:
     print("La instancia de base de datos no existe o aun no se ha terminado de crear.")
     print(ex)

proyectof.csgdbjf8hjye.us-east-1.rds.amazonaws.com


# Conexión a Base de Datos desde Python

In [90]:
import sql_queriess

try:
    db_conn = psycopg2.connect(
        database=config.get('RDS', 'DB_NAME'), 
        user=config.get('RDS', 'DB_USER'),
        password=config.get('RDS', 'DB_PASSWORD'), 
        host=RDS_HOST,
        port=config.get('RDS', 'DB_PORT')
    )

    cursor = db_conn.cursor()
    cursor.execute(sql_queriess.DDL_QUERY)
    db_conn.commit()
    print("Base de Datos Creada Exitosamente")
except Exception as ex:
    print("ERROR: Error al crear la base de datos.")
    print(ex)

Base de Datos Creada Exitosamente


##### Carga de datos al 

In [112]:
def insertDataToSQL(data_dict, table_name):
     postgres_driver = f"""postgresql://{config.get('RDS', 'DB_USER')}:{config.get('RDS', 'DB_PASSWORD')}@{RDS_HOST}:{config.get('RDS', 'DB_PORT')}/{config.get('RDS', 'DB_NAME')}"""    
     df_data = pd.DataFrame.from_records(data_dict)
     try:
          response = df_data.to_sql(table_name, postgres_driver, index=False, if_exists='append')
          print(f'Se han insertado {response} nuevos registros.' )
     except Exception as ex:
          print(ex)

# <span style="color:yellow">dim_fecha</span>

In [91]:
cantidad_dim_fecha = np.random.randint(1000, 2000)
data_dim_fecha = []

for transac in range(cantidad_dim_fecha):
    nueva_fecha = {
            'id_fecha': transac,
            'fecha':fake.date_between(start_date='-2y', end_date='+1y'),
            'dia_semana':fake.day_of_week(),
            'mes': fake.month_name(),   
        }
    data_dim_fecha.append(nueva_fecha)
   
insertDataToSQL(data_dim_fecha, 'dim_fecha')

Se han insertado 122 nuevos registros.


# <span style="color:yellow">dim_tienda</span>

In [93]:
numero_dim_tienda = np.random.randint(50,200 )
data_dim_tienda = []

for ntienda in range(numero_dim_tienda):
    numero_tienda = {
            'id_tienda': ntienda,
            'nombre_tienda':fake.company(),
            'id_gerente':[fake.random_number(digits=5) for _ in range(5)],
        }
    data_dim_tienda.append(numero_tienda)
   
insertDataToSQL(data_dim_tienda, 'dim_tienda')

Se han insertado 144 nuevos registros.


# <span style="color:yellow">dim_cajeros</span>

In [101]:
numero_dim_cajeros = np.random.randint(50,200 )
data_dim_cajeros = []

for ncajero in range(numero_dim_cajeros):
    numero_cajero = {
            'id_cajero': ncajero,
            'nombre':fake.name(),
            'apellido':fake.last_name(),
            'id_cargo':[fake.random_number(digits=3) for _ in range(5)],
        }
    data_dim_cajeros.append(numero_cajero)
   
insertDataToSQL(data_dim_cajeros, 'dim_cajeros')

Se han insertado 76 nuevos registros.


# <span style="color:yellow">dim_productos</span>

In [103]:
numero_dim_productos = np.random.randint(50,200 )
data_dim_productos = []

for ncajero in range(numero_dim_productos):
    numero_productos = {
            'id_producto': fake.random_number(digits=4),
            'sku_numero':fake.bothify(text='SKU-####'),
            'producto':fake.sentence(nb_words=3),
            'marca':fake.company(),
            'categoria_producto':fake.word(),
        }
    data_dim_productos.append(numero_productos)
   
insertDataToSQL(data_dim_productos, 'dim_productos')

Se han insertado 196 nuevos registros.


# <span style="color:yellow">dim_promocion</span>

In [104]:
numero_dim_promocion = np.random.randint(50,200 )
data_dim_promocion = []

for ncajero in range(numero_dim_promocion):
    numero_promociones = {
            'id_promocion': fake.random_number(digits=2),
            'codigo_promocional':fake.random_number(digits=2),
        }
    data_dim_promocion.append(numero_promociones)
   
insertDataToSQL(data_dim_promocion, 'dim_promocion')

Se han insertado 153 nuevos registros.


# <span style="color:yellow">dim_pagos</span>

In [107]:
fake = Faker()

# Define la lista de palabras
palabras = ['transferencia', 'tarjeta', 'contra entrega']

# Genera el número aleatorio de dimensiones de pagos
numero_dim_pagos = np.random.randint(50, 200)

# Genera la lista de dimensiones de pagos
data_dim_pagos = []
for npago in range(numero_dim_pagos):
    numero_promociones = {
        'id_pago': fake.random_number(digits=3),
        'metodo_pago': ' '.join(fake.words(1, ext_word_list=palabras)),
    }
    data_dim_pagos.append(numero_promociones)

# Inserta los datos en SQL
insertDataToSQL(data_dim_pagos, 'dim_pagos')

Se han insertado 151 nuevos registros.


# <span style="color:yellow">dim_direccionEnvio</span>

In [109]:

numero_dim_direccionEnvio = np.random.randint(50,200 )
data_dim_direccionEnvio = []

for ncajero in range(numero_dim_direccionEnvio):
    numero_direccionE = {  
          'id_direccion': fake.random_number(digits=2),
          'no_casa': str(np.random.randint(1, 99)) + '-' + str(np.random.randint(1, 99)),
          'avenida': np.random.randint(1, 45),
          'calle':fake.street_address(),
          'zona':np.random.randint(1, 21),
        }
    data_dim_direccionEnvio.append(numero_direccionE)
   
insertDataToSQL(data_dim_direccionEnvio, 'dim_direccionEnvio')

Se han insertado 51 nuevos registros.


# <span style="color:yellow">transacciones</span>


In [111]:
cantidad_transacciones = np.random.randint(1000, 2000)
data_transacciones = []

for transac in range(cantidad_transacciones):
    nueva_transaccion = {
            'id_transaccion': fake.random_int(min=1, max=999999),
            'cantidad_vendida': fake.random_int(min=1, max=999),
            'monto': round(random.uniform(1000, 10000), 2),
            'id_fecha': random.sample(data_dim_fecha, 1)[0]['id_fecha'],
            'id_cajero':  random.sample(data_dim_cajeros, 1)[0]['id_cajero'],
            'id_tienda': random.sample(data_dim_tienda, 1)[0]['id_tienda'],
            'id_producto': random.sample(data_dim_productos, 1)[0]['id_producto'],
            'id_promocion': random.sample(data_dim_promocion, 1)[0]['id_promocion'],
            'id_pago': random.sample(data_dim_pagos, 1)[0]['id_pago'],
            'id_direccion' : random.sample(data_dim_direccionEnvio, 1)[0]['id_direccion'],
        }
    
    data_transacciones.append(nueva_transaccion)
   
insertDataToSQL(data_transacciones, 'transacciones')

Se han insertado 937 nuevos registros.
