In [4]:
from pathlib import Path

# Root of the generated Azure Functions project, followed by its two sub-folders.
base = Path("ads_to_salesforce_func")
project_dirs = [base, base / "AdsToSalesforceDaily", base / "shared"]

# parents=True creates the root on the way; exist_ok=True makes re-running the cell harmless.
for p in project_dirs[1:]:
    p.mkdir(parents=True, exist_ok=True)

print("Created/verified folders:")
for p in project_dirs:
    print(" -", p.resolve())



Created/verified folders:
 - C:\Users\aqeel.malik\Documents\Integration Project\ads_to_salesforce_func
 - C:\Users\aqeel.malik\Documents\Integration Project\ads_to_salesforce_func\AdsToSalesforceDaily
 - C:\Users\aqeel.malik\Documents\Integration Project\ads_to_salesforce_func\shared


In [5]:
%%writefile ads_to_salesforce_func/host.json
{
  "version": "2.0"
}

Writing ads_to_salesforce_func/host.json


In [6]:
%%writefile ads_to_salesforce_func/requirements.txt
azure-functions==1.20.0
azure-identity==1.17.1
azure-keyvault-secrets==4.8.0
azure-storage-blob==12.22.0

simple-salesforce==1.12.6
requests==2.32.3

google-ads==24.1.0
tenacity==8.3.0
python-dateutil==2.9.0.post0


Writing ads_to_salesforce_func/requirements.txt


In [7]:
%%writefile ads_to_salesforce_func/local.settings.json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python",

    "ENVIRONMENT": "local",
    "KEY_VAULT_URL": "https://<your-keyvault-name>.vault.azure.net/",

    "BLOB_CONNECTION_STRING": "<your-blob-connection-string>",
    "BLOB_CONTAINER": "ads-archive",

    "SF_LOGIN_URL": "https://test.salesforce.com",
    "SF_API_VERSION": "59.0",
    "SF_SECRET_NAME_USERNAME": "sf-username",
    "SF_SECRET_NAME_PASSWORD": "sf-password",
    "SF_SECRET_NAME_TOKEN": "sf-security-token",

    "GOOGLE_ADS_SECRET_NAME_DEV_TOKEN": "google-ads-dev-token",
    "GOOGLE_ADS_SECRET_NAME_CLIENT_ID": "google-ads-client-id",
    "GOOGLE_ADS_SECRET_NAME_CLIENT_SECRET": "google-ads-client-secret",
    "GOOGLE_ADS_SECRET_NAME_REFRESH_TOKEN": "google-ads-refresh-token",
    "GOOGLE_ADS_CUSTOMER_ID": "1234567890",

    "META_SECRET_NAME_ACCESS_TOKEN": "meta-access-token",
    "META_AD_ACCOUNT_ID": "act_1234567890",

    "SF_CAMPAIGN_OBJECT_API_NAME": "Ad_Campaign_Performance__c",
    "SF_EXTERNAL_ID_FIELD": "External_Id__c"
  }
}


Writing ads_to_salesforce_func/local.settings.json


In [8]:
%%writefile ads_to_salesforce_func/AdsToSalesforceDaily/function.json
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 15 20 * * *"
    }
  ]
}


Writing ads_to_salesforce_func/AdsToSalesforceDaily/function.json


In [9]:
%%writefile ads_to_salesforce_func/AdsToSalesforceDaily/__init__.py
import azure.functions as func
from datetime import datetime, timedelta, timezone

from shared.logging_utils import get_logger
from shared.config import load_config
from shared.keyvault_client import KeyVaultSecrets
from shared.blob_archive import BlobArchiver
from shared.google_ads_client import fetch_google_ads_daily
from shared.meta_ads_client import fetch_meta_ads_daily
from shared.mapper import normalize_google_rows, normalize_meta_rows
from shared.salesforce_client import SalesforceWriter
from shared.utils import iso_ts


logger = get_logger(__name__)


def main(mytimer: func.TimerRequest) -> None:
    """Timer-triggered entry point: fetch, archive, normalize and upsert one day of ad stats.

    The target day is "yesterday" relative to the current UTC time. Steps run
    strictly in this order: load config, fetch Google Ads, archive the raw
    payload, fetch Meta Ads, archive the raw payload, normalize both sources,
    archive the normalized records, upsert each batch into Salesforce, and
    finally log the accumulated success/failure counts.

    Args:
        mytimer: The timer trigger object passed by the Functions host. It is
            not read by this function.
    """
    cfg = load_config()

    # One timestamp drives both the target date and the run identifier.
    started_at = datetime.now(timezone.utc)
    target_date = (started_at - timedelta(days=1)).date()
    run_id = iso_ts(started_at)
    date_label = str(target_date)

    logger.info("Run start", extra={"run_id": run_id, "target_date": date_label})

    secrets = KeyVaultSecrets(cfg.key_vault_url)
    archiver = BlobArchiver(
        container_name=cfg.blob_container,
        connection_string=cfg.blob_connection_string,
    )

    def _archive(path: str, data, source: str) -> None:
        # Every archived blob carries the same three metadata keys.
        archiver.archive_json(
            path=path,
            data=data,
            metadata={"source": source, "date": date_label, "run_id": run_id},
        )

    # --- Google Ads: fetch, then archive the raw response ---
    google_raw = fetch_google_ads_daily(
        secrets=secrets,
        customer_id=cfg.google_ads_customer_id,
        target_date=target_date,
        secret_names=cfg.google_ads_secret_names(),
    )
    _archive(f"raw/google_ads/{target_date}/google_ads_{run_id}.json", google_raw, "google_ads")

    # --- Meta Ads: fetch, then archive the raw response ---
    meta_raw = fetch_meta_ads_daily(
        secrets=secrets,
        ad_account_id=cfg.meta_ad_account_id,
        target_date=target_date,
        access_token_secret_name=cfg.meta_secret_name_access_token,
    )
    _archive(f"raw/meta_ads/{target_date}/meta_ads_{run_id}.json", meta_raw, "meta_ads")

    # --- Normalize both sources into Salesforce-ready records and archive them ---
    google_records = normalize_google_rows(google_raw)
    meta_records = normalize_meta_rows(meta_raw)

    _archive(f"normalized/{target_date}/google_records_{run_id}.json", google_records, "google_ads_normalized")
    _archive(f"normalized/{target_date}/meta_records_{run_id}.json", meta_records, "meta_ads_normalized")

    # --- Salesforce upsert ---
    # NOTE(review): shared/salesforce_client.py is not among the files the visible
    # notebook cells write or list - confirm it is provided elsewhere.
    sf = SalesforceWriter(
        login_url=cfg.sf_login_url,
        api_version=cfg.sf_api_version,
        username=secrets.get(cfg.sf_secret_name_username),
        password=secrets.get(cfg.sf_secret_name_password),
        security_token=secrets.get(cfg.sf_secret_name_token),
    )

    totals = {"success": 0, "failed": 0}
    for batch in (google_records, meta_records):
        outcome = sf.upsert_records(
            object_api_name=cfg.sf_campaign_object_api_name,
            external_id_field=cfg.sf_external_id_field,
            records=batch,
        )
        totals["success"] += outcome["success"]
        totals["failed"] += outcome["failed"]

    logger.info(
        "Run done",
        extra={"run_id": run_id, "success": totals["success"], "failed": totals["failed"]},
    )


Writing ads_to_salesforce_func/AdsToSalesforceDaily/__init__.py


In [10]:
%%writefile ads_to_salesforce_func/shared/__init__.py
# shared package


Writing ads_to_salesforce_func/shared/__init__.py


In [11]:
%%writefile ads_to_salesforce_func/shared/config.py
import os
from dataclasses import dataclass


def _env(name: str, default: str | None = None) -> str:
    v = os.getenv(name, default)
    if v is None or v == "":
        raise ValueError(f"Missing required environment variable: {name}")
    return v


@dataclass(frozen=True)
class Config:
    """Immutable settings bundle for one run of the function.

    Fields named ``*_secret_name_*`` hold the *names* under which secrets are
    looked up, not the secret values themselves.
    """

    # Runtime environment label and secret store location
    environment: str
    key_vault_url: str

    # Blob archive target
    blob_connection_string: str
    blob_container: str

    # Salesforce connection settings and secret names
    sf_login_url: str
    sf_api_version: str
    sf_secret_name_username: str
    sf_secret_name_password: str
    sf_secret_name_token: str

    # Google Ads secret names and account
    google_ads_secret_name_dev_token: str
    google_ads_secret_name_client_id: str
    google_ads_secret_name_client_secret: str
    google_ads_secret_name_refresh_token: str
    google_ads_customer_id: str

    # Meta Ads secret name and account
    meta_secret_name_access_token: str
    meta_ad_account_id: str

    # Salesforce target object and its external-id field
    sf_campaign_object_api_name: str
    sf_external_id_field: str

    def google_ads_secret_names(self) -> dict:
        """Return the four Google Ads secret names keyed by credential role.

        Keys are ``developer_token``, ``client_id``, ``client_secret`` and
        ``refresh_token``, in that order.
        """
        return dict(
            developer_token=self.google_ads_secret_name_dev_token,
            client_id=self.google_ads_secret_name_client_id,
            client_secret=self.google_ads_secret_name_client_secret,
            refresh_token=self.google_ads_secret_name_refresh_token,
        )


def load_config() -> Config:
    """Build a ``Config`` from environment variables.

    ``ENVIRONMENT`` is read directly and falls back to "local". Every other
    setting goes through ``_env``, which raises ``ValueError`` when the value
    is missing or empty. Values are resolved top to bottom, so the first
    missing variable in this order is the one reported.
    """
    settings = {
        "environment": os.getenv("ENVIRONMENT", "local"),
        "key_vault_url": _env("KEY_VAULT_URL"),
        # Blob archive
        "blob_connection_string": _env("BLOB_CONNECTION_STRING"),
        "blob_container": _env("BLOB_CONTAINER", "ads-archive"),
        # Salesforce
        "sf_login_url": _env("SF_LOGIN_URL", "https://test.salesforce.com"),
        "sf_api_version": _env("SF_API_VERSION", "59.0"),
        "sf_secret_name_username": _env("SF_SECRET_NAME_USERNAME"),
        "sf_secret_name_password": _env("SF_SECRET_NAME_PASSWORD"),
        "sf_secret_name_token": _env("SF_SECRET_NAME_TOKEN"),
        # Google Ads
        "google_ads_secret_name_dev_token": _env("GOOGLE_ADS_SECRET_NAME_DEV_TOKEN"),
        "google_ads_secret_name_client_id": _env("GOOGLE_ADS_SECRET_NAME_CLIENT_ID"),
        "google_ads_secret_name_client_secret": _env("GOOGLE_ADS_SECRET_NAME_CLIENT_SECRET"),
        "google_ads_secret_name_refresh_token": _env("GOOGLE_ADS_SECRET_NAME_REFRESH_TOKEN"),
        "google_ads_customer_id": _env("GOOGLE_ADS_CUSTOMER_ID"),
        # Meta Ads
        "meta_secret_name_access_token": _env("META_SECRET_NAME_ACCESS_TOKEN"),
        "meta_ad_account_id": _env("META_AD_ACCOUNT_ID"),
        # Salesforce target object
        "sf_campaign_object_api_name": _env("SF_CAMPAIGN_OBJECT_API_NAME"),
        "sf_external_id_field": _env("SF_EXTERNAL_ID_FIELD"),
    }
    return Config(**settings)


Writing ads_to_salesforce_func/shared/config.py


In [12]:
%%writefile ads_to_salesforce_func/shared/logging_utils.py
import logging
import json
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object.

    The payload always contains ``ts``, ``level``, ``logger`` and ``message``.
    Any attribute on the record that is not a standard ``LogRecord`` field
    (i.e. values passed via ``extra=``) is added as a top-level key; values
    that cannot be JSON-encoded are stored as ``str(value)``. When the record
    carries exception info, the formatted traceback is added as ``exc_info``.
    """

    # Standard LogRecord attributes: already represented in the payload or not
    # useful there. "message"/"asctime" are set on the record when another
    # formatter has processed it; "taskName" exists on Python 3.12+ records.
    _STANDARD_ATTRS = frozenset({
        "msg", "args", "levelname", "levelno", "name", "pathname", "filename",
        "module", "exc_info", "exc_text", "stack_info", "lineno", "funcName",
        "created", "msecs", "relativeCreated", "thread", "threadName",
        "processName", "process", "message", "asctime", "taskName",
    })

    def format(self, record: logging.LogRecord) -> str:
        """Return the JSON text for ``record``."""
        payload = {
            # Use the time the event was logged, not the time it is formatted.
            "ts": datetime.fromtimestamp(record.created, timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }

        # include extras
        for key, value in record.__dict__.items():
            if key in self._STANDARD_ATTRS:
                continue
            try:
                json.dumps(value)
                payload[key] = value
            except Exception:
                # Logging must never fail because of an odd extra value.
                payload[key] = str(value)

        if record.exc_info:
            payload["exc_info"] = self.formatException(record.exc_info)

        return json.dumps(payload, ensure_ascii=False)


def get_logger(name: str) -> logging.Logger:
    """Return the logger called ``name``, configuring it on first use.

    A logger that already has at least one handler is returned untouched.
    Otherwise a ``StreamHandler`` using ``JsonFormatter`` is attached and the
    level is set to INFO, so repeated calls never stack duplicate handlers.
    """
    logger = logging.getLogger(name)
    if logger.handlers:
        return logger

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(JsonFormatter())
    logger.addHandler(stream_handler)
    logger.setLevel(logging.INFO)
    return logger


Writing ads_to_salesforce_func/shared/logging_utils.py


In [13]:
%%writefile ads_to_salesforce_func/shared/keyvault_client.py
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient


class KeyVaultSecrets:
    """Small read-only wrapper around an Azure Key Vault ``SecretClient``.

    Values are cached per instance, so a secret that is requested several
    times (for example when a retried fetch asks for the same credentials
    again) costs one Key Vault call.
    """

    def __init__(self, vault_url: str):
        """Create a client for ``vault_url`` using ``DefaultAzureCredential``."""
        self._credential = DefaultAzureCredential()
        self._client = SecretClient(vault_url=vault_url, credential=self._credential)
        # secret name -> value, filled lazily by get()
        self._cache: dict = {}

    def get(self, secret_name: str) -> str:
        """Return the value of ``secret_name``.

        Raises:
            ValueError: If the secret exists but carries no value. Without
                this check ``None`` would be handed to downstream clients
                despite the ``str`` return contract.
        """
        cached = self._cache.get(secret_name)
        if cached is not None:
            return cached

        value = self._client.get_secret(secret_name).value
        if value is None:
            raise ValueError(f"Key Vault secret {secret_name!r} has no value")

        self._cache[secret_name] = value
        return value


Writing ads_to_salesforce_func/shared/keyvault_client.py


In [14]:
%%writefile ads_to_salesforce_func/shared/blob_archive.py
import json
from azure.storage.blob import BlobServiceClient


class BlobArchiver:
    def __init__(self, container_name: str, connection_string: str):
        self._svc = BlobServiceClient.from_connection_string(connection_string)
        self._container = self._svc.get_container_client(container_name)
        try:
            self._container.create_container()
        except Exception:
            pass

    def archive_json(self, path: str, data, metadata: dict | None = None) -> None:
        blob = self._container.get_blob_client(path)
        payload = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
        blob.upload_blob(payload, overwrite=True, metadata=metadata or {})


Writing ads_to_salesforce_func/shared/blob_archive.py


In [15]:
%%writefile ads_to_salesforce_func/shared/utils.py
import hashlib
from datetime import datetime, timezone


def iso_ts(dt: datetime) -> str:
    """Format ``dt``, converted to UTC, as a compact ``YYYYMMDDTHHMMSSZ`` stamp."""
    as_utc = dt.astimezone(timezone.utc)
    return as_utc.strftime("%Y%m%dT%H%M%SZ")


def safe_int(v, default=0) -> int:
    """Convert ``v`` to ``int``, returning ``default`` when that is not possible.

    ``None`` and the empty string yield ``default``. Integer-like input is
    converted exactly; decimal input such as ``"3.9"``, ``3.9`` or ``"1e3"``
    is truncated toward zero. Anything else (including NaN and infinity)
    yields ``default``. This function never raises.
    """
    try:
        if v is None or v == "":
            return default
        try:
            # Direct conversion first: going through float would silently
            # round integers larger than 2**53.
            return int(v)
        except (TypeError, ValueError):
            # Not a plain integer literal, e.g. "3.9" or "1e3".
            return int(float(v))
    except Exception:
        # Deliberately broad: the contract of this helper is to never raise.
        return default


def safe_float(v, default=0.0) -> float:
    """Convert ``v`` to ``float``, returning ``default`` when that is not possible.

    ``None``, the empty string and any value that fails conversion all yield
    ``default``. This function never raises.
    """
    try:
        is_blank = v is None or v == ""
        result = default if is_blank else float(v)
    except Exception:
        result = default
    return result


def make_external_id(source: str, date_str: str, campaign_id: str, adset_id: str | None = None) -> str:
    raw = f"{source}|{date_str}|{campaign_id}|{adset_id or ''}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


Writing ads_to_salesforce_func/shared/utils.py


In [16]:
%%writefile ads_to_salesforce_func/shared/google_ads_client.py
from datetime import date
from tenacity import retry, stop_after_attempt, wait_exponential
from google.ads.googleads.client import GoogleAdsClient


@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30))
def fetch_google_ads_daily(secrets, customer_id: str, target_date: date, secret_names: dict) -> dict:
    """Fetch campaign-level Google Ads metrics for ``target_date``.

    The whole function is wrapped in the retry decorator above, so a failure
    at any step (secret lookup, client creation, query) re-runs it from the
    start, up to five attempts.

    Args:
        secrets: Object whose ``get(name)`` returns a secret value.
        customer_id: Account id passed to ``search_stream``.
        target_date: The single day to query.
        secret_names: Maps ``developer_token``, ``client_id``,
            ``client_secret`` and ``refresh_token`` to secret names.

    Returns:
        A dict with ``source``, ``customer_id``, ``date`` (ISO string) and
        ``rows``, one plain dict per result row.
    """
    # Resolve the credentials in a fixed order, then add the client option.
    credential_keys = ("developer_token", "client_id", "client_secret", "refresh_token")
    client_settings = {key: secrets.get(secret_names[key]) for key in credential_keys}
    client_settings["use_proto_plus"] = True

    client = GoogleAdsClient.load_from_dict(client_settings)
    ga_service = client.get_service("GoogleAdsService")

    day = target_date.isoformat()
    query = f"""
        SELECT
          segments.date,
          campaign.id,
          campaign.name,
          metrics.impressions,
          metrics.clicks,
          metrics.cost_micros,
          metrics.conversions
        FROM campaign
        WHERE segments.date = '{day}'
    """

    # Flatten the streamed batches into plain, JSON-friendly dicts.
    rows = [
        {
            "date": str(result.segments.date),
            "campaign_id": str(result.campaign.id),
            "campaign_name": result.campaign.name,
            "impressions": int(result.metrics.impressions),
            "clicks": int(result.metrics.clicks),
            "cost_micros": int(result.metrics.cost_micros),
            "conversions": float(result.metrics.conversions),
        }
        for batch in ga_service.search_stream(customer_id=customer_id, query=query)
        for result in batch.results
    ]

    return {"source": "google_ads", "customer_id": customer_id, "date": target_date.isoformat(), "rows": rows}


Writing ads_to_salesforce_func/shared/google_ads_client.py


In [17]:
%%writefile ads_to_salesforce_func/shared/meta_ads_client.py
from datetime import date
from tenacity import retry, stop_after_attempt, wait_exponential
import requests


@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30))
def fetch_meta_ads_daily(secrets, ad_account_id: str, target_date: date, access_token_secret_name: str) -> dict:
    """Fetch campaign-level Meta Ads insights for ``target_date``.

    Follows ``paging.next`` links until the last page. The whole function is
    wrapped in the retry decorator above, so any failure re-runs it from the
    start, up to five attempts.

    Args:
        secrets: Object whose ``get(name)`` returns a secret value.
        ad_account_id: Account id used in the insights URL path.
        target_date: The single day to query (used as both since and until).
        access_token_secret_name: Name of the secret holding the access token.

    Returns:
        A dict with ``source``, ``ad_account_id``, ``date`` (ISO string) and
        ``rows``, the concatenated ``data`` arrays of all pages.

    Raises:
        requests.HTTPError: On a non-success HTTP status (after retries this
            surfaces through the retry wrapper).
    """
    import json  # local import: this module does not import json at file level

    access_token = secrets.get(access_token_secret_name)

    graph_version = "v20.0"
    base_url = f"https://graph.facebook.com/{graph_version}/{ad_account_id}/insights"

    day = target_date.isoformat()
    params = {
        "access_token": access_token,
        "level": "campaign",
        # Must be a JSON string. Given a dict, requests iterates its keys and
        # sends "time_range=since&time_range=until", dropping the dates.
        "time_range": json.dumps({"since": day, "until": day}),
        "time_increment": 1,
        "fields": ",".join([
            "date_start",
            "campaign_id",
            "campaign_name",
            "impressions",
            "clicks",
            "spend",
            "actions"
        ]),
        "limit": 500
    }

    rows = []
    url = base_url

    while url:
        resp = requests.get(url, params=params, timeout=60)
        resp.raise_for_status()
        data = resp.json()
        rows.extend(data.get("data", []))

        # A missing or null "paging" object means there are no more pages.
        url = (data.get("paging") or {}).get("next")
        # The next-page URL is used as returned; the first-page query
        # parameters must not be appended again.
        params = {}

    return {"source": "meta_ads", "ad_account_id": ad_account_id, "date": target_date.isoformat(), "rows": rows}


Writing ads_to_salesforce_func/shared/meta_ads_client.py


In [18]:
%%writefile ads_to_salesforce_func/shared/mapper.py
from shared.utils import make_external_id, safe_int, safe_float


def normalize_google_rows(google_raw: dict) -> list[dict]:
    """Turn the ``rows`` of a raw Google Ads payload into Salesforce field dicts.

    A payload without ``rows`` yields an empty list. Spend is derived from
    ``cost_micros`` divided by one million.
    """

    def _to_record(row: dict) -> dict:
        spend = safe_int(row.get("cost_micros"), 0) / 1_000_000.0
        campaign_name = row.get("campaign_name")
        return {
            # This field must exist in Salesforce as a unique External ID.
            "External_Id__c": make_external_id("google", row.get("date", ""), row.get("campaign_id", ""), None),

            # Adjust these keys to the field names of your Salesforce object.
            "Source__c": "Google Ads",
            "Date__c": row.get("date"),
            "Campaign_Id__c": row.get("campaign_id"),
            "Campaign_Name__c": campaign_name,

            "Impressions__c": safe_int(row.get("impressions"), 0),
            "Clicks__c": safe_int(row.get("clicks"), 0),
            "Spend__c": float(spend),
            "Conversions__c": safe_float(row.get("conversions"), 0.0),

            # Placeholder UTM values until a UTM capture strategy is chosen.
            "UTM_Source__c": "google",
            "UTM_Medium__c": "cpc",
            "UTM_Campaign__c": campaign_name,
        }

    return [_to_record(row) for row in google_raw.get("rows", [])]


def normalize_meta_rows(meta_raw: dict) -> list[dict]:
    """Turn the ``rows`` of a raw Meta Ads payload into Salesforce field dicts.

    A payload without ``rows`` yields an empty list. Conversions are the sum
    of the ``value`` of every action whose ``action_type`` is "lead" or
    "purchase"; an action whose value cannot be read as a number is skipped.
    """
    counted_action_types = ("lead", "purchase")
    records = []

    for row in meta_raw.get("rows", []):
        conversions = 0.0
        for action in row.get("actions") or []:
            if action.get("action_type") not in counted_action_types:
                continue
            try:
                conversions += float(action.get("value", 0))
            except Exception:
                # Best effort: an unreadable value does not fail the whole row.
                continue

        campaign_name = row.get("campaign_name")
        record = {
            "External_Id__c": make_external_id("meta", row.get("date_start", ""), row.get("campaign_id", ""), None),

            "Source__c": "Meta Ads",
            "Date__c": row.get("date_start"),
            "Campaign_Id__c": row.get("campaign_id"),
            "Campaign_Name__c": campaign_name,

            "Impressions__c": safe_int(row.get("impressions"), 0),
            "Clicks__c": safe_int(row.get("clicks"), 0),
            "Spend__c": safe_float(row.get("spend"), 0.0),
            "Conversions__c": float(conversions),

            "UTM_Source__c": "facebook",
            "UTM_Medium__c": "paid_social",
            "UTM_Campaign__c": campaign_name,
        }
        records.append(record)

    return records


Writing ads_to_salesforce_func/shared/mapper.py


In [19]:
import os

# Sanity check: show what the %%writefile cells left in each project folder.
for label, folder in (
    ("Root:", "ads_to_salesforce_func"),
    ("Daily:", "ads_to_salesforce_func/AdsToSalesforceDaily"),
    ("Shared:", "ads_to_salesforce_func/shared"),
):
    print(label, os.listdir(folder))


Root: ['AdsToSalesforceDaily', 'host.json', 'local.settings.json', 'requirements.txt', 'shared']
Daily: ['function.json', '__init__.py']
Shared: ['blob_archive.py', 'config.py', 'google_ads_client.py', 'keyvault_client.py', 'logging_utils.py', 'mapper.py', 'meta_ads_client.py', 'utils.py', '__init__.py']
