In [8]:
# Dependencies: the Vertex AI Gemini integration for Haystack, plus Haystack itself
# installed from the GitHub `main` branch (the PyPI `haystack-ai` line is kept commented out).
# NOTE(review): `main` is a moving target — pin a released version or commit so the
# notebook can be reproduced later.
# %pip install haystack-ai
%pip install google-vertex-haystack
%pip install git+https://github.com/deepset-ai/haystack.git@main

^C
Note: you may need to restart the kernel to use updated packages.


In [1]:
from haystack_integrations.components.generators.google_vertex import VertexAIGeminiGenerator
# import Part of Google Vertex AI
from vertexai.generative_models import Part

## Defining Schemas to Validate the LLM's JSON Output

In [2]:
from typing import List
from pydantic import BaseModel, validator

# Expected shape of the final audio-description JSON produced by the LLM.
# Documented with `#` comments rather than a class docstring: pydantic uses a class
# docstring as the `description` of the generated JSON schema, and that schema is
# inserted into the prompt, so a docstring would change what the LLM sees.
# NOTE(review): the field is spelled `Audiodescription`, but the recorded outputs in
# this notebook show replies and validation errors keyed `audioDescription` —
# confirm which key is intended.
class audioDescription(BaseModel):
    # The narration script, as a single string.
    Audiodescription: str

In [3]:
# JSON schema of audioDescription; passed to prompt_builder as `schema` when the pipeline runs.
json_schema = audioDescription.schema_json()

In [4]:
# 5W1H summary of the video, produced by the first (summary) stage and checked by
# summary_validator. Every field is a plain string.
# Documented with `#` comments rather than a class docstring: pydantic uses a class
# docstring as the schema `description`, and the schema is sent to the LLM.
class videoSummary(BaseModel):
    who: str
    what: str
    when: str
    where: str
    how: str
    why: str

In [5]:
# Pretty-printed JSON schema of videoSummary; passed to summary_prompt_builder as `schema` at run time.
summary_schema = videoSummary.schema_json(indent=2)

## Creating a Custom Component: OutputValidator

In [6]:
import json
import random
import pydantic
from pydantic import ValidationError
from typing import Optional, List
from colorama import Fore
from haystack import component
import re

# Haystack component that validates the JSON embedded in an LLM reply against a pydantic model.
@component
class OutputValidator:
    """Check that an LLM reply contains JSON matching ``pydantic_model``.

    Acts as the gate of an auto-correction loop: on success it emits
    ``valid_replies``; on failure it emits ``invalid_replies`` and
    ``error_message``, which this notebook routes back into a PromptBuilder so
    the LLM is asked to correct its output.
    """

    def __init__(self, pydantic_model: "type[pydantic.BaseModel]"):
        """
        Args:
            pydantic_model: the pydantic model *class* (not an instance) that the
                reply must validate against.
        """
        self.pydantic_model = pydantic_model
        # Counts run() calls on this instance. It is never reset, so it keeps
        # increasing across separate pipeline.run() invocations.
        self.iteration_counter = 0

    @staticmethod
    def _extract_json_text(reply: str) -> str:
        """Return the JSON text contained in ``reply``.

        Tries, in order: a ```json fence, a ```python fence, a bare ``` fence,
        and finally the whole reply when it starts with ``{`` or ``[``.

        Raises:
            ValueError: if none of these yields a candidate.
        """
        for pattern in (
            r"```json\s*([\s\S]*?)\s*```",
            r"```python\s*([\s\S]*?)\s*```",
            r"```\s*([\s\S]*?)\s*```",
        ):
            match = re.search(pattern, reply)
            if match is not None:
                return match.group(1)
        # Accept a reply that is bare JSON without any Markdown fence.
        stripped = reply.strip()
        if stripped.startswith(("{", "[")):
            return stripped
        raise ValueError("No JSON block found in the LLM's reply")

    @component.output_types(valid_replies=List[str], invalid_replies=Optional[List[str]], error_message=Optional[str])
    def run(self, replies: List[str]):
        """Validate the first reply.

        Args:
            replies: replies from the generator; only ``replies[0]`` is checked.

        Returns:
            On success ``{"valid_replies": replies}`` with ``replies[0]`` replaced by
            the extracted JSON text; otherwise
            ``{"invalid_replies": replies, "error_message": str}``.
        """
        self.iteration_counter += 1

        try:
            if not replies:
                raise ValueError("The LLM returned no replies")
            json_text = self._extract_json_text(replies[0])
            output_dict = json.loads(json_text)  # json.JSONDecodeError is a ValueError
            # Swap in the extracted JSON on a new list rather than mutating the
            # caller's list (the generator's output) in place.
            replies = [json_text, *replies[1:]]
            self.pydantic_model.parse_obj(output_dict)
        except (ValueError, ValidationError) as e:
            shown = replies[0] if replies else ""
            # Fore.RESET stops the colour from bleeding into later output.
            print(
                Fore.RED
                + f"OutputValidator at Iteration {self.iteration_counter}: Invalid JSON from LLM - Let's try again.\n"
                f"Output from LLM:\n {shown} \n"
                f"Error from OutputValidator: {e}"
                + Fore.RESET
            )
            return {"invalid_replies": replies, "error_message": str(e)}

        print(
            Fore.GREEN
            + f"OutputValidator at Iteration {self.iteration_counter}: Valid JSON from LLM - No need for looping: {replies[0]}"
            + Fore.RESET
        )
        return {"valid_replies": replies}

In [7]:
# Validator for the final audio-description reply (checked against audioDescription).
output_validator = OutputValidator(pydantic_model=audioDescription)

In [8]:
# Validator for the first-stage 5W1H summary reply (checked against videoSummary).
summary_validator = OutputValidator(pydantic_model=videoSummary)

## Creating the Prompts

In [9]:
from haystack.components.builders import PromptBuilder

# Prompt for the final stage: asks (in Traditional Chinese) for an audio-description
# narration script of at most 50 characters, returned as JSON matching `schema`.
# Template variables:
#   summary          - summary_validator.valid_replies (a list of strings). Only the
#                      first, validated reply is rendered, not the Python repr of the list.
#   schema           - JSON schema of audioDescription, supplied at pipeline.run().
#   invalid_replies,
#   error_message    - fed back by output_validator after a failed attempt.
prompt_template = """
影片摘要:
{{ summary | first }}

你是專業的口述影像稿生成器，參考影片及影片摘要。
僅依照提供的資料，創建一個50字以內的口述影像腳本JSON文件，儘量貼近原作品再現的原則，著重於劇情相關的畫面描述。無須描述對話，其中包含一個繁體中文口述影像旁白腳本。內容應該是一個字符串。例如：
{{schema}}
僅使用提供的資料，不要添加任何其他資訊並確保您的答案符合格式要求，確保回覆是dict類型。
{% if invalid_replies and error_message %}
您已經在先前的嘗試中建立了以下輸出：{{invalid_replies}}
但是，這不符合上面的格式要求並觸發了此 Python 異常：{{error_message}}
更正輸出並重試。只需返回正確的輸出，無需任何額外的解釋。
{% endif %}
"""

prompt_builder = PromptBuilder(template=prompt_template)

In [10]:
# Prompt for the first stage: asks for a 5W1H (who/what/when/where/how/why) JSON summary
# of the video matching `schema` (the videoSummary schema). The trailing block is shown
# only when summary_validator reported an error on the previous attempt.
summary_template = """
用 who、what、when、where、how、why 摘要以下影片
僅依照提供的資料，創建一個JSON文件，其中包含who、what、when、where、how、why 字段。內容應該是一個字符串。例如：
{{schema}}
僅使用提供的資料，不要添加任何其他資訊並確保您的答案符合格式要求，確保回覆是dict類型。
{% if invalid_replies and error_message %}
您已經在先前的嘗試中建立了以下輸出：{{invalid_replies}}
但是，這不符合上面的格式要求並觸發了此 Python 異常：{{error_message}}
更正輸出並重試。只需返回正確的輸出，無需任何額外的解釋。
{% endif %}
"""

In [11]:
# PromptBuilder for the first-stage summary; its template variables (schema,
# invalid_replies, error_message) become the builder's inputs.
summary_prompt_builder = PromptBuilder(template=summary_template)

In [12]:
# Combine the uploaded video and the text prompt into one multimodal prompt.
@component
class AddVideo2Prompt:
    """Haystack component that builds the multimodal input for Gemini.

    ``run`` returns ``[video_part, prompt]``: a Vertex AI ``Part`` that references
    the video at ``uri`` (declared as ``video/mp4``), followed by the text prompt.
    """

    @component.output_types(prompt=list)
    def run(self, uri: str, prompt: str):
        """Return ``{"prompt": [<video Part for uri>, prompt]}``."""
        video_part = Part.from_uri(uri, mime_type="video/mp4")
        parts = [video_part, prompt]
        return {"prompt": parts}


In [13]:
# Wraps the video URI and the audio-description prompt for the final `llm` stage (registered as `add_video`).
add_video_2_prompt = AddVideo2Prompt()

In [14]:
# Separate instance for the summary stage (registered as `add_video_2_summary_prompt`).
add_video_2_summary_prompt = AddVideo2Prompt()

In [15]:
@component
class GeminiGenerator:
    """Haystack component wrapping ``VertexAIGeminiGenerator`` so it accepts a list of prompt parts.

    The underlying generator is created on the first ``run`` call and then reused,
    instead of being rebuilt on every call. The auto-correction loop in this
    notebook calls ``run`` repeatedly.
    """

    def __init__(self, project_id, location, model):
        self.project_id = project_id
        self.location = location
        self.model = model
        # Created lazily by _get_generator() so construction stays cheap.
        self._generator = None

    def _get_generator(self):
        """Return the cached VertexAIGeminiGenerator, creating it on first use."""
        if self._generator is None:
            self._generator = VertexAIGeminiGenerator(
                project_id=self.project_id, location=self.location, model=self.model
            )
        return self._generator

    @component.output_types(replies=List[str])
    def run(self, prompt: List):
        """Generate replies for a multimodal prompt.

        Args:
            prompt: list of prompt parts, e.g. ``[video Part, text]`` as built by AddVideo2Prompt.

        Returns:
            ``{"replies": [...]}`` taken from the underlying generator's output.
        """
        generator = self._get_generator()
        return {"replies": generator.run(prompt)["replies"]}

In [16]:
# Generator for the final audio description (registered as `llm`).
# NOTE(review): the project id and model name are hard-coded; the recorded run hit the
# per-minute quota for gemini-1.5-pro (429 ResourceExhausted).
gemini_generator = GeminiGenerator(project_id="gemini-rain-py", location="us-central1", model="gemini-1.5-pro-preview-0514")

In [17]:
# Generator for the first-stage summary (registered as `summary_generator`), using the Flash model.
summary_gemini_generator = GeminiGenerator(project_id="gemini-rain-py", location="us-central1", model="gemini-1.5-flash-001")

In [18]:
from google.cloud import storage
@component
class upload2GCS:
    """Haystack component that uploads a local file to a Google Cloud Storage bucket.

    The object is stored at the bucket root under the file's base name, and the
    resulting ``gs://`` URI is emitted for downstream components.
    """

    def __init__(self, bucket_name: str):
        self.bucket_name = bucket_name

    @component.output_types(uri=str)
    def run(self, file_path: str):
        """Upload ``file_path`` and return ``{"uri": "gs://<bucket>/<file name>"}``.

        Args:
            file_path: local path; both forward and back slashes are accepted as separators.
        """
        # Normalise Windows separators first. Splitting on "/" alone would leave a path
        # like "D:\...\AD005.mp4" intact and use it as the object name and URI.
        file_name = file_path.replace("\\", "/").rsplit("/", 1)[-1]
        storage_client = storage.Client()
        bucket = storage_client.bucket(self.bucket_name)
        blob = bucket.blob(file_name)
        blob.upload_from_filename(file_path)
        return {"uri": f"gs://{self.bucket_name}/{file_name}"}
    

In [19]:
# Uploads the input video to the `gemini-ad-gen` bucket; its `uri` output feeds both video-prompt wrappers.
upload2gcs = upload2GCS(bucket_name="gemini-ad-gen")

In [20]:
from haystack import Pipeline
pipeline = Pipeline(max_loops_allowed=5)

# Components in registration order: pipeline name -> instance.
pipeline_components = {
    "upload2gcs": upload2gcs,
    "summary_prompt_builder": summary_prompt_builder,
    "add_video_2_summary_prompt": add_video_2_summary_prompt,
    "summary_generator": summary_gemini_generator,
    "summary_validator": summary_validator,
    "prompt_builder": prompt_builder,
    "add_video": add_video_2_prompt,
    "llm": gemini_generator,
    "output_validator": output_validator,
}
for component_name, component_instance in pipeline_components.items():
    pipeline.add_component(instance=component_instance, name=component_name)

# Explicit sender -> receiver sockets, in the same order they were originally connected.
connections = [
    # One upload; both video-prompt wrappers reference the same GCS URI.
    ("upload2gcs.uri", "add_video.uri"),
    ("upload2gcs.uri", "add_video_2_summary_prompt.uri"),
    # Stage 1: 5W1H summary with its own validation/retry loop.
    ("summary_prompt_builder.prompt", "add_video_2_summary_prompt.prompt"),
    ("add_video_2_summary_prompt.prompt", "summary_generator.prompt"),
    ("summary_generator.replies", "summary_validator.replies"),
    ("summary_validator.valid_replies", "prompt_builder.summary"),
    ("summary_validator.invalid_replies", "summary_prompt_builder.invalid_replies"),
    ("summary_validator.error_message", "summary_prompt_builder.error_message"),
    # Stage 2: audio description with its own validation/retry loop.
    ("prompt_builder.prompt", "add_video.prompt"),
    ("add_video.prompt", "llm.prompt"),
    ("llm.replies", "output_validator.replies"),
    ("output_validator.invalid_replies", "prompt_builder.invalid_replies"),
    ("output_validator.error_message", "prompt_builder.error_message"),
]
for sender, receiver in connections:
    pipeline.connect(sender, receiver)

pipeline

<haystack.core.pipeline.pipeline.Pipeline object at 0x000002F19DF5CE10>
🚅 Components
  - upload2gcs: upload2GCS
  - summary_prompt_builder: PromptBuilder
  - add_video_2_summary_prompt: AddVideo2Prompt
  - summary_generator: GeminiGenerator
  - summary_validator: OutputValidator
  - prompt_builder: PromptBuilder
  - add_video: AddVideo2Prompt
  - llm: GeminiGenerator
  - output_validator: OutputValidator
🛤️ Connections
  - upload2gcs.uri -> add_video.uri (str)
  - upload2gcs.uri -> add_video_2_summary_prompt.uri (str)
  - summary_prompt_builder.prompt -> add_video_2_summary_prompt.prompt (str)
  - add_video_2_summary_prompt.prompt -> summary_generator.prompt (list)
  - summary_generator.replies -> summary_validator.replies (List[str])
  - summary_validator.valid_replies -> prompt_builder.summary (List[str])
  - summary_validator.invalid_replies -> summary_prompt_builder.invalid_replies (Optional[List[str]])
  - summary_validator.error_message -> summary_prompt_builder.error_message (Op

In [21]:
# Save a diagram of the pipeline graph to this PNG file.
# NOTE(review): Haystack renders this through an external Mermaid service, so network
# access is likely required — confirm for the installed Haystack version.
pipeline.draw("auto-correct-pipeline_2_layer.png")

In [25]:
import os

# Local video to describe. Set the AD_VIDEO_PATH environment variable to use another
# file instead of editing this machine-specific default path.
path = os.environ.get(
    "AD_VIDEO_PATH",
    r"D:\NUK\GraduationProject\UI\VideoEyes_Vite-UI\Haystack\AD005.mp4",
)
# Inputs not produced by any other component: the video path and the two JSON schemas.
result = pipeline.run({
    "upload2gcs": {"file_path": path},
    "summary_prompt_builder": {"schema": summary_schema},
    "prompt_builder": {"schema": json_schema},
})


[32mOutputValidator at Iteration 3: Valid JSON from LLM - No need for looping: {
  "who": "A security guard",
  "what": "searching for a missing child in a subway station",
  "when": "not specified",
  "where": "in a subway station",
  "how": "asking passengers and checking security cameras",
  "why": "to find the missing child"
}
[31mOutputValidator at Iteration 4: Invalid JSON from LLM - Let's try again.
Output from LLM:
 {"audioDescription": {"Audiodescription": "一名保安人員，正於捷運站內協尋一名走失兒童，他詢問乘客並查看監視器畫面"}} 
Error from OutputValidator: 1 validation error for audioDescription
audioDescription
  str type expected (type=type_error.str)
[31mOutputValidator at Iteration 5: Invalid JSON from LLM - Let's try again.
Output from LLM:
 {"audioDescription": {"Audiodescription": "捷運站內，一名保安人員手持無線電通話，神色焦急地搜尋著。另一名女保安彎腰詢問一名婦人，月台上的乘客來來往往。"}} 
Error from OutputValidator: 1 validation error for audioDescription
audioDescription
  str type expected (type=type_error.str)


ResourceExhausted: 429 Quota exceeded for aiplatform.googleapis.com/generate_content_requests_per_minute_per_project_per_base_model with base model: gemini-1.5-pro. Please submit a quota increase request. https://cloud.google.com/vertex-ai/docs/generative-ai/quotas-genai.

In [24]:
# output_validator emits `valid_replies` only when the final description passed
# validation. Fail with a clear message instead of a bare KeyError otherwise
# (e.g. when the loop limit was reached with only invalid replies).
valid_replies = result.get("output_validator", {}).get("valid_replies")
if not valid_replies:
    raise RuntimeError(
        "The pipeline did not produce a valid audio description; inspect `result` for details."
    )
valid_reply = valid_replies[0]
valid_json = json.loads(valid_reply)
print(valid_json)

{'audioDescription': '一名保安人員在車站月台和閘門搜尋走失兒童。'}
