In [1]:
import sys
sys.path.append('/home/jovyan/work')
import xml.etree.ElementTree as ET
import xml.dom.minidom
import os
import shutil
import requests
import urllib.parse
import pandas as pd
import pymssql
from datetime import datetime
from urllib.parse import quote_plus
from sqlalchemy import create_engine, text
from minio import Minio
from utils.SendMinio import SendMinio
import json

### Define Variaveis de conexão banco e API

In [2]:
db_username = os.getenv("DB_PG_USER")
db_password = os.getenv("DB_PG_PASS")
db_hostname = os.getenv("DB_PG_HOST")
db_port = '5432'
db_name = 'aplicacao'
db_schema = 'dw'

connection_str = f'postgresql://{db_username}:{db_password}@{db_hostname}:{db_port}/{db_name}'

engine = create_engine(connection_str)

In [3]:
# Define o URL do endpoint da API SOAP
url = 'https://sei.trf5.jus.br/sei/ws/SeiWS.php'

### Define Lista de Processos

In [4]:
lista_de_procedimentos = []

In [5]:
# Defina sua consulta SQL
sql_query = "SELECT processo FROM dw.dim_processosporcategoria;"

#coleta da data de ultima atualização
with engine.connect() as connection:
    result = connection.execute(text(sql_query))
    for row in result:
        lista_de_procedimentos.append(row[0])  # Assume que a coluna de processo é a primeira

In [None]:
print(lista_de_procedimentos)

### Consultar Procedimentos

In [7]:
def extract_data_from_xml(root, tag):
    columns = []
    data = {}

    # Itera sobre os elementos XML para extrair os dados
    for tag in root.findall(f'.//{tag}'):
        for child in tag:
            if child.tag not in columns:
                columns.append(child.tag)
                data[child.tag] = []

            # Verifica se a tag atual é 'Unidade' ou 'Usuario'
            if child.tag == 'Unidade' or child.tag == 'Usuario':
                # Encontra a subtag 'IdUnidade' ou 'IdUsuario' dentro de 'Unidade' ou 'Usuario', respectivamente
                subtag_id = child.find('Id' + child.tag)
                # Se a subtag 'IdUnidade' ou 'IdUsuario' existe, adiciona seu texto ao dataframe
                if subtag_id is not None:
                    data[child.tag].append(subtag_id.text)
                # Se a subtag 'IdUnidade' ou 'IdUsuario' não existe, adiciona None ao dataframe
                else:
                    data[child.tag].append(None)
            # Se não for 'Unidade' ou 'Usuario', adiciona o texto da tag ao dataframe
            else:
                data[child.tag].append(child.text)

    return columns, data


In [8]:
def xml_to_dataframe(url, xml_body, tag):
    # Faz a requisição SOAP
    response = requests.post(url, data=xml_body, headers={'Content-Type': 'text/xml'})

    # Verifica se a requisição foi bem-sucedida
    if response.status_code == 200:
        # Parseia a resposta XML
        root = ET.fromstring(response.content)
        
        # Extrai os nomes das colunas e os dados
        columns, data = extract_data_from_xml(root, tag)
        
        # Cria o DataFrame
        df = pd.DataFrame(data, columns=columns)
        
        return df
    
    else:
        print('Erro ao fazer a requisição:', response.status_code, response.text)
        return None

In [9]:
#obs: tarefa utilizada -> 2
# Refere-se a 'Geração de documento público'

In [10]:
def consultar_api_e_gerar_df(lista_de_procedimentos, url, tag, key):
    # Crie um dataframe vazio para acumular os resultados
    df_acumulado = pd.DataFrame()

    for procedimento in lista_de_procedimentos:
        # Define o XML para a requisição
        xml_body = f"""
        <soapenv:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:sei="Sei">
           <soapenv:Header/>
           <soapenv:Body>
              <sei:listarAndamentos soapenv:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
                 <SiglaSistema xsi:type="xsd:string">SIS-JFAL</SiglaSistema>
                 <IdentificacaoServico xsi:type="xsd:string">{TOKEN_API}</IdentificacaoServico>
                 <IdUnidade xsi:type="xsd:string">110001098</IdUnidade>
                 <ProtocoloProcedimento xsi:type="xsd:string">{procedimento}</ProtocoloProcedimento> 
                 <SinRetornarAtributos xsi:type="xsd:string">N</SinRetornarAtributos>
                 <Tarefas xsi:type="xsd:string">2</Tarefas>
              </sei:listarAndamentos>
           </soapenv:Body>
        </soapenv:Envelope>
        """

        # Supondo que xml_to_dataframe() retorna um DataFrame com a estrutura correta
        df = xml_to_dataframe(url, xml_body, tag)
        df['IdProcedimentoFormatado'] = procedimento

        # Concatena o dataframe atual ao acumulado
        df_acumulado = pd.concat([df_acumulado, df], ignore_index=True)

    # Remove registros duplicados com base na coluna 'idUsuario'
    df_acumulado = df_acumulado.drop_duplicates(subset=key)

    return df_acumulado

### Atualização de Tabelas

In [12]:
df_andamentos = consultar_api_e_gerar_df(lista_de_procedimentos, url, "item", "IdAndamento")

In [None]:
print(df_andamentos.head(10))

### Função para inserir no Data Lake

In [15]:
def insert_datalake(df, name_folder, name_file):
    today = datetime.today().strftime("%Y%m%d")
    # Camada (bucket) do MinIO para qual os dados extraídos serão enviados
    bucket_path = "raw"
    # Data Mart do DW ao qual esse notebook pertence
    business_area = "sei_andamentos"

    minio_host = os.getenv("MINIO_HOST")
    minio_access_key = os.getenv("MINIO_ACCESS_KEY")
    minio_secret_key = os.getenv("MINIO_SECRET_KEY")

    client = Minio(
        minio_host,
        access_key = minio_access_key,
        secret_key = minio_secret_key,
        secure = False
    )

    today = datetime.today().strftime("%Y%m%d")
    today_folder = datetime.today().strftime("%Y%m%d")
    file_output = f'{name_file}_{today}.json'

    # Transforme o DataFrame em uma lista de dicionários
    data = df.to_dict(orient='records')

    # Crie um arquivo JSON com o formato desejado
    with open(file_output, 'w', encoding='utf-8') as json_file:
        json.dump(data, json_file, ensure_ascii=False, indent=4)
    
    sm = SendMinio(client=client,bucket_name=bucket_path,object_name=f"{business_area}/operacional/{name_folder}/{today_folder}/{file_output}",file_path=file_output)
    sm.send2bucket()
    os.remove(file_output)

### Carga

In [16]:
name_folder = "andamentos"
name_file = "andamentos"

In [17]:
insert_datalake(df_andamentos, name_folder, name_file)

Bucket existente... enviando arquivos
