In [1]:
import json
from typing import List, Dict
import datetime
from pydantic import BaseModel, Field

from langchain_community.chat_models.ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser


## Data Loading

In [30]:


def fetch_pending_reviews_from_file(filepath: str, limit: int = 100) -> List[Dict]:
    pending_reviews = []
    try:
        print(f"Opening file '{filepath}' to find pending reviews...")
        with open(filepath, "r") as f:
            for line in f:
                if len(pending_reviews) >= limit:
                    break
                review = json.loads(line)

                if review.get("analysis_status") == "pending":
                    pending_reviews.append(review)
        return pending_reviews
    except FileNotFoundError:
        print("Data file not found")
        return []
    except Exception as e:
        print(f"An error occurred while reading the file: {e}")
        


In [31]:
filepath = "./data/sample_data.jsonl"
raw_data = fetch_pending_reviews_from_file(filepath)
raw_data

Opening file './data/sample_data.jsonl' to find pending reviews...


[{'rating': 5.0,
  'title': 'Absolutely fantastic! Exceeded my expectations.',
  'text': 'I was hesitant at first, but this product is brilliant. The build quality is solid, setup was a breeze, and it works perfectly. The packaging was also very premium. Highly recommended for anyone on the fence.',
  'images': [{'small_image_url': 'https://example.com/images/product_A_small.jpg',
    'medium_image_url': 'https://example.com/images/product_A_medium.jpg',
    'large_image_url': 'https://example.com/images/product_A_large.jpg'}],
  'asin': 'B07YQBEK13',
  'parent_asin': 'B07YQBEK13',
  'user_id': 'AG3D6O4STAQKAY2UVGEUV46KN35Q',
  'timestamp': 1678886400000,
  'verified_purchase': True,
  'helpful_vote': 22,
  'analysis_status': 'pending'},
 {'rating': 1.0,
  'title': 'Broke after two days, complete waste of money.',
  'text': "The device stopped charging after the second use. I tried different cables and power outlets, but it's completely dead. The support number just rings and no one pi

In [32]:
from datetime import datetime
import json

def select_and_process_review_robust(raw_review: dict) -> dict:
    """
    A more robust version that selects keys and handles missing
    or invalid timestamps gracefully.
    """
    try:
        text = raw_review.get('text', 'No text provided')
        helpful_vote = raw_review.get('helpful_vote', 0)
        
        readable_time = "N/A"  # A safe default value
        
        # --- Robust Timestamp Processing ---
        unix_timestamp_ms = raw_review.get('timestamp')
        
        # Check if timestamp exists and is a number (int or float)
        if isinstance(unix_timestamp_ms, (int, float)):
            unix_timestamp_s = unix_timestamp_ms / 1000
            dt_object = datetime.fromtimestamp(unix_timestamp_s)
            readable_time = dt_object.strftime("%Y-%m-%d %H:%M:%S")

        # --- Build the new, clean dictionary ---
        processed_data = {
            "text": text,
            "helpful_vote": helpful_vote,
            "review_time": readable_time
        }
        
        return processed_data

    except Exception as e:
        # This except block will now only catch unexpected errors
        print(f"An unexpected error occurred: {e}")
        return {}


# --- Example using your data ---
if __name__ == "__main__":
    list_of_raw_reviews = [
        {'rating': 5.0, 'title': 'Absolutely fantastic! Exceeded my expectations.', 'text': 'I was hesitant at first, but this product is brilliant. The build quality is solid, setup was a breeze, and it works perfectly. The packaging was also very premium. Highly recommended for anyone on the fence.', 'images': [{'small_image_url': 'https://example.com/images/product_A_small.jpg', 'medium_image_url': 'https://example.com/images/product_A_medium.jpg', 'large_image_url': 'https://example.com/images/product_A_large.jpg'}], 'asin': 'B07YQBEK13', 'parent_asin': 'B07YQBEK13', 'user_id': 'AG3D6O4STAQKAY2UVGEUV46KN35Q', 'timestamp': 1678886400000, 'verified_purchase': True, 'helpful_vote': 22, 'analysis_status': 'pending'},
        {'rating': 1.0, 'title': 'Broke after two days, complete waste of money.', 'text': "The device stopped charging after the second use. I tried different cables and power outlets, but it's completely dead. The support number just rings and no one picks up. I want a refund but the return process is a nightmare. Do not buy.", 'images': [], 'asin': 'B08Z1Y2X3W', 'parent_asin': 'B08Z1Y2X3W', 'user_id': 'AHV2EVOU42B6H5V2V4V4J4Q4Q4QQ', 'timestamp': 1679884800000, 'verified_purchase': True, 'helpful_vote': 45, 'analysis_status': 'pending'},
        {'rating': 3.0, 'title': "It's okay, but has some major flaws.", 'text': "The main feature works as advertised and the screen is beautiful. However, a key feature is missing and the battery life is terrible.", 'images': [], 'asin': 'B09G8Y7X6V', 'parent_asin': 'B09G8Y7X6V', 'user_id': 'AIC3FCN35ZWER3T3W3W3G3R3R3RR', 'timestamp': None, 'verified_purchase': False, 'helpful_vote': 3, 'analysis_status': 'pending'} # Example with a 'None' timestamp
    ]

    # Use a list comprehension with the new robust function
    processed_list = [select_and_process_review_robust(review) for review in list_of_raw_reviews]

    # --- Correct Final Output ---
    print(json.dumps(processed_list, indent=2))

[
  {
    "text": "I was hesitant at first, but this product is brilliant. The build quality is solid, setup was a breeze, and it works perfectly. The packaging was also very premium. Highly recommended for anyone on the fence.",
    "helpful_vote": 22,
    "review_time": "2023-03-15 18:50:00"
  },
  {
    "text": "The device stopped charging after the second use. I tried different cables and power outlets, but it's completely dead. The support number just rings and no one picks up. I want a refund but the return process is a nightmare. Do not buy.",
    "helpful_vote": 45,
    "review_time": "2023-03-27 08:10:00"
  },
  {
    "text": "The main feature works as advertised and the screen is beautiful. However, a key feature is missing and the battery life is terrible.",
    "helpful_vote": 3,
    "review_time": "N/A"
  }
]


In [33]:
# --- Processing the list ---
processed_list = [select_and_process_review_robust(review) for review in raw_data]
processed_list

[{'text': 'I was hesitant at first, but this product is brilliant. The build quality is solid, setup was a breeze, and it works perfectly. The packaging was also very premium. Highly recommended for anyone on the fence.',
  'helpful_vote': 22,
  'review_time': '2023-03-15 18:50:00'},
 {'text': "The device stopped charging after the second use. I tried different cables and power outlets, but it's completely dead. The support number just rings and no one picks up. I want a refund but the return process is a nightmare. Do not buy.",
  'helpful_vote': 45,
  'review_time': '2023-03-27 08:10:00'},
 {'text': 'The main feature works as advertised and the screen is beautiful. However, the battery life is terrible, barely lasting half a day. It also gets surprisingly hot during normal use. Good concept, but the execution needs work.',
  'helpful_vote': 3,
  'review_time': '2023-04-07 21:30:00'}]

In [34]:

class ReviewAnalysis(BaseModel):
    sentiment: str = Field(description="The overall sentiment: 'Positive', 'Negative', or 'Neutral'")
    main_topic: str = Field(description="The single primary topic from ['Durability', 'Performance', 'Shipping', 'Price', 'Features', 'Usability', 'Customer Service', 'Other']")
    
    # --- UPDATED DESCRIPTION FOR THIS FIELD ---
    key_drivers: Dict[str, str] = Field(description="A dictionary where keys are specific product features or topics mentioned (e.g., 'Battery Life', 'Screen Quality') and values are the sentiment for that specific topic: 'Positive', 'Negative', or 'Neutral'.")
    
    is_actionable: bool = Field(description="Does this review contain specific feedback a team can act on?")
    summary: str = Field(description="A concise one-sentence summary of the review's main point.")

In [35]:
# 2. The System Prompt (Defines the LLM's behavior)
system_prompt = """You are a world-class customer experience (CX) analysis API. Your sole purpose is to analyze a customer review and respond ONLY with a single, valid JSON object that strictly adheres to the provided schema. Do not include any introductory text, apologies, or explanations in your response. The user will provide the review data, and you must use the following formatting instructions to structure your response.

{format_instructions}"""

# 3. The Human Prompt (Provides the specific data for each call)
human_prompt = """Here is the review data to analyze:
```json
{review_data}
```"""
def analyze_review_with_ollama(processed_review: Dict) -> ReviewAnalysis:

    parser = PydanticOutputParser(pydantic_object=ReviewAnalysis)

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", human_prompt)
    ])


    model = ChatOllama(model="llama3")
    chain = prompt | model | parser

    try:
        result = chain.invoke({
            "review_data": json.dumps(processed_review),
            "format_instructions": parser.get_format_instructions()
        })
        return result
    except Exception as e:
        print(f"--- Could not analyze review. Error: {e} ---")
        return None
    




In [36]:
# This is the string you will use in your ChatPromptTemplate
detailed_prompt = """
# --- Persona & Goal ---
You are a meticulous Customer Experience (CX) Analyst. Your task is to analyze customer reviews and convert them into a structured JSON object. You must adhere strictly to the format and rules defined below. Your response must be ONLY the single JSON object, with no other text or explanations.

# --- Task Definitions ---
You will generate a JSON object with the following keys:
1.  `sentiment`: Must be one of three strings: 'Positive', 'Negative', or 'Neutral'.
2.  `main_topic`: Must be the single most fitting category from this list: ['Durability', 'Performance', 'Shipping', 'Price', 'Features', 'Usability', 'Customer Service', 'Other'].
3.  `key_drivers`: A dictionary where each key is a specific feature mentioned (e.g., "Battery Life") and the value is its corresponding sentiment ('Positive', 'Negative', or 'Neutral').
4.  `is_actionable`: A boolean (`true` or `false`). Set to `true` only if the review contains specific, concrete feedback that a team could act on. A vague complaint like "I don't like it" is not actionable.
5.  `summary`: A concise, single-sentence summary of the review's main point.

# --- Examples ---

## Example 1:
### Input Review:
```json
{{
  "text": "The device stopped charging after the second use. I tried different cables and power outlets, but it's completely dead. The support number just rings and no one picks up. I want a refund but the return process is a nightmare. Do not buy.",
  "helpful_vote": 45,
  "review_time": "2023-03-27 13:30:00"
}}
"""

In [37]:
# Create an empty list to store all the results
all_analysis_results = []

print(f"--- Starting analysis for {len(processed_list)} reviews ---")

# This is the loop that processes each review one by one
for i, review in enumerate(processed_list):
    print(f"\n➡️ Analyzing review #{i+1}...")
    
    # Call the analysis function for the current review
    analysis_result = analyze_review_with_ollama(review)
    
    # If the analysis was successful, add it to our results list
    if analysis_result:
        print(f"✅ Success for review #{i+1}.")
        all_analysis_results.append(analysis_result.dict()) # Append as a dictionary
    else:
        print(f"❌ Failed to analyze review #{i+1}.")

print("\n\n🏁 --- Batch processing complete! ---")
print("--- Final list of all analysis results: ---")
print(json.dumps(all_analysis_results, indent=2))

--- Starting analysis for 3 reviews ---

➡️ Analyzing review #1...
✅ Success for review #1.

➡️ Analyzing review #2...


/var/folders/wh/4xk7n0ss7xg9nxqk4znmw1d80000gn/T/ipykernel_98095/494278473.py:16: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  all_analysis_results.append(analysis_result.dict()) # Append as a dictionary


✅ Success for review #2.

➡️ Analyzing review #3...


/var/folders/wh/4xk7n0ss7xg9nxqk4znmw1d80000gn/T/ipykernel_98095/494278473.py:16: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  all_analysis_results.append(analysis_result.dict()) # Append as a dictionary


✅ Success for review #3.


🏁 --- Batch processing complete! ---
--- Final list of all analysis results: ---
[
  {
    "sentiment": "Positive",
    "main_topic": "Performance",
    "key_drivers": {
      "Build Quality": "Positive",
      "Setup": "Positive",
      "Packaging": "Positive"
    },
    "is_actionable": false,
    "summary": "This product is brilliant with solid build quality and easy setup."
  },
  {
    "sentiment": "Negative",
    "main_topic": "Durability",
    "key_drivers": {
      "Charging Issues": "Negative",
      "Support": "Negative",
      "Return Process": "Negative"
    },
    "is_actionable": true,
    "summary": "The device stopped charging after the second use and the support is unresponsive."
  },
  {
    "sentiment": "Neutral",
    "main_topic": "Usability",
    "key_drivers": {
      "Battery Life": "Negative",
      "Screen Quality": "Positive"
    },
    "is_actionable": true,
    "summary": "The device's usability is hindered by poor battery life and

/var/folders/wh/4xk7n0ss7xg9nxqk4znmw1d80000gn/T/ipykernel_98095/494278473.py:16: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  all_analysis_results.append(analysis_result.dict()) # Append as a dictionary


In [38]:
all_analysis_results

[{'sentiment': 'Positive',
  'main_topic': 'Performance',
  'key_drivers': {'Build Quality': 'Positive',
   'Setup': 'Positive',
   'Packaging': 'Positive'},
  'is_actionable': False,
  'summary': 'This product is brilliant with solid build quality and easy setup.'},
 {'sentiment': 'Negative',
  'main_topic': 'Durability',
  'key_drivers': {'Charging Issues': 'Negative',
   'Support': 'Negative',
   'Return Process': 'Negative'},
  'is_actionable': True,
  'summary': 'The device stopped charging after the second use and the support is unresponsive.'},
 {'sentiment': 'Neutral',
  'main_topic': 'Usability',
  'key_drivers': {'Battery Life': 'Negative', 'Screen Quality': 'Positive'},
  'is_actionable': True,
  'summary': "The device's usability is hindered by poor battery life and excessive heat."}]