In [0]:
import requests
import os
import json
import logging
import time
from pyspark.sql.types import StructType, StructField, StringType

# Databricks Widgets
dbutils.widgets.text("collection_id", "")
dbutils.widgets.text("brand_name", "")

# Strip stray whitespace that is easy to paste into the widget boxes.
collection_id = dbutils.widgets.get("collection_id").strip()
brand_name = dbutils.widgets.get("brand_name").strip()

# ---------------------------
# Setup Logger
# ---------------------------
logger = logging.getLogger("aprimo_logger")
logger.setLevel(logging.INFO)

# getLogger returns the same object on every re-run of this cell, so a handler
# is only attached the first time; otherwise every line would be logged twice.
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
    logger.addHandler(handler)

logger.info("Logger initialized")
logger.info(f"Collection ID: {collection_id}")
logger.info(f"Brand Name: {brand_name}")

# Fail fast: both values are interpolated into API URLs and Volume paths in the
# cells below. An empty brand_name would write into the shared
# aprimo-testing folder, and a name containing a path separator (or "..")
# would write outside it.
if not collection_id:
    raise ValueError("Widget 'collection_id' is required")
if not brand_name:
    raise ValueError("Widget 'brand_name' is required")
if "/" in brand_name or "\\" in brand_name or brand_name in (".", ".."):
    raise ValueError(f"Widget 'brand_name' must be a plain folder name, got {brand_name!r}")


In [0]:
def exponential_backoff_request(
    method,
    url,
    max_retries=5,
    backoff_factor=1.5,
    retry_status=(429, 500, 502, 503, 504),
    timeout=60,
    **kwargs
):
    """
    Reliable HTTP request wrapper with exponential backoff.

    Calls ``method(url, **kwargs)`` up to ``max_retries`` times. An attempt is
    retried when it raises ``OSError`` or when the response status code is in
    ``retry_status``. ``requests.RequestException`` derives from ``OSError``,
    so this covers connection errors and timeouts. The wait before retry ``n``
    is ``backoff_factor ** n`` seconds.

    Args:
        method: request callable, e.g. ``requests.get`` / ``requests.post``.
        url: request URL (also used in log messages).
        max_retries: total number of attempts; must be >= 1.
        backoff_factor: base of the exponential wait.
        retry_status: HTTP status codes that trigger a retry.
        timeout: forwarded to ``method`` as ``timeout=`` so a stalled
            connection cannot hang the job indefinitely. Pass ``None`` to
            omit it.
        **kwargs: forwarded unchanged to ``method``.

    Returns:
        The first response whose status is not in ``retry_status``. Other
        error statuses (e.g. 401, 404) are returned as-is, so callers must
        check them.

    Raises:
        ValueError: if ``max_retries`` < 1.
        OSError: the last network error, once all attempts are used up.
        Exception: ``"Retryable status: <code>"``, once every attempt ended
            in a retryable status.
        Any other exception raised by ``method`` (a programming error, bad
        kwargs, ...) propagates immediately without retrying.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")
    if timeout is not None:
        kwargs["timeout"] = timeout

    for attempt in range(1, max_retries + 1):
        try:
            response = method(url, **kwargs)
        except OSError as e:
            if attempt >= max_retries:
                logger.error(f"Max retries exceeded for: {url}")
                raise
            reason = str(e)
        else:
            if response.status_code not in retry_status:
                return response
            reason = f"Retryable status: {response.status_code}"
            # This response is discarded; release its connection (important
            # for stream=True requests, whose body is never read).
            response.close()
            if attempt >= max_retries:
                logger.error(f"Max retries exceeded for: {url}")
                raise Exception(reason)

        sleep_time = backoff_factor ** attempt
        logger.warning(
            f"Attempt {attempt}/{max_retries} failed. "
            f"Retrying in {sleep_time:.2f}s | {url}"
        )
        logger.warning(f"Reason: {reason}")
        time.sleep(sleep_time)


In [0]:
# Request an OAuth access token using the client-credentials grant.
#
# The client secret is read from a Databricks secret scope instead of being
# hard-coded in the notebook, where it ends up in version control and in every
# exported copy.
# NOTE(review): the scope/key names below are placeholders; create them (or
# change these constants) before running. The secret previously committed in
# this cell should be rotated.
# NOTE(review): the token is fetched once and reused by every later cell. If a
# run can outlive the token's lifetime, later calls may be rejected; confirm
# the expiry for this Aprimo tenant.
APRIMO_SECRET_SCOPE = "aprimo"
APRIMO_CLIENT_SECRET_KEY = "client-secret"

token_url = "https://marssnacking-sb1.aprimo.com/login/connect/token"

payload = {
    "grant_type": "client_credentials",
    "client_id": "VG0U957L-FAMN",
    "client_secret": dbutils.secrets.get(
        scope=APRIMO_SECRET_SCOPE, key=APRIMO_CLIENT_SECRET_KEY
    ),
}

headers = {"Content-Type": "application/x-www-form-urlencoded"}

logger.info("Requesting OAuth token...")

token_res = exponential_backoff_request(
    requests.post,
    token_url,
    data=payload,
    headers=headers
)
# Fail with the HTTP status (e.g. 401 for bad credentials) rather than a bare
# KeyError on the JSON lookup below.
token_res.raise_for_status()

access_token = token_res.json()["access_token"]
logger.info("✔ Token generated successfully")


In [0]:
# Records endpoint for the selected collection, plus the bearer-auth headers
# used to call it (API version pinned to 1).
aprimo_collection_url = f"https://marssnacking-sb1.dam.aprimo.com/api/core/collection/{collection_id}/records"

collection_headers = {}
collection_headers["Authorization"] = f"Bearer {access_token}"
collection_headers["Accept"] = "application/json"
collection_headers["API-VERSION"] = "1"

logger.info(f"Collection URL built: {aprimo_collection_url}")


In [0]:
# =============================
# Load metadata file
# =============================
# The metadata file maps record id -> the fileModifiedOn value stored after
# that record's last download. If the file does not exist yet, metadata is
# empty and every record is treated as new.

metadata_file = f"/Volumes/workspace/default/aprimo-testing/{brand_name}_metadata.json"

if os.path.exists(metadata_file):
    with open(metadata_file, "r") as f:
        metadata = json.load(f)
else:
    metadata = {}

# Use the notebook's logger (as every other cell does) rather than print.
logger.info(f"Loaded metadata entries: {len(metadata)}")


In [0]:
logger.info("Fetching records for collection...")

resp = exponential_backoff_request(
    requests.get,
    aprimo_collection_url,
    headers=collection_headers
)
# Surface auth / not-found errors. Without this check an error body has no
# "items", so a bad collection_id or expired token would look like an empty
# collection and the run would "succeed" doing nothing.
resp.raise_for_status()

data = resp.json()
ids = [item["id"] for item in data.get("items", [])]

# NOTE(review): only the items in this single response are processed. If the
# endpoint pages its results, later pages are never downloaded; confirm
# against the Aprimo API paging documentation. When the response carries a
# totalCount larger than what was received, warn loudly instead of silently
# under-processing.
total_count = data.get("totalCount")
if isinstance(total_count, int) and total_count > len(ids):
    logger.warning(
        f"Collection reports {total_count} records but only {len(ids)} were returned; "
        f"remaining records will not be processed"
    )

logger.info(f"Total records found: {len(ids)}")


In [0]:
# =============================
# 4. Prepare download directory
# =============================
# Downloads go into a per-brand folder on the Volume. Any downloaded file
# larger than CHUNK_LIMIT is additionally split into parts by the download
# loop.

CHUNK_LIMIT = 190 * 1024 * 1024  # 190 MB per part

download_dir = f"/Volumes/workspace/default/aprimo-testing/{brand_name}/"
os.makedirs(download_dir, exist_ok=True)

logger.info(f"Download directory ready: {download_dir}")



In [0]:
# =============================
# 5. Download new / changed files, splitting large ones
# =============================
# CHUNK_LIMIT comes from the download-directory cell and is deliberately not
# redefined here, so the part size is tuned in one place.
# NOTE(review): access_token was fetched once before this loop; a long run may
# outlive it.


def _safe_file_name(raw_name, fallback):
    """Reduce an API-supplied file name to a bare name inside download_dir.

    os.path.join discards download_dir when handed an absolute name, and a
    name containing ".." segments would write outside it, so any directory
    part is stripped. Empty, "." or ".." results use ``fallback`` instead.
    """
    name = os.path.basename(str(raw_name or "").replace("\\", "/"))
    return fallback if name in ("", ".", "..") else name


def _split_into_parts(src_path, dest_dir, base_name, part_size):
    """Copy src_path into dest_dir as <base_name>_part1, _part2, ... of at most part_size bytes.

    Returns the number of parts written. The source file is left in place.
    """
    part_no = 0
    with open(src_path, "rb") as src:
        while True:
            block = src.read(part_size)
            if not block:
                break
            part_no += 1
            part_path = os.path.join(dest_dir, f"{base_name}_part{part_no}")
            with open(part_path, "wb") as dest:
                dest.write(block)
            logger.info(f"Part saved: {part_path} ({len(block)/(1024*1024):.2f} MB)")
    return part_no


# The token and the select-* expansions are the same for every record, so the
# headers are built once rather than per iteration.
record_headers = {
    "Authorization": f"Bearer {access_token}",
    "Accept": "application/json",
    "API-VERSION": "1",
    "select-record": "files",
    "select-file": "fileversions",
    "select-fileversion": "publicuris"
}

for api_id in ids:
    logger.info(f"Processing record: {api_id}")

    record_url = f"https://marssnacking-sb1.dam.aprimo.com/api/core/record/{api_id}"

    # -------- Fetch record metadata with retry --------
    resp = exponential_backoff_request(
        requests.get,
        record_url,
        headers=record_headers
    )
    if not resp.ok:
        # Otherwise an error body would be reported below as "no downloadable
        # file", hiding auth/permission problems.
        logger.error(f"Record request failed for {api_id}: HTTP {resp.status_code}")
        continue

    data_inv = resp.json()

    # Only the first file, its first version, and that version's first public
    # URI are used.
    try:
        file_obj = data_inv["files"]["items"][0]["fileVersions"]["items"][0]
        pdf_url = file_obj["publicUris"]["items"][0]["uri"]
    except (KeyError, IndexError, TypeError) as e:
        logger.warning(f"⚠ No downloadable file for {api_id}: {e}")
        continue

    file_name = _safe_file_name(file_obj.get("fileName"), f"{api_id}.pdf")
    modified_on = file_obj.get("fileModifiedOn")
    logger.info(f"File: {file_name} | Modified: {modified_on}")

    # -------- Skip if Not Modified --------
    # Timestamps are compared as strings. This assumes the API always returns
    # them in one consistent ISO-8601 format (NOTE(review): confirm). A missing
    # fileModifiedOn cannot be compared, so that file is downloaded again.
    old_mod = metadata.get(api_id)
    if old_mod and modified_on and modified_on <= old_mod:
        logger.info("Skipped — Not modified since last download")
        continue

    logger.info(f"Downloading: {file_name}")
    save_path = os.path.join(download_dir, file_name)

    # -------- Download file with retry (streamed) --------
    # The response is a context manager, so its connection is released even if
    # writing fails part-way.
    with exponential_backoff_request(
        requests.get,
        pdf_url,
        stream=True
    ) as file_resp:
        if not file_resp.ok:
            # Don't save an error page as the asset or mark it as downloaded.
            logger.error(f"Download failed for {api_id}: HTTP {file_resp.status_code}")
            continue
        with open(save_path, "wb") as f:
            for chunk in file_resp.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    f.write(chunk)

    logger.info(f"File Saved: {save_path}")

    # -------- SPLIT FILE > CHUNK_LIMIT --------
    # The full file is kept; the parts are written alongside it.
    file_size = os.path.getsize(save_path)
    size_mb = file_size / (1024 * 1024)

    if file_size > CHUNK_LIMIT:
        logger.warning(f"Large file detected ({size_mb:.2f} MB). Splitting...")
        _split_into_parts(save_path, download_dir, file_name, CHUNK_LIMIT)
        logger.info("Splitting complete")
    else:
        logger.info(f"📄 No split required ({size_mb:.2f} MB)")

    # Recorded only after a successful save, so failed records are retried
    # on the next run.
    metadata[api_id] = modified_on


In [0]:
# =============================
# 6. Save updated metadata
# =============================
# Persist the record id -> fileModifiedOn map so the next run can skip
# records that have not changed.

with open(metadata_file, mode="w") as out_file:
    json.dump(metadata, out_file, indent=4)

logger.info(f"Metadata updated: {metadata_file}")
