In [None]:
import whisper, json, datetime
start = datetime.datetime.now()

from pyannote.audio import Pipeline
pipeline = Pipeline.from_pretrained('pyannote/speaker-diarization', use_auth_token=True)
# Check si CUDA ok
import torch
print(torch.cuda.is_available())
print(f'{datetime.datetime.now() - start}')

In [None]:
start = datetime.datetime.now()
pathfile="./AlainWOrk/wav/i.wav"
diarization_result = pipeline(pathfile)
print(f'diarization_result ok in {datetime.datetime.now() - start}')

In [None]:
start = datetime.datetime.now()
model = whisper.load_model("large-v2")
print(f"Model chargé en {datetime.datetime.now() - start} Fait chauffer le gpu Marcel ! ")

start = datetime.datetime.now()
whisper_res = model.transcribe(pathfile, verbose=True, language="French")
print(f"whisper_res OK in {datetime.datetime.now() - start}")

In [None]:
from pyannote.core import Segment, Annotation, Timeline


def get_text_with_timestamp(transcribe_res):
    timestamp_texts = []
    for item in transcribe_res['segments']:
        start = item['start']
        end = item['end']
        text = item['text']
        timestamp_texts.append((Segment(start, end), text))
    return timestamp_texts


def add_speaker_info_to_text(timestamp_texts, ann):
    spk_text = []
    for seg, text in timestamp_texts:
        spk = ann.crop(seg).argmax()
        spk_text.append((seg, spk, text))
    return spk_text


def merge_cache(text_cache):
    sentence = ''.join([item[-1] for item in text_cache])
    spk = text_cache[0][1]
    start = text_cache[0][0].start
    end = text_cache[-1][0].end
    return Segment(start, end), spk, sentence


# PUNC_SENT_END = []
PUNC_SENT_END = ['.', '?', '!']


def merge_sentence(spk_text):
    merged_spk_text = []
    pre_spk = None
    text_cache = []
    for seg, spk, text in spk_text:
        if spk != pre_spk and pre_spk is not None and len(text_cache) > 0:
            merged_spk_text.append(merge_cache(text_cache))
            text_cache = [(seg, spk, text)]
            pre_spk = spk

        elif text[-1] in PUNC_SENT_END:
            text_cache.append((seg, spk, text))
            merged_spk_text.append(merge_cache(text_cache))
            text_cache = []
            pre_spk = spk
        else:
            text_cache.append((seg, spk, text))
            pre_spk = spk
    if len(text_cache) > 0:
        merged_spk_text.append(merge_cache(text_cache))
    return merged_spk_text


def diarize_text(transcribe_res, diarization_result):
    timestamp_texts = get_text_with_timestamp(transcribe_res)
    spk_text = add_speaker_info_to_text(timestamp_texts, diarization_result)
    res_processed = merge_sentence(spk_text)
    return res_processed


def write_to_txt(spk_sent, file):
    with open(file, 'w') as fp:
        for seg, spk, sentence in spk_sent:
            line = f'{seg.start:.2f} {seg.end:.2f} {spk} {sentence}\n'
            fp.write(line)
        
        
def add_space_before_punctuation(text):
    text = re.sub(r'(\w)([?!])', r'\1 \2', text)
    return text

def remove_space_and_tiret(text):
    text = re.sub(r'^\ ', '',text)
    text = re.sub(r'^\— ', '',text)
    return text

def replace_words(text):

    word_dict = {
        "que l'on doit": "que nous devons",
        "on propose": "nous proposons",
        "on restait": "nous restions",
        "on restera": "nous resterons",
        "on reste": "nous restons",
        "on reprend": "nous reprenons",
        "qu'on s'était": "que nous nous étions",
        "qu'on s'est": "que nous nous sommes",
        "on se rend": "nous nous rendons",
        "on vous suit": "nous vous suivons",
        "on pourrait": "nous pourrions",
        "on peut": "nous pouvons",
        "on ne saura": "nous ne saurons",
        "on peut s'en": "nous pouvons nous en ",
        "il va falloir": "il faudra",
        "on n'avait": "nous n'avions ",
        "qu'on ferait": "que nous ferions",
        "c'est des": "ce sont des",
        "c'était pas": "ce n'était pas",
        "j'ai pas": "je n'ai pas",
        "il a pas ": "il n'a pas",
        "c'est des": "ce sont des",
        "qu'on n'est plus": "que nous ne sommes plus",
        "qu'on n'est": "que nous sommes",
        "qu'on est": "que nous sommes",
        "qu'on a": "que nous avons",
        "qu'on n'a plus": "que nous n'avons plus",
        "on en avait": "nous en avions",
        "on en a": "nous en avons",
        "on tenait": "nous tenions",
        "il faut pas": "il ne faut pas",
        "il doit pas": "il ne doit pas",
        "il travaille pas": "il ne travaille pas",
        "on n'est pas": "nous ne sommes pas",
        "qu'on puisse": "que nous puissions",
        "est-ce que vous avez": "avez-vous",
        "est-ce que vous êtes ": "etes-vous",
        "est-ce qu'il serait": "serait-il",
        "on ne s'est pas": "nous ne nous sommes pas",
        "qu'on n'était pas": "que nous n'étions pas",
        "qu'on était pas": "que nous n'étions pas",
        "qu'on ne mettait pas": "que nous ne mettions pas",
        "qu'on mettait pas": "que nous ne mettions pas",
        "on peut se": "nous pouvons nous",
        "qu'on en a": "que nous en avons",
        "est-ce qu'on peut": "pouvons-nous",
        "on a": "nous avons",
        "on est": "nous sommes",
        "on vient": "nous venons",
        "on va": "nous allons",
        "on obtient": "nous obtenons",
        "on fait": "nous faisons",
        "on coordonne": "nous coordonnons",
        "on se frotte": "nous nous frottons",
        "on se donne": "nous nous donnons",
        "on mange": "nous mangeons",
        "on donne": "nous donnons",
        "on apprend": "nous apprenons",
        "on s'est dit": "nous nous sommes dit",
        "on sait": "nous savons",
        "on prend": "nous prenons",
        "ça": "cela",
        "est-ce que vous avez": "avez-vous",
        "est-ce qu'il y a": "y a-t-il ",
        "puisqu'on voit": "puisque nous voyons",
        "qu'on voit": "que nous voyons",
        "comment on peut": "comment pouvons-nous",
        "on participe": "nous participons",
        "-là": "",
        "qu'nous": "que nous",
    }
    

    for old_word, new_word in word_dict.items():
        pattern = r'\b(' + old_word + r')\b'                   
        pattern_case_insensitive = re.compile(pattern, re.IGNORECASE)
        text = remove_space_and_tiret(text)
        text = add_space_before_punctuation(text)
        text = re.sub(pattern_case_insensitive, lambda m: new_word.capitalize() if m.group(1)[0].isupper() else new_word.lower(), text)
    return text                                                             
                        

In [None]:
# Enregistrement des resultats pour plus tard !
import pathlib, os, json
path = pathlib.Path(pathfile)
dir_result = f"{path.parent.absolute()}/{path.name}_result"

pathlib.Path(f"{dir_result}").mkdir(parents=True, exist_ok=True)

os.listdir(path.parent.absolute())

with open(f"{dir_result}/{path.name}.whisper.json", 'w') as f:
    json.dump(whisper_res, f, ensure_ascii=False, indent=4)
f.close()
    
with open(f"{dir_result}/{path.name}.diarization.json", 'w') as d:
    json.dump(diarization_result.for_json(), d, ensure_ascii=False, indent=4)
d.close()

with open(f"{dir_result}/{path.name}.diarization.txt", 'w') as t:
    t.write(f"{diarization_result}")
t.close()


In [None]:
import re

final_result = diarize_text(whisper_res, diarization_result)
with open(f"{dir_result}/{path.name}.final.txt", 'w') as r:
    ex_speaker = None
    for segment, speaker, text in final_result:
        if speaker != ex_speaker:
            r.write('\n\n')
            r.write(f'{speaker}')
            r.write('\n\n')
            ex_speaker = speaker
        r.write(replace_words(text))

r.close()

In [None]:
from transformers import GPT2TokenizerFast
tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
# tokenizer(final_result)['input_ids']

bloc = ""
list_bloc = []

ex_speaker = None
for segment, speaker, text in final_result:
    if speaker != ex_speaker:
        len_token = len(tokenizer(bloc)['input_ids'])
        if len_token > 3000 :
            list_bloc.append(bloc)
            bloc = ""
        bloc += '\n\n'
        bloc += f'{speaker} : '
        ex_speaker = speaker
    bloc += replace_words(text)

print(len(list_bloc))

In [None]:
import os
import openai
openai.api_key = ""

response = openai.Completion.create(
  model="text-davinci-003",
  prompt=f"Résume moi chaque intervention : \n{list_bloc[0]}",
  temperature=0,
  max_tokens=1000,
  top_p=1.0,
  frequency_penalty=0.0,
  presence_penalty=0.0
)

text = response.get('choices')[0].get('text')
text
