# Homework 1

### Installing packages

In [6]:
# !pip install langchain_google_genai
!pip install langchain-openai



## Downloading receipts.zip
The code below downloads and unzips `receipts.zip` from Google Drive. `receipts.zip` contains all images from the Fusion folder on Blackboard.


In [7]:
import gdown

# Google Drive id of receipts.zip; the direct-download URL is derived from it.
file_id = "1oe2FZd3ZTO7nrDqjCafNvxicl08oF8JF"
download_url = "https://drive.google.com/uc?id=" + file_id

# Save the archive as receipts.zip in the working directory (progress bar enabled).
# The call is the cell's last expression, so its return value is shown as the cell output.
gdown.download(download_url, "receipts.zip", quiet=False)

Downloading...
From: https://drive.google.com/uc?id=1oe2FZd3ZTO7nrDqjCafNvxicl08oF8JF
To: /content/receipts.zip
100%|██████████| 1.61M/1.61M [00:00<00:00, 20.6MB/s]


'receipts.zip'

In [None]:
!unzip receipts.zip

Archive:  receipts.zip
replace receipt1.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

## 1. Helper functions

We need two functions:
* `image_to_base64` converts your jpg image into a Base64-encoded string (a text representation of the binary data that uses an alphabet of 64 characters, so the image can be transferred easily via an API).
* `get_image_data_url` takes your jpg image, converts it into a Base64 string, and constructs a data URL suitable as input for the Gemini API call.

In [None]:
import base64
import mimetypes

# Helper function to read and encode image
def image_to_base64(img_path):
    """Return the contents of the file at ``img_path`` as a Base64 text string.

    The file is read in binary mode, Base64-encoded, and the resulting bytes
    are decoded to ``str``.
    """
    with open(img_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded_bytes = base64.b64encode(raw_bytes)
    return encoded_bytes.decode('utf-8')

# Helper function to encode local file to Base64 Data URL
def get_image_data_url(image_path):
    """Build a ``data:`` URL that embeds a local file as Base64 text.

    The MIME type is guessed from the file name (e.g. ``image/jpeg`` for
    ``.jpg``); when no guess is available, ``image/png`` is used instead.
    The payload comes from ``image_to_base64``.
    """
    guessed_type = mimetypes.guess_type(image_path)[0]
    mime_type = "image/png" if guessed_type is None else guessed_type

    payload = image_to_base64(image_path)

    # Layout of a data URL: data:<mime type>;base64,<payload>
    return "data:" + mime_type + ";base64," + payload

In [None]:
import os
from getpass import getpass

from langchain_openai import ChatOpenAI

# Credentials must never be stored in the notebook: anyone who can read the file
# (or its git history) can use them. The key that used to be embedded here should
# be revoked. The key is now taken from the OPENAI_API_KEY environment variable;
# if it is not set, it is asked for once and cached for the rest of the session.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("API key for api.chatanywhere.tech: ")

# OpenAI-compatible endpoint serving the Gemini model; temperature=0 requests the
# least random sampling.
llm = ChatOpenAI(
    model="gemini-2.5-flash",
    api_key=os.environ["OPENAI_API_KEY"],
    base_url="https://api.chatanywhere.tech/v1",
    temperature=0
)

Display the jpg images. Alternatively, open the folder icon on the left panel to see the images.

In [None]:
from IPython.display import HTML, display
import glob, os

# Every .jpg in the working directory, in name order.
image_paths = sorted(glob.glob("*.jpg"))

# One vertical "card" per image: the thumbnail with its file name underneath.
cards = []
for path in image_paths:
    b64 = image_to_base64(path)
    filename = os.path.basename(path)  # show just the name, not the full path
    cards.append(f'''
    <div style="display: flex; flex-direction: column; align-items: center;">
        <img src="data:image/jpeg;base64,{b64}" style="height: 300px; border: 1px solid #ddd; margin-bottom: 5px;"/>
        <span style="font-family: monospace; font-size: 14px;">{filename}</span>
    </div>
    ''')

# Wrap the cards in a flex container so they flow left to right and wrap.
html_content = (
    '<div style="display: flex; flex-wrap: wrap; gap: 20px;">'
    + "".join(cards)
    + '</div>'
)

display(HTML(html_content))

## 2. Image input to Gemini
Unlike text, an image must first be converted into a Base64-encoded string and then formatted as a data URL before it is passed to the language model. This makes it easy to transfer image input through the API.

You can find more details on the appropriate format for image inputs in the [LangChain messages documentation](https://docs.langchain.com/oss/python/langchain/messages).



In [None]:
from langchain_core.prompts import ChatPromptTemplate

# Encode the two sample receipts as data URLs.
image_path = "/content/receipt1.jpg"
image_path2 = "/content/receipt2.jpg"
image_data_url = get_image_data_url(image_path)
image_data_url2 = get_image_data_url(image_path2)

# Human turn: one text part followed by two image parts. The {placeholders}
# are filled in from the dict passed to chain.invoke().
human_content = [
    {"type": "text", "text": "{question}"},
    {"type": "image_url", "image_url": {"url": "{image_url1}"}},
    {"type": "image_url", "image_url": {"url": "{image_url2}"}},
]
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", human_content),
])

# Pipe the filled-in prompt into the chat model.
chain = prompt | llm

response = chain.invoke(dict(
    question="What is in this picture?",
    image_url1=image_data_url,
    image_url2=image_data_url2,
))

print(response.content)

In [None]:
import json
import os
from getpass import getpass

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

# Initialise the model.
# Credentials must never be stored in the notebook: the key that used to be embedded
# here should be revoked. The key is now taken from the OPENAI_API_KEY environment
# variable; if it is not set, it is asked for once and cached for the session.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("API key for api.chatanywhere.tech: ")

llm = ChatOpenAI(
    model="gemini-2.5-flash",
    api_key=os.environ["OPENAI_API_KEY"],
    base_url="https://api.chatanywhere.tech/v1",
    temperature=0
)

def get_single_receipt_data(image_path):
    """Extract the key amounts from a single receipt image.

    The image is sent to ``llm`` as a data URL together with a system prompt
    asking for a JSON object with ``total_spent`` and ``original_price``
    (or ``{"error": "REJECTED"}`` for non-receipts).

    Args:
        image_path: Path of the receipt image on disk.

    Returns:
        The parsed JSON object as a ``dict``, or ``None`` when the reply does
        not contain a parseable JSON object.

    Raises:
        Whatever ``get_image_data_url`` or ``llm.invoke`` raise (unreadable
        file, API failure); only *parsing* problems are mapped to ``None``.
    """
    system_msg = """You are a receipt parser. Return ONLY a JSON object with:
    {
        "total_spent": float (How much money did I spend in total for these bills?),
        "original_price": float (How much would I have had to pay without the discount)
    }
    Think step by step.
    If the image is not a receipt, return {"error": "REJECTED"}.
    """

    img_data = get_image_data_url(image_path)

    messages = [
        SystemMessage(content=system_msg),
        HumanMessage(content=[
            {"type": "image_url", "image_url": {"url": img_data}}
        ])
    ]

    response = llm.invoke(messages)
    try:
        # Strip Markdown code fences, then keep only the outermost {...} span so
        # that any reasoning text around the JSON does not break the parser.
        raw_text = response.content.replace("```json", "").replace("```", "")
        start, end = raw_text.find("{"), raw_text.rfind("}")
        if start != -1 and end > start:
            raw_text = raw_text[start:end + 1]
        parsed = json.loads(raw_text)
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: content is not a plain string;
        # ValueError: covers json.JSONDecodeError (malformed JSON).
        return None

    # Callers use dict operations on the result, so anything else counts as a failure.
    return parsed if isinstance(parsed, dict) else None

In [None]:
image_paths = [f"/content/receipt{i}.jpg" for i in range(1, 8)]

# Per-receipt amounts; reset on every run so re-executing the cell never double counts.
query1_list = []  # amounts actually paid
query2_list = []  # amounts before discount

print(f"--- 开始提取 {len(image_paths)} 张收据数据 ---")
for i, path in enumerate(image_paths):
    data = get_single_receipt_data(path)
    if isinstance(data, dict) and "total_spent" in data:
        # .get() avoids a KeyError when the model omits "original_price".
        t_spent = data.get("total_spent")
        o_price = data.get("original_price")

        # Only real numbers may enter the sums (bool is excluded because it is an int subclass).
        if isinstance(t_spent, (int, float)) and not isinstance(t_spent, bool):
            query1_list.append(t_spent)
        if isinstance(o_price, (int, float)) and not isinstance(o_price, bool):
            query2_list.append(o_price)

        print(f"收据 {i+1}: 实付 ${t_spent}, 原价 ${o_price}")
    else:
        print(f"收据 {i+1}: 识别失败或非收据格式")

# --- Final totals consumed by the evaluation cells ---
query1_answer = sum(query1_list)
query2_answer = sum(query2_list)

print("-" * 30)
print(f"总计实付 (Query 1): {query1_answer}")
print(f"总计原价 (Query 2): {query2_answer}")

## 3. Evaluation Code

* Make sure your LLM returns a single float as the answer, stored in `query1_answer` and `query2_answer`.
* Run the following code blocks. If the blocks do not raise any error, then your chain design is correct. Otherwise, please check your chain design.

* Do not modify `query_1_costs` and `query_2_costs`

In [None]:
def test_query(answer, ground_truth_costs):
    # Convert string to float if necessary
    if isinstance(answer, str):
        answer = float(answer)

    # Calculate the ground truth sum once for clarity
    expected_total = sum(ground_truth_costs)

    # Check if the answer is within +/- $2 of the expected total
    assert abs(answer - expected_total) <= 2

Run the following code block to evaluate query 1:
> How much money did I spend in total for these bills?

In [None]:
# Reference amounts for query 1 (seven values, presumably one per receipt);
# test_query compares the answer against their sum.
query_1_costs = [394.7, 316.1, 140.8, 514.0, 102.3, 190.8, 315.6] # do not modify this
# Recompute the answer from `query1_list`, which an earlier cell must have populated.
query1_answer = sum(query1_list)
# Silent on success; raises AssertionError when the answer is more than 2 off.
test_query(query1_answer, query_1_costs)

Run the following code block to evaluate query 2:
> How much would I have had to pay without the discount?

In [None]:
# Reference amounts for query 2 (seven values, presumably one per receipt);
# test_query compares the answer against their sum.
query_2_costs = [480.20, 392.20, 160.10, 590.80, 107.70, 221.20, 396.00] # do not modify this
# Recompute the answer from `query2_list`, which an earlier cell must have populated.
query2_answer = sum(query2_list)
# Silent on success; raises AssertionError when the answer is more than 2 off.
test_query(query2_answer, query_2_costs)

In [None]:
# Scratch check: sum of the same seven values that are listed in `query_2_costs`.
sum([480.20, 392.20, 160.10, 590.80, 107.70, 221.20, 396.00])

In [None]:
import json
import os
from getpass import getpass

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

# Initialise the model.
# Credentials must never be stored in the notebook: the key that used to be embedded
# here should be revoked. The key is now taken from the OPENAI_API_KEY environment
# variable; if it is not set, it is asked for once and cached for the session.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("API key for api.chatanywhere.tech: ")

llm = ChatOpenAI(
    model="gemini-2.5-flash",
    api_key=os.environ["OPENAI_API_KEY"],
    base_url="https://api.chatanywhere.tech/v1",
    temperature=0
)

def get_single_receipt_data(image_path):
    """Extract the key amounts from a single receipt image (best effort).

    The image is sent to ``llm`` as a data URL together with a system prompt
    asking for a JSON object with ``total_spent`` and ``original_price``
    (or ``{"error": "REJECTED"}`` for non-receipts).

    Args:
        image_path: Path of the receipt image on disk.

    Returns:
        The parsed JSON object as a ``dict``, or ``None`` when the model call
        fails or its reply does not contain a parseable JSON object. Failures
        are reported with a printed warning instead of being raised, so one
        bad image does not abort a batch.
    """
    system_msg = """You are a receipt parser. Return ONLY a JSON object with:
    {
        "total_spent": float (How much money did I spend in total for these bills?),
        "original_price": float (How much would I have had to pay without the discount)
    }
    Think step by step.
    If the image is not a receipt, return {"error": "REJECTED"}.
    """

    img_data = get_image_data_url(image_path)

    messages = [
        SystemMessage(content=system_msg),
        HumanMessage(content=[
            {"type": "image_url", "image_url": {"url": img_data}}
        ])
    ]

    # Best effort: a failure on one image must not crash the whole run. `Exception`
    # (not a bare `except`) still lets KeyboardInterrupt/SystemExit through.
    try:
        response = llm.invoke(messages)
        content = response.content.replace("```json", "").replace("```", "")
        # Keep only the outermost {...} span so that any reasoning text around
        # the JSON does not break the parser.
        start, end = content.find("{"), content.rfind("}")
        if start != -1 and end > start:
            content = content[start:end + 1]
        parsed = json.loads(content)
    except Exception as exc:
        print(f"Warning: could not extract data from {image_path}: {exc!r}")
        return None

    # Callers use dict operations on the result, so anything else counts as a failure.
    return parsed if isinstance(parsed, dict) else None

def decide(query):
    """Classify the intent of a user query with the LLM.

    Args:
        query: The user's question as plain text.

    Returns:
        ``1`` for questions about the total actually paid, ``2`` for questions
        about the price before discount, and ``0`` for anything else. ``0`` is
        also returned when the model call fails, the reply cannot be parsed,
        or the reported class is not 1 or 2.
    """

    system_msg = """You are a query classifier.
    Analyze the user's query and classify it into one of these integers:
    1: The user asks about "Total Spent", "Final Cost", or "How much paid".
    2: The user asks about "Original Price", "Before Discount", or "Price without savings".
    0: The user asks anything else (Irrelevant query).

    Return ONLY a JSON object: {"class": int}
    """

    messages = [
        SystemMessage(content=system_msg),
        HumanMessage(content=query)  # the user's question must be passed to the model
    ]

    try:
        response = llm.invoke(messages)
        content = response.content.replace("```json", "").replace("```", "")
        # Keep only the outermost {...} span in case the reply has extra text.
        start, end = content.find("{"), content.rfind("}")
        if start != -1 and end > start:
            content = content[start:end + 1]
        result = json.loads(content)
        # int() normalises replies such as {"class": "1"} or {"class": 1.0}.
        intent = int(result.get("class", 0))
    except Exception:
        # Any failure (API error, malformed JSON, non-dict reply, non-numeric
        # class) is treated as "irrelevant" so the caller rejects the query.
        return 0

    # Never hand an unknown class to the caller.
    return intent if intent in (1, 2) else 0

def llmchat(query, images):
    """Answer a receipt-total question over a set of receipt images.

    Args:
        query: The user's question as plain text.
        images: Sequence of receipt image paths.

    Returns:
        The summed amount paid (intent 1) or the summed pre-discount amount
        (intent 2) as a number; for any other intent, the fixed rejection
        message string.
    """
    # --- Step 1: classify the intent ---
    intent_class = decide(query)
    print(f"意图类别: {intent_class}")

    # --- Step 2: reject everything that is not a supported question ---
    # Checking "not in (1, 2)" rather than "== 0" guarantees that an unexpected
    # class never triggers the (expensive) per-image model calls only to fall
    # through and return None.
    if intent_class not in (1, 2):
        print("Reject! Irrelevant query.")
        return "I can only answer questions about receipt totals."

    # --- Step 3: the intent is valid, so process the images ---
    query1_list = []
    query2_list = []

    print(f"--- 开始提取 {len(images)} 张收据数据 ---")
    for i, path in enumerate(images):
        data = get_single_receipt_data(path)
        if isinstance(data, dict) and "total_spent" in data:
            t_spent = data.get("total_spent", 0.0)
            o_price = data.get("original_price", 0.0)

            # Only real numbers may enter the sums (bool is excluded because
            # it is an int subclass).
            if isinstance(t_spent, (int, float)) and not isinstance(t_spent, bool):
                query1_list.append(t_spent)
            if isinstance(o_price, (int, float)) and not isinstance(o_price, bool):
                query2_list.append(o_price)

            print(f"收据 {i+1}: 实付 ${t_spent}, 原价 ${o_price}")
        else:
            print(f"收据 {i+1}: 识别失败")

    # --- Step 4: return the total that matches the intent ---
    if intent_class == 1:
        return sum(query1_list)
    return sum(query2_list)

In [None]:
# Irrelevant query: should be classified as 0 and rejected before any image is processed.
image_paths = ["/content/receipt{}.jpg".format(i) for i in range(1, 8)]
llmchat("123", images=image_paths)

In [None]:
# Query 1: total amount actually paid across receipt1.jpg .. receipt7.jpg.
image_paths = ["/content/receipt{}.jpg".format(i) for i in range(1, 8)]
llmchat("How much money did I spend in total for these bills?", images=image_paths)

In [None]:
# Query 2: total price before discount across receipt1.jpg .. receipt7.jpg.
image_paths = ["/content/receipt{}.jpg".format(i) for i in range(1, 8)]
llmchat("How much would I have had to pay without the discount?", images=image_paths)