In [None]:
import os
import sys
import json
import torch
import tempfile
import torchaudio
import numpy as np
import pandas as pd
import concurrent.futures
from typing import Any, Tuple
from moviepy.editor import *
from pydub import AudioSegment
from supabase import create_client, Client
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor

In [None]:
# Local working files used by the extraction / splitting cells.
videopath = '../files/1/2/input/raw.mp4'
audiopath = '../files/1/2/raw.wav'
outputpath = '../files/1/2/output.wav'
filename = 'raw.wav'
fileformat = 'audio'
videoname = 'raw.mp4'

# Supabase project settings. The API key must never be committed with the
# notebook: it is read from the SUPABASE_KEY environment variable and falls
# back to the previous empty-string default when the variable is not set.
supabase_key = os.environ.get('SUPABASE_KEY', '')
supabase_url = 'https://kglmfklezrjwfvtcolgb.supabase.co'
supabase_bucket = 'interviews'

# Object-storage locations (relative to the bucket root) and the speaker
# whose audio parts are written by the splitting cell.
output_s3_folder = '1/2/output'
s3_temp_path = '1/2/output/temp'
current_speaker = 'speaker_000'

In [None]:
def __check_supabase_connection() -> Client:
    """Create the Supabase client, aborting the program when that fails.

    Returns:
        The client built from ``supabase_url`` and ``supabase_key``.

    Raises:
        SystemExit: with status 1 when ``create_client`` raises.
    """
    try:
        return create_client(supabase_url, supabase_key)
    except Exception as e:
        # Not every exception carries a dict with a 'message' key as its first
        # argument; fall back to the exception itself so that building the
        # error text can never raise and hide the real failure.
        first_arg = e.args[0] if e.args else None
        detail = first_arg['message'] if isinstance(first_arg, dict) and 'message' in first_arg else e
        print('Error connecting to Supabase, the program can not continue. {}'.format(detail))
        sys.exit(1)

supabase_client = __check_supabase_connection()

def __connect_to_bucket() -> Any:
    """Return a storage handle for ``supabase_bucket`` after probing it.

    The bucket is probed with a ``list()`` call so that a wrong bucket name or
    key is detected immediately rather than on the first upload/download.

    Returns:
        The object returned by ``supabase_client.storage.from_(bucket)``.

    Raises:
        SystemExit: with status 1 when the probe raises.
    """
    bucket_name = supabase_bucket
    connection = supabase_client.storage.from_(bucket_name)
    try:
        connection.list()
    except Exception as e:
        # The first exception argument is only sometimes a dict with a
        # 'message' key; fall back to the exception itself otherwise so the
        # handler cannot fail while formatting.
        first_arg = e.args[0] if e.args else None
        detail = first_arg['message'] if isinstance(first_arg, dict) and 'message' in first_arg else e
        print('Error connecting to S3 bucket {}, the program can not continue. {}'.
              format(bucket_name, detail))
        sys.exit(1)
    print('Connection to S3 bucket {} successful'.format(bucket_name))
    return connection

supabase_connection = __connect_to_bucket()
supabase: Client = create_client(supabase_url, supabase_key)

In [None]:
supabase.table('results').delete().eq('interview_id', 2).execute()

In [None]:
path = '1/2/output'
# An entry whose name contains no dot is treated as a sub-folder.
subfolders = [
    entry['name']
    for entry in supabase_connection.list(path)
    if '.' not in entry['name']
]
subfolders

In [None]:
path = '1/2/output'
content = [entry['name'] for entry in supabase_connection.list(path)]

# Split the listing in one pass: names with a dot are files (queued for
# deletion with their full path), names without a dot are sub-folders.
subfolders, files_to_delete = [], []
for name in content:
    if '.' in name:
        files_to_delete.append('{}/{}'.format(path, name))
    else:
        subfolders.append(name)

In [None]:
# List files to delete in subfolders
for folder in subfolders:
    # Queue every object found directly under this sub-folder.
    folder_path = '{}/{}'.format(path, folder)
    files_to_delete.extend(
        '{}/{}/{}'.format(path, folder, entry['name'])
        for entry in supabase_connection.list(folder_path)
    )


In [None]:
subfolders, files_to_delete

In [None]:
# Delete results in DB
supabase.table('results').delete().eq('interview_id', 2).execute()

In [None]:
# The storage client's remove() takes a *list* of object paths, so the whole
# batch is deleted in a single request instead of passing one bare string per
# call. Skip the call entirely when there is nothing to delete.
if files_to_delete:
    supabase_connection.remove(files_to_delete)

In [None]:
def __open_input_file(videoname: str, env: str) -> Tuple[AudioSegment, str, str] | None:
    """Download a video from the bucket and return its audio track.

    Args:
        videoname: object name of the video under ``1/2/raw/`` in the bucket.
        env: only ``'S3'`` is handled; any other value returns ``None``.

    Returns:
        ``(audio_segment, video_tmp_path, wav_tmp_path)`` where the first item
        is the pydub segment loaded from the extracted WAV file and the two
        paths are the temporary files created on the way (they are NOT removed
        here; the caller owns them). ``None`` when ``env`` is not ``'S3'``.

    Raises:
        SystemExit: with status 1 when the download or the conversion fails.
    """
    if env != 'S3':
        return None

    s3_path = '1/2/raw/{}'.format(videoname)
    try:
        video_bytes = supabase_connection.download(s3_path)

        # Write the bytes and close the handle before another library opens
        # the file by name: closing guarantees the data is on disk.
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(video_bytes)

        # Reserve a name for the WAV output; the handle is closed right away
        # so the audio writer can create the file's content itself.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file_2:
            temp_file_path_2 = temp_file_2.name

        video = VideoFileClip(temp_file_path)
        try:
            video.audio.write_audiofile(temp_file_path_2, codec='pcm_s16le')
        finally:
            # Release the reader held by the clip.
            video.close()

        audio_segment = AudioSegment.from_file(temp_file_path_2, format='wav')
    except Exception as e:
        # The first exception argument is only sometimes a dict with a
        # 'message' key; fall back to the exception itself otherwise.
        first_arg = e.args[0] if e.args else None
        detail = first_arg['message'] if isinstance(first_arg, dict) and 'message' in first_arg else e
        print('Error downloading the file {} from the S3 bucket: {}'.
              format(videoname, detail))
        sys.exit(1)
    return audio_segment, temp_file_path, temp_file_path_2

In [None]:
audio, temp_file_path, temp_file_path_2 = __open_input_file(videoname, 'S3')
type(audio)

In [None]:
audio.frame_rate

In [None]:
def save_to_s3(filename: str, content: bytes, file_format: str, s3_subfolder: str | None = None) -> bool:
    """Upload ``content`` to the output folder of the bucket.

    Args:
        filename: object name to create.
        content: payload handed to the storage client's ``upload``.
        file_format: ``'audio'``, ``'video'`` or ``'text'``; anything else is
            sent as ``text/plain``.
        s3_subfolder: optional folder below ``output_s3_folder``.

    Returns:
        ``True`` when the upload call succeeds, ``False`` when it raises (the
        reason is printed).
    """
    content_types = {'audio': 'audio/mpeg', 'video': 'video/mp4', 'text': 'text/plain'}
    content_type = content_types.get(file_format, 'text/plain')

    if s3_subfolder:
        s3_path = '{}/{}/{}'.format(output_s3_folder, s3_subfolder, filename)
    else:
        s3_path = '{}/{}'.format(output_s3_folder, filename)

    try:
        supabase_connection.upload(file=content, path=s3_path, file_options={"content-type": content_type})
        return True
    except Exception as e:
        # The first exception argument is only sometimes a dict with a
        # 'message' key; fall back to the exception itself otherwise. The
        # message is printed so a False return is never silent.
        first_arg = e.args[0] if e.args else None
        detail = first_arg['message'] if isinstance(first_arg, dict) and 'message' in first_arg else e
        print('Error uploading the file {} to the S3 bucket: {}'.format(filename, detail))
        return False

Video to audio

In [None]:
def __video_to_audio(video_path: str, audio_path: str) -> None:
    """Extract the audio track of a video file and write it to ``audio_path``.

    Args:
        video_path: path of the video file to read.
        audio_path: path of the audio file to create.

    Raises:
        ValueError: when the clip exposes no audio track.
    """
    video = VideoFileClip(video_path)
    try:
        audio = video.audio
        if audio is None:
            raise ValueError('No audio track found in {}'.format(video_path))
        audio.write_audiofile(audio_path)
    finally:
        # Always release the clip, including when the export fails.
        video.close()

    print('Audio extraction finished. Audio file saved at {}'.format(audio_path))
    
__video_to_audio(videopath, audiopath)

Connect to Supabase

Connect to S3 bucket

In [None]:
import io
videoname = 'raw.mp4'
s3_path = '1/2/raw/{}'.format(videoname)

In [None]:
video_bytes = supabase_connection.download(s3_path)

In [None]:
print(type(video_bytes))
print(len(video_bytes))

In [None]:
with open('../files/{}'.format(videoname), 'wb+') as f:
    f.write(video_bytes)

In [None]:
import time
def __object_to_video(video_bytes):
    """Open raw video bytes as a clip through a temporary ``.mp4`` file.

    Args:
        video_bytes: the encoded video content.

    Returns:
        The ``VideoFileClip`` built from the bytes. The clip is closed and its
        backing file removed before returning, exactly as before, so only
        attributes already read at construction time should be relied on.
    """
    video = None
    temp_file_path = None
    try:
        # Write and close first so the data is on disk before the clip reader
        # opens the file by name.
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(video_bytes)

        video = VideoFileClip(temp_file_path)
    finally:
        # `video` stays None when opening failed; closing it unconditionally
        # would raise and mask the original error.
        if video is not None:
            video.close()
        if temp_file_path is not None and os.path.exists(temp_file_path):
            os.remove(temp_file_path)

    return video

In [None]:
video = __object_to_video(video_bytes)
print(video.duration)

Split audio

In [None]:
def save_to_s3(filename: str, content: bytes, file_format: str, s3_subfolder: str = None) -> None:
    """Upload ``content`` under ``output_s3_folder`` and log both steps.

    ``file_format`` selects the content type ('audio', 'video' or 'text');
    any other value is sent as plain text. Upload errors propagate.
    """
    known_types = {
        'audio': 'audio/mpeg',
        'video': 'video/mp4',
        'text': 'text/plain',
    }
    content_type = known_types.get(file_format, 'text/plain')

    path_parts = [output_s3_folder, s3_subfolder, filename] if s3_subfolder else [output_s3_folder, filename]
    s3_path = '/'.join('{}'.format(part) for part in path_parts)

    print('Upload file {} to {}'.format(filename, s3_path))
    supabase_connection.upload(file=content, path=s3_path, file_options={"content-type": content_type})
    print('File {} uploaded to S3 bucket'.format(filename))

In [None]:
def __multiple_split(start: int = 2500, end: int = 12500) -> None:
    """Cut one slice out of the local audio file and upload it as WAV.

    Args:
        start: slice start, used as the lower bound when indexing the segment.
        end: slice end, used as the upper bound when indexing the segment.

    The slice of ``audiopath`` is uploaded with ``save_to_s3`` as ``filename``
    under ``<current_speaker>/audioparts``. The defaults reproduce the bounds
    that were previously hard-coded.
    """
    audio = AudioSegment.from_wav(audiopath)
    print('Start splitting {}'.format(audiopath))
    split_audio = audio[start:end]
    # export() without a target returns a file-like object; read it to get
    # the encoded bytes expected by the upload.
    wav_bytes = split_audio.export(format='wav').read()
    save_to_s3(filename, wav_bytes, 'audio', '{}/audioparts'.format(current_speaker))
    print('End splitting {}'.format(audiopath))
    
__multiple_split()

In [None]:
video = VideoFileClip(videopath)
audio = video.audio
audio.write_audiofile(audiopath)

In [None]:
print(type(audio))

In [None]:
import tempfile
def audiofileclip_to_audiosegment(audio_clip):
    """Convert an audio clip to a pydub segment through a temporary WAV file.

    The clip is written as 16-bit PCM to a temporary ``.wav`` file, the file
    is loaded with ``AudioSegment.from_file`` and then removed.
    """
    # Only the unique name is needed: create the file and close it at once.
    handle = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    wav_path = handle.name
    handle.close()

    try:
        audio_clip.write_audiofile(wav_path, codec='pcm_s16le')
        return AudioSegment.from_file(wav_path, format="wav")
    finally:
        # Remove the temporary file whether or not the conversion succeeded.
        if os.path.exists(wav_path):
            os.remove(wav_path)

audio_segment = audiofileclip_to_audiosegment(audio)
print(type(audio_segment))

In [None]:
print(audio_tensor.shape)

In [None]:
print(type(audio))

In [None]:
stt_model_id = 'openai/whisper-large-v3'
stt_model = AutoModelForSpeechSeq2Seq.from_pretrained(stt_model_id)
stt_processor = AutoProcessor.from_pretrained(stt_model_id)

In [None]:
def audiosegment_to_tensor(audio_segment):
    """Convert a pydub-style audio segment to a float32 torch tensor.

    Args:
        audio_segment: object exposing ``get_array_of_samples()``,
            ``channels``, ``sample_width`` (bytes per sample) and
            ``frame_rate``.

    Returns:
        ``(audio_tensor, frame_rate)``. Mono audio yields a 1-D tensor of
        samples (unchanged behaviour); audio with N > 1 channels yields a
        ``(N, frames)`` tensor. Samples are scaled by the full-scale value of
        the sample width, i.e. 32768 for 16-bit and 2147483648 for 32-bit as
        before, and now also for other widths instead of leaving them raw.
    """
    samples = np.array(audio_segment.get_array_of_samples())

    channels = audio_segment.channels
    if channels > 1:
        # Samples are interleaved frame by frame: de-interleave into one row
        # per channel. This also covers more than two channels, which used to
        # be left interleaved as if the audio were mono.
        samples = samples.reshape((-1, channels)).T.astype(np.float32)
    else:
        samples = samples.astype(np.float32)

    # Full-scale magnitude of a signed integer of `sample_width` bytes.
    sample_width = audio_segment.sample_width
    if sample_width and sample_width > 0:
        samples /= float(1 << (8 * sample_width - 1))

    audio_tensor = torch.from_numpy(samples)

    return audio_tensor, audio_segment.frame_rate

TENSOR

In [None]:
# Transcription path 1: load the local audio file directly with torchaudio.
waveform, sampling_rate = torchaudio.load(audiopath)
# Resample from the file's rate to the rate reported by the processor's
# feature extractor.
model_sampling_rate = stt_processor.feature_extractor.sampling_rate
resampler = torchaudio.transforms.Resample(sampling_rate, model_sampling_rate)
# squeeze() drops size-1 dimensions before the conversion to NumPy.
# NOTE(review): presumably the input is mono; a multi-channel file would stay
# 2-D here — confirm what the processor expects.
resampled_waveform = resampler(waveform).squeeze().numpy()

# Turn the samples into model input features (PyTorch tensors).
input_features = stt_processor(resampled_waveform, sampling_rate=model_sampling_rate, return_tensors="pt").input_features

# Generate token ids for a French transcription.
predicted_ids = stt_model.generate(input_features, language='french', task="transcribe")

# Decode the ids back to text, dropping special tokens.
transcription = stt_processor.batch_decode(predicted_ids, skip_special_tokens=True)

print(transcription[0].strip())

AudioSegment

In [None]:
# Transcription path 2: same pipeline as the torchaudio cell, but the waveform
# comes from pydub via audiosegment_to_tensor, to compare both loaders.
audio_segment = AudioSegment.from_wav(audiopath)
waveform, sampling_rate = audiosegment_to_tensor(audio_segment)
# Resample from the segment's frame rate to the rate reported by the
# processor's feature extractor.
model_sampling_rate = stt_processor.feature_extractor.sampling_rate
resampler = torchaudio.transforms.Resample(sampling_rate, model_sampling_rate)
# squeeze() drops size-1 dimensions before the conversion to NumPy.
resampled_waveform = resampler(waveform).squeeze().numpy()

# Turn the samples into model input features (PyTorch tensors).
input_features = stt_processor(resampled_waveform, sampling_rate=model_sampling_rate, return_tensors="pt").input_features

# Generate token ids for a French transcription.
predicted_ids = stt_model.generate(input_features, language='french', task="transcribe")

# Decode the ids back to text, dropping special tokens.
transcription = stt_processor.batch_decode(predicted_ids, skip_special_tokens=True)

print(transcription[0].strip())

In [None]:
waveform, sampling_rate = torchaudio.load(audiopath)
audio_segment = AudioSegment.from_wav(audiopath)
waveform_2, sampling_rate_2 = audiosegment_to_tensor(audio_segment)


In [None]:
sampling_rate == sampling_rate_2

In [None]:
waveform == waveform_2

In [None]:
from pyannote.audio import Pipeline as AudioPipeline
diarization_model_id = 'pyannote/speaker-diarization-3.1'
diarization_pipeline = AudioPipeline.from_pretrained(diarization_model_id,use_auth_token='')

In [None]:
waveform, sampling_rate = torchaudio.load('../part_00000.wav')

In [None]:
diarization = diarization_pipeline({"waveform": waveform, "sample_rate": sampling_rate}, num_speakers=2)

In [None]:
text = diarization.to_rttm()

In [None]:
def save_to_s3(filename: str, content: str, file_format: str, s3_subfolder: str | None = None) -> bool:
    """Upload ``content`` to the output folder of the bucket.

    Args:
        filename: object name to create.
        content: payload handed to the storage client's ``upload``.
        file_format: ``'audio'``, ``'video'`` or ``'text'``; anything else is
            sent as ``text/plain``.
        s3_subfolder: optional folder below ``output_s3_folder``.

    Returns:
        ``True`` when the upload call succeeds, ``False`` when it raises (the
        reason is printed).
    """
    content_types = {'audio': 'audio/mpeg', 'video': 'video/mp4', 'text': 'text/plain'}
    content_type = content_types.get(file_format, 'text/plain')

    if s3_subfolder:
        s3_path = '{}/{}/{}'.format(output_s3_folder, s3_subfolder, filename)
    else:
        s3_path = '{}/{}'.format(output_s3_folder, filename)

    try:
        supabase_connection.upload(file=content, path=s3_path, file_options={"content-type": content_type})
        return True
    except Exception as e:
        # The first exception argument is only sometimes a dict with a
        # 'message' key; fall back to the exception itself otherwise. The
        # message is printed so a False return is never silent.
        first_arg = e.args[0] if e.args else None
        detail = first_arg['message'] if isinstance(first_arg, dict) and 'message' in first_arg else e
        print('Error uploading the file {} to the S3 bucket: {}'.format(filename, detail))
        return False

In [None]:
rttm_str = diarization.to_rttm().encode()

In [None]:
save_to_s3('diarization.rttm', rttm_str, 'text')

In [None]:
# supabase_url has no trailing slash, so the separator before "storage" must
# be added explicitly; rstrip keeps the result correct either way.
storage_url = f"{supabase_url.rstrip('/')}/storage/v1/object/{supabase_bucket}/1/2/output/diarization.rttm"
storage_url

In [None]:
headers = {
        "apikey": supabase_key,
        "Authorization": f"Bearer {supabase_key}",
        "Content-Type": "text/plain"
    }
headers

In [None]:
import requests
response = requests.put(storage_url, headers=headers, data=rttm_str)

In [None]:
response.text

In [None]:
# List the objects stored under speaker_000's audio-parts folder.
s3_path = '1/2/output/speaker_000/audioparts'
# NOTE(review): this rebinds the builtin name `list` for the rest of the
# kernel session, so any later `list(...)` call fails until the name is
# deleted or the kernel restarted. The following cell iterates this variable,
# so a rename has to be applied to both cells together.
list = supabase_connection.list(s3_path)

In [None]:
for file in list:
    filename = file['name']
    if filename.split('.')[-1] == 'wav' and filename == 'part_00000.wav':
        downloaded_file = supabase_connection.download('{}/{}'.format(s3_path, filename))

In [None]:
with open('./part_0000.wav', 'wb+') as f:
  f.write(downloaded_file)

In [None]:
downloaded_file

In [None]:
type(downloaded_file)

In [None]:
# Load the downloaded WAV bytes with torchaudio through a temporary file.
temp_file_path = None
try:
    # Write the bytes and leave the `with` block before reading: closing the
    # handle flushes the buffer, otherwise torchaudio may open a file that is
    # still empty or truncated.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
        temp_file_path = temp_file.name
        temp_file.write(downloaded_file)

    waveform, sample_rate = torchaudio.load(temp_file_path)
finally:
    # Clean up the temporary file whether or not loading succeeded.
    if temp_file_path is not None and os.path.exists(temp_file_path):
        os.remove(temp_file_path)

In [None]:
waveform

In [None]:
sample_rate

In [None]:
# Read the four tables stored in the local results file of speaker_000.
results_path = '../files/1/2/output/results_speaker_000.h5'
# NOTE(review): `audio` and `video` are rebound here from the clip objects
# used in earlier cells to the objects returned by read_hdf.
audio = pd.read_hdf(results_path, key='audio')
video = pd.read_hdf(results_path, key='video')
texts = pd.read_hdf(results_path, key='text')

# NOTE(review): this rebinds the builtin name `all` for the rest of the kernel
# session; the upload cell below reads this variable, so a rename has to be
# applied to both cells together.
all = pd.read_hdf(results_path, key='all')

In [None]:
# Bundle the four result tables into one HDF5 file and upload it.
# Only a unique file name is needed from tempfile; the handle is closed before
# any other library opens the path.
with tempfile.NamedTemporaryFile(suffix='.h5', delete=False) as temp_file:
    temp_file_path = temp_file.name
print(temp_file_path)

try:
    # mode='w' creates the store from scratch: the reserved file exists but is
    # empty, which is not a valid file to open in the default append mode.
    with pd.HDFStore(temp_file_path, mode='w') as store:
        store.put('text', texts)
        store.put('video', video)
        store.put('audio', audio)
        store.put('all', all)

    # No trailing slash on the folder: joining with '/' below would otherwise
    # produce '1/2/output//results.h5'.
    s3_path = '1/2/output'
    filename = 'results.h5'
    with open(temp_file_path, 'rb') as f:
        try:
            response = supabase_connection.upload(file=f, path='{}/{}'.format(s3_path, filename), file_options={'content-type': 'application/octet-stream'})
            print('File {} uploaded to S3 bucket at {}'.format(filename, s3_path))
        except Exception as e:
            # The first exception argument is only sometimes a dict with a
            # 'message' key; fall back to the exception itself otherwise.
            first_arg = e.args[0] if e.args else None
            detail = first_arg['message'] if isinstance(first_arg, dict) and 'message' in first_arg else e
            print('Error uploading the file {} to the S3 bucket: {}'.format(filename, detail))
finally:
    # Remove the temporary file whether or not the upload succeeded.
    if os.path.exists(temp_file_path):
        os.remove(temp_file_path)

In [None]:
path = '{}/texts.tmp'.format(s3_temp_path)
path

In [None]:
def read_df_from_s3(filename) -> pd.DataFrame:
    """Download ``<output_s3_folder>/temp/<filename>.tmp`` and read it as HDF5.

    Args:
        filename: base name of the temporary object (without the ``.tmp``
            suffix).

    Returns:
        The object stored under key ``'data'`` in the downloaded file.
    """
    path = '{}/temp/{}.tmp'.format(output_s3_folder, filename)
    # Download first so that a failed download leaves no temporary file.
    res = supabase_connection.download(path)

    temp_file_path = None
    try:
        # Leave the `with` block before reading: closing the handle flushes
        # the buffered bytes, otherwise read_hdf may see an incomplete file.
        with tempfile.NamedTemporaryFile(suffix='.h5', delete=False) as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(res)

        return pd.read_hdf(temp_file_path, key='data', index_col=None)
    finally:
        if temp_file_path is not None and os.path.exists(temp_file_path):
            os.remove(temp_file_path)

In [None]:
text_results = read_df_from_s3('text_emotions')
audio_results = read_df_from_s3('audio_emotions')
video_results = read_df_from_s3('video_emotions')

In [None]:
results = pd.merge(text_results, audio_results, on=['speaker', 'part'], how='inner')

In [None]:
results = pd.merge(results, video_results, on=['speaker', 'part'], how='inner')

In [None]:
results

In [None]:
df = pd.read_hdf(res, key='data', index_col=None)
print(df)

In [None]:
def save_to_s3(filename: str, content: Any, file_format: str, s3_subfolder: str | None = None) -> None:
    """Upload ``content`` to the output folder of the bucket and log the outcome.

    Args:
        filename: object name to create.
        content: payload handed to the storage client's ``upload``.
        file_format: ``'audio'``, ``'video'`` or ``'text'``; anything else is
            sent as ``text/plain``.
        s3_subfolder: optional folder below ``output_s3_folder``.

    Upload errors are caught and printed; nothing is returned either way.
    """
    content_types = {'audio': 'audio/mpeg', 'video': 'video/mp4', 'text': 'text/plain'}
    content_type = content_types.get(file_format, 'text/plain')

    if s3_subfolder:
        s3_path = '{}/{}/{}'.format(output_s3_folder, s3_subfolder, filename)
    else:
        s3_path = '{}/{}'.format(output_s3_folder, filename)

    try:
        supabase_connection.upload(file=content, path=s3_path, file_options={"content-type": content_type})
        print('File {} uploaded to S3 bucket at {}'.format(filename, s3_path))
    except Exception as e:
        # Build one string (the previous code built a 2-tuple and printed its
        # repr). Formatting with '{}' avoids calling the name `str`, which a
        # later cell of this notebook rebinds.
        message = 'Error uploading the file {} to the S3 bucket. {}'.format(filename, e)
        print(message)

In [None]:
speakers_str = '{"speaker_000": [[0.30096875, 8.907218750000002], [41.391593750000006, 41.45909375], [55.44846875, 107.05221875000001], [118.10534375, 119.69159375000001], [120.78846875, 129.19221875], [129.90096875, 146.80971875]], "speaker_001": [[9.818468750000001, 46.994093750000005], [47.837843750000005, 55.44846875], [107.45721875000001, 117.76784375000001], [119.13471875, 120.78846875], [128.11221875, 130.54221875000002]]}'
speakers_str

In [None]:
save_to_s3('speakers.json', speakers_str.encode(), 'text', 'temp')

In [None]:
speakers_path = '{}/temp/speakers.json'.format(output_s3_folder)
res = supabase_connection.download(speakers_path)

In [None]:
json.loads(res.decode())

In [None]:
speak = json.loads(res)
# Collect the speaker ids with a comprehension rather than `list(...)`: an
# earlier cell rebinds the name `list` to a storage listing, which makes
# calling it fail when the notebook is run top to bottom.
my_list = [speaker for speaker in speak.keys()]

In [None]:
my_list

In [None]:
# Same payload as `speakers_str`, parsed directly. The variable must not be
# called `str`: rebinding the builtin breaks every later `str(...)` call in
# the kernel (e.g. the `str(e)` calls in the error handlers).
speakers_json = '{"speaker_000": [[0.30096875, 8.907218750000002], [41.391593750000006, 41.45909375], [55.44846875, 107.05221875000001], [118.10534375, 119.69159375000001], [120.78846875, 129.19221875], [129.90096875, 146.80971875]], "speaker_001": [[9.818468750000001, 46.994093750000005], [47.837843750000005, 55.44846875], [107.45721875000001, 117.76784375000001], [119.13471875, 120.78846875], [128.11221875, 130.54221875000002]]}'
json.loads(speakers_json)

In [None]:
supabase_secret_key = ''

In [None]:
supabase: Client = create_client(supabase_url, supabase_key)

In [None]:
def fetch_all_records(table_name: str, columns: str = 'first_name'):
    """Fetch the selected columns of every row of a table and print them.

    Args:
        table_name: table to query.
        columns: column selection passed to ``select``; defaults to the
            previously hard-coded ``'first_name'``.

    Nothing is returned and nothing is raised: the rows (or the error) are
    printed.
    """
    try:
        response = supabase.table(table_name).select(columns).execute()

        # The client returns a response object exposing `.data` rather than a
        # dict, so `response.get(...)` / `response['data']` fail on it. Plain
        # dict responses are still accepted for compatibility.
        if isinstance(response, dict):
            error, data = response.get('error'), response.get('data')
        else:
            error, data = getattr(response, 'error', None), getattr(response, 'data', None)

        if error:
            print("Error:", error)
        else:
            print(f"Data from {table_name}:", json.dumps(data, indent=2))
    except Exception as e:
        print("An error occurred:", e)

In [None]:
fetch_all_records('sessions')

In [None]:
response = supabase.table('interviews').select("*").execute()

In [None]:
response

In [None]:
path = '1/2/output_old/temp'
# remove() takes a list of object paths: collect them and delete the batch in
# one request instead of passing one bare string per call.
old_temp_files = ['{}/{}'.format(path, file['name']) for file in supabase_connection.list(path)]
if old_temp_files:
    supabase_connection.remove(old_temp_files)
# Kept from the original cell: also request removal of the folder path itself.
supabase_connection.remove([path])

In [None]:
path = '1/2/output_old/temp/'
# remove() takes a list of object paths, not a bare string.
supabase_connection.remove([path])

In [None]:
supabase_connection.list()

In [None]:
# Reset one status flag of interview 2 and report the outcome.
champ_name = 'text_ok'
try:
    supabase.table('interviews').update({champ_name: False}).eq('id', 2).execute()
    print("Record updated successfully")
except Exception as e:
    # Build one string (the previous code built a 2-tuple and printed its
    # repr). Formatting with '{}' avoids calling the name `str`, which an
    # earlier cell of this notebook rebinds.
    message = 'Error updating {} in the database: {}'.format(champ_name, e)
    print("Failed to update the record:", message)

In [None]:
response.get('error')

In [None]:
text_results, audio_results, video_results = False, False, False

In [None]:
import concurrent.futures

def method1():
    """Placeholder task 1: announce the start and return a fixed result string."""
    print("Method 1 started")
    outcome = "Result of method 1"
    return outcome

def method2():
    """Placeholder task 2: announce the start and return a fixed result string."""
    print("Method 2 started")
    outcome = "Result of method 2"
    return outcome

def method3():
    """Placeholder task 3: announce the start and return a fixed result string."""
    print("Method 3 started")
    outcome = "Result of method 3"
    return outcome

def final_method(results):
    """Placeholder final step: print a banner followed by the collected results."""
    banner = "Final method started"
    print(banner)
    print("Results:", results)

In [None]:
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Start the three placeholder tasks on the pool.
    future1, future2, future3 = (executor.submit(task) for task in (method1, method2, method3))

    # Collect each result as soon as its task finishes (completion order).
    results = []
    for finished in concurrent.futures.as_completed([future1, future2, future3]):
        results.append(finished.result())

    # Hand everything to the final step.
    final_method(results)

In [None]:
type(results)

In [47]:
def __analyse_text():
    """Stand-in for the text analysis step; reports success under the 'text' key."""
    outcome = {'text': True}
    return outcome

def __analyse_audio():
    """Stand-in for the audio analysis step; currently always raises.

    Raises:
        Exception: always. The message now names the audio step (it was
            copy-pasted from the video stand-in and said "video").
    """
    raise Exception("Error in audio analysis")

def __analyse_video():
    """Stand-in for the video analysis step; currently always raises."""
    failure = Exception("Error in video analysis")
    raise failure

In [48]:
def merge_results(results):
    """Print the text, audio and video outcome found in ``results``.

    Args:
        results: sequence of dicts; the first dict holding the key ``'text'``
            (resp. ``'audio'``, ``'video'``) provides that modality's value.

    A modality with no entry (for instance because its analysis raised and
    produced no result) is reported as ``None`` instead of raising IndexError.
    """
    def first_value(key):
        # First value stored under `key`, or None when no dict has that key.
        return next((entry[key] for entry in results if key in entry), None)

    text = first_value('text')
    audio = first_value('audio')
    video = first_value('video')

    print("Results:")
    print("Text:", text)
    print("Audio:", audio)
    print("Video:", video)

In [51]:
all_results = []
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Start the three analyses; leaving the block waits for all of them.
    text_results, audio_results, video_results = (
        executor.submit(job) for job in (__analyse_text, __analyse_audio, __analyse_video)
    )

# Keep the results of the analyses that succeeded and report the others.
for future in concurrent.futures.as_completed([text_results, audio_results, video_results]):
    try:
        result = future.result()
    except Exception as e:
        print('An error occurred: {}'.format(e))
    else:
        all_results.append(result)

    
#results = [analysis.result() for analysis in concurrent.futures.as_completed([text_results, audio_results, video_results])]

In [50]:
results


In [34]:
merge_results(results)

In [35]:
text = [d['text'] for d in results if 'text' in d][0]
audio = [d['audio'] for d in results if 'audio' in d][0]
video = [d['video'] for d in results if 'video' in d][0]

In [36]:
text