In [1]:
import polars as pl
from ollama import Client,AsyncClient
from pydantic import BaseModel, Field,conlist
from typing import List,Literal
import traceback
import json
import tqdm
import asyncio

In [2]:
path = "/home/aymen/Desktop/my_work/data_engineer/data/data.parquet"

In [3]:
data = pl.read_parquet(path)
data = data.with_columns(
    pl.arange(1, data.height + 1).alias("item_id")
)
category_description = data[["item_id","description"]].to_dict(as_series=False)
items = [dict(zip(category_description.keys(), values)) for values in zip(*category_description.values())]

In [6]:
data.head()

product_name,price,quantity,category,description,availability,discount_percentage,date,shop_id,id,item_id
str,f64,i64,str,str,bool,f64,str,str,str,i64
"""Stuffed Grape Leaves""",931.11,1,"""Food - Prepared Foods""","""Grape leaves stuffed with rice…",True,22.87,"""8/7/2024""","""shop_0""","""542feb77-abbd-4b0c-b006-0230f6…",1
"""Bike Repair Tool Kit""",877.48,4,"""Outdoor""","""Essential tools for bike maint…",False,31.06,"""2/4/2025""","""shop_1""","""81477846-99e2-4831-96eb-c1ea8f…",2
"""Sooji (Semolina)""",582.93,1,"""Food - Baking""","""Fine semolina flour, perfect f…",False,28.96,"""7/12/2025""","""shop_2""","""93ba46d8-b268-40f4-ba81-108b4f…",3
"""Apple Juice""",870.24,2,"""Food - Beverages""","""Refreshing apple juice, 100% j…",True,36.97,"""2/16/2025""","""shop_3""","""dc449455-0d32-422e-be64-5480d0…",4
"""Chili Powder""",58.31,4,"""Food - Spices""","""Spicy chili powder to add heat…",False,15.58,"""8/12/2024""","""shop_4""","""e19934d4-0e3c-401f-8590-f77e9c…",5


In [5]:
data.schema

Schema([('product_name', String),
        ('price', Float64),
        ('quantity', Int64),
        ('category', String),
        ('description', String),
        ('availability', Boolean),
        ('discount_percentage', Float64),
        ('date', String),
        ('shop_id', String),
        ('id', String),
        ('item_id', Int64)])

In [7]:
# constantents

BATCH_SIZE=10

In [8]:
class ItemReview(BaseModel):
    item_id: int = Field(description="unique identifier that is provided in the input.",title="item_id")
    classification: str = Field(description="The classification of the item (e.g.,Food,cloths).")
    review: str = Field(description="A brief review of the item.")

class Response(BaseModel):
    reviews: List[ItemReview] = Field(min_length=BATCH_SIZE,description="A list of classifications and reviews for the provided items.")

In [11]:
Response.model_json_schema()

{'$defs': {'ItemReview': {'properties': {'item_id': {'description': 'unique identifier that is provided in the input.',
     'title': 'item_id',
     'type': 'integer'},
    'classification': {'description': 'The classification of the item (e.g.,Food,cloths).',
     'title': 'Classification',
     'type': 'string'},
    'review': {'description': 'A brief review of the item.',
     'title': 'Review',
     'type': 'string'}},
   'required': ['item_id', 'classification', 'review'],
   'title': 'ItemReview',
   'type': 'object'}},
 'properties': {'reviews': {'description': 'A list of classifications and reviews for the provided items.',
   'items': {'$ref': '#/$defs/ItemReview'},
   'minItems': 10,
   'title': 'Reviews',
   'type': 'array'}},
 'required': ['reviews'],
 'title': 'Response',
 'type': 'object'}

## Using AsyncIO

In [6]:
client = AsyncClient("http://localhost:11434")

In [7]:
async def Chat(client,context):
    # set tempture to 1 
    response = await client.chat(
        messages=context,
        model="gemma-small:latest",
        format=Response.model_json_schema(),
        keep_alive=20,
        stream=False,
        options={
            "num_gpu": 30,
            }
        )
    return response
    

In [8]:
def dict_to_text(items: list[dict]) -> str:
    lines = []
    for item in items:
        line = "\n".join([f"{key} : {value}" for key, value in item.items()])
        lines.append(line)
    return "\n".join(lines)

In [9]:
def creatPrompt(batch_items):
    prompt_instruction = """
                You are a helpful assistant that classifies and reviews items.
                
                Each item has:
                - "item_id": unique id for each item
                - "description": the item's description
                
                Return a JSON array of objects with the following keys:
                - "item_id" : same as item_id from input
                - "category" : classification of item category  
                - "review" : small review 1-2 phrases max
                """
    
    # Build prompt
    number_of_items = len(batch_items)
    batch_items = dict_to_text(batch_items)
    context = [
                {'role': 'system',
                 'content': f"{prompt_instruction}"},
                {"role": "user", "content": f"The following {number_of_items} items need to be classified and reviewed:\n\n{batch_items}"},
            ]
    return context

In [10]:
def retry(result,BATCH_SIZE,batch_items):
    if len(result["reviews"]) != BATCH_SIZE:
        print(f"Expected {len(batch_items)} reviews, but got {len(result['reviews'])} \nwhat i got : {result} , handeling error")
        
        items_ids = [r["item_id"] for r in result["reviews"]] 
        rest = list(filter(lambda Id : Id not in items_ids , batch_items))
    else:
        rest = None
    return rest

In [11]:
from itertools import islice

def batch_iter(iterable, BATCH_SIZE):
    it = iter(iterable)
    while True:
        batch = list(islice(it, BATCH_SIZE))
        if not batch:
            break
        yield batch

In [12]:
# use async io to send multiple batches at the same time 
# process time : it used to be 13 days current time is 7 days to : 46.15% gain of process time
async def main():
    
    all_reviews = []
    rest = None
    items_iter = batch_iter(items, BATCH_SIZE)  # gives batches of 100 items
    
    # Initialize progress bar
    total_items = len(items)
    pbar = tqdm.tqdm(total=total_items, desc="Processing items", unit="items")
    processed_count = 0

    while True:
        if rest:
            batches = [rest]
            rest = None
        else:
            batches = [list(next(items_iter, [])) for _ in range(3)]
            batches = [b for b in batches if b]  # remove empty

        if not batches:
            break

        contexts = [creatPrompt(b) for b in batches]
        responses = await asyncio.gather(*(Chat(client, ctx) for ctx in contexts))

        for batch_items, response in zip(batches, responses):
            try:
                parsed = Response.model_validate_json(response.message.content)
                result = json.loads(parsed.model_dump_json())
                rest = retry(result, BATCH_SIZE, batch_items)
                
                # Only count as processed if no retry is needed
                if rest is None:
                    items_processed = len(batch_items)
                    processed_count += items_processed
                    pbar.update(items_processed)
                    pbar.set_postfix({"Processed": processed_count, "Reviews": len(all_reviews)})
                all_reviews.extend(result["reviews"])
                
            except json.JSONDecodeError as jd:
                print(f"JSONDecodeError: {jd}")
            except ValueError as ve:
                print(f"ValueError: {ve}")
            except Exception as e:
                print(f"Unexpected error: {e}")

    pbar.close()
    return all_reviews

In [13]:
await main()

Processing items:   0%|          | 80/1000000 [00:39<140:13:53,  1.98items/s, Processed=80, Reviews=81]

Expected 10 reviews, but got 11 
what i got : {'reviews': [{'item_id': 61, 'classification': 'item_id', 'review': 'The compact coffee grinder for fresh ground coffee beans.  This is a great option for coffee lovers who want to elevate their daily routine.'}, {'item_id': 62, 'classification': 'item_id', 'review': 'All-natural skincare set for daily use.  This is a great option for coffee lovers who want to elevate their daily routine.'}, {'item_id': 63, 'classification': 'item_id', 'review': 'Collapsible travel bowl for pets on the go.  This is a great option for coffee lovers who want to elevate their daily routine.'}, {'item_id': 64, 'classification': 'item_id', 'review': 'Crispy and sweet dried apple slices.  A delightful and flavorful snack.'}, {'item_id': 65, 'classification': 'item_id', 'review': 'Dried tomatoes packed with deep flavor for salads and pasta.  A classic and satisfying snack.'}, {'item_id': 66, 'classification': 'item_id', 'review': 'Fun and educational puzzle set fo

CancelledError: 

In [None]:
# switch to llama cpp
# write the api for it or use exisiting one "https://llama-cpp-python.readthedocs.io/en/latest/api-reference/"
# make with python first than if it get  you where you want fine else
# switch to go language for go check this out "https://github.com/go-skynet/go-llama.cpp"