# 📄 Executive Summary: Health Violations Dataset Enrichment Pipeline

## Overview
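This notebook builds an enriched dataset of restaurant health inspection violations for downstream NLP modeling. Each violation text is paired with a plain-language summary, a list of keywords, and a standardized safety verdict, and the training split is augmented with synthetic examples.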

---

## Steps

### 1. Data Sampling and Splitting
- Loaded the full `Food_Inspections` dataset.
- Filtered out empty "Violations" entries.
- Randomly sampled **3,000** examples.
- Split into:
  - **Train** (80%, 2,400 rows)
  - **Validation** (10%, 300 rows)
  - **Test** (10%, 300 rows)
- Saved each split as CSV files.

### 2. Training Set Enrichment
- Ran **parallel GPT API calls** (`gpt-4o-mini-2024-07-18`, 5 worker threads) to:
  - Generate **plain-language summaries**.
  - Assign **3–7 customer-friendly keywords**.
  - Request a **standardized safety verdict** at the end of each summary.
- Implemented **retry logic** (up to 5 attempts per row), **backoff**, and **rate limit handling**.
- Dropped any samples that failed parsing or exceeded retry attempts.

### 3. Synthetic Data Augmentation
- Requested **100 fictional** "not safe to eat here" violations to address class imbalance; the recorded run produced **94**.
- Enriched synthetic violations with summaries and keywords using the same GPT pipeline.
- Saved synthetic examples separately.

### 4. Dataset Merging
- Merged real and synthetic examples into a final training set (`violations_train_final.jsonl`).
- Shuffled the merged dataset to prevent ordering bias.

### 5. Validation and Test Enrichment
- Enriched the validation and test splits using the same pipeline.
- Saved results into `violations_val.jsonl` and `violations_test.jsonl`.

---

## Quality Checks
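- Checked with a regular expression whether each summary ends with exactly one of the predefined verdicts, and printed the ones that do not.
- Previewed sample outputs to inspect data quality manually.
- Printed the verdict distribution of the final training set: 236 of 2,476 summaries did not end with a predefined verdict and were counted as "Unknown". (Note: the original code that computes these counts is missing from the notebook.)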
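
The checks above can be combined into a per-record validator. The following is a minimal sketch, not part of the original notebook: the function name `validate_record` and the exact rules (3 to 7 lowercase string keywords, summary ending in one of the four verdicts) are assumptions derived from the enrichment prompt.

```python
import re

# The four closing verdicts, copied from the enrichment prompt.
VERDICTS = (
    "This violation is not related to food safety.",
    "This violation is related to food safety, but it is safe to eat here.",
    "This violation is related to food safety and may make it unsafe to eat here.",
    "This violation is related to food safety and it is not safe to eat here.",
)
VERDICT_RE = re.compile("(" + "|".join(re.escape(v) for v in VERDICTS) + r")\s*$")


def validate_record(record):
    """Return a list of problems found in one enriched record (empty if none)."""
    problems = []
    summary = record.get("summary")
    if not isinstance(summary, str) or not VERDICT_RE.search(summary):
        problems.append("summary does not end with a predefined verdict")
    keywords = record.get("keywords")
    if not isinstance(keywords, list) or not 3 <= len(keywords) <= 7:
        problems.append("keywords is not a list of 3-7 items")
    elif any(not isinstance(k, str) or k != k.lower() for k in keywords):
        problems.append("keywords must be lowercase strings")
    return problems
```

Running this over every line of a JSONL output would flag records for review rather than silently keeping them.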

---

## Outputs
- `violations_train_final.jsonl` → Final train set (real + synthetic).
- `violations_val.jsonl` → Validation set.
- `violations_test.jsonl` → Test set.
- All saved in clean JSONL format for seamless ingestion.

---


# Training Set

In [None]:
!pip install openai --quiet

In [None]:
# ─── 0. Constants & Imports ────────────────────────────────────────────────
import pandas as pd
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.auto import tqdm
from openai import OpenAI, RateLimitError
from math import ceil
import json, re
from collections import Counter

# adjust to your org’s limits
MAX_RETRIES   = 5
TARGET_RPM    = 300           # your total allowed calls/minute
MAX_WORKERS   = 5             # number of threads you actually want to run
SLEEP_PER_CALL = (60 / TARGET_RPM) * MAX_WORKERS  # seconds each thread should wait after a successful call

# Instantiate the OpenAI client (paste your key below)
client = OpenAI(api_key="Redacted")
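
As a worked check of the throttle constant defined above, the arithmetic can be spelled out step by step. This is only an illustration; the intermediate name `per_thread_rpm` is not in the notebook, and the estimate ignores request latency, so the real call rate would sit below the target.

```python
TARGET_RPM = 300   # total calls per minute allowed across all threads
MAX_WORKERS = 5    # concurrent threads

# Each thread gets an equal share of the budget: 300 / 5 = 60 calls per minute,
# so it should leave 60 / 60 = 1 second between its own calls.
per_thread_rpm = TARGET_RPM / MAX_WORKERS
sleep_per_call = 60 / per_thread_rpm

# Same value as the notebook's expression (60 / TARGET_RPM) * MAX_WORKERS.
assert sleep_per_call == (60 / TARGET_RPM) * MAX_WORKERS
print(sleep_per_call)  # 1.0
```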

In [None]:
# ─── 1. Load & Split ─────────────────────────────────────────────────────

# 1. Load the full violations dataset
df = pd.read_csv('/content/Food_Inspections_20250419.csv')

# 2. Filter out any missing "Violations"
df = df.dropna(subset=['Violations'])

# 3. Sample 3,000 rows for processing
df_sample = df.sample(n=3000, random_state=42)[['Violations']]

# 4. Split into train/validation/test (80/10/10)
train_df = df_sample.sample(frac=0.8, random_state=42)
rem_df   = df_sample.drop(train_df.index)
val_df   = rem_df.sample(frac=0.5, random_state=42)
test_df  = rem_df.drop(val_df.index)

# 5. Save each split to CSV
train_df.to_csv('violations_train.csv', index=False)
val_df.to_csv('violations_val.csv', index=False)
test_df.to_csv('violations_test.csv', index=False)

# Display confirmation
print("Splits saved: ")
print(f"  train: {len(train_df)} rows")
print(f"  val:   {len(val_df)} rows")
print(f"  test:  {len(test_df)} rows")


Splits saved: 
  train: 2400 rows
  val:   300 rows
  test:  300 rows
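
A quick way to confirm that the three splits are disjoint and together cover the sample is to compare their row labels. The helper below is a sketch under the assumption that row labels are unique; `check_splits` is a hypothetical name, not something defined in this notebook.

```python
def check_splits(train_idx, val_idx, test_idx, total):
    """Raise AssertionError if the splits overlap or do not cover `total` rows."""
    train, val, test = set(train_idx), set(val_idx), set(test_idx)
    assert not (train & val), "train and validation overlap"
    assert not (train & test), "train and test overlap"
    assert not (val & test), "validation and test overlap"
    assert len(train | val | test) == total, "splits do not cover the sample"
    return len(train), len(val), len(test)


# Toy stand-in for the row labels of a 10-row sample split 80/10/10.
print(check_splits(range(0, 8), [8], [9], total=10))  # (8, 1, 1)
```

In this notebook the call would be `check_splits(train_df.index, val_df.index, test_df.index, len(df_sample))`.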


In [None]:
# ─── 2. Prep for Parallel Processing ─────────────────────────────────────

# 1. Load the training split you just created
df = pd.read_csv("/content/violations_train.csv")

# 2. Ensure 'Violations' is a non-empty string column
df["Violations"] = df["Violations"].astype(str).str.strip()
df = df[df["Violations"] != ""]

# 3. Prepare for parallel processing
total = len(df)
outputs = [None] * total

print(f"Ready to process {total} violations.")

Ready to process 2400 violations.


In [None]:
# ─── 3. Worker fn: retry + backoff + throttle ─────────────────────────────────

def process_violation(args):
    idx, vio = args
    prompt = (
    "You are given one or more violation descriptions from a health inspection report. Each violation may or may not be related to the safety of the food being served.\n"
    "Your task is to generate a valid JSON object with exactly two keys:\n"
    '  "summary": A plain-language 1–2 sentence explanation for customers. Focus on whether any of the violations suggest a risk to food safety. Your summary should end with exactly one of the following:\n'
    '    - "This violation is not related to food safety."\n'
    '    - "This violation is related to food safety, but it is safe to eat here."\n'
    '    - "This violation is related to food safety and may make it unsafe to eat here."\n'
    '    - "This violation is related to food safety and it is not safe to eat here."\n'
    '  "keywords": An array of 3–7 lowercase, customer-friendly keywords that summarize the key issues (e.g., "sanitation", "pests", "utensils", "equipment", "food contact").\n'
    "Do not include the raw violation text or any additional commentary.\n"
    f"Violations: {vio}"
    )
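    backoff = SLEEP_PER_CALL

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = client.chat.completions.create(
                model="gpt-4o-mini-2024-07-18",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
                max_tokens=300,
            )
            content = (resp.choices[0].message.content or "").strip()
            time.sleep(SLEEP_PER_CALL)  # throttle: keep this thread within its share of TARGET_RPM
            return idx, content

        except RateLimitError as e:
            # Honor Retry-After when the error's HTTP response carries it; otherwise back off exponentially
            headers = getattr(getattr(e, "response", None), "headers", None) or {}
            retry_after = float(headers.get("Retry-After", backoff))
            time.sleep(max(1, ceil(retry_after)))
            backoff = min(backoff * 2, 60)
        except Exception as e:
            if attempt == MAX_RETRIES:
                print(f"❌ Fatal error on idx={idx}: {e}")
                return idx, "[ERROR]"
            time.sleep(1)

    # Reached only when the final attempt was rate-limited: report failure instead of returning None
    return idx, "[ERROR]"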


In [None]:
# ─── 4. Run in ThreadPool & Collect ───────────────────────────────────────
#Took 10 minutes

# Submit all violations to be processed in parallel
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {
        executor.submit(process_violation, (i, v)): i
        for i, v in enumerate(df["Violations"])
    }

    for future in tqdm(as_completed(futures),
                       total=total,
                       desc="Processing rows",
                       unit="row"):
        idx, result = future.result()
        outputs[idx] = result

# ─── 5. Drop errors & write JSONL ─────────────────────────────────────────

# Filter out any failures
clean = [(vio, r) for vio, r in zip(df["Violations"], outputs) if r != "[ERROR]"]
print(f"Dropped {len(df) - len(clean)} error rows; writing {len(clean)} clean JSON objects.")

# Write out the clean train split as JSONL
with open("violations_train.jsonl", "w") as outf:
    for vio, resp in clean:
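        try:
            parsed = json.loads(resp)
        except json.JSONDecodeError:
            # Skip replies that are not valid JSON; these rows are dropped silently,
            # so the file can hold fewer rows than the count printed above
            continue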
        clean_obj = {
            "violation": vio,
            "summary":   parsed.get("summary"),
            "keywords":  parsed.get("keywords")
        }
        outf.write(json.dumps(clean_obj) + "\n")
print("✅ Wrote violations_train.jsonl")

Dropped 0 error rows; writing 2400 clean JSON objects.
✅ Wrote violations_train.jsonl
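
The write loop skips any reply that `json.loads` rejects. One possible cause is a reply wrapped in a Markdown code fence; the notebook does not record why rows were skipped, so this is an assumption. The sketch below shows a more tolerant parser with a hypothetical name, `parse_model_json`, that strips such a fence before parsing.

```python
import json
import re

# A Markdown code-fence marker (three backticks), built programmatically.
FENCE = "`" * 3
# Matches an optional fence, with an optional "json" tag, around the whole reply.
FENCE_RE = re.compile(r"^" + FENCE + r"(?:json)?\s*(.*?)\s*" + FENCE + r"$", re.DOTALL)


def parse_model_json(text):
    """Parse a model reply as a JSON object, or return None if that fails."""
    text = text.strip()
    fenced = FENCE_RE.match(text)
    if fenced:
        text = fenced.group(1)
    try:
        obj = json.loads(text)
    except json.JSONDecodeError:
        return None
    # Only a JSON object (dict) has the "summary" and "keywords" keys we need.
    return obj if isinstance(obj, dict) else None
```

Counting how many replies return `None` would also give an accurate figure for rows lost at the parsing stage.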


In [None]:
# ─── 1.7 Generate & Enrich Synthetic “Not Safe” Violations ─────────────────

def generate_synthetic_violations(n_samples=100, batch_size=5):
    synthetic_violations = []
    total_batches = n_samples // batch_size
    for batch_idx in tqdm(range(total_batches), desc="Generating synthetic violations", unit="batch"):
        prompt = (
            "You are creating synthetic restaurant health inspection violations that would clearly "
            "result in the verdict: \"This violation is related to food safety and it is not safe to eat here.\"\n"
            f"Generate {batch_size} fictional violations. Each one must:\n"
            "- Start with a realistic inspector-style comment (no need for code numbers or titles).\n"
            "- Describe a serious food-safety hazard (e.g., raw meat at room temp, sewage, pests).\n"
            "- Use inspection tone (words like 'observed', 'must', 'discarded').\n"
            "- Output each violation separated by two newlines, with no summaries or JSON.\n"
        )
        try:
            resp = client.chat.completions.create(
                model="gpt-4o-mini-2024-07-18",
                messages=[{"role":"user","content":prompt}],
                temperature=0.8,
                max_tokens=200,
            )
            batch = [v.strip() for v in resp.choices[0].message.content.split("\n\n") if v.strip()]
            synthetic_violations.extend(batch)
        except Exception as e:
            print(f"❌ Error generating batch {batch_idx}: {e}")
    print(f"✅ Generated {len(synthetic_violations)} synthetic violations.")
    return synthetic_violations[:n_samples]


def enrich_synthetic_violations(violation_list, output_jsonl="violations_synthetic.jsonl"):
    total = len(violation_list)
    outputs = [None]*total

    def worker(args):
        idx, vio = args
        prompt = (
              "You are given a violation description. Output a JSON object with exactly two keys:\n"
              '  "summary": 1–2 sentence consumer-friendly verdict ending with '
              '"This violation is related to food safety and it is not safe to eat here."\n'
              '  "keywords": array of 3–7 lowercase keywords summarizing key issues.\n'
              "Do not include the raw violation text or any additional commentary.\n"
              f"Violation: {vio}"
          )
        backoff = SLEEP_PER_CALL
        for attempt in range(1, MAX_RETRIES+1):
            try:
                r = client.chat.completions.create(
                    model="gpt-4o-mini-2024-07-18",
                    messages=[{"role":"user","content":prompt}],
                    temperature=0.2, max_tokens=200
                )
                return idx, r.choices[0].message.content.strip()
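            except RateLimitError as e:
                # Read Retry-After from the error's HTTP response when present
                headers = getattr(getattr(e, "response", None), "headers", None) or {}
                retry = float(headers.get("Retry-After", backoff))
                time.sleep(max(1, retry))
                backoff = min(backoff*2,60)
            except Exception:
                if attempt==MAX_RETRIES:
                    return idx, "[ERROR]"
                time.sleep(1)
        # Final attempt was rate-limited: mark the row as failed rather than returning None
        return idx, "[ERROR]"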

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        futures = { ex.submit(worker,(i,v)):i
                    for i,v in enumerate(violation_list) }
        for future in tqdm(as_completed(futures), total=total, desc="Enriching synthetic"):
            i, resp = future.result()
            outputs[i]=resp

    clean = [(v,r) for v,r in zip(violation_list, outputs) if r!="[ERROR]"]
    with open(output_jsonl, "w") as outf:
        for vio, resp in clean:
            try:
                parsed = json.loads(resp)
            except json.JSONDecodeError:
                continue
            clean_obj = {
                "violation": vio,
                "summary":   parsed.get("summary"),
                "keywords":  parsed.get("keywords")
            }
            outf.write(json.dumps(clean_obj) + "\n")
    print(f"✅ Wrote {output_jsonl} ({len(clean)} rows)")


def merge_jsonl_files(real_jsonl="violations_train.jsonl",
                      synthetic_jsonl="violations_synthetic.jsonl",
                      output_jsonl="violations_train_final.jsonl"):
    with open(output_jsonl,"w") as out:
        for fname in (real_jsonl, synthetic_jsonl):
            with open(fname) as inp:
                for line in inp:
                    out.write(line)
    print(f"✅ Merged into {output_jsonl}")


In [None]:
# ─── 1.8 Run Synthetic Augmentation ──────────────────────────────────────

# 1) Generate up to 100 “not safe” violation comments (fewer if a batch fails or returns fewer than requested)
synthetic = generate_synthetic_violations(n_samples=100)

# 2) Enrich them to JSONL
enrich_synthetic_violations(synthetic, "violations_synthetic.jsonl")

# 3) Merge real+synthetic into final train file
merge_jsonl_files(
    real_jsonl="violations_train.jsonl",
    synthetic_jsonl="violations_synthetic.jsonl",
    output_jsonl="violations_train_final.jsonl"
)

✅ Generated 94 synthetic violations.


✅ Wrote violations_synthetic.jsonl (94 rows)
✅ Merged into violations_train_final.jsonl
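
The run above produced 94 violations instead of the 100 requested, because the generator makes a fixed number of batch calls and keeps whatever each call returns. One way to close the gap is to keep requesting batches until the target is reached. This is a sketch only: `collect_until`, `make_batch`, and `max_calls` are hypothetical names, and the stand-in batch function replaces the real API call.

```python
def collect_until(n_samples, make_batch, max_calls=50):
    """Call make_batch() until n_samples items are collected or max_calls is hit."""
    items = []
    calls = 0
    while len(items) < n_samples and calls < max_calls:
        items.extend(make_batch())
        calls += 1
    return items[:n_samples]


# Stand-in for one API call; a real make_batch would send the prompt and
# split the reply on blank lines, as generate_synthetic_violations does.
def fake_batch():
    return ["violation A", "violation B", "violation C"]


print(len(collect_until(10, fake_batch)))  # 10
```

The `max_calls` cap keeps the loop from running forever if every batch comes back empty.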


In [None]:
# Print the first 5 lines of violations_train_final.jsonl
!head -n 5 violations_train_final.jsonl

{"violation": "3. MANAGEMENT, FOOD EMPLOYEE AND CONDITIONAL EMPLOYEE; KNOWLEDGE, RESPONSIBILITIES AND REPORTING - Comments: OBSERVED NO EMPLOYEE HEALTH POLICY ON SITE. MUST PROVIDE WRITTEN EMPLOYEE HEALTH POLICY FOR ALL EMPLOYEES. PRIORITY FOUNDATION VIOLATION. 7-38-010. NO CITATION ISSUED.\t | 5. PROCEDURES FOR RESPONDING TO VOMITING AND DIARRHEAL EVENTS - Comments: OBSERVED NO VOMIT AND DIARRHEA CLEAN UP KIT/PROCEDURE ON SITE. MUST PROVIDE WRITTEN PROCEDURE FOR THE CLEAN UP OF VOMIT AND DIARRHEAL EVENTS, AND CORRESPONDING EQUIPMENT OUTLINE IN PROCEDURE. PRIORITY FOUNDATION VIOLATION 7-38-005. NO CITATION ISSUED.\t | 36. THERMOMETERS PROVIDED & ACCURATE - Comments: MUST PROVIDE A METAL STEM THERMOMETER TO ACCURATELY MEASURE INTERNAL FOOD TEMPERATURE. PRIORITY FOUNDATION VIOLATION 7-38-005. NO CITATION ISSUED.", "summary": "The establishment lacks essential health policies and procedures for employee health and cleaning up after vomiting or diarrhea, as well as proper thermometers for 

In [None]:
pattern = re.compile(
    r'(This violation is not related to food safety\.|'
    r'This violation is related to food safety, but it is safe to eat here\.|'
    r'This violation is related to food safety and may make it unsafe to eat here\.|'
    r'This violation is related to food safety and it is not safe to eat here\.)\s*$'
)

# Print every summary that does not end with one of the four allowed verdicts
with open("violations_train_final.jsonl") as f:
    for i, line in enumerate(f):
        summary = json.loads(line)["summary"]
        if not pattern.search(summary):
            print(f"Line {i}: {summary}")


Line 26: The inspection revealed several violations, including a lack of employee health policies and no supplies for responding to vomiting and diarrheal events, which are related to food safety and may make it unsafe to eat here.
Line 44: The inspection revealed several violations related to employee health policies, proper procedures for handling vomiting and diarrhea, and food safety training. These issues suggest a risk to food safety and may make it unsafe to eat here.
Line 51: The inspection revealed several violations related to food safety, including improper handwashing practices and the absence of a thermometer in the milk cooler, which may make it unsafe to eat here.
Line 54: The inspection revealed critical violations related to the improper storage temperatures of potentially hazardous foods, which may make it unsafe to eat here. Additionally, there are issues with labeling and maintenance that do not directly impact food safety.
Line 55: The inspection revealed issues wi

In [None]:
# ─── 1.9 Optional: Shuffle the final training examples ────────────────
import random
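with open("violations_train_final.jsonl") as f:
    lines = f.read().splitlines()
random.shuffle(lines)  # unseeded, so the order differs on every run
with open("violations_train_final.jsonl", "w") as f:
    f.write("\n".join(lines) + "\n")  # end the file with a newline, as the writer cells do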
print("✅ Shuffled final train file")

✅ Shuffled final train file


In [None]:
# Preview first few synthetic enrichments
with open("violations_synthetic.jsonl") as f:
    for _ in range(5):
        print(json.loads(f.readline()), "\n---")

{'violation': 'During the inspection, it was observed that raw chicken was stored at room temperature on the preparation table for over two hours. This poses a significant risk for the growth of harmful bacteria, and all items must be discarded immediately to ensure food safety.', 'summary': 'Raw chicken was improperly stored at room temperature for over two hours, creating a serious risk for bacterial growth. This violation is related to food safety and it is not safe to eat here.', 'keywords': ['raw chicken', 'room temperature', 'bacterial growth', 'food safety', 'improper storage']} 
---
{'violation': 'The presence of live cockroaches was observed in the kitchen area, particularly near the food storage shelves. This infestation represents a serious health hazard, and immediate pest control measures must be implemented, while all affected food items must be disposed of to prevent contamination.', 'summary': 'The presence of live cockroaches in the kitchen poses a serious health risk,

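In [None]:
# Reconstructed tally (the original cell printed an undefined `counts`); reuses `pattern` and `Counter` from earlier cells
with open("violations_train_final.jsonl") as f:
    matches = [pattern.search(json.loads(line).get("summary") or "") for line in f if line.strip()]
counts = Counter(m.group(1) if m else "Unknown" for m in matches)
print("\n📊 Final Summary Verdict Distribution:")
for k, v in counts.items():
    print(f"{k}: {v}")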


📊 Final Summary Verdict Distribution:
This violation is related to food safety and may make it unsafe to eat here.: 1158
This violation is not related to food safety.: 418
Unknown: 236
This violation is related to food safety, but it is safe to eat here.: 568
This violation is related to food safety and it is not safe to eat here.: 96
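
Converting the printed counts to percentages makes the remaining imbalance easier to see. The numbers below are copied from the output above; the variable names are illustrative. Even after synthetic augmentation, the "not safe to eat here" verdict stays the smallest class at roughly 4% of the rows.

```python
# Counts copied from the printed verdict distribution.
printed_counts = {
    "This violation is related to food safety and may make it unsafe to eat here.": 1158,
    "This violation is not related to food safety.": 418,
    "Unknown": 236,
    "This violation is related to food safety, but it is safe to eat here.": 568,
    "This violation is related to food safety and it is not safe to eat here.": 96,
}

total = sum(printed_counts.values())
shares = {label: 100 * n / total for label, n in printed_counts.items()}

print(f"total rows: {total}")  # total rows: 2476
# List verdicts from most to least common with their percentage share.
for label, pct in sorted(shares.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{pct:5.1f}%  {label}")
```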


# Validation

In [None]:
def enrich_to_jsonl(input_csv: str, output_jsonl: str):
    # 1. Load & clean
    df = pd.read_csv(input_csv)
    df["Violations"] = df["Violations"].astype(str).str.strip()
    df = df[df["Violations"] != ""]

    total = len(df)
    outputs = [None] * total

    # 2. Parallel GPT calls
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {
            executor.submit(process_violation, (i, v)): i
            for i, v in enumerate(df["Violations"])
        }
        for future in tqdm(as_completed(futures),
                           total=total,
                           desc=f"Processing {input_csv}",
                           unit="row"):
            idx, result = future.result()
            outputs[idx] = result

    # 3. Filter errors
    clean = [(vio, r) for vio, r in zip(df["Violations"], outputs) if r != "[ERROR]"]
    with open(output_jsonl, "w") as outf:
        for vio, resp in clean:
            try:
                parsed = json.loads(resp)
            except json.JSONDecodeError:
                continue
            clean_obj = {
                "violation": vio,
                "summary":   parsed.get("summary"),
                "keywords":  parsed.get("keywords")
            }
            outf.write(json.dumps(clean_obj) + "\n")
    print(f"✅ Wrote {output_jsonl} ({len(clean)} rows)")

In [None]:
enrich_to_jsonl("violations_val.csv", "violations_val.jsonl")

✅ Wrote violations_val.jsonl (300 rows)


# Test

In [None]:
enrich_to_jsonl("violations_test.csv", "violations_test.jsonl")

✅ Wrote violations_test.jsonl (300 rows)
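
Because the row count printed by `enrich_to_jsonl` is taken before JSON parsing, it can overstate what was written. Reading each output file back is a simple way to confirm the real row count and spot incomplete records. The helper below is a sketch; `check_jsonl` and its `required` parameter are hypothetical names.

```python
import json


def check_jsonl(path, required=("violation", "summary", "keywords")):
    """Return (row_count, line_numbers_with_problems) for a JSONL file."""
    rows, bad = 0, []
    with open(path, encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            if not line.strip():
                continue  # ignore blank lines
            rows += 1
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                bad.append(lineno)
                continue
            # A usable record is a JSON object with every required field set.
            if not isinstance(obj, dict) or any(obj.get(k) is None for k in required):
                bad.append(lineno)
    return rows, bad
```

For example, `check_jsonl("violations_val.jsonl")` would return the number of rows actually on disk and the line numbers of any records with a missing field.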
