# 1. Batch Preprocessing with LLM Function Calling

Batch Preprocessing step takes advantage of LLM’s language understanding to turn unstructured reviews into clean and structured texts.

Instead of manual efforts to parse and chunk text, prompt the LLM to break down each review into multiple key intents. And for each intent, summarize text to reduce redundancy, extract named entities, and detect sentiment. The results are high-quality input data that still capture all important information (like specific names mentioned in a review).
Function calling ensures LLM outputs adhere to a consistent format. Compared to a JSON parser or few shot prompting, it is easier to scale and more robust for large datasets.

**This notebook will go through:**
1. Batch Preprocessing with LLM Function Calling
    - Load Data
    - Define LLM Prompt and Tool with example output

2. Batch Inference with Tool Calls
    - Batch Inference Functions
    - Parse LLM Output
3. Save Results
    - Write table to UC

**Blog Post with Detailed Instructions:** 
https://cindycwu.medium.com/analyze-customer-reviews-with-llm-enhanced-topic-modeling-2db7b8d98917


In [0]:
%%capture
%pip install openai tenacity tqdm
dbutils.library.restartPython()

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
CATALOG = "cindy_demo_catalog"
SCHEMA = "airline_bookings"
REVIEWS_TABLE = "airline_scrapped_review"
INTENTS_TABLE = "raw_intents_1000_function"

In [0]:
DATABRICKS_TOKEN = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .apiToken()
    .getOrElse(None)
)
DATABRICKS_BASE_URL = (
    f'https://{spark.conf.get("spark.databricks.workspaceUrl")}/serving-endpoints'
)
MODEL_ENDPOINT_ID="databricks-meta-llama-3-1-70b-instruct"


## Load Data
Using 1000 random reviews for demo purposes

In [0]:
sdf = spark.table(f"{CATALOG}.{SCHEMA}.{REVIEWS_TABLE}")#.limit(1000)
pdf = sdf.toPandas()

In [0]:
sample_pdf = pdf.sample(n=1000).reset_index()
all_reviews = sample_pdf['Review'].tolist()

## Define LLM Prompt and Tool

In [0]:
tools = [
  {
    "type": "function",
    "function": {
        "name": "extracts_intents",
        "parameters": {
          "type": "object",
          "properties": {
            "intents": {
              "type": "array",
              "description": "List of intents identified from the customer review",
              "items": {
                "type": "object",
                "properties": {
                  "intent": {
                    "type": "string",
                    "description": "Description of the identified intent"
                  },
                  "text_summary": {
                    "type": "string",
                    "description": "Summary of the intent"
                  },
                  "sentiment": {
                    "type": "string",
                    "enum": ["Positive", "Negative", "Neutral"],
                    "description": "Sentiment of the intent"
                  },
                  "named_entities": {
                    "type": "array",
                    "items": {
                      "type": "string",
                      "description": "Named entities in the text, if any, like 'Chicago' or 'XYZ Airlines'"
                    }
                  }
                },
                "required": ["intent", "text_summary", "sentiment"]
              }
            }
          }
        }
    }
  }
]


In [0]:
example_output = {
  "intents": [
    {
      "intent": "Check-in experience",
      "text_summary": "The check-in process was smooth.",
      "sentiment": "Positive",
      "named_entities": ["XYZ Airlines"]
    },
    {
      "intent": "Seating comfort",
      "text_summary": "The seating was cramped.",
      "sentiment": "Negative",
      "named_entities": []
    },
    {
      "intent": "Food quality",
      "text_summary": "The food quality was below average.",
      "sentiment": "Negative",
      "named_entities": []
    },
    {
      "intent": "Flight attendant service",
      "text_summary": "The flight attendants were very polite and helpful.",
      "sentiment": "Positive",
      "named_entities": []
    },
    {
      "intent": "Baggage issue",
      "text_summary": "I had an issue with my baggage, but it was quickly resolved.",
      "sentiment": "Neutral",
      "named_entities": []
    }
  ]
}
example_output

{'intents': [{'intent': 'Check-in experience',
   'text_summary': 'The check-in process was smooth.',
   'sentiment': 'Positive',
   'named_entities': ['XYZ Airlines']},
  {'intent': 'Seating comfort',
   'text_summary': 'The seating was cramped.',
   'sentiment': 'Negative',
   'named_entities': []},
  {'intent': 'Food quality',
   'text_summary': 'The food quality was below average.',
   'sentiment': 'Negative',
   'named_entities': []},
  {'intent': 'Flight attendant service',
   'text_summary': 'The flight attendants were very polite and helpful.',
   'sentiment': 'Positive',
   'named_entities': []},
  {'intent': 'Baggage issue',
   'text_summary': 'I had an issue with my baggage, but it was quickly resolved.',
   'sentiment': 'Neutral',
   'named_entities': []}]}

In [0]:
prompt_template = f"""
Follow instructions below and extract intents from a customer review as a json string. DO NOT include any notes or additional information in the output.

### Instructions:
- **Identify each distinct intent** in the review as "intent". The review may contain multiple distinct intents related to different aspects of the customer's experience (e.g., service, seating, food, check-in, baggage handling). 
- **Summarize the text** associated with each intent as "text_summary".
- **Classify the sentiment** (Positive, Negative, or Neutral) of each intent as "sentiment".
- **If applicable, extract any "named entities"**, such as the airline name or specific service mentioned.
- **Return a list of intents a JSON string.** Follow the output format and use example ouput below as a reference. Make sure the JSON string is COMPLETE. Do not include additional information.

### Output Format
{{format_instructions}}

### Example Review:
"I flew with XYZ Airlines for a 6-hour flight. The check-in process was smooth, but the seating was cramped, and the food quality was below average. The flight attendants were very polite and helpful. I had an issue with my baggage, but it was quickly resolved."

### Example Output (JSON format):
{{example_output}}

### Review to analyze:
{{review}}

"""
format_instructions = {
  "intents": [
    {
      "intent": "<Intent description>",
      "text_summary": "<Summarized or specific text related to the intent>",
      "sentiment": "<Positive | Negative | Neutral>",
      "named_entities": ["<Named entities, if any>"]
    },
    {
      "intent": "<Intent description>",
      "text_summary": "<Summarized or specific text related to the intent>",
      "sentiment": "<Positive | Negative | Neutral>",
      "named_entities": ["<Named entities, if any>"]
    }
  ]
}

example_output = {
  "intents": [
    {
      "intent": "Check-in experience",
      "text_summary": "The check-in process was smooth.",
      "sentiment": "Positive",
      "named_entities": ["XYZ Airlines"]
    },
    {
      "intent": "Seating comfort",
      "text_summary": "The seating was cramped.",
      "sentiment": "Negative",
      "named_entities": []
    },
    {
      "intent": "Food quality",
      "text_summary": "The food quality was below average.",
      "sentiment": "Negative",
      "named_entities": []
    },
    {
      "intent": "Flight attendant service",
      "text_summary": "The flight attendants were very polite and helpful.",
      "sentiment": "Positive",
      "named_entities": []
    },
    {
      "intent": "Baggage issue",
      "text_summary": "I had an issue with my baggage, but it was quickly resolved.",
      "sentiment": "Neutral",
      "named_entities": []
    }
  ]
}
example_output


{'intents': [{'intent': 'Check-in experience',
   'text_summary': 'The check-in process was smooth.',
   'sentiment': 'Positive',
   'named_entities': ['XYZ Airlines']},
  {'intent': 'Seating comfort',
   'text_summary': 'The seating was cramped.',
   'sentiment': 'Negative',
   'named_entities': []},
  {'intent': 'Food quality',
   'text_summary': 'The food quality was below average.',
   'sentiment': 'Negative',
   'named_entities': []},
  {'intent': 'Flight attendant service',
   'text_summary': 'The flight attendants were very polite and helpful.',
   'sentiment': 'Positive',
   'named_entities': []},
  {'intent': 'Baggage issue',
   'text_summary': 'I had an issue with my baggage, but it was quickly resolved.',
   'sentiment': 'Neutral',
   'named_entities': []}]}

In [0]:
from langchain.prompts import PromptTemplate
prompt = PromptTemplate(
    input_variables=["review"],
    template=prompt_template,
    partial_variables={
        "format_instructions": format_instructions,,
         "example_output":example_output}
)


# get_request(prompt=prompt,review="I flew with XYZ Airlines for a 6-hour flight.")

## Batch Inference with Tool Calling
Documentation Example Code: https://docs.databricks.com/en/machine-learning/model-serving/function-calling.html#notebook-example

In [0]:
 
import os
import json
import concurrent.futures
from openai import OpenAI, RateLimitError
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
    retry_if_exception,
)  # for exponential backoff
from tqdm.notebook import tqdm
from typing import List, Optional

client = OpenAI(
  api_key=DATABRICKS_TOKEN,
  base_url=DATABRICKS_BASE_URL
)

@retry(
    wait=wait_random_exponential(min=1, max=30),
    stop=stop_after_attempt(3),
    retry=retry_if_exception(RateLimitError),
)
def call_chat_model(
    prompt: str,review: str, temperature: float = 0.1, max_tokens: int = 500, **kwargs
):
    """Calls the chat model and returns the response text or tool calls."""
    chat_args = {
      "model": MODEL_ENDPOINT_ID,
    "messages": [
      {
        "role": "system",
        "content": 'You are a helpful analyst for a major airline. You help anayze customer reviews and extrarct insights.'
      },
      {
        "role": "user",
        "content": prompt.format(review=review)
      }
    ],
    "max_tokens": max_tokens,
    "temperature": temperature
  }

    chat_args.update(kwargs)

    chat_completion = client.chat.completions.create(**chat_args)

    response = chat_completion.choices[0].message
    if response.tool_calls:
        call_args = [c.function.arguments for c in response.tool_calls]
        if len(call_args) == 1:
            return call_args[0]
        return call_args
    return response.content
  
def call_in_parallel(func, prompts: List[str]) -> List:
    """Calls func(p) for all prompts in parallel and returns responses."""
    # This uses a relatively small thread pool to avoid triggering default workspace rate limits.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = []
        for r in tqdm(executor.map(func, prompts), total=len(prompts)):
            results.append(r)
        return results


In [0]:
import pandas as pd
def extract_batch(inp: str):
    return call_chat_model(prompt=prompt,review=inp, tools=tools)
  
def results_to_dataframe(reviews: List[str], responses: List[str]):
    """Combines reviews and model responses into a dataframe for tabular display."""
    return pd.DataFrame({"Review": reviews, "Model response": responses})
  
results = call_in_parallel(extract_batch, all_reviews)
results_to_dataframe(all_reviews, results)

  0%|          | 0/1000 [00:00<?, ?it/s]

Unnamed: 0,Review,Model response
0,"The worst airline I have flown with, ground ...","{""intents"": [{""intent"": ""Ground staff service""..."
1,Kuala Lumpur to Perth. Best low cost airline...,"{""intents"": [{""intent"": ""Overall experience"", ..."
2,\n,"{""intents"": []}"
3,"Once on the plane, Cubana Airlines is no bette...","{""intents"": [{""intent"": ""In-flight experience""..."
4,Los Angeles to Ho Chi Minh City via Tokyo. T...,"{""intents"": [{""intent"": ""Overall experience"", ..."
...,...,...
995,Having traveled via American Eagle for years f...,"{""intents"": [{""intent"": ""Previous travel exper..."
996,I purchased additional carry-on option so th...,"{""intents"": [{""intent"": ""Carry-on luggage poli..."
997,Tehran Iran to Almaty Kazakhstan on a relative...,"{""intents"": [{""intent"": ""Flight experience"", ""..."
998,Okinawa to Seoul. I will never choose Asiana...,"{""intents"": [{""intent"": ""Baggage handling"", ""n..."


[Trace(request_id=tr-b72356d84cb241f287813cfceb217f5f), Trace(request_id=tr-39cf062c99b34af6a93378b8547b9cfc), Trace(request_id=tr-aea6069c856a46d99ad4c15123aea5d8), Trace(request_id=tr-a29f1707210f438b9d73a2265ca2dc9a), Trace(request_id=tr-95f91cbf3f8b40dbb7907cee013f520b), Trace(request_id=tr-90f06f0d92954cb3b88cd8dbe7ad84d3), Trace(request_id=tr-f2235fdd01ea4a8cabd75f2dadc4c0df), Trace(request_id=tr-2a07614dda544218bd9a90d07d5f1948), Trace(request_id=tr-268865e49c144c93bd9be2714b42c174), Trace(request_id=tr-63815d8483fb4638b614421d33c2ed2e)]

## Parse llm output, dropping invalid entries
- Review too long, incomplete json string responses (increase max_token, or chunk the review beforehand, or specify more in prompt)
- Review is empty or not valid (additional preprocessing could help)

In [0]:
parsed_data = []
for index, item in enumerate(results):
    try:
        parsed_data.append({"id": index, **json.loads(item)})  # Add an id column
    except json.JSONDecodeError:
        parsed_data.append({"id": index, "intents": []})  # Add an empty intents list for invalid JSON

# Create a pandas DataFrame from the parsed data
# Explode the 'intents' column to create one row per intent
exploded_df = pd.DataFrame(parsed_data).explode('intents')

# Filter out rows where 'intents' is not a dictionary (i.e., valid JSON object)
exploded_df = exploded_df[exploded_df['intents'].apply(lambda x: isinstance(x, dict))]

# Normalize the valid 'intents' column into separate columns
results_df = pd.json_normalize(exploded_df['intents'])
results_df['id'] = exploded_df['id'].values
results_df['llm_response'] = exploded_df['intents'].values


In [0]:

merged_df = pd.merge(sample_pdf, results_df, left_on= sample_pdf.index, right_on='id', how='right')

In [0]:
merged_df

Unnamed: 0,Airline Name,Overall_Rating,Review_Title,Review Date,Verified,Review,Aircraft,Type Of Traveller,Seat Type,Route,Date Flown,Seat Comfort,Cabin Staff Service,Food & Beverages,Ground Service,Inflight Entertainment,Wifi & Connectivity,Value For Money,Recommended,_rescued_data,intent,named_entities,sentiment,text_summary,id,llm_response
0,Condor Airlines,3,"""rude and miserable people""",15th September 2019,False,"The worst airline I have flown with, ground ...",,Solo Leisure,Economy Class,Frankfurt to Anchorage,September 2019,3.0,2.0,2.0,2.0,,,3.0,no,"{""_c0"":""7447"",""_file_path"":""dbfs:/Volumes/cind...",Ground staff service,"[Frankfurt, Condor]",Negative,The ground staff at Frankfurt were rude and ic...,0,"{'intent': 'Ground staff service', 'named_enti..."
1,Condor Airlines,3,"""rude and miserable people""",15th September 2019,False,"The worst airline I have flown with, ground ...",,Solo Leisure,Economy Class,Frankfurt to Anchorage,September 2019,3.0,2.0,2.0,2.0,,,3.0,no,"{""_c0"":""7447"",""_file_path"":""dbfs:/Volumes/cind...",Cabin crew service,[Condor],Negative,The cabin crew give the impression they do not...,0,"{'intent': 'Cabin crew service', 'named_entiti..."
2,Condor Airlines,3,"""rude and miserable people""",15th September 2019,False,"The worst airline I have flown with, ground ...",,Solo Leisure,Economy Class,Frankfurt to Anchorage,September 2019,3.0,2.0,2.0,2.0,,,3.0,no,"{""_c0"":""7447"",""_file_path"":""dbfs:/Volumes/cind...",Food quality,[],Negative,"Food was just about edible, pasta tubes with a...",0,"{'intent': 'Food quality', 'named_entities': [..."
3,Condor Airlines,3,"""rude and miserable people""",15th September 2019,False,"The worst airline I have flown with, ground ...",,Solo Leisure,Economy Class,Frankfurt to Anchorage,September 2019,3.0,2.0,2.0,2.0,,,3.0,no,"{""_c0"":""7447"",""_file_path"":""dbfs:/Volumes/cind...",In-flight amenities,[],Negative,No water for first two hours on return flight ...,0,"{'intent': 'In-flight amenities', 'named_entit..."
4,Condor Airlines,3,"""rude and miserable people""",15th September 2019,False,"The worst airline I have flown with, ground ...",,Solo Leisure,Economy Class,Frankfurt to Anchorage,September 2019,3.0,2.0,2.0,2.0,,,3.0,no,"{""_c0"":""7447"",""_file_path"":""dbfs:/Volumes/cind...",Entertainment options,[],Negative,All entertainment has to be paid for.,0,"{'intent': 'Entertainment options', 'named_ent..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4330,Karthago Airlines,1,Karthago Airlines customer review,8th October 2008,False,Monastir to Gatwick. The aircraft was an absol...,,,,,,,,,,,,,no,"{""_c0"":""12716"",""_file_path"":""dbfs:/Volumes/cin...",Crew service,[],Negative,The crew are not interested in customer service.,999,"{'intent': 'Crew service', 'named_entities': [..."
4331,Karthago Airlines,1,Karthago Airlines customer review,8th October 2008,False,Monastir to Gatwick. The aircraft was an absol...,,,,,,,,,,,,,no,"{""_c0"":""12716"",""_file_path"":""dbfs:/Volumes/cin...",Amenities,[],Negative,There was no milk for the coffee.,999,"{'intent': 'Amenities', 'named_entities': [], ..."
4332,Karthago Airlines,1,Karthago Airlines customer review,8th October 2008,False,Monastir to Gatwick. The aircraft was an absol...,,,,,,,,,,,,,no,"{""_c0"":""12716"",""_file_path"":""dbfs:/Volumes/cin...",Ground staff service,[Karthago],Negative,The ground staff who had no courtesy to keep u...,999,"{'intent': 'Ground staff service', 'named_enti..."
4333,Karthago Airlines,1,Karthago Airlines customer review,8th October 2008,False,Monastir to Gatwick. The aircraft was an absol...,,,,,,,,,,,,,no,"{""_c0"":""12716"",""_file_path"":""dbfs:/Volumes/cin...",Delay handling,[],Negative,No apology or explanation given.,999,"{'intent': 'Delay handling', 'named_entities':..."


## Save Results

In [0]:
output_df = merged_df[['id','Airline Name', 'Overall_Rating', 'Review_Title',
       'Review',  'llm_response', 'intent','text_summary',
       'sentiment', 'named_entities']].rename({'Airline Name':'Airline_Name'})

from pyspark.sql.types import ArrayType, IntegerType, DoubleType, StringType, StructField, StructType

# Define the schema for the Spark DataFrame
schema = StructType([
    StructField('id', IntegerType()),
    StructField('Airline_Name', StringType()),
    StructField('Overall_Rating', StringType()),
    StructField('Review_title', StringType()),
    StructField('Review', StringType()),
    StructField('llm_response', StringType()),
    StructField('intent', StringType()),
    StructField('text_summary', StringType()),
    StructField('sentiment', StringType()),
    StructField('named_entities', ArrayType(StringType()))
])

# Convert Pandas DataFrame to Spark DataFrame with the specified schema
output_sdf = spark.createDataFrame(output_df.dropna(), schema)


  Exception thrown when converting pandas.Series (object) with name 'llm_response' to Arrow Array (string).
Direct cause: Expected bytes, got a 'dict' object
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [0]:
output_sdf.write.mode('overwrite').format("delta").saveAsTable(f"{CATALOG}.{SCHEMA}.{INTENTS_TABLE}")