# Integrantes:

### Esvin Molina Guevara.  Carnet: 11003135
### Belter Molina Guevara. Carnet: 21002925


## Alcance del Proyecto

El proyecto consiste en la realización un pipeline de ingeniería de datos utilizando Python, SQL, y AWS.   El data set principal consiste en una base de datos relacional con información sobre la cobertura de vacunación contra Covid19 que el Seguro Social de Guatemala ha llevado a cabo durante la pandemia desde el año 2020 al primer trimestre del año 2022.   Además, contamos con dos archivos CSV con información de los municipios y de los departamentos de Guatemala.  El flujo general del pipeline de la ingeniería de datos a implementar será el siguiente: 
1.	Montar la base de datos relacional en AWS RDS
2.	Cargar los archivos CSV en AWS S3
3.	Crear acceso a los recursos de AWS con IAM
4.	Explorar (utilizando Python con Pandas) los datos subidos a AWS, tanto de la base de datos relacional como los archivos CSV
5.	Crear/Armar (utilizando Python con Pandas) un modelo dimensional con los datos relacionales y los archivos CSV
6.	Crear un *Data Warehouse* en RedShift de AWS 
7.	Utilizar el *Data Warehouse* hacer analítica sobre cinco preguntas del negocio 

## Montando la base de datos relacional en AWS RDS

### Creación del servicio AWS RDS

<img src="./images/001.jpg">
<img src="./images/002.jpg">
<img src="./images/003.jpg">
<img src="./images/004.jpg">
<img src="./images/005.jpg">
<img src="./images/006.jpg">
<img src="./images/007.jpg">
<img src="./images/008.jpg">

Habilitamos una regla de entrada para poder conectarnos hacia la base de datos, desde el exterior:

<img src="./images/009.jpg">

### Creación de la base de datos relacional en AWS RDS

Para la creación de la base de datos relacional utilizamos MySQL Workbench para conectarnos hacia nuestro servidor MySQL (en AWS RDS):

<img src="./images/010.jpg">

También desde MySQL ejecutamos los scripts para crear y poblar la base de datos relacional.

El script para crear el esquema y las tablas se encuentra en el archivo *01-Script_Crear_BD_Relacional.sql*, del cual se muestra un fragmento a continuación:

<img src="./images/011.jpg">

El script para poblar las tablas de los catálogos se encuentra en el archivo *02-Script_Poblar_Catalogos_BD_Relacional.sql*, del cual se muestra un fragmento a continuación:

<img src="./images/012.jpg">

Los scripts para poblar la tabla principal de transacciones, la tabla vacunación, se encuentran en los archivos *03-Script_Poblar_Transacciones1_BD_Relacional.sql* y *04-Script_Poblar_Transacciones2_BD_Relacional.sql*, del cual se muestra un fragmento a continuación:

<img src="./images/013.jpg">

El modelo entidad relación de la base de datos transaccional se muestra a continuación:

<img src="./images/014.jpg">

## Cargar los archivos CSV en AWS S3

### Creación del servicio AWS S3

Se crea un *bucket* que contendrá los archivos *csv* de Departamentos y Municipios:

<img src="./images/015.jpg">
<img src="./images/016.jpg">

No utilizaremos las listas de control de acceso para el *bucket*, el acceso lo configuraremos después con el servicio *IAM* de AWS.

<img src="./images/017.jpg">
<img src="./images/018.jpg">
<img src="./images/019.jpg">


### Carga de los archivos *Departamentos.csv* y *Municipios.csv* en AWS S3

Para subir los archivos, lo realizamos dentro del mismo portal de AWS en S3, con la opción de Cargar objetos:

<img src="./images/020.jpg">

Subimos los archivos.   A continuación, se muestran las imágenes del proceso:

<img src="./images/021.jpg">
<img src="./images/022.jpg">


## Creación del servicio IAM para establecer niveles de acceso a recursos en AWS

Esto lo utilizaremos tanto acceder a los archivos en los *bucket* de S3.

### Creamos usuario en IAM, con Full Access para el servicio S3:

<img src="./images/023.jpg">
<img src="./images/024.jpg">
<img src="./images/025.jpg">
<img src="./images/026.jpg">
<img src="./images/027.jpg">

## Exploración de Datos

### Cargamos la librería *load_ext* para interactuar con SQL desde Jupyterlab

In [1]:
%load_ext sql

### Cargamos los parámetros de conexión hacia la base de datos en RDS

In [2]:
import os
import configparser

In [3]:
config = configparser.ConfigParser()
config.read('./config/aws.cfg')

['./config/aws.cfg']

In [4]:
DB_ENDPOINT = config.get('RDS', 'DB_ENDPOINT')
DB = config.get('RDS', 'DB')
DB_USER = config.get('RDS', 'DB_USER')
DB_PASSWORD = config.get('RDS', 'DB_PASSWORD')
DB_PORT = config.get('RDS', 'DB_PORT')

### Realizamos la conexión hacia RDS

In [5]:
mysql_conn = 'mysql+pymysql://{}:{}@{}/{}'.format(DB_USER, DB_PASSWORD, DB_ENDPOINT, DB)

### Cargamos la librería *pandas* para consultar la base de datos

In [6]:
import pandas as pd

### Consultamos tabla *TipoVacuna*

In [17]:
sql_query = 'SELECT * FROM TipoVacuna;'
dfTipoVacuna = pd.read_sql(sql_query, mysql_conn)
dfTipoVacuna.head()

Unnamed: 0,idTipoVacuna,tipovacuna_nombre
0,1,SPUTNIK V
1,2,ASTRAZENECA
2,3,PFIZER-BIONTECH
3,4,MODERNA


### Consultamos tabla *Centro*

In [18]:
sql_query = 'SELECT * FROM Centro;'
dfCentro = pd.read_sql(sql_query, mysql_conn)
dfCentro.head()

Unnamed: 0,idCentro,centro_nombre,idMunicipio,idDepartamento
0,1,"ANEXO DEL IGSS CONSULTORIO GUASTATOYA, EN SANA...",55,5
1,2,CENTRO DE ATENCIÓN INTEGRAL DE SALUD MENTAL,23,7
2,3,CENTRO DE ATENCIÓN MÉDICA INTEGRAL PARA PENSIO...,23,7
3,4,CENTRO DE ATENCIÓN MÉDICA INTEGRAL PARA PENSIO...,23,7
4,5,CENTRO DE ATENCIÓN MÉDICA INTEGRAL PARA PENSIO...,23,7


### Consultamos tabla *Sexo*

In [19]:
sql_query = 'SELECT * FROM Sexo;'
dfSexo = pd.read_sql(sql_query, mysql_conn)
dfSexo.head()

Unnamed: 0,idSexo,sexo_descripcion
0,1,Hombre
1,2,Mujer


### Consultamos tabla *Dosis*

In [20]:
sql_query = 'SELECT * FROM Dosis;'
dfDosis = pd.read_sql(sql_query, mysql_conn)
dfDosis.head()

Unnamed: 0,idDosis,dosis_nombre
0,1,Primera
1,2,Segunda
2,3,Tercera
3,4,Cuarta


### Consultamos tabla *Vacunacion*

In [21]:
sql_query = 'SELECT * FROM Vacunacion;'
dfVacunacion = pd.read_sql(sql_query, mysql_conn)
dfVacunacion.head()

Unnamed: 0,idCentro,idSexo,idTipoVacuna,idDosis,fecha,cantidad_aplicaciones
0,1,1,2,1,2022-03-10,13
1,1,1,2,1,2022-03-11,19
2,1,1,2,1,2022-03-12,5
3,1,1,2,1,2022-03-13,7
4,1,1,2,1,2022-03-14,10


### Realizamos la conexión hacia S3

Hacemos uso de la librería *boto3*, que nos permite acceder a recursos de AWS.

In [22]:
import sys
import boto3

In [23]:
# Leemos los parámetros de conexión hacia el servicio S3
SERVICE_NAME = config.get('S3', 'service_name')
REGION_NAME = config.get('S3', 'region_name')
AWS_ACCESS_KEY_ID = config.get('S3', 'aws_access_key_id')
AWS_SECRET_ACCESS_KEY = config.get('S3', 'aws_secret_access_key')
S3_BUCKET_NAME = 'datavacunacion'
DEPARTAMENTOS_FILE_NAME = 'Departamentos.csv'
MUNICIPIOS_FILE_NAME = 'Municipios.csv'

In [24]:
s3 = boto3.resource(
    service_name = SERVICE_NAME,
    region_name = REGION_NAME,
    aws_access_key_id = AWS_ACCESS_KEY_ID,
    aws_secret_access_key = AWS_SECRET_ACCESS_KEY
)

### Consultamos archivo de *Departamentos*

In [25]:
objDepartamento = s3.Bucket(S3_BUCKET_NAME).Object(DEPARTAMENTOS_FILE_NAME).get()
dfDepartamento = pd.read_csv(objDepartamento['Body']) 
dfDepartamento.head()

Unnamed: 0,idDepartamento,departamento_nombre
0,1,ALTA VERAPAZ
1,2,BAJA VERAPAZ
2,3,CHIMALTENANGO
3,4,CHIQUIMULA
4,5,EL PROGRESO


### Consultamos archivo de *Municipios*

In [26]:
objMunicipio = s3.Bucket(S3_BUCKET_NAME).Object(MUNICIPIOS_FILE_NAME).get()
dfMunicipio = pd.read_csv(objMunicipio['Body']) 
dfMunicipio.head()

Unnamed: 0,idMunicipio,municipio_nombre,idDepartamento
0,1,ACATENANGO,3
1,2,AMATITLÁN,7
2,3,ANTIGUA GUATEMALA,16
3,4,ASUNCIÓN MITA,11
4,5,AYUTLA,17


## Modelo Dimensional

### El modelo dimensional para construir es el siguiente

<img src="./images/028.jpg">

### Creación de la dimensión *Dim_Centro*

In [27]:
dfDept_Munis = dfDepartamento.merge(dfMunicipio, left_on='idDepartamento', right_on='idDepartamento')
dimCentro = dfDept_Munis.merge(dfCentro, left_on=['idMunicipio', 'idDepartamento'], right_on=['idMunicipio', 'idDepartamento'])
dimCentro = dimCentro.loc[:, ['idCentro', 'centro_nombre', 'municipio_nombre', 'departamento_nombre']]
dimCentro.rename(columns={'idCentro': 'centro_key', 'centro_nombre': 'centro_dsc', 'municipio_nombre': 'municipio_dsc', 'departamento_nombre': 'departamento_dsc'}, inplace=True)
dimCentro.head()

Unnamed: 0,centro_key,centro_dsc,municipio_dsc,departamento_dsc
0,68,"UNIDAD INT.ADS.ACRE.DER., CAHABÓN, A.V.",CAHABÓN,ALTA VERAPAZ
1,52,"HOSPITAL IGSS COBÁN, ALTA VERAPAZ",COBÁN,ALTA VERAPAZ
2,77,"UNIDAD INT.ADS.ACRE.DER., FRAY BARTOLOMÉ DE LA...",FRAY BARTOLOMÉ DE LAS CASAS,ALTA VERAPAZ
3,78,"UNIDAD INT.ADS.ACRE.DER., SAN CRISTOBAL, A.V.",SAN CRISTÓBAL VERAPAZ,ALTA VERAPAZ
4,79,"UNIDAD INT.ADS.ACRE.DER., SANTA CATALINA, LA T...",SANTA CATALINA LA TINTA,ALTA VERAPAZ


### Creación de la dimensión *Dim_Tiempo*

In [28]:
sql_query = 'SELECT distinct fecha FROM Vacunacion;'
dimTiempo = pd.read_sql(sql_query, mysql_conn)
dimTiempo['fecha'] = pd.to_datetime(dimTiempo.fecha, format='%Y-%m-%d')
dimTiempo['anio'] = pd.DatetimeIndex(dimTiempo['fecha']).year
dimTiempo['mes'] = pd.DatetimeIndex(dimTiempo['fecha']).month
dimTiempo['trimestre'] = pd.DatetimeIndex(dimTiempo['fecha']).quarter
dimTiempo['dia'] = pd.DatetimeIndex(dimTiempo['fecha']).day
dimTiempo['semana'] = pd.DatetimeIndex(dimTiempo['fecha']).week
dimTiempo['dia_de_la_semana'] = pd.DatetimeIndex(dimTiempo['fecha']).dayofweek
dimTiempo['es_fin_de_semana'] = dimTiempo['dia_de_la_semana'].apply(lambda x: 1 if x > 5 else 0)
dimTiempo['tiempo_key'] = dimTiempo['fecha'].dt.strftime('%Y%m%d')
dimTiempo.head()

  dimTiempo['semana'] = pd.DatetimeIndex(dimTiempo['fecha']).week


Unnamed: 0,fecha,anio,mes,trimestre,dia,semana,dia_de_la_semana,es_fin_de_semana,tiempo_key
0,2022-03-10,2022,3,1,10,10,3,0,20220310
1,2022-03-11,2022,3,1,11,10,4,0,20220311
2,2022-03-12,2022,3,1,12,10,5,0,20220312
3,2022-03-13,2022,3,1,13,10,6,1,20220313
4,2022-03-14,2022,3,1,14,11,0,0,20220314


### Creación de la dimensión *Dim_TipoVacuna*

In [30]:
dimTipoVacuna = dfTipoVacuna.iloc[:, [0,1]]
dimTipoVacuna.rename(columns={'idTipoVacuna': 'tipovacuna_key', 'tipovacuna_nombre': 'tipovacuna_dsc'}, inplace=True)
dimTipoVacuna.head()

Unnamed: 0,tipovacuna_key,tipovacuna_dsc
0,1,SPUTNIK V
1,2,ASTRAZENECA
2,3,PFIZER-BIONTECH
3,4,MODERNA


### Creación de la dimensión *Dim_Sexo*

In [31]:
dimSexo = dfSexo.iloc[:, [0,1]]
dimSexo.rename(columns={'idSexo': 'sexo_key', 'sexo_descripcion': 'sexo_dsc'}, inplace=True)
dimSexo.head()

Unnamed: 0,sexo_key,sexo_dsc
0,1,Hombre
1,2,Mujer


### Creación de la dimensión *Dim_Dosis*

In [32]:
dimDosis = dfDosis.iloc[:, [0,1]]
dimDosis.rename(columns={'idDosis': 'dosis_key', 'dosis_nombre': 'dosis_dsc'}, inplace=True)
dimDosis.head()

Unnamed: 0,dosis_key,dosis_dsc
0,1,Primera
1,2,Segunda
2,3,Tercera
3,4,Cuarta


### Creación de la fact *Fact_Vacunacion*

In [33]:
dfVacunacion['fecha'] = pd.to_datetime(dfVacunacion.fecha, format='%Y-%m-%d')
factVacunacion = dfVacunacion.merge(dimCentro, left_on='idCentro', right_on='centro_key')
factVacunacion = factVacunacion.loc[:, ['centro_key', 'cantidad_aplicaciones', 'idSexo','idTipoVacuna','idDosis','fecha']]

factVacunacion = factVacunacion.merge(dimSexo, left_on='idSexo', right_on='sexo_key')
factVacunacion = factVacunacion.loc[:, ['centro_key', 'sexo_key', 'cantidad_aplicaciones', 'idTipoVacuna','idDosis','fecha']]                                        
                                        
factVacunacion = factVacunacion.merge(dimTipoVacuna, left_on='idTipoVacuna', right_on='tipovacuna_key')
factVacunacion = factVacunacion.loc[:, ['centro_key', 'sexo_key', 'tipovacuna_key', 'cantidad_aplicaciones', 'idDosis','fecha']]                                        
                                        
factVacunacion = factVacunacion.merge(dimDosis, left_on='idDosis', right_on='dosis_key')
factVacunacion = factVacunacion.loc[:, ['centro_key', 'sexo_key', 'tipovacuna_key', 'dosis_key', 'cantidad_aplicaciones', 'fecha']]                                        

factVacunacion = factVacunacion.merge(dimTiempo, left_on='fecha', right_on='fecha')
factVacunacion = factVacunacion.loc[:, ['centro_key', 'sexo_key', 'tipovacuna_key', 'dosis_key', 'tiempo_key', 'cantidad_aplicaciones']]                                        

factVacunacion.head()

Unnamed: 0,centro_key,sexo_key,tipovacuna_key,dosis_key,tiempo_key,cantidad_aplicaciones
0,1,1,2,1,20220310,13
1,3,1,2,1,20220310,16
2,5,1,2,1,20220310,12
3,7,1,2,1,20220310,9
4,8,1,2,1,20220310,1


## Creando el Data Warehouse con RedShift de Amazon

### Creación del servicio RedShift 

<img src="./images/029.jpg">
<img src="./images/030.jpg">
<img src="./images/031.jpg">

También habilitamos el acceso para poder utilizar el servicio desde Python.

Abrimos las configuraciones del *VPC security group* para establecer una *Regla de Entrada* que permita el tráfico con Redshift:

<img src="./images/032.jpg">
<img src="./images/033.jpg">
<img src="./images/034.jpg">
<img src="./images/035.jpg">

En el menú *Actions > Manage cluster > Modify publicly accesible setting* habilitamos el acceso público:

<img src="./images/036.jpg">
<img src="./images/037.jpg">

### Crear el modelo dimensional en RedShift

Para ello utilizamos el editor de Query de Redshift

El script completo se encuentra en el archivo *05-Scripts_Modelo_Dimensional.sql*

<img src="./images/038.jpg">

### Carga de datos al modelo dimensional desde Python

Se instala la librearia *psycopg2* para conectarnos a *postgress* desde Python.  Además utilizamos la librería *sqlalchemy* para mapear los datos desde Python hacia el Data Warehouse.

In [9]:
ENDPOINT = config.get('REDSHIFT', 'HOST')
DB_USER = config.get('REDSHIFT', 'DB_USER')
DB_PASSWORD = config.get('REDSHIFT', 'DB_PASSWORD')
DB_PORT = config.get('REDSHIFT', 'DB_PORT')
DB_NAME = config.get('REDSHIFT', 'DB_NAME')

In [12]:
redshift_conn_string = "postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, ENDPOINT, DB_PORT, DB_NAME)

In [13]:
%sql $redshift_conn_string

'Connected: awsuser@dev'

In [14]:
from sqlalchemy import create_engine

In [15]:
conn = create_engine(redshift_conn_string)

#### Traslado de la dimensión *Dim_Centro*

In [37]:
dimCentro.to_sql('dim_centro', conn, index=False, if_exists='replace') #si ya tiene datos, entonces sobreescribimos

#### Traslado de la dimensión *Dim_Sexo*

In [38]:
dimSexo.to_sql('dim_sexo', conn, index=False, if_exists='replace') #si ya tiene datos, entonces sobreescribimos

#### Traslado de la dimensión *Dim_TipoVacuna*

In [39]:
dimTipoVacuna.to_sql('dim_tipovacuna', conn, index=False, if_exists='replace') #si ya tiene datos, entonces sobreescribimos

#### Traslado de la dimensión *Dim_Dosis*

In [40]:
dimDosis.to_sql('dim_dosis', conn, index=False, if_exists='replace') #si ya tiene datos, entonces sobreescribimos

#### Traslado de la dimensión *Dim_Tiempo*

In [45]:
dimTiempo.to_sql('dim_tiempo', conn, index=False, if_exists='replace') #si ya tiene datos, entonces sobreescribimos

#### Traslado de la fact *Fact_Vacunacion*

In [50]:
factVacunacion.to_sql('fact_vacunacion', conn, index=False, if_exists='replace', method='multi') #si ya tiene datos, entonces sobreescribimosdfTipoVacuna

## Analítica utilizando el *Data Warehouse *

### Top 10 de Centros que más ha vacunado

In [54]:
sql_query = """
select 	c.centro_dsc
		,sum(cantidad_aplicaciones) cantidad
from 	public.fact_vacunacion f
inner join public.dim_centro c on f.centro_key = c.centro_key
group by c.centro_dsc
order by cantidad desc
"""
dfAnalisis1 = pd.read_sql(sql_query, redshift_conn_string)
dfAnalisis1.head(10)

Unnamed: 0,centro_dsc,cantidad
0,CENTRO DE ATENCIÓN MÉDICA INTEGRAL PARA PENSIO...,142827
1,UNIDAD PERIFÉRICA ZONA ONCE,118351
2,HOSPITAL GENERAL DOCTOR JUAN JOSÉ ARÉVALO BERMEJO,75801
3,CENTRO DE ATENCIÓN MÉDICA INTEGRAL PARA PENSIO...,65842
4,CONSULTORIO DEL INSTITUTO EN EL MUNICIPIO DE V...,64052
5,"CONSULTORIO DE ANTIGUA GUATEMALA, SACATEPÉQUEZ",54278
6,"HOSPITAL QUETZALTENANGO, QUETZALTENANGO",50786
7,UNIDAD PERIFÉRICA ZONA CINCO,50386
8,POLICLÍNICA,47585
9,"HOSPITAL DE MAZATENANGO, SUCHITEPÉQUEZ",45438


### Cantidad de dosis aplicadas mensualmente durante el año 2021

In [56]:
sql_query = """
select	t.anio
		,t.mes
        ,sum(cantidad_aplicaciones) cantidad
from 	public.fact_vacunacion f
inner join public.dim_tiempo t on f.tiempo_key = t.tiempo_key
where	t.anio = 2021
group by t.anio, t.mes
order by t.mes
"""
dfAnalisis2 = pd.read_sql(sql_query, redshift_conn_string)
dfAnalisis2

Unnamed: 0,anio,mes,cantidad
0,2021,1,4
1,2021,2,5
2,2021,3,14468
3,2021,4,3379
4,2021,5,32498
5,2021,6,11107
6,2021,7,160685
7,2021,8,264788
8,2021,9,178278
9,2021,10,99674


### Cantidad de Mujeres y Hombres Vacunados

In [57]:
sql_query = """
select	s.sexo_dsc
		,sum(cantidad_aplicaciones) cantidad
from public.fact_vacunacion f
inner join public.dim_sexo s on f.sexo_key = s.sexo_key
group by s.sexo_dsc
"""
dfAnalisis3 = pd.read_sql(sql_query, redshift_conn_string)
dfAnalisis3

Unnamed: 0,sexo_dsc,cantidad
0,Mujer,640864
1,Hombre,736741


### Que tipo de Vacuna es la que más se ha aplicado

In [58]:
sql_query = """
select 	tv.tipovacuna_dsc
		,sum(cantidad_aplicaciones) cantidad
from 	public.fact_vacunacion f
inner join public.dim_tipovacuna tv on f.tipovacuna_key = tv.tipovacuna_key
group by tv.tipovacuna_dsc
order by cantidad desc
"""
dfAnalisis4 = pd.read_sql(sql_query, redshift_conn_string)
dfAnalisis4

Unnamed: 0,tipovacuna_dsc,cantidad
0,MODERNA,755282
1,ASTRAZENECA,383446
2,PFIZER-BIONTECH,131421
3,SPUTNIK V,107456


### La cantidad de vacunas aplicadas por número de dosis

In [59]:
sql_query = """
select 	d.dosis_dsc
		,sum(cantidad_aplicaciones) cantidad
from 	public.fact_vacunacion f
inner join public.dim_dosis d on f.dosis_key = d.dosis_key
group by d.dosis_dsc
order by cantidad desc
"""
dfAnalisis4 = pd.read_sql(sql_query, redshift_conn_string)
dfAnalisis4

Unnamed: 0,dosis_dsc,cantidad
0,Primera,561498
1,Segunda,458058
2,Tercera,358006
3,Cuarta,43
