## Entidad del modelo

In [7]:
from pydantic import BaseModel

class Check(BaseModel):
    id: str
    departamento: str
    municipio: str
    zona: str
    completado: bool = False

class Tarjeton(BaseModel):
    id: str
    departamento: str
    municipio: str
    puesto: str
    zona: str
    mesa: str
    link: str
    votos_gustavo: int = 0
    votos_ivan: int = 0
    votos_blanco: int = 0
    votos_nulos: int = 0
    votos_no_marcados: int = 0
    votos_total: int = 0
    votos_sufragantes: int = 0
    votos_urna: int = 0
    votos_incinerados: int = 0
    validar_total: bool = False
    validar_votantes: bool = False


## Conexión a base de datos

In [8]:
from decouple import config
import psycopg2

HOST = config('PGSQL_HOST')
USER = config('PGSQL_USER')
PASSWORD = config('PGSQL_PASSWORD')
PORT = config('PGSQL_PORT')
DB_NAME = config('PGSQL_DATABASE')

table_name = config('TABLE_NAME')
table_check = 'resumen'

def get_connection():
    connection = psycopg2.connect(
                dbname=DB_NAME,
                user=USER,
                password=PASSWORD,
                host=HOST,
                port=PORT
            )
    return connection

Crear tablas si no existen

In [9]:
create_table = f"""
                CREATE TABLE IF NOT EXISTS {table_name} (
                    id VARCHAR(50),
                    departamento VARCHAR(50),
                    municipio VARCHAR(50),
                    puesto VARCHAR(50),
                    zona VARCHAR(50),
                    mesa VARCHAR(50),
                    link VARCHAR(150),
                    votos_gustavo SMALLINT,
                    votos_ivan SMALLINT,
                    votos_blanco SMALLINT,
                    votos_nulos SMALLINT,
                    votos_no_marcados SMALLINT,
                    votos_total INT,
                    votos_sufragantes INT,
                    votos_urna INT,
                    votos_incinerados SMALLINT,
                    validar_total BOOLEAN,
                    validar_votantes BOOLEAN
                )
                """
connection = get_connection()
cur = connection.cursor()
cur.execute(create_table)
connection.commit()

In [10]:
create_table_check = f"""
                CREATE TABLE IF NOT EXISTS {table_check} (
                    id VARCHAR(50),
                    departamento VARCHAR(50),
                    municipio VARCHAR(50),
                    zona VARCHAR(50),
                    completado BOOLEAN
                )
                """
connection = get_connection()
cur = connection.cursor()
cur.execute(create_table_check)
connection.commit()

In [11]:
def get_max_id(table_name=table_name):
    try:
        connection = get_connection()
        with connection.cursor() as cursor:
            query = f'SELECT MAX(CAST(id AS INTEGER)) AS max_id FROM "{table_name}"'
            cursor.execute(query)
            result = cursor.fetchone()
            cursor.close()
        return result[0] if result[0] is not None else 0  # Acceso por índice
    except psycopg2.Error as e:
        raise Exception(e)

CRUD base de datos

In [12]:
from psycopg2.extras import DictCursor

def create_row(infoDom, table_name=table_name):
    try:
        dictItem = infoDom.dict()

        columns = list(dictItem.keys())
        values = list(dictItem.values())

        connection = get_connection()
        with connection.cursor() as cursor:
            query = f'INSERT INTO "{table_name}" ({",".join(columns)}) VALUES ({",".join(["%s"]*len(columns))})'
            cursor.execute(query, values)
            affected_rows = cursor.rowcount
            connection.commit()
            cursor.close()
        return affected_rows
    except psycopg2.Error as e:
        raise Exception(e)

def update_row(values, table_name=table_name, columns=0, condition=None):
    try:
        connection = get_connection()

        with connection.cursor() as cursor:
            set_values = ", ".join([f"{column} = %s" for column in columns])
            condition_query = f" WHERE {condition}" if condition else ""
            query = f"UPDATE {table_name} SET {set_values}{condition_query}"
            
            cursor.execute(query, list(values.values()))  # Convertimos a lista
            connection.commit()
            cursor.close()
    except psycopg2.Error as e:
        raise Exception(e)


def get_all_by_columns(params, table_name=table_name):
    try:
        connection = get_connection()

        columns = list(params.keys())
        values = list(params.values())
        
        where_clause = " AND ".join([f'"{col}" = %s' for col in columns])
        with connection.cursor(cursor_factory=DictCursor) as cursor:
            query = f'SELECT * FROM "{table_name}" WHERE {where_clause}'
            cursor.execute(query, values)
            infoData = cursor.fetchall()
        connection.close()
        return infoData
    except Exception as ex:
        raise Exception(ex)
    
def get_all_rows(table_name=table_name, condition=None):
    try:
        connection = get_connection()
        with connection.cursor() as cursor:

            condition_query = f" WHERE {condition}" if condition else ""
            query = f'SELECT * FROM "{table_name}"{condition_query}'
            cursor.execute(query)
            result = cursor.fetchall()
            cursor.close()
        return result
    except psycopg2.Error as e:
        raise Exception(e)

## Web Scrapping de la página de la registraduría

In [13]:
import requests
from bs4 import BeautifulSoup

In [14]:
url = 'https://elecciones1.registraduria.gov.co/e14_pre2_2018/e14'

findDepartamentos = 'cargar_departamentos_barra'
findMunicipios = 'cambiar_departamento'
findZonas = 'cambiar_municipio'
findMesas = 'cambiar_zona'
findLinks = 'cargar_mesas'

In [15]:
import time
import random

# Al final de cada solicitud
time.sleep(random.uniform(1, 3))  # Espera entre 1 y 3 segundos


def getInfo(url, accion, dep, mun, zona, pues, retries=3):
    body = dict(accion=accion,
                dep_activo=dep,
                mun_activo=mun,
                zona_activo=zona,
                pues_activo=pues)
    
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
        # Agrega más User Agents aquí
    ]

    headers = {'User-Agent': random.choice(user_agents)}

    for attempt in range(retries):
        time.sleep(random.uniform(1, 3)) 
        try:
            htmlText = requests.post(url, data=body, headers=headers).text
            return htmlText
        except requests.exceptions.RequestException as e:
            print(f"Error en la solicitud: {e}. Reintentando...")
            time.sleep(2)
    raise Exception("No se pudo completar la solicitud después de varios intentos.")


In [16]:
import re

def limpiar_cadena(cadena):
    return re.sub(r'[^a-zA-Z\s]', '', cadena).strip()

In [17]:
def validateExistence(infoLink):
    # Convierte el objeto a un diccionario y elimina la clave 'id'
    dictItem = infoLink.dict()
    del dictItem['id']
    
    # Crea la condición de búsqueda SQL
    condition = " AND ".join([f"{key} = '{value}'" for key, value in dictItem.items()])
    
    # Consulta en la base de datos
    data = get_all_rows(table_name, condition)
    
    # Si no existen registros, retorna True, lo que significa que no existe
    return len(data) == 0


In [18]:
global arrayInfo
arrayInfo = []

In [19]:
def procesar_link(departamento, municipio, zona, mesa, link):
    info = {
        "departamento": limpiar_cadena(departamento.text),
        "municipio": limpiar_cadena(municipio.text),
        "puesto": mesa.text.strip(),
        "mesa": link.text.strip(),
        "zona": zona.text.strip(),
        "link": link['href'],
    }
    print(info)
    arrayInfo.append(info)
    # Aquí iría la validación y el almacenamiento en la base de datos
    # create_row(info)

In [20]:
def procesar_mesa(departamento, municipio, zona, mesa):
    mes = ('0' + mesa['value'])[-2:]
    infoLinks = getInfo(url, findLinks, departamento, municipio, zona, mes)
    if not infoLinks:
        return
    
    soupLinks = BeautifulSoup(infoLinks, "html.parser")
    for link in soupLinks.findAll('a'):
        procesar_link(departamento, municipio, zona, mesa, link)
        if len(arrayInfo) > 20:
            return

In [21]:
def is_departamento_completado(departamento):
    params = {"departamento": departamento}
    result = get_all_by_columns(params, table_check)
    return all([row["completado"] for row in result]) if result else False

# Función para verificar si todos los registros de un municipio están completados
def is_municipio_completado(departamento, municipio):
    params = {"departamento": departamento, "municipio": municipio}
    result = get_all_by_columns(params, table_check)
    return all([row["completado"] for row in result]) if result else False

# Función para verificar si todos los registros de una zona están completados
def is_zona_completada(departamento, municipio, zona):
    params = {"departamento": departamento, "municipio": municipio, "zona": zona}
    result = get_all_by_columns(params, table_check)
    return all([row["completado"] for row in result]) if result else False

In [22]:
is_zona_completada( 'Atlntico','Barranquilla', 'ZONA 20')

True

In [23]:
id = int(get_max_id()) + 1
id

96286

### Alimentar la tabla de resumen

In [24]:
# infoDepartamento = getInfo(url, findDepartamentos, '01', '', '', '')
# soupDepartamentos = BeautifulSoup(infoDepartamento, "html.parser")
# id = 0
# # Itera cada uno de los departamentos
# for departamento in soupDepartamentos.findAll('a'):
#     dpto = (departamento['id'][-2:]).replace('_', '0')
#     infoMunicipios = getInfo(url, findMunicipios, dpto, '', '', '')
#     soupMunicipios = BeautifulSoup(infoMunicipios, "html.parser")
#     departamentoName = limpiar_cadena(departamento.text)
    
#     # Itera cada uno de los municipios
#     for municipio in soupMunicipios.findAll('option'):
#         mnpio = ('00'+municipio['value'])[-3:]
#         infoZonas = getInfo(url, findZonas, dpto, mnpio, '', '')
#         soupZonas = BeautifulSoup(infoZonas, "html.parser")

#         # Itera cada uno de las zonas
#         for zona in soupZonas.findAll('option'):
#             if zona['value'] == '-1':
#                 continue

#             info = {
#                 "id": str(id),
#                 "departamento": departamentoName,
#                 "municipio": limpiar_cadena(municipio.text),
#                 "zona": zona.text.strip(),
#             }
#             create_row(Check(**info), table_check)
#             id += 1

In [25]:
# tarjeton_rows = get_all_rows(table_name)
# unique_rows = set()

# for tarjeton_row in tarjeton_rows:
#     unique_rows.add((tarjeton_row[1], tarjeton_row[2], tarjeton_row[4]))

# for departamento, municipio, zona in unique_rows:
    
#     params = {'departamento': departamento, 'municipio': municipio, 'zona': zona}
#     check_record = get_all_by_columns(params, table_check)

#     if check_record:
#         update_values = {'completado': True}
#         condition = f'departamento = \'{departamento}\' AND municipio = \'{municipio}\' AND zona = \'{zona}\''
        
#         update_row(update_values, table_check, update_values.keys(), condition)
#     else:
#         print(f"No se encontró registro en resumen para {departamento}, {municipio}, {zona}")

### Alimentar la tabla de info votos

In [27]:
infoDepartamento = getInfo(url, findDepartamentos, '01', '', '', '')

soupDepartamentos = BeautifulSoup(infoDepartamento, "html.parser")

global arrayInfo
arrayInfo = []

update_completado = {'completado': True}
id = int(get_max_id()) + 1

# Itera cada uno de los departamentos
for departamento in soupDepartamentos.findAll('a'):
    dpto = (departamento['id'][-2:]).replace('_', '0')
    infoMunicipios = getInfo(url, findMunicipios, dpto, '', '', '')
    soupMunicipios = BeautifulSoup(infoMunicipios, "html.parser")
    departamentoName = limpiar_cadena(departamento.text)

    if is_departamento_completado(limpiar_cadena(departamento.text)):
        print(f"{limpiar_cadena(departamento.text)} completado. Saltando...")
        continue
    
    # Itera cada uno de los municipios
    for municipio in soupMunicipios.findAll('option'):
        mnpio = ('00'+municipio['value'])[-3:]
        infoZonas = getInfo(url, findZonas, dpto, mnpio, '', '')
        soupZonas = BeautifulSoup(infoZonas, "html.parser")

        if is_municipio_completado(departamentoName, limpiar_cadena(municipio.text)):
            print(f"{limpiar_cadena(departamento.text)} / {limpiar_cadena(municipio.text)} completado. Saltando...")
            continue

        # Itera cada uno de las zonas
        for zona in soupZonas.findAll('option'):
            zon = ('0'+zona['value'])[-2:]
            infoMesas = getInfo(url, findMesas, dpto, mnpio, zon, '')
            soupMesas = BeautifulSoup(infoMesas, "html.parser")

            if is_zona_completada(departamentoName, limpiar_cadena(municipio.text), zona.text.strip()):
                print(f"{limpiar_cadena(departamento.text)} / {limpiar_cadena(municipio.text)} / {zona.text.strip()} completada. Saltando...")
                continue

            # Itera cada uno de las mesas
            for mesa in soupMesas.findAll('option'):
                mes = ('0'+mesa['value'])[-2:]
                infoLinks = getInfo(
                    url, findLinks, dpto, mnpio, zon, mes)
                soupLinks = BeautifulSoup(infoLinks, "html.parser")

                # Itera cada uno de los links
                for link in soupLinks.findAll('a'):

                    info = {
                        "id": str(id),
                        "departamento": limpiar_cadena(departamento.text),
                        "municipio": limpiar_cadena(municipio.text),
                        "puesto": mesa.text.strip(),
                        "mesa": link.text.strip(),
                        "zona": zona.text.strip(),
                        "link": link['href'],
                    }

                    infoCheck = Tarjeton(**info)
                    boolValidate = validateExistence(infoCheck)
                    if boolValidate:
                        arrayInfo.append(infoCheck)
                        create_row(infoCheck)
                        id += 1
                        print(info)
                    else:
                        print(f"El registro {limpiar_cadena(departamento.text)}/{limpiar_cadena(municipio.text)}/{zona.text.strip()}/ ya existe en la base de datos.")
            
            condition = f'departamento = \'{limpiar_cadena(departamento.text)}\' AND municipio = \'{limpiar_cadena(municipio.text)}\' AND zona = \'{zona.text.strip()}\''
            update_row(update_completado, table_check, update_completado.keys(), condition)

Antioquia completado. Saltando...
Atlntico completado. Saltando...
Bolvar completado. Saltando...
Boyac completado. Saltando...
Caldas completado. Saltando...
Cauca completado. Saltando...
Cesar completado. Saltando...
Crdoba completado. Saltando...
Cundinamarca completado. Saltando...
Bogot DC completado. Saltando...
Choc completado. Saltando...
Huila completado. Saltando...
Magdalena completado. Saltando...
Nario completado. Saltando...
Risaralda completado. Saltando...
Norte de Santander completado. Saltando...
Quindo completado. Saltando...
Santander completado. Saltando...
El registro Sucre/Sincelejo/ZONA 01/ ya existe en la base de datos.
El registro Sucre/Sincelejo/ZONA 01/ ya existe en la base de datos.
El registro Sucre/Sincelejo/ZONA 01/ ya existe en la base de datos.
El registro Sucre/Sincelejo/ZONA 01/ ya existe en la base de datos.
El registro Sucre/Sincelejo/ZONA 01/ ya existe en la base de datos.
El registro Sucre/Sincelejo/ZONA 01/ ya existe en la base de datos.
El regi

Exception: No se pudo completar la solicitud después de varios intentos.