In [1]:
from flair.models import SequenceTagger
from flair.data import Sentence
from speechbrain.pretrained import EncoderASR, EncoderDecoderASR, EncoderClassifier
from text_to_num import alpha2digit
import base64
import subprocess
import re

In [2]:
# Quick check of the number-word conversion on an English phrase.
alpha2digit(
    "two hours",
    "en",
    relaxed=False,
    signed=True,
    ordinal_threshold=0,
)

'2 hours'

In [3]:
def predict_flare(model, TEXT_test):
    """
    Tag a piece of text with `model` and return the result as a dict.

    Args:
        model: object exposing ``predict(sentence)``; it is called once on the
            ``Sentence`` built from the text.
        TEXT_test: the text passed to the ``Sentence`` constructor.

    Returns:
        Whatever ``sentence.to_dict("ner")`` returns after prediction.
    """
    tagged = Sentence(TEXT_test)
    # predict() is called for its effect on the sentence object.
    model.predict(tagged)
    return tagged.to_dict("ner")


def doc_to_spans_flare(doc):
    """
    Flatten a NER result dict into parallel lists of labels, scores and texts.

    Args:
        doc: dict with an "entities" list. Falsy items are skipped. Every other
            item must provide "text" and a non-empty "labels" list whose first
            element renders via ``str()`` as ``"<label> (<score>)"``.

    Returns:
        dict with the keys:
            "entities": label names, one per kept prediction,
            "scores":   the matching scores as floats,
            "result":   the matching entity texts,
            "zipped":   ``[text, label, score]`` triples in the same order.

    Raises:
        KeyError / IndexError / ValueError: if a prediction lacks the fields
            above or its label string does not have the expected shape.
    """
    entities = []
    scores = []
    results = []
    for prediction in doc["entities"]:
        if not prediction:
            continue
        # Parse the "<label> (<score>)" rendering once per prediction.
        label_parts = str(prediction["labels"][0]).split()
        entities.append(label_parts[0])
        scores.append(float(label_parts[1].strip("()")))
        results.append(prediction["text"])
    return {
        "entities": entities,
        "scores": scores,
        "result": results,
        "zipped": [list(row) for row in zip(results, entities, scores)],
    }


def speech_ner(wav_file_path="output.wav"):
    """
    Transcribe a French recording and extract timer entities from it.

    Kept for backward compatibility. It runs the French pipeline using the
    module-level ``asr_model_fr`` and ``tagger_fr`` objects (the previously
    referenced ``asr_model`` / ``tagger`` globals are not defined anywhere in
    this notebook).

    Args:
        wav_file_path: path of the audio file handed to
            ``asr_model_fr.transcribe_file``.

    Returns:
        The dict produced by ``doc_to_spans_flare`` for the tagged transcript.
    """
    raw_text = asr_model_fr.transcribe_file(wav_file_path)
    print(raw_text)
    digit_text = alpha2digit(raw_text,
                "fr", relaxed=True, signed=False, ordinal_threshold=10).lower()
    print(digit_text)
    # Workaround (flagged in the original as an alpha2digit bug): rewrite
    # "une <unit>" explicitly.
    digit_text = digit_text.replace("une heure", "1 heure").replace("une minute", "1 minute")
    # Accept both "une second" and "une seconde" without producing "1 secondee".
    digit_text = re.sub(r"\bune seconde?\b", "1 seconde", digit_text)
    return doc_to_spans_flare(predict_flare(tagger_fr, digit_text))

def speech_ner_fr(wav_file_path="output.wav"): #NEW
    """
    Transcribe a French recording and extract timer entities from it.

    Uses the module-level ``asr_model_fr`` for transcription and ``tagger_fr``
    for tagging. Both the raw and the digit-converted transcript are printed.

    Args:
        wav_file_path: path of the audio file handed to
            ``asr_model_fr.transcribe_file``.

    Returns:
        The dict produced by ``doc_to_spans_flare`` for the tagged transcript.
    """
    raw_text = asr_model_fr.transcribe_file(wav_file_path)
    print(raw_text)
    digit_text = alpha2digit(raw_text,
                "fr", relaxed=True, signed=False, ordinal_threshold=10).lower()
    print(digit_text)
    # Workaround (flagged in the original as an alpha2digit bug): rewrite
    # "une <unit>" explicitly.
    digit_text = digit_text.replace("une heure", "1 heure").replace("une minute", "1 minute")
    # A plain replace of "une second" turned "une seconde" into "1 secondee";
    # match the word with an optional final "e" instead.
    digit_text = re.sub(r"\bune seconde?\b", "1 seconde", digit_text)
    return doc_to_spans_flare(predict_flare(tagger_fr, digit_text))

def speech_ner_en(wav_file_path="output.wav"): #NEW
    """
    Transcribe an English recording and extract timer entities from it.

    Uses the module-level ``asr_model_en`` for transcription and ``tagger_en``
    for tagging. Both the raw and the digit-converted transcript are printed.

    Args:
        wav_file_path: path of the audio file handed to
            ``asr_model_en.transcribe_file``.

    Returns:
        The dict produced by ``doc_to_spans_flare`` for the tagged transcript.
    """
    raw_text = asr_model_en.transcribe_file(wav_file_path)
    print(raw_text)
    digit_text = alpha2digit(raw_text,
                "en", relaxed=True, signed=False, ordinal_threshold=10).lower()
    print(digit_text)
    # Workaround (flagged in the original as an alpha2digit bug): rewrite
    # "one <unit>" explicitly. The seconds case used to emit the French
    # spelling "1 seconde"; it now stays English.
    digit_text = (digit_text
                  .replace("one hour", "1 hour")
                  .replace("one minute", "1 minute")
                  .replace("one second", "1 second"))
    return doc_to_spans_flare(predict_flare(tagger_en, digit_text))

In [4]:
# Load models

# NER taggers, one per supported language.
tagger_fr = SequenceTagger.load("amtam0/speech-timer")
tagger_en = SequenceTagger.load("amtam0/timer-ner-en")

# Speech-to-text models, one per supported language.
asr_model_fr = EncoderASR.from_hparams(
    source="speechbrain/asr-wav2vec2-commonvoice-fr",
    savedir="./pretrained_models/asr-wav2vec2-commonvoice-fr",
    run_opts={"device":"cuda"},
)
asr_model_en = EncoderDecoderASR.from_hparams(
    source="speechbrain/asr-wav2vec2-commonvoice-en",
    savedir="./pretrained_models/asr-wav2vec2-commonvoice-en",
    run_opts={"device":"cuda"},
)

# Spoken-language identifier, used to choose between the two pipelines.
lang_model = EncoderClassifier.from_hparams(
    source="speechbrain/lang-id-commonlanguage_ecapa",
    savedir="./pretrained_models/lang-id-commonlanguage_ecapa",
    run_opts={"device":"cuda"},
)

2021-11-17 19:37:19,611 loading file /root/.flair/models/speech-timer/19e799904eb07a52c4f62608b81fba30e2aa1c943f4f42d513783fa9380b2839.464f8a698a1df420b87ab0f4ed5b89bf7e2c40ca7c80b15fb570a7d4b2280b03
2021-11-17 19:37:30,919 loading file /root/.flair/models/timer-ner-en/55f2e96ed914757a19908cab758790ffdf14750200fe8eeed171da38603006f4.de59a20e65f2d1772de6aa71a900f29502c0c5a1637703041764f4ac2cefc9c5


In [5]:
# Base64-encode the sample recording. The context manager closes the file
# handle instead of leaving it to the garbage collector.
with open("output1.wav", "rb") as wav_file:
    enc = base64.b64encode(wav_file.read())
body_image64 = enc.decode("utf-8")

In [8]:
%%time
# Request-style payload carrying the base64-encoded recording.
DATA = {"body64":body_image64,
"bmodel_name": ""}
# Decode the payload to its own file, then convert *that* file to "out.wav".
# Previously the decoded bytes were written to "out.wav" and immediately
# overwritten by ffmpeg, which read "output1.wav" instead, so the payload was
# never actually used.
decoded_path = "decoded_input.wav"
img_path = "out.wav"
with open(decoded_path, "wb") as f:
    f.write(base64.b64decode(DATA["body64"]))
# Argument list instead of a shell string: no shell parsing or quoting issues.
# The cell output is ffmpeg's return code.
subprocess.call(["ffmpeg", "-y", "-i", decoded_path, "-c:a", "pcm_f32le", img_path])

CPU times: user 3.85 ms, sys: 40.1 ms, total: 44 ms
Wall time: 151 ms


0

In [12]:
%%time
# Identify the spoken language, then run the matching ASR + NER pipeline.
out_prob, score, index, text_lab = lang_model.classify_file('out.wav')
if text_lab==["French"]:
    print("French")
    res = speech_ner_fr(wav_file_path="out.wav")
elif text_lab==["English"]:
    print("English")
    res = speech_ner_en(wav_file_path="out.wav")
else:
    # Without this branch `res` would be undefined on a fresh kernel, or would
    # silently keep the value from an earlier run of this cell.
    raise ValueError("Unsupported language detected: {}".format(text_lab))
res

English
START THREE SETS OF THIRTY MINUTES FORTY FIVE MINUTES BETWEEN EACH SET
start 3 sets of 30 minutes 45 minutes between each set
CPU times: user 754 ms, sys: 524 µs, total: 754 ms
Wall time: 228 ms


{'entities': ['nb_rounds', 'duration_wt_min', 'duration_br_min'],
 'scores': [1.0, 1.0, 1.0],
 'result': ['3', '30 minutes', '45 minutes between each set'],
 'zipped': [['3', 'nb_rounds', 1.0],
  ['30 minutes', 'duration_wt_min', 1.0],
  ['45 minutes between each set', 'duration_br_min', 1.0]]}

In [None]:
%%time
# French example: run language identification on a French sample clip.
out_prob, score, index, text_lab = lang_model.classify_file(
    'speechbrain/lang-id-commonlanguage_ecapa/example-fr.wav'
)
print(text_lab)

In [None]:
def timer_format(_dict):
    """
    Render an hours/minutes/seconds mapping as a zero-padded "MM:SS" string.

    Args:
        _dict: mapping with "hours", "minutes" and "seconds" entries; each
            value is passed through ``int()``.

    Returns:
        str: total minutes (hours folded in at 60 per hour, plus whole minutes
        carried over from the seconds) and the remaining seconds, each padded
        to at least two digits.
    """
    hrs, mins, secs = (int(_dict[key]) for key in ("hours", "minutes", "seconds"))
    # Split the seconds into whole minutes and the remainder.
    carried_minutes, leftover_seconds = divmod(secs, 60)
    total_minutes = mins + carried_minutes + 60 * hrs
    return '{:02}:{:02}'.format(total_minutes, leftover_seconds)

def format_result(zipped, entities, Thresh = 0.6):
    """
    Turn tagged entities into timer settings.

    Args:
        zipped: list of ``[text, label, score]`` rows. The list is only read;
            it is no longer modified in place.
        entities: list of label names, used for the duplicate check.
        Thresh: rows whose score is below this value are ignored.

    Returns:
        dict with "nb_rounds" (string of digits), "wt" and "br" ("MM:SS"
        strings from ``timer_format``). If `entities` contains duplicates, the
        defaults ``{"nb_rounds": "2", "wt": "01:00", "br": "01:00"}`` are
        returned unchanged.
    """
    result_dict = {"nb_rounds":"2",
                   "wt":"01:00",
                   "br":"01:00"}

    #control duplicates
    # NOTE(review): this looks at the unfiltered `entities`, so a duplicate
    # that falls below Thresh still triggers the default result. Confirm
    # whether that is intended.
    if len(entities)!=len(set(entities)):
        return result_dict

    wt_dict = {"seconds":0,
               "minutes":0,
               "hours":0}

    br_dict = {"seconds":0,
               "minutes":0,
               "hours":0}

    # label -> (dict to update, key inside that dict)
    slots = {
        "nb_rounds": (result_dict, "nb_rounds"),
        "duration_wt_sd": (wt_dict, "seconds"),
        "duration_wt_min": (wt_dict, "minutes"),
        "duration_wt_hr": (wt_dict, "hours"),
        "duration_br_sd": (br_dict, "seconds"),
        "duration_br_min": (br_dict, "minutes"),
        "duration_br_hr": (br_dict, "hours"),
    }

    #control Thresh: filter into a new list rather than deleting from the
    # caller's list.
    confident = [zip_line for zip_line in zipped if not zip_line[2] < Thresh]

    #format
    for zip_line in confident:
        slot = slots.get(zip_line[1])
        if slot is None:
            continue
        numbers = re.findall(r'\d+', zip_line[0])
        if not numbers:
            # Entity text without digits: keep the current value instead of
            # failing with IndexError.
            continue
        target, key = slot
        target[key] = numbers[0]

    result_dict["br"] = timer_format(br_dict)
    result_dict["wt"] = timer_format(wt_dict)
    return result_dict

In [None]:
# Convert the tagged entities into timer settings.
format_result(
    res["zipped"],
    res["entities"],
    Thresh=0.6,
)

In [None]:
# Attach the NER output to the payload dict under the "result" key.
DATA["result"] = res