In [122]:
import findspark
import os

findspark.init(os.environ.get('SPARK_HOME'))

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as functions
from decimal import Decimal

In [123]:
spark = SparkSession.builder.appName("GastoMinisterio").getOrCreate()

In [124]:
df = spark.read.format("csv").option("header", True).option("delimiter", ";").option('encoding', 'windows-1252').csv('/SPARK_HOME/datasets/Viagem.csv')

In [125]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[Identificador do processo de viagem: string, Situação: string, Código do órgão superior: string, Nome do órgão superior: string, Código órgão solicitante: string, Nome órgão solicitante: string, CPF viajante: string, Nome: string, Cargo: string, Período - Data de início: string, Período - Data de fim: string, Destinos: string, Motivo: string, Valor diárias: string, Valor passagens: string, Valor outros gastos: string]>

In [126]:
#converte decimal
to_value = lambda v : Decimal(v.replace(",", "."))

In [127]:
udf_to_value = functions.udf(to_value)

In [128]:
df2 = df.withColumn("Max_por_org_sup",    udf_to_value(df["Valor passagens"]))
df2 = df2.withColumn("Media_por_org_sup", udf_to_value(df2["Valor passagens"]))
df2 = df2.withColumn("Min_por_org_sup",   udf_to_value(df2["Valor passagens"]))
df2 = df2.withColumn("Total_por_org_sup", udf_to_value(df2["Valor passagens"]))

df2 = df2.withColumn("Max_por_destinos", udf_to_value(df2["Valor passagens"]))
df2 = df2.withColumn("Media_por_destinos", udf_to_value(df2["Valor passagens"]))
df2 = df2.withColumn("Min_por_destinos", udf_to_value(df2["Valor passagens"]))
df2 = df2.withColumn("Total_por_destinos", udf_to_value(df2["Valor passagens"]))

df2 = df2.withColumn("Max_por_cargos", udf_to_value(df2["Valor passagens"]))
df2 = df2.withColumn("Media_por_cargos", udf_to_value(df2["Valor passagens"]))
df2 = df2.withColumn("Min_por_cargos", udf_to_value(df2["Valor passagens"]))
df2 = df2.withColumn("Total_por_cargos", udf_to_value(df2["Valor passagens"]))



In [129]:
from pyspark.sql import functions as F 

In [130]:
df2.groupBy("Nome do órgão superior").agg(F.max("Max_por_org_sup"), 
                                          F.avg("Media_por_org_sup"), 
                                          F.min("Min_por_org_sup"), 
                                          F.sum("Total_por_org_sup")).sort('Nome do órgão superior').show( truncate=True)

+----------------------+--------------------+----------------------+--------------------+----------------------+
|Nome do órgão superior|max(Max_por_org_sup)|avg(Media_por_org_sup)|min(Min_por_org_sup)|sum(Total_por_org_sup)|
+----------------------+--------------------+----------------------+--------------------+----------------------+
|  Advocacia-Geral d...|              994.57|    275.82561043065976|                0.00|    1396229.2399999998|
|  Controladoria-Ger...|              998.71|     782.2714762979683|                0.00|    1732731.3199999998|
|  Ministério da Agr...|             9981.38|     339.6013735431856|                0.00|     9645018.610000014|
|  Ministério da Cid...|              998.58|     884.8200835654591|                0.00|     2858853.689999998|
|  Ministério da Ciê...|              999.94|    1310.5361431438134|                0.00|     9796257.670000006|
|  Ministério da Cul...|             3494.56|                2628.1|             1761.64|       

In [131]:
df2.groupBy("Destinos").agg(F.max("Max_por_destinos"), 
                                          F.avg("Media_por_destinos"), 
                                          F.min("Min_por_destinos"), 
                                          F.sum("Total_por_destinos")).sort('Destinos').show( truncate=True)

+--------------------+---------------------+-----------------------+---------------------+-----------------------+
|            Destinos|max(Max_por_destinos)|avg(Media_por_destinos)|min(Min_por_destinos)|sum(Total_por_destinos)|
+--------------------+---------------------+-----------------------+---------------------+-----------------------+
|    Aarhus/Dinamarca|              6169.18|               1233.836|                 0.00|                6169.18|
|Aarhus/Dinamarca,...|                 0.00|                    0.0|                 0.00|                    0.0|
|Abadia dos Dourad...|                 0.00|                    0.0|                 0.00|                    0.0|
|        Abadiânia/GO|                 0.00|                    0.0|                 0.00|                    0.0|
|Abadiânia/GO, Aba...|                 0.00|                    0.0|                 0.00|                    0.0|
|Abadiânia/GO, Bra...|                 0.00|                    0.0|            

In [132]:
df2.groupBy("Cargo").agg(F.max("Max_por_cargos"), 
                                          F.avg("Media_por_cargos"), 
                                          F.min("Min_por_cargos"), 
                                          F.sum("Total_por_cargos")).sort('Cargo').show( truncate=False)

+--------------------------------+-------------------+---------------------+-------------------+---------------------+
|Cargo                           |max(Max_por_cargos)|avg(Media_por_cargos)|min(Min_por_cargos)|sum(Total_por_cargos)|
+--------------------------------+-------------------+---------------------+-------------------+---------------------+
|null                            |9992.33            |796.6471852601408    |0.00               |1.0470015296999925E8 |
|AAD-AUX DE RECURSOS MATERIAIS   |2707.09            |1353.545             |0.00               |2707.09              |
|AAD-AUXILIAR ADMINISTRATIVO     |0.00               |0.0                  |0.00               |0.0                  |
|AAD-AUXILIAR DE OPERACOES       |0.00               |0.0                  |0.00               |0.0                  |
|AAD-OPERADOR DE MICROCOMPUTADOR |0.00               |0.0                  |0.00               |0.0                  |
|ADMINISTRACAO E PLANEJAMENTO    |2909.56       

In [133]:
tabela_aggnm_sup = df2.groupBy("Nome do órgão superior").agg(F.max("Max_por_org_sup"), 
                                          F.avg("Media_por_org_sup"), 
                                          F.min("Min_por_org_sup"), 
                                          F.sum("Total_por_org_sup")).sort('Nome do órgão superior')

In [134]:
export_tabela_aggnm_sup = tabela_aggnm_sup.rdd

In [135]:
export_tabela_aggnm_sup.saveAsTextFile('C:/SPARK_HOME/agg_por_org_sup')

In [136]:
tabela_aggnm_destino =df2.groupBy("Destinos").agg(F.max("Max_por_destinos"), 
                                          F.avg("Media_por_destinos"), 
                                          F.min("Min_por_destinos"), 
                                          F.sum("Total_por_destinos")).sort('Destinos')

In [137]:
export_tabela_aggnm_destino = tabela_aggnm_destino.rdd

In [138]:
export_tabela_aggnm_destino.saveAsTextFile('C:/SPARK_HOME/agg_por_destinos')

In [139]:
tabela_aggnm_cargo =df2.groupBy("Cargo").agg(F.max("Max_por_cargos"), 
                                          F.avg("Media_por_cargos"), 
                                          F.min("Min_por_cargos"), 
                                          F.sum("Total_por_cargos")).sort('Cargo')

In [140]:
export_tabela_aggnm_cargo = tabela_aggnm_cargo.rdd

In [None]:
export_tabela_aggnm_cargo.saveAsTextFile('C:/SPARK_HOME/agg_por_cargo')