# Salesforce custom fields (JWT + REST)

This notebook uses the existing JWT setup to obtain an access token once and reuse it in later cells to inspect and update custom fields (starting with Account).

Assumptions:
- You're running Jupyter somewhere inside the repo; the notebook will search upwards to find `agents/python/.env.local`.
- Salesforce connection settings are in `agents/python/.env` and `agents/python/.env.local`.


In [4]:
from __future__ import annotations

import os
import time
from pathlib import Path

import httpx
import jwt
from dotenv import load_dotenv

# ---- Locate agents/python regardless of where Jupyter was started ----

def find_python_dir() -> Path:
    cwd = Path.cwd()
    for root in (cwd, *cwd.parents):
        candidate = root / "agents" / "python"
        if (candidate / ".env.local").exists() or (candidate / ".env").exists():
            return candidate
    raise RuntimeError("Could not find agents/python/.env[.local] from current working directory")


PYTHON_DIR = find_python_dir()
PROJECT_ROOT = PYTHON_DIR.parent.parent
ENV_BASE = PYTHON_DIR / ".env"
ENV_LOCAL = PYTHON_DIR / ".env.local"

# Load base first, then local overrides
if ENV_BASE.exists():
    load_dotenv(ENV_BASE, override=True)
if ENV_LOCAL.exists():
    load_dotenv(ENV_LOCAL, override=True)

required_keys = [
    "SF_CLIENT_ID",
    "SF_USERNAME",
    "SF_LOGIN_URL",
    "SF_AUDIENCE",
    "SF_JWT_KEY_PATH",
]
missing = [k for k in required_keys if not os.environ.get(k)]
if missing:
    raise RuntimeError(f"Missing required env vars: {', '.join(missing)}")

SF_CLIENT_ID = os.environ["SF_CLIENT_ID"]
SF_USERNAME = os.environ["SF_USERNAME"]
SF_LOGIN_URL = os.environ["SF_LOGIN_URL"].rstrip("/")
SF_AUDIENCE = os.environ["SF_AUDIENCE"]
SF_JWT_KEY_PATH = os.environ["SF_JWT_KEY_PATH"]


In [23]:
def build_jwt_assertion() -> str:
    """Build a JWT assertion for the configured Connected App."""
    now = int(time.time())
    audience = SF_AUDIENCE
    if audience and not audience.startswith("http"):
        audience = f"https://{audience}"
    payload = {
        "iss": SF_CLIENT_ID,
        "sub": SF_USERNAME,
        "aud": audience,
        "exp": now + 5 * 60,
    }

    key_path = Path(SF_JWT_KEY_PATH).expanduser()
    if not key_path.is_absolute():
        candidates = [
            (PROJECT_ROOT / key_path).resolve(),
            (PYTHON_DIR / key_path).resolve(),
            (PROJECT_ROOT / "config" / key_path.name).resolve(),
        ]
        for c in candidates:
            if c.exists():
                key_path = c
                break
    if not key_path.exists():
        raise FileNotFoundError(f"Private key not found at {key_path}")

    private_key = key_path.read_bytes()
    assertion = jwt.encode(payload, private_key, algorithm="RS256")
    return assertion


In [24]:
def request_access_token(assertion: str) -> tuple[str, str]:
    """Exchange JWT assertion for an access token and instance URL."""
    login_url = SF_LOGIN_URL
    if not login_url.startswith("http"):
        login_url = f"https://{login_url}"
    token_url = f"{login_url}/services/oauth2/token"
    data = {
        "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "assertion": assertion,
        "client_id": SF_CLIENT_ID,
    }
    with httpx.Client(timeout=30) as client:
        resp = client.post(token_url, data=data)
    resp.raise_for_status()
    j = resp.json()
    return j["access_token"], j["instance_url"]


# Obtain and cache the token for later cells
assertion = build_jwt_assertion()
SF_ACCESS_TOKEN, SF_INSTANCE_URL = request_access_token(assertion)

SF_INSTANCE_URL, SF_ACCESS_TOKEN[:40] + "..."


('https://force-enterprise-762-dev-ed.scratch.my.salesforce.com',
 '00DAu00000Dmn5R!AQEAQESzDu2KWuM6JCYdt2PZ...')

In [16]:
# Helper for later cells: simple REST client using the cached token

def sf_request(path: str, method: str = "GET", **kwargs):
    """Call a Salesforce REST path with the cached bearer token.

    Example path: "services/data/v65.0/sobjects/Account/describe".
    """
    if not path.startswith("http"):
        url = f"{SF_INSTANCE_URL.rstrip('/')}/{path.lstrip('/')}"
    else:
        url = path
    headers = kwargs.pop("headers", {})
    headers.setdefault("Authorization", f"Bearer {SF_ACCESS_TOKEN}")
    headers.setdefault("Accept", "application/json")

    with httpx.Client(timeout=30) as client:
        resp = client.request(method, url, headers=headers, **kwargs)
    resp.raise_for_status()
    if "application/json" in resp.headers.get("Content-Type", ""):
        return resp.json()
    return resp.text


# Quick smoke test: describe Account to confirm the token works
describe_account = sf_request("services/data/v65.0/sobjects/Account/describe")
[field["name"] for field in describe_account.get("fields", [])[:10]]


['Id',
 'IsDeleted',
 'MasterRecordId',
 'Name',
 'Type',
 'ParentId',
 'BillingStreet',
 'BillingCity',
 'BillingState',
 'BillingPostalCode']

In [21]:
import csv
import urllib.parse

# Use same API version as the rest of the project
API_VERSION = os.environ.get("SF_API_VERSION", "65.0")

def soql_query(soql: str) -> list[dict]:
    """Run a SOQL query using the cached token."""
    encoded = urllib.parse.quote(soql)
    res = sf_request(f"services/data/v{API_VERSION}/query?q={encoded}")
    records = res["records"]
    while not res.get("done"):
        res = sf_request(res["nextRecordsUrl"])
        records.extend(res["records"])
    return records

# Build a map: Website -> Account record from Salesforce
accounts_sf = soql_query(
    "SELECT Id, Name, Website FROM Account WHERE Website LIKE 'https://%.example'"
)
accounts_by_website = {r["Website"]: r for r in accounts_sf}

len(accounts_by_website), list(accounts_by_website.keys())

(8,
 ['https://www.acmehealth.example',
  'https://www.northernlightsbank.example',
  'https://www.greenfieldlogistics.example',
  'https://www.blueskyretail.example',
  'https://www.vertexmfg.example',
  'https://www.urbangridenergy.example',
  'https://www.cloudnova.example',
  'https://www.everestinsurance.example'])

In [19]:
# Query accounts again to get AccountExtId__c and create the lookup map
accounts_sf = soql_query(
    "SELECT Id, Name, Website, AccountExtId__c FROM Account"
)

# Create lookup dictionary by external ID (for faster lookups in next cell)
accounts_by_extid = {
    acc.get("AccountExtId__c"): acc 
    for acc in accounts_sf 
    if acc.get("AccountExtId__c")
}

print(f"Found {len(accounts_sf)} total accounts in Salesforce")
print(f"Found {len(accounts_by_extid)} accounts with external IDs set")
print(f"\nSample external IDs: {list(accounts_by_extid.keys())[:3]}")

Found 9 total accounts in Salesforce
Found 0 accounts with external IDs set

Sample external IDs: []


In [20]:
from pathlib import Path

# Load the CSV source of truth
csv_path = PROJECT_ROOT / "data" / "accounts.csv"

accounts_csv: list[dict] = []
with csv_path.open(newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        accounts_csv.append(row)

def to_float(v: str | None):
    v = (v or "").strip()
    return float(v) if v else None

def to_int(v: str | None):
    v = (v or "").strip()
    return int(float(v)) if v else None

success = 0
failed: list[tuple] = []

for row in accounts_csv:
    ext_id = row.get("AccountExtId__c")
    if not ext_id:
        failed.append((None, "missing AccountExtId__c in CSV"))
        continue

    sf_acc = accounts_by_extid.get(ext_id)
    if not sf_acc:
        # Fallback: try matching by Website the first time,
        # then we write AccountExtId__c so future runs can use extId only.
        website = row["Website"]
        sf_acc = next(
            (r for r in accounts_sf if r["Website"] == website),
            None,
        )
        if not sf_acc:
            failed.append((ext_id, "not found in SF by extId or website"))
            continue

    acc_id = sf_acc["Id"]

    # Build payload of custom fields from CSV, including AccountExtId__c
    payload = {
        "AccountExtId__c": ext_id,
        "HealthScore__c": to_int(row.get("HealthScore__c")),
        "ChurnRisk__c": (row.get("ChurnRisk__c") or None),
        "Customer_Since__c": (row.get("Customer_Since__c") or None),
        "Segment__c": (row.get("Segment__c") or None),
        "ARR__c": to_float(row.get("ARR__c")),
        "MRR__c": to_float(row.get("MRR__c")),
        "Support_Tier__c": (row.get("Support_Tier__c") or None),
    }

    # Drop None values so we don't overwrite existing data with null
    payload = {k: v for k, v in payload.items() if v is not None}

    try:
        sf_request(
            f"services/data/v{API_VERSION}/sobjects/Account/{acc_id}",
            method="PATCH",
            json=payload,
        )
        success += 1
        # Keep local map in sync in case you re-use it later in the notebook
        accounts_by_extid[ext_id] = {**sf_acc, "AccountExtId__c": ext_id}
    except httpx.HTTPStatusError as e:
        failed.append((ext_id, e.response.status_code, e.response.text[:200]))

success, failed[:5]

(8, [])

In [8]:
!sf data query --target-org MyFirstScratch --query "SELECT Name, ARR__c, MRR__c, HealthScore__c, ChurnRisk__c, Segment__c, Support_Tier__c,Customer_Since__c FROM Account ORDER BY Name"

Querying Data...
[G[1A[JQuerying Data... [35m⣾[39m
[G[1A[JQuerying Data... [35m⣽[39m
[G[1A[JQuerying Data... [35m⣻[39m
[G[1A[JQuerying Data... [35m⢿[39m
[G[1A[J[?25l[?25h┌──────┬────────┬────────┬─────────────┬──────────────┬────────────┬─────┬─────┐
│[1m[34m NAME [39m[22m│[1m[34m ARR__C [39m[22m│[1m[34m MRR__C [39m[22m│[1m[34m HEALTHSCORE[39m[22m │[1m[34m CHURNRISK__C [39m[22m│[1m[34m SEGMENT__C [39m[22m│[1m[34m SUP[39m[22m │[1m[34m CUS[39m[22m │
│      │        │        │[1m[34m __C         [39m[22m│              │            │[1m[34m POR[39m[22m │[1m[34m TOM[39m[22m │
│      │        │        │             │              │            │[1m[34m T_T[39m[22m │[1m[34m ER_[39m[22m │
│      │        │        │             │              │            │[1m[34m IER[39m[22m │[1m[34m SIN[39m[22m │
│      │        │        │             │              │            │[1m[34m __C [39m[22m│[1m[34m CE_[39m[22

In [9]:
!sf sobject describe --sobject Account --target-org MyFirstScratch | rg "Segment__c|Support_Tier__c" || echo "no matches"

      [94m"name"[39m: [92m"[0m[1m[31mSegment__c[0m"[39m[32m,[39m
      [94m"name"[39m: [92m"[0m[1m[31mSupport_Tier__c[0m"[39m[32m,[39m


In [13]:
import csv
from pathlib import Path
import urllib.parse

# Use same API version as the rest of the project
API_VERSION = os.environ.get("SF_API_VERSION", "65.0")

def soql_query(soql: str) -> list[dict]:
    """Run a SOQL query using the cached token."""
    encoded = urllib.parse.quote(soql)
    res = sf_request(f"services/data/v{API_VERSION}/query?q={encoded}")
    records = res["records"]
    while not res.get("done"):
        res = sf_request(res["nextRecordsUrl"])
        records.extend(res["records"])
    return records

def to_float(v: str | None):
    v = (v or "").strip()
    return float(v) if v else None

def to_int(v: str | None):
    v = (v or "").strip()
    return int(float(v)) if v else None

def to_bool(v: str | None):
    v = (v or "").strip().lower()
    if not v:
        return None
    return v in ("true", "1", "yes", "y")

# 1) Build a map: (AccountExtId__c, Name) -> Opportunity from SF
opps_sf = soql_query(
    "SELECT Id, Name, Account.AccountExtId__c, OpportunityExtId__c "
    "FROM Opportunity WHERE Account.AccountExtId__c != null"
)

opps_by_key = {
    (r["Account"]["AccountExtId__c"], r["Name"]): r
    for r in opps_sf
}

len(opps_by_key)

# 2) Load CSV source of truth
opp_csv_path = PROJECT_ROOT / "data" / "opportunities.csv"
opps_csv: list[dict] = []
with opp_csv_path.open(newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        opps_csv.append(row)

# 3) Patch existing Opportunities with custom fields
success = 0
failed: list[tuple] = []

for row in opps_csv:
    acc_ext = row.get("AccountExtId__c")
    name = row.get("Name")
    key = (acc_ext, name)

    sf_opp = opps_by_key.get(key)
    if not sf_opp:
        failed.append((key, "not found in SF"))
        continue

    opp_id = sf_opp["Id"]

    payload = {
        # Backfill external ID
        "OpportunityExtId__c": (row.get("OpportunityExtId__c") or None),
        # Custom numeric fields
        "ARR__c": to_float(row.get("ARR__c")),
        "Term_Months__c": to_int(row.get("Term_Months__c")),
        # Boolean custom field
        "Renewal__c": to_bool(row.get("Renewal__c")),
        # Link back to original opp for renewals/expansions
        "Original_Opp_ExtId__c": (row.get("Original_Opp_ExtId__c") or None),
    }

    # Drop None so we don't overwrite existing values with null
    payload = {k: v for k, v in payload.items() if v is not None}

    try:
        sf_request(
            f"services/data/v{API_VERSION}/sobjects/Opportunity/{opp_id}",
            method="PATCH",
            json=payload,
        )
        success += 1
    except httpx.HTTPStatusError as e:
        failed.append((key, e.response.status_code, e.response.text[:200]))

success, failed[:5]

(16, [])

In [14]:
!sf data query --target-org MyFirstScratch --query "SELECT Name, OpportunityExtId__c, ARR__c, Renewal__c, Original_Opp_ExtId__c, Term_Months__c FROM Opportunity ORDER BY Name"

Querying Data...
[G[1A[JQuerying Data... [35m⣾[39m
[G[1A[JQuerying Data... [35m⣽[39m
[G[1A[JQuerying Data... [35m⣻[39m
[G[1A[JQuerying Data... [35m⢿[39m
[G[1A[J[?25l[?25h┌──────┬─────────────────────┬────────┬────────────┬──────────┬────────────────┐
│[1m[34m NAME [39m[22m│[1m[34m OPPORTUNITYEXTID__C [39m[22m│[1m[34m ARR__C [39m[22m│[1m[34m RENEWAL__C [39m[22m│[1m[34m ORIGINAL[39m[22m │[1m[34m TERM_MONTHS__C [39m[22m│
│      │                     │        │            │[1m[34m _OPP_EXT[39m[22m │                │
│      │                     │        │            │[1m[34m ID__C    [39m[22m│                │
├──────┼─────────────────────┼────────┼────────────┼──────────┼────────────────┤
│ Acme │ RC-OPP-0002         │ 330000 │ true       │ RC-OPP-0 │ 12             │
│ Heal │                     │        │            │ 001      │                │
│ th   │                     │        │            │          │                │
│ Sec

In [16]:
import csv
from pathlib import Path
import urllib.parse
from collections import defaultdict

# Ensure helpers exist
API_VERSION = os.environ.get("SF_API_VERSION", "65.0")

def soql_query(soql: str) -> list[dict]:
    encoded = urllib.parse.quote(soql)
    res = sf_request(f"services/data/v{API_VERSION}/query?q={encoded}")
    records = res["records"]
    while not res.get("done"):
        res = sf_request(res["nextRecordsUrl"])
        records.extend(res["records"])
    return records

def to_float(v: str | None):
    v = (v or "").strip()
    return float(v) if v else None

def to_int(v: str | None):
    v = (v or "").strip()
    return int(float(v)) if v else None

def to_bool(v: str | None):
    v = (v or "").strip().lower()
    if not v:
        return None
    return v in ("true", "1", "yes", "y")

results = {}

# ----------------------------------------
# 1) Contacts: ContactExtId__c, Role__c, Decision_Role__c
# ----------------------------------------

contacts_sf = soql_query(
    "SELECT Id, Email, FirstName, LastName, ContactExtId__c, "
    "Account.AccountExtId__c "
    "FROM Contact "
    "WHERE Account.AccountExtId__c != null"
)

contacts_by_key = {
    (r["Account"]["AccountExtId__c"], r["Email"]): r
    for r in contacts_sf
}

contacts_csv_path = PROJECT_ROOT / "data" / "contacts.csv"
contacts_csv: list[dict] = []
with contacts_csv_path.open(newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        contacts_csv.append(row)

c_success = 0
c_failed: list[tuple] = []

for row in contacts_csv:
    acc_ext = row.get("AccountExtId__c")
    email = row.get("Email")
    key = (acc_ext, email)

    sf_con = contacts_by_key.get(key)
    if not sf_con:
        c_failed.append((key, "not found in SF"))
        continue

    con_id = sf_con["Id"]
    payload = {
        "ContactExtId__c": (row.get("ContactExtId__c") or None),
        "Role__c": (row.get("Role__c") or None),
        "Decision_Role__c": (row.get("Decision_Role__c") or None),
    }
    payload = {k: v for k, v in payload.items() if v is not None}

    try:
        sf_request(
            f"services/data/v{API_VERSION}/sobjects/Contact/{con_id}",
            method="PATCH",
            json=payload,
        )
        c_success += 1
    except httpx.HTTPStatusError as e:
        c_failed.append((key, e.response.status_code, e.response.text[:200]))

results["contacts"] = (c_success, c_failed[:5])

# ----------------------------------------
# 2) Cases: CaseExtId__c, SLA_Due__c, First_Response_Time_Min__c, Resolve_Time_Min__c
# ----------------------------------------

cases_sf = soql_query(
    "SELECT Id, Subject, Status, Priority, Origin, CaseExtId__c, "
    "Account.AccountExtId__c "
    "FROM Case "
    "WHERE Account.AccountExtId__c != null"
)

cases_by_key: dict[tuple, list[dict]] = defaultdict(list)
for r in cases_sf:
    key = (r["Account"]["AccountExtId__c"], r["Subject"])
    cases_by_key[key].append(r)

cases_csv_path = PROJECT_ROOT / "data" / "cases.csv"
cases_csv: list[dict] = []
with cases_csv_path.open(newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        cases_csv.append(row)

cs_success = 0
cs_failed: list[tuple] = []

for row in cases_csv:
    acc_ext = row.get("AccountExtId__c")
    subject = row.get("Subject")
    key = (acc_ext, subject)

    sf_cases = cases_by_key.get(key)
    if not sf_cases:
        cs_failed.append((key, "no matching Case records in SF"))
        continue

    for sf_case in sf_cases:
        case_id = sf_case["Id"]
        payload = {
            "CaseExtId__c": (row.get("CaseExtId__c") or None),
            "SLA_Due__c": (row.get("SLA_Due__c") or None),
            "First_Response_Time_Min__c": to_int(row.get("First_Response_Time_Min__c")),
            "Resolve_Time_Min__c": to_int(row.get("Resolve_Time_Min__c")),
        }
        payload = {k: v for k, v in payload.items() if v is not None}

        try:
            sf_request(
                f"services/data/v{API_VERSION}/sobjects/Case/{case_id}",
                method="PATCH",
                json=payload,
            )
            cs_success += 1
        except httpx.HTTPStatusError as e:
            cs_failed.append((key, e.response.status_code, e.response.text[:200]))

results["cases"] = (cs_success, cs_failed[:5])

# ----------------------------------------
# 3) Product2 / Pricebook2 / PricebookEntry ext IDs
# ----------------------------------------

# Product2: ProductExtId__c
prod_sf = soql_query("SELECT Id, Name, ProductExtId__c FROM Product2")
prod_by_name = {r["Name"]: r for r in prod_sf}

products_csv_path = PROJECT_ROOT / "data" / "products.csv"
products_csv: list[dict] = []
with products_csv_path.open(newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        products_csv.append(row)

p_success = 0
p_failed: list[tuple] = []

for row in products_csv:
    name = row.get("Name")
    sf_prod = prod_by_name.get(name)
    if not sf_prod:
        p_failed.append((name, "not found in SF"))
        continue

    prod_id = sf_prod["Id"]
    payload = {"ProductExtId__c": (row.get("ProductExtId__c") or None)}
    payload = {k: v for k, v in payload.items() if v is not None}

    try:
        sf_request(
            f"services/data/v{API_VERSION}/sobjects/Product2/{prod_id}",
            method="PATCH",
            json=payload,
        )
        p_success += 1
    except httpx.HTTPStatusError as e:
        p_failed.append((name, e.response.status_code, e.response.text[:200]))

results["products"] = (p_success, p_failed[:5])

# Pricebook2: Pricebook2ExtId__c (non-standard pricebooks)
pb_sf = soql_query(
    "SELECT Id, Name, IsStandard, Pricebook2ExtId__c "
    "FROM Pricebook2 WHERE IsStandard = false"
)
pb_by_name = {r["Name"]: r for r in pb_sf}

pricebooks_csv_path = PROJECT_ROOT / "data" / "pricebooks.csv"
pricebooks_csv: list[dict] = []
with pricebooks_csv_path.open(newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        pricebooks_csv.append(row)

pb_success = 0
pb_failed: list[tuple] = []

for row in pricebooks_csv:
    name = row.get("Name")
    sf_pb = pb_by_name.get(name)
    if not sf_pb:
        pb_failed.append((name, "not found in SF"))
        continue

    pb_id = sf_pb["Id"]
    payload = {"Pricebook2ExtId__c": (row.get("Pricebook2ExtId__c") or None)}
    payload = {k: v for k, v in payload.items() if v is not None}

    try:
        sf_request(
            f"services/data/v{API_VERSION}/sobjects/Pricebook2/{pb_id}",
            method="PATCH",
            json=payload,
        )
        pb_success += 1
    except httpx.HTTPStatusError as e:
        pb_failed.append((name, e.response.status_code, e.response.text[:200]))

results["pricebooks"] = (pb_success, pb_failed[:5])

# PricebookEntry: PricebookEntryExtId__c
pbe_sf = soql_query(
    "SELECT Id, UnitPrice, PricebookEntryExtId__c, "
    "Product2.ProductExtId__c, Pricebook2.Pricebook2ExtId__c "
    "FROM PricebookEntry "
    "WHERE Pricebook2.IsStandard = false"
)

pbe_by_key = {}
for r in pbe_sf:
    prod = r.get("Product2") or {}
    pb = r.get("Pricebook2") or {}
    prod_ext = prod.get("ProductExtId__c")
    pb_ext = pb.get("Pricebook2ExtId__c")
    if not prod_ext or not pb_ext:
        # Skip entries that don't have both ext IDs yet
        continue
    key = (prod_ext, pb_ext, str(r["UnitPrice"]))
    pbe_by_key[key] = r

pbes_csv_path = PROJECT_ROOT / "data" / "pricebook_entries.csv"
pbes_csv: list[dict] = []
with pbes_csv_path.open(newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        pbes_csv.append(row)

pbe_success = 0
pbe_failed: list[tuple] = []

for row in pbes_csv:
    key = (
        row.get("ProductExtId__c"),
        row.get("Pricebook2ExtId__c"),
        row.get("UnitPrice"),
    )
    sf_pbe = pbe_by_key.get(key)
    if not sf_pbe:
        pbe_failed.append((key, "not found in SF"))
        continue

    pbe_id = sf_pbe["Id"]
    payload = {"PricebookEntryExtId__c": (row.get("PricebookEntryExtId__c") or None)}
    payload = {k: v for k, v in payload.items() if v is not None}

    try:
        sf_request(
            f"services/data/v{API_VERSION}/sobjects/PricebookEntry/{pbe_id}",
            method="PATCH",
            json=payload,
        )
        pbe_success += 1
    except httpx.HTTPStatusError as e:
        pbe_failed.append((key, e.response.status_code, e.response.text[:200]))


results["pricebook_entries"] = (pbe_success, pbe_failed[:5])
results

{'contacts': (24, []),
 'cases': (15, []),
 'products': (8, []),
 'pricebooks': (1, []),
 'pricebook_entries': (0,
  [(('RC-PROD-0001', 'RC-PB-0001', '300000'), 'not found in SF'),
   (('RC-PROD-0002', 'RC-PB-0001', '60000'), 'not found in SF'),
   (('RC-PROD-0003', 'RC-PB-0001', '240000'), 'not found in SF'),
   (('RC-PROD-0004', 'RC-PB-0001', '180000'), 'not found in SF'),
   (('RC-PROD-0005', 'RC-PB-0001', '220000'), 'not found in SF')])}