# Enunciado

Tarea: ETL

**Introducción**

**Objetivo**

Realizar un proceso ETL básico en Pyspark

**¿Para qué?**

Practicar lo aprendido en el tutorial de ETL

**¿Qué necesita?**

- Modelo multidimensional asociado al proceso de movimientos
- Notebook para trabajar: puede usar la seccion 3 “Espacio para desarrollar la tarea” al final del notebook del tutorial para realizar esta actividad
- Servidor SQL con base de datos relacional “WWImportersTransactional” y base de datos relacional que corresponde a la bodega de WWI de cada estudiante “Estudiante_i”

**Enunciado**

Ahora que sabe cómo realizar un proceso ETL, dado el modelo multidimensional del proceso de negocio de movimientos de inventario realice las siguientes actividades:

**Entregable 1**  

Diseño del ETL: Diseñe el ETL para las dimensiones proveedor, tipoTransaccion, fecha y para la tabla de hechos. A nivel de la tabla de proveedores incluya la tabla categoriasProveedores donde encuentra información de las categorías. El diseño del ETL es un diagrama como lo encuentra en la infografía de proceso ETL y puede observar en la siguiente figura. En el entregable utilice la siguiente <a href='https://miad-modelo-datos.github.io/guias_v1/Tareas/PlantillaDise%C3%B1oETL.xlsx'>plantilla para presentar el diseño de ETL</a> e incluya una descripción general que permita comprender detalles que pueden ser confusos.
Diseño ETL tabla empleados
<img src='DiseñoETL.png'>

**Entregable 2**

Implementación del ETL: implementación del proceso ETL para las dimensiones Proveedor, TipoTransaccion, Fecha y para la tabla de hechos. En el entregable incluya una descripción general que permita comprender el proceso de implementación del ETL.

Note que para este proceso de negocio, las dimensiones Producto y Cliente son iguales a las del hecho Orden, este caso se conoce como dimensiones compartidas. Usted debe concentrarse en las dimensiones Proveedor, TipoTransaccion, Fecha y la tabla de hechos que no existen en la bodega de datos actualmente.

Es importante que mantenga las tablas creadas para que los tutores puedan validar las tablas en el momento de calificar su tarea.

WWImporters le comparte el modelo multidimensional que ha decido utilizar. Este modelo representa los movimientos(transacciones) que se hacen sobre el inventario de WWImporters. En particular, se observa que se tiene información de los tipos de transacciones, realizadas por un proveedor, relacionado con un cliente y un producto específico en una fecha determinada. En el modelo se dejan explícitos los dos tipos de identificadores que se crean a nivel de la bodega. el propio de la Bodega, (con el sufijo DWH) y el que viene de la fuente (con el sufijo T).

Modelo movimientos

<img src='Modelo movimiento.PNG'>

Sobre los resultados del entendimiento de datos, Wide World Importers les comenta lo siguiente:

- Cada fila representa una transacción o movimiento de productos en el inventario
- Los días de pago no pueden ser negativos no tiene sentido para nuestro negocio. Por favor corregir multiplicando los datos negativos por -1.
- Sobre la regla de negocio dada para la actividad de entendimiento de datos “La cantidad máxima de productos movidos es 50 millones por transaccion”, el negocio revisó y encontró que efectivamente gracias a los avances en su operación, ya puede mover más que la cantidad de 50 millones por transacción, por lo cual elimina esa regla de negocio.
- La falta de datos antes del 2014 es un error de extracción de datos. Los nuevos datos incluyen este año.
- Nuestro análisis concluye que la información que se ha duplicado totalmente no es útil. Por favor no tenerlos en cuenta.
- “El formato de fechas manejado es YYYY-MM-DD HH:MM:SS si tienen hora, minutos y segundos. De lo contrario el formato es YYYY-MM-DD”: En cuanto a formatos de fechas estamos de acuerdo con que los estandarizemos y el formato sea el especificado en la regla de negocio.
- Existen proveedores que tienen 2 filas una con un nombre y otra con el mismo nombre mas un “Inc” o “Ltd”. Unimos estos a un solo proveedor dado que se trató de un error de digitación.
- El código postal igual para todos nuestros proveedores es un error que también fue corregido.
- Cantidades negativas significan salidas de productos del inventario
- El negocio indica que las tablas de categoriasProveedores y TiposTransaccion fueron analizadas previamente, por su grupo de consultores.

Los datos revisados por el negocio quedan en las tablas Proveedores y movimientos y estos son los que deben utilizar en el proceso ETL. Por otra parte, en las tablas ProveedoresCopia y movimientosCopia quedan los datos con errores en caso de que deseen revisar/ejecutar el ejercicio que realizó de entendimiento de datos.

**Recuerde**

El diseño es fundamental y en particular el detalle de las transformaciones
Dejar evidencia de las tablas creadas y pobladas en la base de datos que le fue asignada en el curso
Entregar el notebook únicamente con la información solicitada en la tarea

# Configuración e importe de paquetes

Se utilizarán la librería pyspark para acceso a base de datos os para acceso al sistema operativo, datetime, pandas profile, matplotlib, numpy y missingno para el analisis descriptivo

**Librerias**

In [403]:
import os 
from pyspark.sql import functions as f, SparkSession, types as t
from pyspark.sql.functions import when, col, date_format, weekofyear
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import udf, col, length, isnan, when, count, regexp_replace
from datetime import datetime
import pandas as pd

**Drivers y sesion**

In [404]:
#driver
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'
#sesion
#Configuración de la sesión
spark = SparkSession.builder \
    .appName("Mi Aplicación Spark") \
    .config("spark.driver.extraClassPath", path_jar_driver) \
    .getOrCreate()

**Funciones a utilizar**

In [405]:
#Funciones a utilizar
def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

def describir_data(tabla):
    df=obtener_dataframe_de_bd(db_connection_string,tabla,db_user,db_psswd)
    print(f'hay {df.count()} filas y estas poseen {len(df.columns)} variables \n')    
    print('A continuacion un ejemplo de los datos de la tabla \n')
    df.show(5)
    print(f'Tipos de datos por columna \n',df.dtypes,'\n')
    

def guardar_db(db_connection_string, df, tabla, db_user, db_psswd):
    df.select('*').write.format('jdbc') \
      .mode('append') \
      .option('url', db_connection_string) \
      .option('dbtable', tabla) \
      .option('user', db_user) \
      .option('password', db_psswd) \
      .option('driver', 'com.mysql.cj.jdbc.Driver') \
      .save()
    
borrar_duplicados=lambda x: x.dropDuplicates()

**Credenciales**

In [406]:
#Credenciales
db_connection_string = 'jdbc:mysql://157.253.236.116:8080'
# El usuario es su estudiante _i asignado y su contraseña la encontrará en el archivo excel de Coursera 
db_user = 'Estudiante_8_202315'
db_psswd = 'aabb1122'

PATH='./'

**Tablas**

In [407]:
P_WWL='WWImportersTransactional.proveedores'
CP_WWL='WWImportersTransactional.CategoriasProveedores'
TT_WWL='WWImportersTransactional.TiposTransaccion'
M_WWL='WWImportersTransactional.movimientos'
PS_WWL='WWImportersTransactional.Personas'
F_EST='Estudiante_8_202315.Fecha'
P_EST='Estudiante_8_202315.Proveedor'
HM_EST='Estudiante_8_202315.Hecho_Movimiento'
TT_EST='Estudiante_8_202315.TipoTransaccion'

# Extraccion

**Proveedores y CategoriaProveedores**

In [430]:
query=f'(SELECT P.ProveedorID ID_proveedor_T,P.NombreProveedor Nombre,CP.CategoriaProveedor Categoria,PS.NombreCompleto Contacto_principal,P.ReferenciaProveedor Referencia,P.DiasPago Dias_pago,P.CodigoPostal Codigo_postal FROM {P_WWL} P INNER JOIN {CP_WWL} CP ON P.CategoriaProveedorID=CP.CategoriaProveedorID INNER JOIN {PS_WWL} PS ON PersonaContactoPrincipalID=ID_persona) AS dataset'
df_p=obtener_dataframe_de_bd(db_connection_string,query,db_user,db_psswd)
df_p.toPandas()


Unnamed: 0,ID_proveedor_T,Nombre,Categoria,Contacto_principal,Referencia,Dias_pago,Codigo_postal
0,1,A Datum Corporation,productos novedosos,Reio Kabin,AA20384,-14,46077
1,2,"Contoso, Ltd.",productos novedosos,Hanna Mihhailov,B2084020,-7,98253
2,3,Consolidated Messenger,servicios de mensajeria,Kerstin Parn,209340283,-30,94101
3,4,"Fabrikam, Inc.",ropa,Bill Lawson,293092,30,40351
4,5,Graphic Design Institute,productos novedosos,Penny Buck,08803922,14,64847
5,6,Humongous Insurance,servicios de seguros,Madelaine Cartier,082420938,-14,37770
6,7,"Litware, Inc.",embalaje,Elias Myllari,BC0280982,30,95245
7,8,Lucerne Publishing,productos novedosos,Prem Prabhu,JQ082304802,-30,37659
8,9,Nod Publishers,productos novedosos,Marcos Costa,GL08029802,7,27906
9,10,Northwind Electric Cars,juguetes,Eliza Soderberg,ML0300202,30,7860


**TipoTransaccion**

In [440]:
query=f'(SELECT TipoTransaccionID ID_Tipo_transaccion_T,TipoTransaccionNombre Tipo FROM {TT_WWL}) AS dataset'
df_tt=obtener_dataframe_de_bd(db_connection_string,query,db_user,db_psswd)
df_tt.show()

+---------------------+--------------------+
|ID_Tipo_transaccion_T|                Tipo|
+---------------------+--------------------+
|                    2|Customer Credit Note|
|                    3|Customer Payment ...|
|                    4|     Customer Refund|
|                    5|    Supplier Invoice|
|                    6|Supplier Credit Note|
|                    7|Supplier Payment ...|
|                    8|     Supplier Refund|
|                    9|      Stock Transfer|
|                   10|         Stock Issue|
|                   11|       Stock Receipt|
|                   12|Stock Adjustment ...|
|                   13|     Customer Contra|
+---------------------+--------------------+



**Fecha**

In [410]:
df_f=df_m.select('FechaTransaccion').show()

+----------------+
|FechaTransaccion|
+----------------+
|      2014-01-20|
|      2014-01-28|
|      2014-01-28|
|      2014-01-28|
|      2014-01-28|
|      2014-02-01|
|      2014-02-01|
|      2014-03-25|
|      2014-03-25|
|      2014-03-25|
|      2014-03-25|
|      2014-03-25|
|      2014-03-25|
|      2014-03-25|
|      2014-03-25|
|      2014-03-25|
|      2014-03-25|
|      2014-03-25|
|      2014-03-25|
|      2014-03-25|
+----------------+
only showing top 20 rows



**Hecho_Movimiento**

In [411]:
query=f'(SELECT ProductoID,ProveedorID,ClienteID,TipoTransaccionID,FechaTransaccion,Cantidad FROM {M_WWL}) AS dataset'
df_m=obtener_dataframe_de_bd(db_connection_string,query,db_user,db_psswd)
df_m.show()

+----------+-----------+---------+-----------------+----------------+--------+
|ProductoID|ProveedorID|ClienteID|TipoTransaccionID|FechaTransaccion|Cantidad|
+----------+-----------+---------+-----------------+----------------+--------+
|       108|           |    185.0|               10|     Jan 20,2014|   -10.0|
|       162|           |    176.0|               10|     Jan 28,2014|   -10.0|
|       216|           |    474.0|               10|     Jan 28,2014|   -10.0|
|        22|           |    901.0|               10|     Jan 28,2014|   -10.0|
|        25|           |    926.0|               10|     Jan 28,2014|   -10.0|
|        14|           |    444.0|               10|     Feb 01,2014|   -10.0|
|        75|           |    168.0|               10|     Feb 01,2014|   -10.0|
|        20|           |    802.0|               10|     Mar 25,2014|   -10.0|
|        65|           |    975.0|               10|     Mar 25,2014|   -10.0|
|       130|           |    487.0|               10|

# Validacion de reglas de negocio

## Dias de pago

Los días de pago no pueden ser negativos no tiene sentido para nuestro negocio. Por favor corregir multiplicando los datos negativos por -1.

In [431]:
df_p_negativo=df_p.filter(df_p.Dias_pago<=0)
print(f'se encuentran {df_p_negativo.count()} valores negativos')
df_p_negativo.show()

se encuentran 6 valores negativos
+--------------+--------------------+--------------------+------------------+-----------+---------+-------------+
|ID_proveedor_T|              Nombre|           Categoria|Contacto_principal| Referencia|Dias_pago|Codigo_postal|
+--------------+--------------------+--------------------+------------------+-----------+---------+-------------+
|             1| A Datum Corporation| productos novedosos|        Reio Kabin|    AA20384|      -14|        46077|
|             2|       Contoso, Ltd.| productos novedosos|   Hanna Mihhailov|   B2084020|       -7|        98253|
|             3|Consolidated Mess...|servicios de mens...|      Kerstin Parn|  209340283|      -30|        94101|
|             6| Humongous Insurance|servicios de seguros|Madelaine  Cartier|  082420938|      -14|        37770|
|             8|  Lucerne Publishing| productos novedosos|       Prem Prabhu|JQ082304802|      -30|        37659|
|            11|       Trey Research|servicios de mark

Efectivamente se encuentran algunos valores negativos, por lo que se procede a cambiar los valores del df para que coincidan

In [432]:
df_p = df_p.withColumn('Dias_pago', when(col('Dias_pago') < 0, col('Dias_pago') / -1).otherwise(col('Dias_pago')))

validacion_df_p_negativo=df_p.filter(df_p.Dias_pago<=0)
print(f'se encuentran {validacion_df_p_negativo.count()} valores negativos')
validacion_df_p_negativo.show()

se encuentran 0 valores negativos
+--------------+------+---------+------------------+----------+---------+-------------+
|ID_proveedor_T|Nombre|Categoria|Contacto_principal|Referencia|Dias_pago|Codigo_postal|
+--------------+------+---------+------------------+----------+---------+-------------+
+--------------+------+---------+------------------+----------+---------+-------------+



## Proveedores con categorias nulas

Se valida que no existan valores nulos o vacios en la categoríaProveedorID, anteriormente se valido solo en la tabla proveedores, luego se hizo un join de informacion

In [433]:
query=f'(SELECT DISTINCT P.CategoriaProveedorID CPID, CP.CategoriaProveedor Categoria_Proveedor FROM {P_WWL} P INNER JOIN {CP_WWL} CP ON P.CategoriaProveedorID=CP.CategoriaProveedorID) AS dataset'
df=obtener_dataframe_de_bd(db_connection_string,query,db_user,db_psswd)
df.toPandas()

Unnamed: 0,CPID,Categoria_Proveedor
0,4,ropa
1,2,productos novedosos
2,5,embalaje
3,3,juguetes
4,7,servicios financieros
5,9,servicios de seguros
6,8,servicios de marketing
7,6,servicios de mensajeria


## Datos antes del 2014

Se validan que no existan datos antes del 2014

In [415]:
df_m.agg({'FechaTransaccion': "min"}).show()

+---------------------+
|min(FechaTransaccion)|
+---------------------+
| 2013-01-01 12:00:...|
+---------------------+



Aca se debe entender si los datos deben estar hasta el 2014 o se permiten otros datos anteriores

## Duplicados totales se deben eliminar

Se validan duplicados totales por tabla

In [418]:
print(f'Total datos en Proveedores {df_p.count()}, Vs total datos unicos {df_p.distinct().count()}')
print(f'Total datos en TipoTransaccion {df_tt.count()}, Vs total datos unicos {df_tt.distinct().count()}')
print(f'Total datos en Movimiento {df_m.count()}, Vs total datos unicos {df_m.distinct().count()}')

Total datos en Proveedores 13, Vs total datos unicos 13
Total datos en TipoTransaccion 12, Vs total datos unicos 12
Total datos en Movimiento 267300, Vs total datos unicos 236656


Se identifican datos duplicados en la tabla Movimientos, luego de proceder a depurar, siguen consistentes, por lo que se recomienda analizar la variable df_duplicados que lo contiene.

In [419]:
borrar_duplicados(df_m)
print(f'Total datos en Movimiento {df_m.count()}, Vs total datos unicos {df_m.distinct().count()}')

Total datos en Movimiento 267300, Vs total datos unicos 236656


In [420]:
df_duplicados = df_m.exceptAll(df_m.distinct())
df_duplicados.orderBy("FechaTransaccion").show()

+----------+-----------+---------+-----------------+--------------------+--------+
|ProductoID|ProveedorID|ClienteID|TipoTransaccionID|    FechaTransaccion|Cantidad|
+----------+-----------+---------+-----------------+--------------------+--------+
|       153|           |    988.0|               10|2013-01-19 12:00:...|   -10.0|
|       138|           |    910.0|               10|2013-02-01 12:00:...|   -12.0|
|       102|           |    850.0|               10|2013-02-14 12:00:...|    -2.0|
|       120|           |    535.0|               10|2013-06-04 12:00:...|    -1.0|
|         6|           |    444.0|               10|2013-06-24 12:00:...|    -1.0|
|       199|           |    901.0|               10|2013-12-31 12:00:...|   -96.0|
|        85|           |    976.0|               10|2013-12-31 12:00:...|   -12.0|
|       192|           |    569.0|               10|2013-12-31 12:00:...|  -144.0|
|       168|           |    139.0|               10|2013-12-31 12:00:...|   -70.0|
|   

## Validacion de fechas

**Hecho_Movimientos**

Se identifican 3 formatos de fecha tal como se identifica en las reglas de negocio, se cambian todas aquellas en formato ingles a formato YYYY-MM-DD como se pide, asi mismo aquellas en formato datetime se pasa a YYYY-MM-DD para estandarizar la unidad

In [421]:
# regexYMDHMS="([0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]) (2[0-3]|[0-1][0-9]):[0-5][0-9]:[0-5][0-9])"
regexYMD="([0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]))"
cumplenFormato = df_m.filter(df_m["FechaTransaccion"].rlike(regexYMD))
noCumplenFormato = df_m.filter(~df_m["FechaTransaccion"].rlike(regexYMD))
print(noCumplenFormato.count(), cumplenFormato.count())
print(noCumplenFormato.show(5))
noCumplenFormato = noCumplenFormato.withColumn('FechaTransaccion', f.udf(lambda d: datetime.strptime(d, '%b %d,%Y').strftime('%Y-%m-%d'), t.StringType())(f.col('FechaTransaccion')))
df_m = noCumplenFormato.union(cumplenFormato)
noCumplenFormato.count(), ordenes.count()

64254 203046
+----------+-----------+---------+-----------------+----------------+--------+
|ProductoID|ProveedorID|ClienteID|TipoTransaccionID|FechaTransaccion|Cantidad|
+----------+-----------+---------+-----------------+----------------+--------+
|       108|           |    185.0|               10|     Jan 20,2014|   -10.0|
|       162|           |    176.0|               10|     Jan 28,2014|   -10.0|
|       216|           |    474.0|               10|     Jan 28,2014|   -10.0|
|        22|           |    901.0|               10|     Jan 28,2014|   -10.0|
|        25|           |    926.0|               10|     Jan 28,2014|   -10.0|
+----------+-----------+---------+-----------------+----------------+--------+
only showing top 5 rows

None


(64254, 267300)

## Proveedores con nombres de 2 filas

Primero se borraran todos los nombres que contengan simbolos como "." y ",".
Luego se buscaran todos los items que posean Inc y Ltd y se borraran tambien
Despues se verificarán repetidos y se borraran, reemplazando la llave primaria en las foraneas viejas

In [434]:
proveedores_con_coma=df_p.filter(
    df_p["Nombre"].contains('Inc') | 
    df_p["Nombre"].contains('Ltd') |
    df_p["Nombre"].contains('.')   |
    df_p["Nombre"].contains(',') 
)
p_Fabrikam=df_p.filter(df_p['Nombre'].contains('Fabrikam'))
p_Litware=df_p.filter(df_p['Nombre'].contains('Litware'))
p_Contoso=df_p.filter(df_p['Nombre'].contains('Contoso'))

#Muestra el DataFrame resultante.
proveedores_con_coma.show()
p_Fabrikam.show()
p_Litware.show()
p_Contoso.show()

+--------------+--------------+-------------------+------------------+----------+---------+-------------+
|ID_proveedor_T|        Nombre|          Categoria|Contacto_principal|Referencia|Dias_pago|Codigo_postal|
+--------------+--------------+-------------------+------------------+----------+---------+-------------+
|             2| Contoso, Ltd.|productos novedosos|   Hanna Mihhailov|  B2084020|      7.0|        98253|
|             4|Fabrikam, Inc.|               ropa|       Bill Lawson|    293092|     30.0|        40351|
|             7| Litware, Inc.|           embalaje|     Elias Myllari| BC0280982|     30.0|        95245|
+--------------+--------------+-------------------+------------------+----------+---------+-------------+

+--------------+--------------+---------+------------------+----------+---------+-------------+
|ID_proveedor_T|        Nombre|Categoria|Contacto_principal|Referencia|Dias_pago|Codigo_postal|
+--------------+--------------+---------+------------------+-----

Se identificaron 3 nombres con Inc y Ltd que solo poseen un record por lo que se proceden a reemplazar para cumplir la regla del negocio

In [423]:
df_p = df_p.withColumn('Nombre', regexp_replace('Nombre', ', Inc.', ''))
df_p = df_p.withColumn('Nombre', regexp_replace('Nombre', ', Ltd.', ''))
df_p.filter(
    df_p["Nombre"].contains('Inc') | 
    df_p["Nombre"].contains('Ltd') |
    df_p["Nombre"].contains('.')   |
    df_p["Nombre"].contains(',') 
).show()

+--------------+------+---------+------------------+----------+---------+-------------+
|ID_proveedor_T|Nombre|Categoria|Contacto_principal|Referencia|Dias_pago|Codigo_postal|
+--------------+------+---------+------------------+----------+---------+-------------+
+--------------+------+---------+------------------+----------+---------+-------------+



## Codigo postal igual para todos los proveedores

Se identifican dos proveedores con mismo codigo postal, se sugiere preguntar al equipo de expertos si en efecto este valor es correcto.

In [424]:
duplicados = df_p \
    .select(['Nombre', 'Codigo_postal']) \
    .groupby(['Nombre', 'Codigo_postal']) \
    .count() \
    .where('count > 1') \
    .sort('count', ascending=False) \
    .show()

p_duplicados=df_p.select(['Nombre'])
df_p.filter(df_p['Codigo_postal']==94101).toPandas()

+------+-------------+-----+
|Nombre|Codigo_postal|count|
+------+-------------+-----+
|      |        94101|    2|
+------+-------------+-----+



Unnamed: 0,ID_proveedor_T,Nombre,Categoria,Contacto_principal,Referencia,Dias_pago,Codigo_postal
0,3,,servicios de mensajeria,Kerstin Parn,209340283,30.0,94101
1,13,,servicios financieros,Hubert Helms,28034202,7.0,94101


## Cantidades negativas significan salidas de productos del inventario

Se valida si hay cantidades negativas de proveedores o positivas de clientes, efectivamente la regla se cumple

In [425]:
proveedores_negativos = df_m.filter((col('Cantidad') <= 0) & (col('ProveedorID') >= 0))
clientes_negativos = df_m.filter((col('Cantidad') >= 0) & (col('ClienteID') > 0))
proveedores_negativos.show()
clientes_negativos.show()

+----------+-----------+---------+-----------------+----------------+--------+
|ProductoID|ProveedorID|ClienteID|TipoTransaccionID|FechaTransaccion|Cantidad|
+----------+-----------+---------+-----------------+----------------+--------+
+----------+-----------+---------+-----------------+----------------+--------+

+----------+-----------+---------+-----------------+----------------+--------+
|ProductoID|ProveedorID|ClienteID|TipoTransaccionID|FechaTransaccion|Cantidad|
+----------+-----------+---------+-----------------+----------------+--------+
+----------+-----------+---------+-----------------+----------------+--------+



## Creacion de valores para Tabla Fecha

In [426]:
fechas_unicas=df_m.select('FechaTransaccion')
fechas_unicas = fechas_unicas.withColumnRenamed("FechaTransaccion", "Fecha")
fechas_unicas = fechas_unicas.withColumn('ID_Fecha', date_format('Fecha', 'yyyyMMdd'))
fechas_unicas = fechas_unicas.drop('Fecha')
borrar_duplicados(fechas_unicas)
fechas_unicas = fechas_unicas.withColumn("Fecha", f.to_date("ID_Fecha", "yyyyMMdd"))
fechas_unicas = fechas_unicas.withColumn('Dia', date_format('Fecha', 'dd'))
fechas_unicas = fechas_unicas.withColumn('mes', date_format('Fecha', 'MM'))
fechas_unicas = fechas_unicas.withColumn('Anio', date_format('Fecha', 'yyyy'))
fechas_unicas = fechas_unicas.withColumn('Numero_semana_ISO', weekofyear('Fecha'))
fechas_unicas.show()

+--------+----------+---+---+----+-----------------+
|ID_Fecha|     Fecha|Dia|mes|Anio|Numero_semana_ISO|
+--------+----------+---+---+----+-----------------+
|20140120|2014-01-20| 20| 01|2014|                4|
|20140128|2014-01-28| 28| 01|2014|                5|
|20140128|2014-01-28| 28| 01|2014|                5|
|20140128|2014-01-28| 28| 01|2014|                5|
|20140128|2014-01-28| 28| 01|2014|                5|
|20140201|2014-02-01| 01| 02|2014|                5|
|20140201|2014-02-01| 01| 02|2014|                5|
|20140325|2014-03-25| 25| 03|2014|               13|
|20140325|2014-03-25| 25| 03|2014|               13|
|20140325|2014-03-25| 25| 03|2014|               13|
|20140325|2014-03-25| 25| 03|2014|               13|
|20140325|2014-03-25| 25| 03|2014|               13|
|20140325|2014-03-25| 25| 03|2014|               13|
|20140325|2014-03-25| 25| 03|2014|               13|
|20140325|2014-03-25| 25| 03|2014|               13|
|20140325|2014-03-25| 25| 03|2014|            

# Carga

## Generando y cargando tabla Fecha

Se diseña base de datos

In [445]:
df_f = fechas_unicas.distinct()

Se carga BDD

In [446]:
guardar_db(db_connection_string,df_f,F_EST, db_user, db_psswd)


## Generando y cargando tabla Proveedores

Se diseña base de datos

In [447]:
df_p.show()

+--------------+--------------------+--------------------+------------------+-----------+---------+-------------+
|ID_proveedor_T|              Nombre|           Categoria|Contacto_principal| Referencia|Dias_pago|Codigo_postal|
+--------------+--------------------+--------------------+------------------+-----------+---------+-------------+
|             1| A Datum Corporation| productos novedosos|        Reio Kabin|    AA20384|     14.0|        46077|
|             2|       Contoso, Ltd.| productos novedosos|   Hanna Mihhailov|   B2084020|      7.0|        98253|
|             3|Consolidated Mess...|servicios de mens...|      Kerstin Parn|  209340283|     30.0|        94101|
|             4|      Fabrikam, Inc.|                ropa|       Bill Lawson|     293092|     30.0|        40351|
|             5|Graphic Design In...| productos novedosos|        Penny Buck|   08803922|     14.0|        64847|
|             6| Humongous Insurance|servicios de seguros|Madelaine  Cartier|  082420938

Se carga BDD

In [448]:
guardar_db(db_connection_string,df_p,P_EST, db_user, db_psswd)


## Generando y cargando tabla TipoTransaccion

Se diseña base de datos

In [449]:
df_tt.show()

+---------------------+--------------------+
|ID_Tipo_transaccion_T|                Tipo|
+---------------------+--------------------+
|                    2|Customer Credit Note|
|                    3|Customer Payment ...|
|                    4|     Customer Refund|
|                    5|    Supplier Invoice|
|                    6|Supplier Credit Note|
|                    7|Supplier Payment ...|
|                    8|     Supplier Refund|
|                    9|      Stock Transfer|
|                   10|         Stock Issue|
|                   11|       Stock Receipt|
|                   12|Stock Adjustment ...|
|                   13|     Customer Contra|
+---------------------+--------------------+



Se carga BDD

In [450]:
guardar_db(db_connection_string,df_tt,TT_EST, db_user, db_psswd)


## Generando y cargando tabla Hecho_Movimiento

Se diseña base de datos

In [454]:
df_m = df_m.withColumnRenamed("ID_Producto_DWH", "ID_Producto_T")
df_m = df_m.withColumnRenamed("ID_Proveedor_DWH", "ID_Proveedor_T")
df_m = df_m.withColumnRenamed("ID_Cliente_DWH", "ID_Cliente_T")
df_m = df_m.withColumnRenamed("ID_Tipo_Transaccion_DWH", "ID_Tipo_Transaccion_T")

df_m.show()

+-------------+--------------+------------+---------------------+----------------+--------+
|ID_Producto_T|ID_Proveedor_T|ID_Cliente_T|ID_Tipo_Transaccion_T|FechaTransaccion|Cantidad|
+-------------+--------------+------------+---------------------+----------------+--------+
|          108|              |       185.0|                   10|      2014-01-20|   -10.0|
|          162|              |       176.0|                   10|      2014-01-28|   -10.0|
|          216|              |       474.0|                   10|      2014-01-28|   -10.0|
|           22|              |       901.0|                   10|      2014-01-28|   -10.0|
|           25|              |       926.0|                   10|      2014-01-28|   -10.0|
|           14|              |       444.0|                   10|      2014-02-01|   -10.0|
|           75|              |       168.0|                   10|      2014-02-01|   -10.0|
|           20|              |       802.0|                   10|      2014-03-2

In [455]:
#Fechas
query=f'(SELECT ID_Fecha, Fecha FROM {F_EST}) AS dataset'
df_E_F=obtener_dataframe_de_bd(db_connection_string,query,db_user,db_psswd)
df_m = df_m.withColumn('ID_Fecha', when(df_E_F['ID_Fecha'] == df_m['FechaTransaccion'], df_E_F['ID_Fecha']).otherwise(''))

In [456]:
#Proveedores
query=f'(SELECT ID_proveedor_DWH, ID_proveedor_T FROM {P_EST}) AS dataset'
df_E_P=obtener_dataframe_de_bd(db_connection_string,query,db_user,db_psswd)
df_m = df_m.withColumn('ID_proveedor_DWH', when(df_E_F['ID_Fecha'] == df_m['FechaTransaccion'], df_E_F['ID_Fecha']).otherwise(''))

In [457]:
#TipoTransaccion
query=f'(SELECT ID_Tipo_transaccion_T, ID_Tipo_transaccion_DWH FROM {TT_EST}) AS dataset'
df_E_P=obtener_dataframe_de_bd(db_connection_string,query,db_user,db_psswd)

Se carga BDD

In [443]:
guardar_db(db_connection_string,df_m,HM_EST, db_user, db_psswd)
