# Proyecto Data Pipelines

## 1. Importar librerías

In [1]:
import configparser
import datetime
import random
from urllib.parse import quote

import boto3
import numpy as np
import pandas as pd
import psycopg2
from faker import Faker

## 2. Se definen los identificadores de las instancias de RDS y se carga el archivo de configuración

In [2]:
# RDS instance identifiers used throughout the notebook:
# the first backs the Postgres source URL, the second is the MySQL warehouse instance.
rdsIdentifier, rdsMySQLIdentifier = 'proyecto-db-1', 'proyecto-db-2'

In [3]:
# INI file holding the [IAM], [RDS], [RDS_MYSQL] and [VPC] sections read by later cells.
CONFIG_PATH = 'escec.cfg'

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed; fail fast here instead of hitting a NoSectionError
# several cells later.
read_files = config.read(CONFIG_PATH)
if not read_files:
    raise FileNotFoundError(f"No se pudo leer el archivo de configuracion '{CONFIG_PATH}'.")
read_files

['escec.cfg']

## 3. Creamos el cliente de RDS (boto3)

In [4]:
# Low-level RDS API client. Credentials come from the [IAM] section of the config
# file; the region may be overridden with an optional IAM.REGION entry and
# otherwise defaults to us-east-1 (the previously hard-coded value).
aws_conn = boto3.client(
    'rds',
    aws_access_key_id=config.get('IAM', 'ACCESS_KEY'),
    aws_secret_access_key=config.get('IAM', 'SECRET_ACCESS_KEY'),
    region_name=config.get('IAM', 'REGION', fallback='us-east-1'),
)

## 4. Listamos las instancias de RDS existentes

In [5]:
# List every RDS instance in the region together with its current status.
# describe_db_instances returns at most 100 records per call, so the paginator
# is used to follow the continuation marker across all pages.
rdsInstanceIds = []
rdsInstanceStatus = {}
for page in aws_conn.get_paginator('describe_db_instances').paginate():
    for instance in page['DBInstances']:
        instance_id = instance['DBInstanceIdentifier']
        rdsInstanceIds.append(instance_id)
        rdsInstanceStatus[instance_id] = instance['DBInstanceStatus']

print(f"DBInstanceIds {rdsInstanceIds}")
print(f"DBInstanceStatus {rdsInstanceStatus}")

DBInstanceIds ['proyecto-db-1', 'proyecto-db-2']


## 6. Obtenemos los endpoints (hosts) de las instancias de RDS

In [6]:
def get_rds_endpoint(rds_client, identifier):
    """Return the endpoint hostname of the RDS instance ``identifier``.

    Raises:
        rds_client.exceptions.DBInstanceNotFoundFault: the instance does not exist.
        LookupError: the instance exists but reports no endpoint address yet.
    """
    instance = rds_client.describe_db_instances(DBInstanceIdentifier=identifier)['DBInstances'][0]
    # 'Endpoint' can be missing (e.g. while the instance is still being created);
    # check it explicitly instead of failing with an AttributeError on None.
    address = (instance.get('Endpoint') or {}).get('Address')
    if not address:
        raise LookupError(
            f"{identifier} no tiene endpoint (estado: {instance.get('DBInstanceStatus')})"
        )
    return address


# Resolve each instance independently so that a missing MySQL warehouse (it is
# created in section 9) does not prevent resolving the Postgres host, and vice versa.
# Only "not found / not ready" is handled here; credential or network errors propagate.
try:
    RDS_HOST = get_rds_endpoint(aws_conn, rdsIdentifier)
    print(RDS_HOST)
except (aws_conn.exceptions.DBInstanceNotFoundFault, LookupError) as ex:
    print("La instancia de base de datos no existe o aun no se ha terminado de crear.")
    print(ex)

try:
    RDS_HOSTMySQL = get_rds_endpoint(aws_conn, rdsMySQLIdentifier)
    print(RDS_HOSTMySQL)
except (aws_conn.exceptions.DBInstanceNotFoundFault, LookupError) as ex:
    print("La instancia de base de datos no existe o aun no se ha terminado de crear.")
    print(ex)

proyecto-db-1.ckhzkzfbezed.us-east-1.rds.amazonaws.com
proyecto-db-2.ckhzkzfbezed.us-east-1.rds.amazonaws.com


## 7. Construimos la URL de conexión a Postgres

In [7]:
# SQLAlchemy URL for the Postgres source database. User and password are
# percent-encoded (safe='' also encodes '/') so that characters such as
# '@', ':' or '/' in the password cannot break the URL structure.
postgres_driver = (
    f"postgresql://{quote(config.get('RDS', 'DB_USER'), safe='')}"
    f":{quote(config.get('RDS', 'DB_PASSWORD'), safe='')}"
    f"@{RDS_HOST}:{config.get('RDS', 'DB_PORT')}/{config.get('RDS', 'DB_NAME')}"
)

## 8. Comenzamos a manejar las dimensiones

### Obteniendo branch

In [8]:
# Load the `branch` lookup table from Postgres.
sql_query = 'SELECT * FROM branch;'
df_branch = pd.read_sql(con=postgres_driver, sql=sql_query)
df_branch.head()

Unnamed: 0,branch_id,branch
0,1,A
1,2,B
2,3,C


### Obteniendo city

In [9]:
# Load the `city` lookup table from Postgres.
sql_query = 'SELECT * FROM city;'
df_city = pd.read_sql(con=postgres_driver, sql=sql_query)
df_city.head()

Unnamed: 0,city_id,city_name
0,1,Mandalay
1,2,Yangon
2,3,Napypyitaw


### Obteniendo location

In [10]:
# Load the `location` table (branch/city foreign keys) from Postgres.
sql_query = 'SELECT * FROM location;'
df_location = pd.read_sql(con=postgres_driver, sql=sql_query)
df_location.head()



Unnamed: 0,location_id,branch_loc_id,city_loc_id
0,1,3,1
1,2,3,2
2,3,3,3


### Realizando join entre branch y location

In [11]:
# Attach the branch label to each location. `validate` makes pandas raise a
# MergeError if branch_id is not unique in df_branch, instead of silently
# duplicating location rows that would later be loaded into dim_location.
df_location = df_location.merge(
    df_branch,
    left_on='branch_loc_id',
    right_on='branch_id',
    how='inner',
    validate='many_to_one',
)
df_location

Unnamed: 0,location_id,branch_loc_id,city_loc_id,branch_id,branch
0,1,3,1,3,C
1,2,3,2,3,C
2,3,3,3,3,C


### Realizando join entre location y city

In [12]:
# Attach the city name to each location. `validate` makes pandas raise a
# MergeError if city_id is not unique in df_city, instead of silently
# duplicating location rows that would later be loaded into dim_location.
df_location = df_location.merge(
    df_city,
    left_on='city_loc_id',
    right_on='city_id',
    how='inner',
    validate='many_to_one',
)
df_location

Unnamed: 0,location_id,branch_loc_id,city_loc_id,branch_id,branch,city_id,city_name
0,1,3,1,3,C,1,Mandalay
1,2,3,2,3,C,2,Yangon
2,3,3,3,3,C,3,Napypyitaw


### Limpiando columnas



In [13]:
# Keep only location_id plus the joined labels; the join-key columns are now redundant.
df_location = df_location.drop(
    columns=['branch_loc_id', 'city_loc_id', 'branch_id', 'city_id']
)
df_location

Unnamed: 0,location_id,branch,city_name
0,1,C,Mandalay
1,2,C,Yangon
2,3,C,Napypyitaw


### Obteniendo Customer type

In [17]:
# Load the `customer_type` lookup table from Postgres.
sql_query = 'SELECT * FROM customer_type;'
df_customer_type = pd.read_sql(con=postgres_driver, sql=sql_query)
df_customer_type.head()


Unnamed: 0,customertype_id,customer_type
0,1,Member
1,2,Normal


### Obteniendo customer_gender

In [18]:
# Load the `customer_gender` lookup table from Postgres.
sql_query = 'SELECT * FROM customer_gender;'
df_customer_gender = pd.read_sql(con=postgres_driver, sql=sql_query)
df_customer_gender.head()

Unnamed: 0,customergender_id,customer_gender
0,1,Female
1,2,Male


### Obteniendo customers

In [19]:
# Load the `customers` table (with type/gender foreign keys) from Postgres.
sql_query = 'SELECT * FROM customers;'
df_customers = pd.read_sql(con=postgres_driver, sql=sql_query)
df_customers.head()

Unnamed: 0,customers_id,customer_name,type_customer_id,gender_customer_id
0,1,Gregory King,2,2
1,2,Caitlin Hess,2,1
2,3,Joseph Taylor,1,2
3,4,Ashley Sanchez,1,1
4,5,Christina Fisher,1,1


### Realizando join entre Customer Type y Customers

In [20]:
# Attach the customer type label. `validate` makes pandas raise a MergeError if
# customertype_id is not unique in df_customer_type, instead of silently
# duplicating customers that would later be loaded into dim_customers.
df_customers = df_customers.merge(
    df_customer_type,
    left_on='type_customer_id',
    right_on='customertype_id',
    how='inner',
    validate='many_to_one',
)
df_customers

Unnamed: 0,customers_id,customer_name,type_customer_id,gender_customer_id,customertype_id,customer_type
0,1,Gregory King,2,2,2,Normal
1,2,Caitlin Hess,2,1,2,Normal
2,6,Amanda Frost,2,1,2,Normal
3,9,Jillian Hamilton,2,1,2,Normal
4,10,Joseph Becker,2,2,2,Normal
...,...,...,...,...,...,...
960,960,Dennis Bird,1,2,1,Member
961,961,Leslie Powell,1,1,1,Member
962,962,David Martinez,1,2,1,Member
963,963,Amanda Sheppard,1,1,1,Member


### Realizando join entre Customer Gender y Customers

In [21]:
# Attach the customer gender label. `validate` makes pandas raise a MergeError if
# customergender_id is not unique in df_customer_gender, instead of silently
# duplicating customers that would later be loaded into dim_customers.
df_customers = df_customers.merge(
    df_customer_gender,
    left_on='gender_customer_id',
    right_on='customergender_id',
    how='inner',
    validate='many_to_one',
)
df_customers

Unnamed: 0,customers_id,customer_name,type_customer_id,gender_customer_id,customertype_id,customer_type,customergender_id,customer_gender
0,1,Gregory King,2,2,2,Normal,2,Male
1,10,Joseph Becker,2,2,2,Normal,2,Male
2,12,Jack Hall,2,2,2,Normal,2,Male
3,13,Jimmy Greer,2,2,2,Normal,2,Male
4,19,Brian Davis,2,2,2,Normal,2,Male
...,...,...,...,...,...,...,...,...
960,950,Terri Cooke MD,1,1,1,Member,1,Female
961,952,Katie Goodman,1,1,1,Member,1,Female
962,954,Julie Heath,1,1,1,Member,1,Female
963,961,Leslie Powell,1,1,1,Member,1,Female


### Limpiando columnas

In [22]:
# Keep customers_id, the name and the joined labels; the join-key columns are now redundant.
df_customers = df_customers.drop(
    columns=['type_customer_id', 'gender_customer_id', 'customertype_id', 'customergender_id']
)
df_customers

Unnamed: 0,customers_id,customer_name,customer_type,customer_gender
0,1,Gregory King,Normal,Male
1,10,Joseph Becker,Normal,Male
2,12,Jack Hall,Normal,Male
3,13,Jimmy Greer,Normal,Male
4,19,Brian Davis,Normal,Male
...,...,...,...,...
960,950,Terri Cooke MD,Member,Female
961,952,Katie Goodman,Member,Female
962,954,Julie Heath,Member,Female
963,961,Leslie Powell,Member,Female


### Obteniendo Payment Type

In [23]:
# Load the `payment` lookup table from Postgres.
sql_query = 'SELECT * FROM payment;'
df_payment = pd.read_sql(con=postgres_driver, sql=sql_query)
df_payment.head()

Unnamed: 0,payment_id,payment_type
0,1,Ewallet
1,2,Cash
2,3,Credit card


### Obteniendo Product Line

In [24]:
# Load the `product_line` lookup table from Postgres.
sql_query = 'SELECT * FROM product_line;'
df_product_line = pd.read_sql(con=postgres_driver, sql=sql_query)
df_product_line.head()

Unnamed: 0,product_line_id,product_line_name
0,1,Electronic accessories
1,2,Fashion accessories
2,3,Health and beauty
3,4,Food and beverages
4,5,Home and lifestyle


### Obteniendo sales (tabla de hechos)

In [25]:
# Load the `sales` table (loaded later into fact_sales) from Postgres.
sql_query = 'SELECT * FROM sales;'
df_sales = pd.read_sql(con=postgres_driver, sql=sql_query)
df_sales.head()

Unnamed: 0,sale_id,sale_location_id,sale_payment_type_id,sale_product_line_id,sale_costumer_id,sale_date,sale_quantity,sale_unitprice,sale_taxes,sale_total,sale_gross_income
0,1,3,1,3,36,2023-02-27 17:09:06,3,9973.17,37.892,2387.196,238.7196
1,2,3,2,6,519,2023-04-02 17:52:48,4,4198.7,0.6825,57.33,5.733
2,3,2,2,4,591,2023-03-23 08:14:50,15,2434.63,35.147,11071.305,1107.1305
3,4,2,2,2,606,2023-01-04 05:06:32,0,8834.15,49.143,0.0,0.0
4,5,1,2,3,861,2023-03-06 00:22:57,19,299.86,46.7945,18671.0055,1867.10055


## 9. Instancia RDS MySQL y creación del Data Warehouse

In [26]:
# Create the MySQL RDS instance that hosts the data warehouse (a no-op if it
# already exists), then block until it is available and resolve its endpoint,
# so the following cells work on a fresh account under Restart & Run All.
try:
    response = aws_conn.create_db_instance(
        # RDS requires at least 20 GiB for MySQL on General Purpose (gp2/gp3)
        # storage, the default storage type; 10 GiB is rejected for new instances.
        AllocatedStorage=20,
        DBName=config.get('RDS_MYSQL', 'DB_NAME'),
        DBInstanceIdentifier=rdsMySQLIdentifier,
        DBInstanceClass="db.t3.micro",
        Engine="mysql",
        MasterUsername=config.get('RDS_MYSQL', 'DB_USER'),
        MasterUserPassword=config.get('RDS_MYSQL', 'DB_PASSWORD'),
        Port=config.getint('RDS_MYSQL', 'DB_PORT'),
        VpcSecurityGroupIds=[config.get('VPC', 'SECURITY_GROUP')],
        PubliclyAccessible=True,
    )
    print(response['DBInstance']['DBInstanceIdentifier'], response['DBInstance']['DBInstanceStatus'])
except aws_conn.exceptions.DBInstanceAlreadyExistsFault:
    print("La Instancia de Base de Datos ya Existe.")

# Polls describe_db_instances until the instance reports 'available'
# (returns almost immediately when it already is).
aws_conn.get_waiter('db_instance_available').wait(DBInstanceIdentifier=rdsMySQLIdentifier)
RDS_HOSTMySQL = aws_conn.describe_db_instances(
    DBInstanceIdentifier=rdsMySQLIdentifier
)['DBInstances'][0]['Endpoint']['Address']
print(RDS_HOSTMySQL)

La Instancia de Base de Datos ya Existe.


In [28]:
import DDL_DOS
import mysql.connector as mysqlC

# Create the data-warehouse schema by running the DDL script DDL_DOS.DDL_T
# against the MySQL instance.
myDw = None
try:
    myDw = mysqlC.connect(
        host=RDS_HOSTMySQL,
        # Use the configured port, consistent with create_db_instance and mysql_driver.
        port=config.getint('RDS_MYSQL', 'DB_PORT'),
        user=config.get('RDS_MYSQL', 'DB_USER'),
        password=config.get('RDS_MYSQL', 'DB_PASSWORD'),
        database=config.get('RDS_MYSQL', 'DB_NAME'),
    )
    mycursor = myDw.cursor()
    # With multi=True, execute() returns a lazy iterator yielding one result per
    # statement; the statements are only guaranteed to run as it is consumed, so
    # exhaust it explicitly (otherwise only part of the DDL may be applied).
    # NOTE(review): newer mysql-connector-python releases deprecate/remove `multi`;
    # confirm against the installed version.
    for _statement_result in mycursor.execute(DDL_DOS.DDL_T, multi=True):
        pass
    myDw.commit()
    print("Data Warehouse Creado Exitosamente")
except Exception as ex:
    print("ERROR: Error al crear la base de datos.")
    print(ex)
finally:
    # Release the server connection; the load cells connect through mysql_driver instead.
    if myDw is not None:
        myDw.close()

Data Warehouse Creado Exitosamente


## 10. Insertando en MySQL

In [29]:
# SQLAlchemy URL (PyMySQL driver) for the MySQL data warehouse. User and password
# are percent-encoded (safe='' also encodes '/') so that characters such as
# '@', ':' or '/' in the password cannot break the URL structure.
mysql_driver = (
    f"mysql+pymysql://{quote(config.get('RDS_MYSQL', 'DB_USER'), safe='')}"
    f":{quote(config.get('RDS_MYSQL', 'DB_PASSWORD'), safe='')}"
    f"@{RDS_HOSTMySQL}:{config.get('RDS_MYSQL', 'DB_PORT')}/{config.get('RDS_MYSQL', 'DB_NAME')}"
)

In [32]:
# Append the location dimension (re-running this cell appends the rows again).
df_location.to_sql(name='dim_location', con=mysql_driver, if_exists='append', index=False)

3

In [33]:
# Append the product-line dimension (re-running this cell appends the rows again).
df_product_line.to_sql(name='dim_product_line', con=mysql_driver, if_exists='append', index=False)

6

In [34]:
# Append the payment dimension (re-running this cell appends the rows again).
df_payment.to_sql(name='dim_payment', con=mysql_driver, if_exists='append', index=False)

3

In [35]:
# Append the customers dimension (re-running this cell appends the rows again).
df_customers.to_sql(name='dim_customers', con=mysql_driver, if_exists='append', index=False)

965

In [36]:
# Append the sales fact table after its dimensions (re-running appends the rows again).
df_sales.to_sql(name='fact_sales', con=mysql_driver, if_exists='append', index=False)

1106