In [None]:
!pip install pytube
!pip install --q git+https://github.com/m-bain/whisperx.git

In [None]:
!pip install streamlit

In [None]:
from transformers import pipeline
from pytube import YouTube
import os
import subprocess
import whisperx
from matplotlib import pyplot as plt
import seaborn as sns
import numpy as np
import copy
import matplotlib.cm as cm

In [None]:
# Hugging Face access token used for the diarization and emotion models.
# Prefer the HF_TOKEN environment variable so the secret is never saved in the
# notebook; the placeholder is kept as a fallback for manual editing.
huggingface_token = os.environ.get("HF_TOKEN") or "Your Hugging Face Token Which has access to the model"
audio_folder_path = "/content/Audio"   # where downloaded audio is stored
model_folder_path = "/content/Models"  # download root passed to whisperx.load_model

sentiment_chunks = 10  # number of words per chunk sent to the emotion classifier
min_speakers = 2       # lower bound passed to the diarization pipeline
max_speakers = 2       # upper bound passed to the diarization pipeline
device = "cuda"
batch_size = 4 # reduce if low on GPU mem
compute_type = "float16" # change to "int8" if low on GPU mem (may reduce accuracy)


In [None]:
# Diarization

def video_data(video_url):
    """Collect metadata about a YouTube video into a plain dict.

    Args:
        video_url: URL handed to ``pytube.YouTube``.

    Returns:
        dict with the keys ``url``, ``title``, ``author``, ``length_seconds``,
        ``description``, ``views``, ``rating``, ``thumbnail_url`` and
        ``keywords``, in that order.
    """
    video = YouTube(video_url)

    # publish_date and captions are deliberately left out: they are not JSON
    # serializable. Add further metadata fields here as needed.
    metadata = dict(
        url=video_url,
        title=video.title,
        author=video.author,
        length_seconds=video.length,
        description=video.description,
        views=video.views,
        rating=video.rating,
        thumbnail_url=video.thumbnail_url,
        keywords=video.keywords,
    )
    return metadata

def download_youtube_webm(url):
    """Download the audio of a YouTube video and convert it to WAV.

    The last entry of the audio-only stream list is downloaded into
    ``audio_folder_path`` and then passed to ``convert_webm_to_wav``.

    Args:
        url: YouTube video URL.

    Returns:
        Whatever ``convert_webm_to_wav`` returns for the downloaded file
        (the path of the ``.wav`` file).
    """
    video = YouTube(url)
    print(f"Downloading Audio: {video.title}")
    audio_only_streams = video.streams.filter(only_audio=True)
    downloaded_path = audio_only_streams.last().download(output_path=audio_folder_path)
    print("Audio Downloaded")
    return convert_webm_to_wav(downloaded_path)


def convert_webm_to_wav(webm_path):
    """Convert an audio file to WAV with ffmpeg and return the WAV path.

    Args:
        webm_path: Path of the downloaded audio file (any extension).

    Returns:
        The input path with its extension replaced by ``.wav``.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
        FileNotFoundError: if the ``ffmpeg`` executable is not on PATH.
    """
    wav_path = os.path.splitext(webm_path)[0] + ".wav"  # same location, .wav extension
    if wav_path == webm_path:
        # Already a .wav file: ffmpeg refuses to use one file as input and output.
        return wav_path
    print("Converting webm to wav")
    # -y overwrites an existing output instead of stopping at ffmpeg's
    # interactive prompt (which stalls a notebook re-run); check=True surfaces
    # a failed conversion here rather than later when the file is loaded.
    subprocess.run(['ffmpeg', '-y', '-i', webm_path, wav_path], check=True)
    print("Conversion done")
    return wav_path


def get_transcript(audio):
    """Transcribe ``audio`` with the whisperx "distil-medium.en" model.

    The model is loaded on the module-level ``device`` with ``compute_type``,
    using ``model_folder_path`` as download root; transcription runs with the
    module-level ``batch_size``.

    Returns:
        The result object of ``model.transcribe``.
    """
    asr_model = whisperx.load_model(
        "distil-medium.en",
        device,
        compute_type=compute_type,
        download_root=model_folder_path,
    )
    return asr_model.transcribe(audio, batch_size=batch_size)


def get_aligned_transcript(transcript, audio):
    """Align the transcript segments against the audio with whisperx.

    Args:
        transcript: Mapping with at least ``"language"`` and ``"segments"``.
        audio: Audio data forwarded to ``whisperx.align``.

    Returns:
        The value returned by ``whisperx.align`` (character alignments disabled).
    """
    language = transcript["language"]
    align_model, align_metadata = whisperx.load_align_model(language_code=language, device=device)
    return whisperx.align(
        transcript["segments"],
        align_model,
        align_metadata,
        audio,
        device,
        return_char_alignments=False,
    )


def diarize(audio):
    """Run the whisperx diarization pipeline on ``audio``.

    The pipeline is created with ``huggingface_token`` on ``device`` and is
    called with the module-level ``min_speakers`` / ``max_speakers`` bounds.

    Returns:
        The diarization segments produced by the pipeline.
    """
    pipeline_instance = whisperx.DiarizationPipeline(
        use_auth_token=huggingface_token,
        device=device,
    )
    return pipeline_instance(audio, min_speakers=min_speakers, max_speakers=max_speakers)


def plot_speakers(diarize_segments):
    """Draw a horizontal bar chart of the number of diarization rows per speaker.

    Args:
        diarize_segments: Object supporting ``groupby('speaker')``
            (presumably a pandas DataFrame — confirm against the diarizer output).
    """
    rows_per_speaker = diarize_segments.groupby('speaker').size()
    bar_colors = sns.palettes.mpl_palette('Dark2')
    rows_per_speaker.plot(kind='barh', color=bar_colors)
    # Hide the top and right borders of the current axes.
    current_axes = plt.gca()
    current_axes.spines[['top', 'right']].set_visible(False)


def assign_words(diarize_segments, aligned_transcript):
    """Attach speaker labels to the aligned transcript.

    Thin wrapper that forwards both arguments to
    ``whisperx.assign_word_speakers`` and returns its result unchanged.
    """
    return whisperx.assign_word_speakers(diarize_segments, aligned_transcript)


def get_labelled_transcript(speaker_mapped_transcript):
    """Render the speaker-mapped transcript as ``SPEAKER: words ...`` turns.

    Words are emitted in order. Whenever a word carries a ``'speaker'`` that
    differs from the current one, a new turn ``"<speaker>: <word>"`` is started
    on its own line. Words without a ``'speaker'`` key are appended to the
    current turn.

    Args:
        speaker_mapped_transcript: Mapping with a ``'segments'`` list; each
            segment may hold a ``'words'`` list of dicts with ``'word'`` and,
            optionally, ``'speaker'``.

    Returns:
        The labelled transcript as a single string (empty if there are no words).
    """
    pieces = []
    current_speaker = None
    for segment in speaker_mapped_transcript['segments']:
        for word_info in segment.get('words', []):
            word_speaker = word_info.get('speaker')
            # Compare by value: equal speaker strings are not guaranteed to be
            # the same object, so an identity test can start spurious new turns.
            if word_speaker is not None and word_speaker != current_speaker:
                current_speaker = word_speaker
                # Separate turns so the label is not glued to the previous word.
                turn_break = "\n" if pieces else ""
                pieces.append(f"{turn_break}{current_speaker}: {word_info['word']}")
            else:
                pieces.append(" " + word_info['word'])
    return "".join(pieces)


def get_spoke_when(speaker_mapped_transcript):
    """Map each transcript segment to a speaker index (0 or 1).

    A segment without a ``'speaker'`` key inherits the speaker of the previous
    segment. Segments whose (inherited) speaker is neither ``SPEAKER_00`` nor
    ``SPEAKER_01`` — including leading segments before any speaker is known —
    contribute nothing.

    Args:
        speaker_mapped_transcript: Mapping with a ``'segments'`` list of dicts.

    Returns:
        List of 0/1 ints, one per segment that could be attributed.
    """
    speaker_index = {"SPEAKER_00": 0, "SPEAKER_01": 1}
    spoke_when = []
    speaker = None  # defined up front so a leading unlabelled segment cannot fail
    for segment in speaker_mapped_transcript['segments']:
        speaker = segment.get('speaker', speaker)
        if speaker in speaker_index:
            spoke_when.append(speaker_index[speaker])
    return spoke_when

                
def plot_spoke_when(spoke_when, speaker0="Speaker 0", speaker1='Speaker 1'):
    """Show who spoke when as a line plot and a one-row heatmap, side by side.

    Args:
        spoke_when: Sequence of speaker indices (0 or 1), one per segment.
        speaker0: Tick/legend label used for index 0.
        speaker1: Tick/legend label used for index 1.
    """
    # Spread the segments evenly over 0-100 % of the recording.
    progress = np.linspace(0, 100, len(spoke_when))

    figure, (line_ax, heat_ax) = plt.subplots(1, 2, figsize=(19, 5))

    # Left: line plot; right: heatmap of the same sequence.
    line_ax.plot(progress, spoke_when, color='blue')
    heat_ax.imshow([spoke_when], cmap='viridis', aspect='auto', extent=[0, 100, 0, 1])

    # Both panels share the same axis labels and speaker ticks.
    for axis in (line_ax, heat_ax):
        axis.set_xlabel('Progress (%)')
        axis.set_ylabel('Y-values')
        axis.set_yticks([0, 1])
        axis.set_yticklabels([speaker0, speaker1])

    # Legend for the heatmap colours.
    legend_handles = [
        plt.Line2D([0], [0], color='yellow', label=speaker1),
        plt.Line2D([0], [0], color=cm.viridis(2), label=speaker0),
    ]
    legend_box = heat_ax.legend(handles=legend_handles, loc='upper left')

    # Fully opaque legend background.
    legend_box.get_frame().set_alpha(1.0)
    figure.suptitle('Spoke When Graph', fontsize=16)
    plt.show()

In [None]:
# Sentiment



def get_text_from_transcript(transcript):
    """Concatenate the ``'text'`` of every segment, in order, with no separator.

    Args:
        transcript: Mapping with a ``'segments'`` list of dicts holding ``'text'``.

    Returns:
        The joined text (empty string when there are no segments).
    """
    return "".join(segment['text'] for segment in transcript['segments'])


def get_chunks(paragraph, chunk_size=None):
    """Split ``paragraph`` into consecutive chunks of ``chunk_size`` words.

    Args:
        paragraph: Text to split on whitespace.
        chunk_size: Words per chunk. When omitted, the module-level
            ``sentiment_chunks`` setting is used, as before.

    Returns:
        List of space-joined chunks; the last one may be shorter. An empty
        paragraph yields an empty list.
    """
    # Resolve the size lazily so callers passing chunk_size do not depend on the global.
    size = sentiment_chunks if chunk_size is None else chunk_size
    chunks = []
    current = []
    for word in paragraph.split():
        current.append(word)
        if len(current) >= size:
            chunks.append(' '.join(current))
            current = []
    if current:  # trailing partial chunk
        chunks.append(' '.join(current))
    return chunks


def get_emotions(chunks, pipeline):
    """Apply ``pipeline`` to every chunk, one call per chunk.

    Args:
        chunks: Iterable of text chunks.
        pipeline: Callable invoked with a single chunk.

    Returns:
        List of the callable's results, in chunk order.
    """
    return [pipeline(chunk) for chunk in chunks]


def organize_emotions(data):
    """Regroup per-chunk classifier output into one score series per label.

    Args:
        data: List with one entry per chunk; the first element of each entry
            is a list of ``{'label': ..., 'score': ...}`` dicts.

    Returns:
        dict mapping each label to its scores in chunk order.
    """
    scores_by_label = {}
    for chunk_result in data:
        for entry in chunk_result[0]:
            scores_by_label.setdefault(entry['label'], []).append(entry['score'])
    return scores_by_label


def plot_combined_emotion_graphs(organized_labels):
    """Plot every emotion as a line chart (left) and a heatmap (right).

    One subplot row is drawn per emotion label.

    Args:
        organized_labels: dict mapping an emotion label to its list of scores.
            Nothing is drawn when it is empty.
    """
    if not organized_labels:
        # plt.subplots cannot create a grid with zero rows.
        return

    num_rows = len(organized_labels)
    num_cols = 2

    # squeeze=False keeps `axs` two-dimensional even for a single emotion, so
    # axs[row, col] indexing works for any number of rows.
    fig, axs = plt.subplots(num_rows, num_cols, figsize=(15, 5 * num_rows), squeeze=False)

    for row, (label, scores) in enumerate(organized_labels.items()):
        # Line graph of the scores over 0-100 % of the video.
        line_ax = axs[row, 0]
        perc = np.linspace(0, 100, len(scores))
        line_ax.plot(perc, scores, color='blue')
        line_ax.set_yticks([0, 0.5, 1])
        line_ax.set_xlabel("Video Percentage", fontsize=12)
        line_ax.set_ylabel("Emotion Score", fontsize=12)
        line_ax.set_title(f'{label.title()}', color='blue', fontsize=14)

        # One-row heatmap of the same scores.
        ax_heatmap = axs[row, 1]
        ax_heatmap.imshow([scores], cmap='viridis', aspect='auto', extent=[0, 100, 0, 1])
        ax_heatmap.set_ylabel("Emotion Score", fontsize=12)
        ax_heatmap.set_title(f'{label.title()}', color='red', fontsize=14)

    # Adjust layout and show plot
    plt.tight_layout()
    fig.suptitle('Emotions Graph', fontsize=16)
    plt.show()


def get_speaker_text_with_padding(mapped_transcript):
    """Build per-speaker text where the other speaker's words become padding.

    For every word spoken by one of the two speakers, `` <word>`` is appended
    to that speaker's text and one `` ※`` per whitespace-separated token of the
    word is appended to the other speaker's text, keeping both texts the same
    length in tokens. A word without a ``'speaker'`` key inherits the speaker
    of the previous word. Words that cannot be attributed to ``SPEAKER_00`` or
    ``SPEAKER_01`` are skipped.

    Args:
        mapped_transcript: Mapping with ``"segments"``; each segment has a
            ``'words'`` list of dicts with ``'word'`` and optional ``'speaker'``.

    Returns:
        dict with the keys ``'SPEAKER_00'`` and ``'SPEAKER_01'``.
    """
    padded_speaker_text = {'SPEAKER_00': "", 'SPEAKER_01': ""}
    other_speaker = {'SPEAKER_00': 'SPEAKER_01', 'SPEAKER_01': 'SPEAKER_00'}
    speaker = None  # defined up front so a leading unlabelled word cannot fail
    for segment in mapped_transcript["segments"]:
        for word_info in segment['words']:
            speaker = word_info.get('speaker', speaker)
            if speaker not in other_speaker:
                continue
            word = word_info['word']
            padded_speaker_text[speaker] += " " + word
            padded_speaker_text[other_speaker[speaker]] += " ※" * len(word.split())
    return padded_speaker_text


In [None]:
# End-to-end diarization run for one video: download the audio, transcribe,
# align, diarize, then attach speaker labels to the aligned words.
youtube_url = "https://youtu.be/WIR6KbGwGU0"
audio_path = download_youtube_webm(youtube_url)  # path of the converted .wav file
audio = whisperx.load_audio(audio_path)

transcript = get_transcript(audio)
# Align a deep copy so `transcript` itself is left as returned by get_transcript;
# the sentiment cell reads it again through get_text_from_transcript.
aligned_transcript = get_aligned_transcript(copy.deepcopy(transcript), audio)

diarize_segments = diarize(audio)
# plot_speakers(diarize_segments)

speaker_mapped_transcript = assign_words(diarize_segments, aligned_transcript)

# Flat "SPEAKER_xx: words" text, later renamed and summarized.
labelled_transcript = get_labelled_transcript(speaker_mapped_transcript)

In [None]:
# Emotion classifier for the whole video. organize_emotions indexes each
# result with [0], so it relies on the nested output this configuration yields.
# NOTE(review): `return_all_scores` is reported as deprecated in recent
# transformers releases — confirm the output nesting before replacing it.
pipe = pipeline('text-classification', model="cardiffnlp/twitter-roberta-base-emotion-latest", return_all_scores=True, token=huggingface_token)

# Full transcript text, taken from a deep copy of the raw transcript.
transcript_text = get_text_from_transcript(copy.deepcopy(transcript))

# Fixed-size word chunks (size set by `sentiment_chunks`).
transcript_chunks = get_chunks(transcript_text)

# One classifier call per chunk.
emotions = get_emotions(transcript_chunks, pipeline=pipe)



# label -> list of scores, one score per chunk.
organized_emotions = organize_emotions(emotions)

# plot_combined_emotion_graphs(organized_emotions)

In [None]:
# speaker level analysis

# Per-segment speaker index (0/1) for the "spoke when" charts.
spoke_when = get_spoke_when(speaker_mapped_transcript)
# plot_spoke_when(spoke_when)

# Per-speaker text in which the other speaker's words are replaced by "※"
# padding, so both texts stay aligned word for word.
padded_speaker_text = get_speaker_text_with_padding(speaker_mapped_transcript)

padded_speaker_00 = padded_speaker_text['SPEAKER_00']
padded_speaker_01 = padded_speaker_text['SPEAKER_01']

# SPEAKER_00: chunk -> classify -> regroup by label (same steps as the video-level run).
print("SPEAKER 00", "*"*80)
speaker_00_transcript_chunks = get_chunks(padded_speaker_00)

speaker_00_emotions = get_emotions(speaker_00_transcript_chunks, pipeline=pipe)

speaker_00_organized_emotions = organize_emotions(speaker_00_emotions)

# plot_combined_emotion_graphs(speaker_00_organized_emotions)

# SPEAKER_01: identical steps on the second speaker's padded text.
print("SPEAKER 01")
speaker_01_transcript_chunks = get_chunks(padded_speaker_01)

speaker_01_emotions = get_emotions(speaker_01_transcript_chunks, pipeline=pipe)

speaker_01_organized_emotions = organize_emotions(speaker_01_emotions)

# plot_combined_emotion_graphs(speaker_01_organized_emotions)

In [None]:
# Number of whitespace-separated tokens in the labelled transcript
# (get_split_text only splits transcripts longer than 12000 tokens).
print(len(labelled_transcript.split()))

In [None]:
# Use a pipeline as a high-level helper
# NOTE(review): this rebinds `pipe`, which the sentiment cells use for the
# emotion classifier; re-running those cells after this one would hand the
# summarization pipeline to get_emotions.
from transformers import pipeline

pipe = pipeline("summarization", model="StDestiny/DialogLED-base-16384-dialogsum-finetuned-10epochs")

In [None]:
def get_split_text(named_transcript, chunk_size=12000):
    """Split a long transcript into pieces of at most ``chunk_size`` words.

    Args:
        named_transcript: Transcript text.
        chunk_size: Maximum number of whitespace-separated words per piece.

    Returns:
        The unchanged ``named_transcript`` string when it has at most
        ``chunk_size`` words; otherwise a list of space-joined pieces that
        together contain every word exactly once, in order.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    words = named_transcript.split()
    if len(words) <= chunk_size:
        return named_transcript
    # Stepping from 0 covers the tail as well, including the case where the
    # word count is an exact multiple of chunk_size.
    return [
        ' '.join(words[start:start + chunk_size])
        for start in range(0, len(words), chunk_size)
    ]

def name_speakers(labelled_transcript, name1, name2):
    """Replace the generic speaker labels with display names.

    ``SPEAKER_00`` becomes ``name1`` first, then ``SPEAKER_01`` becomes
    ``name2`` in the result of the first replacement.

    Returns:
        The transcript with both labels substituted.
    """
    return (
        labelled_transcript
        .replace("SPEAKER_00", name1)
        .replace("SPEAKER_01", name2)
    )

In [None]:
# Swap the generic speaker labels for the participants' names.
named_transcript = name_speakers(labelled_transcript, "## Piers Morgan", "## Tristan Tate")
# A single string for short transcripts, a list of pieces for long ones.
split_text = get_split_text(named_transcript)

# Summarize with the summarization pipeline bound to `pipe` in the previous cell.
summary = pipe(split_text,max_length=500, min_length=30, do_sample=False)

In [None]:
# Show the raw pipeline output; app.py later reads 'summary_text' from each item.
print(summary)

In [None]:
# NOTE(review): this rebinds the name `video_data` from the function to its
# result, so the cell fails if run a second time in the same kernel (a dict is
# not callable). The export cell reads the result under this same name.
video_data = video_data(youtube_url)
video_data

In [None]:
# Everything the Streamlit app needs, gathered under the keys app.py reads
# back from data.json.
data = {
    'spoke_when': spoke_when,
    'organized_emotions': organized_emotions,
    'speaker_00_organized_emotions': speaker_00_organized_emotions,
    'speaker_01_organized_emotions': speaker_01_organized_emotions,
    'summary': summary,
    'video_data': video_data
}

In [None]:
# Persist the payload next to the notebook; app.py opens 'data.json' from its
# working directory.
import json
with open('data.json', 'w') as json_file:
    json.dump(data, json_file)

In [None]:
%%writefile app.py
import streamlit as st
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import json

st.title("DiAna - Dialog Analyesr")

# Load data from a JSON file
with open('data.json', 'r') as json_file:
    data = json.load(json_file)

organized_emotions = data['organized_emotions']
speaker00_data = data['speaker_00_organized_emotions']
speaker01_data = data['speaker_01_organized_emotions']
summary = data['summary']
joined_summary = " ".join(item['summary_text'] for item in summary)
spoke_when = data['spoke_when']
video_info = data['video_data']
video_length_seconds = video_info['length_seconds']


@st.cache_data
def print_video_info(video_info):
    """Render the video player followed by author, length, views and rating.

    Args:
        video_info: dict with the keys ``url``, ``author``, ``length_seconds``,
            ``views`` and ``rating`` (``rating`` may be ``None``).
    """
    st.video(video_info['url'])
    # The title line is intentionally not shown:
    # st.write(f"**Title:** {video_info['title']}")
    st.write(f"**Author:** {video_info['author']}")
    st.write(f"**Length:** {video_info['length_seconds']} seconds")
    st.write(f"**Views:** {video_info['views']}")

    rating = video_info['rating']
    if rating is None:
        rating_line = "**Rating:** Not available"
    else:
        rating_line = f"**Rating:** {rating}"
    st.write(rating_line)

@st.cache_data
def plot_spoke_when_pie(spoke_when):
    """Build a pie chart of how many segments each speaker has.

    Args:
        spoke_when: List of speaker indices (0 or 1).

    Returns:
        A plotly Figure with the slices 'Speaker 1' (count of 0s) and
        'Speaker 2' (count of 1s).
    """
    segments_per_speaker = [spoke_when.count(0), spoke_when.count(1)]
    pie = go.Pie(labels=['Speaker 1', 'Speaker 2'], values=segments_per_speaker)
    return go.Figure(data=[pie])

@st.cache_data
def plot_spoke_when_bar(spoke_when, total_length_seconds):
    """Build a bar chart of the speaker index over the video timeline.

    Segments are spread evenly from 0 to the video length (in minutes).

    Args:
        spoke_when: List of speaker indices (0 or 1), one per segment.
        total_length_seconds: Video length in seconds.

    Returns:
        A plotly Figure.
    """
    total_length_minutes = total_length_seconds / 60  # Convert total length to minutes
    # With a single segment there is no interval to divide by; place it at 0
    # instead of dividing by zero.
    intervals = max(len(spoke_when) - 1, 1)
    x_values = [total_length_minutes * j / intervals for j in range(len(spoke_when))]
    fig = go.Figure(go.Bar(x=x_values, y=spoke_when))
    fig.update_xaxes(title="Time (minutes)", range=[0, total_length_minutes])
    fig.update_yaxes(title="Value")
    fig.update_layout(title="Spoke When Distribution")
    return fig


@st.cache_data
def plot_emotions_heatmap(emotions_data, total_length_seconds):
    """Build one single-row heatmap per emotion, stacked vertically.

    Args:
        emotions_data: dict mapping an emotion label to its list of scores.
        total_length_seconds: Video length in seconds.

    Returns:
        A plotly Figure with one subplot row per emotion.
    """
    total_length_minutes = total_length_seconds / 60  # Convert total length to minutes
    fig = make_subplots(rows=len(emotions_data), cols=1, subplot_titles=list(emotions_data.keys()), shared_xaxes=False, vertical_spacing=0.05)
    for i, (emotion, values) in enumerate(emotions_data.items(), start=1):
        # A series with a single score has no interval to divide by; place it
        # at 0 instead of dividing by zero.
        intervals = max(len(values) - 1, 1)
        minutes = [total_length_minutes * j / intervals for j in range(len(values))]
        fig.add_trace(go.Heatmap(z=[values], x=minutes, colorscale='Viridis', showscale=False), row=i, col=1)
    fig.update_layout(height=2500, width=850)
    fig.update_yaxes(showticklabels=False)
    fig.update_xaxes(title_text="Time (minutes)")
    return fig

@st.cache_data
def plot_emotions_barcharts(emotions_data, total_length_seconds):
    """Build one bar chart per emotion, stacked vertically.

    Each score is placed at the end of its equal-width time slot.

    Args:
        emotions_data: dict mapping an emotion label to its list of scores.
        total_length_seconds: Video length in seconds.

    Returns:
        A plotly Figure with one subplot row per emotion.
    """
    duration_minutes = total_length_seconds / 60
    titles = list(emotions_data.keys())
    figure = make_subplots(
        rows=len(emotions_data),
        cols=1,
        subplot_titles=titles,
        shared_xaxes=False,
        vertical_spacing=0.05,
    )
    row = 0
    for emotion, scores in emotions_data.items():
        row += 1
        count = len(scores)
        slot_ends = [duration_minutes * (k + 1) / count for k in range(count)]
        bars = go.Bar(x=slot_ends, y=scores, name=emotion)
        figure.add_trace(bars, row=row, col=1)
    figure.update_layout(height=2500, width=850, title_text="Emotion Distribution", showlegend=False)
    figure.update_xaxes(range=[0, duration_minutes], title_text="Time (minutes)")
    figure.update_yaxes(range=[0, 1], title_text="Value")
    return figure

@st.cache_data
def plot_emotions_linechart(emotions_data, total_length_seconds):
    """Build one line chart per emotion, stacked vertically.

    Each score is placed at the end of its equal-width time slot.

    Args:
        emotions_data: dict mapping an emotion label to its list of scores.
        total_length_seconds: Video length in seconds.

    Returns:
        A plotly Figure with one subplot row per emotion.
    """
    duration_minutes = total_length_seconds / 60
    titles = list(emotions_data.keys())
    figure = make_subplots(
        rows=len(emotions_data),
        cols=1,
        subplot_titles=titles,
        shared_xaxes=False,
        vertical_spacing=0.05,
    )
    row = 0
    for emotion, scores in emotions_data.items():
        row += 1
        count = len(scores)
        slot_ends = [duration_minutes * (k + 1) / count for k in range(count)]
        line = go.Scatter(x=slot_ends, y=scores, mode='lines', name=emotion)
        figure.add_trace(line, row=row, col=1)
    figure.update_layout(height=2500, width=850, title_text="Emotion Distribution")
    figure.update_yaxes(range=[0, 1], title_text="Value")
    figure.update_xaxes(title_text="Time (minutes)")
    return figure

print_video_info(video_info)

st.title("Summary")
st.write(joined_summary)

# Plot-type label shown in the select boxes -> function that builds the figure.
_EMOTION_PLOTTERS = {
    "Bar Chart": plot_emotions_barcharts,
    "Line Chart": plot_emotions_linechart,
    "Heatmap": plot_emotions_heatmap,
}


def _render_emotion_section(selectbox_label, emotions_data):
    """Show a plot-type select box and the matching emotion figure.

    Args:
        selectbox_label: Label of the select box (must be unique on the page).
        emotions_data: dict mapping an emotion label to its list of scores.
    """
    chosen_plot = st.selectbox(selectbox_label, list(_EMOTION_PLOTTERS))
    build_figure = _EMOTION_PLOTTERS[chosen_plot]
    st.plotly_chart(build_figure(emotions_data, video_length_seconds))


st.write("## Spoke When")
spoke_when_graph_type = st.selectbox("Select Plot Type for Spoke When", ["Bar Chart", "Pie Chart"])
if spoke_when_graph_type == "Bar Chart":
    st.plotly_chart(plot_spoke_when_bar(spoke_when, video_length_seconds))
elif spoke_when_graph_type == "Pie Chart":
    st.plotly_chart(plot_spoke_when_pie(spoke_when))

st.title("Video Level Emotion Visualization")
_render_emotion_section("Select Plot Type", organized_emotions)

st.title("Speaker Level Emotion Visualization")
st.write("## Speaker 1")
_render_emotion_section("Select Plot Type for Speaker 1", speaker00_data)

st.write("## Speaker 2")
_render_emotion_section("Select Plot Type for Speaker 2", speaker01_data)


In [None]:
# Start the Streamlit app in the background (`&`) and run localtunnel against port 8501.
# NOTE(review): `bypass-tunnel-reminder` is passed to `streamlit run` as a plain
# argument after the script name — confirm it has the intended effect.
!streamlit run app.py bypass-tunnel-reminder & npx -y localtunnel --port 8501 