In [None]:
from langchain_ollama import ChatOllama
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
from langchain.agents import create_agent
from typing import List
from pydantic import BaseModel, Field
from langchain_core.output_parsers import PydanticOutputParser
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from kafka import KafkaProducer, KafkaConsumer
from kafka.structs import TopicPartition
import json
pd.options.display.max_columns = None

In [23]:
class Review(BaseModel):
    """Structured result expected from the LLM for one generated review.

    Attributes:
        review: Free-text body of the review.
        stars: Star rating. The generation prompt asks for a value from 1 to 5,
            so the field is constrained to that range; a value outside it
            fails pydantic validation instead of being accepted silently.
    """

    review: str = Field(
        description="Text of the review"
    )
    # ge/le enforce the 1-5 range requested by the prompt and also surface it
    # in the JSON schema generated from this model.
    stars: int = Field(
        ge=1,
        le=5,
        description="The quantity of stars on the rating"
    )


In [24]:
# Parser that turns the raw LLM text into a validated `Review` instance and
# provides the format instructions injected at the end of the prompt.
parser = PydanticOutputParser(pydantic_object=Review)

# Instruction template. Single-brace names are template variables; literal
# braces (the JSON sample below) must be doubled so the f-string style
# template does not treat them as variables.
str_prompt = """
You are a product review generator.

Your task is to create ONE customer-style product review based on the information provided below.

Product name: {product_name}
Category: {product_category}
Review size: {review_size}
Review type: {type}
Aspect being reviewed: {content}

Guidelines:

1. The review must sound natural, like a real customer experience.
2. The tone must strictly follow the review type:
   - If the type is positive, the overall sentiment must be positive.
   - If the type is negative, the overall sentiment must be negative.
3. Reviews may be controversial:
   - The text may include mixed opinions.
   - The star rating may be slightly unexpected.
   - Even so, the final sentiment must always respect the defined review type.
4. The review must focus mainly on the specified aspect:
   - Delivery
   - Product appearance
   - Functionality
   - Price
5. The product name may be mentioned in the review, but prefer to not mention.
6. Follow the review size rules:
   - Small: exactly one sentence.
   - Medium: at least two sentences.
   - Long: one paragraph with at least 5 lines.
7. Assign a star rating from 1 to 5 that matches the overall sentiment:
   - Negative reviews → 1 or 2 stars (controversial cases may slightly vary)
   - Neutral or mixed but positive → 3 stars
   - Clearly positive → 4 or 5 stars
8. Theses reviews will not be used for real products and reviews. They are purely synthetic and for testing purposes. So, feel free to be creative, as long as you respect the defined review type.
9. Return ONLY valid JSON.
10. Do not include any extra text.
11. Do not include explanations.

Strictly follow this output format instruction:

```json
{{
	"review": "<generated review text>",
	"stars": <number from 1 to 5>
}}
```

{format_instructions}
"""

# Each message must be a (role, template) tuple. The previous form,
# (["system", str_prompt]), is just a list of two strings, which LangChain
# interprets as two separate human messages (the literal word "system" and
# then the instructions). A short human turn follows the system message so
# that chat templates which only open the assistant turn after a user message
# still produce a reply.
prompt = ChatPromptTemplate(
    [
        ("system", str_prompt),
        ("human", "Generate the review now."),
    ],
    partial_variables={"format_instructions": parser.get_format_instructions()},
)

In [25]:
# Load the product catalogue and keep a plain NumPy view of its rows so a
# product can be looked up by positional index.
df = pd.read_csv('data/produtcs.csv')
products = df.to_numpy()

In [26]:
# Value pools that the review generator samples from.
list_review_size = [
    'Small',
    'Medium',
    'Long',
]
list_type = [
    'Positive',
    'Negative',
]
list_content = ['Delivery', 'Product appearance', 'Functionality', 'Price']

# Candidate row positions: 1 through 99 inclusive.
# NOTE(review): position 0 is never drawn and the upper bound is hard-coded;
# confirm this matches the number of rows in the products table.
list_index_produtcs = list(range(1, 100))
 

In [27]:
def generate_review(model, prod:bool=False):
    """Generate one synthetic product review with the given Ollama model.

    A review size, a review type, a subset of aspects and a product row are
    drawn at random, the prompt is filled in, and the chain
    ``prompt | llm | parser`` is invoked.

    Args:
        model: Name of the Ollama model passed to ``ChatOllama``.
        prod: When False (default) the returned dict also carries the sampled
            generation parameters; when True it only carries the product
            index, the review text and the stars.

    Returns:
        dict | None: The review payload, made only of plain Python values so
        it can be serialised with ``json.dumps``; ``None`` when invoking the
        chain raised an exception (the error is printed).
    """
    llm = ChatOllama(
        model=model,
        temperature=0,
        validate_model_on_init=True,
        num_ctx=8192,
        reasoning=False,
        profile={"structured_output": True},
    )
    chain = prompt | llm | parser

    # Convert every NumPy scalar/array to a builtin right away: NumPy integers
    # are rejected by json.dumps, and a NumPy string array would be rendered
    # into the prompt as "['Price' 'Functionality']".
    review_size = str(np.random.choice(list_review_size))
    review_type = str(np.random.choice(list_type))
    # randint's upper bound is exclusive, so between 1 and
    # len(list_content) - 1 distinct aspects are drawn.
    n_aspects = np.random.randint(1, len(list_content))
    content = np.random.choice(list_content, size=n_aspects, replace=False).tolist()
    index = int(np.random.choice(list_index_produtcs))
    # `index` is a row position in `products`; columns 0 and 1 are used as the
    # product name and category respectively.
    product = products[index]

    try:
        output = chain.invoke(
            {
                "product_name": product[0],
                "product_category": product[1],
                "review_size": review_size,
                "type": review_type,
                "content": ", ".join(content),
            }
        )
    except Exception as e:
        # Best effort: a failed generation is reported and skipped by callers.
        print(f"Error generating review with model '{model}': {e}")
        return None

    if prod:
        return {
            "product_id": index,
            "review": output.review,
            "stars": output.stars,
        }

    return {
        "product_name": product[0],
        "product_category": product[1],
        "review_size": review_size,
        "type": review_type,
        "content": content,
        "review": output.review,
        "stars": output.stars,
    }

In [28]:
# Ollama model tags used for generation; one worker is started per entry.
models = [
    "gemma3:1b",
    "deepseek-r1:1.5b",
    "qwen3:1.7b",
    "granite3.3:2b",
    "llama3.2:1b",
]

In [None]:
def kafka_publisher(topic: str, message: dict, bootstrap_servers: List[str] = ["localhost:9092"]):
    """
    Publish a single message to a Kafka topic and wait for the broker's acknowledgement.

    Args:
        topic (str): The Kafka topic to publish to
        message (dict): The message to publish (serialised to UTF-8 JSON)
        bootstrap_servers (List[str]): Kafka broker address(es), defaults to ["localhost:9092"]

    Returns:
        bool: True if the send was acknowledged, False otherwise (the error is printed)
    """
    producer = None
    try:
        producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            acks='all',
            retries=3,
            api_version=(4, 2, 0)
        )
        # Block until the broker acknowledges the record (or the wait times out).
        producer.send(topic, value=message).get(timeout=10)
        return True
    except Exception as e:
        print(f"Error publishing to Kafka: {e}")
        return False
    finally:
        # Always release the producer, including when send/get failed; the
        # timeout keeps a broken connection from blocking the close.
        if producer is not None:
            try:
                producer.close(timeout=10)
            except Exception as e:
                print(f"Error closing Kafka producer: {e}")


In [None]:
def kafka_consumer(topic: str, bootstrap_servers: List[str] = ["localhost:9092"], max_messages: int = None, callback=None):
    """
    Consume messages from a Kafka topic and stop when all currently-available messages are read.

    "Currently available" means the end offsets of the assigned partitions as
    observed right after partition assignment. If the consumer group is
    already at those offsets the function returns immediately instead of
    waiting for new messages.

    Args:
        topic (str): The Kafka topic to consume from
        bootstrap_servers (List[str]): Kafka broker address(es), defaults to ["localhost:9092"]
        max_messages (int): Maximum number of messages to consume (None or 0 for no limit)
        callback (function): Optional callback invoked with each decoded message

    Returns:
        list or bool: List of consumed messages (if no callback provided), True if callback
        used and successful. On error returns [] (no callback) or False (callback).
        Returns [] when no partition was assigned.
    """
    consumer = None
    try:
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='review-group',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            api_version=(4, 2, 0)
        )

        # A falsy max_messages (None or 0) means "no limit".
        limit = max_messages if max_messages else None

        # The first poll triggers partition assignment. It may already return
        # records, so the batch is kept and processed below rather than
        # dropped. max_records caps the fetch so the consumer position never
        # moves past what is handed to the caller.
        batch = consumer.poll(timeout_ms=1000, max_records=limit)
        partitions = consumer.assignment()
        if not partitions:
            print("No partitions assigned; nothing to consume.")
            return []

        # Determine current end offsets (the offset of the next message) for each assigned partition
        end_offsets = consumer.end_offsets(partitions)

        messages = []
        count = 0

        while True:
            for records in batch.values():
                for record in records:
                    count += 1
                    data = record.value
                    if callback:
                        callback(data)
                    else:
                        messages.append(data)

            if limit is not None and count >= limit:
                break

            # Stop once every partition has reached its recorded end offset.
            # Checking this before waiting for more data is what prevents an
            # endless wait when there is nothing (left) to read.
            if all(consumer.position(tp) >= end_offsets[tp] for tp in partitions):
                break

            remaining = None if limit is None else limit - count
            batch = consumer.poll(timeout_ms=1000, max_records=remaining)

        return messages if not callback else True
    except Exception as e:
        print(f"Error consuming from Kafka: {e}")
        return [] if not callback else False
    finally:
        # Runs on success, on errors and on KeyboardInterrupt alike, so the
        # consumer leaves the group cleanly.
        if consumer is not None:
            try:
                consumer.close()
            except Exception as e:
                print(f"Error closing Kafka consumer: {e}")


In [None]:
# Generate one review per model concurrently and publish each successful
# result to the "reviews" topic as soon as its worker finishes.
with ThreadPoolExecutor(max_workers=len(models)) as executor:
    futures = [executor.submit(generate_review, model) for model in models]
    for future in as_completed(futures):
        review = future.result()
        if not review:
            # A falsy result means no review was produced; nothing to publish.
            continue
        kafka_publisher(topic="reviews", message=review)


Message published to topic 'reviews' at partition 2, offset 0
{'product_name': 'HERMAN MILLER ELM', 'product_category': 'DRESSER', 'review_size': 'Medium', 'type': 'Positive', 'content': ['Price', 'Product appearance'], 'review': 'I was pleasantly surprised by the price of this dresser, considering its quality and appearance. The product looks great in my bedroom!', 'stars': 4}
Message published to topic 'reviews' at partition 0, offset 0
{'product_name': 'LG DWT-12', 'product_category': 'DISHWASHER', 'review_size': 'Small', 'type': 'Negative', 'content': ['Price', 'Functionality'], 'review': 'LG DWT-12 dishwasher arrived with a dented front panel, despite being brand new in the box. Functionality is also questionable as it struggles to clean dishes thoroughly, leaving food residue behind.', 'stars': 2}
Message published to topic 'reviews' at partition 1, offset 0
{'product_name': 'SAMSUNG BL-200', 'product_category': 'BLENDER', 'review_size': 'Small', 'type': 'Positive', 'content': ['

In [38]:
# Read back up to 10 messages from the "reviews" topic; with no callback the
# decoded messages are returned as a list and shown as the cell output.
kafka_consumer(topic="reviews", max_messages=10)

Consumed message 1: {'product_name': 'LG DWT-12', 'product_category': 'DISHWASHER', 'review_size': 'Small', 'type': 'Negative', 'content': ['Price', 'Functionality'], 'review': 'LG DWT-12 dishwasher arrived with a dented front panel, despite being brand new in the box. Functionality is also questionable as it struggles to clean dishes thoroughly, leaving food residue behind.', 'stars': 2}
Consumed message 2: {'product_name': 'SAMSUNG BL-200', 'product_category': 'BLENDER', 'review_size': 'Small', 'type': 'Positive', 'content': ['Delivery', 'Functionality', 'Product appearance'], 'review': 'The SAMSUNG BL-200 blender is a solid product, offering good delivery and functionality. The appearance is clean and easy to use.', 'stars': 4}
Consumed message 3: {'product_name': 'WHIRLPOOL WM-200', 'product_category': 'MICROWAVE', 'review_size': 'Long', 'type': 'Negative', 'content': ['Price', 'Product appearance'], 'review': "I was really disappointed with the WHIRLPOOL WM-200 microwave. The prod

KeyboardInterrupt: 