### API YouTubeTranscriptApi

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import JSONFormatter
import pandas as pd
import os

# Spreadsheet of videos; its 'id' column supplies the YouTube video IDs used below.
df = pd.read_excel('youtube.xlsx')

def get_jsons_file(data, output_folder):
    """Download the Arabic transcript of every listed video as a JSON file.

    Args:
        data: Table whose ``'id'`` column holds the YouTube video IDs
            (anything supporting ``data['id']`` iteration works).
        output_folder: Directory that receives one ``<video id>.json`` file
            per transcript. Created if it does not exist.

    A video whose transcript cannot be fetched or written is counted,
    reported on stdout and skipped, so one failure does not stop the batch.
    """
    os.makedirs(output_folder, exist_ok=True)
    count = 0
    # Iterate the table that was passed in rather than the module-level `df`.
    for i in data['id']:
        try:
            transcript = YouTubeTranscriptApi.get_transcript(i, languages=['ar'])
            formatter = JSONFormatter()
            # .format_transcript(transcript) turns the transcript into a JSON string.
            json_formatted = formatter.format_transcript(transcript)
            with open(os.path.join(output_folder, f'{i}.json'), 'w', encoding='utf-8') as json_file:
                json_file.write(json_formatted)
        except Exception as error:
            # `Exception` (not a bare except) keeps Ctrl-C / SystemExit working.
            count += 1
            print(count)
            print(f"can't find file of id named : {i}")
            # Show why it failed: a network error is not a missing transcript.
            print(f"    reason: {type(error).__name__}")

# Folder that receives one JSON transcript per video.
output_folder = "transcripts"

# Fetch the transcripts for every ID in the spreadsheet loaded above.
get_jsons_file(df , output_folder)

1
can't find file of id named : O-FUZfqchxA
2
can't find file of id named : kR3Ap2RwiGs
3
can't find file of id named : YKbf6ODQxs4
4
can't find file of id named : aoXCXzjHJs8
5
can't find file of id named : ewg1JMnECp4
6
can't find file of id named : fJsDmB6iU4E
7
can't find file of id named : 77Zr8i3SRgw
8
can't find file of id named : PoUkOOLAmsA
9
can't find file of id named : cU58BlQjXjI
10
can't find file of id named : 092zf-iJOxA
11
can't find file of id named : sXcR6ZvjJFk
12
can't find file of id named : FA-ymiTqHjg
13
can't find file of id named : 1RWl9ocxqt8
14
can't find file of id named : GK8MJi1cERw
15
can't find file of id named : ceORr737o_s
16
can't find file of id named : uLOyttpst-o
17
can't find file of id named : yjgSdVcgStM
18
can't find file of id named : E07y9D3Q5S0
19
can't find file of id named : YFnLDeAdnx0
20
can't find file of id named : gYUMxnSsYao
21
can't find file of id named : CdRJKZ_L1Xs


### Individual Transcription Extractor

In [None]:
import os
import json
import time
from tqdm import tqdm

def process_json_files(folder_path, output_folder="text_files"):
    """Convert every transcript JSON file in a folder into a plain-text file.

    Each ``*.json`` file is expected to hold a list of objects; the non-empty
    ``"text"`` value of every object becomes one sentence terminated by
    ``".\\n"``. The result is written to ``<output_folder>/<name>.txt``.

    Args:
        folder_path: Directory containing the ``*.json`` transcript files.
        output_folder: Directory for the generated ``.txt`` files; created if
            missing. Defaults to ``"text_files"``, the previously fixed value.

    Prints the elapsed wall-clock time when finished.
    """
    start_time = time.time()

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_folder, exist_ok=True)

    # Sorted so the processing order does not depend on the file system.
    json_files = sorted(f for f in os.listdir(folder_path) if f.endswith('.json'))

    for file in tqdm(json_files, desc="Processing JSON files"):
        with open(os.path.join(folder_path, file), 'r', encoding='utf-8') as json_file:
            data = json.load(json_file)

        # Keep only entries that actually carry text.
        text_values = [item["text"] + ".\n" for item in data if item.get("text")]

        output_file_name = os.path.splitext(file)[0] + ".txt"
        with open(os.path.join(output_folder, output_file_name), 'w', encoding='utf-8') as output_file:
            # Joined with a space as before, so every line after the first
            # starts with one leading space.
            output_file.write(' '.join(text_values))

    end_time = time.time()
    print(f"Completed in {end_time - start_time:.2f} seconds.")

# Folder containing the transcript JSON files to convert.
# NOTE(review): the download cell above writes to "transcripts", not 'videos_transcript' — confirm this path is intended.
folder_path = 'videos_transcript'
process_json_files(folder_path)


Processing JSON files: 100%|██████████| 221/221 [00:09<00:00, 23.65it/s]

Completed in 9.35 seconds.





### *Merge All Transcriptions into One File*

In [None]:
import os

def merge_text_files(folder_path, output_file_path):
    """Concatenate every ``.txt`` file in a folder into one output file.

    Args:
        folder_path: Directory whose ``*.txt`` files are merged.
        output_file_path: Path of the merged file; overwritten if it exists.

    Files are merged in sorted name order, each followed by a newline. The
    output file itself is skipped when it lives inside ``folder_path``, so it
    is never read back into itself.
    """
    output_real_path = os.path.realpath(output_file_path)
    with open(output_file_path, 'w', encoding='utf-8') as output_file:
        # os.listdir order is arbitrary; sort for a reproducible result.
        for file_name in sorted(os.listdir(folder_path)):
            if not file_name.endswith('.txt'):
                continue
            file_path = os.path.join(folder_path, file_name)
            if os.path.realpath(file_path) == output_real_path:
                continue
            with open(file_path, 'r', encoding='utf-8') as input_file:
                output_file.write(input_file.read())
            output_file.write('\n')  # Add a newline between the content of each file

# Folder containing the text files to merge ('text_files' is where the extractor above writes)
folder_path = 'text_files'
# Name of the merged output file
output_file_path = 'merged_output_file.txt'

merge_text_files(folder_path, output_file_path)


### *Whisper Transcription and Segmentation Pipeline*

In [None]:
from faster_whisper import WhisperModel
import csv
import os
from pydub import AudioSegment
import time
import torch

# Load the Whisper "large-v3" model once on CUDA with float16 compute;
# transcribe_and_segment below uses this module-level `model`.
model_size = "large-v3"
model = WhisperModel(model_size, device="cuda", compute_type="float16")

def _export_segment_group(group, clip_id, audio_file, writer, output_folder):
    """Write one CSV row and one ``<clip_id>.wav`` clip for a group of segments."""
    start_time = group[0].start * 1000
    # NOTE(review): the end is scaled by 1003 rather than 1000, which pushes
    # the clip end (and the "end" value in the CSV) 0.3% later than the model
    # reported. Kept as in the original; confirm it is intentional padding.
    end_time = group[-1].end * 1003
    combined_text = ' '.join(seg.text for seg in group)

    writer.writerow([clip_id, combined_text, start_time / 1000, end_time / 1000])

    segment_audio = audio_file[start_time:end_time]
    segment_audio.export(os.path.join(output_folder, f"{clip_id}.wav"), format="wav")


def transcribe_and_segment(audio_path, output_folder_base, segments_per_clip=5):
    """Transcribe one audio file and cut it into clips with matching text.

    Creates ``<output_folder_base>/<audio name without extension>/`` holding
    ``dataset.csv`` (UTF-16, columns ``id, transcription, start, end``) and
    one ``<id>.wav`` per row. Consecutive segments returned by the
    module-level ``model`` are grouped ``segments_per_clip`` at a time; a
    shorter final group is exported as well.

    Args:
        audio_path: Path of the audio file to transcribe.
        output_folder_base: Parent directory for the per-file output folder.
        segments_per_clip: Number of segments merged into one clip. Defaults
            to 5, the previously fixed value.
    """
    audio_basename = os.path.basename(audio_path).rsplit(".", 1)[0]
    output_folder = os.path.join(output_folder_base, audio_basename)
    os.makedirs(output_folder, exist_ok=True)

    csv_file_path = os.path.join(output_folder, "dataset.csv")

    with open(csv_file_path, mode='w', newline='', encoding="utf-16") as file:
        writer = csv.writer(file)
        writer.writerow(["id", "transcription", "start", "end"])

        audio_file = AudioSegment.from_file(audio_path)
        segments, _info = model.transcribe(
                audio_path,
                vad_filter=True,
                beam_size=11,  # Reduced beam size
                best_of=9,  # Reduced best of
                word_timestamps=True,
                no_speech_threshold=0.2,
                vad_parameters=dict(min_silence_duration_ms=2000),
                initial_prompt="______________________________________________________________________",)

        output_file_id = 1
        combined_segments = []
        for segment in segments:
            combined_segments.append(segment)
            if len(combined_segments) >= segments_per_clip:
                _export_segment_group(combined_segments, output_file_id, audio_file, writer, output_folder)
                combined_segments = []
                output_file_id += 1
                # Release cached GPU memory after each exported clip.
                torch.cuda.synchronize()
                torch.cuda.empty_cache()

        # Export whatever is left when the segment count is not a multiple
        # of segments_per_clip.
        if combined_segments:
            _export_segment_group(combined_segments, output_file_id, audio_file, writer, output_folder)

    print(f"Transcription of {audio_basename} is Done ...")

def process_assets_folder(assets_folder, output_folder_base):
    """Run ``transcribe_and_segment`` on every audio file in a folder.

    Only names ending in ``.wav`` or ``.mp3`` directly inside
    ``assets_folder`` are processed. The CUDA cache is emptied after each
    file, and the total wall-clock time is printed at the end.

    Args:
        assets_folder: Directory containing the source audio files.
        output_folder_base: Directory handed to ``transcribe_and_segment``
            as the base for its per-file output folders.
    """
    started_at = time.time()

    audio_names = [
        name for name in os.listdir(assets_folder)
        if name.endswith((".wav", ".mp3"))
    ]
    for name in audio_names:
        transcribe_and_segment(os.path.join(assets_folder, name), output_folder_base)
        # Free GPU memory before moving on to the next file.
        torch.cuda.synchronize()
        torch.cuda.empty_cache()

    elapsed = time.time() - started_at
    print(f"All transcriptions are done. Total time consumption is {elapsed:.2f} seconds.")

# Folder scanned for .wav/.mp3 source files.
assets_folder = "Batch_2"
# Base folder under which one sub-folder per audio file is created.
output_folder_base = "audio_dataset_batch_2"

process_assets_folder(assets_folder, output_folder_base)

  from .autonotebook import tqdm as notebook_tqdm


### *Arabic Text Similarity Analysis with Sentence Transformers*


In [None]:
from sentence_transformers import SentenceTransformer, util
import numpy as np
import pandas as pd

# Option 1: AraBERT v2 checkpoint for Arabic (public model, no authentication needed).
# It is not published as a sentence-transformers model, so SentenceTransformer
# builds one with MEAN pooling (see the warning in the cell output below).
model_name = 'aubmindlab/bert-base-arabertv2'

# Load the model
model = SentenceTransformer(model_name)

# Function to find the most similar sentence
def find_most_similar(target_vector, data_vectors, data_sentences):
    """Return the candidate sentence whose embedding is closest to the target.

    Args:
        target_vector: Embedding of the sentence being looked up.
        data_vectors: Embeddings of the candidates, aligned with
            ``data_sentences``.
        data_sentences: Candidate sentences.

    Returns:
        ``(sentence, score)`` where ``score`` is the cosine similarity as a
        plain ``float`` (so it serialises as a number, not ``tensor(...)``).
        With no candidates the result is ``("", -1)``.
    """
    if len(data_sentences) == 0 or len(data_vectors) == 0:
        return "", -1

    # One call scores the target against every candidate at once instead of
    # one cosine computation per candidate in a Python loop.
    scores = util.pytorch_cos_sim(target_vector, data_vectors)[0]

    # Mirror zip(): only consider candidates that have both text and vector.
    pair_count = min(len(data_sentences), scores.shape[0])
    best = int(scores[:pair_count].argmax())
    return data_sentences[best], float(scores[best])

# Read the cleaned transcription CSV and select the 'transcription' column.
# Empty cells are dropped and values coerced to str so `.split()` below
# never sees a NaN.
df_transcription = pd.read_csv('cleaned_transcription.csv')
target_sentences = df_transcription['transcription'].dropna().astype(str).tolist()

# Read the reference transcript, one sentence per non-blank line. Plain file
# reading replaces pd.read_csv(sep="\n"), which current pandas rejects because
# the line terminator cannot be used as the separator.
reference_path = '«شي إن» تحقق جولة تاريخية بـ2 مليار دولار _ بودكاست السوق.txt'
with open(reference_path, encoding='utf-8') as reference_file:
    full_data_sentences = [line.rstrip('\r\n') for line in reference_file if line.strip()]

# Encode data sentences once since they are compared against multiple target sentences
data_vectors = model.encode(full_data_sentences)

# Encode all target sentences in one batch instead of one call per sentence
target_vectors = model.encode(target_sentences)

# Prepare a list to hold the results
results = []

# Iterate through each target sentence and find the most similar sentence
for position, target_sentence in enumerate(target_sentences):
    # Slice (not index) to keep the (1, dim) shape a single-item encode had.
    target_vector = target_vectors[position:position + 1]
    most_similar_sentence, highest_similarity = find_most_similar(target_vector, data_vectors, full_data_sentences)

    # Compare target sentence words with most similar sentence words,
    # position by position.
    comparison_results = []
    target_words = target_sentence.split()
    similar_words = most_similar_sentence.split()
    for i, word in enumerate(target_words):
        if i >= len(similar_words):
            comparison_results.append(f"{word} has no match")
        elif word == similar_words[i]:
            comparison_results.append(f"{word} - OK")
        else:
            comparison_results.append(f"{word} replaced with {similar_words[i]}")

    # Store the result including the comparison
    results.append({
        'Original Sentence': target_sentence,
        'Most Similar Sentence': most_similar_sentence,
        # float() so the CSV holds a number even if a tensor scalar came back
        'Similarity Score': float(highest_similarity),
        'Comparison Results': " | ".join(comparison_results)  # Joining the comparison results with a separator for better readability in CSV
    })

# Convert the results into a DataFrame
results_df = pd.DataFrame(results)

# Save the DataFrame to a new CSV file
output_file_path = "similarity_results.csv"
results_df.to_csv(output_file_path, index=False)

print(f"Similarity results saved to '{output_file_path}'")


No sentence-transformers model found with name aubmindlab/bert-base-arabertv2. Creating a new one with MEAN pooling.


The most similar sentence to 'فتحتاج زياده االتنم في المئه في نفس السهم.' is:
'فتحتاج زياده مئه في المئه في نفس السهم.' with a similarity score of 0.8902
فتحتاج - OK
زياده - OK
االتنم replaced with مئه
في - OK
المئه - OK
في - OK
نفس - OK
السهم. - OK
The most similar sentence to 'اتمنى اذا يعجبكم هذا النوع من المحتوى.' is:
'اتمنى اذا يعجبكم هذا النوع من المحتوى.' with a similarity score of 1.0000
اتمنى - OK
اذا - OK
يعجبكم - OK
هذا - OK
النوع - OK
من - OK
المحتوى. - OK


# *Gradio interface for manual revision*

In [None]:
import os
import pandas as pd
import gradio as gr

# Built with os.path.join so the paths also resolve on non-Windows hosts;
# on Windows the resulting strings are the same as the old backslash literals.
csv_path = os.path.join('audio_dataset', 'assets.csv')
voice_data = os.path.join('audio_dataset', 'voice_data')

# Ensure the CSV includes the "Edited" and "Reviewed" columns
def add_columns_to_csv(debug=False):
    """Make sure the annotation CSV has boolean ``Edited`` and ``Reviewed`` columns.

    Reads the UTF-16 CSV at the module-level ``csv_path``, adds whichever of
    the two columns is missing (initialised to ``False``) and writes the file
    back with ``id`` as the index column.

    Args:
        debug: When true, print the error that triggered the fallback read.

    Raises:
        ValueError: If the file has no ``id`` column.
    """
    try:
        # Attempt to read the CSV with the specified encoding and 'id' as the index column
        df = pd.read_csv(csv_path, index_col='id', encoding='utf-16')
    except ValueError as e:
        if debug:
            print("Attempting to read CSV without specifying index_col due to error:", e)
        # Attempt to read the CSV without specifying the index column
        df = pd.read_csv(csv_path, encoding='utf-16')
        # Check if 'id' column exists
        if 'id' not in df.columns:
            raise ValueError("'id' column not found in CSV. Please verify the file structure.") from e
        # Promote 'id' to the index. Without this, to_csv below would label
        # the positional index 'id' and write a second 'id' column.
        df = df.set_index('id')

    # Proceed with adding 'Edited' and 'Reviewed' columns
    for column in ('Edited', 'Reviewed'):
        if column not in df.columns:
            df[column] = False
    df.to_csv(csv_path, index_label='id', encoding='utf-16')

# Ensure the CSV has the 'Edited'/'Reviewed' columns; debug=True prints the
# fallback-read message if the first read attempt fails.
add_columns_to_csv(debug=True)

# Load the updated dataframe, indexed by 'id'; the functions below read and
# modify this module-level `df`.
df = pd.read_csv(csv_path, index_col='id', encoding='utf-16')

# Folder containing the WAV files
audio_folder_path = voice_data

# Load the audio and corresponding text
def load_audio_and_text(index):
    """Look up the audio clip and transcription for one row id.

    Args:
        index: Row id. May arrive as a float (e.g. ``1.0``) or ``None`` from
            a numeric input widget; it is normalised to ``int``.

    Returns:
        ``(audio_path, text, file_name)``. When the id is ``None`` or
        ``<index>.wav`` is missing from ``audio_folder_path`` the result is
        ``(None, "Audio file does not exist.", "")``. When the id is not in
        ``df`` the text is ``"Text not found."``.
    """
    if index is None:
        return None, "Audio file does not exist.", ""

    # Normalise 1.0 -> 1 so the file name is "1.wav" rather than "1.0.wav".
    index = int(index)

    audio_path = os.path.join(audio_folder_path, f"{index}.wav")
    if not os.path.exists(audio_path):
        return None, "Audio file does not exist.", ""
    text = df.loc[index, 'transcription'] if index in df.index else "Text not found."
    return audio_path, text, f"{index}.wav"

# Save the edited text, mark as edited in the CSV, and mark as reviewed
def save_text(index, text, original_text):
    """Persist an annotation for one row and record that it was reviewed.

    Args:
        index: Row id in the module-level ``df``; ids not present are ignored.
        text: Transcription currently shown in the annotation area.
        original_text: Unused; kept so existing callers keep working.

    ``Edited`` is set only when ``text`` differs from the stored
    transcription. ``Reviewed`` is set whenever an existing row is saved,
    changed or not, so the two flags carry different information. The CSV at
    ``csv_path`` is rewritten only if something changed.
    """
    # Membership test instead of a 1..max range check: ids may have gaps, and
    # df.at raises KeyError for a missing label.
    if index not in df.index:
        return

    changed = False
    if df.at[index, 'transcription'] != text:
        df.at[index, 'transcription'] = text
        df.at[index, 'Edited'] = True  # Mark as edited
        changed = True

    if 'Reviewed' not in df.columns or not bool(df.at[index, 'Reviewed']):
        df.at[index, 'Reviewed'] = True  # Mark as reviewed
        changed = True

    if changed:
        df.to_csv(csv_path, encoding='utf-16')

# Gradio app interface
with gr.Blocks(theme="dark") as app:
    # --- Widgets (creation order defines the page layout) ---
    gr.Markdown("## Start Annotation From WAV File Number")
    start_from = gr.Number(label="Start From WAV Number", value=1)

    gr.Markdown("## Voice Data Annotation System")
    gr.Markdown("### Please annotate the following voice data")
    wav_file_name = gr.Label(label="WAV File Name")
    audio_player = gr.Audio()
    annotation_area = gr.Textbox(label="Annotation Area")
    index_state = gr.Number(value=1, label="Index", visible=False)
    edited_marker = gr.Label(value="", label="Edited Status", visible=False)

    # Every callback refreshes the same five widgets, in this order.
    refreshed_widgets = [audio_player, annotation_area, index_state, edited_marker, wav_file_name]

    def _render(position):
        """Build the five output values for the row at ``position``."""
        clip_path, clip_text, clip_name = load_audio_and_text(position)
        if position in df.index and df.loc[position, 'Edited']:
            badge = "✅ Edited"
        else:
            badge = ""
        return clip_path, clip_text, position, badge, clip_name

    def update_interface(index, text, direction):
        """Save the current annotation, then move to the next/previous row."""
        if index in df.index:
            stored_text = df.loc[index, 'transcription']
        else:
            stored_text = ""
        save_text(index, text, stored_text)

        if direction == 'next':
            index = index + 1
        elif direction == 'previous':
            # Never step below the first clip.
            index = max(1, index - 1)

        return _render(index)

    def initialize_interface(start_index):
        """Jump straight to ``start_index`` without saving anything."""
        return _render(start_index)

    def _go_previous(index, text):
        return update_interface(index, text, 'previous')

    def _go_next(index, text):
        return update_interface(index, text, 'next')

    # --- Event wiring ---
    start_from.change(initialize_interface, inputs=[start_from], outputs=refreshed_widgets)

    with gr.Row():
        previous_button = gr.Button("Previous")
        previous_button.click(
            _go_previous,
            inputs=[index_state, annotation_area],
            outputs=refreshed_widgets,
        )
        next_button = gr.Button("Next")
        next_button.click(
            _go_next,
            inputs=[index_state, annotation_area],
            outputs=refreshed_widgets,
        )

    # Apply custom CSS for layout adjustments
    app.css = ".gr-textbox { height: 500px; } .gr-row { align-items: center; }"

app.launch(share=True)