# 📊 Datajud e Pyspark

Este notebook apresenta uma análise exploratória de dados judiciais utilizando **PySpark** em modo local, coletando registros da **API pública do CNJ (DataJud)** para os tribunais de PB, PE e RN no período de janeiro de 2024 até o momento atual.

## 1. Descrição dos Dados
A API DataJud oferece acesso a processos judiciais de todos os tribunais estaduais brasileiros. Para esta atividade, extraímos dados dos tribunais da Paraíba (PB), Pernambuco (PE) e Rio Grande do Norte (RN). Os registros incluem informações como data de ajuizamento, movimentações, assuntos, juízes, classe processual e grau.
- **Período**: Jan/2024 até hoje
- **Tribunais**: PB, PE, RN
- **Formato de coleta**: Scroll API com paginação

In [None]:
# === 0. IMPORTS ===
import requests, time
from datetime import datetime
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import to_date, datediff, col, explode, count, avg
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import plotly.express as px



# === 1. INICIA A SPARK SESSION ===
# == pode-se mudar para 4g-6g a depender do hardware do PC ==
spark = SparkSession.builder \
    .appName("DataJud_Analise_Completo") \
    .config("spark.executor.memory","8g") \
    .config("spark.driver.memory","4g") \
    .config("spark.sql.shuffle.partitions","200") \
    .getOrCreate()


# === 2. CONFIGURAÇÕES DA API & CONSTS ===
API_KEY = "APIKey cDZHYzlZa0JadVREZDJCendQbXY6SkJlTzNjLV9TRENyQk1RdnFKZGRQdw=="
HEADERS = {"Content-Type":"application/json","Authorization":API_KEY}
PAGE_SIZE = 1000
SCROLL_TTL = "2m"
SCROLL_URL = "https://api-publica.datajud.cnj.jus.br/_search/scroll"

TRIBUNAIS = {
    'PB':'https://api-publica.datajud.cnj.jus.br/api_publica_tjpb/_search',
    'PE':'https://api-publica.datajud.cnj.jus.br/api_publica_tjpe/_search',
    'RN':'https://api-publica.datajud.cnj.jus.br/api_publica_tjrn/_search'
}

In [None]:
# === 3. CONFIGURAÇÃO DE PERÍODO E LIMITE ===
# Defina o intervalo de interesse para a coleta de processos (YYYY-MM-DDThh:mm:ss.000Z).
# Para evitar sobrecarga e facilitar testes, ajuste MAX_DOCS para um valor menor
# (por exemplo, 10.000 ou 100.000) – caso contrário, o script tentará baixar todos
# os processos do período, o que pode demorar muito dependendo da sua máquina.
DATA_INICIO="2024-01-01T00:00:00.000Z"
DATA_FIM = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z")
MAX_DOCS=10000000  # altere para None (ilimitado) ou outro valor conforme necessidade

In [4]:
# === 4. FUNÇÃO DE “FLATTEN”: TRANSFORMA HITS EM REGISTROS SIMPLES ===

def flatten_hit(hit, estado):
    src = hit.get("_source",{})
    dt_aju = src.get("dataAjuizamento")
    dt_upd = src.get("dataHoraUltimaAtualizacao")
    # contagens
    movs = src.get("movimentos") or []
    movs = movs if isinstance(movs,list) else []
    ass = src.get("assuntos") or []
    ass = ass if isinstance(ass,list) else []
    # juiz
    juiz = None
    og = src.get("orgaoJulgador")
    if isinstance(og,dict): juiz = og.get("nome")
    # classe
    classe = src.get("classe",{}).get("nome")
    # grau
    grau = src.get("grau")
    # assunto principal
    ap = None
    if ass and isinstance(ass[0],dict): ap=ass[0].get("nome")
    return Row(
        estado=estado,
        dataAjuizamento=dt_aju,
        dataAtualizacao=dt_upd,
        movimentos_count=len(movs),
        assuntos_count=len(ass),
        classe=classe,
        grau=grau,
        juiz=juiz,
        assunto_principal=ap
    )

In [5]:
# === 5. EXTRAI APELAÇÕES em (estado,hora) ===
def extract_apel(hit, estado):
    src = hit.get("_source",{})
    movs = src.get("movimentos") or []
    movs = movs if isinstance(movs,list) else []
    rows=[]
    for m in movs:
        nome = (m.get("nome") or "").lower()
        if "apela" in nome:
            dt = m.get("dataHora")
            if dt and len(dt)>=13:
                hora = int(dt[11:13])
                rows.append(Row(estado=estado, hora=hora))
    return rows


 ## 2. Coleta processos e registros de apelações de um tribunal via API DataJud,
Parâmetros:
    - sigla (str): sigla do tribunal (por exemplo, 'PB').
    - url (str): endpoint da API para o tribunal.

Funcionamento:
- Envia a requisição inicial com filtro de data (DATA_INICIO a DATA_FIM)
  e tamanho de página (PAGE_SIZE).
- Extrai o scroll_id retornado pela API para continuar na próxima página.
Em cada iteração:
- Converte cada hit em Row via flatten_hit.
- Extrai registros de apelações com extract_apel.
- Para se atingir MAX_DOCS ou quando não houver mais hits.
- Realiza nova requisição de scroll até esgotar os resultados.

Retorna:
- prots: lista de Row com processos normalizados.
- apeos: lista de Row com horas de apelações extraídas.







In [6]:
# === 6. PAGINAÇÃO + COLETA ===
def collect_for(sigla,url):
    prots=[]    # para processos
    apeos=[]    # para apelações
    payload={"size":PAGE_SIZE,"query":{"range":{"dataAjuizamento":{"gte":DATA_INICIO,"lte":DATA_FIM}}}}
    r=requests.post(f"{url}?scroll={SCROLL_TTL}",headers=HEADERS,json=payload)
    r.raise_for_status()
    data=r.json()
    scroll_id=data.get("_scroll_id")
    hits=data.get("hits",{}).get("hits",[])
    while hits and (MAX_DOCS is None or len(prots)<MAX_DOCS):
        for h in hits:
            prots.append(flatten_hit(h,sigla))
            apeos.extend(extract_apel(h,sigla))
            if MAX_DOCS and len(prots)>=MAX_DOCS: break
        if not hits or (MAX_DOCS and len(prots)>=MAX_DOCS): break
        r=requests.post(SCROLL_URL,headers=HEADERS,json={"scroll":SCROLL_TTL,"scroll_id":scroll_id})
        r.raise_for_status()
        data=r.json()
        scroll_id=data.get("_scroll_id")
        hits=data.get("hits",{}).get("hits",[])
    return prots,apeos


In [7]:
# === 7. COLETA DOS DADOS  -  PODE DEMORAR 5, 10, 20 MINUTOS A DEPENDER DO SEU PC ===
all_prots=[]
all_ape=[]
for sig,url in TRIBUNAIS.items():
    print("Buscando",sig)
    p,a = collect_for(sig,url)
    all_prots.extend(p)
    all_ape.extend(a)
    time.sleep(0.2)


Buscando PB
Buscando PE
Buscando RN


In [8]:
print(f"Total de processos lidos: {len(all_prots)}")

Total de processos lidos: 3000


In [9]:
# === 8. CRIA RDDs + DF SPARK ===
rdd_pro = spark.sparkContext.parallelize(all_prots)
df_pro = spark.createDataFrame(rdd_pro) \
    .withColumn("dt_aju", to_date("dataAjuizamento")) \
    .withColumn("dt_upd", to_date("dataAtualizacao")) \
    .withColumn("duracao_dias", datediff("dt_upd","dt_aju"))

# Definição de schema para as apelações
schema_ape = StructType([
    StructField("estado", StringType(), True),
    StructField("hora", IntegerType(), True)
])

if all_ape:
    rdd_ape = spark.sparkContext.parallelize(all_ape)
    df_ape = spark.createDataFrame(rdd_ape)
else:
    # cria DataFrame vazio com schema
    df_ape = spark.createDataFrame([], schema_ape)

df_pro.cache()
df_ape.cache()

DataFrame[estado: string, hora: int]

9. Visualizações com Pandas e Plotly
Nesta seção, transformamos os resultados do Spark em DataFrames Pandas e criamos gráficos interativos para explorar diferentes aspectos dos processos:

In [11]:
# === 9. VISUALIZAÇÕES PANDAS/Plotly ===

# Converte o DataFrame Spark principal para Pandas
pdf_pro = df_pro.toPandas()

# 9.1 EFICIÊNCIA DOS JUÍZES
df_eff = df_pro.groupBy("estado", "juiz") \
    .agg(
        avg("duracao_dias").alias("dur_media"),
        count("*").alias("n_proc")
    ) \
    .filter(col("n_proc") >= 20) \
    .orderBy("estado", "dur_media")

pdf_eff = df_eff.toPandas()
fig1 = px.bar(
    pdf_eff,
    x="dur_media",
    y="juiz",
    color="estado",
    orientation="h",
    title="Juízes com ≥20 processos: duração média"
)
fig1.show()


# 9.2 TOP ASSUNTOS POR ESTADO
df_ass_est = df_pro.groupBy("estado", "assunto_principal") \
    .count() \
    .orderBy("estado", col("count").desc())

pdf_ass_est = df_ass_est.toPandas()
fig2 = px.bar(
    pdf_ass_est,
    x="count",
    y="assunto_principal",
    color="assunto_principal",
    facet_col="estado",
    facet_col_wrap=3,
    category_orders={
        "estado": sorted(df_pro.select("estado").distinct().rdd.flatMap(lambda x: x).collect())
    },
    title="Top Assuntos por Estado",
    height=400
)
fig2.update_layout(showlegend=False)
fig2.show()


# 9.3 CLASSES PROCESSUAIS POR ESTADO (TOP >50)
df_cls_est = df_pro.groupBy("estado", "classe") \
    .count() \
    .filter(col("count") > 50) \
    .orderBy("estado", col("count").desc())

pdf_cls_est = df_cls_est.toPandas()
fig3 = px.bar(
    pdf_cls_est,
    x="count",
    y="classe",
    color="estado",
    facet_col="estado",
    facet_col_wrap=3,
    title="Classes Processuais (>50 processos) por Estado",
    height=800
)
fig3.update_layout(showlegend=False)
fig3.show()


# 9.4 HISTOGRAMA DA DURAÇÃO DOS PROCESSOS POR ESTADO
fig4 = px.histogram(
    pdf_pro,
    x="duracao_dias",
    facet_col="estado",
    facet_col_wrap=3,
    nbins=50,
    title="Distribuição da Duração dos Processos (dias) por Estado",
    labels={"duracao_dias": "Duração (dias)", "count": "Número de Processos"}
)
fig4.update_layout(
    height=1000,
    showlegend=False
)
# Desacopla eixos X para cada faceta
fig4.for_each_xaxis(lambda ax: ax.update(matches=None))
fig4.show()


# 9.5 QUANTIDADE DE PROCESSOS POR GRAU E ESTADO
df_grau_est = df_pro.groupBy("estado", "grau") \
    .count() \
    .orderBy("estado", col("count").desc())

pdf_grau_est = df_grau_est.toPandas()
fig5 = px.bar(
    pdf_grau_est,
    x="grau",
    y="count",
    color="estado",
    barmode="group",
    title="Quantidade de Processos por Grau e Estado"
)
fig5.show()



In [12]:
# === 10. TABELAS RESUMO ===

# === 10.1.  CONTAGEM DE PROCESSOS POR ESTADO  POR ASSUNTO  ===

df_pro.groupBy("estado", "assunto_principal") \
    .count() \
    .orderBy("estado", col("count").desc()) \
    .show(100, truncate=False)

# === 10.2.  MÉDIA DE DIAS POR PROCESSO  ===
df_pro.groupBy("estado") \
    .agg({"duracao_dias": "avg"}) \
    .withColumnRenamed("avg(duracao_dias)", "duracao_media_dias") \
    .orderBy("estado") \
    .show()

# === 10.3. CONTAGEM DE PROCESSOS POR GRAU POR ESTADO  ===
df_pro.groupBy("grau") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()



+------+--------------------------------------------------------------------------------------------+-----+
|estado|assunto_principal                                                                           |count|
+------+--------------------------------------------------------------------------------------------+-----+
|PB    |Bancários                                                                                   |88   |
|PB    |Seguro                                                                                      |42   |
|PB    |Indenização por Dano Moral                                                                  |42   |
|PB    |Tarifas                                                                                     |40   |
|PB    |Municipais                                                                                  |39   |
|PB    |Empréstimo consignado                                                                       |37   |
|PB    |Habeas Corpus - Cabi

In [None]:
# === 11. ENCERRANDO SESSÃO  ===
spark.stop()