In [5]:
# !pip install -r requiretments_spark.txt

## **DESCRIPCION**
___

Este notebook `busca consolidar todas los procesos de extraccion, analisis, carga y visualizacion` de datos con el proposito de brindar al evaluador una `fuente unificada` de todas las herramientas utilizadas en la prueba. 

El documento se divide en 3 etapas: 

* **Extraccion y carga de datos:**  Se crea la funcion que va a leer los documentos de extension .xlsx para su posterior analisis.

* **Conexion ODBC con SQL server:** Se crea una conexion con un servidor local del tipo SQL SERVER para realizar la evaluacion de las diferentes tablas con el proposito de explorar la exportacion de datos a un entorno externo. 

* **Solucion de la prueba:**        Se resuleven las diferentes consultas sql usando la conexion ODBC y la libreria pandas

## **EXTRACCION Y CARGA DE DATOS**
___

Iniciamos importando las librerias que utilizamos para leer los archivos y analizarlos.

In [1]:
import pandas as pd
import os

# import pandas_profiling

Se define la funcion que leera los archivos del repositorio local.

In [2]:
def read_files(subfolder):
    """Funcion que lee multiples archivos de extension .xlsx.
        Se usa como carpeta de referencia la carpeta que contiene este archivo .ipynb
        
        Parametros: 
        >subfolder: carpeta local que contiene los archivos que se deben leer
        
        Retorno:
        >dfs: Diccionario de archivos de tipo pd.DataFrame creados con los archivos de extension .xlsx encontrados en la carpeta local 
    """

    folder_path = os.path.join(os.getcwd(), subfolder)

    # Se listan archivos en ruta local
    files = os.listdir(folder_path)
    # files = os.listdir(os.getcwd())
    # Se crea una lista con los nombres de los archivos de extension excel
    excel_files = [file for file in files if file.endswith('.xlsx') or file.endswith('.xls')]
    
    dfs = {}

    # Loop que crea un dataframe con cada archivo de excel en la carpeta local
    for excel_file in excel_files:
        file_path = os.path.join(folder_path, excel_file)
        df = pd.read_excel(file_path)

        # Store the DataFrame in the dictionary with the file name as the key
        dfs[excel_file] = df 
    
    print(f'Se creó diccionario de dataframes con los archivos encontrados en la ruta {folder_path}')
    return dfs

Se ejecuta la funcion guardando el diccionario de dataframes en la variable `dfs`

In [3]:
dfs = read_files(subfolder="bases_excel")

Se creó diccionario de dataframes con los archivos encontrados en la ruta c:\Users\USER\Documents\9.PRUEBAS_TECNICAS\Prueba_Itau\prueba\bases_excel


### **Analisis de datos**

Se procede a imprimir en pantalla las primeras 5 filas de cada uno de los archivos encontrados en el repositorio local con extension .xlsx

In [4]:
#Se revisa el contenido de cada tabla
for t in dfs:
    print(t)
    display(dfs[t].head())

Base_Ahorros.xlsx


Unnamed: 0,Fecha_Corte,Nro Cuenta,Tipo_Operacion,Cliente,Saldo_Ahorro,Fecha_Apertura
0,2020-09-30,467002000765,ORD,12702,76514.43,2011-11-22
1,2020-09-30,711302007184,ORD,36404,10082.09,2013-04-05
2,2020-09-30,411202010443,ORD,53153,10962.62,2015-03-10
3,2020-09-30,411202005161,ORD,59433,110017.28,2013-09-28
4,2020-09-30,467002007286,ORD,116692,10143.09,2013-02-27


Base_CDT.xlsx


Unnamed: 0,Fecha_Corte,Nro_Cdt,Tipo_Operacion,Cliente,Monto_CDT,Fecha_Apertura
0,2020-09-30,4571DF21711574,CDT,349042,750000.0,2015-03-27
1,2020-09-30,4571DF2591320,CDT,392356,50000.0,2015-01-19
2,2020-09-30,4571DF2881875,CDT,408816,50000.0,2015-02-11
3,2020-09-30,4571DF2804021,CDT,507602,50000.0,2014-12-01
4,2020-09-30,4571DF2908609,CDT,521905,50000.0,2014-11-25


Base_Cliente.xlsx


Unnamed: 0,Cliente,Estrato,Estado_Civil,Genero
0,1809442,Estrato 2,Union Libre,Masculino
1,2076767,Estrato 2,Soltero,Femenino
2,2075800,Estrato 2,Divorciado,Masculino
3,757961,Estrato 2,Union Libre,Masculino
4,2078570,Estrato 2,Divorciado,Masculino


Base_Creditos.xlsx


Unnamed: 0,Fecha_Corte,Nro_Credito,Tipo_Operacion,Cliente,Saldo_Credito,Dias_En_Mora,Fecha_Desembolso
0,2020-09-30,171130006396,SINCO,36404,7117180.0,40,2013-04-05
1,2020-09-30,141120017373,SINCO,53153,2858052.4,40,2015-03-10
2,2020-09-30,146700010607,SINCO,59315,5172800.0,0,2015-01-14
3,2020-09-30,141120018058,CLIENT1A,59433,5000000.0,0,2015-03-30
4,2020-09-30,140700125119,SINCO,116692,13751100.0,20,2013-02-27


## **Conexion ODBC con SQL server**
___

En esta etapa se crea la conexion al motor de bases de datos `SQL server` utiliando la libreria sqlalchemy.

In [5]:
from sqlalchemy import create_engine

# Detalles del servidor
SERVER   = 'DESKTOP-AQS8U0M'
DATABASE = 'itau'
DRIVER   = 'ODBC Driver 17 for SQL Server'

# Se crea string de conexion con la base de datos
connection_string = f'mssql://@{SERVER}/{DATABASE}?driver={DRIVER}'
engine = create_engine(connection_string)

### Se realiza la carga de los datos a MSQL por medio de la funcion de pandas df.to_sql()

<img src="images/itau_db.png" alt="Se crea la base de datos en entorno local" width="500"/>

In [6]:
for d in dfs:
    nombre_tabla = d.rsplit('.', 1)[0]
    dfs[d].to_sql(nombre_tabla, con=engine, if_exists='replace', index=False, chunksize=100)
    print(f'Se envian datos de tabla "{nombre_tabla}" a la base de datos "{DATABASE}"')

Se envian datos de tabla "Base_Ahorros" a la base de datos "itau"
Se envian datos de tabla "Base_CDT" a la base de datos "itau"
Se envian datos de tabla "Base_Cliente" a la base de datos "itau"
Se envian datos de tabla "Base_Creditos" a la base de datos "itau"


## **SOLUCION DE LA PRUEBA**
___

### **1. Realice las consultas que le permitan obtener la siguiente información**

1.a Número de Clientes que tienen Crédito y a la vez Cuenta de Ahorro

In [7]:
query = 'SELECT COUNT(*) AS Total FROM [dbo].[Base_Ahorros] AS ah INNER JOIN [dbo].[Base_Creditos] AS cr ON ah.Cliente = cr.Cliente'
pd.read_sql_query(query, con=engine)

Unnamed: 0,Total
0,7269


1.b	Número de Clientes que tienen Crédito y al vez CDT

In [8]:
query = 'SELECT COUNT(*) AS Total FROM [dbo].[Base_CDT] AS cd INNER JOIN [dbo].[Base_Creditos] AS cr ON cd.Cliente = cr.Cliente'
pd.read_sql_query(query, con=engine)

Unnamed: 0,Total
0,92


1.c	Número de Clientes que Solo tienen Crédito (Que no tienen Cuenta de Ahorro ni CDT)

In [9]:
query = "SELECT COUNT(*) AS Total 		\
FROM [dbo].[Base_Creditos] AS cr  		\
LEFT JOIN [dbo].[Base_Ahorros] AS ah 	\
ON cr.Cliente = ah.Cliente          	\
LEFT JOIN [dbo].[Base_CDT] AS cd    	\
	ON cr.Cliente = cd.Cliente      	\
WHERE ah.Cliente IS NULL            	\
	AND cd.Cliente IS NULL" 
pd.read_sql_query(query, con=engine)

Unnamed: 0,Total
0,3657


1.d	Numero de Clientes que tienen los 3 productos (Crédito - Ahorro - CDT)

In [10]:
query = "SELECT COUNT(*) AS Total   	\
FROM [dbo].[Base_Creditos] AS cr    	\
INNER JOIN [dbo].[Base_Ahorros] AS ah	\
	ON cr.Cliente = ah.Cliente      	\
INNER JOIN [dbo].[Base_CDT] AS cd   	\
	ON cr.Cliente = cd.Cliente" 
pd.read_sql_query(query, con=engine)

Unnamed: 0,Total
0,83


### **2. Con la tabla “Base_Creditos”, realice las siguientes consultas:**

2.1 Genere un rango en un Campo Nuevo o en una tabla temporal (Insertar Columna en la tabla Prueba_Base_Creditos) de acuerdo a las siguientes rangos

<img src="images/rangos.png" alt="Se crea la base de datos en entorno local" width="300"/>

Por `practicidad`, se aplican los cambios sobre la tabla Base_Creditos `directamente en el motor de BD` como se muestra en la siguiente imagen:

<img src="images/DDL_base_credito.png" alt="Se crea la base de datos en entorno local" width="700"/>


**2.2**	De acuerdo al punto anterior realice una consulta y agrupe por el rango de mora creado, para identificar la cantidad de créditos en cada rango y su saldo sumado (Saldo_Credito).

Esta consulta solo debe agrupar los créditos vigentes desembolsados en el año 2015. 

In [12]:
query = "                                                       \
SELECT Rango_Dias_En_Mora                                       \
		,COUNT(Dias_En_Mora) AS Cantidad_Creditos               \
		,SUM(Saldo_Credito) AS Saldo_Creditos                   \
FROM [dbo].[Base_Creditos]                                      \
WHERE Fecha_Desembolso BETWEEN '2015-01-01' AND '2015-12-31'    \
GROUP BY Rango_Dias_En_Mora                                     \
ORDER BY Rango_Dias_En_Mora "
pd.read_sql_query(query, con=engine)

Unnamed: 0,Rango_Dias_En_Mora,Cantidad_Creditos,Saldo_Creditos
0,1,3479,9789993000.0
1,2,3,4002832.0


### **3. Realice el cruce de las tablas**

Tomando como base la tabla `Prueba_Base_Cliente` se construye la tabla `tabla_final` que posteriormente es usada para construir el reporte PBI.

In [13]:
query = "                                                                                           \
SELECT cl.Cliente                                                                                   \
		,cl.Estrato                                                                                 \
		,cl.Estado_Civil                                                                            \
		,cl.Genero                                                                                  \
		,CASE WHEN cr.Cliente		IS NOT NULL	THEN 1				  ELSE 0 END AS Credito         \
		,CASE WHEN cr.Saldo_Credito IS NOT NULL THEN cr.Saldo_Credito ELSE 0 END AS Saldo_Credito   \
		,CASE WHEN cr.Dias_En_Mora  IS NOT NULL THEN cr.Dias_En_Mora  ELSE 0 END AS Dias_En_Mora    \
		,CASE WHEN ah.Cliente	    IS NOT NULL THEN 1				  ELSE 0 END AS Ahorro          \
		,CASE WHEN ah.Saldo_Ahorro  IS NOT NULL THEN ah.Saldo_Ahorro  ELSE 0 END AS Saldo_Ahorro    \
		,CASE WHEN cd.Cliente	    IS NOT NULL THEN 1				  ELSE 0 END AS CDT             \
		,CASE WHEN cd.Monto_CDT		IS NOT NULL THEN cd.Monto_CDT	  ELSE 0 END AS Monto_CDT       \
FROM [dbo].[Base_Cliente]		AS cl                                                               \
LEFT JOIN [dbo].[Base_Ahorros]	AS ah                                                               \
	ON cl.Cliente = ah.Cliente                                                                      \
LEFT JOIN [dbo].[Base_CDT]		AS cd                                                               \
	ON cl.Cliente = cd.Cliente                                                                      \
LEFT JOIN [dbo].[Base_Creditos]	AS cr                                                               \
	ON cl.Cliente = cr.Cliente "

tabla_pbi = pd.read_sql_query(query, con=engine)

In [14]:
tabla_pbi.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14908 entries, 0 to 14907
Data columns (total 11 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   Cliente        14908 non-null  int64  
 1   Estrato        14908 non-null  object 
 2   Estado_Civil   14908 non-null  object 
 3   Genero         14908 non-null  object 
 4   Credito        14908 non-null  int64  
 5   Saldo_Credito  14908 non-null  float64
 6   Dias_En_Mora   14908 non-null  int64  
 7   Ahorro         14908 non-null  int64  
 8   Saldo_Ahorro   14908 non-null  float64
 9   CDT            14908 non-null  int64  
 10  Monto_CDT      14908 non-null  float64
dtypes: float64(3), int64(5), object(3)
memory usage: 1.3+ MB


### **4.	Exporte la tabla resultado del punto anterior a un archivo de Excel**

Se exporta como archivo de `extension .xlsx` para facilitar la integracion con PBI

In [15]:
# Nombre del nuevo directorio
directory_path = "pbi"

# Funcion que crea el nuevo directorio en caso que no exista
if not os.path.exists(directory_path):
    os.makedirs(directory_path)

tabla_pbi.to_excel("pbi/tabla_final.xlsx", index=False)

### **5. Con base en la tabla final usando Power BI realice un tablero de control en donde se pueda ver de forma rápida la información de los clientes y sus productos inicialmente por ciudad y luego se pueda se pueda profundizar al nivel de oficina.**

Se crea un dashboard breve en el que se presenta la relacion de las diferentes oficinas, segmentadas por ciudad y su relacion con las demas variables

<img src="images/dashboard.png" alt="captura de pantalla de tablero de control" width="1200"/>

### **6. Realice un analisis de clientes y sus productos por cada ciudad y oficina**

##### **Productos:**
1.	Créditos

* La ciudad con mayor número de créditos y mayor saldo de crédito promedio es Bogotá, luego Cali y Medellín.
* En Bogotá la oficina con mayor número de créditos es la 1, luego la 2 y la 3 y mayor saldo de crédito promedio es la 2, luego la 3 y la 1.
* En Cali la oficina con mayor número de créditos y mayor saldo de crédito promedio es la 1, luego la 2 y la 3 respectivamente.
* En Medellín la única oficina con créditos es la 1 

2.	CDT

* La ciudad con mayor número de CDT y mayor monto_cdt promedio es Medellín, luego Bogotá y Cali 
* En Bogotá la oficina con mayor número de CDT es la 1, luego la 2 y la 3 y mayor monto_cdt promedio es la 2, luego la 3 y la 1 
* En Cali la oficina con mayor número de CDT es la 1, luego la 3 y la 2 y mayor monto_cdt promedio es la 1, luego la 3 y la 2
* En Medellín la oficina con mayor número de CDT es la 2, luego la 1 y la 3 y mayor monto_cdt promedio es la 3, luego la 2 y la 1

3.	Ahorros

* La ciudad con mayor número de ahorros es Medellín, luego Bogotá y Cali y mayor ahorros promedio es Medellín, luego Cali y Bogotá 
* En Bogotá la oficina con mayor número de ahorros es la 1, luego la 2 y la 3 y mayor ahorro promedio es la 2, luego la 1 y la 3
* En Cali la oficina con mayor número de ahorros es la 1, luego la 3 y la 2 y mayor ahorro promedio es la 2, luego la 3 y la 1
* En Medellín la oficina con mayor número de ahorros es la 2, luego la 1 y la 3 y mayor ahorro promedio es la 3, luego la 2 y la 1

##### **Clientes:**
 
* La mayoría de los clientes están en unión libre con mayor concentración en Bogotá, luego Medellín y Cali. Los clientes viudos son los de menor concentración, pero están distribuidos de mayo a menor en Bogotá, Cali y Medellín respectivamente.
* Existen mayor concentración de clientes masculinos distribuidos de mayor a menor en Bogotá, Medellín y Cali respectivamente y clientes de género femenino tienen la misma distribución de ciudades.
* Existen mayor concentración de clientes que están al día distribuidos de mayor a menor en Bogotá, Cali y Medellín respectivamente y la ciudad con mayor número de clientes en mora en Cali.
* Existen mayor concentración de clientes que están en estrato 1 y distribuidos de mayor a menor por estrato están en el siguiente orden 1,2,3,6 y 4 respectivamente.


### **7. Prueba Pyspark**

¿Puede presentar en una tabla la estructura que se obtendría con el siguiente código usando pyspark? ¿Y podría darnos un detalle de esta tabla considerando solo la primera fila de la tabla de entrada?

<img src="images/pyspark.png" alt="tabla base para ejecutar codigo pyspark" width="900"/>

Para presentar en una tabla la esctructura que se obtiene ejecutando los comandos pyspark primero debemos recrear la tabla presentada en la imagen. 

Es importante resaltar que `para ejecutar el codigo` aqui descrito se debe hacer el set up de las variables de entorno **SPARK_HOME**, **HADOOP_HOME** y **JAVA_HOME**.

Empezamos por importar las librerias necesarias.

In [37]:
import findspark
findspark.init()

from pyspark.sql            import SparkSession
from pyspark.sql.types      import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions  import expr

Creamos la sesion de spark

In [30]:
spark = SparkSession.builder.appName('itau_test').getOrCreate()

Definimos el schema haciendo una inferencia basica sobre la imagen de la tabla y creamos la tabla bajo el nombre py_df

In [38]:
schema = StructType([
    StructField("Num_uni",      IntegerType(), True),
    StructField("periodo",      IntegerType(), True),
    StructField("Alto_valor",   StringType(), True),
    StructField("Sm_activo",    IntegerType(), True),
    StructField("Sm_pasivo",    IntegerType(), True),
    StructField("margen",       IntegerType(), True),
    StructField("ivc",          IntegerType(), True),
])

# Crea un DataFrame con los datos proporcionados
data = [
    (10000, 202305, "SI", 1000, 1000000, 20, 4),
    (10001, 202305, "NO", 30, 1000000, 10, 2),
]

# Se crea el DataFrame utilizando el esquema definido
py_df = spark.createDataFrame(data, schema=schema)

Se ejecuta el codigo descrito en el ejercicio

In [41]:
# Lista de columnas que no se deben apilar (stack)
exclude_list = ["Num_uni", "periodo", 'Alto_valor']

# Selecciona las columnas relevantes que se deben apilar (stack)
relevant_columns = [c for c in py_df.columns if c not in exclude_list]

# Número de columnas relevantes
n = len(relevant_columns)

# Crea una expresión para apilar las columnas relevantes
cols_to_stack = ", ".join(['\'{c}\', {c}'.format(c=c) for c in relevant_columns])
stack_expression = "stack({}, {}) as (variable, valores)".format(n, cols_to_stack)

# Aplicar la operación de apilado al DataFrame
df_unpivot = py_df.select("Num_uni", "periodo", expr(stack_expression))

Se imprime la tabla resultante

In [40]:
df_unpivot.show()

+-------+-------+---------+-------+
|Num_uni|periodo| variable|valores|
+-------+-------+---------+-------+
|  10000| 202305|Sm_activo|   1000|
|  10000| 202305|Sm_pasivo|1000000|
|  10000| 202305|   margen|     20|
|  10000| 202305|      ivc|      4|
|  10001| 202305|Sm_activo|     30|
|  10001| 202305|Sm_pasivo|1000000|
|  10001| 202305|   margen|     10|
|  10001| 202305|      ivc|      2|
+-------+-------+---------+-------+



### **8. Prueba AWS**

Una de mis principales influencias en el proceso de aprendizaje sobre la nube ha sido el libro [Data Engineering with AWS: Learn how to design and build cloud-based data transformation pipelines using AWS](https://www.packtpub.com/product/data-engineering-with-aws/9781800560413). 

<img src="images/aws_datalake.png" alt="dtl_arqu" width="900"/>

En la imagen del capitulo 2 se plantea de forma abstracta la arquitectura de un datalakehouse en terminos de las diferentes etapas que debe incluir un lago de datos. 

Dado que no contamos con un escenario especifico de transformacion de datos, planteo de manera descriptiva los servicios que podria desplegar en el caso hipotetico en el que se recibe un archivo de extension .csv y debe alistarse para su consumo en el marco de la arquitectura de un datalakehouse.

* **Etapa de ingesta:** `AWS dataSync` para obtencion de datos desde soluciones on premise, `CLI (command line interface)` en conjunto con boto3 para la creacion de buckets y manipulacion programatica.

* **Etapa de almacenamiento**: `CLI`, `AWS S3(buckets bronce, silver, gold)`, `AWS redshift` en caso que se requiera normalizar la informacion en los archivos .csv

* **Etapa de procesamiento:** `AWS Lambda` para crear trigers y procesamiento ligero usando boto3 y python sintaxis, `AWS Glue` para transformaciones sobre API de spark, `AWS DataBrew` o `AWS SageMaker` para analisis de calidad de datos, `AWS EC2` para creacion de instancias de administradas de computacion, `AWS Glue` para la creacion de catalogos.

* **Etapa de consumo:** `AWS Athena` para la consulta de datos en lenguaje SQL, `AWS quickSight` para la creacion de tableros, `AWS Sagemaker` para el consumo de datos en un contexto de machine learning.


