In [None]:
import aioodbc
import asyncio

from config import config

import nest_asyncio
nest_asyncio.apply()
async def query_sql_server():
    dsn = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={config.SERVER_NAME};DATABASE={config.DATABASE_NAME};UID={config.USERNAME};PWD={config.PASSWORD};trusted_connection=yes'
    async with aioodbc.create_pool(dsn=dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT * FROM dbo.store_info")
                result = await cur.fetchall()
                print(result)

loop = asyncio.get_event_loop()
loop.run_until_complete(query_sql_server())


In [None]:
from agents.visual_analyzer import visual_analyzer

resp = await visual_analyzer._analyze_single_image(image_path="C:\\Users\\rahul_thatikonda\\Desktop\\AIStoreAssistant\\Inputs\\Sample\\Store Images\\Image_4.jpg")
resp

In [None]:
from utils.sql_handler import sql_handler

resp = sql_handler.query_data('dbo.store_info', filters={"Store ID": "Store1"})
resp.columns = list(Store.model_fields.keys())
# resp.to_dict(orient="records")
# list(resp.itertuples(index=False, name=None))

from database_models import Store

Store.model_fields.keys()
[Store(**record) for record in resp.to_dict(orient="records")]

# from sqlalchemy import create_engine
# from config import config

# string = f"mssql+pyodbc://{config.USERNAME}:{config.PASSWORD}@{config.SERVER_NAME}/{config.DATABASE_NAME}?driver=ODBC+Driver+17+for+SQL+Server"

# print(string)
# engine = create_engine(
#                 f"mssql+pyodbc://{config.USERNAME}:{config.PASSWORD}@{config.SERVER_NAME}/{config.DATABASE_NAME}"
#                 "?driver=ODBC+Driver+17+for+SQL+Server"
#             )
# import pandas.io.sql as pdsql

# pdsql.read_sql('select * from dbo.store_info', engine)

In [None]:
from utils.key_frame_detector import keyframeDetection

keyframeDetection(source="C:\\Users\\rahul_thatikonda\\Desktop\\AIStoreAssistant\\Inputs\\Sample\\Store Videos\\Video_3.mp4", Thres=0.4, max_keyframes=10)

In [1]:
from typing import List, Optional
from pydantic import BaseModel, Field, conint, confloat 

class Metric(BaseModel):
    score: conint(ge=0, le=100) = Field(description="Score between 0 and 100", ge=0, le=100)
    details: Optional[str] = Field(description="Brief description of the observation")

class StoreEvaluation(BaseModel):
    cleanliness: Metric = Field(description="How clean and tidy is the store?")
    empty_shelves: Metric = Field(description="Are there empty or poorly stocked shelves? (100 = fully stocked, 0 = many empty)")
    queue_length: Optional[Metric] = Field(description="Only relevant for checkout situations")
    staff_presence: Metric = Field(description="Is staff visible and available? (100 = good staffing, 0 = no staff visible)")
    store_organization: Metric = Field(description="How well organized is the store layout?")
    immediate_issues: Optional[List[str]] = Field(default=None, description="List of immediate issues, if any")


class ThemeSentiment(BaseModel):
    theme: str = Field(description="Name of the theme")
    score: confloat(ge=-1, le=1) = Field(description="Sentiment score from -1 (very negative) to 1 (very positive)")
    explanation: str = Field(description="Brief explanation of the sentiment")
    sample_quotes: List[str] = Field(description="Sample quotes from reviews")

class SentimentAnalysis(BaseModel):
    themes: List[ThemeSentiment] = Field(description="List of theme sentiment analyses")
    overall_sentiment: confloat(ge=-1, le=1) = Field(description="Overall sentiment score from -1 to 1")


class QueryResponse(BaseModel):
    response: str = Field(description="Main answer to the query")
    insights: List[str] = Field(description="Key insights derived from analysis")
    recommendations: List[str] = Field(description="Actionable recommendations based on findings")
    metrics_referenced: List[str] = Field(description="Names or keys of metrics involved in the response")



In [2]:
# from langchain.output_parsers import PydanticOutputParser
# from pydantic import BaseModel

# class StockAnswer(BaseModel):
#     symbol: str
#     price: float
#     rationale: str

# parser = PydanticOutputParser(pydantic_object=StockAnswer)
# format_instructions = parser.get_format_instructions()

# prompt = (
#     "Provide the current stock price.\n"
#     "{format}\n"
#     "Stock symbol: {symbol}"
# ).format(format=format_instructions, symbol="AAPL")
# # Run the Chain
# from langchain.llms import OpenAI

# llm = OpenAI(model="gpt-3.5-turbo", temperature=0)
# response = llm(prompt)
# structured = parser.parse(response)
# print(structured.price, structured.rationale)


# # Call LLM asynchronously with messages
# response = await llm.ainvoke(lc_messages)

# # Parse raw text content output into Pydantic model
# try:
#     parsed_response = parser.parse(response.content)
# except Exception as e:
#     raise ValueError(f"Failed to parse response into QueryResponse: {e}")

# return parsed_response


In [3]:
from local_openai_client import local_client
from langchain_core.output_parsers import PydanticOutputParser

messages = [
        {"role": "user", "content": "Explain quantum computing in simple terms."},
        {"role": "user", "content": f"""Provide a detailed analysis and answer. Structure your response as JSON:
            {{
                "response": "Main answer to the query",
                "insights": ["Key insight 1", "Key insight 2", ...],
                "recommendations": ["Recommendation 1", "Recommendation 2", ...],
                "metrics_referenced": ["metric1", "metric2", ...]
            }}"""}
    ]
response = await local_client.chat_completion(messages, temperature=0.5, max_tokens=2000, response_model=QueryResponse)
print(f"Chat completion response:{type(response)}\n", response)



image_path = "C:\\Users\\rahul_thatikonda\\Desktop\\AIStoreAssistant\\Inputs\\Images\\Store 2\\Store2-09-06-2024.png"
prompt = f"""Analyze this retail store image and provide scores (0-100) for the following metrics:

        1. Cleanliness: How clean and tidy is the store?
        2. Empty Shelves: Are there empty or poorly stocked shelves? (100 = fully stocked, 0 = many empty)
        3. Queue Length: How long are the checkout queues?, this field is only relevant situations like checkout (100 = no queue, 0 = very long queues)
        4. Staff Presence: Is staff visible and available? (100 = good staffing, 0 = no staff visible)
        5. Store Organization: How well organized is the store layout?
        6. Immediate Issues: This is optional field, only provide value if it's relevant to situation (["Restock aisle 1"], [""])

        For each metric, provide:
        - score (0-100)
        - details (brief description - ignore if it's optional or not relevant to situation)

        Also identify any issues that need immediate attention (spills, hazards, etc.)

        Respond in JSON format:
        {{
            "cleanliness": {{"score": 85, "details": "Store is generally clean"}},
            "empty_shelves": {{"score": 70, "details": "Some gaps in produce section"}},
            "queue_length": {{"score": 60, "details": "Moderate queues at 2 checkouts"}},
            "staff_presence": {{"score": 80, "details": "Staff visible in multiple areas"}},
            "store_organization": {{"score": 90, "details": "Well organized layout"}},
            "immediate_issues": ["Minor spill in aisle 3"]
        }}"""

response = await local_client.analyze_image(image_path, prompt, response_model=StoreEvaluation)
print(f"Image analysis result: {type(response)}\n", response)



reviews = [
    "The staff was friendly but the wait time was terrible.",
    "Great product quality, but the checkout process was slow.",
    "Amazing experience overall!"
]

themes = ["staff", "wait time", "product quality", "checkout"]

result = await local_client.analyze_sentiment(reviews, themes, response_model=SentimentAnalysis)
print(f"Sentiment analysis JSON: {type(result)}\n", result)
# 9m 13.1s

Chat completion response:<class 'dict'>
 {'response': "Okay, let's break down quantum computing in a way that's understandable, even if you don't have a physics background. Here’s a detailed explanation:\n\n**Classical Computing vs. Quantum Computing**\n\n* **Classical Computers:** These are the computers we use every day. They store information as bits, which are like switches that are either on (1) or off (0). Everything a classical computer does – from browsing the web to running complex simulations – is based on manipulating these bits.\n* **Quantum Computers:** Quantum computers are fundamentally different. Instead of bits, they use **qubits**. This is the crucial difference.\n\n**What is a Qubit?**\n\nThink of a regular bit as a light switch – it's either on or off. A qubit is like a dimmer switch. It can be on, off, *or* a combination of both *at the same time*. This is due to a concept called **superposition**. \n\n**Superposition:**  This allows a qubit to represent 0, 1, *or*

In [None]:
from langchain_openai import ChatOpenAI
from config import config
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
import base64
import logging
import json
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)


class AzureOpenAIClient:
    def __init__(self):
        # Initialize only if Azure configuration is available
        if not config.is_azure_configured() or config.AZURE_OPENAI_API_KEY.startswith("mock"):
            logger.warning("Azure OpenAI is in mock mode or not configured.")
            self.llm = None
        else:
            try:
                self.llm = ChatOpenAI(
                base_url="http://127.0.0.1:8080",
                model="",
                api_key="NA"
                )
            except Exception as e:
                logger.error(f"Failed to initialize langchain-openai ChatOpenAI client: {str(e)}")
                self.llm = None

    def is_configured(self) -> bool:
        return self.llm is not None

    async def chat_completion(
        self,
        messages: List[Dict[str, Any]],
        deployment: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 2000,
    ) -> str:
        """General chat completion for text generation"""
        if not self.is_configured():
            return "Azure OpenAI is not configured. Please set API credentials."

        try:
            # Convert OpenAI-style messages to LangChain messages
            formatted_messages = []
            for msg in messages:
                role = msg.get("role")
                content = msg.get("content")
                if role == "system":
                    formatted_messages.append(SystemMessage(content=content))
                elif role == "assistant":
                    formatted_messages.append(AIMessage(content=content))
                else:
                    formatted_messages.append(HumanMessage(content=content))

            llm = ChatOpenAI(
                base_url="http://127.0.0.1:8080",
                model="",
                api_key="NA",
                temperature=temperature,
                max_tokens=max_tokens,
            )

            response = await llm.ainvoke(formatted_messages)
            return response.content
        except Exception as e:
            logger.error(f"Azure OpenAI API error: {str(e)}")
            raise Exception(f"Failed to get completion: {str(e)}")

    async def analyze_image(self, image_path: str, prompt: str, deployment: Optional[str] = None) -> str:
        """Analyze image using GPT-4 Vision"""
        if not self.is_configured():
            return "Azure OpenAI is not configured. Please set API credentials."

        try:
            with open(image_path, 'rb') as image_file:
                image_data = base64.b64encode(image_file.read()).decode('utf-8')

            image_message = {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_data}"}, "detail": "low"},
                ],
            }

            response = await self.chat_completion(
                messages=[image_message],
                deployment=deployment,
                temperature=0.5,
                max_tokens=2000,
            )

            return response
        except Exception as e:
            logger.error(f"Image analysis error: {str(e)}")
            raise Exception(f"Failed to analyze image: {str(e)}")

    async def analyze_sentiment(self, reviews: List[str], themes: List[str]) -> Dict[str, Any]:
        """Analyze sentiment across multiple themes"""
        prompt = f"""Analyze the following customer reviews for a retail store and provide sentiment scores for each theme.

Themes to analyze: {', '.join(themes)}

Reviews:
{chr(10).join([f'{i+1}. {review}' for i, review in enumerate(reviews)])}

For each theme, provide:
1. Sentiment score from -1 (very negative) to 1 (very positive)
2. Brief explanation
3. Sample quotes from reviews

Respond in JSON format:
{{
    "themes": [
        {{
            "theme": "theme_name",
            "score": 0.5,
            "explanation": "brief explanation",
            "sample_quotes": ["quote1", "quote2"]
        }}
    ],
    "overall_sentiment": 0.3
}}"""

        messages = [{"role": "user", "content": prompt}]
        response = await self.chat_completion(messages, temperature=0.3)

        try:
            return json.loads(response.replace("```", "").replace("json", ""))
        except Exception as e:
            logger.error(f"Azure: Sentiment analysis error: {str(e)}, response: {str(response)}")
            return {"themes": [], "overall_sentiment": 0}


# Global instance
azure_client = AzureOpenAIClient()


In [None]:
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What's the weather like in Paris?"}
]
result = await azure_client.chat_completion(messages)
print("Chat Completion:", result)


image_path = "C:\\Users\\rahul_thatikonda\\Desktop\\AIStoreAssistant\\Inputs\\Images\\Store 2\\Store2-09-06-2024.png"
prompt = f"""Analyze this retail store image and provide scores (0-100) for the following metrics:

        1. Cleanliness: How clean and tidy is the store?
        2. Empty Shelves: Are there empty or poorly stocked shelves? (100 = fully stocked, 0 = many empty)
        3. Queue Length: How long are the checkout queues?, this field is only relevant situations like checkout (100 = no queue, 0 = very long queues)
        4. Staff Presence: Is staff visible and available? (100 = good staffing, 0 = no staff visible)
        5. Store Organization: How well organized is the store layout?
        6. Immediate Issues: This is optional field, only provide value if it's relevant to situation (["Restock aisle 1"], [""])

        For each metric, provide:
        - score (0-100)
        - details (brief description - ignore if it's optional or not relevant to situation)

        Also identify any issues that need immediate attention (spills, hazards, etc.)

        Respond in JSON format:
        {{
            "cleanliness": {{"score": 85, "details": "Store is generally clean"}},
            "empty_shelves": {{"score": 70, "details": "Some gaps in produce section"}},
            "queue_length": {{"score": 60, "details": "Moderate queues at 2 checkouts"}},
            "staff_presence": {{"score": 80, "details": "Staff visible in multiple areas"}},
            "store_organization": {{"score": 90, "details": "Well organized layout"}},
            "immediate_issues": ["Minor spill in aisle 3"]
        }}"""
result = await azure_client.analyze_image(image_path, prompt)
print("Image Analysis:", result)


reviews = [
    "The store was clean and the staff was very friendly.",
    "The product selection could have been better.",
    "Checkout was fast, but parking was difficult."
]
themes = ["cleanliness", "staff friendliness", "product selection", "checkout", "parking"]
result = await azure_client.analyze_sentiment(reviews, themes)
print("Sentiment Analysis:", result)
# 5m 46.0s