In [None]:
import os
import time
import json
import gzip
import asyncio
from io import BytesIO
from datetime import datetime, timedelta, timezone

import requests
import dotenv
import jwt
from jwt import ExpiredSignatureError, InvalidTokenError
from aiohttp import web
from sqlalchemy import create_engine, text
from sendgrid import SendGridAPIClient

import bspump
from bspump.http.web.server import WebRouteSource, JSONWebSink
from bspump.jupyter import *
from bspump.trigger import CronTrigger
from bspump.abc.source import TriggerSource
from bspump.common.print import PPrintSink

In [None]:
class SendGridSyncSource(TriggerSource):
    """Trigger-driven source that emits a timestamp event each time it cycles.

    Each call to ``cycle`` (driven by the trigger attached with ``.on(...)``)
    waits for the pipeline to be ready and then emits a single event of the
    form ``{"time_triggered": "<ISO-8601 UTC timestamp>"}``.
    """

    async def cycle(self, *args, **kwargs):
        await self.Pipeline.ready()
        # Capture the timestamp once so the emitted value is exactly this moment.
        now = datetime.now(timezone.utc)
        # NOTE(review): bspump sources commonly emit via `await self.process(event)`;
        # confirm that awaiting `Pipeline.process` directly is intended here.
        await self.Pipeline.process({
            "time_triggered": now.isoformat()
        })


In [None]:
# Cron schedule for the sync trigger: "*/10 * * * *" fires every 10 minutes
# (an hourly schedule would be "0 * * * *").
SYNC_CRON_SCHEDULE = "*/10 * * * *"

# Wire the trigger source to a pretty-print sink.
# NOTE(review): this pipeline only prints the trigger events; nothing here calls
# run_contact_sync_pipeline(), so the actual sync does not run on this schedule.
auto_pipeline(
    source=lambda app, pipeline: SendGridSyncSource(app, pipeline).on(
        CronTrigger(app, SYNC_CRON_SCHEDULE, init_time=datetime.now())
    ),
    sink=lambda app, pipeline: PPrintSink(app, pipeline),
)

In [4]:




# Load SendGrid / database credentials from a dotenv file. The file location can
# be overridden with the SENDGRID_ENV_FILE environment variable instead of
# editing this cell.
SECRETS_FILE = os.getenv("SENDGRID_ENV_FILE", "/home/coder/workspace/secrets/sendgrid")
dotenv.load_dotenv(SECRETS_FILE)

SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY")
DATABASE_URL = os.getenv("DATABASE_URL")

# Fail fast with an actionable message instead of an obscure error from the
# SendGrid client or create_engine() several cells later.
missing_settings = [
    name
    for name, value in (("SENDGRID_API_KEY", SENDGRID_API_KEY), ("DATABASE_URL", DATABASE_URL))
    if not value
]
if missing_settings:
    raise RuntimeError(
        f"Missing required setting(s): {', '.join(missing_settings)} "
        f"(checked the environment and {SECRETS_FILE})"
    )

sg = SendGridAPIClient(SENDGRID_API_KEY)



In [None]:
# Contacts sent per PUT /v3/marketing/contacts request in run_contact_sync_pipeline.
BATCH_SIZE = 1_000
# Emails per Search API query when backfilling SendGrid contact IDs.
SEARCH_CHUNK = 50


def find_unsynced_contacts(engine):
    """Return contacts that have no SendGrid ID yet, shaped for the upsert API.

    Rows without an email are skipped. The upsert payload built here carries
    ``email`` as its only identifier, and the later ID lookup calls ``.lower()``
    on every email, which would crash on a NULL value.

    Args:
        engine: SQLAlchemy engine for the database holding the ``contacts`` table.

    Returns:
        list[dict]: one ``{"email", "first_name", "last_name"}`` dict per row.
    """
    query = text("""
        SELECT email, first_name, last_name
        FROM contacts
        WHERE sendgrid_id IS NULL
          AND email IS NOT NULL
          AND email <> ''
    """)
    with engine.connect() as conn:
        rows = conn.execute(query).fetchall()

    return [
        {"email": row.email, "first_name": row.first_name, "last_name": row.last_name}
        for row in rows
    ]


def upsert_contacts_batch(contacts: list, client=None) -> dict:
    """Submit one batch of contacts to SendGrid's contacts upsert endpoint.

    Args:
        contacts: contact dicts (e.g. ``email``, ``first_name``, ``last_name``).
        client: optional SendGridAPIClient; defaults to the notebook-level ``sg``.
            Injectable so the function can be exercised without network access.

    Returns:
        dict with keys:
          - ``ok``: True for a 2xx status.
          - ``status``: the HTTP status code.
          - ``job_id``: the ``job_id`` (or ``id``) field from the response, else None.
          - ``body``: the parsed JSON object, or ``{}`` if the body is empty, not
            valid UTF-8/JSON, or not a JSON object.
    """
    client = sg if client is None else client
    resp = client.client.marketing.contacts.put(request_body={"contacts": contacts})

    data = {}
    raw = resp.body
    if raw:
        try:
            if isinstance(raw, (bytes, bytearray)):
                raw = raw.decode()
            parsed = json.loads(raw) if isinstance(raw, str) else raw
        except ValueError:  # covers UnicodeDecodeError and json.JSONDecodeError
            parsed = {}
        # A JSON array/scalar has no .get(); treat it like an unparseable body.
        data = parsed if isinstance(parsed, dict) else {}

    return {
        "ok": 200 <= resp.status_code < 300,
        "status": resp.status_code,
        "job_id": data.get("job_id") or data.get("id"),
        "body": data,
    }


def poll_for_job_completion(job_id, client=None, poll_interval=10, timeout=3600):
    """Block until a SendGrid contacts import job reaches a terminal state.

    Args:
        job_id: import job id returned by the contacts upsert endpoint.
        client: optional SendGridAPIClient; defaults to the notebook-level ``sg``.
        poll_interval: seconds to sleep between status checks.
        timeout: maximum seconds to keep polling; ``None`` waits indefinitely.

    Returns:
        dict: the parsed status body once its ``status`` is ``"completed"``.

    Raises:
        Exception: the status endpoint returns non-200, or the job ends as
            ``"failed"`` or ``"errored"``.
        TimeoutError: the job has not finished after ``timeout`` seconds.
    """
    client = sg if client is None else client
    endpoint = client.client.marketing.contacts.imports._(job_id)
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        resp = endpoint.get()
        if resp.status_code != 200:
            raise Exception(f"Error checking job {job_id}: {resp.body}")

        raw = resp.body
        body = json.loads(raw.decode() if isinstance(raw, (bytes, bytearray)) else raw)
        status = body.get("status")
        print(f"⏳ Polling import job {job_id}... status: {status}")

        if status == "completed":
            return body
        # "errored" is also a terminal status; checking only "failed" would poll
        # an errored job forever.
        if status in ("failed", "errored"):
            raise Exception(f"SendGrid import job {job_id} {status}: {body}")

        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"SendGrid import job {job_id} still '{status}' after {timeout}s"
            )

        time.sleep(poll_interval)


def fetch_sendgrid_ids_for_emails(emails: list[str], client=None) -> dict:
    """Look up SendGrid contact IDs for the given emails via the contacts Search API.

    Emails are lower-cased and de-duplicated, and ``None``/empty entries are ignored.
    An email containing a single quote is skipped with a warning. The query wraps
    each address in single quotes, so such an address would break the query for
    the whole chunk.

    Args:
        emails: addresses to look up.
        client: optional SendGridAPIClient; defaults to the notebook-level ``sg``.

    Returns:
        dict mapping lower-cased email to SendGrid contact id, for each returned
        result that has both fields. Returns ``{}`` without calling the API when
        there is nothing to look up.

    Raises:
        Exception: the Search API responds with a non-200 status.
    """
    client = sg if client is None else client

    # Lower-case and de-duplicate while preserving order.
    normalized = list(dict.fromkeys(e.lower() for e in emails if e))

    lookup = [e for e in normalized if "'" not in e]
    skipped = len(normalized) - len(lookup)
    if skipped:
        print(f"⚠️ Skipping {skipped} email(s) containing a single quote; "
              f"they cannot be embedded safely in the SGQL query.")

    # `email IN ()` is not a valid query, so skip the request entirely.
    if not lookup:
        return {}

    email_list = ",".join(f"'{e}'" for e in lookup)
    query = f"email IN ({email_list})"

    resp = client.client.marketing.contacts.search.post(request_body={"query": query})
    if resp.status_code != 200:
        raise Exception(f"Search API failed: {resp.status_code} {resp.body}")

    raw = resp.body
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode("utf-8")
    data = json.loads(raw) if isinstance(raw, str) else raw

    mapping = {}
    for entry in data.get("result", []):
        email = entry.get("email")
        sgid = entry.get("id")
        if email and sgid:
            mapping[email.lower()] = sgid
    return mapping


def backfill_sendgrid_ids(engine, emails: list[str]):
    """Store SendGrid contact IDs on rows whose ``sendgrid_id`` is still NULL.

    Emails are resolved SEARCH_CHUNK at a time via fetch_sendgrid_ids_for_emails.
    Each chunk's UPDATEs run inside their own ``engine.begin()`` transaction, and
    each UPDATE also stamps ``last_synced_at``. Matching is case-insensitive on
    ``email``. Prints the total number of rows updated.
    """
    update_sql = text("""
                    UPDATE contacts
                    SET sendgrid_id = :sgid,
                        last_synced_at = now()
                    WHERE lower(email) = :email
                      AND sendgrid_id IS NULL
                """)

    total = 0
    for start in range(0, len(emails), SEARCH_CHUNK):
        found = fetch_sendgrid_ids_for_emails(emails[start:start + SEARCH_CHUNK])
        with engine.begin() as conn:
            total += sum(
                conn.execute(update_sql, {"sgid": sgid, "email": email}).rowcount
                for email, sgid in found.items()
            )
    print(f"✅ Backfilled {total} sendgrid_id values.")


def run_contact_sync_pipeline(engine):
    """Upsert every unsynced contact into SendGrid, then backfill their SendGrid IDs.

    Steps:
      1. Read contacts whose ``sendgrid_id`` is NULL (returns early if none).
      2. Upsert them in batches of BATCH_SIZE, collecting any returned job ids.
      3. Wait for each import job to complete.
      4. Resolve the emails to SendGrid IDs via the Search API and store them.

    Raises:
        Exception: a batch upsert reports a non-2xx status. Errors from the
            polling and search helpers also propagate.
    """
    pending = find_unsynced_contacts(engine)
    if not pending:
        print("✅ All contacts already synced.")
        return

    print(f"Found {len(pending)} unsynced contacts. Starting upserts...")

    submitted_jobs = []
    for batch_no, start in enumerate(range(0, len(pending), BATCH_SIZE), start=1):
        batch = pending[start:start + BATCH_SIZE]
        print(f"➡ Upserting batch {batch_no} with {len(batch)} contacts...")
        outcome = upsert_contacts_batch(batch)
        print("SendGrid response:", outcome)

        if not outcome["ok"]:
            raise Exception(f"❌ Upsert failed: {outcome}")
        if outcome["job_id"]:
            submitted_jobs.append(outcome["job_id"])

    print("All upsert jobs submitted. Waiting for completion...")
    for job_id in submitted_jobs:
        poll_for_job_completion(job_id)
    print("✅ All upsert jobs completed.")

    print("\n🔄 Fetching SendGrid IDs via Search API...")
    backfill_sendgrid_ids(engine, [contact["email"] for contact in pending])

    print("🏁 Pipeline finished successfully.")


# Run one contact sync now. Failures are reported with a full traceback rather
# than propagated, so later cells can still run. The engine's connection pool is
# released afterwards either way; the engine remains usable and creates a fresh
# pool on next use.
import traceback

engine = None
try:
    engine = create_engine(DATABASE_URL, echo=False)
    run_contact_sync_pipeline(engine)
except Exception as e:
    print("Error during contact sync pipeline:", e)
    traceback.print_exc()
finally:
    if engine is not None:
        engine.dispose()


Found 1 unsynced contacts. Starting upserts...
➡ Upserting batch 1 with 1 contacts...
SendGrid response: {'ok': True, 'status': 202, 'job_id': '46c095e6-8b97-4efd-af47-20952ea77260', 'body': {'job_id': '46c095e6-8b97-4efd-af47-20952ea77260'}}
All upsert jobs submitted. Waiting for completion...
⏳ Polling import job 46c095e6-8b97-4efd-af47-20952ea77260... status: pending
⏳ Polling import job 46c095e6-8b97-4efd-af47-20952ea77260... status: pending
⏳ Polling import job 46c095e6-8b97-4efd-af47-20952ea77260... status: pending
⏳ Polling import job 46c095e6-8b97-4efd-af47-20952ea77260... status: pending
⏳ Polling import job 46c095e6-8b97-4efd-af47-20952ea77260... status: pending
⏳ Polling import job 46c095e6-8b97-4efd-af47-20952ea77260... status: completed
✅ All upsert jobs completed.

🔄 Fetching SendGrid IDs via Search API...
✅ Backfilled 1 sendgrid_id values.
🏁 Pipeline finished successfully.
