In [4]:
from IPython.core.display import HTML
HTML(r"""
<style>
    .output-plaintext, .output-stream, .output {
        white-space: pre !important;
    }
    pre { white-space: pre !important; }
</style>
""")

## Imports

In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *

spark=SparkSession.builder.appName('comando3v1').getOrCreate()

## Load files

In [6]:
dataframes_dict = { 
                'facturas'            : spark.read.parquet('./datasets/bkpf_vbrk.parquet'), 
                'facturas_x_clientes' : spark.read.parquet('./datasets/vbpa_kna1.parquet'),
                'materiales'          : spark.read.parquet('./datasets/vbrp_mara_marm.parquet'),
                'paises'              : spark.read.parquet('./datasets/T005T.parquet')
                }

### Select dataframes, filter and join

In [7]:
def set_filters_and_joins(dataframes_dict):
    
    tabla_factura = dataframes_dict['facturas'].select('Factura','Clase_factura','Ejercicio','Per_contable')
    facturas_x_clientes = dataframes_dict['facturas_x_clientes'].select('Factura','Posicion_de_Factura','Nombre','Pais','Tipo_de_Cliente','Cliente_Solicitante','Cliente_Pagador','Cliente_Destinatario_de_Mercancia','Cliente_Destinatario_de_factura')
    tabla_materiales = dataframes_dict['materiales'].select('Factura','Posicion','Material','Volumen','Unidad_de_medida_Volumen_Destino')
    maestro_paises = dataframes_dict['paises'].where((col('Idioma') == 'S') & (col('Mandante')==400))
    
    main_table = tabla_materiales.join(tabla_factura, 'Factura','left')
    main_table = main_table.join(facturas_x_clientes, 'Factura','left') 
    main_table = main_table.join(maestro_paises, 'Pais', 'Left')
    main_table = main_table.select('Factura','Clase_factura','Posicion','Material','Volumen','Unidad_de_medida_Volumen_Destino','Per_contable','Ejercicio','Pais','Pais_descr','Tipo_de_Cliente','Cliente_Solicitante','Cliente_Pagador','Cliente_Destinatario_de_Mercancia','Cliente_Destinatario_de_factura')
    
    return { 'main_table' : main_table }

## Realizar cuatro consultas
  - Cantidad de facturas por tipo de cliente
  - Top 10 Volumen total por país de cliente destinatario mercancía
  - Top 5 Volumen total por cliente solicitante
  - Volumen total por clase de factura, de mayor a menor volumen

### Consulta 1: Cantidad de facturas por tipo de cliente (numero_de_cliente)

In [8]:
def set_consulta1(dataframes_dict):
    
    main_table = dataframes_dict['main_table']
    
    consulta1 = main_table.groupBy('Tipo_de_Cliente').count() \
                    .withColumnRenamed('count','Cant_Factura') \
                    .orderBy(col('Cant_Factura').desc())
    
    return { 'consulta1' : consulta1 }

### Consulta 2: Top 10 Volumen total por país de cliente destinatario mercancía

In [9]:
def set_consulta2(dataframes_dict):
    
    main_table = dataframes_dict['main_table']
    
    # filtrando los nulos de Cliente_Destinatario_de_Mercancia
    consulta2 = main_table.where(col("Cliente_Destinatario_de_Mercancia").isNotNull()) \
                .groupBy("Pais","Cliente_Destinatario_de_Mercancia").sum("Volumen") \
                .select("Pais","Cliente_Destinatario_de_Mercancia", round("sum(Volumen)",2).alias("Volumen_total")) \
                .orderBy(col("Volumen_total").desc())\
                .withColumn("Top_10", monotonically_increasing_id()+1) \
                .select("Top_10", "Pais","Cliente_Destinatario_de_Mercancia","Volumen_total").limit(10)
    
    # filtrando los nulos de Pais y Cliente_Destinatario_de_Mercancia
    consulta2_no_nulls = main_table.where((col('Cliente_Destinatario_de_Mercancia') \
                .isNotNull()) & (col('Pais').isNotNull())) \
                .groupBy('Pais','Cliente_Destinatario_de_Mercancia').sum('Volumen') \
                .select("Pais","Cliente_Destinatario_de_Mercancia", round("sum(Volumen)",2).alias("Volumen_total")) \
                .orderBy(col('Volumen_total').desc()) \
                .withColumn("Top_10", monotonically_increasing_id()+1) \
                .select("Top_10", "Pais","Cliente_Destinatario_de_Mercancia","Volumen_total").limit(10)

    return {
        'consulta2'          : consulta2,
        'consulta2_no_nulls' : consulta2_no_nulls
        }

### Consulta 3: Top 5 Volumen total por cliente solicitante

In [10]:
def set_consulta3(dataframes_dict):
    
    main_table = dataframes_dict['main_table']
    
    consulta3 = main_table.where(col('Cliente_Solicitante').isNotNull()) \
                .groupBy('Pais','Cliente_Solicitante').sum('Volumen') \
                .select("Pais", "Cliente_Solicitante", round("sum(Volumen)",2).alias("Volumen_total")) \
                .orderBy(col('Volumen_total').desc()) \
                .withColumn("Top_5", monotonically_increasing_id()+1) \
                .select("Top_5", "Pais","Cliente_Solicitante","Volumen_total").limit(5)
    
    consulta3_no_nulls = main_table.where((col('Cliente_Solicitante').isNotNull()) & (col('Pais').isNotNull())) \
                .groupBy('Pais','Cliente_Solicitante').sum('Volumen') \
                .select("Pais", "Cliente_Solicitante", round("sum(Volumen)",2).alias("Volumen_total")) \
                .orderBy(col('Volumen_total').desc()) \
                .withColumn("Top_5", monotonically_increasing_id()+1) \
                .select("Top_5", "Pais","Cliente_Solicitante","Volumen_total").limit(5)

    return {
        'consulta3'          : consulta3,
        'consulta3_no_nulls' : consulta3_no_nulls
        }

### Consulta 4: Volumen total por clase de factura, de mayor a menor volumen

In [11]:
def set_consulta4(dataframes_dict):
    
    main_table = dataframes_dict['main_table']
    
    consulta4 = main_table.groupBy('Clase_factura').sum('Volumen') \
                .select('Clase_factura', round('sum(Volumen)',2).alias('Volumen_total')) \
                .orderBy(col('Volumen_total').desc()).limit(5)
    
    consulta4_no_nulls = main_table.where(col('Clase_factura').isNotNull()) \
                .groupBy('Clase_factura').sum('Volumen') \
                .select('Clase_factura', round('sum(Volumen)',2).alias('Volumen_total')) \
                .orderBy(col('Volumen_total').desc()).limit(5)

    return {
        'consulta4'          : consulta4,
        'consulta4_no_nulls' : consulta4_no_nulls
        }

### Clean dataframes_dict to setup final dictionary

In [12]:
def set_delete_dfs(dataframes_dict):
    
    to_delete_list = [
        'facturas',
        'facturas_x_clientes',
        'materiales',
        'paises'
    ]
    
    for delete_df in to_delete_list:
        del dataframes_dict[delete_df]
    
    return dataframes_dict

# Main Function:

In [13]:
def gen_comando3(dataframes_dict):
    
    functions_list = [
        set_filters_and_joins,
        set_consulta1,
        set_consulta2,
        set_consulta3,
        set_consulta4,
        #set_delete_dfs
    ]

    for function in functions_list:
        print(function)
        result_element = function(dataframes_dict)
        dataframes_dict.update(result_element)
    
    return dataframes_dict

# Exporting notebook to .py extension

In [11]:
!jupyter nbconvert --to script --no-prompt scen_comando3.ipynb

[NbConvertApp] Converting notebook scen_comando3.ipynb to script
[NbConvertApp] Writing 7353 bytes to scen_comando3.py


# Testing

In [14]:
test = gen_comando3(dataframes_dict)

<function set_filters_and_joins at 0x000001BD0A499C60>
<function set_consulta1 at 0x000001BD06B3BEB0>
<function set_consulta2 at 0x000001BD06B3B490>
<function set_consulta3 at 0x000001BD06B3BD00>
<function set_consulta4 at 0x000001BD0A498CA0>


In [15]:
test.keys()

dict_keys(['main_table', 'consulta1', 'consulta2', 'consulta2_no_nulls', 'consulta3', 'consulta3_no_nulls', 'consulta4', 'consulta4_no_nulls'])

In [18]:
tabla_factura = dataframes_dict['facturas'].select('Factura','Clase_factura','Ejercicio','Per_contable')
facturas_x_clientes = dataframes_dict['facturas_x_clientes'].select('Factura','Posicion_de_Factura','Nombre','Pais','Tipo_de_Cliente','Cliente_Solicitante','Cliente_Pagador','Cliente_Destinatario_de_Mercancia','Cliente_Destinatario_de_factura')
tabla_materiales = dataframes_dict['materiales'].select('Factura','Posicion','Material','Volumen','Unidad_de_medida_Volumen_Destino')
maestro_paises = dataframes_dict['paises'].where((col('Idioma') == 'S') & (col('Mandante')==400))

main_table1 = tabla_materiales.join(tabla_factura, 'Factura','left')
main_table2 = main_table1.join(facturas_x_clientes, 'Factura','left') 
main_table3 = main_table2.join(maestro_paises, 'Pais', 'Left')
main_table4 = main_table3.select('Factura','Clase_factura','Posicion','Material','Volumen','Unidad_de_medida_Volumen_Destino','Per_contable','Ejercicio','Pais','Pais_descr','Tipo_de_Cliente','Cliente_Solicitante','Cliente_Pagador','Cliente_Destinatario_de_Mercancia','Cliente_Destinatario_de_factura')


In [28]:
print(main_table1.count())
print(facturas_x_clientes.count())

80708
19135


In [26]:
tabla_materiales.select(sum('Volumen')).show()
#tabla_factura.select(sum('Volumen')).show()
main_table1.select(sum('Volumen')).show()
main_table2.select(sum('Volumen')).show()
main_table3.select(sum('Volumen')).show()
main_table4.select(sum('Volumen')).show()

+-----------------+
|     sum(Volumen)|
+-----------------+
|899471.0868944847|
+-----------------+

+-----------------+
|     sum(Volumen)|
+-----------------+
|899471.0868944847|
+-----------------+

+-----------------+
|     sum(Volumen)|
+-----------------+
|5652128.297495328|
+-----------------+

+-----------------+
|     sum(Volumen)|
+-----------------+
|5652128.297495328|
+-----------------+

+-----------------+
|     sum(Volumen)|
+-----------------+
|5652128.297495328|
+-----------------+



In [60]:
main_table2_CS = main_table2.where(col('Cliente_Solicitante').isNotNull())
main_table2_CP = main_table2.where(col('Cliente_Pagador').isNotNull())
main_table2_CDM = main_table2.where(col('Cliente_Destinatario_de_Mercancia').isNotNull())
main_table2_CDF = main_table2.where(col('Cliente_Destinatario_de_factura').isNotNull())
main_table2_allnulls = main_table2.where(~( (col('Cliente_Destinatario_de_factura').isNull())  & (col('Cliente_Destinatario_de_Mercancia').isNull()) & (col('Cliente_Pagador').isNull()) & (col('Cliente_Solicitante').isNull())))
main_table2_allnulls2 = main_table2.where(( (col('Cliente_Destinatario_de_factura').isNull())  & (col('Cliente_Destinatario_de_Mercancia').isNull()) & (col('Cliente_Pagador').isNull()) & (col('Cliente_Solicitante').isNull())))

In [61]:
print(main_table2.count())
print(main_table2_allnulls.count())
print(main_table2_allnulls2.count())

570466
322868
247598


In [57]:
main_table2_allnulls.groupBy('Tipo_de_Cliente', 'Pais').count().show()

+---------------+----+-----+
|Tipo_de_Cliente|Pais|count|
+---------------+----+-----+
|             VD|null|59754|
|             AV|null|28421|
|             EN|null|22361|
|             SP|null|  597|
|             VE|null|80708|
|             EN|  CL|   52|
|             SE|null|54698|
|             NF|  CN|  647|
|             EN|  US|  140|
|             NF|  SG|  179|
|             NF|  VN|   41|
+---------------+----+-----+



In [39]:
main_table2.select(sum('Volumen')).show()
main_table2_CS.select(sum('Volumen')).show()
main_table2_CP.select(sum('Volumen')).show()
main_table2_CDM.select(sum('Volumen')).show()
main_table2_CDF.select(sum('Volumen')).show()

+-----------------+
|     sum(Volumen)|
+-----------------+
|5652128.297495328|
+-----------------+

+-----------------+
|     sum(Volumen)|
+-----------------+
|899471.0868944847|
+-----------------+

+-----------------+
|     sum(Volumen)|
+-----------------+
|899471.0868944847|
+-----------------+

+-----------------+
|     sum(Volumen)|
+-----------------+
|899593.4532944837|
+-----------------+

+-----------------+
|     sum(Volumen)|
+-----------------+
|899471.0868944847|
+-----------------+

