In [1]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [2]:
# List the files in the raw zone (equivalent to the `%fs ls` magic, which is shorthand for dbutils.fs.ls).
# NOTE(review): the saved listing shows only despesas_* files, yet later cells also read
# proposicoesTemas*.csv, proposicoes_*.csv and proposicoesAutores*.csv — re-run to refresh it.
display(dbutils.fs.ls('/FileStore/tables/raw'))

path,name,size
dbfs:/FileStore/tables/raw/despesas_2009-d0d22.csv,despesas_2009-d0d22.csv,73741287
dbfs:/FileStore/tables/raw/despesas_2010-91775.csv,despesas_2010-91775.csv,96572819
dbfs:/FileStore/tables/raw/despesas_2011-2289d.csv,despesas_2011-2289d.csv,107508369
dbfs:/FileStore/tables/raw/despesas_2012-8ecc3.csv,despesas_2012-8ecc3.csv,99933266
dbfs:/FileStore/tables/raw/despesas_2013-e29e4.csv,despesas_2013-e29e4.csv,108572926
dbfs:/FileStore/tables/raw/despesas_2014-fae0d.csv,despesas_2014-fae0d.csv,104301267
dbfs:/FileStore/tables/raw/despesas_2015-55418.csv,despesas_2015-55418.csv,128248244
dbfs:/FileStore/tables/raw/despesas_2016-414fe.csv,despesas_2016-414fe.csv,122285142
dbfs:/FileStore/tables/raw/despesas_2017-89277.csv,despesas_2017-89277.csv,118080465
dbfs:/FileStore/tables/raw/despesas_2018-2f77b.csv,despesas_2018-2f77b.csv,96715561


In [3]:
# Proposition themes, read from the semicolon-separated proposicoesTemas*.csv files.
propositions_theme_schema = StructType([
    StructField('uriProposicao', StringType()),
    StructField('siglaTipo', StringType()),
    StructField('numero', IntegerType()),
    StructField('ano', IntegerType()),
    StructField('codTema', IntegerType()),
    StructField('tema', StringType()),
    StructField('relevancia', IntegerType())
])

df_propositions_theme = spark.read.csv('/FileStore/tables/raw/proposicoesTemas*.csv', header=True, sep=';', schema=propositions_theme_schema)

# The themes file carries no numeric proposition id, only the proposition URI;
# the id is element 6 of the URI split on '/'. It is cast to int so that
# ID_PROPOSICAO has the same type here as in df_propositions and
# df_propositions_authors, which both read the id as IntegerType.
# NOTE(review): assumes that URI segment is always numeric — confirm against the data.
df_propositions_theme = df_propositions_theme.withColumn(
    'ID_PROPOSICAO',
    F.split(F.col('uriProposicao'), '/').getItem(6).cast('int')
)

df_propositions_theme = df_propositions_theme.select(
    F.col('ID_PROPOSICAO'),
    F.col('siglaTipo').alias('DC_SIGLA_TIPO'),
    # NOTE(review): `numero` is the proposition's number (the propositions cell maps
    # the same field to NU_PROPOSICAO); the output name is kept unchanged so existing
    # consumers of the curated table do not break.
    F.col('numero').alias('NU_TEMA_PROPOSICAO'),
    F.col('ano').alias('NU_ANO'),
    F.col('codTema').alias('CD_TEMA'),
    F.col('tema').alias('DC_TEMA'),
    F.col('relevancia').alias('NU_RELEVANCIA')
)

# Cache the projected frame: it is previewed and later written to the curated zone.
df_propositions_theme = df_propositions_theme.cache()

In [4]:
# Preview the first 20 theme rows with full, untruncated cell contents.
df_propositions_theme.show(n=20, truncate=False)

In [5]:
# Propositions, read from the semicolon-separated proposicoes_*.csv files.
propositions_schema = StructType([
    StructField('id', IntegerType()),
    StructField('uri', StringType()),
    StructField('siglaTipo', StringType()),
    StructField('numero', IntegerType()),
    StructField('ano', IntegerType()),
    StructField('codTipo', IntegerType()),
    StructField('descricaoTipo', StringType()),
    StructField('ementa', StringType()),
    StructField('ementaDetalhada', StringType()),
    StructField('keywords', StringType()),
    StructField('dataApresentacao', StringType()),
    StructField('uriOrgaoNumerador', StringType()),
    StructField('uriPropAnterior', StringType()),
    StructField('uriPropPrincipal', StringType()),
    StructField('uriPropPosterior', StringType()),
    StructField('urlInteiroTeor', StringType()),
    StructField('urnFinal', StringType()),
    StructField('ultimoStatus_dataHora', StringType()),
    StructField('ultimoStatus_sequencia', IntegerType()),
    StructField('ultimoStatus_uriRelator', StringType()),
    StructField('ultimoStatus_idOrgao', IntegerType()),
    StructField('ultimoStatus_siglaOrgao', StringType()),
    StructField('ultimoStatus_uriOrgao', StringType()),
    StructField('ultimoStatus_regime', StringType()),
    StructField('ultimoStatus_descricaoTramitacao', StringType()),
    StructField('ultimoStatus_idTipoTramitacao', IntegerType()),
    StructField('ultimoStatus_descricaoSituacao', StringType()),
    StructField('ultimoStatus_idSituacao', IntegerType()),
    StructField('ultimoStatus_despacho', StringType()),
    StructField('ultimoStatus_url', StringType())
])

df_propositions = spark.read.csv('/FileStore/tables/raw/proposicoes_*.csv', header=True, sep=';', schema=propositions_schema)

# Keep and rename only the 10 curated columns; the other 20 schema fields
# (URIs and ultimoStatus_* fields) are not carried forward.
df_propositions = df_propositions.select(
    F.col('id').alias('ID_PROPOSICAO'),
    F.col('siglaTipo').alias('SG_TIPO'),
    F.col('numero').alias('NU_PROPOSICAO'),
    F.col('ano').alias('NU_ANO'),
    F.col('codTipo').alias('CD_TIPO'),
    F.col('descricaoTipo').alias('DC_TIPO'),
    F.col('ementa').alias('DC_EMENTA'),
    F.col('ementaDetalhada').alias('DC_EMENTA_DETALHADA'),
    F.col('keywords').alias('DC_KEYWORDS'),
    F.col('dataApresentacao').alias('DT_APRESENTACAO')
)

# Cache after the projection, so only the kept columns are held in memory.
# Caching the raw frame would also pin the 20 dropped columns.
df_propositions = df_propositions.cache()

In [6]:
# Preview the first 20 propositions (long text cells are truncated for display).
df_propositions.show(n=20, truncate=True)

In [7]:
# Expenses, read from the semicolon-separated despesas*.csv files. Each raw row
# carries both the deputy's attributes and the expense itself; it is split below
# into a deduplicated deputies table (df_politicians) and an expenses table (df_expenses).
expenses_columns = [
    ('txtNomeParlamentar', StringType()),
    ('cpf', StringType()),
    ('ideCadastro', StringType()),
    ('nuCarteiraParlamentar', StringType()),
    ('nuLegislatura', IntegerType()),
    ('sgUF', StringType()),
    ('sgPartido', StringType()),
    ('codLegislatura', StringType()),
    ('numSubCota', IntegerType()),
    ('txtDescricao', StringType()),
    ('numEspecificacaoSubCota', IntegerType()),
    ('txtDescricaoEspecificacao', StringType()),
    ('txtFornecedor', StringType()),
    ('txtCNPJCPF', StringType()),
    ('txtNumero', StringType()),
    ('indTipoDocumento', StringType()),
    ('datEmissao', StringType()),
    ('vlrDocumento', DoubleType()),
    ('vlrGlosa', DoubleType()),
    ('vlrLiquido', DoubleType()),
    ('numMes', IntegerType()),
    ('numAno', IntegerType()),
    ('numParcela', IntegerType()),
    ('txtPassageiro', StringType()),
    ('txtTrecho', StringType()),
    ('numLote', IntegerType()),
    ('numRessarcimento', IntegerType()),
    ('vlrRestituicao', DoubleType()),
    ('nuDeputadoId', IntegerType()),
    ('ideDocumento', IntegerType()),
    ('urlDocumento', StringType()),
]
expenses_schema = StructType([StructField(name, dtype) for name, dtype in expenses_columns])

# Drop rows without a deputy id, then cache: both projections below read this frame.
df_expenses = (
    spark.read
    .csv('/FileStore/tables/raw/despesas*.csv', header=True, sep=';', schema=expenses_schema)
    .filter(F.col('ideCadastro').isNotNull())
    .cache()
)

# Deputy attributes (source column, curated name); distinct combinations form df_politicians.
politician_columns = [
    ('ideCadastro', 'ID_DEPUTADO'),
    ('txtNomeParlamentar', 'NM_PARLAMENTAR'),
    ('cpf', 'DC_CPF_PARLAMENTAR'),
    ('nuCarteiraParlamentar', 'NU_CARTEIRA_PARLAMENTAR'),
    ('nuLegislatura', 'NU_LEGISLATURA'),
    ('sgUF', 'SG_UF'),
    ('sgPartido', 'SG_PARTIDO'),
    ('codLegislatura', 'CD_LEGISLATURA'),
]
df_politicians = df_expenses.select(
    *[F.col(source).alias(target) for source, target in politician_columns]
).dropDuplicates()

# Expense fields (source column, curated name); ID_DEPUTADO links back to df_politicians.
expense_columns = [
    ('ideCadastro', 'ID_DEPUTADO'),
    ('numSubCota', 'NU_SUBCOTA'),
    ('txtDescricao', 'DC_DESCRICAO'),
    ('numEspecificacaoSubCota', 'NU_ESPECIFICACAO_SUBCOTA'),
    ('txtDescricaoEspecificacao', 'DC_ESPECIFICACAO'),
    ('txtFornecedor', 'NM_FORNECEDOR'),
    ('txtCNPJCPF', 'DC_CPFCNPJ_FORNECEDOR'),
    ('txtNumero', 'DC_NUMERO'),
    ('indTipoDocumento', 'DC_TIPO_DOCUMENTO'),
    ('datEmissao', 'DT_EMISSAO'),
    ('vlrDocumento', 'VL_DOCUMENTO'),
    ('vlrGlosa', 'VL_GLOSA'),
    ('vlrLiquido', 'VL_LIQUIDO'),
    ('numMes', 'NU_MES'),
    ('numAno', 'NU_ANO'),
    ('numParcela', 'NU_PARCELA'),
    ('txtPassageiro', 'DC_PASSAGEIRO'),
    ('txtTrecho', 'DC_TRECHO'),
    ('numLote', 'NU_LOTE'),
    ('numRessarcimento', 'NU_RESSARCIMENTO'),
    ('vlrRestituicao', 'VL_RESTITUICAO'),
    ('ideDocumento', 'ID_DOCUMENTO'),
]
df_expenses = df_expenses.select(
    *[F.col(source).alias(target) for source, target in expense_columns]
)

In [8]:
# Preview the first 20 deduplicated deputy rows.
df_politicians.show(20)

In [9]:
# Preview the first 20 expense rows.
df_expenses.show(n=20)

In [10]:
# Proposition authors, read from the semicolon-separated proposicoesAutores*.csv files.
propositions_authors_schema = StructType([
    StructField('idProposicao', IntegerType()),
    StructField('uriProposicao', StringType()),
    StructField('idDeputadoAutor', IntegerType()),
    StructField('uriAutor', StringType()),
    StructField('codTipoAutor', StringType()),
    StructField('tipoAutor', StringType()),
    StructField('nomeAutor', StringType()),
    StructField('siglaPartidoAutor', StringType()),
    StructField('uriPartidoAutor', StringType()),
    StructField('siglaUFAutor', StringType()),
    StructField('ordemAssinatura', IntegerType()),
    StructField('proponente', IntegerType())
])

df_propositions_authors = spark.read.csv('/FileStore/tables/raw/proposicoesAutores*.csv', header=True, sep=';', schema=propositions_authors_schema)

# Keep and rename the curated columns (the URI columns are dropped).
df_propositions_authors = df_propositions_authors.select(
    F.col('idProposicao').alias('ID_PROPOSICAO'),
    F.col('idDeputadoAutor').alias('ID_DEPUTADO'),
    F.col('codTipoAutor').alias('CD_TIPO_AUTOR'),
    F.col('tipoAutor').alias('TP_AUTOR'),
    F.col('nomeAutor').alias('NM_AUTOR'),
    F.col('siglaPartidoAutor').alias('SG_PARTIDO_AUTOR'),
    F.col('siglaUFAutor').alias('SG_UF_AUTOR'),
    F.col('ordemAssinatura').alias('NU_ORDEM_ASSINATURA'),
    F.col('proponente').alias('NU_PROPONENTE')
)

# Drop rows with no deputy id (presumably authors that are not deputies — TODO confirm).
df_propositions_authors = df_propositions_authors.where(F.col('ID_DEPUTADO').isNotNull())

# Cache the projected, filtered frame rather than the raw one, so dropped columns
# and filtered-out rows are not held in memory.
df_propositions_authors = df_propositions_authors.cache()

In [11]:
# Preview the first 20 author rows (long cells truncated for display).
df_propositions_authors.show(20, True)

In [None]:
# Export the curated DataFrames to the curated zone as Parquet.
CURATED_DIR = '/FileStore/tables/curated'

# (output directory name, DataFrame) pairs, written in this order.
curated_outputs = [
    ('propositions_themes', df_propositions_theme),
    ('propositions_authors', df_propositions_authors),
    ('propositions', df_propositions),
    ('politicians', df_politicians),
    ('expenses', df_expenses),
]

# mode('overwrite') keeps the cell re-runnable. The default save mode
# ('errorifexists') fails once a target directory already exists, so a second
# Run All would stop here. Overwrite replaces any previous output at each path.
for table_name, curated_df in curated_outputs:
    curated_df.write.mode('overwrite').parquet(CURATED_DIR + '/' + table_name)