In [1]:
import os
# Modulo onde fara com que localizemos o spark na maquina, junto com outros arquivos
import findspark

# Definindo o caminho onde se encontram os arquivos em uma váriavel de ambiente
os.environ['SPARK_HOME'] = 'C:\spark\spark-3.4.0-bin-hadoop3'
# Setando o caminho onde o java esta instalado
os.environ['JAVA_HOME'] = 'C:\Program Files\Java\jre-1.8'

findspark.init()

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import DoubleType, StringType

In [2]:
# Cria a sessão indicando para rodar localmente e criar as CPUs possíveis
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Create Session') \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [3]:
# Por padrao o spark espera um json de uma linha apenas, por conta disso ajustamos o multine
# Ao utilizar o * eu posso ler va
df_2020 = spark.read.option("multiline", "true").json(os.getcwd() + r"\dados\response_2020.json")

In [4]:
# Por padrao o spark espera um json de uma linha apenas, por conta disso ajustamos o multine
# Ao utilizar o * eu posso ler va
df_2021 = spark.read.option("multiline", "true").json(os.getcwd() + r"\dados\response_2021.json")

In [5]:
# Por padrao o spark espera um json de uma linha apenas, por conta disso ajustamos o multine
# Ao utilizar o * eu posso ler va
df_2022 = spark.read.option("multiline", "true").json(os.getcwd() + r"\dados\response_2022.json")

In [6]:
df_2020.show()

+--------------------+
|           resultado|
+--------------------+
|{MATE, (2020)[ANO...|
+--------------------+



In [7]:
df_2020.collect()

[Row(resultado=Row(banco='MATE', consulta='(2020)[ANO]', dataHora=Row(date='2022-10-19'), erro=False, listaItem=[Row(acompanhamento='N', ano='2020', apelido=None, arquivamento='20210224', assunto='Indica Vívia Paula Diniz Abreu para compor o Conselho Estadual de\nEducação.', assuntoGeral='Conselho Estadual', atualizacao='20210224', atualizacaoTexto=None, autor='Governador Romeu Zema Neto', classificacao=None, dataPublicacao='2020-03-12', dataUltimaAcao='2021-02-04', dominio='Remissivo', edicao=None, horario='1528', horarioTexto=None, indexacao='Indicação, Membro, Conselho Estadual de Educação (CEE).', legislacaoCitada=None, linkProposicaoAnexadora=None, links=[Row(ano=2021, banco='mate', descricao='RQO 985 2021', expressao="(RQO adj '985' adj 2021[PROP])[dbmt]", numero=985, tipo='RQO', url='/proposicoes/pesquisa/avancada?expr=%28RQO+adj+%27985%27+adj+2021%5BPROP%5D%29%5Bdbmt%5D'), Row(ano=2021, banco='mate', descricao='MSG 115 2021', expressao="(MSG adj '115' adj 2021[PROP])[dbmt]", nu

In [8]:
# Dentro do campo struct temos o campo que queremos que é o listaItem, que é um array, por conta disso que precisamos dar um explode nele
df_2020 = df_2020.select(F.explode(F.col('resultado.listaItem')))\
    .select('col.*')

df_2021 = df_2021.select(F.explode(F.col('resultado.listaItem')))\
    .select('col.*')

df_2022 = df_2022.select(F.explode(F.col('resultado.listaItem')))\
    .select('col.*')

In [9]:
# Utilizando o UnionByName, pois os arquivos possuem colunas diferentes,
# Desta forma ele empilha e preenche com valores nulos que nao tiverem
df_final = df_2020.unionByName(df_2021, allowMissingColumns=True).\
    unionByName(df_2022, allowMissingColumns=True)

In [10]:
df_final.show(5)

+--------------+----+-------+------------+--------------------+-----------------+-----------+----------------+--------------------+-------------+--------------+--------------+----------+------+-------+------------+--------------------+----------------+-----------------------+--------------------+--------------------+------------------------+--------------------+-------------------------+-----------------+---------+------+---------+--------------------+--------------------+--------+-------------+------------------+----------------+--------------------+--------------------+--------------------+-----------+-------+------------+----+
|acompanhamento| ano|apelido|arquivamento|             assunto|     assuntoGeral|atualizacao|atualizacaoTexto|               autor|classificacao|dataPublicacao|dataUltimaAcao|   dominio|edicao|horario|horarioTexto|           indexacao|legislacaoCitada|linkProposicaoAnexadora|               links|         linksOrigem|linksProposicoesAnexadas|         linksTexto

In [11]:
df_final.printSchema()

root
 |-- acompanhamento: string (nullable = true)
 |-- ano: string (nullable = true)
 |-- apelido: string (nullable = true)
 |-- arquivamento: string (nullable = true)
 |-- assunto: string (nullable = true)
 |-- assuntoGeral: string (nullable = true)
 |-- atualizacao: string (nullable = true)
 |-- atualizacaoTexto: string (nullable = true)
 |-- autor: string (nullable = true)
 |-- classificacao: string (nullable = true)
 |-- dataPublicacao: string (nullable = true)
 |-- dataUltimaAcao: string (nullable = true)
 |-- dominio: string (nullable = true)
 |-- edicao: string (nullable = true)
 |-- horario: string (nullable = true)
 |-- horarioTexto: string (nullable = true)
 |-- indexacao: string (nullable = true)
 |-- legislacaoCitada: string (nullable = true)
 |-- linkProposicaoAnexadora: struct (nullable = true)
 |    |-- ano: long (nullable = true)
 |    |-- banco: string (nullable = true)
 |    |-- descricao: string (nullable = true)
 |    |-- expressao: string (nullable = true)
 |    |

In [12]:
# Tabela Proposicao
# id (incremental)
# author (string - Autor - ex. Governador Romeu Zema Neto)
# presentationDate (timestamp - Data de Apresentação - ex. 2022-10-06T00:00:00Z)
# ementa (string - Assunto - ex. Encaminha o Projeto de Lei 4008 2022, que dispõe sobre a revisão do Plano Plurianual de Ação Governamental - PPAG - 2020-2023, para o exercício de 2023.)
# regime (string - Regime - ex. Especial)
# situation (string - Situação - ex. Publicado)
# propositionType (string - Tipo da Proposição - ex. MSG)
# number (string - Numero - ex. 300)
# year (integer - Ano - ex. 2022)
# city (string -> fixo "Belo Horizonte")
# state (string -> fixo "Minas Gerais")

# Tabela Tramitacao
# id (incremental)
# createdAt (string - Data - ex. 2022-10-04T00:00:00Z)
# description (string - Histórico - Proposição lida em Plenário.\nPublicada no DL em 6 10 2022, pág 24.)
# local (string - Local - ex. Plenário)
# propositionId (chave estrangeira da proposição)

In [20]:
df_proposicao = df_final\
    .withColumn(
        'id_proposition', F.monotonically_increasing_id()
    )\
    .select(
        F.col('id_proposition'),
        F.col('autor').alias('author'),
        F.col('dataPublicacao').alias('presentationDate'),
        F.col('assunto').alias('ementa'),
        F.col('regime'),
        F.col('situacao').alias('situation'),
        F.col('siglaTipoProjeto').alias('propositiontype'),
        F.col('numero').alias('number'),
        F.col('ano').alias('year'),
        F.col('listaHistoricoTramitacoes').alias('tramitacoes')
    )\
    .withColumn(
        'presentationDate', F.to_timestamp(F.col('presentationDate'),'yyyy-mm-dd')
    )\
    .withColumn(
        'year', F.col('year').cast('integer')
    )\
    .withColumn(
        'city', F.lit('Belo Horizonte')
    )\
    .withColumn(
        'state', F.lit('Minas Gerais')
    )\
    .withColumn(
        'ementa', F.regexp_replace(F.col("ementa"), "\n|\r", " ")
    )\
    .withColumn(
        'tramitacoes',
        F.transform(
            F.col('tramitacoes'),
                    lambda x: F.struct(
                        F.monotonically_increasing_id().alias('id_tramitation'),
                        F.to_timestamp(x.data, 'yyyy-MM-dd').cast('string').alias('createdAt'),
                        F.regexp_replace(x.historico.alias('historico').alias('description'), "\n|\r", " "),
                        x.local.alias('local'),
                        F.col('id_proposition')
                    )
            )
    )

In [21]:
df_proposicao.show(5, truncate=False)

+--------------+--------------------------+-------------------+--------------------------------------------------------------------------------+--------+---------------------------------+---------------+------+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------