In [None]:
import pandas as pd
import numpy as np
from faker import Faker
import random
import datetime
import boto3
import psycopg2
import configparser

### Creación del modelo de datos

In [7]:
rdsIdentifier = 'super-db' #nombre de la instancia

#### Cargamos archivo de configuraciones

In [8]:
config = configparser.ConfigParser()
config.read('escec2.cfg')



['escec2.cfg']

### Creamos Instancia de S3 y RDS

In [23]:
s3 = boto3.resource(
    service_name = 's3',
    region_name = 'us-east-1',
    aws_access_key_id = config.get('IAM', 'ACCESS_KEY'),
    aws_secret_access_key = config.get('IAM', 'SECRET_ACCESS_KEY')
)
aws_conn = boto3.client('rds', aws_access_key_id=config.get('IAM', 'ACCESS_KEY'),
                    aws_secret_access_key=config.get('IAM', 'SECRET_ACCESS_KEY'),
                    region_name='us-east-1')

#### Verificamos Instancias de RDS disponibles

In [12]:
rdsInstanceIds = []

response = aws_conn.describe_db_instances()
for resp in response['DBInstances']:
    rdsInstanceIds.append(resp['DBInstanceIdentifier'])
    db_instance_status = resp['DBInstanceStatus']

print(f"DBInstanceIds {rdsInstanceIds}")

DBInstanceIds ['dw-db', 'super-db']


#### Creación de Servicio RDS

In [13]:
try:
    response = aws_conn.create_db_instance(
            AllocatedStorage=10,
            DBName=config.get('RDS_MYSQL', 'DB_NAME'),
            DBInstanceIdentifier=rdsIdentifier,
            DBInstanceClass="db.t3.micro",
            Engine="mysql",
            MasterUsername=config.get('RDS_MYSQL', 'DB_USER'),
            MasterUserPassword=config.get('RDS_MYSQL', 'DB_PASSWORD'),
            BackupRetentionPeriod=0, #para evitar los backups y que nos cobren
            Port=int(config.get('RDS_MYSQL', 'DB_PORT')),
            VpcSecurityGroupIds=[config.get('VPC', 'SECURITY_GROUP')],
            PubliclyAccessible=True
        )
    print(response)
except aws_conn.exceptions.DBInstanceAlreadyExistsFault as ex:
    print("La Instancia de Base de Datos ya Existe.")

La Instancia de Base de Datos ya Existe.


##### Obtenemos URL del Host

In [14]:
try:
     instances = aws_conn.describe_db_instances(DBInstanceIdentifier=rdsIdentifier)
     RDS_HOST = instances.get('DBInstances')[0].get('Endpoint').get('Address')
     print(RDS_HOST)
except Exception as ex:
     print("La instancia de base de datos no existe o aun no se ha terminado de crear.")
     print(ex)

super-db.cio9bwv4hzyt.us-east-1.rds.amazonaws.com


#### Conexión a Base de Datos desde Python

In [15]:
import querie_super

In [16]:
import querie_super
import mysql.connector as mysqlC
try:
    myDw = mysqlC.connect(
    host=RDS_HOST, 
    user=config.get('RDS_MYSQL', 'DB_USER'),
    password=config.get('RDS_MYSQL', 'DB_PASSWORD'),
    database=config.get('RDS_MYSQL', 'DB_NAME')
    )

    mycursor = myDw.cursor()
    mycursor.execute(querie_super.DDL_QUERY_SUPER, multi=True)
    myDw.commit()
    print("Data Warehouse Creado Exitosamente")
except Exception as ex:
    print("ERROR: Error al crear la base de datos.")
    print(ex)

#Aunque nos de un error, verificar en DBeaver si se creó el query y las tablas

ERROR: Error al crear la base de datos.
Commands out of sync; you can't run this command now


#### Driver MYSQL


In [78]:
mysql_driver = f"""mysql+pymysql://{config.get('RDS_MYSQL', 'DB_USER')}:{config.get('RDS_MYSQL', 'DB_PASSWORD')}@{RDS_HOST}:{config.get('RDS_MYSQL', 'DB_PORT')}/{config.get('RDS_MYSQL', 'DB_NAME')}"""  

In [18]:
sql_query = 'SELECT * FROM Ubicacion;'
df_ubicacion = pd.read_sql(sql_query, mysql_driver)
df_ubicacion.head()

Unnamed: 0,ID_Ubicacion,Codigo_postal,Pais,Estado,Ciudad


#### Creamos el bucket y cargamos los datos

In [25]:
# Nombre del bucket
bucket_name = 'datos-super-edgar'

In [7]:
# Creación del bucket
s3.create_bucket(Bucket=bucket_name)

# documentos
file_name = 'dim_date.csv'
file_name2 = 'SuperStoreOutput.csv'

# Nombre del archivo csv en el bucket
s3_file_name = 'dim_date_super.csv'
s3_file_name2 = 'SuperStoreOutput.csv'

# Metiendo los archivos al bucket
s3.upload_file(file_name, bucket_name, s3_file_name)
s3.upload_file(file_name2, bucket_name, s3_file_name2)

#Acabo de cargar los datos de fechas y la base de datos principal



In [24]:
for bucket in s3.buckets.all():
    S3_BUCKET_NAME = bucket.name
    print(bucket.name)

bucket-v-23000966
datos-super-edgar


In [26]:
#extraemos todo lo que está en el bucket
remoteFileList = []
for objt in s3.Bucket(bucket_name).objects.all():
    remoteFileList.append(objt.key)

remoteFileList

['SuperStoreOutput.csv', 'dim_date_super.csv']

#### Leemos archivo del bucket de S3

In [55]:
# Cargamos el data frame de supermercado
file1 = s3.Bucket(bucket_name).Object('SuperStoreOutput.csv').get()
tabla_super= pd.read_csv(file1['Body'])
tabla_super.head() 

Unnamed: 0,Row ID,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Customer Name,Segment,Country,City,...,Postal Code,Region,Product ID,Category,Sub-Category,Product Name,Sales,Quantity,Discount,Profit
0,1160,CA-2018-147039,6/29/2018 12:00:00 AM,7/4/2018 12:00:00 AM,Standard Class,AA-10315,Alex Avila,Consumer,United States,Minneapolis,...,55407.0,Central,OFF-AP-10000576,Office Supplies,Appliances,"Belkin 325VA UPS Surge Protector, 6'",362.94,3,0.0,90.74
1,1161,CA-2018-147039,6/29/2018 12:00:00 AM,7/4/2018 12:00:00 AM,Standard Class,AA-10315,Alex Avila,Consumer,United States,Minneapolis,...,55407.0,Central,OFF-BI-10004654,Office Supplies,Binders,Avery Binding System Hidden Tab Executive Styl...,11.54,2,0.0,5.77
2,1300,CA-2016-121391,10/4/2016 12:00:00 AM,10/7/2016 12:00:00 AM,First Class,AA-10315,Alex Avila,Consumer,United States,San Francisco,...,94109.0,West,OFF-ST-10001590,Office Supplies,Storage,Tenex Personal Project File with Scoop Front D...,26.96,2,0.0,7.01
3,2230,CA-2015-128055,3/31/2015 12:00:00 AM,4/5/2015 12:00:00 AM,Standard Class,AA-10315,Alex Avila,Consumer,United States,San Francisco,...,94122.0,West,OFF-BI-10004390,Office Supplies,Binders,GBC DocuBind 200 Manual Binding Machine,673.57,2,0.2,252.59
4,2231,CA-2015-128055,3/31/2015 12:00:00 AM,4/5/2015 12:00:00 AM,Standard Class,AA-10315,Alex Avila,Consumer,United States,San Francisco,...,94122.0,West,OFF-AP-10002765,Office Supplies,Appliances,Fellowes Advanced Computer Series Surge Protec...,52.98,2,0.0,14.83


In [73]:
# Cargamos el data frame de fechas
file2 = s3.Bucket(bucket_name).Object('dim_date_super.csv').get()
fechas= pd.read_csv(file2['Body'])
fechas.head()

Unnamed: 0,date_key,full_date,day_of_week,day_num_in_month,day_num_overall,day_name,day_abbrev,weekday_flag,week_num_in_year,week_num_overall,...,month_name,month_abbrev,quarter,year,yearmo,fiscal_month,fiscal_quarter,fiscal_year,last_day_in_month_flag,same_day_year_ago_date
0,20150101,1/1/2015,4,1,1,Thursday,Thu,Weekday,1,1,...,January,Jan,1,2015,201501,7,3,2015,Not Month End,1/1/2014
1,20150102,1/2/2015,5,2,2,Friday,Fri,Weekday,1,1,...,January,Jan,1,2015,201501,7,3,2015,Not Month End,1/2/2014
2,20150103,1/3/2015,6,3,3,Saturday,Sat,Weekend,1,1,...,January,Jan,1,2015,201501,7,3,2015,Not Month End,1/3/2014
3,20150104,1/4/2015,7,4,4,Sunday,Sun,Weekend,1,1,...,January,Jan,1,2015,201501,7,3,2015,Not Month End,1/4/2014
4,20150105,1/5/2015,1,5,5,Monday,Mon,Weekday,2,2,...,January,Jan,1,2015,201501,7,3,2015,Not Month End,1/5/2014


### Limpiamos las tablas para ser ingresadas a las dimensiones y hechos (Procesamiento)

#### Creamos tabla para cliente e insertamos en la dimensión

In [82]:
tabla_cl = tabla_super.loc[:, ['Customer ID','Customer Name', 'Segment']]

nombres_cl = {'Customer ID': 'Id_cliente', 'Customer Name': 'Nombre', 'Segment': 'Segmento'}
tabla_cl.rename(columns = nombres_cl, inplace = True)

tabla_clientes= tabla_cl.drop_duplicates()
tabla_clientes.head()

Unnamed: 0,Id_cliente,Nombre,Segmento
0,AA-10315,Alex Avila,Consumer
11,AA-10375,Allen Armold,Consumer
26,AA-10480,Andrew Allen,Consumer
38,AA-10645,Anna Andreadi,Consumer
56,AB-10015,Aaron Bergman,Consumer


In [83]:
#insertamos en dimension de cliente
tabla_clientes.to_sql('Cliente', mysql_driver, index=False, if_exists='append')

793

#### Creamos tabla para producto

In [70]:
tabla_pr = tabla_super.loc[:, ['Product ID','Category', 'Sub-Category', 'Product Name']]
tabla_producto= tabla_pr.drop_duplicates()

tabla_producto.head()


Unnamed: 0,Product ID,Category,Sub-Category,Product Name
0,OFF-AP-10000576,Office Supplies,Appliances,"Belkin 325VA UPS Surge Protector, 6'"
1,OFF-BI-10004654,Office Supplies,Binders,Avery Binding System Hidden Tab Executive Styl...
2,OFF-ST-10001590,Office Supplies,Storage,Tenex Personal Project File with Scoop Front D...
3,OFF-BI-10004390,Office Supplies,Binders,GBC DocuBind 200 Manual Binding Machine
4,OFF-AP-10002765,Office Supplies,Appliances,Fellowes Advanced Computer Series Surge Protec...


#### Creamos tabla para Ubicación

In [72]:
tabla_ub = tabla_super.loc[:, ['Postal Code','Country', 'State', 'City']]
tabla_ubicacion= tabla_ub.drop_duplicates()

tabla_ubicacion.head()

Unnamed: 0,Postal Code,Country,State,City
0,55407.0,United States,Minnesota,Minneapolis
2,94109.0,United States,California,San Francisco
3,94122.0,United States,California,San Francisco
5,78664.0,United States,Texas,Round Rock
9,10011.0,United States,New York,New York City


#### Creamos tabla para fechas de orden

In [75]:
tabla_fechas = fechas.loc[:, ['date_key', 'full_date','day_of_week', 'day_num_in_month', 'day_name', 'weekday_flag','month_name','month_abbrev','year']]

tabla_fechas.head()

Unnamed: 0,date_key,full_date,day_of_week,day_num_in_month,day_name,weekday_flag,month_name,month_abbrev,year
0,20150101,1/1/2015,4,1,Thursday,Weekday,January,Jan,2015
1,20150102,1/2/2015,5,2,Friday,Weekday,January,Jan,2015
2,20150103,1/3/2015,6,3,Saturday,Weekend,January,Jan,2015
3,20150104,1/4/2015,7,4,Sunday,Weekend,January,Jan,2015
4,20150105,1/5/2015,1,5,Monday,Weekday,January,Jan,2015
