In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *   #lit,col,expr,regexp_extract,regexp_replace,current_date, current_timestamp,date_format
from pyspark.sql.types import StructField,StringType,StructType,DoubleType,LongType,IntegerType,FloatType,TimestampType,DataType
import pyspark

In [2]:
spark= SparkSession.builder.appName("BaseDatos").getOrCreate()


In [3]:
"""Defino funcion de usuario para limpiar espacios en columnas"""
def clean_columns(Database):
    """
    - Database: Dataframe de spark 
    """
    for column in Database.columns:
        Database=Database.withColumnRenamed(column,column.replace(" ",""))
    return(Database)

In [4]:
Base_Datos=spark.read.csv("Base de datos inventario de riesgo.csv",sep=";",header=True,inferSchema=True)

In [98]:
# Estandarizar columnas              
Base_Datos_clean= Base_Datos.select(col('Negocio Inventarios').alias('NegocioInventarios'),
                  col('Material'),
                  col('Tipo Material Inventario').alias('TipoMaterialInventario'),
                  regexp_replace(col('Descripcion'),"(�)","o").alias('Descripcion'),
                  col('Lote'),
                  col('Fecha entrada').alias('Fechaentrada'),
                  col('Costo Unitario Real').alias('CostoUnitarioReal'),
                  regexp_replace(col('Inventario Disponibl'),"(,)",".").cast(FloatType()).alias('InventarioDisponible'),
                  regexp_replace(col('Inventario No Dispon'),"(,)",".").cast(FloatType()).alias('InventarioNoDisponible'),
                  regexp_replace(col('Valor Obsoleto'),"(,)",".").cast(FloatType()).alias('ValorObsoleto'),
                  regexp_replace(col('Valor Bloqueado MM'),"(,)",".").cast(FloatType()).alias('ValorBloqueadoMM'),
                  regexp_replace(col('Valor Total MM'),"(,)",".").cast(FloatType()).alias('ValorTotalMM'),
                  col('Permanencia'),
                  col('Marca concat').alias('Cliente'),
                  col('CLAS BASE RIESGO').alias('ClaseBaseRiesgo'),
                  col('Base de Riesgo').alias('BasedeRiesgo'),
                  regexp_replace(trim(col(' Base de Riesgo ($) ')),"([$]|-)","").cast(FloatType()).alias('BRPesos'),
                  )

In [99]:
# Remplazo valores faltantes por 0
Base_Datos_clean = Base_Datos_clean.fillna(0, subset=['BRPesos'])

In [100]:
# Creo vista de tabla Base_Datos_Clean en SQL

Base_Datos_clean.createOrReplaceTempView("BaseDatosclean")



In [101]:
# Limpiar 
spark.sql('SELECT Lote FROM BaseDatosclean WHERE Lote like "%#%"').show(5)



#
#

+----+
|Lote|
+----+
|   #|
|   #|
|   #|
|   #|
|   #|
+----+
only showing top 5 rows



In [102]:
#Creo las tablas relacionales
Tabla_Materiales=Base_Datos_clean.select( regexp_extract(col('Material'),'(\d{1,})',1).cast(IntegerType()).alias('Id'),
                                        col('Material'),
                                        col('TipoMaterialInventario'),
                                        col('Descripcion')).distinct().sort('Id')    #col('CostoUnitarioReal')

Tabla_Clientes=Base_Datos_clean.select(regexp_extract(col('Cliente'),'(\d{1,})',1).cast(IntegerType()).alias('Id'),
                                       col('Cliente')).distinct().sort('Id')

Tabla_Riesgo=Base_Datos_clean.select(col('ClaseBaseRiesgo'),
                                     col('BasedeRiesgo')).distinct().sort('ClaseBaseRiesgo')\
                                    .withColumn("Id", monotonically_increasing_id()+1)\
                                    .select(col('Id'),col('ClaseBaseRiesgo'),col('BasedeRiesgo'))

Tabla_Lote=spark.sql("""
                        SELECT row_number() OVER (ORDER BY Lote) as Id, Lote
                        FROM BaseDatosclean
                        GROUP BY Lote
                        HAVING Lote!='#'
                    """)

In [103]:
# Creo las vistas de Tabla_Riesgo,Tabla_Clientes,Tabla_Materiales,Tabla_Lote en SQL
Tabla_Riesgo.createOrReplaceTempView('Tabla_Riesgo')
Tabla_Clientes.createOrReplaceTempView('Tabla_Clientes')
Tabla_Materiales.createOrReplaceTempView('Tabla_Materiales')
Tabla_Lote.createOrReplaceTempView('Tabla_Lote')

In [109]:
Tabla_Clientes_Riesgo=spark.sql("""
            SELECT DISTINCT(Tabla_Riesgo.ID,Tabla_Riesgo.BasedeRiesgo,Tabla_Riesgo.ClaseBaseRiesgo,BaseDatosClean.BRPesos,Tabla_Clientes.Id AS Id_CT) AS Estructura
            FROM Tabla_Riesgo
            JOIN BaseDatosClean ON Tabla_Riesgo.ClaseBaseRiesgo=BaseDatosClean.ClaseBaseRiesgo
            JOIN Tabla_Clientes ON BaseDatosClean.Cliente=Tabla_Clientes.Cliente
            """) 

Tabla_Clientes_Riesgo=Tabla_Clientes_Riesgo.select(col('Estructura.Id').alias('Riesgo_Id'),
                 col('Estructura.Id_CT').alias('Cliente_Id'),
                 col('Estructura.BasedeRiesgo'),
                 col('Estructura.ClaseBaseRiesgo'),
                 col('Estructura.BRPesos'))\
                .withColumn("Id", monotonically_increasing_id()+1).alias('Id')    # Total 2511



Tabla_Lote_Materiales=spark.sql("""
            SELECT DISTINCT(Tabla_Materiales.Id AS Id_M,Tabla_Materiales.Material,Tabla_Materiales.TipoMaterialInventario,
            Tabla_Lote.Id AS Id_L,Tabla_Lote.Lote,BaseDatosClean.CostoUnitarioReal) AS Estructura
            FROM BaseDatosClean
            JOIN Tabla_Lote ON Tabla_Lote.Lote=BaseDatosClean.Lote
            JOIN Tabla_Materiales ON BaseDatosClean.Material=Tabla_Materiales.Material
            """)  

Tabla_Lote_Materiales=Tabla_Lote_Materiales.select(col('Estructura.Id_M').alias('Material_Id'),
                 col('Estructura.Material'),
                 col('Estructura.TipoMaterialInventario'),
                 col('Estructura.Id_L').alias('Lote_Id'),
                 col('Estructura.Lote'),
                 col('Estructura.CostoUnitarioReal'))\
                .withColumn("Id", monotonically_increasing_id()+1).alias('Id')  


Tabla_Fecha=Base_Datos_clean.select(col('Fechaentrada'),
                regexp_extract(col('Fechaentrada'),'/(\d{1,})/',1).cast(IntegerType()).alias('Mes'), 
                regexp_extract(col('Fechaentrada'),'/(\d{4})',1).cast(IntegerType()).alias('Año')).distinct().sort(['Mes','Año'])\
                .withColumn("Id",monotonically_increasing_id()+1) 

Table_Permanencias=Base_Datos_clean.select('Permanencia').distinct().sort('Permanencia')\
                                    .withColumn("Id",monotonically_increasing_id()) 

In [134]:
Tabla_Fecha.createOrReplaceTempView('Tabla_Fecha')
Table_Permanencias.createOrReplaceTempView('Table_Permanencias')
Tabla_Lote_Materiales.createOrReplaceTempView('Tabla_Lote_Materiales')
Tabla_Clientes_Riesgo.createOrReplaceTempView('Tabla_Clientes_Riesgo')

In [136]:
Inventarios=spark.sql("""
            SELECT DISTINCT(Tabla_Fecha.Fechaentrada,BaseDatosClean.Permanencia,BaseDatosClean.Material,Tabla_Clientes_Riesgo.ClaseBaseRiesgo) AS Estructura
            FROM BaseDatosClean
            JOIN Tabla_Fecha ON BaseDatosClean.Fechaentrada=Tabla_Fecha.Fechaentrada
            JOIN Table_Permanencias ON Table_Permanencias.Permanencia=BaseDatosClean.Permanencia
            JOIN Tabla_Lote_Materiales ON Tabla_Lote_Materiales.Material=BaseDatosClean.Material
            JOIN Tabla_Clientes_Riesgo ON Tabla_Clientes_Riesgo.ClaseBaseRiesgo=BaseDatosClean.ClaseBaseRiesgo
            """)  


In [140]:

Inventario=Inventarios.select(col('Estructura.Fechaentrada').alias('1'),
                 col('Estructura.Permanencia').alias('2'),
                 col('Estructura.Material').alias('3'),
                 col('Estructura.ClaseBaseRiesgo').alias('3'))

In [141]:
#df.cache()
#Inventario.show(5)
#df.unpersist()