In [37]:
import pandas as pd
import json
import requests
from google.cloud import bigquery
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datetime import datetime
import time
import os

client = bigquery.Client()

# Shared HTTP session for the prediction API: a connection pool sized for the
# worker threads plus automatic retries on transient 5xx responses.
#
# urllib3's Retry only retries request methods listed in `allowed_methods`, and
# its default set (Retry.DEFAULT_ALLOWED_METHODS) excludes POST. call_predict_api
# sends POST, so with the defaults the `status_forcelist` retries never fire.
# POST is therefore added explicitly, with two settings that keep the existing
# error reporting unchanged:
#   - read=False: a read error/timeout on POST is re-raised immediately rather
#     than re-sent, so it still surfaces as a requests Timeout.
#   - raise_on_status=False: once status retries are exhausted the last 5xx
#     response is returned, so `raise_for_status()` still reports an HTTPError.
# NOTE(review): retrying POST assumes the predict endpoint is safe to call more
# than once for the same term (it looks like pure inference) — confirm.
session = requests.Session()
retries = Retry(
    total=3,
    backoff_factor=0.5,
    status_forcelist=[500, 502, 503, 504],
    allowed_methods=Retry.DEFAULT_ALLOWED_METHODS | {"POST"},
    read=False,
    raise_on_status=False,
)
adapter = HTTPAdapter(max_retries=retries, pool_connections=100, pool_maxsize=100)
session.mount("http://", adapter)
session.mount("https://", adapter)

In [40]:
# Keywords that still have no category in the 2025 view and were not classified
# successfully by either earlier run (v1 or v2).
# A NULL `status` counts as "not ok": a plain `lower(status) <> 'ok'` evaluates
# to NULL for those rows and would silently exclude them from the retry.
query = """
select distinct search_internal_keyword
from `rome-prod.temp.search_internal_keyword_view_2025`
where (ds_category_ids is null
       or lower(ds_category_ids) in ('',' ','na','null','n/a','-'))
  and (search_internal_keyword in (
         select distinct search_internal_keyword
         from `rome-prod.temp.search_internal_keyword_20260109_v2`
         where coalesce(lower(status), '') <> 'ok')
       or search_internal_keyword in (
         select distinct search_internal_keyword
         from `rome-prod.temp.search_internal_keyword_20260109_v1`
         where coalesce(lower(status), '') <> 'ok'))
"""

df = client.query(query).to_dataframe()
print(f"Total records: {len(df)}")


Total records: 373


In [None]:
# One-off utility cell (not part of the normal Run-All flow): re-upload a saved
# checkpoint CSV to BigQuery with WRITE_APPEND.
# It uses its own variable names so running it does not overwrite `df` (the
# keyword query result the batch-processing cell reads) or the shared
# `destination_table` / `job_config`.
CHECKPOINT_CSV = "checkpoints/batch_1.csv"
# TODO: complete the table id — "rome-prod.temp." has no table component.
CHECKPOINT_DESTINATION_TABLE = "rome-prod.temp."

# Fail fast with a clear message instead of after reading the CSV.
_table_parts = CHECKPOINT_DESTINATION_TABLE.split(".")
if len(_table_parts) != 3 or not all(_table_parts):
    raise ValueError(
        "CHECKPOINT_DESTINATION_TABLE must be 'project.dataset.table', "
        f"got {CHECKPOINT_DESTINATION_TABLE!r}"
    )

checkpoint_df = pd.read_csv(CHECKPOINT_CSV)

checkpoint_job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    autodetect=True,
)

checkpoint_job = client.load_table_from_dataframe(
    checkpoint_df, CHECKPOINT_DESTINATION_TABLE, job_config=checkpoint_job_config
)
checkpoint_job.result()  # block until the load job finishes (raises on failure)

print(f"Loaded {len(checkpoint_df)} rows to {CHECKPOINT_DESTINATION_TABLE}")


In [41]:
# Prediction endpoint; set PREDICT_API_URL to point at another environment
# without editing the notebook.
# NOTE(review): the default is a QA host ("qa2-sg") while results are written to
# `rome-prod` tables — confirm that is intended.
API_URL = os.environ.get(
    "PREDICT_API_URL",
    "http://ds-search-c3-prediction.qa2-sg.cld/predict/search-term-data",
)

def get_error_result(search_term, error_type, error_msg):
    """Build the output row for a term that could not be classified.

    Args:
        search_term: The keyword the row belongs to (stored unchanged).
        error_type: Short error label, e.g. "TIMEOUT" or "SKIP".
        error_msg: Human-readable detail.

    Returns:
        A dict with every prediction column set to ``None`` and ``status`` set to
        ``"<error_type>: <error_msg>"``. Keys appear in the same order as a
        successful row, so DataFrame columns line up.
    """
    columns = (
        "search_internal_keyword",
        "code",
        "status",
        "isGenericTerm",
        "isValidC3",
        "score",
        "c1CategoryCode",
        "c2CategoryCode",
        "c3CategoryCode",
        "c3_name",
        "c3_category",
    )
    row = dict.fromkeys(columns)  # every column starts as None
    row["search_internal_keyword"] = search_term
    row["status"] = f"{error_type}: {error_msg}"
    return row

# Keys copied from the first entry of data.categoryHierarchyList; the API field
# name and the output column name are the same.
_CATEGORY_FIELDS = (
    "score",
    "c1CategoryCode",
    "c2CategoryCode",
    "c3CategoryCode",
    "c3_name",
    "c3_category",
)


def call_predict_api(search_term, timeout=30):
    """Classify one search term via the prediction API and flatten the reply into a row.

    Args:
        search_term: Keyword to classify. ``None``/NaN/empty values are not sent.
        timeout: Per-request timeout in seconds passed to ``session.post``.

    Returns:
        A dict with the same keys, in the same order, as ``get_error_result``.
        On a successful HTTP exchange, ``code`` and ``status`` come from the
        response body. The category fields come from the first entry of
        ``data.categoryHierarchyList`` and are ``None`` when absent. On any
        failure the row is built by ``get_error_result`` with a
        ``"TYPE: message"`` status.

    Errors are caught and turned into rows, never raised. This lets
    thread-pool callers collect results without per-future error handling.
    """
    if not search_term or pd.isna(search_term):
        return get_error_result(search_term, "SKIP", "Empty or null search term")

    payload = {
        "searchTerm": search_term,
        "categoryPredCutoff": 30,
        "requestId": "",
        "maxCategoryHierarchy": 1,
    }

    # Transport / HTTP-status failures. Timeout is tested before ConnectionError
    # because requests' ConnectTimeout subclasses both.
    try:
        resp = session.post(API_URL, json=payload, timeout=timeout)
        resp.raise_for_status()
    except requests.exceptions.Timeout:
        return get_error_result(search_term, "TIMEOUT", f"Request timed out after {timeout}s")
    except requests.exceptions.ConnectionError:
        return get_error_result(search_term, "CONNECTION_ERROR", "Failed to connect to API")
    except requests.exceptions.HTTPError as e:
        return get_error_result(search_term, "HTTP_ERROR", str(e))
    except Exception as e:
        return get_error_result(search_term, "ERROR", str(e))

    # Body decoding is isolated so only an undecodable body maps to JSON_ERROR.
    # resp.json() raises a ValueError subclass, but not necessarily
    # json.JSONDecodeError (requests uses simplejson's error type when simplejson
    # is installed).
    try:
        response_json = resp.json()
    except ValueError:
        return get_error_result(search_term, "JSON_ERROR", "Invalid JSON response")

    try:
        result = {
            "search_internal_keyword": search_term,
            "code": response_json.get("code"),
            "status": response_json.get("status"),
            "isGenericTerm": None,
            "isValidC3": None,
            **dict.fromkeys(_CATEGORY_FIELDS),
        }

        data_obj = response_json.get("data")
        if data_obj:
            result["isGenericTerm"] = data_obj.get("isGenericTerm")
            result["isValidC3"] = data_obj.get("isValidC3")

            hierarchy = data_obj.get("categoryHierarchyList")
            if hierarchy:  # only the top-ranked category is kept
                top_category = hierarchy[0]
                for field in _CATEGORY_FIELDS:
                    result[field] = top_category.get(field)

        return result
    except Exception as e:
        # Unexpected body shape, e.g. a JSON list where an object is expected.
        return get_error_result(search_term, "ERROR", str(e))

In [42]:
# Deduplicate and sort the terms. The source query has no ORDER BY; without a
# stable order the batch boundaries can differ between runs, and
# RESUME_FROM_BATCH would skip or repeat the wrong terms.
search_terms = sorted(df["search_internal_keyword"].dropna().unique().tolist(), key=str)
print(f"Unique search terms: {len(search_terms)}")

NUM_THREADS = 40
BATCH_SIZE = 50000
CHECKPOINT_DIR = "checkpoints"
# To resume, set this to `last_completed_batch` from progress.txt (0 = start fresh).
RESUME_FROM_BATCH = 0

destination_table = "rome-prod.temp.search_internal_keyword_20260109_v2"
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    autodetect=True,
)

os.makedirs(CHECKPOINT_DIR, exist_ok=True)

total_batches = (len(search_terms) + BATCH_SIZE - 1) // BATCH_SIZE  # ceil division


def _fetch_predictions(terms, desc):
    """Call the predict API for every term on a thread pool.

    Rows come back in the same order as `terms` (Executor.map preserves input
    order), so checkpoint files are reproducible across runs.
    """
    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        return list(tqdm(executor.map(call_predict_api, terms), total=len(terms), desc=desc))


def _pct(part, whole):
    """Percentage of `part` in `whole`; 0.0 when `whole` is 0 (nothing processed)."""
    return 100 * part / whole if whole else 0.0


start_time = time.time()
print(f"\n{'='*50}")
print(f"START TIME: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if RESUME_FROM_BATCH > 0:
    print(f"RESUMING FROM BATCH: {RESUME_FROM_BATCH + 1}")
print(f"{'='*50}")

total_success = 0
total_failed = 0
total_processed = 0  # terms handled in this run (excludes batches skipped by resume)

for batch_idx in range(RESUME_FROM_BATCH, total_batches):
    batch_start = time.time()
    batch_terms = search_terms[batch_idx * BATCH_SIZE:(batch_idx + 1) * BATCH_SIZE]

    print(f"\nBatch {batch_idx + 1}/{total_batches} - Processing {len(batch_terms)} terms")

    results_df = pd.DataFrame(_fetch_predictions(batch_terms, desc=f"Batch {batch_idx + 1}"))

    # Success means the API returned code 200. Error rows (code None/NaN) and any
    # other code count as failures.
    batch_success = (results_df["code"] == 200).sum()
    batch_failed = (results_df["code"] != 200).sum()
    total_success += batch_success
    total_failed += batch_failed
    total_processed += len(results_df)

    checkpoint_file = f"{CHECKPOINT_DIR}/batch_{batch_idx + 1}.csv"
    results_df.to_csv(checkpoint_file, index=False)
    print(f"Checkpoint saved: {checkpoint_file}")

    # The first batch of a fresh run replaces the table; later and resumed batches append.
    # NOTE(review): destination_table is also one of the tables the input query
    # reads failures from. After a fresh run it therefore holds only the rows
    # processed here — confirm that is intended.
    if batch_idx == 0 and RESUME_FROM_BATCH == 0:
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    else:
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

    job = client.load_table_from_dataframe(results_df, destination_table, job_config=job_config)
    job.result()  # block until the load job finishes (raises on failure)

    batch_elapsed = time.time() - batch_start
    print(f"Batch {batch_idx + 1} uploaded: {len(results_df)} rows | Success: {batch_success} | Failed: {batch_failed} | Time: {batch_elapsed:.2f}s")

    # Record progress only after the upload succeeded, so the value is safe to resume from.
    with open(f"{CHECKPOINT_DIR}/progress.txt", "w") as f:
        f.write(f"last_completed_batch={batch_idx + 1}\n")
        f.write(f"total_batches={total_batches}\n")
        f.write(f"timestamp={datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

    del results_df

end_time = time.time()
total_elapsed = end_time - start_time

hours, remainder = divmod(total_elapsed, 3600)
minutes, seconds = divmod(remainder, 60)

print(f"\n{'='*50}")
print(f"END TIME: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"TOTAL TIME: {int(hours)}h {int(minutes)}m {seconds:.2f}s")
print(f"TOTAL RECORDS: {len(search_terms)}")
# Rates are relative to the terms processed in this run. That equals
# len(search_terms) on a fresh run, but is smaller when resuming or when the
# query returned nothing.
print(f"SUCCESS: {total_success} ({_pct(total_success, total_processed):.2f}%)")
print(f"FAILED: {total_failed} ({_pct(total_failed, total_processed):.2f}%)")
avg_speed = total_processed / total_elapsed if total_elapsed > 0 else 0.0
print(f"AVG SPEED: {avg_speed:.2f} records/sec")
print(f"CHECKPOINTS: {CHECKPOINT_DIR}/")
print(f"{'='*50}")
print(f"\nCompleted! All data uploaded to {destination_table}")

Unique search terms: 373

START TIME: 2026-01-09 15:37:46

Batch 1/1 - Processing 373 terms


Batch 1: 100%|██████████| 373/373 [00:32<00:00, 11.50it/s] 


Checkpoint saved: checkpoints/batch_1.csv
Batch 1 uploaded: 373 rows | Success: 365 | Failed: 8 | Time: 36.33s

END TIME: 2026-01-09 15:38:23
TOTAL TIME: 0h 0m 36.34s
TOTAL RECORDS: 373
SUCCESS: 365 (97.86%)
FAILED: 8 (2.14%)
AVG SPEED: 10.27 records/sec
CHECKPOINTS: checkpoints/

Completed! All data uploaded to rome-prod.temp.search_internal_keyword_20260109_v2
