In [1]:
import gradio as gr
from google import genai
from google.genai import types
import io
from IPython.display import display, Audio
from kokoro import KPipeline
import librosa
import math
import numpy as np
import os
import os.path
from PIL import Image 
import requests 
import shutil
import subprocess
import soundfile as sf
import time
from transformers import AutoModelForCausalLM, AutoProcessor, BarkModel, BitsAndBytesConfig
import torch
import threading
# Prefer a key supplied through the GEMINI_API_KEY environment variable so the
# real secret never has to be written into (or committed with) the notebook.
# The original literal is kept as the fallback, so behaviour is unchanged when
# the variable is not set.
client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY", "GEMIN"))

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Backend switch: passed as `local` to generate_text. True runs the local
# vision model loaded below; False makes generate_text call the Gemini client.
image_local = True
# Wall-clock start, used only for the "Loading finished" message at the end of this cell.
start_time = time.time()

# Formatting instructions appended to the system prompt here and to every user
# prompt built in generate_text.
ending = ("To adjust intonation, please add dedicated punctuation like ; : , . ! ? … ( ) “ ” "
         "For more dramatic effects use symbols such as — or … for hesitations, and word capitalization for more emphasis.")

# Persona and length constraints for the commentary model.
system_prompt = ("You are a friendly chatty photo commentator who likes to casually describe work done by a photographer " 
         "in various details, even by pondering the implications on where and in what kind of setting the photo was taken, etc. Write your " 
         "response in a very personal way using personal pronouns and explaining what you see, perhaps also adding how it makes you feel. " 
         "Do your best to not be repetative in your choice of words and keep the response length down to a few sentences. ")

system_prompt += ending

# Text-to-speech pipeline handed to generate_audio.
# NOTE(review): lang_code='a' presumably selects an English voice set - confirm against the kokoro docs.
pipeline = KPipeline(lang_code='a')

if image_local:
    model_id = "microsoft/Phi-3.5-vision-instruct" 
    # Load the weights 4-bit quantized.
    quantization_config = BitsAndBytesConfig(load_in_4bit=True)
    
    # Note: set _attn_implementation='eager' if you don't have flash_attn installed
    model = AutoModelForCausalLM.from_pretrained(
        model_id, 
        device_map="cuda", 
        trust_remote_code=True, 
        quantization_config=quantization_config,
        torch_dtype="auto", 
        _attn_implementation='flash_attention_2'    
    )
    
    # for best performance, use num_crops=4 for multi-frame, num_crops=16 for single-frame.
    processor = AutoProcessor.from_pretrained(model_id, 
      trust_remote_code=True, 
      num_crops=4
    ) 
    
    # Sampling settings; generate_text reads this module-level dict when it
    # calls model.generate in its local branch.
    generation_args = { 
        "max_new_tokens": 200, 
        "temperature": 0.2, 
        "do_sample": True, 
    }
else:
    # No local model is loaded; generation_args is intentionally left undefined
    # because only the local branch of generate_text uses it.
    model = processor = None
    
end_time = time.time()
print("Loading finished in " + str(round(end_time - start_time, 2)) + " seconds")



  WeightNorm.apply(module, name, dim)
Loading checkpoint shards: 100%|██████████████████████████████████████████████████████████| 2/2 [00:02<00:00,  1.25s/it]


Loading finished in 8.92 seconds


In [3]:
def _crop_to_png(path, box):
    """Crop the image at ``path`` to ``box`` and return ``(cropped_image, png_bytes)``."""
    # The context manager closes the underlying file; the previous
    # open()/Image.open() pair leaked one handle per image on every call.
    with Image.open(path) as source:
        cropped = source.crop(box)
        # Guard for Pillow versions where crop() may be lazy: force the pixel
        # data to be read while the source file is still open.
        cropped.load()
    buffer = io.BytesIO()
    cropped.save(buffer, format='PNG')
    return cropped, buffer.getvalue()


def generate_text(local, system_prompt, file1, file2=None, model=None, processor=None): 
    """Generate a short commentary for one screenshot, or for two compared.

    Args:
        local: If true, run the local vision model via ``model``/``processor``;
            otherwise send the request to Gemini through the module-level ``client``.
        system_prompt: System instruction passed to whichever backend is used.
        file1: Path of the current screenshot.
        file2: Optional path of the previous screenshot. When given, the prompt
            asks how the current photo differs from the previous one.
        model: Local causal LM (only used when ``local`` is true).
        processor: Processor matching ``model`` (only used when ``local`` is true).

    Returns:
        The generated commentary as a string.

    Notes:
        Reads the module-level names ``ending``, ``generation_args`` (local
        branch) and ``client``/``types`` (Gemini branch).
    """
    start_time = time.time()

    # Fixed crop window applied to every screenshot: (left, top, right, bottom).
    crop_box = (25, 170, 2090, 1450)

    paths = [file1] if file2 is None else [file1, file2]

    images = []          # cropped PIL images for the local model
    png_payloads = []    # the same crops PNG-encoded, for Gemini
    placeholder = ""     # "<|image_N|>" tags expected by the local chat template
    for index, path in enumerate(paths, start=1):
        cropped, png_bytes = _crop_to_png(path, crop_box)
        images.append(cropped)
        png_payloads.append(png_bytes)
        placeholder += f"<|image_{index}|>\n"

    if file2 is None:
        user_prompt = ("Summarize what is visible in this photo. " + ending)
    else:
        user_prompt = ("Summarize what is visible in the current photo (the first one). " + 
             "How is it different from the previous photo (the second one)? There may be some subtle differences as well. " + ending)

    if local:
        messages = [
            {"role": "system", "content": system_prompt,},
            {"role": "user", "content": placeholder + user_prompt},
        ]

        prompt = processor.tokenizer.apply_chat_template(
          messages, 
          tokenize=False, 
          add_generation_prompt=True
        )

        inputs = processor(prompt, images, return_tensors="pt").to("cuda:0") 

        generate_ids = model.generate(**inputs, 
          eos_token_id=processor.tokenizer.eos_token_id, 
          **generation_args
        )

        # remove input tokens 
        generate_ids = generate_ids[:, inputs['input_ids'].shape[1]:]
        result = processor.batch_decode(generate_ids, 
          skip_special_tokens=True, 
          clean_up_tokenization_spaces=False)[0]
    else:
        # Send the prompt followed by one image part per screenshot that was
        # actually supplied. The previous code always attached a second image
        # and raised UnboundLocalError whenever file2 was None.
        contents = [user_prompt]
        contents.extend(
            types.Part.from_bytes(data=png_bytes, mime_type='image/png')
            for png_bytes in png_payloads
        )
        response = client.models.generate_content(
            model="gemini-2.0-flash",
            config=types.GenerateContentConfig(system_instruction = system_prompt),
            contents=contents
        )
        result = response.text

    end_time = time.time()
    print("Generating text finished in " + str(round(end_time - start_time, 2)) + " seconds")
    return result

In [4]:
def take_screenshot():
    """Keep the last screenshot as ``shot_prev.png`` and capture a new ``shot.png``.

    The same folder is addressed through two paths: the ``/mnt/c/...`` form is
    used for the Python-side file operations, the ``C:\\...`` form is what gets
    handed to ``nircmd.exe`` as the output location.
    """
    windows_dir = "C:\\Users\\matis\\OneDrive\\Desktop\\script-it\\"
    linux_dir = "/mnt/c/Users/matis/OneDrive/Desktop/script-it/"
    shot_name = "shot.png"

    # Preserve the existing capture (if there is one) before it is overwritten.
    latest_shot = linux_dir + shot_name
    if os.path.isfile(latest_shot):
        previous_shot = linux_dir + shot_name.replace("shot", "shot_prev")
        shutil.copyfile(latest_shot, previous_shot)

    # NOTE(review): 'cmdwait 2000' presumably delays the capture by 2000 ms - confirm in the NirCmd docs.
    command = [
        '/mnt/c/Users/matis/OneDrive/Desktop/script-it/nircmd.exe',
        'cmdwait',
        '2000',
        'savescreenshot',
        windows_dir + 'shot.png',
    ]
    subprocess.call(command)


In [12]:
def generate_audio(pipeline, text):
    """Speak ``text`` with a blended voice and play each chunk inline.

    Args:
        pipeline: Callable TTS pipeline; invoked as
            ``pipeline(text, voice=..., speed=1, split_pattern=...)`` and expected
            to yield ``(graphemes, phonemes, audio)`` tuples.
        text: Commentary to speak.

    Side effects:
        Displays an autoplaying ``Audio`` widget per chunk, sleeps for the length
        of each chunk, and adds that length to ``countdown_state["time_left"]``.
    """
    start_time = time.time()

    # The prompt calls the images "first"/"second"; say "current"/"previous" aloud instead.
    text = text.replace("first photo", "current photo")
    text = text.replace("second photo", "previous photo")

    voice_tensor1 = torch.load('af_nicole.pt', weights_only=True)
    voice_tensor2 = torch.load('jf_alpha.pt', weights_only=True)
    # Linear blend of the two voices: 70% first voice, 30% second voice.
    # (The original referenced an undefined `voice_tensor4`, which raised
    # NameError on every call.)
    t = 0.3
    interp_voice = (1 - t) * voice_tensor1 + t * voice_tensor2

    generator = pipeline(text, voice=interp_voice, speed=1, split_pattern=r'\n+')

    end_time = time.time()
    # NOTE(review): this is printed before the generator is consumed; if the
    # pipeline is lazy, the figure covers setup only, not synthesis - confirm.
    print("Generating speech finished in " + str(round(end_time - start_time, 2)) + " seconds")

    for i, (gs, ps, audio) in enumerate(generator):
        print(i, gs)
        # Whole seconds, rounded up, so the sleep never cuts a chunk short.
        duration = math.ceil(librosa.get_duration(y=audio, sr=24000))
        countdown_state["time_left"] += int(duration)
        audio_data = Audio(data=audio, rate=24000, autoplay=True)
        display(audio_data)
        # Block until this chunk has finished playing before showing the next one.
        time.sleep(duration)
        


In [13]:
# State shared between the Gradio callbacks and the background worker thread.
# Both are plain dicts so every function mutates the same object in place.
loop_flag = dict(running=False)        # True while the commentary loop should keep going
countdown_state = dict(time_left=25)   # seconds shown in the UI countdown textbox

def loop_task():
    """Worker loop: generate commentary, speak it, then wait out the cycle.

    Runs while ``loop_flag["running"]`` is true. Each round generates text for
    ``./shot.png`` (compared against ``./shot_prev.png`` when that file exists),
    speaks it, and then idles until 45 seconds have passed since speech started.
    ``countdown_state["time_left"]`` is kept up to date for the UI.

    ``loop_flag["running"]`` is always reset to False on exit, including when a
    round raises, so the UI generator stops and the loop can be started again.
    """
    current_shot = "./shot.png"
    previous_shot = "./shot_prev.png"
    cycle_seconds = 45  # minimum time from the start of speech to the next round

    try:
        while loop_flag["running"]:
            countdown_state["time_left"] = 20

            # take_screenshot()
            previous = previous_shot if os.path.isfile(previous_shot) else None
            text = generate_text(image_local, system_prompt, current_shot, previous, model, processor)

            start_time = time.time()
            generate_audio(pipeline, text)
            # How much of the cycle did speaking the commentary use up?
            elapsed_time = time.time() - start_time

            if elapsed_time < cycle_seconds:
                wait_seconds = int(cycle_seconds - elapsed_time)
                countdown_state["time_left"] = wait_seconds
                print("Waiting " + str(wait_seconds) + " seconds...")

                # Wait against a deadline instead of decrementing the shared
                # counter: the UI generator decrements it too, which made the
                # old counter-driven wait end roughly twice as fast as intended.
                # Checking the flag every tick lets "Stop" take effect promptly.
                deadline = time.monotonic() + wait_seconds
                while loop_flag["running"]:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        break
                    time.sleep(min(1.0, remaining))
                    countdown_state["time_left"] = max(0, int(round(deadline - time.monotonic())))
            else:
                # Speech already used up the whole cycle: nothing left to wait for.
                countdown_state["time_left"] = 0
    finally:
        loop_flag["running"] = False

def start_loop():
    """Start the background commentary thread unless one is already marked running.

    Returns:
        "Loop started." when a new thread was launched, otherwise
        "Loop already running.".
    """
    if loop_flag["running"]:
        return "Loop already running."

    # Set the flag before the thread starts so loop_task sees it on its first check.
    loop_flag["running"] = True
    worker = threading.Thread(target=loop_task)
    worker.start()
    return "Loop started."

def stop_loop():
    """Ask the background loop to stop by clearing the shared running flag.

    Returns:
        The status string "Loop stopped.".
    """
    loop_flag.update(running=False)
    return "Loop stopped."

def update_textbox():
    """Start the loop, then stream the countdown value to the UI once per second.

    Generator used as a Gradio callback: each yielded ``gr.update`` carries the
    current ``countdown_state["time_left"]`` as a string. The counter is
    decremented after every yield, and the generator ends as soon as
    ``loop_flag["running"]`` is cleared.
    """
    start_loop()
    while True:
        if not loop_flag["running"]:
            return
        remaining_text = str(countdown_state["time_left"])
        yield gr.update(value=remaining_text)
        countdown_state["time_left"] -= 1
        # Re-check after the yield so a stop request skips the one-second sleep.
        if not loop_flag["running"]:
            return
        time.sleep(1)

# Minimal control panel: two buttons plus a read-only countdown textbox.
# The custom CSS hides the Gradio footer.
with gr.Blocks(
    theme=gr.themes.Soft(spacing_size="sm"),
    title='Live Commentary Demo',
    css="footer{display:none !important}"
  ) as demo:
    with gr.Column():
        with gr.Row():
            start_btn = gr.Button("Start Loop")
            stop_btn = gr.Button("Stop Loop")
        
        text = gr.Textbox(label="Remaining time in seconds", lines=1, interactive=False)
        # stop_loop's return string ("Loop stopped.") is written into the countdown textbox.
        stop_btn.click(stop_loop, outputs=text)
        # update_textbox is a generator: it starts the worker thread and then
        # streams the remaining seconds into the textbox.
        start_btn.click(fn=update_textbox, outputs=text)

# Launch the app; height=165 sets the height of the embedded view.
demo.launch(height=165)

* Running on local URL:  http://127.0.0.1:7864

To create a public link, set `share=True` in `launch()`.


