In [1]:
import os
import re
import unicodedata
import httpx
import pandas as pd
import json
from datetime import datetime
import nest_asyncio
import uvicorn
from fastapi import FastAPI, Response
from pydantic import BaseModel
from typing import Dict, List, Optional
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from urllib.parse import unquote
import mysql.connector
from pymongo import MongoClient

In [2]:
def conectar_db():
    try:
        client = MongoClient("mongodb://localhost:27017/")
        db = client["Marcas"]  # Nombre de la base de datos
        print("Conexión a MongoDB exitosa.")
        return db
    except Exception as err:
        print(f"Error al conectar a MongoDB: {err}")
        return None

In [3]:
nest_asyncio.apply()
app = FastAPI()

class MarcaRequest(BaseModel):
    marca: str

In [4]:
class ModificarRequest(BaseModel):
    marca: str
    red: str
    enlaces_eliminar: Optional[List[int]] = None
    enlaces_agregar: Optional[List[str]] = None

In [5]:
###### Credenciales
#api_key = 'AIzaSyDuIQSmkWuEDc32twaUDJ-2OJFThw4R9V8'
#search_engine_id = 'd425b95fffdc64915'
#marca = 'coca-cola'
country = 'MX|US'

### Credenciales 2
api_key = 'AIzaSyAfaMeGgowD2e6BkPGuHkD9ScMUg8mrtCQ'
search_engine_id = '0507d2831a0c64998'


In [6]:
def normalizar_marca(nombre_marca: str) -> str:
    nombre_normalizado = ''.join(
        c for c in unicodedata.normalize('NFKD', nombre_marca) if unicodedata.category(c) != 'Mn'
    )
    nombre_normalizado = re.sub(r'[^a-zA-Z0-9]', '', nombre_normalizado)
    return nombre_normalizado.lower()

In [7]:
def google_search(api_key, search_engine_id, query, **params):
    base_url = 'https://www.googleapis.com/customsearch/v1'
    params = {
        'key': api_key,
        'cx': search_engine_id,
        'q': query,
        'gl' : country,
        **params
    }
    response =  httpx.get(base_url, params = params)
    response.raise_for_status()
    return response.json()

In [8]:
def guardar_o_actualizar_marca(marca, resultados_x, resultados_meta, resultados_tiktok, cuentas_verificadas, logo_binario):
    db = conectar_db()
    if db is None:
        return {"error": "No se pudo conectar a la base de datos."}

    try:
        # Crear el documento a insertar/actualizar
        documento = {
            "nombre_marca": marca,
            "resultados_x": resultados_x,
            "resultados_meta": resultados_meta,
            "resultados_tiktok": resultados_tiktok,
            "cuentas_verificadas": cuentas_verificadas,
            "logo": logo_binario
        }

        # Buscar si la marca ya existe
        coleccion = db["marcas"]
        marca_existente = coleccion.find_one({"nombre_marca": marca})

        if marca_existente:
            print(f"Actualizando datos para la marca: {marca}")
            coleccion.update_one({"nombre_marca": marca}, {"$set": documento})
        else:
            print(f"Insertando datos para la marca: {marca}")
            coleccion.insert_one(documento)

        print("Datos guardados correctamente.")
    except Exception as err:
        print(f"Error al guardar datos: {err}")

In [9]:
def hacer_busquedas(marca):
    marca = normalizar_marca(marca)
    db = conectar_db()
    if db is None:
        return {"error": "No se pudo conectar a la base de datos."}

    print(f"\nDepuración: hacer_busquedas - Marca: {marca}")

    # Buscar si la marca ya existe en MongoDB
    coleccion = db["marcas"]
    marca_existente = coleccion.find_one({"nombre_marca": marca})

    if marca_existente:
        print(f"La marca '{marca}' ya existe en la base de datos. Usando resultados existentes.")
        return marca_existente["cuentas_verificadas"]

    # Si la marca no existe, realizar las búsquedas
    country = "MX|US"
    busquedas = {
        "X": [f"{marca} X Twitter MX Mexico cuenta", f"{marca} X Twitter US account"],
        "Facebook": [f"{marca} Facebook MX Mexico cuenta", f"{marca} Facebook US account"],
        "Instagram": [f"{marca} Instagram MX Mexico cuenta", f"{marca} Instagram US account"],
        "Tiktok": [f"{marca} Tiktok MX Mexico cuenta", f"{marca} Tiktok US account"]
    }

    dominios_por_red = {
        "X": "x.com",
        "Instagram": "www.instagram.com",
        "Tiktok": "www.tiktok.com",
        "Facebook": "www.facebook.com"
    }

    # Inicializar las variables para los resultados
    resultados_x = []
    resultados_meta = []
    resultados_tiktok = []

    # Realizar las búsquedas
    for red, queries in busquedas.items():
        resultados_red = []
        for query in queries:
            print(f"Realizando búsqueda para {red}: {query}")
            response = google_search(
                api_key=api_key,
                search_engine_id=search_engine_id,
                query=query,
                country=country,
                num=5
            )
            resultados_red.extend(response.get('items', [])[:5])

        if red == "X":
            resultados_x = resultados_red  # Guardar resultados de X
        elif red == "Tiktok":
            resultados_tiktok = resultados_red  # Guardar resultados de TikTok
        elif red in ["Facebook", "Instagram"]:
            resultados_meta.extend(resultados_red)  # Guardar resultados de Meta

    print(f"Resultados de X: {resultados_x}")
    print(f"Resultados de Meta: {resultados_meta}")
    print(f"Resultados de TikTok: {resultados_tiktok}")

    # Buscar y guardar el logo
    logo_binario = buscar_y_guardar_logo(db, marca, resultados_x)

    # Crear el objeto cuentas_verificadas con los enlaces encontrados
    cuentas_verificadas = {"automaticas": {}, "manuales": {}, "eliminadas": {}}
    for red in busquedas.keys():
        print(f"Filtrando enlaces para {red}")
        enlaces_filtrados = filtrar_enlaces(db, marca, red, dominios_por_red[red])
        cuentas_verificadas["automaticas"][red] = enlaces_filtrados

    print(f"Cuentas verificadas: {cuentas_verificadas}")

    # Guardar o actualizar en la base de datos
    guardar_o_actualizar_marca(
        marca=marca,
        resultados_x=resultados_x,  # Datos de X
        resultados_meta=resultados_meta,  # Datos de Meta
        resultados_tiktok=resultados_tiktok,  # Datos de TikTok
        cuentas_verificadas=cuentas_verificadas,
        logo_binario=logo_binario  # Logo en binario
    )

    print("Cuentas verificadas que se guardarán:", cuentas_verificadas)
    return cuentas_verificadas

In [10]:
def filtrar_enlaces(db, marca, red, dominio):
    # Normalizar el nombre de la marca
    marca_normalizada = normalizar_marca(marca)
    primeros_tres_caracteres = marca_normalizada[:3]

    print(f"\nDepuración: filtrar_enlaces - Marca: {marca}, Red: {red}, Dominio: {dominio}")
    print(f"Primeros tres caracteres de la marca: {primeros_tres_caracteres}")

    # Construir la consulta de MongoDB
    consulta = {
        "nombre_marca": marca,
        f"resultados_{red.lower()}": {
            "$elemMatch": {
                "link": {
                    "$regex": f".*{dominio}.*",
                    "$options": "i"
                }
            }
        }
    }

    print(f"Consulta de MongoDB: {consulta}")

    # Obtener la colección de marcas
    coleccion = db["marcas"]

    # Realizar la consulta
    resultados = coleccion.find(consulta, {f"resultados_{red.lower()}.link": 1})

    print(f"Resultados de la consulta: {list(resultados)}")

    # Extraer los enlaces filtrados
    enlaces_filtrados = []
    for documento in resultados:
        print(f"Documento encontrado: {documento}")
        for resultado in documento.get(f"resultados_{red.lower()}", []):
            link = resultado.get("link", "")
            print(f"Enlace encontrado: {link}")
            enlaces_filtrados.append(link)

    print(f"Enlaces filtrados para {red}: {enlaces_filtrados}")
    return enlaces_filtrados

In [11]:
def imprimir_enlaces(db, marca, red, dominio):
    # Obtener la colección "marcas" de la base de datos
    coleccion = db["marcas"]
    
    # Normalizar el nombre de la marca
    marca_normalizada = normalizar_marca(marca)
    print(f"Marca normalizada: {marca_normalizada}")  # Depuración
    
    # Buscar el documento de la marca en la colección
    documento = coleccion.find_one({"nombre_marca": marca})
    
    if not documento:
        print(f"No se encontró la marca '{marca}' en la base de datos.")  # Depuración
        return []  # Si no se encuentra la marca, devolver una lista vacía
    
    # Obtener los resultados de la red social específica
    if red == "X":
        resultados = documento.get("resultados_x", [])
    elif red == "Tiktok":
        resultados = documento.get("resultados_tiktok", [])
    else:
        resultados = documento.get("resultados_meta", [])
    
    print(f"Resultados para {red}:", resultados)  # Depuración
    
    # Filtrar los enlaces
    enlaces_filtrados = filtrar_enlaces(resultados, dominio, marca_normalizada)
    return enlaces_filtrados

In [12]:
def buscar_y_guardar_logo(db, marca, resultados_x):
    # Normalizar el nombre de la marca
    marca_normalizada = normalizar_marca(marca)
    primeros_tres_caracteres = marca_normalizada[:3]

    print(f"\nDepuración: buscar_y_guardar_logo - Marca: {marca}")
    print(f"Primeros tres caracteres de la marca: {primeros_tres_caracteres}")

    # Construir la consulta de MongoDB
    consulta = {
        "nombre_marca": marca_normalizada,
        "resultados_x.link": {
            "$regex": f"https://x.com/.*{primeros_tres_caracteres}",
            "$options": "i"  # Ignorar mayúsculas/minúsculas
        },
        "resultados_x.pagemap.cse_thumbnail": {"$exists": True}
    }

    print(f"Consulta de MongoDB: {consulta}")

    # Obtener la colección de marcas
    coleccion = db["marcas"]

    # Realizar la consulta
    resultado = coleccion.find_one(consulta, {"resultados_x.pagemap.cse_thumbnail": 1})

    print(f"Resultado de la consulta: {resultado}")

    if resultado:
        # Extraer la URL del logo
        url_logo = resultado["resultados_x"][0]["pagemap"]["cse_thumbnail"][0]["src"]
        print(f"URL del logo encontrada: {url_logo}")

        try:
            # Descargar el logo
            response = httpx.get(url_logo)
            if response.status_code == 200:
                print("Logo descargado correctamente.")
                return response.content  # Devolver el logo como binario
        except Exception as e:
            print(f"Error al descargar el logo: {e}")

    print("No se encontró el logo.")
    return None

In [13]:
def buscar_logo(marca: str) -> Optional[bytes]:
    db = conectar_db()
    if db is None:
        return None

    # Normalizar el nombre de la marca
    marca_normalizada = normalizar_marca(marca)

    # Buscar el documento de la marca en la colección
    coleccion = db["marcas"]
    documento = coleccion.find_one({"nombre_marca": marca_normalizada}, {"logo": 1})

    if not documento:
        print(f"No se encontró la marca '{marca_normalizada}' en la base de datos.")
        return None

    # Obtener el logo en binario
    logo_binario = documento.get("logo")
    return logo_binario

In [14]:
@app.post("/modificar-enlaces/")
async def modificar_enlaces(request: ModificarRequest):
    marca = normalizar_marca(request.marca)
    red = request.red
    enlaces_eliminar = request.enlaces_eliminar
    enlaces_agregar = request.enlaces_agregar

    # Conectar a MongoDB
    db = conectar_db()
    if db is None:
        raise HTTPException(status_code=500, detail="Error al conectar a la base de datos.")

    # Obtener la colección de marcas
    coleccion = db["marcas"]

    # Buscar la marca en la base de datos
    marca_existente = coleccion.find_one({"nombre_marca": marca})
    if not marca_existente:
        raise HTTPException(status_code=404, detail=f"No se encontró la marca {marca}.")

    # Cargar las cuentas verificadas
    cuentas_verificadas = marca_existente.get("cuentas_verificadas", {"automaticas": {}, "manuales": {}, "eliminadas": {}})

    # Verificar si la red social existe
    if red not in cuentas_verificadas["automaticas"]:
        raise HTTPException(status_code=404, detail=f"No se encontraron enlaces para la red social {red}.")

    # Mostrar los enlaces actuales (automáticos y manuales)
    enlaces_automaticos = cuentas_verificadas["automaticas"][red]
    enlaces_manuales = cuentas_verificadas["manuales"].get(red, [])
    enlaces_actuales = enlaces_automaticos + enlaces_manuales

    print(f"Enlaces actuales para {red}:")
    print("Automáticos:")
    for i, enlace in enumerate(enlaces_automaticos):
        print(f"{i + 1}. {enlace} (Automático)")
    print("Manuales:")
    for i, enlace in enumerate(enlaces_manuales, len(enlaces_automaticos) + 1):
        print(f"{i}. {enlace} (Manual)")

    # Eliminar enlaces si se proporcionan índices
    if enlaces_eliminar:
        enlaces_eliminar = [i for i in enlaces_eliminar if 0 <= i < len(enlaces_actuales)]
        if enlaces_eliminar:
            # Guardar los enlaces eliminados
            if red not in cuentas_verificadas["eliminadas"]:
                cuentas_verificadas["eliminadas"][red] = []
            cuentas_verificadas["eliminadas"][red].extend([enlaces_actuales[i] for i in enlaces_eliminar])

            # Actualizar la lista de enlaces automáticos y manuales
            cuentas_verificadas["automaticas"][red] = [enlace for i, enlace in enumerate(enlaces_automaticos) if i not in enlaces_eliminar]
            cuentas_verificadas["manuales"][red] = [enlace for i, enlace in enumerate(enlaces_manuales, len(enlaces_automaticos)) if i not in enlaces_eliminar]

    # Agregar enlaces si se proporcionan
    if enlaces_agregar:
        if red not in cuentas_verificadas["manuales"]:
            cuentas_verificadas["manuales"][red] = []

        # Formatear los enlaces manuales según la red social
        for enlace in enlaces_agregar:
            if enlace.strip():  # Solo agregar si la cadena no está vacía
                if red == "Tiktok" and not enlace.startswith("https://"):
                    enlace = f"https://www.tiktok.com/@{enlace.lstrip('@')}"
                elif red == "Facebook" and not enlace.startswith("https://"):
                    enlace = f"https://www.facebook.com/{enlace}"
                elif red == "Instagram" and not enlace.startswith("https://"):
                    enlace = f"https://www.instagram.com/{enlace}"
                elif red == "X" and not enlace.startswith("https://"):
                    enlace = f"https://x.com/{enlace}"
                cuentas_verificadas["manuales"][red].append(enlace)

    # Eliminar la red social de "manuales" si no tiene enlaces
    if red in cuentas_verificadas["manuales"] and not cuentas_verificadas["manuales"][red]:
        del cuentas_verificadas["manuales"][red]

    # Actualizar la base de datos
    coleccion.update_one(
        {"nombre_marca": marca},
        {"$set": {"cuentas_verificadas": cuentas_verificadas}}
    )

    return cuentas_verificadas

In [15]:
@app.post("/buscar/")
async def buscar(request: MarcaRequest):
    marca = request.marca
    return hacer_busquedas(marca)

In [16]:
@app.get("/obtener-logo/{marca}")
async def obtener_logo(marca: str):
    marca = normalizar_marca(marca)  # Normalizar la marca
    logo_binario = buscar_logo(marca)
    if logo_binario:
        return Response(content=logo_binario, media_type="image/jpeg")
    else:
        return {"error": "Logo no encontrado"}

In [17]:
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://marketing.test"],  # Origen permitido (tu frontend)
    allow_credentials=True,
    allow_methods=["*"],  # Permitir todos los métodos (GET, POST, etc.)
    allow_headers=["*"],  # Permitir todos los encabezados
)

In [18]:
def start_api():
    uvicorn.run(app, host="127.0.0.1", port=8000)

In [19]:
start_api()

INFO:     Started server process [28196]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:60787 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:60787 - "GET /openapi.json HTTP/1.1" 200 OK
Conexión a MongoDB exitosa.

Depuración: hacer_busquedas - Marca: nintendo
Realizando búsqueda para X: nintendo X Twitter MX Mexico cuenta
Realizando búsqueda para X: nintendo X Twitter US account
Realizando búsqueda para Facebook: nintendo Facebook MX Mexico cuenta
Realizando búsqueda para Facebook: nintendo Facebook US account
Realizando búsqueda para Instagram: nintendo Instagram MX Mexico cuenta
Realizando búsqueda para Instagram: nintendo Instagram US account
Realizando búsqueda para Tiktok: nintendo Tiktok MX Mexico cuenta
Realizando búsqueda para Tiktok: nintendo Tiktok US account
Resultados de X: [{'kind': 'customsearch#result', 'title': 'Nintendo Latinoamérica (@NintendoLatam) / X', 'htmlTitle': '<b>Nintendo</b> Latinoamérica (@NintendoLatam) / <b>X</b>', 'link': 'https://twitter.com/NintendoLatam', 'displayLink': 'twitter.com', 'snippet': 'Battle Royale Capí

INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [28196]
