In [1]:
!pip install fastapi uvicorn nest_asyncio pyngrok langchain langchain-openai pydantic > /dev/null


In [None]:
import os
from getpass import getpass

from pyngrok import ngrok

# Never store the ngrok token in the notebook: take it from the environment,
# or prompt for it interactively so it is not saved with the file.
# NOTE(review): any token that was ever committed in this cell must be
# considered leaked and should be revoked in the ngrok dashboard.
ngrok_token = os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok auth token: ")
ngrok.set_auth_token(ngrok_token)

# Set up ngrok tunnel to expose FastAPI app
public_url = ngrok.connect(8000)
print("Public URL:", public_url)


Public URL: NgrokTunnel: "https://valarie-nongrooming-ilse.ngrok-free.dev" -> "http://localhost:8000"


In [None]:
import os
import gzip, json
from getpass import getpass

# The OpenAI key must come from the environment (or an interactive prompt),
# never from a literal in the notebook.
# NOTE(review): any key that was ever committed in this cell must be
# considered leaked and should be rotated.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")

In [20]:
# IMPORTS AND SETUP
import re, asyncio, os, gzip, json, time
from typing import List, Dict
from tqdm import tqdm
from fastapi import FastAPI, Body
from pydantic import BaseModel
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import StructuredOutputParser, ResponseSchema
from langchain_openai import ChatOpenAI
from fastapi.middleware.cors import CORSMiddleware

# CREATE APP + ADD MIDDLEWARE
app = FastAPI(
    title="ASAPP Unstructured Data Extractor",
    description="Extract emails, phone numbers, and order IDs using Regex and LLMs",
    version="1.0.0"
)

# Add CORS middleware here (not later)
# Every origin is allowed, so credentialed requests are switched off: wildcard
# origins/methods/headers must not be combined with allow_credentials=True.
# None of the routes in this notebook read cookies or auth headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# CORE CLASSES AND FUNCTIONS

# --- Regex extraction ---
def regex_extract_fields(text: str) -> Dict[str, List[str]]:
    """Extract e-mail addresses, phone numbers and order references with regexes.

    Args:
        text: Free-form conversation text. ``None`` or an empty string yields
            empty lists for every field.

    Returns:
        A dict with the keys ``"emails"``, ``"phone_numbers"`` and
        ``"order_ids"``. Each value is a list of the distinct matches in order
        of first appearance. Order matches are the full matched span
        (e.g. ``"Order #123"``), not just the digits.
    """
    text = text or ""
    email_match = re.findall(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", text)
    phone_match = re.findall(r"\+?\d[\d\s-]{8,}\d", text)
    order_match = re.findall(r"order[\s#:]*\d+", text, re.IGNORECASE)
    # dict.fromkeys de-duplicates while keeping first-seen order, so the output
    # is deterministic (a set would shuffle it between runs).
    return {
        "emails": list(dict.fromkeys(email_match)),
        "phone_numbers": list(dict.fromkeys(phone_match)),
        "order_ids": list(dict.fromkeys(order_match)),
    }

# --- LLM extractor ---
class AsyncLLMExtractor:
    """Asks a chat model to pull e-mails, phone numbers and order IDs out of text.

    The constructor builds the chat model, a structured-output parser for the
    three fields, and the prompt template; ``extract_async`` runs one
    conversation through them.
    """

    # Keys every result dict is guaranteed to carry, each mapped to a list.
    FIELDS = ("emails", "phone_numbers", "order_ids")

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        schema = [
            ResponseSchema(name="emails", description="List of email addresses"),
            ResponseSchema(name="phone_numbers", description="List of phone numbers"),
            ResponseSchema(name="order_ids", description="List of order IDs"),
        ]
        self.parser = StructuredOutputParser.from_response_schemas(schema)
        self.instructions = self.parser.get_format_instructions()
        self.prompt = ChatPromptTemplate.from_template("""
        You are an intelligent assistant that extracts structured data from customer service conversations.
        Identify all email addresses, phone numbers, and order IDs from the text below.
        Return valid JSON with keys: "emails", "phone_numbers", and "order_ids".

        Conversation:
        {conversation}

        {format_instructions}
        """)

    @staticmethod
    def _as_list(value) -> List[str]:
        """Coerce one parsed field into a list of strings.

        ``None`` and blank strings become ``[]``, a bare string becomes a
        one-element list, list/tuple items are stringified (``None`` items are
        dropped), and any other scalar is wrapped as ``[str(value)]``.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value] if value.strip() else []
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value if item is not None]
        return [str(value)]

    async def extract_async(self, text: str):
        """Run the LLM extraction for one conversation.

        Args:
            text: Conversation text inserted into the prompt.

        Returns:
            The parsed dict with ``emails``, ``phone_numbers`` and ``order_ids``
            always present as lists of strings. If anything fails (prompt
            formatting, the model call, or parsing) the three lists are empty
            and an ``"error"`` key carries the exception message; this method
            never raises for those failures.
        """
        try:
            prompt_input = self.prompt.format_prompt(
                conversation=text,
                format_instructions=self.instructions
            )
            output = await self.llm.agenerate([prompt_input.to_messages()])
            content = output.generations[0][0].text
            parsed = self.parser.parse(content)
        except Exception as e:
            # Deliberate best-effort: one bad conversation must not abort a batch.
            return {"emails": [], "phone_numbers": [], "order_ids": [], "error": str(e)}

        # The model may return a bare string or null for a field; callers
        # concatenate these values with lists, so normalise them here.
        result = dict(parsed) if isinstance(parsed, dict) else {}
        for key in self.FIELDS:
            result[key] = self._as_list(result.get(key))
        return result

# Single module-level extractor instance; the /extract_from_dataset route
# passes it to process_conversations_async.
extractor = AsyncLLMExtractor()

# ROUTES (DEFINE BEFORE RUNNING SERVER)
#
@app.get("/")
async def root():
    """Landing route: returns a static welcome message."""
    # The emoji was stored as mojibake (UTF-8 bytes decoded as cp1252); restored.
    return {"message": "Welcome to the ASAPP Field Extraction API 🚀"}

@app.get("/extract_from_dataset")
async def extract_from_dataset(n: int = 10):
    """
    Run regex+LLM extraction on first N conversations in the dataset.
    Example: GET /extract_from_dataset?n=10

    Returns a dict with ``count`` (number of results) and ``results`` (one
    merged extraction dict per conversation). A negative ``n`` is rejected
    with HTTP 422.
    """
    # Imported locally so this cell does not depend on editing the import cell.
    from fastapi import HTTPException

    # A negative n would slice from the end (n=-1 means "all but the last"),
    # sending almost the whole dataset through the paid LLM.
    if n < 0:
        raise HTTPException(status_code=422, detail="n must be a non-negative integer")

    # NOTE(review): n has no upper bound; on a publicly tunnelled server a
    # large n triggers one LLM call per conversation. Consider capping it.
    subset = all_data[:n]
    results = await process_conversations_async(subset, extractor, batch_size=5)
    return {"count": len(results), "results": results}

# DATASET LOADING + ASYNC PROCESSING
def load_abcd_dataset(path: str = "abcd_v1.1.json.gz") -> list:
    """Load the gzipped ABCD JSON file and concatenate its three splits.

    Args:
        path: Location of the gzip-compressed JSON file. Defaults to
            ``abcd_v1.1.json.gz`` in the working directory.

    Returns:
        One list holding the ``train``, ``dev`` and ``test`` entries, in that
        order.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        KeyError: if the JSON lacks one of the three split keys.
    """
    with gzip.open(path, "rt", encoding="utf-8") as f:
        data_full = json.load(f)
    all_data = data_full["train"] + data_full["dev"] + data_full["test"]
    print(f"Loaded full dataset: {len(all_data)} conversations.")
    return all_data

# Load every split once when this cell runs; the /extract_from_dataset route
# slices this module-level list on each request.
all_data = load_abcd_dataset()

def convo_to_text(convo):
    """Flatten a conversation record into newline-separated "Speaker: text" lines.

    Reads the ``"original"`` entry of ``convo`` (treated as empty when the key
    is absent). For each item, element 0 is capitalised and used as the
    speaker label and element 1 as the utterance.
    """
    turns = convo.get("original", [])
    rendered = []
    for turn in turns:
        speaker = turn[0].capitalize()
        rendered.append(f"{speaker}: {turn[1]}")
    return "\n".join(rendered)

async def process_conversations_async(conversations, extractor, batch_size=5):
    """Run regex + LLM extraction over conversations, ``batch_size`` at a time.

    Args:
        conversations: Sequence of conversation records accepted by
            ``convo_to_text``.
        extractor: Object exposing ``async extract_async(text) -> dict``.
        batch_size: Number of conversations awaited concurrently per batch.
            Values below 1 are treated as 1.

    Returns:
        One dict per conversation, in input order, with the keys ``emails``,
        ``phone_numbers`` and ``order_ids``. Each value is the de-duplicated
        union of the regex and LLM findings, regex matches first.
    """
    fields = ("emails", "phone_numbers", "order_ids")
    # range() raises on a zero step and yields nothing for a negative one.
    batch_size = max(1, int(batch_size))

    def as_list(value):
        # The LLM may hand back a bare string or null instead of a list;
        # normalise so concatenation and de-duplication cannot raise.
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value if item is not None]
        return [str(value)]

    async def process_one(text):
        regex_res = regex_extract_fields(text)
        llm_res = await extractor.extract_async(text)
        merged = {}
        for k in fields:
            combined = as_list(regex_res.get(k)) + as_list(llm_res.get(k))
            # dict.fromkeys keeps first-seen order, so output is deterministic.
            merged[k] = list(dict.fromkeys(combined))
        return merged

    results = []
    for i in tqdm(range(0, len(conversations), batch_size)):
        batch = conversations[i:i + batch_size]
        # Conversations inside a batch run concurrently; batches run in sequence.
        batch_results = await asyncio.gather(
            *(process_one(convo_to_text(convo)) for convo in batch)
        )
        results.extend(batch_results)
    print(f"⚡ Processed {len(conversations)} conversations")
    return results

# RUN FASTAPI SERVER
import nest_asyncio
from pyngrok import ngrok
from uvicorn import Config, Server

nest_asyncio.apply()

public_url = ngrok.connect(8000)
print("FastAPI running at:", public_url)

config = Config(app=app, host="0.0.0.0", port=8000, log_level="info")
server = Server(config)
await server.serve()


Loaded full dataset: 10042 conversations.
FastAPI running at: NgrokTunnel: "https://valarie-nongrooming-ilse.ngrok-free.dev" -> "http://localhost:8000"


INFO:     Started server process [409]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


INFO:     2601:405:4401:2db0:7892:b8e:504b:a470:0 - "GET /docs HTTP/1.1" 200 OK
INFO:     2601:405:4401:2db0:7892:b8e:504b:a470:0 - "GET /openapi.json HTTP/1.1" 200 OK


100%|██████████| 2/2 [00:05<00:00,  2.68s/it]

⚡ Processed 10 conversations
INFO:     2601:405:4401:2db0:7892:b8e:504b:a470:0 - "GET /extract_from_dataset?n=10 HTTP/1.1" 200 OK



100%|██████████| 2/2 [00:04<00:00,  2.18s/it]

⚡ Processed 10 conversations
INFO:     2601:405:4401:2db0:7892:b8e:504b:a470:0 - "GET /extract_from_dataset?n=10 HTTP/1.1" 200 OK



INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [409]
