In [None]:
from google import genai
from google.genai import types
import wave

# Set up the wave file to save the output:
def wave_file(filename, pcm, channels=1, rate=24000, sample_width=2):
    """Write PCM frame data to ``filename`` as a WAV file.

    Args:
        filename: Destination handed to ``wave.open`` in ``"wb"`` mode.
        pcm: Bytes-like frame data written after the header.
        channels: Number of channels stored in the header.
        rate: Frame rate stored in the header.
        sample_width: Sample width in bytes stored in the header.
    """
    writer = wave.open(filename, "wb")
    try:
        writer.setnchannels(channels)
        writer.setsampwidth(sample_width)
        writer.setframerate(rate)
        writer.writeframes(pcm)
    finally:
        # Closing patches the header and releases the file, as the context manager would.
        writer.close()

import os

# Read the key from the environment instead of passing the literal text
# "GEMINI_API_KEY" as the credential.
client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY"))

# Ask the TTS model for audio only, spoken with the prebuilt "Kore" voice.
response = client.models.generate_content(
    model="gemini-2.5-flash-preview-tts",
    contents="Say cheerfully: Have a wonderful day!",
    config=types.GenerateContentConfig(
        response_modalities=["AUDIO"],
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                    voice_name='Kore',
                )
            )
        ),
    )
)

# The audio bytes are read from the first part of the first candidate.
data = response.candidates[0].content.parts[0].inline_data.data

file_name = 'out.wav'
wave_file(file_name, data)  # Saves the file to current directory

In [None]:
# To run this code you need to install the following dependencies:
# pip install google-genai

import base64
import mimetypes
import os
import re
import struct
from google import genai
from google.genai import types


def save_binary_file(file_name, data):
    """Write ``data`` to ``file_name`` in binary mode and print the saved path.

    Args:
        file_name: Path of the file to create or overwrite.
        data: Bytes-like content to write.
    """
    # The context manager closes the handle even when the write fails.
    with open(file_name, "wb") as f:
        f.write(data)
    print(f"File saved to: {file_name}")


def generate():
    """Stream a TTS response from Gemini and save each audio chunk to disk.

    The API key is read from the ``GEMINI_API_KEY`` environment variable.
    For every streamed chunk whose first part carries inline data, the bytes
    are written to ``ENTER_FILE_NAME_<n><ext>``. When ``mimetypes`` cannot
    guess an extension for the chunk's MIME type, the data is wrapped with
    ``convert_to_wav`` and saved as ``.wav``. Chunks without inline data have
    their ``text`` printed instead.
    """
    client = genai.Client(
        api_key=os.environ.get("GEMINI_API_KEY"),
    )

    model = "gemini-2.5-pro-preview-tts"
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_text(text="""INSERT_INPUT_HERE"""),
            ],
        ),
    ]
    generate_content_config = types.GenerateContentConfig(
        temperature=1,
        response_modalities=[
            "audio",
        ],
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                    voice_name="Zephyr"
                )
            )
        ),
    )

    file_index = 0
    for chunk in client.models.generate_content_stream(
        model=model,
        contents=contents,
        config=generate_content_config,
    ):
        # Falsy checks skip both missing (None) and empty candidate/part lists,
        # so the [0] indexing below cannot raise IndexError.
        if not chunk.candidates:
            continue
        content = chunk.candidates[0].content
        if content is None or not content.parts:
            continue

        inline_data = content.parts[0].inline_data
        if inline_data and inline_data.data:
            file_name = f"ENTER_FILE_NAME_{file_index}"
            file_index += 1
            data_buffer = inline_data.data
            file_extension = mimetypes.guess_extension(inline_data.mime_type)
            if file_extension is None:
                # Unknown MIME type: wrap the bytes in a WAV container.
                file_extension = ".wav"
                data_buffer = convert_to_wav(inline_data.data, inline_data.mime_type)
            save_binary_file(f"{file_name}{file_extension}", data_buffer)
        else:
            print(chunk.text)

def convert_to_wav(audio_data: bytes, mime_type: str, num_channels: int = 1) -> bytes:
    """Wrap raw PCM audio in a WAV container.

    Builds a 44-byte RIFF/WAVE PCM header from the bits-per-sample and sample
    rate parsed out of ``mime_type`` and prepends it to ``audio_data``.

    Args:
        audio_data: The raw audio data as a bytes object.
        mime_type: Mime type of the audio data; passed to
            ``parse_audio_mime_type`` to obtain bits per sample and rate.
        num_channels: Channel count written to the header (mono by default).

    Returns:
        The WAV header followed by ``audio_data`` (the full file content,
        not just the header).
    """
    parameters = parse_audio_mime_type(mime_type)
    bits_per_sample = parameters["bits_per_sample"]
    sample_rate = parameters["rate"]
    data_size = len(audio_data)
    bytes_per_sample = bits_per_sample // 8
    block_align = num_channels * bytes_per_sample
    byte_rate = sample_rate * block_align
    chunk_size = 36 + data_size  # 36 bytes for header fields before data chunk size

    # http://soundfile.sapp.org/doc/WaveFormat/

    header = struct.pack(
        "<4sI4s4sIHHIIHH4sI",
        b"RIFF",          # ChunkID
        chunk_size,       # ChunkSize (total file size - 8 bytes)
        b"WAVE",          # Format
        b"fmt ",          # Subchunk1ID
        16,               # Subchunk1Size (16 for PCM)
        1,                # AudioFormat (1 for PCM)
        num_channels,     # NumChannels
        sample_rate,      # SampleRate
        byte_rate,        # ByteRate
        block_align,      # BlockAlign
        bits_per_sample,  # BitsPerSample
        b"data",          # Subchunk2ID
        data_size         # Subchunk2Size (size of audio data)
    )
    return header + audio_data

def parse_audio_mime_type(mime_type: str) -> dict[str, int]:
    """Parses bits per sample and rate from an audio MIME type string.

    Assumes bits per sample is encoded like "L16" and rate as "rate=xxxxx".
    Both the type and the parameter name are matched case-insensitively.

    Args:
        mime_type: The audio MIME type string (e.g., "audio/L16;rate=24000").

    Returns:
        A dictionary with "bits_per_sample" and "rate" keys. A value that is
        missing or not an integer falls back to the default (16 bits,
        24000 Hz), so both values are always integers.
    """
    bits_per_sample = 16
    rate = 24000

    # Every ";"-separated segment is inspected, including the main type,
    # because the bit depth lives in the main type ("audio/L16").
    for param in mime_type.split(";"):
        param = param.strip()
        lowered = param.lower()
        if lowered.startswith("rate="):
            try:
                rate = int(param.split("=", 1)[1])
            except ValueError:
                # "rate=" with no value or a non-integer value: keep the default.
                pass
        elif lowered.startswith("audio/l"):
            try:
                bits_per_sample = int(param[len("audio/l"):])
            except ValueError:
                # Not a numeric "L<bits>" subtype: keep the default.
                pass

    return {"bits_per_sample": bits_per_sample, "rate": rate}


if __name__ == "__main__":
    generate()


In [None]:
import chromadb
chroma_client = chromadb.PersistentClient("../DataBases/my_chroma_db")
import sys
import os
parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))
sys.path.append(parent_dir)

In [None]:
from dotenv import load_dotenv
load_dotenv("../.env")
from utils.LLM import GeminiLLM

In [None]:
chroma_client.list_collections()

In [None]:
chroma_client.delete_collection("yt_transcripts")

In [None]:
my_collection = chroma_client.get_or_create_collection("yt_transcripts", embedding_function=GeminiLLM(os.getenv("GOOGLE_API_KEY")))

In [None]:
my_collection.query(query_texts="what happen in deep sea",n_results=5)

In [None]:
my_collection.get(where={"youtube_id": "NEk5cEEalYw"})['documents']

In [None]:
vstore = VectorStore(os.getenv("GOOGLE_API_KEY"))

In [None]:
db = chroma_client.get_collection("yt_transcripts")

In [None]:
# Fetch the stored chunks of one video whose time_stamp lies between 120 and 180.
# The filter must be passed as `where=` (the first positional argument of
# `Collection.get` is `ids`), and several conditions have to be combined under
# `$and`, with a single operator per expression.
db.get(
    where={
        "$and": [
            {"youtube_id": "NEk5cEEalYw"},
            {"time_stamp": {"$gte": 120}},
            {"time_stamp": {"$lte": 180}},
        ]
    }
)

In [None]:
results = db.query(query_texts="what are the names of deep sea creatures",n_results=5)

In [None]:
results

In [None]:
db.get(where={"youtube_id": "NEk5cEEalYw"})

In [None]:
import sys
import os
parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))
sys.path.append(parent_dir)

In [None]:
from dotenv import load_dotenv
load_dotenv("../.env")

In [None]:
from utils.AudioDownloader import AudioDownloader
from utils.LLM import GroqLLM
from pytubefix import YouTube

llm = GroqLLM(api_key=os.getenv("GROQ_API_KEY"))
# audio_downloader = AudioDownloader().download_audio("gm_lQG8fYjI")

In [None]:
import tempfile

def download_audio(video_id):
    """Download the 128 kbps audio-only stream of a YouTube video to a temp file.

    Args:
        video_id: YouTube video id, appended to the watch URL.

    Returns:
        Path of the ``.m4a`` temporary file. It is created with
        ``delete=False``, so removing it is left to the caller.
    """
    url = "https://www.youtube.com/watch?v=" + video_id
    yt = YouTube(url)
    audio_stream = yt.streams.filter(only_audio=True, abr="128kbps")

    # Reserve a unique file name, then close the handle before the download
    # writes to that same path.
    with tempfile.NamedTemporaryFile(suffix=".m4a", delete=False) as temp_audio:
        temp_path = temp_audio.name

    # os.path.split works for any temp-dir depth and path separator, unlike
    # splitting on "/" and taking the first component.
    output_dir, file_name = os.path.split(temp_path)
    audio_stream.first().download(
        filename=file_name,
        output_path=output_dir,
    )
    return temp_path

In [None]:
url = "https://www.youtube.com/watch?v=" + "NEk5cEEalYw"
yt = YouTube(url)

In [None]:
from IPython.display import Audio

In [None]:
the_audio = download_audio("NEk5cEEalYw")

In [None]:
the_audio

In [None]:
os.path.getsize(the_audio)

In [None]:
from pydub import AudioSegment
import numpy as np
the_audio = AudioSegment.from_file(the_audio)


In [None]:
segments = np.linspace(0, 31220649+1, 3)

In [None]:
segments

In [None]:
new_chunk = the_audio[0:15610325].export("test.mp3",format="mp3")

In [None]:
len(the_audio.raw_data)

In [None]:
len(the_audio)

In [None]:
# Size of the file exported as "test.mp3"; a relative path keeps the cell
# independent of one machine's home directory.
os.path.getsize("test.mp3")

In [None]:
from pydub.utils import make_chunks

In [None]:
chunks = make_chunks(the_audio, 35336798)

In [None]:

for i, chunk in enumerate(chunks):
    chunk_name = f"chunk_{i}.wav"
    chunk.export(chunk_name, format="wav")

In [None]:
Audio(the_audio[0:35336798])

In [None]:
new_chunk

In [None]:
# Plan how to split an audio file that exceeds 15 MiB.
# NOTE(review): `temp_audio` is not defined by any earlier cell in this
# notebook — confirm which file this cell is meant to inspect.
if os.path.getsize(temp_audio.name) > (15 * 1024**2):
    audio = AudioSegment.from_file(temp_audio.name)
    temp_audio_paths = []
    no_of_required_chunks = int(np.ceil(os.path.getsize(temp_audio.name) / (15 * 1024**2)))
    # There is no `self` at notebook top level; use the local computed above.
    segments = np.linspace(0, os.path.getsize(temp_audio.name)+1, no_of_required_chunks + 1)


In [None]:
the_audio[12:].export("test.mp3", format="mp3")

In [None]:
from pydub import AudioSegment

the_audio = AudioSegment.from_file("Something STRANGE Found in Deep Sea | The Dark Oxygen Mystery.m4a")


In [None]:
import os
os.path.getsize("Something STRANGE Found in Deep Sea | The Dark Oxygen Mystery.m4a")/(1024**2*18)

In [None]:
import numpy as np

In [None]:
len(the_audio)

In [None]:
a = np.linspace(start=0,stop=len(the_audio)+1,num=2+1,dtype=int)

In [None]:
from pydub import AudioSegment
import numpy as np
the_audio = AudioSegment.from_file("Something STRANGE Found in Deep Sea | The Dark Oxygen Mystery.m4a")
a = np.linspace(start=0,stop=len(the_audio)+1,num=2+1,dtype=int)
for i,j in zip(a[:-1],a[1:]):
    the_audio[i:j].export(f"test{i}.mp3",format="mp3")

In [None]:
(len(the_audio)//1000)//60,(len(the_audio)//1000)%60

In [None]:
19*60*1000

In [None]:
the_audio[:10000].export("test.mp3", format="mp3")

In [None]:
# Append every item of `b` to `a` in place.
a = [1, 2, 4]
b = [3, 4]
a += b

In [None]:
a

In [None]:
the_audio

In [None]:
1929067//(60**3),1929067%(60**3)

In [None]:
import numpy as np
np.linspace(start=0, stop=31220649, num=2+1,dtype=int)

In [None]:
np.

In [None]:
a=[1,2,]

In [None]:
18*1024**2


In [None]:
from pytubefix import YouTube
yt = YouTube("https://www.youtube.com/watch?v=NEk5cEEalYw")
audio_stream = yt.streams.filter(only_audio=True, abr="128kbps")

In [None]:
audio_stream.first().download()

In [None]:
340287488/(1024**2)/8

In [None]:
31220649/(1024**2)

In [None]:
from DataBases.VectorStore import VectorStore

# Keep credentials out of the notebook: read the key from the environment,
# as the other VectorStore/GeminiLLM cells do.
vector_store = VectorStore(os.getenv("GOOGLE_API_KEY"))

# Documents stored for one video id.
vector_store.collection.get(where={"youtube_id": "5YP7GOeFTCY"}, include=["documents"])


In [None]:
from google import genai
from dotenv import load_dotenv
load_dotenv("../.env")

In [None]:
import sys
import os
parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))
sys.path.append(parent_dir)

In [None]:
from utils.LLM import GeminiLLM

In [None]:
llm = GeminiLLM(os.getenv("GEMINI_API_KEY"))

In [None]:
result = llm.TextLLM("you are a helpfull assistant", [{"role": "user", "parts": [{"text": "hello"}]}])

In [None]:
result.send_message("who are you").text

In [None]:
from utils.Transcript import Transcript

In [None]:
transcript = Transcript()

In [None]:
whisp_api = transcript.with_whisper(os.getenv("GROQ_API_KEY"), "2nNGN72eYiU")

In [None]:
chat = client.chats.create(model="gemini-2.5-flash-preview-05-20",
history=[
    {
        "role": "user",
        "parts": [{"text": "Hello, how are you?"}]
    },
    {
        "role": "model",
        "parts": [{"text": "I'm a large language model, trained by google."}]
    }
])



In [None]:
chat.send_message("who are you")

In [None]:
chat.get_history()

In [None]:
from google import genai
from dotenv import load_dotenv
import os
load_dotenv("../.env")
client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))

from google.genai import types

result = client.models.embed_content(
        model="models/text-embedding-004",
        contents=["What is the meaning of life?","for real?"],
        config=types.EmbedContentConfig(task_type="SEMANTIC_SIMILARITY")
)
print(result.embeddings)

In [None]:
[i.values for i in result.embeddings]

In [None]:
# The google-genai client lists models through `client.models.list()`.
[model.name for model in client.models.list()]

In [None]:
import sys
import os
parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))
sys.path.append(parent_dir)

In [None]:
from utils.Transcript import Transcript
transcript = Transcript()
texts = transcript.with_youtube_api("5YP7GOeFTCY")

In [None]:
texts[:10]

In [None]:
with open('texts.txt','w+') as f:
    for i in texts[:10]:
        f.write(i['text'] + '\n')

In [None]:
from langdetect import detect

In [None]:
detect(" ".join([text['text'] for text in texts[:10]]))

In [None]:
len(texts)

In [None]:

import os
from groq import Groq

client = Groq()
filename = "/tmp/tmp6g5jvppl.m4a"

with open(filename, "rb") as file:
    transcription = client.audio.transcriptions.create(
      file=(filename, file.read()), 
      language="en",
      model="whisper-large-v3-turbo",
      response_format="verbose_json",
    )
    print(transcription.text)

In [None]:
transcription.segments[0]['text']

In [None]:
transcription.segments[0]['start']

In [None]:
import os

# Leftover temporary audio files to delete if they are still on disk.
paths = [
    '/tmp/tmp5ydyyyv4.m4a',
    '/tmp/tmpggun29z0.m4a',
    '/tmp/tmp30meyr1c.m4a',
]
for path in paths:
    if not os.path.exists(path):
        continue
    os.remove(path)

In [None]:
import sys
import os
parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))
sys.path.append(parent_dir)

In [None]:
import asyncio
from utils.Translator import translate_text

In [None]:
from utils.Transcript import Transcript

transcript = Transcript()
texts = transcript.with_whisper("2nNGN72eYiU")

In [None]:
texts

In [None]:
with open('texts.txt','w') as f:
    f.write(texts)

In [None]:
translated_texts = asyncio.run(translate_text(texts))

In [None]:
import os

output_path = "/tmp/tmpe9wob2yo.mp4"
if output_path:
    if not os.path.isabs(output_path):
        output_path = os.path.join(os.getcwd(), output_path)
    # `output_path` names a file, so the directory to create is its parent;
    # calling makedirs on the file path would create a directory named "*.mp4".
    target_dir = os.path.dirname(output_path)
else:
    output_path = os.getcwd()
    target_dir = output_path
os.makedirs(target_dir, exist_ok=True)
output_path

In [None]:
output_path

In [None]:
import os
import tempfile

from pytubefix import YouTube

url = "https://www.youtube.com/watch?v=" + "5YP7GOeFTCY"
yt = YouTube(url)
audio_stream = yt.streams.filter(only_audio=True,abr="128kbps")

# Reserve a unique temp path and close the handle before the download writes to it.
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_audio:
    pass

# (directory, file name) pair; os.path.split handles any temp-dir depth,
# unlike splitting on "/" and taking the first component.
directory = os.path.split(temp_audio.name)

audio_stream.first().download(filename=directory[-1], output_path=directory[0])

In [None]:
temp_audio.name

In [None]:
directory

In [None]:
import filetype

kind = filetype.guess(temp_audio.name)
if kind:
    print(f"Detected type: {kind.mime}, extension: {kind.extension}")
else:
    print("File type could not be determined.")


In [None]:
if os.path.getsize(temp_audio.name) == 0:
    print("File is empty.")
else:
    print("File is not empty.")

In [None]:
import magic

file_path = temp_audio.name

# Create a Magic object that reports MIME types
mime = magic.Magic(mime=True)
# Probe the temp file selected above rather than a stale hard-coded name.
mime.from_file(file_path)

In [None]:
from IPython.display import display, Audio
display(Audio(filename=temp_audio.name))

In [None]:
from pytubefix import YouTube
url = "https://www.youtube.com/watch?v="+"0zhPaRZev8w"
yt = YouTube(url)
audio_stream = yt.streams.filter(only_audio=True,abr="128kbps")

In [None]:
audio_stream.first().download()

In [None]:
# `filter()` returns a query over streams; pick one with `first()` before downloading.
audio_stream.first().download()

In [None]:
# `filter()` returns a query over streams; pick one with `first()` before downloading.
audio_stream.first().download()

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi

# Replace with the YouTube video ID
video_id = "ZPUtA3W-7_I"

In [None]:
# ytt_api = YouTubeTranscriptApi()
# transcript_list = ytt_api.fetch(video_id)

In [None]:
ytt_api = YouTubeTranscriptApi()

In [None]:

transcript_list = ytt_api.get_transcript(video_id,languages=["en",'hi'])
print(transcript_list)

In [None]:
transcript_list[:5]

In [None]:
hind_transcript = ytt_api.get_transcript("giqjHxdN-Ok",languages=["en",'hi'])

In [None]:
hind_transcript[:3]

In [None]:
texts = " ".join([i['text'] for i in hind_transcript[:10]])

In [None]:
texts

In [None]:
texts="नमस्कार वेलकम टू करियर 247 मैं हूं प्रशांत धवन कभी 2 बिलियन डॉल के न्यूक्लियर बमबर्स 30 सेकंड में डिस्ट्रॉय होते हुए देखे हैं नहीं देखे मैं आपको दिखाता हूं इधर आप देख पाओगे यह फुटेज पूरी दुनिया में वायरल हो रही है। नीचे जो यह प्लेन दिख रहे हैं ना यह न्यूक्लियर स्ट्रेटेजिक बमबर्स हैं। ऐसा कोई भी प्लेन भारत के पास नहीं है। गिनती के दो-तीन कंट्रीज के पास ऐसे बमबर्स हैं और यूक्रेन"

In [None]:


# Initialize the Translator.
# NOTE(review): `Translator` is not imported in this cell; the only import of it
# visible in this notebook (`from googletrans import Translator`) is in a later
# cell, so on a fresh kernel this line raises NameError — confirm and move the
# import up.
translator = Translator()

# Example text in another language (e.g., Hindi): `texts` comes from an earlier cell.

# Translate to English.
# NOTE(review): a later cell awaits `translator.translate(...)`; if the installed
# googletrans is that async version, this un-awaited call yields a coroutine
# rather than a translation — verify.
translated = translator.translate(texts, dest='en')

In [None]:
translated.text

In [None]:
from googletrans import Translator
import asyncio
async def translate_text(texts):
    """Translate ``texts`` with googletrans, print the result and return it.

    Args:
        texts: Text passed unchanged to ``Translator.translate``.

    Returns:
        Whatever ``translator.translate(texts)`` resolves to, so the caller
        can use the translation instead of only seeing it printed.
    """
    async with Translator() as translator:
        result = await translator.translate(texts)
        print(result)  # <Translated src=ko dest=en text=Good evening. pronunciation=Good evening.>
    return result
asyncio.run(translate_text(texts))

In [None]:


for i in translated:
    print(i.text)

In [None]:
from langdetect import detect

In [None]:
detect(texts)

In [None]:
type(transcript_list)

In [None]:
# Concatenate the text of every transcript entry (no separator) and show its length.
temp_ = "".join(entry['text'] for entry in transcript_list)
len(temp_)

In [None]:
[True for i in temp_.split(". ") if len(i) > 500]

In [None]:
68.76+3.719

In [None]:

def _split_at_sentence(text, limit):
    """Split ``text`` at the last ". " boundary whose head fits in ``limit``.

    Returns ``(head, tail)``. ``head`` is "" when no boundary produces a head
    of at most ``limit`` characters; ``tail`` is then the whole ``text``.
    """
    sentences = text.split(". ")
    for keep in range(len(sentences) - 1, 0, -1):
        # Re-attach the period that `split` removed from the head's last sentence.
        head = ". ".join(sentences[:keep]) + "."
        if len(head) <= limit:
            return head, ". ".join(sentences[keep:]).strip()
    return "", text


def chunk_transcript(entries, chunk_size=300):
    """Merge transcript entries into chunks of roughly ``chunk_size`` characters.

    Args:
        entries: Iterable of mappings with a ``'text'`` string and a ``'start'`` value.
        chunk_size: Target maximum chunk length in characters.

    Returns:
        ``(chunks, timestamps)``: two lists of equal length. An entry whose text
        is at least ``chunk_size`` long becomes its own chunk with its own
        ``start``. Shorter entries are merged; a merged chunk is stamped with
        the ``start`` of the last entry merged before the chunk was closed.
    """
    chunks = []
    timestamps = []
    pending = ""     # text collected for the chunk currently being built
    pending_ts = 0   # `start` of the most recent entry merged into `pending`

    for entry in entries:
        text = entry['text'].strip()
        start = entry['start']
        if not text:
            continue

        # An entry that is already chunk-sized is emitted on its own.
        if len(text) >= chunk_size:
            if pending:
                chunks.append(pending)
                timestamps.append(pending_ts)
                pending = ""
            chunks.append(text)
            timestamps.append(start)
            continue

        combined = f"{pending} {text}" if pending else text
        if len(combined) > chunk_size:
            head, tail = _split_at_sentence(combined, chunk_size)
            if head:
                # Close the chunk at a sentence boundary and carry the rest over.
                chunks.append(head)
                timestamps.append(pending_ts)
                pending = tail
            else:
                # No usable sentence boundary: close the chunk at the entry
                # boundary so no empty chunk is emitted and nothing piles up.
                chunks.append(pending)
                timestamps.append(pending_ts)
                pending = text
        else:
            pending = combined

        # Always remember the latest entry's start time.
        pending_ts = start

    # Flush whatever is left so the end of the transcript is not dropped.
    if pending:
        chunks.append(pending)
        timestamps.append(pending_ts)

    return chunks, timestamps


chunk_size = 300
chunks, timestamps = chunk_transcript(transcript_list, chunk_size)


In [None]:
len(chunks)

In [None]:
len(timestamps)

ChromaDB

In [None]:
import chromadb
chroma_client = chromadb.PersistentClient("./my_chroma_db")

In [None]:
chroma_client.list_collections()

In [None]:
# generating embeddings

from google import genai
from dotenv import load_dotenv
import os
load_dotenv()
gemini_api_key = os.getenv("GOOGLE_API_KEY")

client = genai.Client(api_key=gemini_api_key)

class GeminiEmbedding:
    """Callable that embeds its input through the module-level Gemini ``client``."""

    def __call__(self, input):
        """Embed ``input`` with the ``models/text-embedding-004`` model.

        Returns:
            A list holding the ``values`` of each embedding in the response,
            in response order.
        """
        response = client.models.embed_content(
            model="models/text-embedding-004",
            contents=input,
        )
        vectors = []
        for embedding in response.embeddings:
            vectors.append(embedding.values)
        return vectors


In [None]:
collection = chroma_client.get_or_create_collection("youtube_transcripts",embedding_function=GeminiEmbedding())

In [None]:
import numpy as np
import uuid

BATCH_SIZE = 100  # number of documents sent per `collection.add` call

for batch_start in range(0, len(chunks), BATCH_SIZE):
    batch_docs = chunks[batch_start:batch_start + BATCH_SIZE]
    batch_times = timestamps[batch_start:batch_start + BATCH_SIZE]

    collection.add(
        documents=batch_docs,
        # One fresh id per document actually in this batch, so the id count
        # always matches the documents regardless of the final batch's size.
        ids=[str(uuid.uuid4()) for _ in batch_docs],
        metadatas=[{
            "time_stamp": t,
            "youtube_id": video_id,
        } for t in batch_times],
    )

In [None]:
len(collection.get()['ids'])

In [None]:
# `.embeddings[0].values` is the shape of an `embed_content` response, not of a
# Chroma query result, so embed the query text directly.
# NOTE(review): assumes the intent was to inspect the query's embedding — confirm.
iiii = client.models.embed_content(
    model="models/text-embedding-004",
    contents='pollution in India',
).embeddings[0].values

In [None]:
# `dict.get(key, default)` would return only the documents; index both keys to
# show documents and metadatas together.
pollution_hits = collection.query(query_texts='pollution in India', n_results=5)
pollution_hits['documents'], pollution_hits['metadatas']

In [None]:
iiii

In [None]:
client.models.embed_content(
        model="models/text-embedding-004",
        contents='who are you').embeddings[0].values