#### NOTA: En este cuaderno se crearan las Tablas `producto` y `tipo_producto` del Datawarehouse

***
### Importando Librerias

In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.functions import  count, when, isnull,  monotonically_increasing_id


***
Creando Session de Spark

In [3]:
spark = SparkSession.builder.appName('producto-tipo_producto').getOrCreate()

***
Importando Conexion a Contenedor `mod4_mysql`

In [4]:
from db.connection import connec_db

***
Trabajando La Tabla `producto`

In [5]:
producto = spark.read.csv("Data/PRODUCTOS.csv", sep=",", header=True, inferSchema=True)

***
Visualizando como vienen los Datos

In [6]:
producto.show(5)

+-----------+--------------------+-----------+-------+
|ID_PRODUCTO|            Concepto|       Tipo| Precio|
+-----------+--------------------+-----------+-------+
|      42737|  EPSON COPYFAX 2000|  IMPRESIÓN| 1658.0|
|      42754|MOT ASROCK H110M-...|INFORMATICA| 1237.5|
|      42755|MOT ASROCK A58M-V...|INFORMATICA|1079.32|
|      42756|  MOT ECS KAM1-I AM1|INFORMATICA| 638.66|
|      42757|MOT ASROCK B150M-...|INFORMATICA|1784.42|
+-----------+--------------------+-----------+-------+
only showing top 5 rows



***
Visualizando la Inferencia del Schema

In [7]:
producto.printSchema()

root
 |-- ID_PRODUCTO: integer (nullable = true)
 |-- Concepto: string (nullable = true)
 |-- Tipo: string (nullable = true)
 |-- Precio: double (nullable = true)



***
Anomalia en Registros de Tabla `producto`
<image src="assets/ANOMALIA-MAESTRO-PRODUCTOS.png">

***
Verficar en ventas si esos Productos han tenido ventas y si sus valores se ven mas normales

In [8]:
venta = spark.read.csv("Data/Venta.csv", sep=",", header=True, inferSchema=True )
venta.show(1)

+-------+----------+-------------+-------+---------+----------+----------+----------+------+--------+
|IdVenta|     Fecha|Fecha_Entrega|IdCanal|IdCliente|IdSucursal|IdEmpleado|IdProducto|Precio|Cantidad|
+-------+----------+-------------+-------+---------+----------+----------+----------+------+--------+
|      1|2018-03-09|   2018-03-17|      3|      969|        13|      1674|     42817|813.12|       2|
+-------+----------+-------------+-------+---------+----------+----------+----------+------+--------+
only showing top 1 row



FILTRANDO LAS VENTAS CUYOS CODIGOS DE PRODUCTOS SEAN LOS QUE TIENEN ANOMALIAS`

In [9]:
codigos_anomalias = [42802, 42803, 42804, 42805, 42806, 42807, 42808, 42809]
venta.filter(venta.IdProducto.isin(codigos_anomalias)).show()

+-------+-----+-------------+-------+---------+----------+----------+----------+------+--------+
|IdVenta|Fecha|Fecha_Entrega|IdCanal|IdCliente|IdSucursal|IdEmpleado|IdProducto|Precio|Cantidad|
+-------+-----+-------------+-------+---------+----------+----------+----------+------+--------+
+-------+-----+-------------+-------+---------+----------+----------+----------+------+--------+



Al no tener ventas estos Productos con Fuertes Anomalias y sin Descripcion puedo eliminarlos

In [10]:
del venta

In [11]:
producto.filter(producto.ID_PRODUCTO.isin(codigos_anomalias)).show()

+-----------+---------+----+------------+
|ID_PRODUCTO| Concepto|Tipo|      Precio|
+-----------+---------+----+------------+
|      42802|Producto1|NULL|         0.0|
|      42803|Producto2|NULL|2.44444442E8|
|      42804|Producto3|NULL|2.44444442E8|
|      42805|Producto4|NULL|2.44444442E8|
|      42806|Producto5|NULL|         0.0|
|      42807|Producto6|NULL|         0.0|
|      42808|Producto7|NULL|2.44444442E8|
|      42809|Producto8|NULL|         0.0|
+-----------+---------+----+------------+



In [12]:
producto = producto.filter(~producto.ID_PRODUCTO.isin(codigos_anomalias))

In [13]:
producto.filter(producto.ID_PRODUCTO.isin(codigos_anomalias)).show()

+-----------+--------+----+------+
|ID_PRODUCTO|Concepto|Tipo|Precio|
+-----------+--------+----+------+
+-----------+--------+----+------+



In [14]:
del codigos_anomalias

***
Buscando Valores Faltantes

In [15]:
# Contar valores nulos por columna
null_counts = producto.agg(*[count(when(isnull(c), c)).alias(c) for c in producto.columns])

null_counts.show()

+-----------+--------+----+------+
|ID_PRODUCTO|Concepto|Tipo|Precio|
+-----------+--------+----+------+
|          0|       0|   4|     0|
+-----------+--------+----+------+



*** 
Reemplazando Valores Faltantes

In [16]:
# REEMPLAZA LOS VALORES FALTANTES DE LA COLUMNA TIPO CON LA PALABRA `VARIOS`
producto = producto.fillna({'Tipo': 'VARIOS'})

In [17]:
# VERIFICO QUE NO QUEDO NULOS
null_counts = producto.agg(*[count(when(isnull(c), c)).alias(c) for c in producto.columns]).show()

+-----------+--------+----+------+
|ID_PRODUCTO|Concepto|Tipo|Precio|
+-----------+--------+----+------+
|          0|       0|   0|     0|
+-----------+--------+----+------+



In [18]:
del null_counts

Creo el Dataframe tipo_producto que luego será vaciado en la Tabla `tipo_producto`

In [19]:
tipo_producto = producto.select(producto['Tipo']).distinct()

# Agregar una columna con un ID único e incremental le sumo 1 para comenzar desde 1 y no de 0
tipo_producto = tipo_producto.withColumn('IdTipoProducto', monotonically_increasing_id()+1)

# Renombro columna
tipo_producto = tipo_producto.withColumnRenamed('Tipo', 'TipoProducto')
tipo_producto = tipo_producto.select('IdTipoProducto', 'TipoProducto')
tipo_producto.show()

+--------------+------------+
|IdTipoProducto|TipoProducto|
+--------------+------------+
|             1|      VARIOS|
|             2|      GAMING|
|             3|       BASES|
|             4|    LIMPIEZA|
|             5| INFORMATICA|
|             6|   IMPRESIÓN|
|             7|       AUDIO|
|             8|  ESTUCHERIA|
|             9|   GRABACION|
|            10|   GABINETES|
+--------------+------------+



Ahora necesito crear la Columna IdTipoProducto en `producto`, mapeando lo que tenga en "Tipo" <br>
Para mapear se trabaja Tipo SQL se crea un Join de Tablas

In [20]:
producto_join_tipo = producto.join(tipo_producto, producto.Tipo == tipo_producto.TipoProducto)

producto = producto_join_tipo.select('ID_PRODUCTO', 'Concepto', 'Precio', 'IdTipoProducto')

In [21]:
producto.columns

['ID_PRODUCTO', 'Concepto', 'Precio', 'IdTipoProducto']

***
Cambio Nombre de las Columnas

In [22]:
producto = producto.withColumnRenamed("ID_PRODUCTO", "IdProducto")
producto = producto.withColumnRenamed("Concepto", "Producto")
producto.show(5)

+----------+--------------------+-------+--------------+
|IdProducto|            Producto| Precio|IdTipoProducto|
+----------+--------------------+-------+--------------+
|     42737|  EPSON COPYFAX 2000| 1658.0|             6|
|     42754|MOT ASROCK H110M-...| 1237.5|             5|
|     42755|MOT ASROCK A58M-V...|1079.32|             5|
|     42756|  MOT ECS KAM1-I AM1| 638.66|             5|
|     42757|MOT ASROCK B150M-...|1784.42|             5|
+----------+--------------------+-------+--------------+
only showing top 5 rows



***
Creando la Conexion

In [23]:
engine = connec_db()

***
Pasando los Dataframes a Dataframes de Pandas  

In [23]:
# IMPORTANDO LIBRERIA PANDAS
import pandas as pd

In [24]:
pandas_df_producto = producto.toPandas()
pandas_df_tipo_producto = tipo_producto.toPandas()

Creando Csv para practica

In [25]:
pandas_df_tipo_producto.to_csv("csv_practica/tipo_producto.csv", index=False)

In [26]:
pandas_df_producto.to_csv("csv_practica/producto.csv", index=False)

***
Vaciando el Dataframe a la Tabla `tipo_gasto`

In [26]:
pandas_df_tipo_producto.to_sql(name="tipo_producto", con=engine, index=False, if_exists="append")

10

In [27]:
pandas_df_producto.to_sql(name="producto", con=engine, index=False, if_exists="append")

283

In [4]:
spark.stop()