In [None]:
# Text file read by load_text() when no upload is available.
INPUT_PATH = "publication.txt"
# Destination of the CSV produced by write_csv().
OUTPUT_PATH = "empresas_publication.csv"
# Forwarded to parse_entries() as `force_spain`; when True, bare 9-digit
# numbers starting with 6-9 are rewritten with a +34 prefix.
FORCE_SPAIN = False

import re, csv
from typing import List, Dict, Tuple, Optional

try:
    import pandas as pd
except Exception:
    pd = None

try:
    import ipywidgets as widgets
except Exception:
    widgets = None


In [None]:
# Loose e-mail matcher: runs of word characters, dots and hyphens around "@".
# Callers are expected to trim trailing punctuation from the matches.
EMAIL_RE = re.compile(r'[\w\.-]+@[\w\.-]+', re.I)
# Phone matcher: optional "+", a digit, at least five digits/separators, and a
# closing digit. Separators are hyphens, dots and horizontal whitespace only;
# line breaks are excluded so that numbers on adjacent lines (for example a
# street number followed by a postal code) are never fused into one match.
PHONE_RE = re.compile(r'\+?\d(?:[\d\-\.]|[^\S\r\n]){5,}\d')
# Upper-case headings that must not be mistaken for company names
# (is_company_candidate() rejects any line containing one of these).
IGNORE_TITLES = {"SUMARIO", "PUBLICIDAD", "ÍNDICE", "RESPONSABLES", "CONTACTO", "+ INFO", "CHANNELPARTNER.ES ISV2025"}

def normalize_phone(phone: str, force_spain: bool = False) -> str:
    """Return *phone* in a canonical textual form.

    Args:
        phone: Raw phone text as matched in the source document.
        force_spain: When True, a number without an international prefix that
            has exactly nine digits and starts with 6, 7, 8 or 9 is treated as
            Spanish and prefixed with ``+34``.

    Returns:
        ``"+34 XXX XXX XXX"`` for Spanish numbers, ``"+<digits>"`` for any
        other number written with a leading ``+``, and the bare digit string
        otherwise.
    """
    digits = re.sub(r'\D+', '', phone)
    if phone.strip().startswith('+'):
        # Test the Spanish prefix explicitly. A generic "1-3 digit country
        # code" regex is greedy and would read "+34 912..." as code "349".
        if digits.startswith('34') and len(digits) == 11:
            national = digits[2:]
            return f"+34 {national[:3]} {national[3:6]} {national[6:]}"
        # Country-code length is unknowable without a lookup table, so other
        # international numbers are kept compact rather than split wrongly.
        return '+' + digits
    if force_spain and len(digits) == 9 and digits[0] in '6789':
        return f"+34 {digits[:3]} {digits[3:6]} {digits[6:]}"
    return digits

def extract_emails(text: str) -> List[str]:
    """Collect the distinct e-mail addresses found in *text*.

    Matches of ``EMAIL_RE`` are trimmed of leading/trailing ``.``, ``,`` and
    ``;``. Addresses are compared case-insensitively; the first spelling seen
    is the one returned, in order of first appearance.
    """
    by_lowercase: Dict[str, str] = {}
    for raw in EMAIL_RE.findall(text):
        address = raw.strip('.,;')
        # setdefault keeps the first spelling and dicts preserve insertion order.
        by_lowercase.setdefault(address.lower(), address)
    return list(by_lowercase.values())

def extract_phones(text: str, force_spain: bool) -> List[str]:
    """Collect the distinct phone numbers found in *text*.

    Every ``PHONE_RE`` match is passed through ``normalize_phone``. Two numbers
    are duplicates when their normalized forms are equal after removing
    whitespace; the first occurrence wins and order of appearance is kept.
    """
    by_fingerprint: Dict[str, str] = {}
    for raw in PHONE_RE.findall(text):
        formatted = normalize_phone(raw, force_spain)
        fingerprint = re.sub(r'\s+', '', formatted).lower()
        by_fingerprint.setdefault(fingerprint, formatted)
    return list(by_fingerprint.values())

def is_company_candidate(line: str) -> bool:
    """Tell whether *line* looks like a company-name heading.

    A candidate is a non-blank line that equals its own upper-cased form and
    does not contain any of the ``IGNORE_TITLES`` strings.
    """
    candidate = line.strip()
    written_in_caps = bool(candidate) and candidate == candidate.upper()
    if not written_in_caps:
        return False
    for title in IGNORE_TITLES:
        if title in candidate:
            return False
    return True

def find_company_name(lines: List[str], dir_idx: int) -> str:
    """Return the company heading that precedes the address label at *dir_idx*.

    The ten lines before ``dir_idx`` are scanned from the closest one
    backwards, so the heading nearest to the address label wins. Scanning
    forwards would return the previous entry's heading whenever that entry is
    shorter than ten lines.

    Args:
        lines: All lines of the document.
        dir_idx: Index of the address-label line.

    Returns:
        The stripped heading, or ``''`` when none of the scanned lines
        satisfies ``is_company_candidate``.
    """
    newest = min(dir_idx, len(lines)) - 1  # tolerate an index past the end
    oldest = max(0, dir_idx - 10)
    for i in range(newest, oldest - 1, -1):
        if is_company_candidate(lines[i]):
            return lines[i].strip()
    return ''

def parse_address(lines: List[str], start_idx: int) -> Tuple[str, int]:
    """Read the postal address that starts at the label line *start_idx*.

    Text that follows the label on the same line
    (``"Dirección: Calle Mayor 1"``) is kept as the first address part. The
    following lines are then appended until a field label (Tel, Mail, Web,
    Responsable, Contacto, ...) or a company heading is reached.

    Args:
        lines: All lines of the document.
        start_idx: Index of the ``Dirección:`` / ``Direcciones:`` line.

    Returns:
        ``(address, next_idx)`` where ``address`` is the non-empty parts
        joined with ``", "`` and ``next_idx`` is the index of the first line
        that was not consumed.
    """
    parts = []
    if 0 <= start_idx < len(lines):
        # Keep whatever is written after the colon on the label line itself.
        inline = re.match(r'^\s*Dirección(?:es)?\s*:\s*(.*)$', lines[start_idx], re.I)
        if inline:
            parts.append(inline.group(1).strip())
    i = start_idx + 1
    while i < len(lines):
        line = lines[i].strip()
        if re.match(r'^(Tel|Teléfono|Tel\.|Mail|Mails|Correo|Web|Responsable|Responsables|Contacto)', line, re.I):
            break
        if is_company_candidate(line):
            break
        parts.append(line)
        i += 1
    # Blank lines do not end the address; they are dropped here instead.
    address = ', '.join([p for p in parts if p])
    return address, i

def extract_responsibles(lines: List[str], start: int, end: int) -> Tuple[List[str], str]:
    """Extract the contact people listed in ``lines[start:end]``.

    Only the first section introduced by a line starting with
    ``Responsable``/``Responsables``/``Contacto`` is read. Each person is
    either ``"Name - Role"`` on one line or a name followed by a role on the
    next line. The first e-mail found from one line above to two lines below
    the current position is attached to the person.

    Args:
        lines: All lines of the document.
        start: Index of the first line of the entry block.
        end: Index one past the last line of the entry block.

    Returns:
        ``(people, first_name)`` where ``people`` holds strings shaped like
        ``"Name (Role) [email]"`` (role and e-mail are optional) and
        ``first_name`` is the title-cased first word of the first person's
        name, or ``''`` when nobody was found.
    """
    header_re = re.compile(r'^(Responsable|Responsables|Contacto)', re.I)
    stop_re = re.compile(
        r'^(Tel|Teléfono|Tel\.|Mail|Mails|Correo|Web|Dirección|Direcciones|Responsable|Responsables|Contacto)',
        re.I,
    )
    name_role_re = re.compile(r'^(.+?)\s*-\s*(.+)$')

    def ends_section(text: str) -> bool:
        # A blank line, another field label or a company heading closes the list.
        return not text or bool(stop_re.match(text)) or is_company_candidate(text)

    res = []
    first = ''
    end = min(end, len(lines))  # never index past the document

    # Locate the section header, then step onto the line after it.
    i = start
    while i < end and not header_re.match(lines[i]):
        i += 1
    i += 1

    while i < end:
        t = lines[i].strip()
        if ends_section(t):
            break
        name, cargo = t, ''
        m = name_role_re.match(t)
        if m:
            name, cargo = m.groups()
        elif i + 1 < end:
            nxt = lines[i + 1].strip()
            if not ends_section(nxt):
                # The role sits on its own line; consume it together with the name.
                cargo = nxt
                i += 1
        email = ''
        for j in range(max(start, i - 1), min(end, i + 3)):
            ems = EMAIL_RE.findall(lines[j])
            if ems:
                # Trim trailing punctuation exactly as extract_emails() does so
                # the same address is spelled identically in every column.
                email = ems[0].strip('.,;')
                break
        entry = name.strip()
        if cargo:
            entry += f" ({cargo.strip()})"
        if email:
            entry += f" [{email}]"
        if entry:
            res.append(entry)
            if not first:
                first = name.split()[0].title()
        i += 1
    return res, first

def guess_first_name(text: str) -> str:
    """Guess a first name: the first capitalised word that is not a field label.

    Field labels such as ``Dirección``, ``Tel`` or ``Mail`` are capitalised
    too and usually open the text, so they are skipped; otherwise the label
    itself would be returned as the name.

    Args:
        text: Free text of one entry block.

    Returns:
        The first matching word, or ``''`` when there is none.
    """
    field_labels = {
        'dirección', 'direcciones', 'tel', 'teléfono', 'mail', 'mails',
        'correo', 'web', 'responsable', 'responsables', 'contacto',
    }
    for m in re.finditer(r'\b([A-ZÁÉÍÓÚÜÑ][a-záéíóúüñ]+)\b', text):
        word = m.group(1)
        if word.lower() not in field_labels:
            return word
    return ''

def parse_entries(text: str, force_spain: bool = False) -> List[Dict[str, str]]:
    """Split *text* into company entries, one per ``Dirección:`` label.

    For every line starting with ``Dirección:`` or ``Direcciones:`` the
    company heading is looked up above it, the address is read below it, and
    the block up to the next address label or company heading is mined for
    e-mails, phones and contact people.

    Args:
        text: Full text of the publication.
        force_spain: Forwarded to ``extract_phones``.

    Returns:
        One dict per entry with the keys ``Nombre``, ``Nombre de la empresa``,
        ``Teléfono``, ``Email``, ``Persona(s) de contacto / responsable(s)``
        and ``Dirección``.
    """
    lines = text.splitlines()
    entries = []
    dir_pattern = re.compile(r'^Dirección(?:es)?\s*:', re.I)
    i = 0
    while i < len(lines):
        if dir_pattern.match(lines[i]):
            company = find_company_name(lines, i)
            address, j = parse_address(lines, i)
            # Extend the block past the address until the next entry begins.
            block_end = j
            while block_end < len(lines) and not dir_pattern.match(lines[block_end]) and not is_company_candidate(lines[block_end]):
                block_end += 1
            block_text = '\n'.join(lines[i:block_end])
            emails = extract_emails(block_text)
            phones = extract_phones(block_text, force_spain)
            responsibles, first = extract_responsibles(lines, i, block_end)
            if not first:
                # No contact section: fall back to a guess from the block text.
                first = guess_first_name(block_text)
            entries.append({
                'Nombre': first,
                'Nombre de la empresa': company,
                'Teléfono': ', '.join(phones),
                'Email': ', '.join(emails),
                'Persona(s) de contacto / responsable(s)': ' | '.join(responsibles),
                'Dirección': address
            })
            # block_end is always greater than i, so the scan keeps advancing.
            i = block_end
        else:
            i += 1
    return entries

def dedupe(entries: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Drop repeated entries, keeping the first occurrence of each.

    Two entries are duplicates when company name, e-mail field and phone
    field (whitespace removed) all match case-insensitively. The surviving
    dicts are returned in their original order.
    """
    unique = []
    known_signatures = set()
    for entry in entries:
        company = entry['Nombre de la empresa'].lower()
        email = entry['Email'].lower()
        phone = re.sub(r'\s+', '', entry['Teléfono']).lower()
        signature = (company, email, phone)
        if signature not in known_signatures:
            known_signatures.add(signature)
            unique.append(entry)
    return unique

def write_csv(entries: List[Dict[str, str]], path: str) -> None:
    """Write *entries* to *path* as a CSV file with a header row.

    The file is encoded as UTF-8 with a byte-order mark (``utf-8-sig``) and
    the columns are written in a fixed order.
    """
    columns = [
        'Nombre',
        'Nombre de la empresa',
        'Teléfono',
        'Email',
        'Persona(s) de contacto / responsable(s)',
        'Dirección',
    ]
    with open(path, 'w', encoding='utf-8-sig', newline='') as handle:
        out = csv.DictWriter(handle, fieldnames=columns)
        out.writeheader()
        out.writerows(entries)


In [None]:
def load_text() -> str:
    """Return the publication text from an upload widget or from INPUT_PATH.

    When ipywidgets was imported, a ``FileUpload`` widget is displayed and its
    value is read immediately afterwards; if it already holds a file, that
    file's content is decoded as UTF-8 (undecodable bytes are dropped) and
    returned. Otherwise the file at ``INPUT_PATH`` is read.

    Returns:
        The decoded text.
    """
    if widgets:
        uploader = widgets.FileUpload(accept='.txt', multiple=False)
        display(uploader)
        uploaded = uploader.value
        if uploaded:
            # ipywidgets 7 exposes {filename: {...}}; ipywidgets 8 exposes a
            # tuple of per-file dicts whose 'content' is a memoryview.
            if isinstance(uploaded, dict):
                first_file = next(iter(uploaded.values()))
            else:
                first_file = uploaded[0]
            # bytes() accepts both bytes and memoryview, so one path serves both.
            return bytes(first_file['content']).decode('utf-8', 'ignore')
    with open(INPUT_PATH, encoding='utf-8', errors='ignore') as f:
        return f.read()

# Run the pipeline: load the text, parse it, drop duplicate entries, save the CSV.
text = load_text()
entries = dedupe(parse_entries(text, FORCE_SPAIN))
write_csv(entries, OUTPUT_PATH)
# Preview the result: a DataFrame when pandas was imported, otherwise the
# first five entries followed by the total count.
if pd:
    # NOTE(review): `display` is not defined in this file; presumably the
    # IPython/Jupyter builtin — confirm before running as a plain script.
    display(pd.DataFrame(entries))
else:
    for e in entries[:5]:
        print(e)
    print(f'Total: {len(entries)}')
print(f'Saved to {OUTPUT_PATH}')
