In [1]:
# Load model directly
from transformers import AutoImageProcessor, AutoModelForImageClassification
from PIL import Image
import requests
import time

processor = AutoImageProcessor.from_pretrained("dima806/medicinal_plants_image_detection")
model = AutoModelForImageClassification.from_pretrained("dima806/medicinal_plants_image_detection")
model

# Load the local image file
image = Image.open("download (2).jpeg")

# Process the image for model input
inputs = processor(images=image, return_tensors="pt")

time_start = time.time()
# Get model predictions
outputs = model(**inputs)
time_end = time.time()
print("Time taken for prediction:", time_end - time_start)
predictions = outputs.logits.softmax(dim=1)

# Get the predicted class
predicted_class_idx = predictions.argmax().item()
predicted_class = model.config.id2label[predicted_class_idx]
confidence = predictions[0][predicted_class_idx].item()

print(f"Predicted class: {predicted_class}")
print(f"Confidence: {confidence:.2%}")

# 0.03592205047607422 laptop gpu

  from .autonotebook import tqdm as notebook_tqdm
Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


Time taken for prediction: 5.109289646148682
Predicted class: Mint
Confidence: 35.10%


In [5]:
import sounddevice as sd
import numpy as np
from pynput import keyboard as pynput_keyboard
import threading
import queue
import time

# Setup audio parameters
sample_rate = 16000  # Sample rate expected by Whisper
channels = 1
dtype = 'float32'

from transformers import WhisperProcessor, WhisperForConditionalGeneration
from datasets import load_dataset

# load model and processor
processor = WhisperProcessor.from_pretrained("openai/whisper-tiny")
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-tiny")
model.config.forced_decoder_ids = None

# Create a queue to store audio chunks
audio_queue = queue.Queue()
is_recording = False
stop_recording = False

def audio_callback(indata, frames, time, status):
    """This is called for each audio block"""
    if is_recording:
        # Add the current chunk to our queue
        audio_queue.put(indata.copy())

def key_monitor():
    """Monitor space bar presses and control recording"""
    global is_recording, stop_recording
    
    print("Press and hold SPACE to record. Release to transcribe. Press ESC to quit.")
    
    while not stop_recording:
        # Start recording when space is pressed
        if keyboard.is_pressed('space') and not is_recording:
            is_recording = True
            audio_queue.queue.clear()  # Clear any old audio
            print("Recording... (holding space)")
        
        # Stop recording when space is released
        elif not keyboard.is_pressed('space') and is_recording:
            is_recording = False
            print("Processing...")
            process_audio()
        
        # Exit on ESC
        if keyboard.is_pressed('esc'):
            stop_recording = True
            is_recording = False
            print("Stopping...")
        
        time.sleep(0.01)  # Small sleep to prevent CPU hogging

def process_audio():
    """Process collected audio chunks and transcribe"""
    if audio_queue.empty():
        print("No audio recorded")
        return
    
    # Combine all audio chunks
    chunks = []
    while not audio_queue.empty():
        chunks.append(audio_queue.get())
    
    if not chunks:
        return
        
    audio_data = np.concatenate(chunks, axis=0)
    audio_flat = audio_data.flatten()
    
    # Process through Whisper
    input_features = processor(
        audio_flat,
        sampling_rate=sample_rate,
        return_tensors="pt"
    ).input_features
    
    # Generate transcription
    predicted_ids = model.generate(input_features)
    transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
    
    print(f"Transcription: {transcription}")

def start_space_bar_transcription():
    """Start the main transcription loop"""
    global stop_recording, is_recording
    
    # Reset flags
    stop_recording = False
    is_recording = False
    
    # Start audio stream
    with sd.InputStream(samplerate=sample_rate, channels=channels, dtype=dtype, callback=audio_callback):
        # Start key monitoring in a separate thread
        monitor_thread = threading.Thread(target=key_monitor)
        monitor_thread.start()
        
        try:
            # Wait for the monitor thread to finish
            while not stop_recording:
                time.sleep(0.1)
        except KeyboardInterrupt:
            stop_recording = True
            is_recording = False
            print("Interrupted by user")
        
        # Wait for the monitor thread to finish
        monitor_thread.join()
    
    print("Transcription stopped")                     


start_space_bar_transcription()

Exception in thread Thread-5 (key_monitor):
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "/home/zoro/.virtualenvs/pi/lib/python3.11/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/usr/lib/python3.11/threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_2321/966556466.py", line 40, in key_monitor
  File "/home/zoro/.virtualenvs/pi/lib/python3.11/site-packages/keyboard/__init__.py", line 410, in is_pressed
    _listener.start_if_necessary()
  File "/home/zoro/.virtualenvs/pi/lib/python3.11/site-packages/keyboard/_generic.py", line 35, in start_if_necessary
    self.init()
  File "/home/zoro/.virtualenvs/pi/lib/python3.11/site-packages/keyboard/__init__.py", line 196, in init
    _os_keyboard.init()
  File "/home/zoro/.virtualenvs/pi/lib/python3.11/site-packages/keyboard/_nixkeyboard.py", line 113, in i

Press and hold SPACE to record. Release to transcribe. Press ESC to quit.
Interrupted by user
Transcription stopped


# Voice for linux

In [None]:
import sounddevice as sd
import numpy as np
from pynput import keyboard as pynput_keyboard
import threading
import queue
import time

# Setup audio parameters
sample_rate = 16000  # Sample rate expected by Whisper
channels = 1
dtype = 'float32'

from transformers import WhisperProcessor, WhisperForConditionalGeneration
from datasets import load_dataset

# load model and processor
processor = WhisperProcessor.from_pretrained("openai/whisper-tiny")
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-tiny")
model.config.forced_decoder_ids = None

# Create a queue to store audio chunks
audio_queue = queue.Queue()
is_recording = False
stop_recording = False

def audio_callback(indata, frames, time, status):
    """This is called for each audio block"""
    if is_recording:
        audio_queue.put(indata.copy())
        # print(f"Captured audio chunk of shape {indata.shape}")

def key_monitor():
    """Monitor space bar and ESC using pynput"""
    global is_recording, stop_recording

    def on_press(key):
        global is_recording
        try:
            # Check if key is the space key using pynput's Key.space
            if key == pynput_keyboard.Key.space and not is_recording:
                is_recording = True
                audio_queue.queue.clear()
                print("Recording... (holding space)")
        except AttributeError:
            pass

    def on_release(key):
        global is_recording, stop_recording
        try:
            if key == pynput_keyboard.Key.space and is_recording:
                is_recording = False
                print("Processing...")
                process_audio()
        except AttributeError:
            if key == pynput_keyboard.Key.esc:
                stop_recording = True
                is_recording = False
                print("Stopping...")
                return False  # Stop listener

    print("Press and hold SPACE to record. Release to transcribe. Press ESC to quit.")
    with pynput_keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
        listener.join()

def process_audio():
    """Process collected audio chunks and transcribe"""
    if audio_queue.empty():
        print("No audio recorded")
        return
    
    chunks = []
    while not audio_queue.empty():
        chunks.append(audio_queue.get())
    
    if not chunks:
        return
        
    audio_data = np.concatenate(chunks, axis=0)
    audio_flat = audio_data.flatten()
    
    input_features = processor(
        audio_flat,
        sampling_rate=sample_rate,
        return_tensors="pt"
    ).input_features
    
    predicted_ids = model.generate(input_features)
    transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
    
    print(f"Transcription: {transcription}")

def start_space_bar_transcription():
    """Start the main transcription loop"""
    global stop_recording, is_recording
    
    stop_recording = False
    is_recording = False
    
    with sd.InputStream(samplerate=sample_rate, channels=channels, dtype=dtype, callback=audio_callback):
        monitor_thread = threading.Thread(target=key_monitor)
        monitor_thread.start()
        
        try:
            while not stop_recording:
                time.sleep(0.1)
        except KeyboardInterrupt:
            stop_recording = True
            is_recording = False
            print("Interrupted by user")
        
        monitor_thread.join()
    
    print("Transcription stopped")                     

start_space_bar_transcription()


  from .autonotebook import tqdm as notebook_tqdm


Press and hold SPACE to record. Release to transcribe. Press ESC to quit.
Recording... (holding space)


Due to a bug fix in https://github.com/huggingface/transformers/pull/28687 transcription using a multilingual Whisper will default to language detection followed by transcription instead of translation to English.This might be a breaking change for your use case. If you want to instead always translate your audio to English, make sure to pass `language='en'`.


Processing...


The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Transcription:  you
Recording... (holding space)
Processing...
Transcription:  you
Recording... (holding space)
Processing...
Transcription:  Hello, can you hear me now?
Recording... (holding space)
Processing...
Transcription:  It looks like the problem was in the mic and it was not connected.
Interrupted by user


In [2]:

import subprocess
import os
import asyncio
import numpy as np
import time
import shlex

async def speak(text):
    # engine.say(text)
    # engine.runAndWait()
    # command = f'echo "{text}" | ./rasp/piper/piper/piper --model ./piper/en_US-kathleen-low.onnx --rate 125 --output-raw | aplay -r 16050 -f S16_LE -t raw -'
    
    safe_text = shlex.quote(text)
    command = (
        f"echo {safe_text} | piper --model en_US-lessac-medium.onnx --output-raw | "
        f"aplay -r 22050 -f S16_LE -t raw -"
    )
    # Execute the command
    text_length = len(text)
    duration_per_character = 0.05  # Example: 50 milliseconds per character
    estimated_duration = text_length * duration_per_character

    subprocess.run(command, shell=True)
    await asyncio.sleep(1)
    # time.sleep(estimated_duration+4)
    print("TTS is done")

# asyncio.run(speak("Hello how are you doing on a fine day like this?"))            
# speak("Hello how are you doing on a fine day like this?")                              


In [None]:
import ollama
response = ollama.generate(
                model="gemma3:1b",
                prompt=f"""You are an outdoor voice assistant. Explain in detail.
                Current query: {"Explain why light speed is constant"}
                Remember to answer the question without any preamble or introduction.""",
                stream=True,
                options={
                    "temperature": 0.7,
                    "num_predict": 1000  # Limit response length
                }
            )

import re

buffer = ""
sentence_endings = re.compile(r'([.!?])')  # Detect sentence-ending punctuation

for chunk in response:
    chunk_text = chunk['response']
    buffer += chunk_text
    # Check for full sentences
    while True:
        match = sentence_endings.search(buffer)
        if not match:
            break  # No sentence-ending punctuation yet
        end_idx = match.end()
        complete_sentence = buffer[:end_idx].strip()
        buffer = buffer[end_idx:]  # Keep the remainder
        print(complete_sentence, end=' ', flush=True)
        asyncio.run(speak(complete_sentence))  # Call your TTS function here

# If anything is left unspoken after the loop, speak it
if buffer.strip():
    print(buffer.strip(), end=' ', flush=True)
    asyncio.run(speak(buffer.strip()))

Okay, let’s delve into why light speed is considered constant. 

RuntimeError: asyncio.run() cannot be called from a running event loop

In [3]:
async def stream_and_speak():
    response = ollama.generate(
        model="gemma3:1b",
        prompt=(
            "You are an outdoor voice assistant. Explain in detail.\n"
            "Current query: Explain why light speed is constant.\n"
            "Remember to answer the question without any preamble or introduction."
        ),
        stream=True,
        options={
            "temperature": 0.7,
            "num_predict": 1000
        }
    )

    buffer = ""
    sentence_endings = re.compile(r'([.!?])')

    for chunk in response:
        chunk_text = chunk['response']
        buffer += chunk_text

        while True:
            match = sentence_endings.search(buffer)
            if not match:
                break
            end_idx = match.end()
            complete_sentence = buffer[:end_idx].strip()
            buffer = buffer[end_idx:]
            print(complete_sentence, end=' ', flush=True)
            await speak(complete_sentence)

    if buffer.strip():
        print(buffer.strip(), end=' ', flush=True)
        await speak(buffer.strip())

# Run it
asyncio.run(stream_and_speak())

RuntimeError: asyncio.run() cannot be called from a running event loop

In [None]:
from transformers import pipeline
import torch

pipe = pipeline("text-generation", model="google/gemma-3-1b-it", torch_dtype=torch.bfloat16)

messages = [
    [
        {
            "role": "system",
            "content": [{"type": "text", "text": "You are a helpful assistant."},]
        },
        {
            "role": "user",
            "content": [{"type": "text", "text": "Write a poem on Hugging Face, the company"},]
        },
    ],
]

output = pipe(messages, max_new_tokens=50)
output[0][0]["generated_text"]


Device set to use cpu


NameError: name 'generate_response' is not defined