# Data Pipeline Using an LLM

Go to https://groq.com/ and generate a free API key. Store it as a Colab secret named GROQ_API; the code below reads it with userdata.get("GROQ_API").


1. Data Cleaning:

  Begin by loading the dataset into your Colab environment.
  Use pandas functions like head(), info(), describe(), and value_counts() to explore the structure, data types, and basic statistics of the dataset.

  Identify potential data quality issues such as missing values, inconsistent formats, or incorrect entries.
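  The exploration step can be sketched as follows. This is a minimal illustration on a tiny made-up DataFrame: the column names follow the 311 dataset, but the values are invented so that each kind of issue (missing values, inconsistent spelling, unparseable dates) shows up.

```python
import pandas as pd

# Tiny stand-in for the 311 data; values are made up for illustration.
sample = pd.DataFrame({
    "unique_key": [1, 2, 3, 4],
    "city": ["BRONX", "bronx ", None, "NEW YORK"],
    "complaint_type": ["Noise - Residential", "Illegal Parking", "Noise - Residential", None],
    "created_date": ["2025-04-18T01:32:11.000", "2025-04-18T01:33:00.000",
                     "not a date", "2025-04-17T23:10:05.000"],
})

# Missing values per column
missing_counts = sample.isnull().sum()

# Inconsistent spellings appear as separate rows in value_counts()
city_variants = sample["city"].value_counts(dropna=False)

# Entries that cannot be parsed as timestamps become NaT with errors="coerce"
parsed_dates = pd.to_datetime(sample["created_date"], errors="coerce")
bad_date_rows = sample.loc[parsed_dates.isna(), "unique_key"].tolist()

print(missing_counts)
print(city_variants)
print("Rows with unparseable created_date:", bad_date_rows)
```

  On the real dataset, the same three checks give a quick list of the columns and rows the cleaning prompt needs to handle.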
  Prompt Engineering:

  This is the core of the lab. Your task is to craft a prompt that instructs an LLM (a Llama model hosted on Groq; the example code uses llama3-70b-8192) to clean the data.

  The Cleaning Goals: Your prompt should guide the LLM to perform the following tasks:

  * Address missing values: Infer or fill in missing information where possible (e.g., city names from addresses).
  * Standardize text: Correct spelling, apply consistent capitalization, and ensure uniformity in categorical values.
  * Validate and format: Ensure that addresses are in a standard format (e.g., "Street, Borough, NY"), and that dates and times follow ISO 8601.
  * Categorize: Assign clear categories to ambiguous complaint descriptions (e.g., "Noise," "Non-Noise").
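
  To make the "validate and format" goal concrete, the date handling can be written deterministically and used as a reference when judging the LLM's output. This is only a sketch: the helper name `split_created_date` is hypothetical, and it assumes timestamps shaped like "2025-04-18T01:32:11.000", as in the dataset's created_date column.

```python
from datetime import datetime

def split_created_date(value):
    """Return (YYYY-MM-DD, HH:MM:SS), or (None, None) if value is not a valid timestamp."""
    try:
        dt = datetime.fromisoformat(value)
    except (TypeError, ValueError):
        # None, non-strings, and malformed strings all count as invalid
        return None, None
    return dt.date().isoformat(), dt.time().isoformat(timespec="seconds")

print(split_created_date("2025-04-18T01:32:11.000"))
print(split_created_date("18/04/2025"))
```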

  You are not given the prompt used in the example code, but you are given the expected results.
  Iterative Refinement: Start with a basic prompt and gradually refine it based on the LLM's output. Observe how the LLM responds and make adjustments to improve the cleaning process.
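
  One way to drive the refinement loop is to compare each LLM result with the expected record field by field. The sketch below assumes both are plain dictionaries; `diff_record` is a hypothetical helper, and the "actual" record is an invented example of an imperfect LLM response.

```python
def diff_record(expected, actual):
    """Return {field: (expected_value, actual_value)} for every field that differs."""
    fields = set(expected) | set(actual)
    return {
        f: (expected.get(f), actual.get(f))
        for f in sorted(fields)
        if expected.get(f) != actual.get(f)
    }

expected = {"unique_key": 64678942, "borough": "Manhattan",
            "created_date": "2025-04-18", "complaint_category": "Noise"}
# Invented LLM output: wrong capitalization and a missing field
actual = {"unique_key": 64678942, "borough": "MANHATTAN", "created_date": "2025-04-18"}

mismatches = diff_record(expected, actual)
for field, (want, got) in mismatches.items():
    print(f"{field}: expected {want!r}, got {got!r}")
```

  Each mismatch points at an instruction in the prompt that is missing or too vague, which suggests what to change in the next iteration.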

2. Data Validation:

  After cleaning the data, write unit tests (using Python's assert statements) to validate the output.
  Your tests should check data types, value ranges, and ensure that required fields are not null.
  Have the LLM generate the test code, then run it and note any problems that arise during execution.
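
  A hand-written baseline makes it easier to judge the generated tests. The following is a minimal sketch that covers the three kinds of checks named above (types, value ranges, required fields) for a few fields only; the function name and the choice of required fields are assumptions for illustration.

```python
import re

VALID_BOROUGHS = {"Manhattan", "Brooklyn", "Queens", "Bronx", "Staten Island", "Unknown"}

def validate_record(record):
    """Raise AssertionError with a descriptive message on the first failed check."""
    # Required fields must be present and not null
    for field in ("unique_key", "created_date", "borough"):
        assert record.get(field) is not None, f"{field} cannot be None"
    # Data types
    assert isinstance(record["unique_key"], int), "unique_key must be an integer"
    assert isinstance(record["created_date"], str), "created_date must be a string"
    # Formats and value ranges
    assert record["unique_key"] >= 0, "unique_key must be non-negative"
    assert re.fullmatch(r"\d{4}-\d{2}-\d{2}", record["created_date"]), "created_date must be YYYY-MM-DD"
    assert record["borough"] in VALID_BOROUGHS, "borough must be a valid NYC borough"

good = {"unique_key": 64677480, "created_date": "2025-04-18", "borough": "Bronx"}
validate_record(good)  # passes silently

bad = dict(good, created_date="04/18/2025")
try:
    validate_record(bad)
    error_message = None
except AssertionError as e:
    error_message = str(e)
print(error_message)
```

  Checking for None before checking types means a missing value produces a clear assertion message rather than a TypeError from a later comparison.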

Submission: Write your prompts in a text file and upload it to the LMS.

In [1]:
# Groq-Powered Data Engineering Pipeline

# Step 1: Install Required Libraries
!pip install groq itables

Collecting groq
  Downloading groq-0.22.0-py3-none-any.whl.metadata (15 kB)
Collecting itables
  Downloading itables-2.3.0-py3-none-any.whl.metadata (8.6 kB)
Collecting jedi>=0.16 (from ipython->itables)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading groq-0.22.0-py3-none-any.whl (126 kB)
Downloading itables-2.3.0-py3-none-any.whl (2.3 MB)
Downloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
Installing collected packages: jedi, itables, groq
Successfully installed groq-0.22.0 itables-2.3.0 jedi-0.19.2


In [2]:
# Step 2: Import Libraries
from groq import Groq
import pandas as pd
from itables import init_notebook_mode
from google.colab import userdata
import json
import re
from tqdm import tqdm
import itables

init_notebook_mode(all_interactive=True)

In [3]:
# Load a manageable sample (100 rows) for this lab
url = "https://data.cityofnewyork.us/resource/erm2-nwe9.csv?$limit=100"
df = pd.read_csv(url)
df

unique_key,created_date,closed_date,agency,agency_name,complaint_type,descriptor,location_type,incident_zip,incident_address,street_name,cross_street_1,cross_street_2,intersection_street_1,intersection_street_2,address_type,city,landmark,facility_type,status,due_date,resolution_description,resolution_action_updated_date,community_board,bbl,borough,x_coordinate_state_plane,y_coordinate_state_plane,open_data_channel_type,park_facility_name,park_borough,vehicle_type,taxi_company_borough,taxi_pick_up_location,bridge_highway_name,bridge_highway_direction,road_ramp,bridge_highway_segment,latitude,longitude,location


In [4]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 41 columns):
 #   Column                          Non-Null Count  Dtype  
---  ------                          --------------  -----  
 0   unique_key                      100 non-null    int64  
 1   created_date                    100 non-null    object 
 2   closed_date                     14 non-null     object 
 3   agency                          100 non-null    object 
 4   agency_name                     100 non-null    object 
 5   complaint_type                  100 non-null    object 
 6   descriptor                      99 non-null     object 
 7   location_type                   94 non-null     object 
 8   incident_zip                    100 non-null    int64  
 9   incident_address                98 non-null     object 
 10  street_name                     98 non-null     object 
 11  cross_street_1                  98 non-null     object 
 12  cross_street_2                  96 no

In [5]:
df

unique_key,created_date,closed_date,agency,agency_name,complaint_type,descriptor,location_type,incident_zip,incident_address,street_name,cross_street_1,cross_street_2,intersection_street_1,intersection_street_2,address_type,city,landmark,facility_type,status,due_date,resolution_description,resolution_action_updated_date,community_board,bbl,borough,x_coordinate_state_plane,y_coordinate_state_plane,open_data_channel_type,park_facility_name,park_borough,vehicle_type,taxi_company_borough,taxi_pick_up_location,bridge_highway_name,bridge_highway_direction,road_ramp,bridge_highway_segment,latitude,longitude,location


In [19]:
client = Groq(api_key=userdata.get("GROQ_API"))

In [42]:

def llm_complex_clean(record):
    """
    Cleans a single NYC 311 record using the Groq LLM.

    Args:
        record: A pandas Series representing a single record.

    Returns:
        A dictionary with the cleaned record, or None if cleaning fails.
    """
    from groq import Groq
    import json
    from google.colab import userdata


    prompt = f"""
    Given the following record:

    {json.dumps(record.to_dict())}

    Clean the record according to these instructions:
    1. **Address Missing Values:**
       - If city is missing, infer it as "New York" if incident_address, borough, or incident_zip suggests NYC (e.g., address contains "Brooklyn" or incident_zip is 11201). Otherwise, set to "Unknown".
       - If borough is missing, infer from incident_address or incident_zip if possible. Otherwise, set to "Unknown".
       - Use the following zip code ranges to infer borough (partial list):
         - Manhattan: 10001-10282
         - Brooklyn: 11201-11256
         - Queens: 11101-11697
         - Bronx: 10451-10475
         - Staten Island: 10301-10314
       - Leave missing complaint_type, descriptor, or closed_date as-is.
    2. **Standardize Text:**
       - Correct spelling in city (e.g., "Brookyln" → "Brooklyn").
       - Ensure the following text fields are in title case (e.g., "new york" → "New York", "GILES PLACE" → "Giles Place") and have no leading or trailing spaces:
         - city
         - borough
         - complaint_type
         - incident_address
         - cross_street_1
         - cross_street_2
         - intersection_street_1
         - intersection_street_2
         - landmark
         - community_board (standardize format to "District 08 Bronx", "District 10 Manhattan", etc.)
       - Standardize city to "New York" for variations like "NY", "NYC".
       - Ensure borough is one of "Manhattan", "Brooklyn", "Queens", "Bronx", "Staten Island", or "Unknown" in title case.
    3. **Validate and Format:**
       - Ensure incident_address follows the format "Street, Borough, NY" with borough in title case (e.g., "3435 GILES PLACE" → "3435 Giles Place, Bronx, NY") or null if invalid.
       - Convert created_date to YYYY-MM-DD (e.g., "2025-04-18T01:32:11.000" → "2025-04-18") or null if invalid.
       - Add a time field (HH:MM:SS) extracted from created_date (e.g., "01:32:11") or null if invalid.
       - If closed_date exists, convert to YYYY-MM-DD or null if invalid (do not add a separate time field for closed_date).
       - Set location to null (redundant with latitude/longitude).
    4. **Categorize:**
       - Add a complaint_category field with value "Noise" or "Non-Noise".
       - Classify as "Noise" if complaint_type or descriptor contains keywords like "noise", "loud", "music", "banging" in a disturbance context (e.g., "Loud Music/Party" is Noise, "Illegal Parking" is Non-Noise). Default to "Non-Noise" if ambiguous.
    5. **Output Fields:**
       - Return only the following fields in the cleaned record:
         - unique_key
         - created_date
         - time
         - incident_address
         - city
         - borough
         - complaint_type
         - descriptor
         - incident_zip
         - complaint_category
         - closed_date
         - cross_street_1
         - cross_street_2
         - intersection_street_1
         - intersection_street_2
         - landmark
         - community_board
       - Exclude all other fields (e.g., agency, latitude, longitude).

    **Output Instructions:**
    - Return ONLY the cleaned record as a valid JSON dictionary (use double quotes, null for None, true/false for booleans).
    - Do NOT include explanatory text, markdown, code blocks, or any additional content.
    - Example for input {{"unique_key": 64678942, "created_date": "2025-04-18T01:32:11.000", "incident_address": "441 east 57 street", "city": null, "borough": null, "complaint_type": "noise - residential", "descriptor": "loud music/party", "incident_zip": "10022"}}:
      {{"unique_key": 64678942, "created_date": "2025-04-18", "time": "01:32:11", "incident_address": "441 East 57 Street, Manhattan, NY", "city": "New York", "borough": "Manhattan", "complaint_type": "Noise - Residential", "descriptor": "Loud Music/Party", "incident_zip": "10022", "complaint_category": "Noise", "closed_date": null, "cross_street_1": "1 Avenue", "cross_street_2": "Sutton Place", "intersection_street_1": "1 Avenue", "intersection_street_2": "Sutton Place", "landmark": "East 57 Street", "community_board": "District 06 Manhattan"}}
    """

    try:
        chat_completion = client.chat.completions.create(
            model="llama3-70b-8192",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        response_content = chat_completion.choices[0].message.content.strip()
        print(f"Raw Clean Response: {response_content}")  # Debug
        cleaned_record = extract_dict_from_response(response_content)
        if cleaned_record is None:
            print(f"Failed to parse LLM response for record: {record.to_dict()}")
            return None
        # Debug: Print key fields to verify cleaning
        print(f"Cleaned record index {record.name}: borough = {cleaned_record.get('borough')}, complaint_category = {cleaned_record.get('complaint_category')}, incident_address = {cleaned_record.get('incident_address')}, cross_street_1 = {cleaned_record.get('cross_street_1')}, landmark = {cleaned_record.get('landmark')}")
        return cleaned_record
    except Exception as e:
        print(f"API or parsing error for record index {record.name}: {e}")
        return None
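
The ZIP ranges listed in the prompt above can also be applied deterministically, which gives an independent check on the borough the LLM returns. This is a sketch only: `borough_from_zip` is a hypothetical helper, and the ranges are copied from the prompt's partial list rather than from an authoritative ZIP table.

```python
# ZIP ranges copied from the prompt (a partial list). The Queens range
# numerically contains the Brooklyn range, so Brooklyn is checked first.
ZIP_RANGES = [
    ("Manhattan", 10001, 10282),
    ("Staten Island", 10301, 10314),
    ("Bronx", 10451, 10475),
    ("Brooklyn", 11201, 11256),
    ("Queens", 11101, 11697),
]

def borough_from_zip(zip_code):
    """Map a ZIP code (int or str) to a borough name, or "Unknown"."""
    try:
        z = int(zip_code)
    except (TypeError, ValueError):
        # None, NaN, and non-numeric strings cannot be mapped
        return "Unknown"
    for borough, low, high in ZIP_RANGES:
        if low <= z <= high:
            return borough
    return "Unknown"

for z in ("10463", 11226, "10022", None):
    print(z, "->", borough_from_zip(z))
```

Because the ranges overlap, the order of the entries matters; the same ambiguity exists for the LLM, so a disagreement between this helper and the model's answer is worth inspecting by hand.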

In [43]:
def extract_dict_from_response(response_string):
    import re
    import json

    # Strip narrative text and common prefixes
    cleaned_string = re.sub(
        r'Here is the cleaned record according to the instructions:\n\n|```json\n|```',
        '',
        response_string,
        flags=re.MULTILINE
    ).strip()

    # Extract JSON object if it exists
    json_match = re.search(r'\{.*\}', cleaned_string, re.DOTALL)
    if not json_match:
        print(f"Response is not a JSON object: {cleaned_string}")
        return None

    json_string = json_match.group(0)

    try:
        # Try strict JSON first so apostrophes and words like "None" inside values stay intact
        extracted_dict = json.loads(json_string)
    except json.JSONDecodeError:
        # Fall back to normalizing Python-style literals (single quotes, None, True, False)
        normalized = json_string.replace("'", '"').replace("None", "null").replace("True", "true").replace("False", "false")
        try:
            extracted_dict = json.loads(normalized)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e}")
            print(f"Response string: {json_string}")
            return None

    print(f"Successfully parsed JSON: {list(extracted_dict.keys())}")  # Debug
    return extracted_dict

In [44]:
from tqdm import tqdm
import time

cleaned_records = []
sample_df = df.head(10)
for idx, row in tqdm(sample_df.iterrows(), total=len(sample_df)):
    try:
        cleaned_record = llm_complex_clean(row)
        if cleaned_record is not None:
            cleaned_records.append(cleaned_record)
        else:
            print(f"Skipping row {idx}: No valid record returned")
        time.sleep(1)  # Avoid rate limits
    except Exception as e:
        print(f"Error cleaning row {idx}: {e}")
cleaned_df = pd.DataFrame(cleaned_records)
print("Cleaned DataFrame:")
print(cleaned_df[['unique_key', 'created_date', 'time', 'incident_address', 'city', 'borough', 'complaint_type', 'descriptor', 'incident_zip', 'complaint_category', 'cross_street_1', 'landmark', 'community_board']])
print("\nMissing Values in Cleaned DataFrame:")
print(cleaned_df.isnull().sum())

  0%|          | 0/10 [00:00<?, ?it/s]

Raw Clean Response: Here is the cleaned record according to the instructions:

{"unique_key": 64677480, "created_date": "2025-04-18", "time": "01:35:22", "incident_address": "3435 Giles Place, Bronx, NY", "city": "New York", "borough": "Bronx", "complaint_type": "Illegal Parking", "descriptor": "Double Parked Blocking Vehicle", "incident_zip": "10463", "complaint_category": "Non-Noise", "closed_date": null, "cross_street_1": "Cannon Place", "cross_street_2": "Sedgwick Avenue", "intersection_street_1": "Cannon Place", "intersection_street_2": "Sedgwick Avenue", "landmark": "Giles Place", "community_board": "District 08 Bronx"}
Successfully parsed JSON: ['unique_key', 'created_date', 'time', 'incident_address', 'city', 'borough', 'complaint_type', 'descriptor', 'incident_zip', 'complaint_category', 'closed_date', 'cross_street_1', 'cross_street_2', 'intersection_street_1', 'intersection_street_2', 'landmark', 'community_board']
Cleaned record index 0: borough = Bronx, complaint_category 

 10%|█         | 1/10 [00:01<00:17,  1.90s/it]

Raw Clean Response: Here is the cleaned record according to the instructions:

{"unique_key": 64672937, "created_date": "2025-04-18", "time": "01:34:18", "incident_address": "219 Fordham Street, Bronx, NY", "city": "New York", "borough": "Bronx", "complaint_type": "Noise - Street/Sidewalk", "descriptor": "Loud Music/Party", "incident_zip": "10464", "complaint_category": "Noise", "closed_date": null, "cross_street_1": "Fordham Place", "cross_street_2": "Fordham Place", "intersection_street_1": "Fordham Place", "intersection_street_2": "Fordham Place", "landmark": "Fordham Street", "community_board": "District 10 Bronx"}
Successfully parsed JSON: ['unique_key', 'created_date', 'time', 'incident_address', 'city', 'borough', 'complaint_type', 'descriptor', 'incident_zip', 'complaint_category', 'closed_date', 'cross_street_1', 'cross_street_2', 'intersection_street_1', 'intersection_street_2', 'landmark', 'community_board']
Cleaned record index 1: borough = Bronx, complaint_category = Noise

 20%|██        | 2/10 [00:03<00:14,  1.86s/it]

Raw Clean Response: Here is the cleaned record according to the instructions:

{"unique_key": 64679960, "created_date": "2025-04-18", "time": "01:33:57", "incident_address": "523 East 117 Street, Manhattan, NY", "city": "New York", "borough": "Manhattan", "complaint_type": "Noise - Street/Sidewalk", "descriptor": "Loud Music/Party", "incident_zip": "10035", "complaint_category": "Noise", "closed_date": null, "cross_street_1": "Pleasant Avenue", "cross_street_2": "Unnamed Street", "intersection_street_1": "Pleasant Avenue", "intersection_street_2": "Unnamed Street", "landmark": "East 117 Street", "community_board": "District 11 Manhattan"}
Successfully parsed JSON: ['unique_key', 'created_date', 'time', 'incident_address', 'city', 'borough', 'complaint_type', 'descriptor', 'incident_zip', 'complaint_category', 'closed_date', 'cross_street_1', 'cross_street_2', 'intersection_street_1', 'intersection_street_2', 'landmark', 'community_board']
Cleaned record index 2: borough = Manhattan, co

 30%|███       | 3/10 [00:05<00:13,  1.87s/it]

Raw Clean Response: Here is the cleaned record according to the instructions:

{"unique_key": 64675055, "created_date": "2025-04-18", "time": "01:33:38", "incident_address": "370 East 31 Street, Brooklyn, NY", "city": "New York", "borough": "Brooklyn", "complaint_type": "Noise - Residential", "descriptor": "Banging/Pounding", "incident_zip": "11226", "complaint_category": "Noise", "closed_date": null, "cross_street_1": "Clarendon Road", "cross_street_2": "Avenue D", "intersection_street_1": "Clarendon Road", "intersection_street_2": "Avenue D", "landmark": "East 31 Street", "community_board": "District 17 Brooklyn"}
Successfully parsed JSON: ['unique_key', 'created_date', 'time', 'incident_address', 'city', 'borough', 'complaint_type', 'descriptor', 'incident_zip', 'complaint_category', 'closed_date', 'cross_street_1', 'cross_street_2', 'intersection_street_1', 'intersection_street_2', 'landmark', 'community_board']
Cleaned record index 3: borough = Brooklyn, complaint_category = Noise

 40%|████      | 4/10 [00:07<00:11,  1.86s/it]

Raw Clean Response: Here is the cleaned record according to the instructions:

{"unique_key": 64680154, "created_date": "2025-04-18", "time": "01:33:25", "incident_address": "3505 Rochambeau Avenue, Bronx, NY", "city": "New York", "borough": "Bronx", "complaint_type": "Noise - Residential", "descriptor": "Loud Music/Party", "incident_zip": "10467", "complaint_category": "Noise", "closed_date": null, "cross_street_1": "East Gun Hill Road", "cross_street_2": "East 212 Street", "intersection_street_1": "East Gun Hill Road", "intersection_street_2": "East 212 Street", "landmark": "Rochambeau Avenue", "community_board": "District 07 Bronx"}
Successfully parsed JSON: ['unique_key', 'created_date', 'time', 'incident_address', 'city', 'borough', 'complaint_type', 'descriptor', 'incident_zip', 'complaint_category', 'closed_date', 'cross_street_1', 'cross_street_2', 'intersection_street_1', 'intersection_street_2', 'landmark', 'community_board']
Cleaned record index 4: borough = Bronx, complaint

 50%|█████     | 5/10 [00:24<00:36,  7.31s/it]

Raw Clean Response: Here is the cleaned record according to the instructions:

{"unique_key": 64674005, "created_date": "2025-04-18", "time": "01:33:24", "incident_address": "1869 83 Street, Brooklyn, NY", "city": "New York", "borough": "Brooklyn", "complaint_type": "Noise - Residential", "descriptor": "Banging/Pounding", "incident_zip": "11214", "complaint_category": "Noise", "closed_date": null, "cross_street_1": "18 Avenue", "cross_street_2": "19 Avenue", "intersection_street_1": "18 Avenue", "intersection_street_2": "19 Avenue", "landmark": "83 Street", "community_board": "District 11 Brooklyn"}
Successfully parsed JSON: ['unique_key', 'created_date', 'time', 'incident_address', 'city', 'borough', 'complaint_type', 'descriptor', 'incident_zip', 'complaint_category', 'closed_date', 'cross_street_1', 'cross_street_2', 'intersection_street_1', 'intersection_street_2', 'landmark', 'community_board']
Cleaned record index 5: borough = Brooklyn, complaint_category = Noise, incident_addres

 60%|██████    | 6/10 [00:42<00:43, 10.92s/it]

Raw Clean Response: Here is the cleaned record according to the instructions:

{"unique_key": 64681084, "created_date": "2025-04-18", "time": "01:33:22", "incident_address": "135 Elmira Loop, Brooklyn, NY", "city": "New York", "borough": "Brooklyn", "complaint_type": "Noise - Residential", "descriptor": "Loud Talking", "incident_zip": "11239", "complaint_category": "Noise", "closed_date": null, "cross_street_1": "Schroeders Avenue", "cross_street_2": "Bend", "intersection_street_1": "Schroeders Avenue", "intersection_street_2": "Bend", "landmark": "Elmira Loop", "community_board": "District 05 Brooklyn"}
Successfully parsed JSON: ['unique_key', 'created_date', 'time', 'incident_address', 'city', 'borough', 'complaint_type', 'descriptor', 'incident_zip', 'complaint_category', 'closed_date', 'cross_street_1', 'cross_street_2', 'intersection_street_1', 'intersection_street_2', 'landmark', 'community_board']
Cleaned record index 6: borough = Brooklyn, complaint_category = Noise, incident_a

 70%|███████   | 7/10 [00:58<00:37, 12.57s/it]

Raw Clean Response: Here is the cleaned record according to the instructions:

{"unique_key": 64677685, "created_date": "2025-04-18", "time": "01:32:48", "incident_address": "219 Fordham Street, Bronx, NY", "city": "New York", "borough": "Bronx", "complaint_type": "Noise - Street/Sidewalk", "descriptor": "Loud Music/Party", "incident_zip": "10464", "complaint_category": "Noise", "closed_date": null, "cross_street_1": "Fordham Place", "cross_street_2": "Fordham Place", "intersection_street_1": "Fordham Place", "intersection_street_2": "Fordham Place", "landmark": "Fordham Street", "community_board": "District 10 Bronx"}
Successfully parsed JSON: ['unique_key', 'created_date', 'time', 'incident_address', 'city', 'borough', 'complaint_type', 'descriptor', 'incident_zip', 'complaint_category', 'closed_date', 'cross_street_1', 'cross_street_2', 'intersection_street_1', 'intersection_street_2', 'landmark', 'community_board']
Cleaned record index 7: borough = Bronx, complaint_category = Noise

 80%|████████  | 8/10 [01:00<00:18,  9.18s/it]

Raw Clean Response: Here is the cleaned record according to the instructions:

{"unique_key": 64681329, "created_date": "2025-04-18", "time": "01:32:47", "incident_address": "671 Marcy Avenue, Brooklyn, NY", "city": "New York", "borough": "Brooklyn", "complaint_type": "Noise - Residential", "descriptor": "Loud Music/Party", "incident_zip": "11216", "complaint_category": "Noise", "closed_date": null, "cross_street_1": "Dekalb Avenue", "cross_street_2": "Kosciuszko Street", "intersection_street_1": "Dekalb Avenue", "intersection_street_2": "Kosciuszko Street", "landmark": "Marcy Avenue", "community_board": "District 03 Brooklyn"}
Successfully parsed JSON: ['unique_key', 'created_date', 'time', 'incident_address', 'city', 'borough', 'complaint_type', 'descriptor', 'incident_zip', 'complaint_category', 'closed_date', 'cross_street_1', 'cross_street_2', 'intersection_street_1', 'intersection_street_2', 'landmark', 'community_board']
Cleaned record index 8: borough = Brooklyn, complaint_cate

 90%|█████████ | 9/10 [01:02<00:06,  6.91s/it]

Raw Clean Response: Here is the cleaned record according to the instructions:

{"unique_key": 64678942, "created_date": "2025-04-18", "time": "01:32:11", "incident_address": "441 East 57 Street, Manhattan, NY", "city": "New York", "borough": "Manhattan", "complaint_type": "Noise - Residential", "descriptor": "Loud Music/Party", "incident_zip": "10022", "complaint_category": "Noise", "closed_date": null, "cross_street_1": "1 Avenue", "cross_street_2": "Sutton Place", "intersection_street_1": "1 Avenue", "intersection_street_2": "Sutton Place", "landmark": "East 57 Street", "community_board": "District 06 Manhattan"}
Successfully parsed JSON: ['unique_key', 'created_date', 'time', 'incident_address', 'city', 'borough', 'complaint_type', 'descriptor', 'incident_zip', 'complaint_category', 'closed_date', 'cross_street_1', 'cross_street_2', 'intersection_street_1', 'intersection_street_2', 'landmark', 'community_board']
Cleaned record index 9: borough = Manhattan, complaint_category = Noise

100%|██████████| 10/10 [01:04<00:00,  6.41s/it]

Cleaned DataFrame:
   unique_key created_date      time                    incident_address  \
0    64677480   2025-04-18  01:35:22         3435 Giles Place, Bronx, NY   
1    64672937   2025-04-18  01:34:18       219 Fordham Street, Bronx, NY   
2    64679960   2025-04-18  01:33:57  523 East 117 Street, Manhattan, NY   
3    64675055   2025-04-18  01:33:38    370 East 31 Street, Brooklyn, NY   
4    64680154   2025-04-18  01:33:25   3505 Rochambeau Avenue, Bronx, NY   
5    64674005   2025-04-18  01:33:24        1869 83 Street, Brooklyn, NY   
6    64681084   2025-04-18  01:33:22       135 Elmira Loop, Brooklyn, NY   
7    64677685   2025-04-18  01:32:48       219 Fordham Street, Bronx, NY   
8    64681329   2025-04-18  01:32:47      671 Marcy Avenue, Brooklyn, NY   
9    64678942   2025-04-18  01:32:11   441 East 57 Street, Manhattan, NY   

       city    borough           complaint_type  \
0  New York      Bronx          Illegal Parking   
1  New York      Bronx  Noise - Street/Sid
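
The loop above skips a row as soon as the cleaning function returns None. A possible refinement is to retry with an increasing delay before giving up. The sketch below assumes a cleaning function that returns None on failure, as llm_complex_clean does; `clean_with_retries` and the stub `flaky_clean` are hypothetical names, and the stub stands in for the real API call so the example runs offline.

```python
import time

def clean_with_retries(clean_fn, record, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call clean_fn(record) until it returns a non-None result or attempts run out."""
    for attempt in range(max_attempts):
        result = clean_fn(record)
        if result is not None:
            return result
        if attempt < max_attempts - 1:
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** attempt)
    return None

# Stub that fails twice and then succeeds, in place of a real LLM call
calls = {"n": 0}
def flaky_clean(record):
    calls["n"] += 1
    return None if calls["n"] < 3 else {"unique_key": record["unique_key"]}

waits = []  # record the delays instead of actually sleeping
result = clean_with_retries(flaky_clean, {"unique_key": 1}, sleep=waits.append)
print(result, waits)
```

In the notebook, the call `llm_complex_clean(row)` inside the loop could be replaced by `clean_with_retries(llm_complex_clean, row)`.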




Data Validation

In [71]:
def generate_complex_validation_tests(record):
    """
    Generates Python unit tests to validate a cleaned NYC 311 record based on provided validation rules.

    Args:
        record: A dictionary representing a single cleaned record.

    Returns:
        A string containing Python assert statements.
    """
    import re
    import json
    from groq import Groq
    from google.colab import userdata



    prompt = f"""
    Generate Python code with `assert` statements to validate a cleaned NYC 311 complaints record:

    {json.dumps(record)}

    The tests must verify:
    - unique_key: Must be an integer, non-negative, not None.
    - created_date: Must be a string, non-empty, in YYYY-MM-DD format (e.g., "2025-04-18"), not None.
    - time: Must be a string in HH:MM:SS format (e.g., "01:32:11") or None.
    - incident_address: If present, non-empty, and a string, must end with ", NY" and include a valid borough (Manhattan, Brooklyn, Queens, Bronx, Staten Island, Unknown) before ", NY".
    - city: Must be a string in title case (e.g., "New York") or None.
    - borough: Must be a string in title case (e.g., "Bronx"), one of Manhattan, Brooklyn, Queens, Bronx, Staten Island, Unknown, not None.
    - complaint_type: Must be a string in title case (e.g., "Noise - Residential"), not None.
    - descriptor: Must be a string or None.
    - incident_zip: If present and not None, must be a 5-digit string (e.g., "10022") and within 10000-11697 when converted to integer.
    - complaint_category: If present, must be "Noise" or "Non-Noise".
    - closed_date: If present and not None, must be a string in YYYY-MM-DD format (e.g., "2025-04-18").
    - cross_street_1, cross_street_2, intersection_street_1, intersection_street_2, landmark, community_board: If present, must be strings in title case or None.

    Skip validation for fields not present in the record (e.g., agency, status, latitude, longitude).

    **Output Instructions**:
    - Output ONLY valid Python code starting with a single `import re` followed by a newline.
    - Each `assert` statement MUST include an error message (e.g., `assert condition, "error message"`).
    - Use separate `assert` statements for each condition (e.g., type, range, not None) to improve debuggability.
    - Do NOT include duplicate code blocks, explanatory text, markdown, code block markers (e.g., ```python), comments, or multiple `import re` statements.
    - Ensure the code is concise, executable, and validates the record exactly once.
    - Place each `assert` statement on a new line.
    - Generate a single block of code with one `import re` at the start.

    Example:
    import re
    assert isinstance(record["unique_key"], int), "unique_key must be an integer"
    assert record["unique_key"] >= 0, "unique_key must be non-negative"
    assert record["unique_key"] is not None, "unique_key cannot be None"
    assert isinstance(record["created_date"], str), "created_date must be a string"
    assert record["created_date"] != "", "created_date cannot be empty"
    assert re.match(r"^\d{{4}}-\d{{2}}-\d{{2}}$", record["created_date"]), "created_date must be YYYY-MM-DD"
    assert record["created_date"] is not None, "created_date cannot be None"
    assert record["time"] is None or re.match(r"^\d{{2}}:\d{{2}}:\d{{2}}$", record["time"]), "time must be HH:MM:SS or None"
    assert record["incident_address"] is None or (isinstance(record["incident_address"], str) and record["incident_address"] != "" and any(record["incident_address"].endswith(f", {{b}}, NY") for b in ["Manhattan", "Brooklyn", "Queens", "Bronx", "Staten Island", "Unknown"])), "incident_address must end with , Borough, NY"
    assert record["city"] is None or (isinstance(record["city"], str) and record["city"].title() == record["city"]), "city must be in title case or None"
    assert isinstance(record["borough"], str), "borough must be a string"
    assert record["borough"] in ["Manhattan", "Brooklyn", "Queens", "Bronx", "Staten Island", "Unknown"], "borough must be a valid NYC borough"
    assert record["borough"].title() == record["borough"], "borough must be in title case"
    assert record["borough"] is not None, "borough cannot be None"
    assert isinstance(record["complaint_type"], str), "complaint_type must be a string"
    assert record["complaint_type"].title() == record["complaint_type"], "complaint_type must be in title case"
    assert record["complaint_type"] is not None, "complaint_type cannot be None"
    assert record["descriptor"] is None or isinstance(record["descriptor"], str), "descriptor must be a string or None"
    assert record["incident_zip"] is None or (re.match(r"^\d{{5}}$", str(record["incident_zip"])) and 10000 <= int(record["incident_zip"]) <= 11697), "incident_zip must be a 5-digit string in range 10000-11697 or None"
    assert "complaint_category" not in record or record["complaint_category"] in ["Noise", "Non-Noise"], "complaint_category must be Noise or Non-Noise"
    assert record["closed_date"] is None or (isinstance(record["closed_date"], str) and re.match(r"^\d{{4}}-\d{{2}}-\d{{2}}$", record["closed_date"])), "closed_date must be YYYY-MM-DD or None"
    assert record["cross_street_1"] is None or (isinstance(record["cross_street_1"], str) and record["cross_street_1"].title() == record["cross_street_1"]), "cross_street_1 must be in title case or None"
    assert record["cross_street_2"] is None or (isinstance(record["cross_street_2"], str) and record["cross_street_2"].title() == record["cross_street_2"]), "cross_street_2 must be in title case or None"
    assert record["intersection_street_1"] is None or (isinstance(record["intersection_street_1"], str) and record["intersection_street_1"].title() == record["intersection_street_1"]), "intersection_street_1 must be in title case or None"
    assert record["intersection_street_2"] is None or (isinstance(record["intersection_street_2"], str) and record["intersection_street_2"].title() == record["intersection_street_2"]), "intersection_street_2 must be in title case or None"
    assert record["landmark"] is None or (isinstance(record["landmark"], str) and record["landmark"].title() == record["landmark"]), "landmark must be in title case or None"
    assert record["community_board"] is None or (isinstance(record["community_board"], str) and record["community_board"].title() == record["community_board"]), "community_board must be in title case or None"
    """

    try:
        chat_completion = client.chat.completions.create(
            model="llama3-70b-8192",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        response = chat_completion.choices[0].message.content.strip()
        # Print raw response for debugging
        print(f"Raw LLM Response: {response}")
        # Extract code block between ```
        code_match = re.search(r'```(?:python)?\n(.*?)\n```', response, re.DOTALL)
        if code_match:
            code = code_match.group(1).strip()
            print(f"Extracted Code Block: {code}")
        else:
            # Fallback: Extract content starting with import re
            code_match = re.search(r'(import re\n.*)', response, re.DOTALL)
            if code_match:
                code = code_match.group(1).strip()
                print(f"Fallback Code Block: {code}")
            else:
                print("No valid code block found")
                return ""
        # Ensure a single leading import re: drop blank lines and every existing
        # "import re", then prepend exactly one (the first line is never lost)
        body_lines = [line for line in code.split('\n') if line.strip() and line.strip() != "import re"]
        final_code = "\n".join(["import re"] + body_lines)
        # Add unique marker for tracing
        final_code = f"# ValidationCodeMarker\n{final_code}"
        if not final_code.startswith("# ValidationCodeMarker\nimport re\n"):
            print(f"Invalid response format (not Python code): {final_code}")
            return ""
        # Debug: Print number of lines in final_code
        print(f"Final Code Length: {len(final_code.splitlines())} lines")
        print(f"Generated Test Code: {final_code}")
        return final_code
    except Exception as e:
        print(f"Error generating tests: {e}")
        return ""
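
Because the generated text is later passed to exec, it can help to confirm first that it parses and holds nothing but imports and asserts. The following is a minimal sketch of such a check; is_assert_only is a hypothetical helper name, not part of the notebook, and the check is shallow (it looks at top-level statements only and is not a sandbox).

```python
import ast

def is_assert_only(code: str) -> bool:
    """Return True if `code` parses and its top level holds only imports and at least one assert."""
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return False
    # Comments such as "# ValidationCodeMarker" never appear in the AST, so they are ignored.
    only_allowed = all(isinstance(node, (ast.Import, ast.Assert)) for node in tree.body)
    has_assert = any(isinstance(node, ast.Assert) for node in tree.body)
    return only_allowed and has_assert

good = 'import re\nassert isinstance(record["unique_key"], int), "unique_key must be an integer"'
bad = 'import os\nos.remove("data.csv")'
print(is_assert_only(good))
print(is_assert_only(bad))
```

One possible use is to call is_assert_only(final_code) just before returning and to return "" when it fails, in the same way the function already rejects responses that are not Python code.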

In [72]:
sample_record = cleaned_df.iloc[0].to_dict()
test_code = generate_complex_validation_tests(sample_record)
print(test_code)


Raw LLM Response: Here is the Python code with `assert` statements to validate the cleaned NYC 311 complaints record:
```
import re

assert isinstance(record["unique_key"], int), "unique_key must be an integer"
assert record["unique_key"] >= 0, "unique_key must be non-negative"
assert record["unique_key"] is not None, "unique_key cannot be None"

assert isinstance(record["created_date"], str), "created_date must be a string"
assert record["created_date"] != "", "created_date cannot be empty"
assert re.match(r"^\d{4}-\d{2}-\d{2}$", record["created_date"]), "created_date must be YYYY-MM-DD"
assert record["created_date"] is not None, "created_date cannot be None"

assert record["time"] is None or re.match(r"^\d{2}:\d{2}:\d{2}$", record["time"]), "time must be HH:MM:SS or None"

assert record["incident_address"] is None or (isinstance(record["incident_address"], str) and record["incident_address"] != "" and any(record["incident_address"].endswith(f", {b}, NY") for b in ["Manhattan", "Brook
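
The prompt asks for title-case checks of the form value.title() == value. str.title() capitalizes the letter after any non-letter character, so values containing digits or apostrophes can fail that comparison even when they look correctly cased. The sketch below shows the behavior and one possible looser check; is_title_case is a hypothetical helper and the sample values are illustrative, not taken from the dataset.

```python
import string

samples = ["3rd Avenue", "St. John's Place", "Loud Music/Party"]
for value in samples:
    # Compare the original with str.title() and string.capwords()
    print(value, "|", value.title(), "|", string.capwords(value))

def is_title_case(value: str) -> bool:
    """Looser check: every whitespace-separated word starts with a non-lowercase character."""
    return all(not word[0].islower() for word in value.split())

print(is_title_case("3rd Avenue"))
print(is_title_case("new york"))
```

string.capwords handles the digit and apostrophe cases but lowercases the letter after a slash, so neither built-in matches every value; which rule to adopt depends on how the cleaning prompt formats such fields.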

In [None]:
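# Evaluate tests programmatically (OPTIONAL)
# The generated asserts reference `record`, so pass it in exec's namespace
exec(test_code, {"record": sample_record})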
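
A single exec stops at the first failing assert, which hides any later problems. As a rough sketch, the generated code can instead be run one line at a time so that every failure message is collected. This assumes one statement per line, as the validation prompt requests; run_asserts and the demo values are hypothetical.

```python
def run_asserts(test_code: str, record: dict) -> list[str]:
    """Execute each generated line separately and collect the failure messages."""
    namespace = {"record": record}  # `import re` lines add `re` to this namespace
    failures = []
    for line in test_code.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue  # skip blank lines and the marker comment
        try:
            exec(stripped, namespace)
        except AssertionError as err:
            failures.append(str(err))
        except Exception as err:  # e.g. KeyError for a field missing from the record
            failures.append(f"{type(err).__name__}: {err}")
    return failures

demo_code = (
    'import re\n'
    'assert isinstance(record["unique_key"], int), "unique_key must be an integer"\n'
    'assert record["borough"] in ["Manhattan", "Bronx"], "borough must be a valid NYC borough"'
)
demo_record = {"unique_key": "64678942", "borough": "Bronx"}
print(run_asserts(demo_code, demo_record))
```

With the notebook's own variables this would be called as run_asserts(test_code, sample_record); an empty list means every assertion passed.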

In [74]:
def write_nyc_311_prompts():
    """
    Writes the NYC 311 cleaning and validation prompts to a text file.
    """
    # Raw string: the regex examples in the prompt contain backslashes such as \d
    prompts_content = r"""# Prompt for llm_complex_clean
Given the following record:

{record}

Clean the record according to these instructions:
1. **Address Missing Values:**
   - If city is missing, infer it as "New York" if incident_address, borough, or incident_zip suggests NYC (e.g., address contains "Brooklyn" or incident_zip is 11201). Otherwise, set to "Unknown".
   - If borough is missing, infer from incident_address or incident_zip if possible. Otherwise, set to "Unknown".
   - Use the following zip code ranges to infer borough (partial list):
     - Manhattan: 10001-10282
     - Brooklyn: 11201-11256
     - Queens: 11101-11697
     - Bronx: 10451-10475
     - Staten Island: 10301-10314
   - Leave missing complaint_type, descriptor, or closed_date as-is.
2. **Standardize Text:**
   - Correct spelling in city (e.g., "Brookyln" → "Brooklyn").
   - Ensure the following text fields are in title case (e.g., "new york" → "New York", "GILES PLACE" → "Giles Place") and have no leading or trailing spaces:
     - city
     - borough
     - complaint_type
     - incident_address
     - cross_street_1
     - cross_street_2
     - intersection_street_1
     - intersection_street_2
     - landmark
     - community_board (standardize format to "District 08 Bronx", "District 10 Manhattan", etc.)
   - Standardize city to "New York" for variations like "NY", "NYC".
   - Ensure borough is one of "Manhattan", "Brooklyn", "Queens", "Bronx", "Staten Island", or "Unknown" in title case.
3. **Validate and Format:**
   - Ensure incident_address follows the format "Street, Borough, NY" with borough in title case (e.g., "3435 GILES PLACE" → "3435 Giles Place, Bronx, NY") or null if invalid.
   - Convert created_date to YYYY-MM-DD (e.g., "2025-04-18T01:32:11.000" → "2025-04-18") or null if invalid.
   - Add a time field (HH:MM:SS) extracted from created_date (e.g., "01:32:11") or null if invalid.
   - If closed_date exists, convert to YYYY-MM-DD or null if invalid (do not add a separate time field for closed_date).
   - Set location to null (redundant with latitude/longitude).
4. **Categorize:**
   - Add a complaint_category field with value "Noise" or "Non-Noise".
   - Classify as "Noise" if complaint_type or descriptor contains keywords like "noise", "loud", "music", "banging" in a disturbance context (e.g., "Loud Music/Party" is Noise, "Illegal Parking" is Non-Noise). Default to "Non-Noise" if ambiguous.
5. **Output Fields:**
   - Return only the following fields in the cleaned record:
     - unique_key
     - created_date
     - time
     - incident_address
     - city
     - borough
     - complaint_type
     - descriptor
     - incident_zip
     - complaint_category
     - closed_date
     - cross_street_1
     - cross_street_2
     - intersection_street_1
     - intersection_street_2
     - landmark
     - community_board
   - Exclude all other fields (e.g., agency, latitude, longitude).

**Output Instructions:**
- Return ONLY the cleaned record as a valid JSON dictionary (use double quotes, null for None, true/false for booleans).
- Do NOT include explanatory text, markdown, code blocks, or any additional content.
- Example for input {"unique_key": 64678942, "created_date": "2025-04-18T01:32:11.000", "incident_address": "441 east 57 street", "city": null, "borough": null, "complaint_type": "noise - residential", "descriptor": "loud music/party", "incident_zip": "10022"}:
  {"unique_key": 64678942, "created_date": "2025-04-18", "time": "01:32:11", "incident_address": "441 East 57 Street, Manhattan, NY", "city": "New York", "borough": "Manhattan", "complaint_type": "Noise - Residential", "descriptor": "Loud Music/Party", "incident_zip": "10022", "complaint_category": "Noise", "closed_date": null, "cross_street_1": "1 Avenue", "cross_street_2": "Sutton Place", "intersection_street_1": "1 Avenue", "intersection_street_2": "Sutton Place", "landmark": "East 57 Street", "community_board": "District 06 Manhattan"}

# Prompt for generate_complex_validation_tests
Generate Python code with `assert` statements to validate a cleaned NYC 311 complaints record:

{record}

The tests must verify:
- unique_key: Must be an integer, non-negative, not None.
- created_date: Must be a string, non-empty, in YYYY-MM-DD format (e.g., "2025-04-18"), not None.
- time: Must be a string in HH:MM:SS format (e.g., "01:32:11") or None.
- incident_address: If present, non-empty, and a string, must end with ", NY" and include a valid borough (Manhattan, Brooklyn, Queens, Bronx, Staten Island, Unknown) before ", NY".
- city: Must be a string in title case (e.g., "New York") or None.
- borough: Must be a string in title case (e.g., "Bronx"), one of Manhattan, Brooklyn, Queens, Bronx, Staten Island, Unknown, not None.
- complaint_type: Must be a string in title case (e.g., "Noise - Residential"), not None.
- descriptor: Must be a string or None.
- incident_zip: If present and not None, must be a 5-digit string (e.g., "10022") and within 10000-11697 when converted to integer.
- complaint_category: If present, must be "Noise" or "Non-Noise".
- closed_date: If present and not None, must be a string in YYYY-MM-DD format (e.g., "2025-04-18").
- cross_street_1, cross_street_2, intersection_street_1, intersection_street_2, landmark, community_board: If present, must be strings in title case or None.

Skip validation for fields not present in the record (e.g., agency, status, latitude, longitude).

**Output Instructions**:
- Output ONLY valid Python code starting with a single `import re` followed by a newline.
- Each `assert` statement MUST include an error message (e.g., `assert condition, "error message"`).
- Use separate `assert` statements for each condition (e.g., type, range, not None) to improve debuggability.
- Do NOT include duplicate code blocks, explanatory text, markdown, code block markers (e.g., ```python), comments, or multiple `import re` statements.
- Ensure the code is concise, executable, and validates the record exactly once.
- Place each `assert` statement on a new line.
- Generate a single block of code with one `import re` at the start.

Example:
import re
assert isinstance(record["unique_key"], int), "unique_key must be an integer"
assert record["unique_key"] >= 0, "unique_key must be non-negative"
assert record["unique_key"] is not None, "unique_key cannot be None"
assert isinstance(record["created_date"], str), "created_date must be a string"
assert record["created_date"] != "", "created_date cannot be empty"
assert re.match(r"^\d{4}-\d{2}-\d{2}$", record["created_date"]), "created_date must be YYYY-MM-DD"
assert record["created_date"] is not None, "created_date cannot be None"
assert record["time"] is None or re.match(r"^\d{2}:\d{2}:\d{2}$", record["time"]), "time must be HH:MM:SS or None"
assert record["incident_address"] is None or (isinstance(record["incident_address"], str) and record["incident_address"] != "" and any(record["incident_address"].endswith(f", {b}, NY") for b in ["Manhattan", "Brooklyn", "Queens", "Bronx", "Staten Island", "Unknown"])), "incident_address must end with , Borough, NY"
assert record["city"] is None or (isinstance(record["city"], str) and record["city"].title() == record["city"]), "city must be in title case or None"
assert isinstance(record["borough"], str), "borough must be a string"
assert record["borough"] in ["Manhattan", "Brooklyn", "Queens", "Bronx", "Staten Island", "Unknown"], "borough must be a valid NYC borough"
assert record["borough"].title() == record["borough"], "borough must be in title case"
assert record["borough"] is not None, "borough cannot be None"
assert isinstance(record["complaint_type"], str), "complaint_type must be a string"
assert record["complaint_type"].title() == record["complaint_type"], "complaint_type must be in title case"
assert record["complaint_type"] is not None, "complaint_type cannot be None"
assert record["descriptor"] is None or isinstance(record["descriptor"], str), "descriptor must be a string or None"
assert record["incident_zip"] is None or (re.match(r"^\d{5}$", str(record["incident_zip"])) and 10000 <= int(record["incident_zip"]) <= 11697), "incident_zip must be a 5-digit string in range 10000-11697 or None"
assert "complaint_category" not in record or record["complaint_category"] in ["Noise", "Non-Noise"], "complaint_category must be Noise or Non-Noise"
assert record["closed_date"] is None or (isinstance(record["closed_date"], str) and re.match(r"^\d{4}-\d{2}-\d{2}$", record["closed_date"])), "closed_date must be YYYY-MM-DD or None"
assert record["cross_street_1"] is None or (isinstance(record["cross_street_1"], str) and record["cross_street_1"].title() == record["cross_street_1"]), "cross_street_1 must be in title case or None"
assert record["cross_street_2"] is None or (isinstance(record["cross_street_2"], str) and record["cross_street_2"].title() == record["cross_street_2"]), "cross_street_2 must be in title case or None"
assert record["intersection_street_1"] is None or (isinstance(record["intersection_street_1"], str) and record["intersection_street_1"].title() == record["intersection_street_1"]), "intersection_street_1 must be in title case or None"
assert record["intersection_street_2"] is None or (isinstance(record["intersection_street_2"], str) and record["intersection_street_2"].title() == record["intersection_street_2"]), "intersection_street_2 must be in title case or None"
assert record["landmark"] is None or (isinstance(record["landmark"], str) and record["landmark"].title() == record["landmark"]), "landmark must be in title case or None"
assert record["community_board"] is None or (isinstance(record["community_board"], str) and record["community_board"].title() == record["community_board"]), "community_board must be in title case or None"
"""

    try:
        with open("nyc_311_prompts.txt", "w", encoding="utf-8") as f:
            f.write(prompts_content)
        print("Successfully wrote prompts to nyc_311_prompts.txt")
    except Exception as e:
        print(f"Error writing to file: {e}")

if __name__ == "__main__":
    write_nyc_311_prompts()

Successfully wrote prompts to nyc_311_prompts.txt
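
To reuse the saved prompts, the file can be split on its "# Prompt for ..." headers and the {record} placeholder filled in. The sketch below assumes that header layout; split_prompts and fill_record are hypothetical helpers, and the sample text is a shortened stand-in for the file. It uses str.replace rather than str.format because the prompts contain other literal braces (the JSON example and regex quantifiers) that format would try to interpret.

```python
import json
import re

def split_prompts(text: str) -> dict[str, str]:
    """Map each '# Prompt for <name>' header to the prompt body that follows it."""
    parts = re.split(r"^# Prompt for (\w+)[ \t]*$", text, flags=re.MULTILINE)
    # parts = [preamble, name1, body1, name2, body2, ...]
    return {name: body.strip() for name, body in zip(parts[1::2], parts[2::2])}

def fill_record(template: str, record: dict) -> str:
    """Substitute only the {record} placeholder, leaving all other braces untouched."""
    return template.replace("{record}", json.dumps(record))

# Shortened stand-in for the contents of nyc_311_prompts.txt
sample_text = """# Prompt for llm_complex_clean
Given the following record:

{record}

- Example for input {"unique_key": 1}:

# Prompt for generate_complex_validation_tests
Generate Python code:

{record}
"""

prompts = split_prompts(sample_text)
print(list(prompts))
print(fill_record(prompts["llm_complex_clean"], {"unique_key": 64678942}))
```

For the real file, the text would come from open("nyc_311_prompts.txt", encoding="utf-8").read() instead of sample_text.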
