In [9]:
!pip -q install -U fastapi pydantic email-validator uvicorn
import sys, json, re, uuid, csv, io, asyncio, hmac, hashlib, random
from typing import List, Dict, Tuple, Optional, Any, Callable, Set
from collections import deque, OrderedDict, defaultdict
from datetime import datetime, timedelta
from io import StringIO
from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel, EmailStr, constr



In [2]:
import random, csv, json, os
from datetime import datetime, timedelta

random.seed(7)

def rand_phone():
    s = "".join(random.choice("0123456789") for _ in range(10))
    return f"({s[0:3]}) {s[3:6]}-{s[6:10]}"

def rand_email(name, i):
    doms = ["alpha.com","beta.io","gamma.org"]
    d = random.choice(doms)
    if i % 5 == 0:
        return f"{name}@{d}".upper()
    return f"{name}@{d}"

def iso(dt):
    return dt.strftime("%Y-%m-%dT%H:%M:%S")

now = datetime(2025,8,10,10,0,0)

contacts = []
for i in range(1,31):
    name = f"user{i}"
    email = rand_email(name, i)
    phone = rand_phone()
    if i % 7 == 0:
        phone = phone if i % 14 == 0 else contacts[-1]["phone"]
    updated = now - timedelta(hours=random.randint(0,72))
    contacts.append({"id":str(i),"name":name,"email":email,"phone":phone,"updated_at":iso(updated)})

leads = []
owners = ["u1","u2","u3"]
for i,c in enumerate(contacts[:20], start=1):
    created = now - timedelta(days=random.randint(0,14), hours=random.randint(0,10))
    converted = (i % 3 == 0)
    leads.append({
        "id":f"L{i}",
        "email":c["email"].lower(),
        "owner_id":random.choice(owners),
        "created_at":iso(created),
        "converted_at":iso(created+timedelta(days=3)) if converted else None
    })

opportunities = []
for i,l in enumerate(leads, start=1):
    if i % 2 == 0:
        stage = "Closed Won" if i % 4 == 0 else "Open"
        opportunities.append({
            "id":f"O{i}",
            "account_id":str((i%10)+1),
            "amount":1000+100*i,
            "stage":stage,
            "owner_id":l["owner_id"],
            "lead_id":l["id"],
            "created_at":l["created_at"],
            "updated_at":l["created_at"]
        })

tasks = []
for i in range(1,16):
    due = now + timedelta(minutes=random.choice([5,10,20,40,60]))
    status = "open" if i % 3 != 0 else "done"
    tasks.append({"id":str(i),"owner_id":random.choice(owners),"contact_id":str(random.randint(1,30)),"due_at":iso(due),"status":status})

logins = []
for u in range(1,6):
    first = datetime(2025,1,1) + timedelta(days=random.randint(0,20))
    for m in range(0,6):
        if random.random() < 0.7:
            ts = first + timedelta(days=30*m+random.randint(0,5))
            logins.append({"user_id":f"U{u}","ts":iso(ts)})

db = {"contacts":contacts,"leads":leads,"opportunities":opportunities,"tasks":tasks,"logins":logins}

with open("/content/crm_data.json","w") as f:
    json.dump(db,f)
with open("/content/contacts.csv","w",newline="") as f:
    w = csv.DictWriter(f, fieldnames=["id","name","email","phone","updated_at"])
    w.writeheader()
    for r in contacts:
        w.writerow(r)

print("data_ready", len(contacts), len(leads), len(opportunities), len(tasks), len(logins))


data_ready 30 20 10 15 21


In [5]:
import re, csv, asyncio, hashlib, hmac, uuid, json
from typing import List, Dict, Tuple, Optional, Any, Callable, Set
from collections import deque, OrderedDict, defaultdict
from datetime import datetime, timedelta
from io import StringIO
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr, constr

def normalize_email(s: Optional[str]) -> Optional[str]:
    if not s: return None
    s = s.strip().lower()
    return s or None

def normalize_phone(s: Optional[str]) -> Optional[str]:
    if not s: return None
    digits = re.sub(r"\D","",s)
    if digits.startswith("1") and len(digits) == 11:
        digits = digits[1:]
    return digits or None

def dedupe_contacts(records: List[Dict]) -> Tuple[List[Dict], Dict[str, str]]:
    seen = {}
    survivors = {}
    dup_map = {}
    for r in records:
        rid = str(r.get("id"))
        e = normalize_email(r.get("email"))
        p = normalize_phone(r.get("phone"))
        k = ("e", e) if e else ("p", p) if p else ("id", rid)
        if k in seen:
            sid = seen[k]
            sr = survivors[sid]
            sr["email"] = sr.get("email") or r.get("email")
            sr["phone"] = sr.get("phone") or r.get("phone")
            dup_map[rid] = sid
        else:
            seen[k] = rid
            survivors[rid] = dict(r)
    return list(survivors.values()), dup_map

def import_contacts_csv(csv_text: str, store: Dict[str, Dict[str, Any]]) -> Tuple[int,int]:
    added = 0
    updated = 0
    f = StringIO(csv_text)
    for row in csv.DictReader(f):
        email = row.get("email","").strip().lower()
        if not email:
            continue
        if email in store:
            before = dict(store[email])
            store[email].update(row)
            if store[email] != before:
                updated += 1
        else:
            store[email] = dict(row)
            added += 1
    return added, updated

class RateLimiter:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.qs: Dict[str, deque] = {}
    def allow(self, user_id: str, now: float = None) -> bool:
        if now is None:
            now = datetime.now().timestamp()
        q = self.qs.setdefault(user_id, deque())
        cutoff = now - self.window
        while q and q[0] <= cutoff:
            q.popleft()
        if len(q) < self.limit:
            q.append(now)
            return True
        return False

class IdempotencyStore:
    def __init__(self):
        self.store: Dict[str, Tuple[Any, bool]] = {}
    def call(self, key: str, fn: Callable[[], Any]) -> Any:
        if key in self.store:
            v, ok = self.store[key]
            if ok:
                return v
        try:
            v = fn()
            self.store[key] = (v, True)
            return v
        except Exception as e:
            self.store[key] = (e, False)
            raise

def parse_ts(s: str) -> float:
    return datetime.fromisoformat(s).timestamp()

def page_items(items: List[Dict], size: int, after: Optional[Tuple[float,str]]):
    arr = sorted(items, key=lambda r: (parse_ts(r["updated_at"]), r["id"]), reverse=True)
    start = 0
    if after is not None:
        for i, r in enumerate(arr):
            if (parse_ts(r["updated_at"]), r["id"]) == after:
                start = i + 1
                break
    slice_ = arr[start:start+size]
    next_after = None
    if len(slice_) == size:
        last = slice_[-1]
        next_after = (parse_ts(last["updated_at"]), last["id"])
    return slice_, next_after

def due_tasks(tasks: List[Dict], now: datetime, within_minutes: int) -> List[Dict]:
    end = now + timedelta(minutes=within_minutes)
    out = []
    for t in tasks:
        ts = datetime.fromisoformat(t["due_at"])
        if now <= ts <= end and t.get("status","open") == "open":
            out.append(t)
    return sorted(out, key=lambda x: (x["due_at"], x["id"]))

def verify_signature(secret: str, payload: bytes, signature: str) -> bool:
    mac = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(mac, signature.lower())

def weekly_conversion(leads: List[Dict]) -> List[Dict]:
    def wk(dt: datetime):
        y, w, _ = dt.isocalendar()
        return f"{y}-W{w:02d}"
    total = defaultdict(int)
    converted = defaultdict(int)
    for l in leads:
        d = datetime.fromisoformat(l["created_at"])
        key = wk(d)
        total[key] += 1
        if l.get("converted_at"):
            converted[key] += 1
    out = []
    for k in sorted(total.keys()):
        t = total[k]
        c = converted[k]
        rate = c / t if t else 0.0
        out.append({"week": k, "total": t, "converted": c, "rate": rate})
    return out

class LRU:
    def __init__(self, capacity: int):
        self.cap = capacity
        self.od = OrderedDict()
    def get(self, k: str):
        if k not in self.od:
            return None
        v = self.od.pop(k)
        self.od[k] = v
        return v
    def put(self, k: str, v: Any):
        if k in self.od:
            self.od.pop(k)
        elif len(self.od) >= self.cap:
            self.od.popitem(last=False)
        self.od[k] = v

def merge_records(a: Dict[str,Any], b: Dict[str,Any], prefer: str) -> Tuple[Dict[str,Any], Dict[str,Tuple[Any,Any]]]:
    out = dict(a if prefer=="a" else b)
    other = b if prefer=="a" else a
    out2 = dict(out)
    out2.update({k:v for k,v in other.items() if k not in out or out[k] in (None,"")})
    diff = {}
    keys = set(a.keys()) | set(b.keys())
    for k in keys:
        va = a.get(k)
        vb = b.get(k)
        if va != vb:
            diff[k] = (va, vb)
    return out2, diff

Role = str
Action = str
PERMS: Dict[Role, Set[Action]] = {
    "admin": {"read:any","write:any"},
    "manager": {"read:any","write:team"},
    "rep": {"read:own","write:own"},
}

def can(action: Action, role: Role, owner_id: str, user_id: str, team_ok: bool=False) -> bool:
    p = PERMS.get(role, set())
    if "read:any" in p and action.startswith("read"): return True
    if "write:any" in p and action.startswith("write"): return True
    if "write:team" in p and action.startswith("write") and team_ok: return True
    if "read:any" in p and action.startswith("search"): return True
    if "read:own" in p and owner_id == user_id and action.startswith(("read","search")): return True
    if "write:own" in p and owner_id == user_id and action.startswith("write"): return True
    return False

def leads_per_owner_month(leads: List[Dict]) -> List[Dict]:
    agg = defaultdict(int)
    for r in leads:
        dt = datetime.fromisoformat(r["created_at"])
        k = (r.get("owner_id",""), f"{dt.year}-{dt.month:02d}")
        agg[k] += 1
    out = [{"owner_id":k[0],"month":k[1],"total":v} for k,v in agg.items()]
    out.sort(key=lambda x: (x["month"], x["owner_id"]))
    return out

class Mailer:
    def __init__(self):
        self.q: asyncio.Queue = asyncio.Queue()
    async def enqueue(self, msg: Dict[str,Any]) -> None:
        await self.q.put(msg)
    async def run(self, send_fn: Callable[[Dict[str,Any]],Any]) -> None:
        while True:
            msg = await self.q.get()
            try:
                send_fn(msg)
            finally:
                self.q.task_done()

def export_opportunities_csv(opps: List[Dict], since_iso: str) -> str:
    headers = ["id","account_id","amount","stage","owner_id","created_at","updated_at"]
    f = StringIO()
    w = csv.DictWriter(f, fieldnames=headers)
    w.writeheader()
    for o in opps:
        if o["updated_at"] >= since_iso:
            w.writerow({k:o.get(k,"") for k in headers})
    return f.getvalue()

def import_contacts_csv_text(csv_text: str, store: Dict[str, Dict[str, Any]]) -> Dict[str,int]:
    added = updated = 0
    f = StringIO(csv_text)
    for row in csv.DictReader(f):
        email = row.get("email","").strip().lower()
        if not email:
            continue
        if email in store:
            before = dict(store[email])
            store[email].update(row)
            if store[email] != before:
                updated += 1
        else:
            store[email] = dict(row)
            added += 1
    return {"added":added,"updated":updated}

def apply_update_with_audit(entity: Dict[str,Any], changes: Dict[str,Any], user_id: str, audit_log: List[Dict[str,Any]]) -> Dict[str,Any]:
    before = {k: entity.get(k) for k in changes.keys()}
    entity.update(changes)
    after = {k: entity.get(k) for k in changes.keys()}
    audit_log.append({"at": datetime.utcnow().isoformat(),"user_id": user_id,"entity_id": str(entity.get("id","")),"before": before,"after": after})
    return entity

def sync_external_contacts(external_rows: List[Dict[str,Any]], store: Dict[str,Dict[str,Any]]) -> Dict[str,int]:
    added = updated = 0
    for r in external_rows:
        e = normalize_email(r.get("email"))
        p = normalize_phone(r.get("phone"))
        if not e and not p:
            continue
        k = e or p
        if k in store:
            before = dict(store[k])
            if e: store[k]["email"] = e
            if p: store[k]["phone"] = p
            for fld in ("name","source","updated_at"):
                if r.get(fld) not in (None,""): store[k][fld] = r.get(fld)
            if store[k] != before:
                updated += 1
        else:
            store[k] = {"id":r.get("id",k),"name":r.get("name",""),"email":e,"phone":p,"source":r.get("source"),"updated_at":r.get("updated_at")}
            added += 1
    return {"added":added,"updated":updated}

def funnel_metrics(leads: List[Dict], opps: List[Dict]) -> Dict[str,Any]:
    lead_ids = {str(x.get("id")) for x in leads}
    opp_by_lead = {str(o.get("lead_id")): o for o in opps if o.get("lead_id") is not None}
    n_leads = len(lead_ids)
    n_opps = sum(1 for lid in lead_ids if lid in opp_by_lead)
    n_won = sum(1 for lid in lead_ids if lid in opp_by_lead and str(opp_by_lead[lid].get("stage")) == "Closed Won")
    r_lo = (n_opps / n_leads) if n_leads else 0.0
    r_ow = (n_won / n_opps) if n_opps else 0.0
    r_lw = (n_won / n_leads) if n_leads else 0.0
    return {"leads": n_leads,"opportunities": n_opps,"won": n_won,"rate_lead_to_opp": r_lo,"rate_opp_to_won": r_ow,"rate_lead_to_won": r_lw}

def _ts(s: str) -> float:
    return datetime.fromisoformat(s).timestamp()

def search_and_page_contacts(contacts: List[Dict], q: str, size: int, after: Optional[Tuple[float,str]]) -> Tuple[List[Dict], Optional[Tuple[float,str]]]:
    ql = q.strip().lower()
    filt = []
    for c in contacts:
        name = (c.get("name") or "").lower()
        email = (c.get("email") or "").lower()
        if ql in name or ql in email:
            filt.append(c)
    arr = sorted(filt, key=lambda r: (_ts(r.get("updated_at","1970-01-01T00:00:00")), str(r.get("id",""))), reverse=True)
    start = 0
    if after is not None:
        for i,r in enumerate(arr):
            if (_ts(r.get("updated_at","1970-01-01T00:00:00")), str(r.get("id",""))) == after:
                start = i + 1
                break
    page = arr[start:start+size]
    nxt = None
    if len(page) == size:
        last = page[-1]
        nxt = (_ts(last.get("updated_at","1970-01-01T00:00:00")), str(last.get("id","")))
    return page, nxt

def monthly_cohort_retention(logins: List[Dict]) -> List[Dict]:
    first = {}
    for r in logins:
        u = str(r["user_id"])
        dt = datetime.fromisoformat(r["ts"])
        m = f"{dt.year}-{dt.month:02d}"
        if u not in first: first[u] = m
    cohorts = {}
    for r in logins:
        u = str(r["user_id"])
        dt = datetime.fromisoformat(r["ts"])
        m = f"{dt.year}-{dt.month:02d}"
        c = first[u]
        cohorts.setdefault(c, {}).setdefault(m, set()).add(u)
    out = []
    for c, buckets in cohorts.items():
        users = set().union(*buckets.values())
        size = len({u for u in users})
        months = sorted(buckets.keys())
        row = {"cohort": c, "size": size}
        for m2 in months:
            row[m2] = len(buckets[m2])
        out.append(row)
    out.sort(key=lambda x: x["cohort"])
    return out

def upsert_contact(store: Dict[str,Dict[str,Any]], row: Dict[str,Any]) -> Dict[str,Any]:
    e = normalize_email(row.get("email"))
    p = normalize_phone(row.get("phone"))
    k = e or p or str(row.get("id"))
    if k in store:
        store[k].update({k2:v for k2,v in row.items() if v not in (None,"")})
        return store[k]
    store[k] = dict(row)
    if e: store[k]["email"] = e
    if p: store[k]["phone"] = p
    return store[k]

def dedupe_contacts_simpler(records: List[Dict]) -> Tuple[List[Dict], Dict[str, str]]:
    def rid_or_new(r: Dict) -> str:
        rid = r.get("id")
        return str(rid) if rid is not None else f"tmp:{uuid.uuid4().hex}"
    parent: Dict[str, str] = {}
    rank: Dict[str, int] = {}
    order: Dict[str, int] = {}
    def find(x: str) -> str:
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x
    def union(a: str, b: str):
        ra, rb = find(a), find(b)
        if ra == rb: return
        if rank[ra] < rank[rb] or (rank[ra] == rank[rb] and order[ra] > order[rb]):
            ra, rb = rb, ra
        parent[rb] = ra
        if rank[ra] == rank[rb]:
            rank[ra] += 1
    key_owner: Dict[Tuple[str, str], str] = {}
    rows: Dict[str, Dict] = {}
    ids_in_order: List[str] = []
    for i, r in enumerate(records):
        rid = rid_or_new(r)
        if rid not in parent:
            parent[rid] = rid
            rank[rid] = 0
            order[rid] = i
            rows[rid] = dict(r)
            ids_in_order.append(rid)
        e = normalize_email(r.get("email"))
        p = normalize_phone(r.get("phone"))
        keys = []
        if e: keys.append(("e", e))
        if p: keys.append(("p", p))
        keys.append(("id", rid))
        for k in keys:
            if k in key_owner:
                union(rid, key_owner[k])
            else:
                key_owner[k] = rid
    groups: Dict[str, List[str]] = {}
    for rid in ids_in_order:
        root = find(rid)
        groups.setdefault(root, []).append(rid)
    survivors: List[Dict] = []
    dup_map: Dict[str, str] = {}
    for root, member_ids in groups.items():
        primary = min(member_ids, key=lambda x: order[x])
        merged = dict(rows[primary])
        for mid in sorted(member_ids, key=lambda x: order[x]):
            if mid == primary: continue
            for field, val in rows[mid].items():
                if (field not in merged) or (not merged[field]):
                    merged[field] = val
            dup_map[mid] = primary
        survivors.append(merged)
    return survivors, dup_map

app = FastAPI()
DB = {}
IDEMP = {}

class LeadIn(BaseModel):
    email: EmailStr
    name: constr(min_length=1)
    source: Optional[str] = None

def idem_key(raw: str) -> str:
    return hashlib.sha256(raw.encode()).hexdigest()

def create_lead(body: LeadIn, idempotency_key: Optional[str] = None):
    if not idempotency_key:
        raise HTTPException(status_code=400, detail="Missing Idempotency-Key")
    k = idem_key(idempotency_key + body.email)
    if k in IDEMP:
        return IDEMP[k]
    if body.email in DB:
        raise HTTPException(status_code=409, detail="Lead exists")
    DB[body.email] = {"email": body.email, "name": body.name, "source": body.source}
    IDEMP[k] = DB[body.email]
    return DB[body.email]




In [6]:

import json, asyncio, hmac, hashlib
from datetime import datetime

with open("/content/crm_data.json") as f:
    d = json.load(f)

contacts = d["contacts"]
leads = d["leads"]
opps = d["opportunities"]
tasks = d["tasks"]
logins = d["logins"]

survivors, dup = dedupe_contacts(contacts)
print("dedupe", len(survivors), len(dup))

csv_text = "email,name,phone\nx@a.com,X,111\nx@a.com,X2,111\ny@a.com,Y,222\n"
store = {}
a,u = import_contacts_csv(csv_text, store)
print("csv_upsert", a, u, store["x@a.com"]["name"])

rl = RateLimiter(2, 60)
print("rate", rl.allow("u", now=0.0), rl.allow("u", now=1.0), rl.allow("u", now=59.0), rl.allow("u", now=61.0))

calls = {"n":0}
def f():
    calls["n"] += 1
    return 42
st = IdempotencyStore()
print("idem", st.call("k", f), st.call("k", f), calls["n"])

p1, nxt = page_items(contacts, 5, None)
p2, nxt2 = page_items(contacts, 5, nxt)
print("page", len(p1), bool(nxt), len(p2), bool(nxt2))

now = datetime(2025,8,10,10,0,0)
dtasks = due_tasks(tasks, now, 30)
print("due_tasks", [x["id"] for x in dtasks])

secret = "s"
payload = b"hello"
expect = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
print("hmac", verify_signature(secret, payload, expect), verify_signature(secret, payload, "deadbeef"))

w = weekly_conversion(leads)
print("weekly", w[0]["total"] if w else 0)

lru = LRU(2)
lru.put("a",1); lru.put("b",2); lru.get("a"); lru.put("c",3)
print("lru", lru.get("b") is None, lru.get("a"))

m, dff = merge_records({"name":"X","email":"","phone":"111"},{"name":"","email":"x@a.com","phone":"111"},"a")
print("merge", m["email"], "email" in dff)

print("rbac", can("write:own","rep","u1","u1"), can("write:any","admin","u1","u2"))

print("leads_month", leads_per_owner_month(leads)[:2])

sent = []
async def demo_mail():
    mailer = Mailer()
    t = asyncio.create_task(mailer.run(lambda m: sent.append(m)))
    await mailer.enqueue({"to":"a","sub":"hi"})
    await mailer.enqueue({"to":"b","sub":"yo"})
    await asyncio.sleep(0.05)
    await mailer.q.join()
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        pass

try:
    loop = asyncio.get_running_loop()
    if loop.is_running():
        await demo_mail()
    else:
        asyncio.run(demo_mail())
except RuntimeError:
    asyncio.run(demo_mail())

print("mail", len(sent))

csv_opps = export_opportunities_csv(opps, "2025-01-01T00:00:00")
print("export_len", len(csv_opps.splitlines()))

res_imp = import_contacts_csv_text("email,name\nz@z.com,Z\nz@z.com,Z2\n", {})
print("import_counts", res_imp)

audit_log = []
ent = {"id":"1","name":"Old"}
print("audit", apply_update_with_audit(ent, {"name":"New"}, "u1", audit_log)["name"], len(audit_log))

sync_store = {}
sync_res = sync_external_contacts([{"email":"A@EXAMPLE.COM","phone":"(212) 555-0001","name":"A1","updated_at":"2025-08-10T09:00:00"}], sync_store)
print("sync", sync_res["added"], sync_res["updated"])

funnel = funnel_metrics(leads, opps)
print("funnel", funnel["leads"], funnel["opportunities"], funnel["won"])

page, nxt = search_and_page_contacts(contacts, "user1", 3, None)
print("search_page", [x["id"] for x in page], bool(nxt))

ret = monthly_cohort_retention(logins)
print("retention_rows", len(ret))

us = {}
print("upsert", upsert_contact(us, {"id":"10","email":"A@EXAMPLE.COM","phone":""})["email"])

sv2, dup2 = dedupe_contacts_simpler(contacts)
print("unionfind", len(sv2), len(dup2) >= len(dup))

lead = create_lead(LeadIn(email="demo@x.com", name="Demo", source="ads"), idempotency_key="k1")
lead2 = create_lead(LeadIn(email="demo@x.com", name="Demo", source="ads"), idempotency_key="k1")
print("api_idem", lead["email"], lead2["email"])

print("done")


dedupe 30 0
csv_upsert 2 1 X2
rate True True False True
idem 42 42 1
page 5 True 5 True
due_tasks ['4', '11', '2', '10', '13']
hmac True False
weekly 2
lru True 1
merge x@a.com True
rbac True True
leads_month [{'owner_id': 'u1', 'month': '2025-07', 'total': 1}, {'owner_id': 'u2', 'month': '2025-07', 'total': 3}]
mail 2
export_len 11
import_counts {'added': 1, 'updated': 1}
audit New 1
sync 1 0
funnel 20 10 5
search_page ['12', '15', '14'] True
retention_rows 2
upsert a@example.com
unionfind 28 True
api_idem demo@x.com demo@x.com
done


  audit_log.append({"at": datetime.utcnow().isoformat(),"user_id": user_id,"entity_id": str(entity.get("id","")),"before": before,"after": after})
