Continuando con el proyecto de consultoria de Wide World Importers, el primer paso para iniciar la comprensión de los datos es explorar y entender las fuentes de datos disponibles. Note que esto también nos ayuda a comprender mejor la organización.

## Configuración e importe de paquetes
Se utilizará el paquete de pandas profiling para apoyar el análisis estadístico, y se importan los paquetes de python
necesarios

In [173]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import StructType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import FloatType, StringType, IntegerType, DateType
from pyspark.sql.functions import udf, col, length, isnan, when, count
import pyspark.sql.functions as f
import os 
from datetime import datetime
from pyspark.sql import types as t
from pandas_profiling import ProfileReport
#import matplotlib.pyplot as plt
import numpy as np

Configuración del controlador e inicio de sesion Spark

In [174]:
path_jar_driver = 'C:\Program Files (x86)\MySQL\Connector J 8.0\mysql-connector-java-8.0.28.jar'

In [175]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext.getOrCreate(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession




In [176]:
# Si quiere practicar la conexion con el servidor de base de datos:
db_connection_string = 'jdbc:mysql://157.253.236.116:8080/WWImportersTransactional'
# El usuario es su estudiante _i asignado y su contraseña la encontrará en el archivo excel de Coursera 
db_user = 'Estudiante_24_202314'
db_psswd = 'aabb1122'

PATH='./'

### Conexión a fuente de datos y acceso a los datos

#### Conexión a fuente de datos
A continuación encuentra las funciones para conectarse a la fuente de datos (archivo csv o base de datos) y retornar un dataframe que es el que se utilizará posteriormente para manipular los datos.

In [177]:
def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

In [200]:
def contar_vacios(df):
    resultados = []
    for c in df.columns:
        vacios = df.filter(df[c].isNull()).count()
        if vacios!=0:
            print('número de vacíos para columna '+c+': '+str( vacios ))
            resultados.append(vacios)
    return resultados

def cardinalidad(df):
    resultado = {}
    for c in df.columns:
        cardinalidad = df.select(col(c)).distinct().count()
        if cardinalidad>=df.count()*0.5:
            resultado[c] = cardinalidad
    return resultado

# 5. Tarea
Espacio para desarrollar la tarea propuesta 

### Perfilamiento de datos

In [246]:
sql_movimientos = 'movimientosCopia'
movimientos = obtener_dataframe_de_bd(db_connection_string, sql_movimientos, db_user, db_psswd)
movimientos.show(20)

+---------------------+----------+-----------------+---------+---------+-----------+---------------+----------------+--------+
|TransaccionProductoID|ProductoID|TipoTransaccionID|ClienteID|InvoiceID|ProveedorID|OrdenDeCompraID|FechaTransaccion|Cantidad|
+---------------------+----------+-----------------+---------+---------+-----------+---------------+----------------+--------+
|               118903|       217|               10|    476.0|  24904.0|           |               |     Apr 25,2014|   -40.0|
|               286890|       135|               10|     33.0|  60117.0|           |               |     Dec 10,2015|    -7.0|
|               285233|       111|               10|    180.0|  59768.0|           |               |     Dec 04,2015|    -2.0|
|               290145|       213|               10|     33.0|  60795.0|           |               |     Dec 23,2015|    -3.0|
|               247492|        90|               10|     55.0|  51851.0|           |               |     Jul 27

La tabla muestra que los movimientos y salidas de inventarios se hacen por venta o compras de proveedores y clientes, las salidas con movimientos negativos y las entredas positivas. 

In [212]:
movimientos.schema

StructType(List(StructField(TransaccionProductoID,IntegerType,true),StructField(ProductoID,IntegerType,true),StructField(TipoTransaccionID,IntegerType,true),StructField(ClienteID,DoubleType,true),StructField(InvoiceID,DoubleType,true),StructField(ProveedorID,StringType,true),StructField(OrdenDeCompraID,StringType,true),StructField(FechaTransaccion,StringType,true),StructField(Cantidad,DoubleType,true)))

In [213]:
print((movimientos.count(), len(movimientos.columns)))

(204292, 9)


El cliente indica que hay 236668 registros y en realidad hay 204292 por lo tanto hay una diferencia de 31686

In [220]:
print(movimientos.select().where(col('Cantidad')>50000000).count())

No existen movimientos con mas de 50 millones en transaccion

In [293]:
print(movimientos.select('TransaccionProductoID').distinct().count())

173659


El cliente Indica que son 228265 clientes pero en los datos solo hay 664 clientes distintos.

Existen valores duplicados para el campo TransaccionProductoID,  hay que preguntar si es valido y cual es la razón de esta duplicidad

In [299]:
regex = "[0-2]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1])"
cumplenFormato = movimientos.filter(movimientos["FechaTransaccion"].rlike(regex))
print("Número total de movimientos: "+str(movimientos.count())+", número de movimientos con formato correcto: "+str(cumplenFormato.count()))

Número total de movimientos: 204292, número de movimientos con formato correcto: 140038


In [228]:
noCumplenFormato = movimientos.filter(~movimientos["FechaTransaccion"].rlike(regex))
noCumplenFormato.count(), noCumplenFormato.select(col("FechaTransaccion")).show()

+----------------+
|FechaTransaccion|
+----------------+
|     Apr 25,2014|
|     Dec 10,2015|
|     Dec 04,2015|
|     Dec 23,2015|
|     Jul 27,2015|
|     Sep 15,2014|
|     Aug 04,2015|
|     Feb 23,2015|
|     May 01,2015|
|     Jan 08,2016|
|     Mar 26,2014|
|     Jul 31,2015|
|     Sep 02,2014|
|     Mar 15,2016|
|     May 28,2016|
|     Sep 09,2015|
|     May 23,2014|
|     Aug 20,2014|
|     Jan 21,2015|
|     Dec 29,2015|
+----------------+
only showing top 20 rows



(64254, None)

Existen unicamente 140038 fechas con el formato correcto,  el resto deben ser corregidos. 

In [300]:
print(movimientos.select('ClienteID').distinct().count())

664


El cliente indica que tiene 13 proveedores, pero solo hay movimientos para 4
Hay 664 clientes asociados a los movimientos de inventarios

In [235]:
print(movimientos.select().where(col('Cantidad')==0).count())

Hay 5 movimientos con valores de 0,  los valores son validos y que significado tienen

In [239]:
movimientos.groupBy("TipoTransaccionID").count().show()
sql_tipoTransaccion = 'TiposTransaccion'
tiposTransaccion = obtener_dataframe_de_bd(db_connection_string, sql_tipoTransaccion, db_user, db_psswd)
tiposTransaccion.show(20)


+-----------------+------+
|TipoTransaccionID| count|
+-----------------+------+
|               12|    46|
|               10|197136|
|               11|  7110|
+-----------------+------+

+-----------------+---------------------+
|TipoTransaccionID|TipoTransaccionNombre|
+-----------------+---------------------+
|                2| Customer Credit Note|
|                3| Customer Payment ...|
|                4|      Customer Refund|
|                5|     Supplier Invoice|
|                6| Supplier Credit Note|
|                7| Supplier Payment ...|
|                8|      Supplier Refund|
|                9|       Stock Transfer|
|               10|          Stock Issue|
|               11|        Stock Receipt|
|               12| Stock Adjustment ...|
|               13|      Customer Contra|
+-----------------+---------------------+



Se ve que se usan las transaciones 12, 10 y 11 lo que indican salida, ajuste y recepción de inventario

In [252]:

def contar_vacios2(df):
    resultados = []
    for c in ['ProveedorID','OrdenDeCompraID']:
        vacios = df.where(df[c]=='').count()
        if vacios!=0:
            print('número de vacíos para columna '+c+': '+str( vacios ))
            resultados.append(vacios)
    return resultados

def contar_vacios3(df):
    resultados = []
    for c in ['ClienteID','InvoiceID']:
        vacios = df.where(df[c]==0).count()
        if vacios!=0:
            print('número de vacíos para columna '+c+': '+str( vacios ))
            resultados.append(vacios)
    return resultados

columnas_vacias_ordenes = contar_vacios2(movimientos)
columnas_vacias_ordenes = contar_vacios3(movimientos)

número de vacíos para columna ProveedorID: 197182
número de vacíos para columna OrdenDeCompraID: 197182
número de vacíos para columna ClienteID: 7156
número de vacíos para columna InvoiceID: 7156


Se observa que las transacciones que tienen ClienteID tienen asociada una factura pero no un proveedor,  y las que tienen un proveedor, no tienen asociada una factura y un cliente.  Existen movimientos de ajuste que no necesitan ni proveedor ni cliente

In [215]:
columnas_alta_cardinalidad_ordenes = cardinalidad(movimientos)
columnas_alta_cardinalidad_ordenes

{'TransaccionProductoID': 173659}

La única columna con cardinalidad superior al 50 es el identificador del registro

In [216]:
movimientos.summary().show()

+-------+---------------------+------------------+-------------------+------------------+-----------------+-----------------+------------------+--------------------+-----------------+
|summary|TransaccionProductoID|        ProductoID|  TipoTransaccionID|         ClienteID|        InvoiceID|      ProveedorID|   OrdenDeCompraID|    FechaTransaccion|         Cantidad|
+-------+---------------------+------------------+-------------------+------------------+-----------------+-----------------+------------------+--------------------+-----------------+
|  count|               204292|            204292|             204292|            204292|           204292|           204292|            204292|              204292|           204292|
|   mean|   212458.04047637695|110.70090850351458| 10.035253460732676| 517.3252941867523|42957.26929590978|4.951898734177215|1345.9973277074544|                null|719.4997650421946|
| stddev|    71352.37579752573| 63.49014746219581|0.18563716955046372|353.501369

Se puede ver que la información de los IDs no es valida para análisis, sin embargo en el campo cantidad se puede ver que la mayor cantidad de movientos, más del 75% son salidas de inventario que se producen en cantidades pequeñas, la m, lo que corresponde a ventas minorista=s, en cambio los valores de las entradas van hasta 67368 lo que evidencia las compras al por mayor.

### Conclusiones

In [None]:
Justificación de si es posible resolver los análisis basados en tableros de control propuestos.

Con el tablero propuesto se puede generar el análisis necesario, sin embargo hay que agregar información adicional a la tabla de movimientos para poder extraer información de precios y valores de venta. 

Incluir la síntesis de lo que representa la fila promedio de los datos analizados. 

Una fila representa la entrada o salida de un producto del inventario. Cada movimiento está realicionado a una venta (devolución de venta), compra (devolución de compra) o ajuste a inventario hecho sobre el producto. 


Lista de preguntas o comentarios realizados a la organización

-	Es valido que una fila no tenga ni ProveedorID ni ClienteID?
-	Existen registros con el campo Cantidad en 0,  estos registros se deben tener en cuenta?
-	El valor del campo TransaccionProductoID no es único en la tabla,  qué significado tiene cuando ese valor se encuentra en varias filas?
