In [None]:
import io
import json
import mimetypes
import os
import re
import time
from email.message import Message
from typing import Any, Dict, Generator, List, Tuple
from urllib.parse import quote, unquote, urljoin, urlsplit

import magic  # pip install python-magic
import requests
from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import (
    BlobServiceClient,
    ContainerClient,
    ContentSettings,
)
from dotenv import load_dotenv
from tqdm import tqdm
  
# --------------- Configuration ----------------
# Load settings from a local .env file (if present) into the environment.
load_dotenv()

# Required settings: a missing variable fails right here with a KeyError that
# names it.
CMS_ENDPOINT   = os.environ["CMS_ENDPOINT"].rstrip("/")
CMS_API_KEY    = os.environ["CMS_API_KEY"]
CMS_PATH = os.getenv("CMS_PATH")  # NOTE(review): not read by any code visible here
# A connection string has no meaningful default, so it is required as well.
BLOB_CONNECTION_STRING = os.environ["BLOB_CONNECTION_STRING"]
BLOB_CONTAINER_NAME = os.getenv("BLOB_CONTAINER_NAME", "contents")
PAGE_SIZE      = int(os.getenv("CMS_PAGE_SIZE", "100"))

# Names read by download_file().
# CMS_BASE_URL is the root that relative attachment routes are joined onto.
CMS_BASE_URL = CMS_ENDPOINT
# Route template for attachments; it must contain the "{file_id}" placeholder
# to be used (for example "/files/{file_id}" -- the real route depends on the
# CMS, TODO confirm). Without the placeholder (the default), download_file()
# treats the document's file id as a complete URL.
FILE_ROUTE_TMPL = os.getenv("CMS_FILE_ROUTE_TMPL", "")

# Headers sent with every CMS request.
HEADERS = {
    "Accept": "application/json",
    "Authorization": f"Bearer {CMS_API_KEY}",
}
# ------------------------------------------------
  
def list_cms_documents() -> Generator[Dict[str, Any], None, None]:
    """Yield the CMS documents/articles one by one, fetching page after page.

    Requests ``{CMS_ENDPOINT}/contents`` with ``page`` starting at 1 and
    ``pageSize`` set to ``PAGE_SIZE``, and stops at the first page whose JSON
    body is empty.

    Raises:
        requests.HTTPError: if a page request returns an error status.
    """
    page_number = 1
    while True:
        response = requests.get(
            f"{CMS_ENDPOINT}/contents",
            headers=HEADERS,
            params={"page": page_number, "pageSize": PAGE_SIZE},
            timeout=30,
        )
        response.raise_for_status()

        documents: List[Dict[str, Any]] = response.json()
        if not documents:
            # An empty page marks the end of the listing.
            return

        yield from documents
        page_number += 1
        time.sleep(0.2)  # short pause between pages to avoid throttling
  
def normalize(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Map a raw CMS document to the fields exposed in the index.

    Args:
        doc: Raw document as returned by the CMS. Must contain ``"id"``.

    Returns:
        A dict with the keys ``id``, ``title``, ``author``, ``tags``,
        ``createdAt``, ``updatedAt`` and ``file_id``. ``author`` is the
        author's name, or ``None`` when it cannot be determined.

    Raises:
        KeyError: if ``doc`` has no ``"id"`` key.
    """
    # "author" may be missing, null, an object with a "name", or a plain
    # string; only the last two yield a usable name.
    author = doc.get("author")
    if isinstance(author, dict):
        author_name = author.get("name")
    elif isinstance(author, str):
        author_name = author
    else:
        author_name = None

    return {
        "id":        doc["id"],
        "title":     doc.get("title"),
        "author":    author_name,
        "tags":      doc.get("tags", []),
        "createdAt": doc.get("createdAt"),
        "updatedAt": doc.get("updatedAt"),
        # Identifier of the attachment to download.
        "file_id":   doc.get("fileId") or doc.get("attachmentId"),
    }
  
# ------------------------------------------------  
#  Descarga y subida a Blob Storage  
# ------------------------------------------------  
def _filename_from_response(content_disposition, url: str) -> str:
    """Pick a file name from a Content-Disposition header, else from the URL path."""
    name = None
    if content_disposition:
        # Let the stdlib header parser handle quoting, trailing parameters
        # and the RFC 2231 ``filename*=`` form.
        header = Message()
        header["Content-Disposition"] = content_disposition
        name = header.get_filename()
    if not name:
        # Use only the URL path so query strings and fragments never end up
        # in the file name.
        name = unquote(urlsplit(url).path)
    # Keep just the last path component, whatever separator was used.
    return name.replace("\\", "/").rsplit("/", 1)[-1]


def download_file(file_id: str) -> Tuple[bytes, str, str]:
    """
    Download the binary attached to a CMS document.

    Args:
        file_id: Attachment identifier. When ``FILE_ROUTE_TMPL`` has no
            ``{file_id}`` placeholder it is used as the complete download URL.

    Returns:
        A ``(content, filename, mime_type)`` tuple:
            content    -> raw bytes of the response body
            filename   -> suggested name (from Content-Disposition, else the URL path)
            mime_type  -> Content-Type header, else the type sniffed from the content

    Raises:
        ValueError: if ``file_id`` is empty.
        requests.HTTPError: if the download returns an error status.
    """
    if not file_id:
        raise ValueError("El documento no contiene file_id")

    # If the CMS hands out a direct URL instead of an id, the template has no
    # placeholder and this block is skipped.
    if "{file_id}" in FILE_ROUTE_TMPL:
        relative = FILE_ROUTE_TMPL.format(file_id=file_id)
        url = urljoin(CMS_BASE_URL + "/", relative.lstrip("/"))
    else:
        url = file_id  # assumed to be the complete URL

    resp = requests.get(url, headers=HEADERS, timeout=120)
    resp.raise_for_status()

    fname = _filename_from_response(resp.headers.get("Content-Disposition"), url)

    # Trust the server's Content-Type; sniff the bytes only when it is absent.
    mime_type = resp.headers.get("Content-Type") or magic.from_buffer(resp.content, mime=True)
    return resp.content, fname, mime_type
  
def to_blob_metadata(meta: Dict[str, Any]) -> Dict[str, str]:
    """
    Convert a dictionary into metadata that Azure Blob Storage accepts.

    - Keys are lower-cased; every character outside ``[a-z0-9_]`` becomes
      ``_`` and a leading ``_`` is added when the key is empty or starts with
      a digit, so the result is an ASCII identifier.
    - Values become text (lists are joined with commas).
    - Values that are not printable ASCII are percent-encoded as UTF-8,
      because metadata travels in HTTP headers; read them back with
      ``urllib.parse.unquote``. Printable-ASCII values are left untouched.
    - Entries whose value is ``None`` are dropped.

    Args:
        meta: Arbitrary key/value pairs.

    Returns:
        A new ``dict`` of ``str`` keys to ``str`` values.
    """
    clean: Dict[str, str] = {}
    for k, v in meta.items():
        if v is None:
            continue

        key = re.sub(r"[^a-z0-9_]", "_", k.lower())
        if not key or key[0].isdigit():
            key = f"_{key}"

        if isinstance(v, list):
            v = ",".join(map(str, v))
        text = str(v)

        # Accented letters, control characters, etc. are not safe in a header.
        if not all(" " <= ch <= "~" for ch in text):
            text = quote(text, safe=" ,")

        clean[key] = text
    return clean
  
def ensure_container(client: BlobServiceClient, name: str) -> ContainerClient:
    """Return a client for container *name*, creating the container if needed.

    Args:
        client: Service client used to create or look up the container.
        name: Container name.

    Returns:
        A ``ContainerClient`` for the new or already existing container.

    Raises:
        Any error from ``create_container`` other than "already exists"
        (authentication, network, invalid name, ...) propagates, so a
        misconfiguration is not mistaken for an existing container.
    """
    try:
        return client.create_container(name)
    except ResourceExistsError:
        # The container is already there: just hand back a client for it.
        return client.get_container_client(name)
  
def upload_file_and_metadata(container: ContainerClient,
                             raw_doc: Dict[str, Any],
                             binary: bytes,
                             filename: str,
                             mime_type: str) -> None:
    """
    Upload the binary file together with the document's metadata.

    The blob is named ``<id><ext>``, where ``<ext>`` is the extension of
    *filename*; when *filename* has none it is guessed from *mime_type*, and
    ``.bin`` is used as a last resort. An existing blob with the same name is
    overwritten.

    Args:
        container: Destination container client.
        raw_doc: Raw CMS document; must contain ``"id"``.
        binary: File content.
        filename: Suggested file name, used only for its extension.
        mime_type: Content type stored on the blob; may carry parameters
            such as ``; charset=utf-8``.
    """
    _, ext = os.path.splitext(filename)
    if not ext:
        # guess_extension() only recognises the bare media type, so drop any
        # parameters ("text/html; charset=utf-8" -> "text/html").
        bare_type = (mime_type or "").split(";", 1)[0].strip().lower()
        ext = mimetypes.guess_extension(bare_type) or ".bin"

    blob_name = f"{raw_doc['id']}{ext}"
    blob_client = container.get_blob_client(blob_name)

    metadata = to_blob_metadata(normalize(raw_doc))

    blob_client.upload_blob(
        data=binary,
        overwrite=True,
        metadata=metadata,
        content_settings=ContentSettings(content_type=mime_type)
    )
  

In [None]:
# Connect to Blob Storage and make sure the destination container exists.
blob_service = BlobServiceClient.from_connection_string(BLOB_CONNECTION_STRING)
container = ensure_container(blob_service, BLOB_CONTAINER_NAME)

# Walk every CMS document; those with an attachment are downloaded and
# uploaded along with their metadata.
docs_iter = list_cms_documents()
for raw_doc in tqdm(docs_iter, desc="Procesando documentos"):
    file_id = raw_doc.get("fileId") or raw_doc.get("attachmentId")
    # Documents without an attachment are skipped (alternatively, store only
    # their JSON, depending on your needs).
    if file_id:
        try:
            bin_data, fname, mime = download_file(file_id)
            upload_file_and_metadata(container, raw_doc, bin_data, fname, mime)
        except Exception as ex:
            # Best effort: report the failure and carry on with the next document.
            tqdm.write(f"[WARN] No se pudo procesar id={raw_doc.get('id')}: {ex}")

print("✔ Ingesta finalizada.")