# Entendimiento de los datos con PySpark

## 1- Configuracion general


### 1.1- Configuración e importe de paquetes
Se utilizará el paquete de pandas profiling para apoyar el análisis estadístico, y se importan los paquetes de python necesarios

In [44]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import StructType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import FloatType, StringType, IntegerType, DateType
from pyspark.sql.functions import udf, col, length, isnan, when, count
import pyspark.sql.functions as f
import os 
from datetime import datetime
from pyspark.sql import types as t
#from pandas_profiling import ProfileReport
from ydata_profiling import ProfileReport
#import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import to_date
from pyspark.sql.functions import date_format

Configuración del controlador e inicio de sesion Spark

In [2]:
path_jar_driver = '/home/luissolier/Documents/miso/ciclo-7/analisis-y-modelado-de-datos/tutoriales/entendimiento-datos/mysql-connector-java-8.0.28.jar'

In [3]:
#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver)

spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession

23/08/23 19:05:44 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.1.41 instead (on interface enp46s0)
23/08/23 19:05:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/23 19:05:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/23 19:05:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/08/23 19:05:45 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [4]:
# Si quiere practicar la conexion con el servidor de base de datos:
db_connection_string = 'jdbc:mysql://localhost:3306/WWImportersTransactional'
# El usuario es su estudiante _i asignado y su contraseña la encontrará en el archivo excel de Coursera 
db_user = ''
db_psswd = ''

PATH='./'

### 1.2- Conexión a fuente de datos y acceso a los datos

In [5]:
def obtener_dataframe_de_bd(db_connection_string, sql, db_user, db_psswd):
    df_bd = spark.read.format('jdbc')\
        .option('url', db_connection_string) \
        .option('dbtable', sql) \
        .option('user', db_user) \
        .option('password', db_psswd) \
        .option('driver', 'com.mysql.cj.jdbc.Driver') \
        .load()
    return df_bd

### 1.3- Cargue de datos

In [13]:
sql_movimientos = 'WWImportersTransactional.movimientosCopia'
sql_tipos_transaccion = 'WWImportersTransactional.TiposTransaccion'

In [14]:
#Se cargan los dataframes desde la base de datos
movimientos = obtener_dataframe_de_bd(db_connection_string, sql_movimientos, db_user, db_psswd)
tipos_transaccion = obtener_dataframe_de_bd(db_connection_string, sql_tipos_transaccion, db_user, db_psswd)

## 2- Información dada por la organización relacionada con los datos



## 3- Perfilamiento de los datos

### 3.1- Entendimiento general de datos

In [19]:
movimientos.show(10)

+---------------------+----------+-----------------+---------+---------+-----------+---------------+----------------+--------+
|TransaccionProductoID|ProductoID|TipoTransaccionID|ClienteID|InvoiceID|ProveedorID|OrdenDeCompraID|FechaTransaccion|Cantidad|
+---------------------+----------+-----------------+---------+---------+-----------+---------------+----------------+--------+
|               118903|       217|               10|    476.0|  24904.0|           |               |     Apr 25,2014|   -40.0|
|               286890|       135|               10|     33.0|  60117.0|           |               |     Dec 10,2015|    -7.0|
|               285233|       111|               10|    180.0|  59768.0|           |               |     Dec 04,2015|    -2.0|
|               290145|       213|               10|     33.0|  60795.0|           |               |     Dec 23,2015|    -3.0|
|               247492|        90|               10|     55.0|  51851.0|           |               |     Jul 27

In [17]:
tipos_transaccion.show(15)

+-----------------+---------------------+
|TipoTransaccionID|TipoTransaccionNombre|
+-----------------+---------------------+
|                2| Customer Credit Note|
|                3| Customer Payment ...|
|                4|      Customer Refund|
|                5|     Supplier Invoice|
|                6| Supplier Credit Note|
|                7| Supplier Payment ...|
|                8|      Supplier Refund|
|                9|       Stock Transfer|
|               10|          Stock Issue|
|               11|        Stock Receipt|
|               12| Stock Adjustment ...|
|               13|      Customer Contra|
+-----------------+---------------------+



### 3.2- Revisión de reglas de negocio 

#### 3.2.1- La cantidad maxima de productos movidos es 50 millones por transaccion

Vemos que no es posible corroborar esta informacion dado que no existe un tabla que nos informe del monto por cada factura, adicional a ello la columna InvoiceID tiene valores duplicados.

In [25]:
movimientos.select("InvoiceID").distinct().show(10)

+---------+
|InvoiceID|
+---------+
|  30102.0|
|  39221.0|
|  28134.0|
|  50815.0|
|  51013.0|
|  26597.0|
|  68581.0|
|  42071.0|
|  33322.0|
|  67357.0|
+---------+
only showing top 10 rows



In [26]:
print(movimientos.select("InvoiceID").distinct().count())

51831


Asumiendo que la columna **_InvoiceID_** representa el monto total del movimiento, la regla tampoco seria cierta, ya que si obtenemos el valor maximo obtenido es 70510.

In [27]:
movimientos.agg({'InvoiceID': 'max'}).show()

+--------------+
|max(InvoiceID)|
+--------------+
|       70510.0|
+--------------+



La conclusion es entonces es que no se puede determinar si la regla es cierta o no, dado que no hay existe una tabla **Invoice** que nos brinde el monto asociado a cada movimiento.

### 3.3- Análisis descriptivo

## 4- Análisis de calidad de datos

### 4.1- Completitud

### 4.2- Unicidad y Validez

### 4.3- Consistencia

## 5- Conclusiones/Resultados

**Conclusiones generales:**


**Conclusiones de reglas de negocio:**


**Conclusiones de calidad:**

**Conclusiones de consultoria**
