## Sample messages from one originator

In [None]:
import csv
import json
import re

# Sender (sourceAddress) whose messages are sampled in this cell.
ORIGINATOR = "24273"
# Raw Kafka export to scan, and the CSV the sample is written to.
INPUT = (
    "./2025-11-13_on_boarding_messages/2025-11-13/"
    "com.att.aamg.kafka.prod.sms-prod-eastus2-mt-default.csv"
)
OUTPUT = "./data/input/ref_" + ORIGINATOR + ".csv"
# Stop after this many input rows, matching or not.
MAX_ROWS = 1_000_000
# Keep only every DILUTE_FACTOR-th message from ORIGINATOR.
DILUTE_FACTOR = 10

# Fallback patterns for cells that are not valid JSON. A field value is a
# JSON string body: any run of non-quote/non-backslash chars or backslash
# escapes, so an escaped quote (\") no longer ends the match early.
_SOURCE_RE = re.compile(r'"sourceAddress"\s*:\s*"((?:[^"\\]|\\.)*)"')
_REQUEST_RE = re.compile(r'"requestID"\s*:\s*"((?:[^"\\]|\\.)*)"')
_MESSAGE_RE = re.compile(r'"shortMessage"\s*:\s*"((?:[^"\\]|\\.)*)"')
# CR, LF and TAB are each replaced by a single space.
_WHITESPACE_TO_SPACE = str.maketrans({'\r': ' ', '\n': ' ', '\t': ' '})


def normalize(s):
    """Clean one raw ``Value`` cell so it can be parsed as JSON.

    Strips surrounding whitespace and one enclosing pair of double quotes,
    collapses doubled quotes (``""`` -> ``"``), and turns CR, LF and TAB
    into single spaces.

    Args:
        s: The raw cell text.

    Returns:
        The cleaned string.
    """
    s = s.strip()
    if len(s) >= 2 and s[0] == '"' and s[-1] == '"':
        s = s[1:-1]
    return s.replace('""', '"').translate(_WHITESPACE_TO_SPACE)


def _unescape_json_string(body):
    """Decode the escapes in the body of a JSON string literal.

    Falls back to only un-escaping ``\\"`` when *body* is not a valid JSON
    string body (e.g. it contains an invalid escape or a control character).
    """
    try:
        return json.loads(f'"{body}"')
    except json.JSONDecodeError:
        return body.replace('\\"', '"')


def _scrape_field(pattern, text):
    """Return the decoded value captured by *pattern* in *text*, or None."""
    m = pattern.search(text)
    return _unescape_json_string(m.group(1)) if m else None


def get_source_and_fields(raw):
    """Extract ``(sourceAddress, requestID, shortMessage)`` from a raw cell.

    The cell is cleaned with :func:`normalize` and parsed as JSON. If it is
    not valid JSON, or parses to something other than an object, the three
    fields are scraped with regular expressions instead.

    Args:
        raw: The raw ``Value`` cell text; ``None`` is accepted (csv.DictReader
            yields it for rows with missing trailing columns).

    Returns:
        A 3-tuple ``(source, request_id, message)``; a field that cannot be
        found is ``None``.
    """
    if raw is None:
        return (None, None, None)
    s = normalize(raw)
    try:
        o = json.loads(s)
    except json.JSONDecodeError:
        o = None
    if isinstance(o, dict):
        return (o.get('sourceAddress'), o.get('requestID'), o.get('shortMessage'))
    # Malformed JSON or a bare scalar/array: scrape the fields from the text.
    return (_scrape_field(_SOURCE_RE, s),
            _scrape_field(_REQUEST_RE, s),
            _scrape_field(_MESSAGE_RE, s))

# Stream INPUT, keep every DILUTE_FACTOR-th message whose sourceAddress is
# ORIGINATOR, print progress every 100,000 rows, and stop after MAX_ROWS
# input rows (matching or not).
with open(INPUT, newline='', encoding='utf-8') as fin, \
     open(OUTPUT, 'w', newline='', encoding='utf-8') as fout:
    records = csv.DictReader(fin)
    sample_writer = csv.DictWriter(
        fout, fieldnames=['originator_id', 'message_id', 'raw_text'])
    sample_writer.writeheader()
    match_counter = 0
    for i, record in enumerate(records, start=1):
        src, req, msg = get_source_and_fields(record['Value'])
        is_match = src == ORIGINATOR
        if is_match:
            match_counter += 1
        if is_match and match_counter % DILUTE_FACTOR == 0:
            sample_writer.writerow(
                {'originator_id': src, 'message_id': req, 'raw_text': msg})
        if not i % 100000:
            print(f"Processed {i:,} rows (matches so far: {match_counter:,})")
        if i >= MAX_ROWS:
            break


## Sample messages from a list of originators

In [None]:
import csv
import json
import re
from collections import defaultdict
from math import ceil
import pandas as pd

# Campaign list. business_number is read as text so the IDs stay exact: with
# the default dtype a single missing value makes pandas load the column as
# float, and every ID would become e.g. "7535.0" (or "nan") and match nothing.
df = pd.read_csv('./data/data_raw_input/2025-11-13/CAMPAIGNS.csv',
                 dtype={'business_number': str})

# Distinct, non-blank originator IDs as strings, in first-seen order.
ORIGINATORS = list(dict.fromkeys(
    o.strip() for o in df['business_number'].dropna().tolist() if o.strip()
))

INPUT = "./data/data_raw_input/2025-11-13/com.att.aamg.kafka.prod.sms-prod-eastus2-mt-default.csv"
OUTPUT_DIR = "./data/stage1/input"
# Upper bound on sampled rows written per originator (see dilute factors).
TARGET_ROWS = 5000

# Fallback patterns for cells that are not valid JSON. A field value is a
# JSON string body: any run of non-quote/non-backslash chars or backslash
# escapes, so an escaped quote (\") no longer ends the match early.
_SOURCE_RE = re.compile(r'"sourceAddress"\s*:\s*"((?:[^"\\]|\\.)*)"')
_REQUEST_RE = re.compile(r'"requestID"\s*:\s*"((?:[^"\\]|\\.)*)"')
_MESSAGE_RE = re.compile(r'"shortMessage"\s*:\s*"((?:[^"\\]|\\.)*)"')
# CR, LF and TAB are each replaced by a single space.
_WHITESPACE_TO_SPACE = str.maketrans({'\r': ' ', '\n': ' ', '\t': ' '})


def normalize(s):
    """Clean one raw ``Value`` cell so it can be parsed as JSON.

    Strips surrounding whitespace and one enclosing pair of double quotes,
    collapses doubled quotes (``""`` -> ``"``), and turns CR, LF and TAB
    into single spaces.

    Args:
        s: The raw cell text.

    Returns:
        The cleaned string.
    """
    s = s.strip()
    if len(s) >= 2 and s[0] == '"' and s[-1] == '"':
        s = s[1:-1]
    return s.replace('""', '"').translate(_WHITESPACE_TO_SPACE)


def _unescape_json_string(body):
    """Decode the escapes in the body of a JSON string literal.

    Falls back to only un-escaping ``\\"`` when *body* is not a valid JSON
    string body (e.g. it contains an invalid escape or a control character).
    """
    try:
        return json.loads(f'"{body}"')
    except json.JSONDecodeError:
        return body.replace('\\"', '"')


def _scrape_field(pattern, text):
    """Return the decoded value captured by *pattern* in *text*, or None."""
    m = pattern.search(text)
    return _unescape_json_string(m.group(1)) if m else None


def get_source_and_fields(raw):
    """Extract ``(sourceAddress, requestID, shortMessage)`` from a raw cell.

    The cell is cleaned with :func:`normalize` and parsed as JSON. If it is
    not valid JSON, or parses to something other than an object, the three
    fields are scraped with regular expressions instead.

    Args:
        raw: The raw ``Value`` cell text; ``None`` is accepted (csv.DictReader
            yields it for rows with missing trailing columns).

    Returns:
        A 3-tuple ``(source, request_id, message)``; a field that cannot be
        found is ``None``.
    """
    if raw is None:
        return (None, None, None)
    s = normalize(raw)
    try:
        o = json.loads(s)
    except json.JSONDecodeError:
        o = None
    if isinstance(o, dict):
        return (o.get('sourceAddress'), o.get('requestID'), o.get('shortMessage'))
    # Malformed JSON or a bare scalar/array: scrape the fields from the text.
    return (_scrape_field(_SOURCE_RE, s),
            _scrape_field(_REQUEST_RE, s),
            _scrape_field(_MESSAGE_RE, s))

# First pass: count matching messages per originator.
# Membership is tested against a set built once (O(1) per row) instead of
# scanning the ORIGINATORS list for every input row. ORIGINATORS holds only
# strings, so only str sources can match; the isinstance guard also keeps
# unhashable JSON values (lists/objects) out of the set lookup.
originator_set = set(ORIGINATORS)
counts = defaultdict(int)
with open(INPUT, newline='', encoding='utf-8') as fin:
    for row in csv.DictReader(fin):
        src, _, _ = get_source_and_fields(row['Value'])
        if isinstance(src, str) and src in originator_set:
            counts[src] += 1

# Compute dilute factors: keeping every Nth match with N = ceil(count /
# TARGET_ROWS) yields at most TARGET_ROWS rows per originator. None marks
# originators that have no messages at all.
dilute = {}
for ori in ORIGINATORS:
    c = counts.get(ori, 0)  # .get: do not insert zero entries into counts
    dilute[ori] = None if c == 0 else max(1, ceil(c / TARGET_ROWS))

print("Counts:", dict(counts))
print("Dilute factors:", dilute)

# Second pass: a single scan of INPUT collects the diluted sample for every
# originator at once, instead of re-reading the whole file once per
# originator. Each originator keeps at most TARGET_ROWS rows (see the dilute
# factors), so the buffered samples stay bounded in memory.
import os

sampled = {ori: [] for ori, factor in dilute.items() if factor is not None}
seen = defaultdict(int)
with open(INPUT, newline='', encoding='utf-8') as fin:
    for row in csv.DictReader(fin):
        src, req, msg = get_source_and_fields(row['Value'])
        # Only str sources can equal an originator ID; the guard also keeps
        # unhashable JSON values out of the dict lookup.
        if not isinstance(src, str) or src not in sampled:
            continue
        seen[src] += 1
        if seen[src] % dilute[src] == 0:
            sampled[src].append(
                {'originator_id': src, 'message_id': req, 'raw_text': msg})

# Create the output directory up front so a missing directory does not fail
# mid-way; then write one file per originator, in ORIGINATORS order.
os.makedirs(OUTPUT_DIR, exist_ok=True)
for ori in ORIGINATORS:
    factor = dilute.get(ori)  # not named `df`: that would shadow the DataFrame
    if factor is None:
        print(f"No rows for originator {ori}; skipping.")
        continue
    outpath = f"{OUTPUT_DIR}/ref_{ori}.csv"
    with open(outpath, 'w', newline='', encoding='utf-8') as fout:
        writer = csv.DictWriter(fout, fieldnames=['originator_id', 'message_id', 'raw_text'])
        writer.writeheader()
        writer.writerows(sampled[ori])
    print(f"Wrote {ori} -> {outpath} (count={counts.get(ori,0)}, df={factor})")


Counts: {'7535': 25218, '73981': 27186}
Dilute factors: {'73981': 6, '7535': 6}
Wrote 73981 -> ./data/stage1/input/ref_73981.csv (count=27186, df=6)
Wrote 7535 -> ./data/stage1/input/ref_7535.csv (count=25218, df=6)
