### PROYECTO FINAL
#### 22000349 – Mario Eduardo Tabarini Andretta
#### mario.tabarini@galileo.edu
#### 09170350 – David Francisco Tejeda Cabrera
#### 09170350@galileo.edu 
#### Ciencia de datos Python
#### Maestría en Business Intelligence y Análisis de Datos
#### Universidad Galileo


#### Scope: el proyecto que se presenta a continuación tiene como objetivo la extracción de la información de 5 archivos .CSV correspondientes a los registros de una tienda de consumo masivo, para estructuralos en una base de datos transaccional y posteriormente transformarlos y cargarlos hacia una base de datos dimensional mediante la cual se facilite el analisis estrategico de los datos.   

#### Exploración: El proyecto consta de 5 tablas las cuales corresponden a productos, proveedores, ordenes, clientes y una tabla de hechos, las cuales se encuentran en cascada. Los 5 archivos se procedieron a subir a una base de datos de S3 en una instancia RDS de Amazon Web Services para su posterior transformación y estructruación en SQL, asi como analisis en Python.

#### Modelo de datos: Se creó un datawarehouse que permitiera el análisis óptimo de la información recopilada. Para ello, se unificaron varias tablas y se creo una tabla de hechos que agilizara las consultas; gracias a esto se logra reducir de 5 tablas relacionales a 3 dimensionales. 

#### Procesamiento: A continuación, se presenta el procedimiento llevado a cabo para la extraccion, transformación y carga de información conforme a requerimientos del proyecto.

In [1]:
import pandas as pd
import numpy as np
import boto3
import psycopg2
import configparser

#### Se carga archivo de escenario para instancia de base de datos RDS.

In [2]:
config = configparser.ConfigParser()
config.read('escecPF.cfg')

['escecPF.cfg']

In [3]:
RDS_HOST = 'proyectofinal.c9wvrkwvngbi.us-east-1.rds.amazonaws.com'
#RDS_HOST=config.get('DW', 'RDS_HOST')
#RDS_HOST
print(RDS_HOST)

proyectofinal.c9wvrkwvngbi.us-east-1.rds.amazonaws.com


In [4]:
postgres_driver = f"""postgresql://{config.get('RDS', 'DB_USER')}:{config.get('RDS', 'DB_PASSWORD')}@{RDS_HOST}:{config.get('RDS', 'DB_PORT')}/{config.get('RDS', 'DB_NAME')}"""  

##### Lectura de Archivo desde S3

In [5]:
s3 = boto3.resource(
    service_name = 's3',
    region_name = 'us-east-1',
    aws_access_key_id = config.get('IAM', 'ACCESS_KEY'),
    aws_secret_access_key = config.get('IAM', 'SECRET_ACCESS_KEY')
)

In [6]:
for bucket in s3.buckets.all():
    S3_BUCKET_NAME = bucket.name
    print(bucket.name)

profdfmt


In [7]:
#extraemos todo lo que está en el bucket
remoteFileList = []
for objt in s3.Bucket(S3_BUCKET_NAME).objects.all():
    remoteFileList.append(objt.key)

remoteFileList

['Customers.csv',
 'OrderItems.csv',
 'Orders.csv',
 'Products.csv',
 'Suppliers.csv']

##### leemos data de S3

In [8]:
import io


#for remoteFile in remoteFileList:
try:
        file = s3.Bucket(S3_BUCKET_NAME).Object('Customers.csv').get()
        data = file['Body'].read()
        #print(remoteFile)
        df_customersS3 = pd.read_csv(io.BytesIO(data), sep=';', encoding='utf-8')
except Exception as ex:
        print("No es un archivo.")
        print(ex)

df_customersS3.head()




Unnamed: 0,customer_id,first_name,last_name,city,country,phone
0,1,Maria,Anders,Berlin,Germany,300074321
1,2,Ana,Trujillo,México D.F.,Mexico,5 5554729
2,3,Antonio,Moreno,México D.F.,Mexico,5 5553932
3,4,Thomas,Hardy,London,UK,171 5557788
4,5,Christina,Berglund,Luleå,Sweden,092112 34 65


In [9]:


try:
        file = s3.Bucket(S3_BUCKET_NAME).Object('OrderItems.csv').get()
        data = file['Body'].read()
        #print(remoteFile)
        df_orderitemsS3 = pd.read_csv(io.BytesIO(data), sep=';', encoding='utf-8')
except Exception as ex:
        print("No es un archivo.")
        print(ex)

df_orderitemsS3.head()



Unnamed: 0,order_item_id,order_id,product_id,unit_price,quantity
0,1,1,11,14.0,12
1,2,1,42,9.8,10
2,3,1,72,34.8,5
3,4,2,14,18.6,9
4,5,2,51,42.4,40


In [10]:

try:
        file = s3.Bucket(S3_BUCKET_NAME).Object('Orders.csv').get()
        data = file['Body'].read()
        #print(remoteFile)
        df_ordersS3 = pd.read_csv(io.BytesIO(data), sep=';', encoding='utf-8')
except Exception as ex:
        print("No es un archivo.")
        print(ex)

df_ordersS3.head()



Unnamed: 0,order_id,order_date,customer_id,total_amount
0,1,Jul 4 2012 12:00:00:000AM,85,440.0
1,2,Jul 5 2012 12:00:00:000AM,79,1863.4
2,3,Jul 8 2012 12:00:00:000AM,34,1813.0
3,4,Jul 8 2012 12:00:00:000AM,84,670.8
4,5,Jul 9 2012 12:00:00:000AM,76,3730.0


In [11]:




try:
        file = s3.Bucket(S3_BUCKET_NAME).Object('Suppliers.csv').get()
        data = file['Body'].read()
        #print(remoteFile)
        df_suppliersS3 = pd.read_csv(io.BytesIO(data), sep=';', encoding='utf-8')
except Exception as ex:
        print("No es un archivo.")
        print(ex)

df_suppliersS3.head()




Unnamed: 0,supplier_id,company_name,contact_name,city,country,phone,fax
0,1,Exotic Liquids,Charlotte Cooper,London,UK,171 5552222,
1,2,New Orleans Cajun Delights,Shelley Burke,New Orleans,USA,100 5554822,
2,3,Grandma Kellys Homestead,Regina Murphy,Ann Arbor,USA,313 5555735,313 5553349
3,4,Tokyo Traders,Yoshi Nagase,Tokyo,Japan,03 35555011,
4,5,Cooperativa de Quesos Las Cabras,Antonio del Valle Saavedra,Oviedo,Spain,98 598 76 54,


In [12]:

try:
        file = s3.Bucket(S3_BUCKET_NAME).Object('Products.csv').get()
        data = file['Body'].read()
        #print(remoteFile)
        df_productsS3 = pd.read_csv(io.BytesIO(data), sep=';', encoding='latin-1')
except Exception as ex:
        print("No es un archivo.")
        print(ex)

df_productsS3.head()

Unnamed: 0,product_id,product_name,supplier_id,unit_price,package,is_discontinued
0,1,Chai,1,18.0,10 boxes x 20 bags,0
1,2,Chang,1,19.0,24 - 12 oz bottles,0
2,3,Aniseed Syrup,1,10.0,12 - 550 ml bottles,0
3,4,Chef Antons Cajun Seasoning,2,22.0,48 - 6 oz jars,0
4,5,Chef Antons Gumbo Mix,2,21.35,36 boxes,1


#### Se carga base de datos relacional con información obtenida de archivos de S3 en AWS

In [13]:
df_customersS3.to_sql('customers', postgres_driver, index=False, if_exists='append', method='multi')
df_suppliersS3.to_sql('suppliers', postgres_driver, index=False, if_exists='append', method='multi')
df_productsS3.to_sql('products', postgres_driver, index=False, if_exists='append', method='multi')
df_ordersS3.to_sql('orders', postgres_driver, index=False, if_exists='append', method='multi')
df_orderitemsS3.to_sql('orderitems', postgres_driver, index=False, if_exists='append', method='multi')

##### Creamos tabla de Suppliers

In [14]:
sql_query = 'SELECT * FROM Suppliers;'
df_suppliers = pd.read_sql(sql_query, postgres_driver)
df_suppliers.head()

Unnamed: 0,supplier_id,company_name,contact_name,city,country,phone,fax
0,1,Exotic Liquids,Charlotte Cooper,London,UK,171 5552222,
1,2,New Orleans Cajun Delights,Shelley Burke,New Orleans,USA,100 5554822,
2,3,Grandma Kellys Homestead,Regina Murphy,Ann Arbor,USA,313 5555735,313 5553349
3,4,Tokyo Traders,Yoshi Nagase,Tokyo,Japan,03 35555011,
4,5,Cooperativa de Quesos Las Cabras,Antonio del Valle Saavedra,Oviedo,Spain,98 598 76 54,


##### Creamos tabla de Productos

In [15]:
sql_query = 'SELECT * FROM Products;'
df_products = pd.read_sql(sql_query, postgres_driver)
df_products.head()

Unnamed: 0,product_id,product_name,supplier_id,unit_price,package,is_discontinued
0,1,Chai,1,18.0,10 boxes x 20 bags,0
1,2,Chang,1,19.0,24 - 12 oz bottles,0
2,3,Aniseed Syrup,1,10.0,12 - 550 ml bottles,0
3,4,Chef Antons Cajun Seasoning,2,22.0,48 - 6 oz jars,0
4,5,Chef Antons Gumbo Mix,2,21.35,36 boxes,1


##### Unificamos tablas de proveedores y productos. 

In [16]:
df_suppliers_products = df_suppliers.merge(df_products, on='supplier_id', how='inner')
df_suppliers_products.head()

Unnamed: 0,supplier_id,company_name,contact_name,city,country,phone,fax,product_id,product_name,unit_price,package,is_discontinued
0,1,Exotic Liquids,Charlotte Cooper,London,UK,171 5552222,,1,Chai,18.0,10 boxes x 20 bags,0
1,1,Exotic Liquids,Charlotte Cooper,London,UK,171 5552222,,2,Chang,19.0,24 - 12 oz bottles,0
2,1,Exotic Liquids,Charlotte Cooper,London,UK,171 5552222,,3,Aniseed Syrup,10.0,12 - 550 ml bottles,0
3,2,New Orleans Cajun Delights,Shelley Burke,New Orleans,USA,100 5554822,,4,Chef Antons Cajun Seasoning,22.0,48 - 6 oz jars,0
4,2,New Orleans Cajun Delights,Shelley Burke,New Orleans,USA,100 5554822,,5,Chef Antons Gumbo Mix,21.35,36 boxes,1


##### Creamos tabla de Clientes.

In [17]:
sql_query = 'SELECT * FROM Customers;'
df_customers = pd.read_sql(sql_query, postgres_driver)
df_customers.head()

Unnamed: 0,customer_id,first_name,last_name,city,country,phone
0,1,Maria,Anders,Berlin,Germany,300074321
1,2,Ana,Trujillo,México D.F.,Mexico,5 5554729
2,3,Antonio,Moreno,México D.F.,Mexico,5 5553932
3,4,Thomas,Hardy,London,UK,171 5557788
4,5,Christina,Berglund,Luleå,Sweden,092112 34 65


##### Creamos tabla de Ordenes

In [18]:
sql_query = 'SELECT * FROM Orders;'
df_orders = pd.read_sql(sql_query, postgres_driver)
df_orders.head()

Unnamed: 0,order_id,order_date,customer_id,total_amount
0,1,Jul 4 2012 12:00:00:000AM,85,440.0
1,2,Jul 5 2012 12:00:00:000AM,79,1863.4
2,3,Jul 8 2012 12:00:00:000AM,34,1813.0
3,4,Jul 8 2012 12:00:00:000AM,84,670.8
4,5,Jul 9 2012 12:00:00:000AM,76,3730.0


##### Unificamos tablas de Ordenes y Clientes

In [19]:
df_customers_orders = df_customers.merge(df_orders, how='inner', on='customer_id')
df_customers_orders.head()

Unnamed: 0,customer_id,first_name,last_name,city,country,phone,order_id,order_date,total_amount
0,1,Maria,Anders,Berlin,Germany,300074321,396,Aug 25 2013 12:00:00:000AM,1086.0
1,1,Maria,Anders,Berlin,Germany,300074321,445,Oct 3 2013 12:00:00:000AM,878.0
2,1,Maria,Anders,Berlin,Germany,300074321,455,Oct 13 2013 12:00:00:000AM,330.0
3,1,Maria,Anders,Berlin,Germany,300074321,588,Jan 15 2014 12:00:00:000AM,851.0
4,1,Maria,Anders,Berlin,Germany,300074321,705,Mar 16 2014 12:00:00:000AM,491.2


### Tabla de Hechos

In [40]:
sql_query = '''SELECT order_item_id, order_id, product_id,
                              unit_price, quantity FROM OrderItems;'''
df_factTable = pd.read_sql(sql_query, postgres_driver)
df_factTable.head()

Unnamed: 0,order_item_id,order_id,product_id,unit_price,quantity
0,1,1,11,14.0,12
1,2,1,42,9.8,10
2,3,1,72,34.8,5
3,4,2,14,18.6,9
4,5,2,51,42.4,40


##### Creación de Instancias en AWS para DW

In [21]:
config = configparser.ConfigParser()
config.read('escecPF.cfg')

['escecPF.cfg']

In [22]:
aws_conn = boto3.client('rds', aws_access_key_id=config.get('IAM', 'ACCESS_KEY'),
                    aws_secret_access_key=config.get('IAM', 'SECRET_ACCESS_KEY'),
                    region_name='us-east-1')
print(aws_conn)

<botocore.client.RDS object at 0x00000269EBCCE8B0>


In [23]:
rdsInstanceIds = []

response = aws_conn.describe_db_instances()
for resp in response['DBInstances']:
    rdsInstanceIds.append(resp['DBInstanceIdentifier'])
    db_instance_status = resp['DBInstanceStatus']

print(f"DBInstanceIds {rdsInstanceIds}")

DBInstanceIds ['dw-db', 'proyectofinal']


In [24]:
rdsIdentifier = 'dw-db'

In [25]:
try:
    response = aws_conn.create_db_instance(
            AllocatedStorage=10,
            DBName=config.get('RDS_MYSQL', 'DB_NAME'),
            DBInstanceIdentifier=rdsIdentifier,
            DBInstanceClass="db.t3.micro",
            Engine="mysql",
            MasterUsername=config.get('RDS_MYSQL', 'DB_USER'),
            MasterUserPassword=config.get('RDS_MYSQL', 'DB_PASSWORD'),
            Port=int(config.get('RDS_MYSQL', 'DB_PORT')),
            VpcSecurityGroupIds=[config.get('VPC', 'SECURITY_GROUP')],
            PubliclyAccessible=True
        )
    print(response)
except aws_conn.exceptions.DBInstanceAlreadyExistsFault as ex:
    print("La Instancia de Base de Datos ya Existe.")

La Instancia de Base de Datos ya Existe.


In [26]:
try:
     instances = aws_conn.describe_db_instances(DBInstanceIdentifier=rdsIdentifier)
     RDS_DW_HOST = instances.get('DBInstances')[0].get('Endpoint').get('Address')
     print(RDS_DW_HOST)
except Exception as ex:
     print("La instancia de base de datos no existe o aun no se ha terminado de crear.")
     print(ex)

dw-db.c9wvrkwvngbi.us-east-1.rds.amazonaws.com


##### Se crea base de datos dimensional mediante archivo create_dw_queryPF

In [69]:


import mysql.connector as mysqlC
import create_dw_queryPF
try:
    myDw = mysqlC.connect(
    host=RDS_DW_HOST, 
    user=config.get('RDS_MYSQL', 'DB_USER'),
    password=config.get('RDS_MYSQL', 'DB_PASSWORD'),
    database=config.get('RDS_MYSQL', 'DB_NAME')
    )
    mycursor = myDw.cursor()
    for i in mycursor.execute(create_dw_queryPF.CREATE_DW, multi= True):
        print(i.statement)
    mycursor.close()
    #mycursor.execute(create_dw_queryPF.CREATE_DW, multi=True)
    myDw.commit()
    print("Data Warehouse Creado Exitosamente")
except Exception as ex:
    print("ERROR: Error al crear la base de datos.")
    print(ex)

truncate table dimorders
truncate table dimproducts
truncate table factorderitems
create table if not exists dimorders(
    order_id int primary key,
    order_date date,
    customer_id int,
    total_amount double,
    first_name varchar(100),
    last_name varchar(100),
    city varchar(100),
    country varchar(100),
    phone varchar(50)
)
create table if not exists dimproducts(
    product_id int primary key,
    product_name varchar(35),
    supplier_id int,
    unit_price double,
    package varchar(100),
    is_discontinued INT,
    company_name varchar(50),
    contact_name varchar (50),
    city varchar(50),
    country varchar(50),
    phone varchar(50),
    fax varchar(50)
)
create table if not exists factorderitems(
    order_item_id INT,
    order_id int,
    product_id int,
    unit_price double,
    quantity int,

    constraint fk_order_id
        foreign key (order_id)
            references dimorders(order_id),
    
    constraint fk_product_id
        foreign key (

##### Insertamos Data

In [70]:
mysql_driver = f"""mysql+pymysql://{config.get('RDS_MYSQL', 'DB_USER')}:{config.get('RDS_MYSQL', 'DB_PASSWORD')}@{RDS_DW_HOST}:{config.get('RDS_MYSQL', 'DB_PORT')}/{config.get('RDS_MYSQL', 'DB_NAME')}"""  

In [71]:
#insertamos ordenes.
df_customers_orders.to_sql('dimorders', mysql_driver, index=False, if_exists='append')

In [72]:
sql_query = 'SELECT * FROM dimorders;'
df_dimorders = pd.read_sql(sql_query, mysql_driver)
df_dimorders.head()

Unnamed: 0,order_id,order_date,customer_id,total_amount,first_name,last_name,city,country,phone
0,1,0000-00-00,85,440.0,Paul,Henriot,Reims,France,26471510
1,2,0000-00-00,79,1863.4,Karin,Josephs,Münster,Germany,251031259
2,3,0000-00-00,34,1813.0,Mario,Pontes,Rio de Janeiro,Brazil,21
3,4,0000-00-00,84,670.8,Mary,Saveley,Lyon,France,78325486
4,5,0000-00-00,76,3730.0,Pascale,Cartrain,Charleroi,Belgium,71


In [73]:
#insertamos productos
df_suppliers_products.to_sql('dimproducts', mysql_driver, index=False, if_exists='append')

In [74]:
sql_query = 'SELECT * FROM dimproducts;'
df_dimproducts = pd.read_sql(sql_query, mysql_driver)
df_dimproducts.head()


Unnamed: 0,product_id,product_name,supplier_id,unit_price,package,is_discontinued,company_name,contact_name,city,country,phone,fax
0,1,Chai,1,18.0,10 boxes x 20 bags,0,Exotic Liquids,Charlotte Cooper,London,UK,171,
1,2,Chang,1,19.0,24 - 12 oz bottles,0,Exotic Liquids,Charlotte Cooper,London,UK,171,
2,3,Aniseed Syrup,1,10.0,12 - 550 ml bottles,0,Exotic Liquids,Charlotte Cooper,London,UK,171,
3,4,Chef Antons Cajun Seasoning,2,22.0,48 - 6 oz jars,0,New Orleans Cajun Delights,Shelley Burke,New Orleans,USA,100,
4,5,Chef Antons Gumbo Mix,2,21.35,36 boxes,1,New Orleans Cajun Delights,Shelley Burke,New Orleans,USA,100,


In [75]:
df_factTable.to_sql('factorderitems', mysql_driver, index=False, if_exists='append', method='multi')

In [76]:
sql_query = 'SELECT * FROM factorderitems;'
df_factoritems = pd.read_sql(sql_query, mysql_driver)
df_factoritems.head()

Unnamed: 0,order_item_id,order_id,product_id,unit_price,quantity
0,1,1,11,14.0,12
1,2,1,42,9.8,10
2,3,1,72,34.8,5
3,4,2,14,18.6,9
4,5,2,51,42.4,40


## Analítica
### 1. ¿Cuál es el producto mayor vendido?


In [92]:
sql_query = 'SELECT product_id, count(*) from factorderitems group by product_id order by count(*) desc ;'
df_ProductoMasVendido = pd.read_sql(sql_query, mysql_driver)
df_ProductoMasVendido.head()

Unnamed: 0,product_id,count(*)
0,59,54
1,24,51
2,31,51
3,60,51
4,56,50


### 2. ¿Cuál es la media de los precios de los productos?

In [95]:
import numpy as np
df_dimproducts.agg(
    Media_PrecioProductos= ('unit_price', np.mean),
)

Unnamed: 0,unit_price
Media_PrecioProductos,28.621282


### 3. ¿Cuál fue el proveedor de mayor variedad de productos en gondola?

In [101]:
sql_query = 'SELECT company_name, count(*) from dimproducts group by company_name order by count(*) desc ;'
df_ProductoMasVendido = pd.read_sql(sql_query, mysql_driver)
df_ProductoMasVendido.head()

Unnamed: 0,company_name,count(*)
0,Pavlova,5
1,Plutzer Lebensmittelgroßmärkte AG,5
2,New Orleans Cajun Delights,4
3,Specialty Biscuits,4
4,Norske Meierier,3


### 4. ¿Cuál fue la facturacion con el valor mas alto?

In [104]:
nuevodffactor=df_factoritems
nuevodffactor['Facturacion'] = ((nuevodffactor['quantity'])*(nuevodffactor['unit_price']))

nuevodffactor.iloc[nuevodffactor['Facturacion'].idxmax()]

order_item_id     1621.0
order_id           618.0
product_id          38.0
unit_price         263.5
quantity            60.0
Facturacion      15810.0
Name: 1620, dtype: float64

La factura con el correlativo 1621 fue la que presento una mayor facturacion siendo esta de $ 15,810.00 

### 5. ¿Cuál fue el pais con menor facturación?

In [105]:
sql_query = 'SELECT sum(total_amount), country from dimorders group by country order by sum(total_amount) ;'
df_Pais_menor_facturacion = pd.read_sql(sql_query, mysql_driver)
df_Pais_menor_facturacion.head()

Unnamed: 0,sum(total_amount),country
0,3531.95,Poland
1,5735.15,Norway
2,8119.1,Argentina
3,12468.65,Portugal
4,16705.15,Italy
