In [1]:
# import das bibliotecas
import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
import collections
import os
from os.path import isfile, isdir, join

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars file:///home/jovyan/jdbc/postgresql-42.2.17.jar pyspark-shell'

In [3]:
db_host = os.getenv('POSTGRES_HOST')
db_port = os.getenv('POSTGRES_PORT')
db_name = os.getenv('POSTGRES_DB')
db_user = os.getenv('POSTGRES_USER')
db_pass = os.getenv('POSTGRES_PASSWORD')

db_driver = "org.postgresql.Driver"
db_url = "jdbc:postgresql://"+db_host+":"+db_port+"/" + db_name

In [4]:
# quando for True, as tabelas das dimensões serão recriadas e carregadas
carregar_dimensoes = False

In [5]:
# inicialização do spark
conf = SparkConf() \
        .setMaster("local[*]") \
        .setAppName("ETL-CarregarDadosNasDimensoes") \
        .set("spark.executor.memory", "8g") \
        .set("spark.driver.memory", "8g") \
        .set("spark.driver.maxResultSize", "2g") \
        .set("spark.ui.enabled", "true") \
        .set("spark.sql.shuffle.partitions" , "800") \
        .set("spark.sql.execution.arrow.pyspark.enabled" , "false") \

spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .getOrCreate()

sc = spark.sparkContext

In [6]:
# definindo o schema dos dados para leitura dos arquivos JSON
schema = StructType([
    StructField("dadosBasicos", StructType([
        StructField("assunto", ArrayType(
            StructType([
                StructField("assuntoLocal", StructType([
                    StructField("codigoAssunto", LongType(), True),
                    StructField("codigoPaiNacional", LongType(), True),
                    StructField("descricao", StringType(), True)
                ]), True),
                StructField("codigoNacional", LongType(), True),
                StructField("principal", BooleanType(), True)
            ]),
        ), True),
        StructField('classeProcessual', LongType(), True),
        StructField('codigoLocalidade', StringType(), True),
        StructField('competencia', StringType(), True),
        StructField('dataAjuizamento', StringType(), True),
        StructField('dscSistema', StringType(), True),
        StructField('nivelSigilo', LongType(), True),
        StructField('numero', StringType(), True),
        StructField("orgaoJulgador", StructType([
            StructField("codigoMunicipioIBGE", LongType(), True),
            StructField("codigoOrgao", StringType(), True),
            StructField("instancia", StringType(), True),
            StructField("nomeOrgao", StringType(), True)
        ]), True),
        StructField('procEl', LongType(), True),
        StructField("tamanhoProcesso", StringType(), True),
        StructField("totalAssuntos", LongType(), True),
        StructField("valorCausa", StringType(), True)       
    ]), True),
    StructField("grau", StringType(), True),
    StructField("millisInsercao", LongType(), True),
    StructField("movimento", ArrayType(     
        StructType([
            StructField("complementoNacional", ArrayType(
                StructType([
                    StructField("codComplemento", LongType(), True),
                    StructField("codComplementoTabelado", LongType(), True),
                    StructField("descricaoComplemento", StringType(), True),
                ])
            ), True),
            StructField("dataHora", StringType(), True),
            StructField("idDocumentoVinculado", ArrayType(
                StringType(),
            ), True),
            StructField("identificadorMovimento", StringType(), True),
            StructField("movimentoLocal", StructType([
                StructField('codigoMovimento', LongType(), True),
                StructField('codigoPaiNacional', LongType(), True)
            ]), True),
            StructField("movimentoNacional", StructType([
                StructField('codigoNacional', LongType(), True)
            ]), True),
            StructField("nivelSigilo", StringType(), True),
            StructField("orgaoJulgador", StructType([
                StructField("codigoMunicipioIBGE", LongType(), True),
                StructField("codigoOrgao", StringType(), True),
                StructField("instancia", StringType(), True),
                StructField("nomeOrgao", StringType(), True)
            ]), True),
            StructField("tipoDecisao", StringType(), True),
            StructField("tipoResponsavelMovimento", StringType(), True)
        ]),
    ), True),
    StructField("siglaTribunal", StringType(), True)
])

In [7]:
# carrega o CSV de classes e faz a carga da dimensão
df_classes = spark.read \
    .option("header","true") \
    .option("inferSchema","true") \
    .option("delimiter",";") \
    .csv("./base/sgt_classes.csv")

df_classes.createOrReplaceTempView("classes")
   
df_qry_classes = spark.sql(
    "SELECT " +
    "codigo AS cod," + 
    "descricao," + 
    "sigla," + 
    "cod_pai AS codpai " +    
    "FROM classes "
)

if carregar_dimensoes :
    df_qry_classes.write \
        .mode("overwrite") \
        .format("jdbc") \
        .option("url", db_url).option("user", db_user).option("password", db_pass).option("driver", db_driver) \
        .option("dbtable", "inovacnj.classe") \
        .save()

print("tabela inovacnj.classe criada.")

tabela inovacnj.classe criada.


In [9]:
# carrega o CSV de assuntos e faz a carga da dimensão
df_assuntos = spark.read \
    .option("header","true") \
    .option("inferSchema","true") \
    .option("delimiter",";") \
    .csv("./base/sgt_assuntos.csv")

df_assuntos.createOrReplaceTempView("assuntos")
   
df_qry_assuntos = spark.sql(
    "SELECT " +
    "codigo AS cod," + 
    "descricao," + 
    "cod_pai AS codpai " +    
    "FROM assuntos "
)

if carregar_dimensoes :
    df_qry_assuntos.write \
        .mode("overwrite") \
        .format("jdbc") \
        .option("url", db_url).option("user", db_user).option("password", db_pass).option("driver", db_driver) \
        .option("dbtable", "inovacnj.assunto") \
        .save()

print("tabela inovacnj.assunto criada.")

tabela inovacnj.assunto criada.


In [10]:
# carrega o CSV de movimentos e faz a carga da dimensão
df_movimentos = spark.read \
    .option("header","true") \
    .option("inferSchema","true") \
    .option("delimiter",";") \
    .csv("./base/sgt_movimentos.csv")

# cria uma view temporaria dos movimentos
df_movimentos.createOrReplaceTempView("movimentos")

df_qry_movimentos = spark.sql(
    "SELECT " +
    "codigo AS cod," + 
    "descricao," + 
    "cod_pai AS codpai " +    
    "FROM movimentos "
)

if carregar_dimensoes :
    df_qry_movimentos.write \
        .mode("overwrite") \
        .format("jdbc") \
        .option("url", db_url).option("user", db_user).option("password", db_pass).option("driver", db_driver) \
        .option("dbtable", "inovacnj.movimentocnj") \
        .save()

print("tabela inovacnj.movimentocnj criada.")

tabela inovacnj.movimentocnj criada.


In [8]:
# carrega o CSV de grau de jurisdição

df_graujur = spark.read \
.option("header","true") \
    .option("inferSchema","true") \
    .option("delimiter",";") \
    .csv("./base/grau_jurisdicao.csv")

df_graujur.createOrReplaceTempView("graujur")
df_graujur = spark.sql("SELECT cod as cod, " +
                               "descricao " +
                           "FROM graujur "
                      )

df_graujur.write \
        .mode("overwrite") \
        .format("jdbc") \
        .option("url", db_url).option("user", db_user).option("password", db_pass).option("driver", db_driver) \
        .option("dbtable", "inovacnj.grau_jurisdicao") \
        .save()

print("tabela inovacnj.grau_jurisdicao criada.")

tabela inovacnj.grau_jurisdicao criada.


In [12]:
# carrega o CSV de serventia e faz a carga da dimensão
df_serventias = spark.read \
    .option("header","true") \
    .option("inferSchema","true") \
    .option("delimiter",";") \
    .csv("./base/mpm_serventias.csv")

df_serventias.createOrReplaceTempView("serventias")

df_qry_serventias = spark.sql(
    "SELECT " +
    "SEQ_ORGAO AS cod, " + 
    "DSC_ORGAO AS descricao, " + 
    "SEQ_ORGAO_PAI AS codpai, " + 
    "TIP_ORGAO AS sigla_tipoj, " + 
    "DSC_TIP_ORGAO AS tipo_oj, " + 
    "DSC_CIDADE AS cidade, " + 
    "SIG_UF AS uf, " + 
    "COD_IBGE AS codibge, " + 
    "TIP_ESFERA_JUSTICA AS esfera " + 
    "FROM serventias "
)

if carregar_dimensoes :
    df_qry_serventias.write \
        .mode("overwrite") \
        .format("jdbc") \
        .option("url", db_url).option("user", db_user).option("password", db_pass).option("driver", db_driver) \
        .option("dbtable", "inovacnj.orgao_julgador") \
        .save()

    print("tabela inovacnj.orgao_julgador criada.")

In [13]:
# carrega o CSV de tribunal e faz a carga da dimensão
df_tribunais = spark.read \
    .option("header","true") \
    .option("inferSchema","true") \
    .option("delimiter",",") \
    .csv("./base/tribunal.csv")

df_tribunais.createOrReplaceTempView("tribunais")

df_qry_df_tribunais = spark.sql(
    "SELECT * " +
    "FROM tribunais "
)

if carregar_dimensoes :
    df_qry_df_tribunais.write \
        .mode("overwrite") \
        .format("jdbc") \
        .option("url", db_url).option("user", db_user).option("password", db_pass).option("driver", db_driver) \
        .option("dbtable", "inovacnj.tribunal") \
        .save()

    print("tabela inovacnj.tribunal criada.")

In [14]:
df_qry_classes = df_qry_classes.withColumnRenamed("descricao", "descclasse")
df_qry_movimentos = df_qry_movimentos.withColumnRenamed("descricao", "descmovimento")

In [19]:
ramos = {'./base/justica_eleitoral': 'jele', './base/justica_estadual': 'jest', \
         './base/justica_federal': 'jfed', './base/justica_militar': 'jmil', \
         './base/justica_trabalho': 'jtra'}

In [23]:
# faz o carregamento de todos os arquivos em um único DataFrame,
# geracao do CSV com os dados consolidados
# cria a tabela fato com os movimentos processuais

basedir = "./base"

dirs_ramos_justica = [join(basedir, f) for f in os.listdir(basedir) if isdir(join(basedir, f))]

is_first = True

for dir_ramo_just in dirs_ramos_justica:
    print("Iniciando carregamento do ramo de justica: " + dir_ramo_just)
    sufixo_tabela = ramos.get(dir_ramo_just, 'default')
    print("Sufixo tabela para ramo de justica: " + sufixo_tabela)
    dirs_tribunais = [join(dir_ramo_just, f) for f in os.listdir(dir_ramo_just) if isdir(join(dir_ramo_just, f))]
    
    for dir_trib in dirs_tribunais:
        print("Iniciando carregamento do tribunal: " + dir_trib)
        
        arquivos = [join(dir_trib, f) for f in os.listdir(dir_trib) if isfile(join(dir_trib, f))]
        
        df_union_tribunal = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
        
        for arq in arquivos:
            if arq.endswith(".DS_Store") :
                continue
                
            print("Carregando dataframe do arquivo: " + arq)
            df = spark.read.schema(schema).json(arq)
            df_union_tribunal = df_union_tribunal.union(df)
        
        # Cria uma view temporaria para o dataframe
        df_union_tribunal.createOrReplaceTempView("proc_movimentos")
        
        
        # Query para carregar os assuntos dos processos na dimensão: inovacnj.processo_assunto
        df_query_processo_assunto = spark.sql(
            "SELECT DISTINCT " + 
            "siglaTribunal AS codtribunal, " + 
            "dadosBasicos.numero AS npu, " + 
            "to_timestamp(dadosBasicos.dataAjuizamento, 'yyyyMMddHHmmss') AS dtajuizamento, "
            "dadosBasicos.classeProcessual AS codclasse, " +
            
            "coalesce(exp_assunto.assunto.codigoNacional, exp_assunto.assunto.codigoNacional, -1) AS codassunto, " +
            "exp_assunto.assunto.principal AS assunto_principal, " + 
            "coalesce(exp_assunto.assunto.assuntoLocal.codigoAssunto, exp_assunto.assunto.assuntoLocal.codigoAssunto, -1) AS codassunto_local, " +
            "coalesce(exp_assunto.assunto.assuntoLocal.codigoPaiNacional, exp_assunto.assunto.assuntoLocal.codigoPaiNacional, -1) AS codassunto_pai, " +
            "exp_assunto.assunto.assuntoLocal.descricao AS descassunto_local " +
            
            "FROM proc_movimentos " + 
            "LATERAL VIEW explode(dadosBasicos.assunto) exp_assunto as assunto " +
            "WHERE cast(substring(dadosBasicos.dataAjuizamento,0,4) as INT) >= 2000 AND to_timestamp(dadosBasicos.dataAjuizamento, 'yyyyMMddHHmmss') >= to_timestamp('20000101000000', 'yyyyMMddHHmmss') "
        )
        
        df_query_processo_assunto = df_query_processo_assunto \
           .join(df_qry_classes, df_query_processo_assunto["codclasse"] == df_qry_classes["cod"], "left") \
           .join(df_qry_assuntos, df_query_processo_assunto["codassunto"] == df_qry_assuntos["cod"], "left") \
           .select( \
                col("codtribunal"), col("npu"), col("dtajuizamento"), \
                col("codclasse"), col("descclasse"), \
                col("codassunto"), col("descricao").alias("descassunto"), \
                col("assunto_principal"), col("codassunto_local"), col("descassunto_local"), col("codassunto_pai") \
        )
        
        
        # Query para formato em CSV
        df_query_processo_movimento = spark.sql(
            "SELECT DISTINCT " + 
            "siglaTribunal AS codtribunal, " + 
            "grau, " +
            "millisinsercao, " +

            "dadosBasicos.classeProcessual AS codclasse, " +
            "dadosBasicos.codigoLocalidade AS codlocalidade, " +
            "coalesce(dadosBasicos.competencia, dadosBasicos.competencia, -1) as competencia, " +
            "to_timestamp(dadosBasicos.dataAjuizamento, 'yyyyMMddHHmmss') AS dtajuizamento, "
            "dadosBasicos.dscSistema AS descsistema, " +
            "dadosBasicos.nivelSigilo AS nivelsigilo, " +
            "dadosBasicos.numero AS npu, " + 
            "dadosBasicos.orgaoJulgador.codigoMunicipioIBGE AS oj_codibge, " +
            "dadosBasicos.orgaoJulgador.codigoOrgao AS oj_cod, " +
            "dadosBasicos.orgaoJulgador.instancia AS oj_instancia, " +
            "dadosBasicos.orgaoJulgador.nomeOrgao AS oj_descricao, " +
            "dadosBasicos.procEl AS tramitacao, " +
            "dadosBasicos.tamanhoProcesso AS tamanhoprocesso, " +
            "dadosBasicos.valorCausa AS valorcausa, " +

            "exp_movimento.movimento.dataHora AS mov_dtmov, " +
            "exp_movimento.movimento.nivelSigilo AS mov_nivelsigilo, " +
            "exp_movimento.movimento.movimentoNacional.codigoNacional AS mov_cod, " +
            "exp_movimento.movimento.movimentoLocal.codigoMovimento AS mov_codlocal, " +
            "exp_movimento.movimento.movimentoLocal.codigoPaiNacional AS mov_codpainacional, " +

            "exp_movimento.movimento.orgaoJulgador.codigoMunicipioIBGE as mov_oj_codibge, " +
            "exp_movimento.movimento.orgaoJulgador.codigoOrgao as mov_oj_cod, " +
            "exp_movimento.movimento.orgaoJulgador.instancia as mov_oj_instancia, " +
            "exp_movimento.movimento.orgaoJulgador.nomeOrgao as mov_oj_descricao, " +

            "coalesce(exp_movimento.movimento.tipoDecisao, exp_movimento.movimento.tipoDecisao, -1) as mov_tpdecisao, " +
            "coalesce(exp_movimento.movimento.tipoResponsavelMovimento, exp_movimento.movimento.tipoResponsavelMovimento, -1) as mov_tprespmov " +

            "FROM proc_movimentos " + 
            "LATERAL VIEW explode(movimento) exp_movimento as movimento " + 
            "WHERE cast(substring(dadosBasicos.dataAjuizamento,0,4) as INT) >= 2000 AND to_timestamp(dadosBasicos.dataAjuizamento, 'yyyyMMddHHmmss') >= to_timestamp('20000101000000', 'yyyyMMddHHmmss') " + 
            "AND exp_movimento.movimento.movimentoNacional.codigoNacional NOT IN(581, 85, 12270, 12271) " + 
            "AND size(proc_movimentos.movimento) > 0 "
            #"AND (proc_movimentos.movimento[0].movimentoNacional.codigoNacional IN (26, 12474) " +
            #"AND proc_movimentos.movimento[size(proc_movimentos.movimento) -1].movimentoNacional.codigoNacional IN (22, 246)) "
        )
        
        df_query_processo_movimento = df_query_processo_movimento \
           .join(df_qry_movimentos, df_query_processo_movimento["mov_cod"] == df_qry_movimentos["cod"], "left") \
           .join(df_qry_classes, df_query_processo_movimento["codclasse"] == df_qry_classes["cod"], "left") \
           .select( \
                col("codtribunal"), col("grau"), col("millisinsercao"), col("codclasse"), col("descclasse"), \
                col("codlocalidade"), col("competencia"), col("dtajuizamento"), col("descsistema"), \
                col("nivelsigilo"), col("npu"), col("valorcausa"), col("tramitacao"), col("tamanhoprocesso"), \
                col("oj_codibge"), col("oj_cod"), col("oj_instancia"), col("oj_descricao"), \
                col("mov_dtmov"), col("mov_cod"), col("descmovimento"), col("mov_codlocal"), \
                col("mov_codpainacional"), col("mov_nivelsigilo"), col("mov_oj_codibge"), \
                col("mov_oj_cod"), col("mov_oj_instancia"), col("mov_oj_descricao"), col("mov_tpdecisao"), \
                col("mov_tprespmov") \
        )
        
        #df_query_distinctPd = df_query_processo_movimento.toPandas()
        #df_query_distinctPd.to_csv('./output/movimentos_tribunais.csv', mode='a', header=is_first, sep = ";", index=False, chunksize=1000)
        
        df_query_processo_movimento = df_query_processo_movimento.withColumn('mov_dtmov', to_timestamp(df_query_processo_movimento['mov_dtmov'], 'yyyyMMddHHmmss'))
        
        if is_first == True:
            is_first = False
            
            df_query_processo_assunto.repartition(5).write \
                .mode("overwrite") \
                .format("jdbc") \
                .option("url", db_url).option("user", db_user).option("password", db_pass).option("driver", db_driver) \
                .option("dbtable", "inovacnj.processo_assunto") \
                .option("batchsize", "10000") \
                .save()
            
            df_query_processo_movimento.repartition(5).write \
                .mode("overwrite") \
                .format("jdbc") \
                .option("url", db_url).option("user", db_user).option("password", db_pass).option("driver", db_driver) \
                .option("dbtable", "inovacnj.fat_movimento_" + sufixo_tabela) \
                .option("batchsize", "10000") \
                .save()
        else :
            
            df_query_processo_assunto.repartition(5).write \
                .mode("append") \
                .format("jdbc") \
                .option("url", db_url).option("user", db_user).option("password", db_pass).option("driver", db_driver) \
                .option("dbtable", "inovacnj.processo_assunto") \
                .option("batchsize", "10000") \
                .save()
            
            df_query_processo_movimento.repartition(5).write \
                .mode("append") \
                .format("jdbc") \
                .option("url", db_url).option("user", db_user).option("password", db_pass).option("driver", db_driver) \
                .option("dbtable", "inovacnj.fat_movimento_" + sufixo_tabela) \
                .option("batchsize", "10000") \
                .save()

        print("Finalizando carregamento do tribunal: " + dir_trib)
        
    print("Finalizando carregamento do ramo de justica: " + dir_ramo_just)
    
print("Carregamento dos arquivos finalizado.")


Iniciando carregamento do ramo de justica: ./base/justica_federal
Sufixo tabela para ramo de justica: jfed
Iniciando carregamento do tribunal: ./base/justica_federal/processos-trf4
Carregando dataframe do arquivo: ./base/justica_federal/processos-trf4/processos-trf4_9.json
Carregando dataframe do arquivo: ./base/justica_federal/processos-trf4/processos-trf4_11.json
Carregando dataframe do arquivo: ./base/justica_federal/processos-trf4/processos-trf4_5.json
Carregando dataframe do arquivo: ./base/justica_federal/processos-trf4/processos-trf4_4.json
Carregando dataframe do arquivo: ./base/justica_federal/processos-trf4/processos-trf4_10.json
Carregando dataframe do arquivo: ./base/justica_federal/processos-trf4/processos-trf4_8.json
Carregando dataframe do arquivo: ./base/justica_federal/processos-trf4/processos-trf4_3.json
Carregando dataframe do arquivo: ./base/justica_federal/processos-trf4/processos-trf4_2.json
Carregando dataframe do arquivo: ./base/justica_federal/processos-trf4/pr