In [2]:
import sys, os
sys.path.append('..')

import datetime as dt
from capture_dou.inlabs_driver import InLabsDriver
import pandas as pd

# Space-separated DOU sections to download, e.g. "DO1 DO2 DO3 DO1E DO2E DO3E".
secoes = "DO2"

In [3]:
def brasilia_day():
    """
    Return the current day in Brasilia (fixed UTC-3, no daylight savings),
    regardless of the timezone of the machine running the code.

    return: naive datetime.datetime set to midnight (00:00:00.000000) of
            the current UTC-3 day
    """
    # datetime.utcnow() is deprecated since Python 3.12; take an aware "now"
    # in UTC and convert it to a fixed UTC-3 offset instead.
    brasilia_tz = dt.timezone(dt.timedelta(hours=-3))
    brasilia_now = dt.datetime.now(dt.timezone.utc).astimezone(brasilia_tz)
    # Drop tzinfo so callers keep receiving a naive datetime, as before.
    return brasilia_now.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)

In [4]:
import requests
import io
import zipfile
from lxml import etree

def parse_zipped_response(response):
    """
    Open the ZIP archive carried by `response.content` in memory and parse
    its first member as XML.

    input:
        response: object exposing the raw ZIP bytes in `.content`
    return: root element produced by etree.fromstring for the first archive
            member, or None when the archive has no members

    NOTE(review): only the first member of the archive is parsed; every
    other file in the ZIP is ignored. Confirm this is intended.
    """
    with zipfile.ZipFile(io.BytesIO(response.content), 'r') as archive:
        member_names = archive.namelist()
        if not member_names:
            # Nothing to parse in an empty archive.
            return None

        # Read the first member fully into memory before parsing it.
        with archive.open(member_names[0]) as member:
            payload = member.read()

        return etree.fromstring(payload)

## FINAL CODE

In [7]:
# Driver initialisation and login:
driver = InLabsDriver()
driver.login()

# URL assembly:
do_date_format = '%Y-%m-%d'
# Publication date to download, written in the format above:
date_string    = '2023-01-13'

# NOTE: `response` is overwritten on every successful iteration, so after the
# loop it holds the parsed XML of the last section that was downloaded.
for dou_secao in secoes.split(' '):
    file_name = date_string + "-" + dou_secao + ".zip"
    file_url = driver.url_download + date_string + "&dl=" + file_name
    # '736372697074' is the hexadecimal ASCII encoding of the string "script".
    file_header = {'Cookie': 'inlabs_session_cookie=' + driver.cookie, 'origem': '736372697074'}
    file_response = driver.session.request("GET", file_url, headers = file_header)
    if file_response.status_code == 200:
        response = parse_zipped_response(file_response)
        # Release the raw ZIP payload once it has been parsed.
        del file_response

    elif file_response.status_code == 404:
        print("File not found: %s" % file_name)

    else:
        # Report any other status instead of skipping the section silently.
        print("Unexpected HTTP status %s for file: %s" % (file_response.status_code, file_name))



## PARSE_DOU_ARTICLE

In [None]:
from datetime import datetime
from collections import defaultdict
import re

def branch_text(branch):
    """
    Collect the text directly attached to the element 'branch'.

    The pieces are `branch.text` followed by the `tail` of each direct
    child of 'branch' (i.e. the content that follows each child). Pieces
    that are None are discarded and the remaining ones are joined
    with ' | '.

    input:
        branch: element exposing `.text` and iterable over its children
    return: string, or None when there is no text piece at all
    """
    pieces = [branch.text]
    for child in branch:
        pieces.append(child.tail)

    present = [piece for piece in pieces if piece is not None]
    if not present:
        return None
    return ' | '.join(present)


def add_to_data(branch, data, key):
    """
    Given a dict 'data' and a key, add the text found in 'branch'
    (see branch_text function) to the value stored under that key.

    If the key is missing or currently holds None, the branch text is
    stored as is. Otherwise the branch text is appended to the existing
    value, separated by ' | '. A branch without text (branch_text returns
    None) leaves an existing value untouched.

    input:
        branch: element accepted by branch_text
        data: dict (modified in place)
        key: dict key under which the text is accumulated
    return: dict (the same 'data' object)
    """
    text = branch_text(branch)
    current = data.get(key)

    if current is None:
        # Covers both a missing key and a key explicitly set to None.
        data[key] = text
    elif text is not None:
        # Only append real text; formatting None would add a literal ' | None'.
        data[key] = current + ' | %s' % text

    return data


def recurse_over_nodes(tree, parent_key, data):
    """
    Recursively walk the children of 'tree' and accumulate their text in
    'data' (via add_to_data), keyed by the chain of 'class' attributes
    (or 'id' when an element has no 'class') joined with '_'.

    input:
        tree: lxml.etree._Element
        parent_key: string prefix built from the ancestors' keys; a falsy
                    value (e.g. '' or None) means no prefix
        data: dict (modified in place)
    return: dict
    """
    for branch in tree:
        # Prefer the 'class' attribute and fall back to 'id' when it is
        # absent. `.get` returns None rather than raising, so the fallback
        # must be an explicit check and not an exception handler.
        key = branch.attrib.get('class')
        if key is None:
            key = branch.attrib.get('id')

        if parent_key:
            key = '%s_%s' % (parent_key, key)

        add_to_data(branch, data, key)

        # Descend only into elements that have children.
        if list(branch):
            data = recurse_over_nodes(branch, key, data)

    return data

def parse_xml_flattened(element, parent_key=""):
    """
    Flatten the descendants of 'element' into a single-level dict.

    Each leaf (an element without children) is stored under the key
    formed by 'parent_key' plus the tags on the path from 'element'
    down to the leaf, joined with '-'. The value is the leaf's `.text`
    (possibly None). Leaves that resolve to the same key overwrite
    each other, the last one winning.

    input:
        element: element iterable over its children
        parent_key: string prepended to every generated key
    return: dict
    """
    flat = {}
    for node in element:
        path = parent_key + node.tag
        if len(node) == 0:
            # Leaf: keep its text under the accumulated path.
            flat[path] = node.text
            continue
        # Inner node: merge the flattened subtree into the result.
        for nested_path, nested_text in parse_xml_flattened(node, path + "-").items():
            flat[nested_path] = nested_text
    return flat


def extract_necessary_fields(article):
    """
    Extract necessary fields that were present in html parser but
    are not present as keys in inlabs xml response.

    The values come from attributes of the <article> element
    (artCategory, editionNumber, numberPage, pubName, pubDate) and from
    the first <p class="assina"> paragraph found in the first text node
    of article/body/Texto.

    input:
        article: object exposing lxml's `.xpath` method
    return: dict with the keys 'orgao-dou-data', 'edicao-dou-data',
            'secao-dou-data', 'assina', 'secao-dou', 'publicado-dou-data';
            'assina' is '' when no signature paragraph is found
    raises: IndexError when one of the <article> attributes is missing
    """
    orgao_dou_data = article.xpath('//article/@artCategory')[0]
    edicao_dou_data = article.xpath('//article/@editionNumber')[0]
    secao_dou_data = article.xpath('//article/@numberPage')[0]
    secao_dou = article.xpath('//article/@pubName')[0]
    publicado_dou_data = article.xpath('//article/@pubDate')[0]

    # An empty <Texto> yields no text node; treat it like a body without a
    # signature instead of failing with IndexError.
    texto_nodes = article.xpath('.//article/body/Texto/text()')
    texto = texto_nodes[0] if texto_nodes else ''

    # Content of the first paragraph carrying the "assina" class, if any.
    match = re.search(r'<p class="assina">(.*?)</p>', texto)
    assina = match.group(1) if match else ''

    return {'orgao-dou-data': orgao_dou_data, 'edicao-dou-data': edicao_dou_data,
            'secao-dou-data': secao_dou_data, 'assina': assina, 'secao-dou': secao_dou,
            'publicado-dou-data': publicado_dou_data}

def filter_keys(data):
    """
    Shorten each key to the part after its last '_' and drop None values.

    When several original keys collapse into the same shortened key,
    their values are concatenated with ' | ' in iteration order.

    input:
        data: dict
    return: defaultdict whose missing keys default to ''
    """
    merged = defaultdict(lambda: '')

    for path, text in data.items():
        if text is None:
            continue
        leaf = path.rsplit('_', 1)[-1]
        previous = merged[leaf]
        # An empty previous value is replaced rather than joined.
        merged[leaf] = text if len(previous) == 0 else ' | '.join([previous, text])

    return merged


def filter_values(data):
    """
    Keep only the entries whose value contains at least one ASCII
    letter or digit.

    input:
        data: dict with string values
    return: dict
    """
    has_alnum = re.compile('[a-zA-Z0-9]').search
    return {field: text for field, text in data.items() if has_alnum(text)}


def decode(data, encoding= 'iso-8859-1', decoding='utf8'):
    """
    Re-interpret every value of 'data': encode it to bytes with 'encoding'
    and decode those bytes with 'decoding'. Values that cannot be converted
    are reported on stdout and kept unchanged.

    input:
        data: dict with string values
        encoding: string, codec used to turn each value into bytes
        decoding: string, codec used to read those bytes back as text
    return: dict
    """
    final = {}

    for k, v in data.items():
        try:
            # Honour the codecs requested by the caller instead of fixed ones.
            final[k] = v.encode(encoding).decode(decoding)
        except Exception as e:
            # Best effort: keep the original value when conversion fails.
            print("Error", e)
            final[k] = v

    return final

In [None]:
def data_schema(key, value, url, url_certificado):
    """
    Build one record in the final data schema, stamping it with the
    current local time formatted as 'YYYY-MM-DD HH:MM:SS'.

    input:
        key: string
        value: string
        url: string
        url_certificado: string
    return: dict with the keys 'key', 'value', 'url', 'capture_date'
            and 'url_certificado'
    """
    captured_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    record = dict(key=key, value=value, url=url)
    record["capture_date"] = captured_at
    record["url_certificado"] = url_certificado
    return record

def get_url_certificado(article):
    """
    Return the first 'pdfPage' attribute found on an <article> element.

    input:
        article: lxml.etree._Element
    return: string
    raises: IndexError when no <article> element carries 'pdfPage'
    """
    pdf_pages = article.xpath('//article/@pdfPage')
    return pdf_pages[0]

def get_data(article):
    """
    Get relevant data from xml. It flattens the xml tree into a dict
    keyed by tag paths (see parse_xml_flattened), cleans that dict, and
    adds the fields returned by extract_necessary_fields.
    It also creates the item 'fulltext' with the content of the <Texto>
    element, without tags ('' when the xml has no <Texto> content).

    input:
        article: lxml.etree._Element
    return: dict
    """
    data = parse_xml_flattened(article)

    # Drop None values, normalise keys and discard values without any
    # letter or digit, as well as entries with an empty key.
    data = filter_keys(data)
    data = filter_values(data)
    data = {k: v for k, v in data.items() if len(k) != 0}

    # Include other fields from extraction
    fields = extract_necessary_fields(article)
    for field_name in ('orgao-dou-data', 'edicao-dou-data', 'secao-dou-data',
                       'assina', 'secao-dou', 'publicado-dou-data'):
        data[field_name] = fields[field_name]

    # Include full text ignoring xml tags and formatters. Whitespace is
    # collapsed first so the <Texto> content sits on a single line for the
    # regular expression below.
    full_text = etree.tostring(article, pretty_print=True, encoding='unicode')
    full_text = ' '.join(full_text.split())
    texto_match = re.search(r'<Texto>(.*?)</Texto>', full_text)
    # A missing or empty <Texto> produces no match; fall back to an empty
    # text instead of failing on `None.group`.
    full_text = texto_match.group(1) if texto_match else ''
    # The serialised content has its angle brackets escaped; restore them so
    # the embedded markup can be stripped.
    full_text = full_text.replace('&lt;','<').replace('&gt;','>')
    clean_full_text = re.sub(r'<[^>]+>', '', full_text)

    data['fulltext'] = clean_full_text

    return data

def structure_data(data, url, article):
    """
    Turn the parsed data into a list of records, one per (key, value)
    pair of 'data'. Each record is built by data_schema and therefore
    also carries the capture date, the url and the certified url
    (taken from 'article' via get_url_certificado).

    input:
        data: dict
        url: string
        article: lxml.etree._Element
    return: list of dict
    """
    url_certificado = get_url_certificado(article)
    return [data_schema(key, value, url, url_certificado) for key, value in data.items()]
        

def parse_dou_article(response, url=''):
    """
    Parse the relevant fields of a DOU article into a list of dicts.

    'response' is handed to get_data to extract the fields and then to
    structure_data to build the records. Each dict has the keys:
    * key             -- identifier of the field;
    * value           -- the respective value (text) in that field;
    * url             -- the 'url' argument, as received;
    * capture_date    -- The date when capture occurred;
    * url_certificado -- The link to the certified version of the article.
    """
    return structure_data(get_data(response), url, response)

# STRUCTURE_ARTICLE

In [None]:
def get_key_value(key, article_raw):
    """
    Look up 'key' in article_raw (a list of dicts, each with the entries
    'key' and 'value') and return the 'value' of the first dict whose
    'key' entry equals it. Return None when no dict matches.
    """
    matches = [entry for entry in article_raw if entry['key'] == key]
    return matches[0]['value'] if matches else None


def make_resumo(fulltext, resumo_size=300):
    """
    Given a string (fulltext), this function aims to extract
    the most important part of it as an abstract.

    The abstract starts at the first search term found in the text (terms
    are tried in the order they are listed, case-insensitively) or at the
    beginning of the text when no term is present. It holds at most
    'resumo_size' characters, followed by '...' whenever text was cut off.

    input:
        fulltext: value converted with str() before processing
        resumo_size: int, maximum number of characters taken from the text
    return: string
    """
    # Terms to search for, in priority order:
    termos = ['resolve:', 'onde se l', 'objeto:', 'espécie']

    # Search on a lowercased copy so the match is case-insensitive.
    fulltext  = str(fulltext)
    paragraph = fulltext.lower()

    # Start at the first term found; default to the beginning of the text.
    start = 0
    for termo in termos:
        pos = paragraph.find(termo)
        if pos != -1:
            start = pos
            break   # Once a term is found, do not look for the others.

    remaining = fulltext[start:]
    abstract = remaining[:resumo_size]

    # Flag truncation for both the "term found" and "no term" cases.
    if len(remaining) > len(abstract):
        abstract = abstract + '...'

    return abstract
    
def structure_paragraph(paragraph):
    """
    Given an html text, return the contents of its attribute-less
    <p>...</p> elements joined by a single space.

    Paragraphs whose opening tag carries attributes are not matched, and
    markup nested inside a matched paragraph is kept as is.
    """
    plain_paragraph = re.compile(r'<p>(.*?)<\/p>')
    inner_texts = plain_paragraph.findall(paragraph)
    return ' '.join(inner_texts)

def structure_article(article_raw):
    """
    Takes a list of dicts that represent a DOU article with the keywords
    key, value, capture_date, url and url_certificado and select relevant
    keys (hard-coded), rename them and output a dict with only the relevant
    keys. Relevant keys missing from article_raw are present with value None.

    The result also carries 'capture_date', 'url' and 'url_certificado'
    (copied from the first entry of article_raw), 'alltext' (selected text
    fields joined with ' | ') and 'resumo' (see make_resumo).

    input:
        article_raw: non-empty list of dict
    return: dict
    raises: IndexError when article_raw is empty
    """

    # (key in article_raw, key in the output), kept as pairs so the two
    # names of a field cannot drift out of alignment.
    key_map = [('secao-dou', 'secao'), ('orgao-dou-data', 'orgao'), ('assina', 'assina'),
               ('article-body-Identifica', 'identifica'), ('cargo', 'cargo'),
               ('secao-dou-data', 'pagina'), ('edicao-dou-data', 'edicao'),
               ('dou-em', 'italico'), ('ementa', 'ementa'), ('dou-strong', 'strong'),
               ('titulo', 'ato_orgao'), ('subtitulo', 'subtitulo'),
               ('article-body-Texto', 'paragraph'), ('publicado-dou-data', 'pub_date'),
               ('assinaPr', 'assinaPr'), ('fulltext', 'fulltext')]

    struct = {new_key: get_key_value(raw_key, article_raw) for raw_key, new_key in key_map}

    # Join with identifying fields:
    struct['capture_date']    = article_raw[0]['capture_date']
    struct['url']             = article_raw[0]['url']
    struct['url_certificado'] = article_raw[0]['url_certificado']

    # Format selected fields:
    # Keep only the section number (digits plus an optional trailing word
    # character); leave the value untouched when it is missing or has no digits.
    if struct['secao'] is not None:
        secao_match = re.search(r'\d+(?:\w)?', struct['secao'])
        if secao_match:
            struct['secao'] = secao_match.group()

    if struct['assinaPr'] is not None:   # The president's signature exists.
        if struct['assina'] is not None: # Both signatures exist.
            struct['assina'] = struct['assinaPr'] + ' | ' + struct['assina']
        else:                            # Only the president's signature exists.
            struct['assina'] = struct['assinaPr']

    # Structure paragraph (skipped when the article has no paragraph text):
    if struct['paragraph'] is not None:
        struct['paragraph'] = structure_paragraph(struct['paragraph'])

    # Create new field (all the text):
    text_fields = (struct['ato_orgao'], struct['subtitulo'], struct['ementa'],
                   struct['strong'], struct['italico'], struct['paragraph'])
    struct['alltext'] = ' | '.join(text for text in text_fields if text is not None)
    # Another new field (a clipping):
    struct['resumo'] = make_resumo(struct['fulltext'])

    return struct


In [None]:
# `parsed` is not defined by any earlier cell; build it here from the XML root
# stored in `response` by the download loop so the notebook runs top to bottom.
parsed = parse_dou_article(response)
structured = structure_article(parsed)

In [None]:
# Show the structured article as a single-column table (one row per field).
pd.DataFrame(structured, index=[0]).transpose()

Unnamed: 0,0
secao,2
orgao,Ministério Público da União/Ministério Público...
assina,MARCELO GOSS NEVES
identifica,"Portaria Nº 256, de 13 de dezembro de 2022"
cargo,
pagina,46
edicao,10
italico,
ementa,
strong,
