In [1]:
import os

# sets the output directory; makedirs(exist_ok=True) creates it when missing and
# is a no-op when it already exists, so no separate existence check is needed
cba_path = os.path.join(".", "cleaned_cba_samples")
os.makedirs(cba_path, exist_ok=True)

# sets the input directory, resolved against the current working directory
# (os.path.join picks the platform's separator instead of a hard-coded '/')
file_path = os.path.join(os.getcwd(), "cba_samples")
print(file_path)

/Users/calvineng/Desktop/Research/CBA_Authority_Measure/cba_samples


In [2]:
from itertools import dropwhile, takewhile
import io
import re

# retrieves the type of document
def extract_document_type(file_path):
    """Classify a file by the title found between its title marker lines.

    The title is built from the lines strictly between the first line that
    contains ``<STARTofTITLE>`` and the next line that contains
    ``<ENDofTITLE>``. Each line is stripped and the lines are concatenated
    with no separator. Both marker lines are discarded in full, including any
    other text they carry.

    Args:
        file_path: Path of a UTF-8 encoded text file.

    Returns:
        A tuple ``(acordo, extrato)``. It holds the pair of 0/1 flags of the
        first known phrase contained in the title, or ``('', '')`` when no
        known phrase is present (including when the start marker is missing).
    """
    # Checked in order; the first phrase contained in the title wins.
    flags_by_phrase = (
        ('Extrato Acordo Coletivo', (1, 1)),
        ('Extrato Convenção Coletiva', (0, 1)),
        ('Extrato Termo Aditivo de Acordo Coletivo', (1, 0)),
        ('Extrato Termo Aditivo de Convenção Coletiva', (0, 0)),
    )

    title_parts = []
    inside_title = False
    with io.open(file_path, 'r', encoding='utf8') as handle:
        for raw_line in handle:
            stripped = raw_line.strip()
            if not inside_title:
                # The marker line only switches collection on; it is never kept.
                inside_title = '<STARTofTITLE>' in stripped
                continue
            if '<ENDofTITLE>' in stripped:
                break
            title_parts.append(stripped)

    title = ''.join(title_parts).strip()
    for phrase, flags in flags_by_phrase:
        if phrase in title:
            return flags
    return '', ''

# retrieves the validity
def extract_validity(file_path):
    """Classify a file by the content of its validity section.

    The section is built from the lines strictly between the first line that
    contains ``<STARTofVALIDITY>`` and the next line that contains
    ``<ENDofVALIDITY>``. Each line is stripped and the lines are concatenated
    with no separator. Both marker lines are discarded in full.

    Args:
        file_path: Path of a UTF-8 encoded text file.

    Returns:
        ``1`` when the section contains ``'carimbo'``, otherwise ``0`` when it
        contains ``'semvalorlegal'``, otherwise ``''`` (also returned when the
        start marker is missing).
    """
    fragments = []
    reached_section = False
    with io.open(file_path, 'r', encoding='utf8') as source:
        for raw in source:
            current = raw.strip()
            if not reached_section:
                # The marker line only switches collection on; it is never kept.
                reached_section = '<STARTofVALIDITY>' in current
                continue
            if '<ENDofVALIDITY>' in current:
                break
            fragments.append(current)

    stamp = ''.join(fragments).strip()
    # 'carimbo' takes precedence when both keywords are present.
    if 'carimbo' in stamp:
        return 1
    if 'semvalorlegal' in stamp:
        return 0
    return ''

# extracts the text of clauses
def extract_clause_texts(file_path):
    """Return the cleaned text found between the text marker lines of a file.

    The lines strictly between the first line containing ``<STARTofTEXT>`` and
    the next line containing ``<ENDofTEXT>`` are stripped and joined with
    single spaces; both marker lines are discarded in full. The joined text is
    then passed through a fixed sequence of substitutions. Their order matters:
    later patterns rely on what earlier ones have already removed.

    Args:
        file_path: Path of a UTF-8 encoded text file.

    Returns:
        The cleaned text as a ``str``. It is empty when no line contains the
        start marker, and it may begin with a single space left behind by a
        substitution at the start of the text.
    """
    # extracts text; the file is only needed for this step, so it is closed
    # before the cleaning starts
    with io.open(file_path, 'r', encoding='utf8') as f:
        lines = (line.strip() for line in f)
        after_start = dropwhile(lambda line: '<STARTofTEXT>' not in line, lines)
        next(after_start, "")  # discard the start-marker line itself
        body_lines = takewhile(lambda line: '<ENDofTEXT>' not in line, after_start)
        text = ' '.join(body_lines).strip()

    # cleans text: paragraph headings followed by ':', an en dash or a hyphen
    phrases = ['PARÁGRAFO ÚNICO', 'PARÁGRAFO PRIMEIRO', 'PARÁGRAFO SEGUNDO', 'PARÁGRAFO TERCEIRO',
        'PARÁGRAFO QUARTO', 'PARÁGRAFO QUINTO', 'PARÁGRAFO SEXTO', 'PARÁGRAFO SÉTIMO',
        'PARÁGRAFO OITAVO', 'PARÁGRAFO NONO', 'PARÁGRAFO DÉCIMO']
    pattern = '|'.join(fr"{phrase}\s?[:–-]\s?" for phrase in phrases)
    text = re.sub(pattern, ' ', text, flags=re.IGNORECASE)

    # literal symbols that are each replaced by a single space
    for symbol in ('\xa0', '|', 'R$', '%', "'", '"', '“', '”', '·'):
        text = text.replace(symbol, ' ')
    text = text.replace('CEEE D', 'CEEE-D')

    # '\.\s?[–-]' already covers the '. –' / '. -' forms, so the separate
    # alternative with a mandatory space is not needed
    text = re.sub(r'\.\s?[–-]', '. ', text) # '. –', '. -', '.–', '.-'
    text = re.sub(r'\(.*?\)', ' ', text) # (...)
    text = re.sub(r'[A-Z]\) [–-]', ' ', text, flags=re.IGNORECASE) # A) -
    text = re.sub(r'[A-Z]\)[–-]', ' ', text, flags=re.IGNORECASE) # A)-
    text = re.sub(r'[A-Z]\)', ' ', text, flags=re.IGNORECASE) # A)
    text = re.sub(r'[A-Z]\.\d+\)', ' ', text, flags=re.IGNORECASE) # a.1)
    text = re.sub(r'§ \d+º [–-]', ' ', text) # § 1º -
    text = text.replace('§', ' ')
    text = re.sub(r'\d+º\s.*?trimestre\s–\s', ' ', text, flags=re.IGNORECASE) # 1º trimestre –
    text = re.sub(r'\s+([.,:;?!])', r'\1', text) # spaces before punctuation
    # raw string: '\s' in a normal string literal is an invalid escape sequence
    text = re.sub(r'\s{2,}', ' ', text) # unnecessary white spaces

    return text

In [3]:
def output_all(file_path_x, files_x):
    """Write the cleaned clause text of one file when it passes all three checks.

    The file is classified with ``extract_document_type`` and
    ``extract_validity``. The clause text is extracted and written only when
    ``acordo``, ``extrato`` and ``validity`` all equal 1; otherwise nothing is
    written.

    Args:
        file_path_x: Directory that contains the input file.
        files_x: File name of the input file inside ``file_path_x``.

    Returns:
        None. A qualifying file produces ``<contract_id>_cleaned.txt`` inside
        the module-level ``cba_path`` directory, encoded as UTF-8.
    """
    file_path = os.path.join(file_path_x, files_x)
    acordo, extrato = extract_document_type(file_path)
    validity = extract_validity(file_path)

    if not (acordo == 1 and extrato == 1 and validity == 1):
        return

    # splitext drops the extension whatever its length; the previous [:-4]
    # slice was only correct for names ending in a three-letter extension
    contract_id = os.path.splitext(files_x)[0]
    text = extract_clause_texts(file_path)
    file_destination = os.path.join(cba_path, contract_id + '_cleaned.txt')
    with io.open(file_destination, 'w', encoding='utf8') as f:
        f.write(text)

In [4]:
from tqdm import tqdm

# Runs output_all on every directory entry except the macOS '.DS_Store' file.
# A failure is printed and the loop moves on to the next entry.
for file in tqdm(os.listdir(file_path)):
    if file != '.DS_Store':
        try:
            output_all(file_path, file)
        except Exception as error:
            print(f"Error processing file: {file}")
            print(error)

100%|██████████| 1296/1296 [00:07<00:00, 162.41it/s]
