# Fase 3 - Modelagem dos Dados (Modeled)

* Importação das bibliotecas

In [1]:
import os
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField,StringType, FloatType

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import create_map, struct
from pyspark.sql import functions as F

In [3]:
MODELED = '../modeled/'
RAW = '../raw/'
os.listdir(MODELED+'/parquet/')

[]

* Sessão Spark

In [4]:
builder = SparkSession.builder.appName("GASTOS PARLAMENTARES")
builder.config(
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
builder.config("spark.speculation", "false")
builder.config("spark.sql.parquet.compression.codec", "gzip")
builder.config("spark.debug.maxToStringFields", "100")
builder.config("spark.driver.memory", "1g")
builder.config("spark.driver.cores", "1")
builder.config("spark.executor-memory", "20g")
builder.config("spark.executor.cores", "4")

builder.master("local[*]")

spark = builder.getOrCreate()
spark


* Função para conversão de valor

In [5]:
to_value = lambda v:float(v.replace(",","."))
udf_to_value = F.udf(to_value, FloatType())

* Leitura dos arquivos Parquet dos anos de gastos

In [6]:
dfRaw = spark.read.parquet(RAW+"/parquet/*")

In [7]:
dfRaw.count()

1558650

* Otimização do esquema

In [8]:
df = dfRaw.select(dfRaw['txNomeParlamentar'].alias('parlamentar'), \
                  dfRaw['sgPartido'].alias('partido'), \
                  dfRaw['sgUF'].alias('uf'), \
                  dfRaw['txtDescricao'].alias('categoria'), \
                  dfRaw['numAno'].alias('ano'), \
                  dfRaw['vlrLiquido'].alias('valor'))

In [9]:
df.show(10)

+-----------------+-------+---+--------------------+----+------+
|      parlamentar|partido| uf|           categoria| ano| valor|
+-----------------+-------+---+--------------------+----+------+
|LIDERANÇA DO PSDB|   null| NA|    PASSAGENS AÉREAS|2015|2570.1|
|LIDERANÇA DO PSDB|   null| NA|           TELEFONIA|2015|459.62|
|LIDERANÇA DO PSDB|   null| NA|           TELEFONIA|2015|530.21|
|LIDERANÇA DO PSDB|   null| NA|           TELEFONIA|2015|341.35|
|LIDERANÇA DO PSDB|   null| NA|           TELEFONIA|2015|   324|
|LIDERANÇA DO PSDB|   null| NA|COMBUSTÍVEIS E LU...|2015| 106.2|
|LIDERANÇA DO PSDB|   null| NA|COMBUSTÍVEIS E LU...|2015| 137.2|
|LIDERANÇA DO PSDB|   null| NA|COMBUSTÍVEIS E LU...|2015|115.88|
|LIDERANÇA DO PSDB|   null| NA|COMBUSTÍVEIS E LU...|2015|  96.9|
|LIDERANÇA DO PSDB|   null| NA|COMBUSTÍVEIS E LU...|2015|118.61|
+-----------------+-------+---+--------------------+----+------+
only showing top 10 rows



In [10]:
df2 = df.withColumn("valor", udf_to_value(df["valor"]))

In [11]:
df2.printSchema()

root
 |-- parlamentar: string (nullable = true)
 |-- partido: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- ano: string (nullable = true)
 |-- valor: float (nullable = true)



* Agrupamento e sumarização das principais informações

In [12]:
dfPq = df2.groupBy('parlamentar','partido','uf','categoria','ano').agg(F.sum('valor').alias('valor'))

In [13]:
dfPq.printSchema()
dfPq.count()

root
 |-- parlamentar: string (nullable = true)
 |-- partido: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- ano: string (nullable = true)
 |-- valor: double (nullable = true)



32309

* Salvamento de um novo arquivo Parquet com o agrupamento

In [14]:
dfPq.write.parquet(MODELED+'/parquet/sumario/')

In [15]:
os.listdir(MODELED+'/parquet/')

['sumario']