In [None]:
! pip install pandas
! pip install pytube
! pip install numpy
! pip install pinecone-client
! pip install git+https://github.com/openai/whisper.git

In [None]:
# Import the modules
import os
import torch
import whisper
import pinecone
import numpy as np
import pandas as pd
from pytube import YouTube

In [None]:
def video_to_audio(video_url, destination):
    """Download the audio track of a YouTube video and return the saved file's path.

    The first audio-only stream is downloaded into ``destination``. The file
    is then renamed so that its extension is ``.mp3`` and spaces in the file
    name become underscores. Only the name changes: the audio data itself is
    not re-encoded.

    Args:
        video_url: URL of the YouTube video.
        destination: Directory the audio file is written to.

    Returns:
        Path of the renamed audio file.

    Raises:
        ValueError: If the video exposes no audio-only stream.
    """
    # Get the video
    video = YouTube(video_url)

    # Pick the first audio-only stream
    audio = video.streams.filter(only_audio=True).first()
    if audio is None:
        raise ValueError(f"No audio-only stream available for {video_url}")

    # Save to destination
    output = audio.download(output_path=destination)

    # Swap the extension and replace spaces with "_" in the file name only.
    # Rewriting the directory part as well would point the rename at a
    # folder that does not exist whenever the path contains a space.
    directory, file_name = os.path.split(output)
    stem, _ = os.path.splitext(file_name)
    new_file = os.path.join(directory, stem.replace(" ", "_") + '.mp3')

    # os.replace overwrites a file left over from an earlier run on every platform
    os.replace(output, new_file)

    return new_file


In [None]:
%%bash
# -p: succeed even when the directory already exists, so the cell can be re-run
mkdir -p "audio_data"

In [None]:
# Directory the downloaded audio files are stored in
audio_path = "audio_data"

# A small sample of video links, for experimentation only
list_videos = [
    "https://www.youtube.com/watch?v=IdTMDpizis8",
    "https://www.youtube.com/watch?v=fLeJJPxua3E",
    "https://www.youtube.com/watch?v=z3FA2kALScU",
]

# One row per video, with the link in the "URLs" column
transcription_df = pd.DataFrame({"URLs": list_videos})

In [None]:

# Preview the table; at this point it only holds the "URLs" column
transcription_df.head()

In [None]:
# Download every video's audio track into audio_path and record the saved file's path
transcription_df["file_name"] = transcription_df["URLs"].apply(video_to_audio, args=(audio_path,))
transcription_df.head()

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Load the "base" Whisper model onto the selected device
whisper_model = whisper.load_model("base", device=device)

In [None]:
def audio_to_text(audio_file):
    """Transcribe ``audio_file`` with the loaded Whisper model and return the text.

    Args:
        audio_file: Path of the audio file handed to ``whisper_model.transcribe``.

    Returns:
        The value stored under the ``"text"`` key of the transcription result.
    """
    result = whisper_model.transcribe(audio_file)
    return result["text"]

In [None]:
# Transcribe every downloaded audio file, one row at a time
transcription_df["transcriptions"] = transcription_df["file_name"].map(audio_to_text)

# Show the first five rows
transcription_df.head()

In [None]:
# Same preview as at the end of the previous cell (URLs, file names, transcriptions)
transcription_df.head()

In [None]:
# Hugging Face model identifier; it is interpolated into the feature-extraction API URL below
model_id = "sentence-transformers/all-MiniLM-L6-v2"

In [None]:
from getpass import getpass
# Ask for the token interactively, keep it in the process environment,
# then read it back into the variable the request headers are built from
os.environ["HUGGING_FACE_TOKEN"] = getpass('Enter Hugging Face token: ')
hf_token = os.environ["HUGGING_FACE_TOKEN"]

In [None]:
import requests

# Feature-extraction endpoint for the chosen model
api_url = f"https://api-inference.huggingface.co/pipeline/feature-extraction/{model_id}"

# Every request carries the token as a bearer credential
headers = dict(Authorization=f"Bearer {hf_token}")

In [None]:
def query(texts, timeout=120):
    """Request embeddings for ``texts`` from the feature-extraction endpoint.

    Args:
        texts: Text sent as the ``inputs`` field of the request; the notebook
            passes one transcription string per call.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
        RuntimeError: If the request succeeds but the body is an error object.
    """
    response = requests.post(
        api_url,
        headers=headers,
        json={"inputs": texts, "options": {"wait_for_model": True}},
        timeout=timeout,
    )

    # Fail here instead of letting an error payload be stored as if it were an embedding
    response.raise_for_status()

    payload = response.json()
    if isinstance(payload, dict) and "error" in payload:
        raise RuntimeError(f"Embedding request failed: {payload['error']}")
    return payload

In [None]:
# Embed each transcription (cast to str first) with one API call per row
transcription_df["embedding"] = transcription_df["transcriptions"].astype(str).map(query)

In [None]:
# Inspect the table now that the "embedding" column has been added
transcription_df.head()

In [None]:
# All embeddings must share one length, because that length is used as the index dimension below
embedding_sizes = transcription_df["embedding"].map(len)
assert embedding_sizes.nunique() == 1, f"inconsistent embedding lengths: {embedding_sizes.tolist()}"

# Keep the dimension itself (an int) under this name and display it
vector_dim = int(embedding_sizes.iloc[0])
vector_dim

In [None]:
import os


# Prompt for the Pinecone API key and store it in the process environment,
# where the connection cell reads it back with os.getenv
os.environ["PINECONE_API_KEY"] = getpass('Enter your Pinecone API Key: ')

In [None]:
# Prompt for the Pinecone environment name and store it in the process environment
os.environ["PINECONE_ENVIRONMENT"] = getpass('Enter your Pinecone Environment: ')

In [None]:
# Credentials are read from the environment; a missing or blank value
# falls back to a placeholder string.
# API key: shown in the console at app.pinecone.io
api_key = os.environ.get('PINECONE_API_KEY') or 'PINECONE_API_KEY'
# Environment (cloud region): shown next to the API key in the console
env = os.environ.get('PINECONE_ENVIRONMENT') or 'PINECONE_ENVIRONMENT'

# Initialize connection to pinecone
pinecone.init(api_key=api_key, environment=env)

In [None]:


# Index params: the dimension is the length of the first row's embedding
my_index_name = "audio-search"
vector_dim = len(transcription_df["embedding"].iloc[0])

# Create the index only when no index with that name is listed yet
existing_indexes = pinecone.list_indexes()
if my_index_name not in existing_indexes:
    pinecone.create_index(
        name=my_index_name,
        dimension=vector_dim,
        metric="cosine",
        shards=1,
        pod_type='s1.x1',
    )

# Connect to the index
my_index = pinecone.Index(index_name=my_index_name)

In [None]:
# Use the dataframe index, rendered as a string, as the vector ID
transcription_df["vector_id"] = [str(label) for label in transcription_df.index]

# One metadata record per row: its position, source URL and transcription
final_metadata = [
    {'ID': position, 'url': url, 'transcription': transcription}
    for position, (url, transcription) in enumerate(
        zip(transcription_df["URLs"], transcription_df["transcriptions"])
    )
]

audio_IDs = list(transcription_df["vector_id"])
audio_embeddings = transcription_df["embedding"].tolist()

# Pair each ID with its embedding and its metadata record
data_to_upsert = list(zip(audio_IDs, audio_embeddings, final_metadata))

# Upload the final data
my_index.upsert(vectors=data_to_upsert)

# Show information about the vector index
my_index.describe_index_stats()

In [None]:
# Number of matches to return
N = 2

# Reuse the embedding stored under index label 0 as the query vector
my_query_embedding = transcription_df["embedding"][0]

# Run the Query Search
my_index.query(vector=my_query_embedding, top_k=N, include_metadata=True)