<a href="https://colab.research.google.com/github/1ucky40nc3/TREX/blob/main/TREX.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!nvidia-smi

Thu Aug 26 16:58:07 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.57.02    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   45C    P0    27W / 250W |      0MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [2]:
# @title Utils for the entire Notebook
# @markdown ✋ Rerun Cell if Runtime was restarted 🔄

from IPython.utils.io import capture_output


def execute(func, *args, verbose: bool = False, **kwargs):
    """Call ``func`` and hide its console output unless ``verbose`` is set.

    ``verbose`` is always forwarded to ``func`` as a keyword argument, so
    ``func`` has to accept it (directly or through ``**kwargs``).

    Returns:
        Whatever ``func`` returns.
    """
    call_kwargs = {"verbose": verbose, **kwargs}

    if not verbose:
        # Everything printed while the call runs is captured and dropped.
        with capture_output():
            return func(*args, **call_kwargs)

    return func(*args, **call_kwargs)

---


# ***Natural Language Processing (NLP)*** 📰🤯


---

In [3]:
# @title | NLP | Install Dependencies ⇩
VERBOSE = False # @param {type:"boolean"}
    

def install_nlp_dependencies(**kwargs):
    # Installs the packages needed by the NLP cells with pip (shell magics).
    # `**kwargs` is unused; it only absorbs the `verbose` keyword that
    # `execute` always forwards.
    # torch / torchvision / torchaudio are pinned to the 1.9.0 (cu102) builds
    # and torch-scatter is fetched from the matching torch-1.9.0+cu102 index.
    !pip install sentencepiece
    !pip install transformers
    !pip install torch==1.9.0+cu102 torchvision==0.10.0+cu102 torchaudio===0.9.0 -f https://download.pytorch.org/whl/torch_stable.html
    !pip install torch-geometric
    !pip install torch-scatter==2.0.8 -f https://pytorch-geometric.com/whl/torch-1.9.0+cu102.html


# Run the installation; the pip output is hidden unless VERBOSE is enabled.
execute(install_nlp_dependencies, verbose=VERBOSE)

In [None]:
# @title | NLP | Initialize the NLP Pipelines
# @markdown ✋ Rerun Cell if Runtime was restarted 🔄

import torch
import transformers


def device(boolean: bool) -> int:
    """Translate a "use the GPU" flag into the ``device=`` value of a pipeline.

    Returns:
        ``0`` when ``boolean`` is truthy, ``-1`` otherwise.
    """
    if boolean:
        return 0
    return -1

#@markdown ---
#@markdown ### Model selection for the NLP toolkit 🤖📰
# Model identifiers handed to `transformers.pipeline(...)` in
# `initialize_nlp_pipelines`; the `#@param` lists are the Colab form choices.
ZERO_SHOT_MODEL = "facebook/bart-large-mnli" #@param ["facebook/bart-large-mnli", "typeform/distilbert-base-uncased-mnli", "joeddav/xlm-roberta-large-xnli", "Narsil/deberta-large-mnli-zero-cls"]
TABLE_QA_MODEL = "google/tapas-large-finetuned-wikisql-supervised" #@param ["lysandre/tiny-tapas-random-wtq", "lysandre/tiny-tapas-random-sqa", "google/tapas-base-finetuned-wtq", "google/tapas-base-finetuned-sqa", "google/tapas-base-finetuned-wikisql-supervised", "google/tapas-large-finetuned-wtq", "google/tapas-large-finetuned-sqa", "google/tapas-large-finetuned-wikisql-supervised"]
SMALL_TALK_MODEL = "facebook/blenderbot-90M" #@param ["facebook/blenderbot-90M", "facebook/blenderbot-400M-distill", "facebook/blenderbot-1B-distill", "facebook/blenderbot-3B"]
FEW_SHOT_MODEL = "EleutherAI/gpt-neo-125M" #@param ["gpt2", "gpt2-medium", "gpt2-large", "gpt2-xl", "EleutherAI/gpt-neo-125M", "EleutherAI/gpt-neo-1.3B", "EleutherAI/gpt-neo-2.7B"]

#@markdown ---
#@markdown ### Language selection during operation 🏳️‍🌈/🏴‍☠️
# "de" makes `initialize_nlp_pipelines` additionally load the two translation
# pipelines; the NLP class also picks its pre-/post-processor by this value.
LANGUAGE = "de" #@param ["en", "de"]

#@markdown ---
#@markdown ### Model selection for translation between English and German
GERMAN_TO_ENGLISH_MODEL = "facebook/wmt19-de-en" #@param ["Helsinki-NLP/opus-mt-de-en", "facebook/wmt19-de-en"]
ENGLISH_TO_GERMAN_MODEL = "facebook/wmt19-en-de" #@param ["Helsinki-NLP/opus-mt-en-de", "facebook/wmt19-en-de"]

#@markdown ---
#@markdown ### Select if the individual model shall be on GPU 💻🔥
# Boolean switches; `device(...)` converts such a flag to a pipeline device id.
USE_GPU_FOR_ZERO_SHOT = False # @param {type:"boolean"}
USE_GPU_FOR_SMALL_TALK = True # @param {type:"boolean"}
USE_GPU_FOR_FEW_SHOT = False # @param {type:"boolean"}

USE_GPU_FOR_GERMAN_TO_ENGLISH = False # @param {type:"boolean"}
USE_GPU_FOR_ENGLISH_TO_GERMAN = False # @param {type:"boolean"}

#@markdown ---
# Passed to `execute`: when False the output of the pipeline setup is hidden.
VERBOSE = True # @param {type:"boolean"}

def initialize_nlp_pipelines(**kwargs):
    """Download and instantiate every Hugging Face pipeline of the NLP stack.

    Models come from the module-level ``*_MODEL`` constants. Each pipeline
    that has a ``USE_GPU_FOR_*`` switch is placed according to that switch;
    this now includes the two translation pipelines, whose switches were
    previously defined but never applied. The translation pipelines are only
    created when ``LANGUAGE == "de"``.

    Args:
        **kwargs: Unused; absorbs the ``verbose`` keyword that ``execute``
            always forwards.

    Returns:
        dict: pipeline objects keyed by ``"ZERO_SHOT"``, ``"TABLE_QA"``,
        ``"SMALL_TALK"``, ``"FEW_SHOT"`` and, for German only,
        ``"GERMAN_TO_ENGLISH_TRANSLATOR"`` / ``"ENGLISH_TO_GERMAN_TRANSLATOR"``.
    """
    # Explicit dict instead of `locals()`, so that helper variables (such as
    # `kwargs`) do not end up in the result.
    pipelines = {}

    print("[DEBUG] Downloading Zero-Shot-Classification Components")
    pipelines["ZERO_SHOT"] = transformers.pipeline(
        "zero-shot-classification",
        model=ZERO_SHOT_MODEL,
        device=device(USE_GPU_FOR_ZERO_SHOT))

    # No GPU switch exists for table QA, so the pipeline default is used.
    print("[DEBUG] Downloading Table QA Components")
    pipelines["TABLE_QA"] = transformers.pipeline(
        "table-question-answering",
        model=TABLE_QA_MODEL)

    print("[DEBUG] Downloading Small-Talk Components")
    pipelines["SMALL_TALK"] = transformers.pipeline(
        "conversational",
        model=SMALL_TALK_MODEL,
        device=device(USE_GPU_FOR_SMALL_TALK))

    print("[DEBUG] Downloading Text-To-Text Components")
    pipelines["FEW_SHOT"] = transformers.pipeline(
        "text-generation",
        model=FEW_SHOT_MODEL,
        device=device(USE_GPU_FOR_FEW_SHOT))

    if LANGUAGE == "de":
        print("[DEBUG] Downloading German-To-English Translation Components")
        pipelines["GERMAN_TO_ENGLISH_TRANSLATOR"] = transformers.pipeline(
            "translation_de_to_en",
            model=GERMAN_TO_ENGLISH_MODEL,
            device=device(USE_GPU_FOR_GERMAN_TO_ENGLISH))
        print("[DEBUG] Downloading English-To-German Translation Components")
        pipelines["ENGLISH_TO_GERMAN_TRANSLATOR"] = transformers.pipeline(
            "translation_en_to_de",
            model=ENGLISH_TO_GERMAN_MODEL,
            device=device(USE_GPU_FOR_ENGLISH_TO_GERMAN))

    return pipelines

PIPELINES = execute(initialize_nlp_pipelines, verbose=VERBOSE)

In [34]:
# @title | NLP | Set up Services for Data
# @markdown ✋ Rerun Cell if Runtime was restarted 🔄

import io
import pandas as pd
import transformers


# Demo timetable data as CSV text. Both tables share every column except the
# vehicle column ("Train" vs. "Plane"); `train_table` / `plane_table` parse
# them into DataFrames.
TRAIN_TABLE = """Location,Train,Start,Destination,Departure,Arrival,Delay
Munich,ICE 77,Munich,Berlin,12:30,13:33,2
Munich,ICE 56,Munich,Leipzig,12:30,13:30,4
Munich,ICE 33,Leipzig,Berlin,13:30,14:00,45"""

PLANE_TABLE = """Location,Plane,Start,Destination,Departure,Arrival,Delay
Munich,Eurowings (EW8003),Munich,Berlin,12:30,13:33,0
Munich,Lufthansa (LH1940),Munich,Leipzig,12:30,13:30,10
Munich,Lufthansa (LH2040),Leipzig,Berlin,13:30,14:00,7"""


def train_table() -> pd.DataFrame:
    """Parse the ``TRAIN_TABLE`` CSV text into a new DataFrame."""
    buffer = io.StringIO(TRAIN_TABLE)
    return pd.read_csv(buffer)

def plane_table() -> pd.DataFrame:
    """Parse the ``PLANE_TABLE`` CSV text into a new DataFrame."""
    buffer = io.StringIO(PLANE_TABLE)
    return pd.read_csv(buffer)

def df_to_csv(df) -> str:
    """Serialise ``df`` to CSV text without writing the index column."""
    with io.StringIO() as buffer:
        df.to_csv(buffer, index=False)
        return buffer.getvalue()

def csv_to_df(csv) -> pd.DataFrame:
    """Parse CSV text (first line is the header) into a DataFrame."""
    stream = io.StringIO(csv)
    frame = pd.read_csv(stream)
    return frame

def travel_tables(tables=None) -> pd.DataFrame:
    """Merge several timetable DataFrames into one all-string table.

    Args:
        tables: DataFrames to stack. When omitted, fresh copies of the train
            and the plane table are built at call time. The previous default
            was a list created once at definition time and shared by all
            calls.

    Returns:
        A DataFrame with the rows of all tables. Cells missing because a
        column exists in only some of the tables hold the string ``"NONE"``,
        and every column is converted to ``str``.
    """
    if tables is None:
        tables = [train_table(), plane_table()]

    travel = pd.concat(tables).fillna("NONE")

    # Round trip through CSV text: the index is dropped on export and a fresh
    # default index is created on import.
    travel = csv_to_df(df_to_csv(travel))

    return travel.astype(
        {column: str for column in travel.columns.values})

# Few-shot prompt for the travel skill: one `Question / Context / Answer`
# example per line. Each answer has to restate its own context value, because
# the generator is expected to copy that pattern. The London/Frankfurt example
# previously answered with "ICE 77" although its context was "ICE 923", and
# three answers were ungrammatical.
travel_samples = """Question: "I am in Hannover. It is 17:48. Which train can I take from Munich to Berlin?" Context: "ICE 33" Answer: "The train ICE 33 will travel from Munich to Berlin."
Question: "I am in Frankfurt. It is 09:32. When will the next flight to Madrid leave?" Context: "07:59" Answer: "The next flight to Madrid will leave 07:59."
Question: "I am in Hannover. It is 17:48. What is the best train to get to Berlin from London?" Context: "ICE 33" Answer: "The train ICE 33 is the fastest."
Question: "I am in Munich. It is 05:51. From which platform does the RB 61 exit to Zurich?" Context: "8" Answer: "The RB 61 will depart from the platform 8."
Question: "I am in Berlin. It is 12:20. How can I get to London by aircraft?" Context: "Eurowings (EW8003)" Answer: "The flight Eurowings (EW8003) will leave for London."
Question: "I am in London. It is 12:20. I am at a train station. How can I get to Frankfurt?" Context: "ICE 923" Answer: "The train ICE 923 is available for your trip."
Question: "I am in Hamburg. It is 23:43. From which gate does the Lufthansa aircraft (LH1940) take off?" Context: "23" Answer: "The Lufthansa airliner (LH1940) will take off from gate 23."
Question: "I am in Stuttgart. It is 08:24. When will the next ICE to Bremen leave?" Context: "07:59" Answer: "The next ICE to Bremen will leave 07:59."
Question: "I am in Bremen. It is 20:15. Is the ICE 1556 delayed?" Context: "9" Answer: "The ICE 1556 will be delayed by 9 minutes."
Question: "I am in Dresden. It is 10:17. How much is the flight LH1239 delayed?" Context: "59" Answer: "The flight LH1239 will be delayed by 59 minutes."
"""

def samples_length(samples: str, model: str) -> int:
    """Count the tokens that ``samples`` occupies for the given model.

    The tokenizer of ``model`` is loaded and ``samples`` is tokenized with
    it. Earlier the function ignored both arguments and always measured the
    global ``travel_samples`` with the tokenizer of ``FEW_SHOT_MODEL``.

    Args:
        samples: Prompt text to measure.
        model: Model identifier; must contain ``"EleutherAI/gpt-neo"``.

    Returns:
        Number of token ids produced for ``samples``.

    Raises:
        Exception: If ``model`` is not an ``EleutherAI/gpt-neo`` checkpoint.
    """
    if "EleutherAI/gpt-neo" not in model:
        raise Exception(
            """The tokenizer of the FEW-SHOT model is not accessible!
            Therefore the sample length can not be computed.""")

    tokenizer = transformers.GPT2Tokenizer.from_pretrained(model)
    input_ids = tokenizer(samples, return_tensors="pt").input_ids
    # Last dimension of the id tensor is the sequence length.
    return input_ids.shape[-1]


# Fixed "current" time and location of the user, set through the Colab form.
# `travel_skill` obtains them through the accessor functions below and puts
# them into the question sent to the few-shot generator.
TRAVEL_TIME = "7:00" #@param {type: "string"}
TRAVEL_LOCATION = "Munich" #@param {type: "string"}

def travel_time() -> str:
    """Return the configured ``TRAVEL_TIME`` string."""
    return TRAVEL_TIME

def travel_location() -> str:
    """Return the configured ``TRAVEL_LOCATION`` string."""
    return TRAVEL_LOCATION

travel_tables()

Unnamed: 0,Location,Train,Start,Destination,Departure,Arrival,Delay,Plane
0,Munich,ICE 77,Munich,Berlin,12:30,13:33,2,NONE
1,Munich,ICE 56,Munich,Leipzig,12:30,13:30,4,NONE
2,Munich,ICE 33,Leipzig,Berlin,13:30,14:00,45,NONE
3,Munich,NONE,Munich,Berlin,12:30,13:33,0,Eurowings (EW8003)
4,Munich,NONE,Munich,Leipzig,12:30,13:30,10,Lufthansa (LH1940)
5,Munich,NONE,Leipzig,Berlin,13:30,14:00,7,Lufthansa (LH2040)


In [35]:
# @title | NLP | NLP Implementation
# @markdown ✋ Rerun Cell if Runtime was restarted 🔄

#@markdown ---
VERBOSE = False # @param {type:"boolean"}

from typing import Dict
from typing import List
from typing import Tuple
from typing import Callable
from typing import Optional


# Dict subclasses / aliases used only as annotations in this cell.

# Resources attached to one variant of a skill.
# NOTE(review): `travel_skill` also reads the keys "time", "location" and
# "config", which are not declared here.
class Associations(Dict):
    data: Callable[[], pd.DataFrame]
    samples: str

# What a skill function returns: the answer text plus an optional updated
# conversation (`travel_skill` returns None in the second slot).
Response = Tuple[str, Optional[transformers.Conversation]]

# Signature of a skill function.
# NOTE(review): the skill functions in this file additionally take `verbose`
# and keyword arguments; the alias lists only the conversation.
Function = Callable[
    [transformers.Conversation], 
    Response
]

# One entry of the skill registry: its associations and the function to call.
class Skill(Dict):
    associations: Associations
    function: Function

# NOTE(review): the SKILLS registry is a plain mapping of label -> skill dict;
# the two fields below do not appear as keys in it.
class Skills(Dict):
    name: str
    skill: Skill

# Signature of a warm-up step: conversation in, conversation out.
WarmUp = Callable[
    [transformers.Conversation], 
    transformers.Conversation
]

"""~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Section for Classification on a Zero-Shot basis.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"""

def zero_shot_classification(input: str, 
                             labels: List[str], 
                             top_k: Optional[int] = 1,
                             **kwargs) -> List[str]:
    """Classify ``input`` against ``labels`` with the zero-shot pipeline.

    Args:
        input: Text to classify.
        labels: Candidate labels.
        top_k: How many labels to keep from the front of the pipeline's
            ``"labels"`` list.
        **kwargs: Accepted but not used.

    Returns:
        The first ``top_k`` entries of the pipeline's ``"labels"`` result.
    """
    classifier = PIPELINES["ZERO_SHOT"]
    result = classifier(input, labels)
    ranked_labels = result["labels"]
    return ranked_labels[:top_k]

def skill_classification(input: str, 
                         skills: List[str], 
                         verbose: Optional[bool] = False,
                         **kwargs) -> str:
    """Pick the skill label that fits ``input`` best.

    Args:
        input: User utterance.
        skills: Candidate skill labels.
        verbose: Print debug information. Now defaults to ``False`` like the
            other helpers in this cell.
        **kwargs: Forwarded to ``zero_shot_classification``.

    Returns:
        The first label returned by ``zero_shot_classification``, as a single
        string. The annotation used to claim ``List[str]``.
    """
    if verbose:
        print(f"[DEBUG] |Skill Classification| input: {input}")
        print(f"[DEBUG] |Skill Classification| skills: {skills}")

    skill = zero_shot_classification(input, skills, **kwargs)[0]

    if verbose:
        print(f"[DEBUG] |Skill Classification| skill: {skill}")
    return skill

def toxicity_classification(input: str,
                            verbose: Optional[bool] = False,
                            labels: Optional[List[str]] = None,
                            **kwargs) -> str:
    """Classify ``input`` as toxic or non-toxic on a zero-shot basis.

    Args:
        input: Text to classify.
        verbose: Print debug information.
        labels: Candidate labels; ``["toxic", "non-toxic"]`` when omitted.
            The default is created per call instead of sharing one list
            object between all calls.
        **kwargs: Forwarded to ``zero_shot_classification``.

    Returns:
        The first label returned by ``zero_shot_classification``, as a single
        string. The annotation used to claim ``List[str]``.
    """
    if labels is None:
        labels = ["toxic", "non-toxic"]

    if verbose:
        print(f"[DEBUG] |Toxicity Classification| input: {input}")
        print(f"[DEBUG] |Toxicity Classification| labels: {labels}")

    label = zero_shot_classification(input, labels, **kwargs)[0]

    if verbose:
        print(f"[DEBUG] |Toxicity Classification| label: {label}")
    return label


"""~~~~~~~~~~~~~~~~~~~
Section for Table QA.
~~~~~~~~~~~~~~~~~~~"""

def table_question_answering(input: str, 
                             table: pd.DataFrame, 
                             verbose: Optional[bool] = False, 
                             **kwargs) -> str:
    """Answer ``input`` from ``table`` with the table-QA pipeline.

    Args:
        input: Natural-language question.
        table: Table to query.
        verbose: Print debug information.
        **kwargs: Accepted but not used.

    Returns:
        The ``"answer"`` field of the pipeline output.
    """
    if verbose:
        print(f"[DEBUG] |Table Question Answering| input: {input}")
        print(f"[DEBUG] |Table Question Answering| table: \n{table}")
        # This type dump used to be printed on every call, even with
        # verbose switched off.
        print(f"[DEBUG] |Table Question Answering| types: {type(input)} {type(table)}")

    output = PIPELINES["TABLE_QA"](table=table, query=input)

    if verbose:
        print(f"[DEBUG] |Table Question Answering| output: \n{output}")
    return output["answer"]


"""~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Section for Few-Shot Text Generation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"""

def few_shot(query: str, 
             samples: str, 
             verbose: Optional[bool] = False, 
             **kwargs) -> str:
    """Run the text-generation pipeline on ``samples + query``.

    Args:
        query: Text appended after the examples.
        samples: Few-shot examples placed in front of the query.
        verbose: Print debug information.
        **kwargs: Forwarded to the text-generation pipeline.

    Returns:
        ``"generated_text"`` of the first pipeline result.
    """
    prompt = samples + query

    if verbose:
        print(f"[DEBUG] |Few-Shot Text Generation| query: \n{query}")
        print(f"[DEBUG] |Few-Shot Text Generation| samples: \n{samples}")

    generator = PIPELINES["FEW_SHOT"]
    generations = generator(prompt, **kwargs)

    if verbose:
        print(f"[DEBUG] |Few-Shot Text Generation| output: \n{generations}")

    first_generation = generations[0]
    return first_generation["generated_text"]


def travel_few_shot(query: str, 
                    samples: str, 
                    verbose: Optional[bool] = False, 
                    **kwargs) -> str:
    """Generate the answer sentence for a travel query.

    The text returned by ``few_shot`` is cut after the first
    ``len(samples + query)`` characters, and what remains is truncated at the
    first double quote.

    Args:
        query: Prompt suffix, expected to end with an opening quote.
        samples: Few-shot examples.
        verbose: Print debug information.
        **kwargs: Forwarded to ``few_shot``.

    Returns:
        The generated text up to (not including) the first ``"``.
    """
    if verbose:
        print(f"[DEBUG] |Travel Few-Shot Text Generation| query: \n{query}")
        print(f"[DEBUG] |Travel Few-Shot Text Generation| samples: \n{samples}")

    prompt_length = len(samples + query)
    generated = few_shot(query, samples, verbose, **kwargs)

    continuation = generated[prompt_length:]
    answer, _, _ = continuation.partition('"')

    if verbose:
        print(f"[DEBUG] |Travel Few-Shot Text Generation| output: \n{answer}")
    return answer


"""~~~~~~~~~~~~~~~~~~~~~~~~~
Section for Skill Functions
~~~~~~~~~~~~~~~~~~~~~~~~~"""

def travel_skill(conversation: transformers.Conversation, 
                 associations: Associations, 
                 verbose: Optional[bool] = False, 
                 **kwargs) -> Response:
    """Answer a travel question from timetable data.

    Steps: classify the pending user input into one of the association
    variants, look up the answer cell with table QA, then let the few-shot
    generator phrase a sentence around that cell.

    Args:
        conversation: Conversation whose ``new_user_input`` is the question.
        associations: Mapping of variant label to a dict with the keys
            ``"data"``, ``"time"``, ``"location"`` (callables) and
            ``"samples"``, ``"config"``.
        verbose: Print debug information.
        **kwargs: Forwarded to the classifier and, merged over ``config``,
            to the generator.

    Returns:
        ``(answer, None)``; the conversation object is not updated here.
    """
    if verbose:
        print(f"[DEBUG] |Travel Skill| input conversation: \n{conversation}")
        print(f"[DEBUG] |Travel Skill| associations: \n{associations}")

    question = conversation.new_user_input
    variants = list(associations.keys())
    variant = zero_shot_classification(question, variants, **kwargs)[0]

    resources = associations[variant]
    data = resources["data"]()
    time = resources["time"]()
    location = resources["location"]()
    samples = resources["samples"]
    # Caller-supplied keyword arguments win over the stored configuration.
    generation_kwargs = {**resources["config"], **kwargs}

    cell = table_question_answering(question, data, verbose)

    situated_question = f"I am in {location}. It is {time}. {question}"
    query = f'Question: "{situated_question}" Context: "{cell}" Answer: "'

    output = travel_few_shot(query, samples, verbose, **generation_kwargs)

    if verbose:
        print(f"[DEBUG] |Travel Skill| variant: {variant}")
        print(f"[DEBUG] |Travel Skill| cell: {cell}")
        print(f"[DEBUG] |Travel Skill| output: {output}")
    return output, None

def small_talk_skill(conversation: transformers.Conversation, 
                     verbose: Optional[bool] = False, 
                     **kwargs) -> Response:
    """Let the conversational pipeline reply to the pending user input.

    Args:
        conversation: Conversation holding the new user input.
        verbose: Print debug information.
        **kwargs: Accepted but not used.

    Returns:
        ``(reply, conversation)`` where ``reply`` is the last generated
        response and ``conversation`` is the object returned by the pipeline.
    """
    if verbose:
        print(f"[DEBUG] |Small Talk Skill| input conversation: \n{conversation}")

    chatbot = PIPELINES["SMALL_TALK"]
    conversation = chatbot(conversation)
    reply = conversation.generated_responses[-1]

    if verbose:
        print(f"[DEBUG] |Small Talk Skill| output conversation: \n{conversation}")
        print(f"[DEBUG] |Small Talk Skill| output: {reply}")
    return reply, conversation


"""~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Section for Personas and Warm Up
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"""


def warm_up(conversation: transformers.Conversation, 
            personas: List[str],
            verbose: bool = False,
            **kwargs) -> transformers.Conversation:
    """Prime the conversation by feeding every persona sentence to the bot.

    Each persona is added as user input and immediately run through the
    small-talk pipeline. With an empty ``personas`` list the conversation is
    returned untouched.

    Args:
        conversation: Conversation to prime.
        personas: Sentences to feed, in order.
        verbose: Print debug information.
        **kwargs: Accepted but not used.

    Returns:
        The conversation after the last pipeline call.
    """
    for persona in personas:
        conversation.add_user_input(persona)
        conversation = PIPELINES["SMALL_TALK"](conversation)

    if verbose:
        print(f"[DEBUG] |Warm Up| personas: {personas}")
        # This line used to be labelled "personas" as well.
        print(f"[DEBUG] |Warm Up| conversation: {conversation}")
    return conversation


"""~~~~~~~~~~~~~~~~~
Language Processors
~~~~~~~~~~~~~~~~~"""

def german_to_english_translation(input: str,
                                  verbose: Optional[bool] = False, 
                                  **kwargs) -> str:
    """Translate German text to English.

    Args:
        input: German text.
        verbose: Print debug information.
        **kwargs: Extra pipeline arguments; they override the default
            ``num_beams`` of 40.

    Returns:
        ``"translation_text"`` of the first pipeline result.
    """
    if verbose:
        print(f"[DEBUG] |German-To-English Translation| input: {input}")

    options = {"num_beams": 40}
    options.update(kwargs)
    translator = PIPELINES["GERMAN_TO_ENGLISH_TRANSLATOR"]
    translation = translator(input, **options)

    if verbose:
        print(f"[DEBUG] |German-To-English Translation| translation: {translation}")

    best = translation[0]
    return best["translation_text"]

def english_to_german_translation(input: str,
                                  verbose: Optional[bool] = False, 
                                  **kwargs) -> str:
    """Translate English text to German.

    Args:
        input: English text.
        verbose: Print debug information.
        **kwargs: Extra pipeline arguments; they override the default
            ``num_beams`` of 40.

    Returns:
        ``"translation_text"`` of the first pipeline result.
    """
    if verbose:
        print(f"[DEBUG] |English-To-German Translation| input: {input}")

    options = {"num_beams": 40}
    options.update(kwargs)
    translator = PIPELINES["ENGLISH_TO_GERMAN_TRANSLATOR"]
    translation = translator(input, **options)

    if verbose:
        print(f"[DEBUG] |English-To-German Translation| translation: {translation}")

    best = translation[0]
    return best["translation_text"]


"""~~~~~~~~~~~~~~~
NLP Configuration
~~~~~~~~~~~~~~~"""

# Per-language text processors used by NLP: the preprocessor is applied to
# the user input and the postprocessor to the answer. English passes text
# through unchanged; German translates to English and back.
LANGUAGES = {
    "en": {
        "preprocessor": lambda x: x,
        "postprocessor": lambda x: x,
    },
    "de": {
        "preprocessor": german_to_english_translation,
        "postprocessor": english_to_german_translation,
    }
}

# Resources of the travel skill, keyed by variant label. `travel_skill` calls
# "data", "time" and "location" and passes "config" to the text generator.
TRAVEL_ASSOCIATIONS = {
    "travel": {
        "data": travel_tables,
        "time": travel_time,
        "location": travel_location,
        "samples": travel_samples,
        "config": {
            "temperature": 0.6,
            "do_sample": True,
            # Token count of the few-shot examples plus 50 additional tokens.
            "max_length": samples_length(
                travel_samples, 
                FEW_SHOT_MODEL) + 50,
        }
    }
}

TRAVEL_SKILL = {
    "associations": TRAVEL_ASSOCIATIONS, 
    "function": travel_skill
}

# Skill registry. NLP passes the keys to `skill_classification` as candidate
# labels; several labels map to the same travel skill.
SKILLS: Skills = {
    "small talk": {
        "associations": {}, 
        "function": small_talk_skill
    },
    "travel": TRAVEL_SKILL,
    "travel on time": TRAVEL_SKILL,
    "travel delayed": TRAVEL_SKILL,
    "other": {
        "associations": {}, 
        "function": small_talk_skill
    },
}

# Sentences fed through `warm_up` when NLP is constructed; none by default.
PERSONAS = []


# Default configuration consumed by NLP.__init__.
CONFIG = {
    "languages": LANGUAGES,
    "personas": PERSONAS,
    "skills": SKILLS,
} 


class NLP:
    """Text-in / text-out assistant that routes each input to a skill.

    On every call the input is run through the language preprocessor,
    classified into one of the configured skills, answered by that skill's
    function, and the answer is run through the language postprocessor.
    """

    def __init__(self,
                 config: dict = CONFIG,
                 verbose: bool = True,
                 **kwargs):
        """Set up language processors, the conversation and the skills.

        Args:
            config: Dict with the keys ``"languages"``, ``"personas"`` and
                ``"skills"``.
            verbose: Print debug information during warm-up.
            **kwargs: Forwarded to ``warm_up``.
        """
        self.config = config

        language_processors = config["languages"][LANGUAGE]
        self.language_preprocessor = language_processors["preprocessor"]
        self.language_postprocessor = language_processors["postprocessor"]

        self.conversation = transformers.Conversation()
        self.conversation = warm_up(
            self.conversation, 
            personas=config["personas"],
            verbose=verbose,
            **kwargs)

        self.skills = config["skills"]

    def __call__(self, 
                 input: str, 
                 verbose: bool = False, 
                 **kwargs) -> str:
        """Answer one user utterance.

        Args:
            input: User text in the configured language.
            verbose: Print debug information.
            **kwargs: Forwarded to the classifier and to the skill function.

        Returns:
            The answer after language postprocessing.
        """
        if verbose:
            print("[DEBUG] |NLP __call__ START|" + "~"*20)
            print(f"[DEBUG] |NLP ATTR skills|: {self.skills}")
            print(f"[DEBUG] |NLP ATTR conversation|: \n{self.conversation}")
            print(f"[DEBUG] |NLP User input|: {input}")

        input = self.language_preprocessor(input)
        self.conversation.add_user_input(input)

        skill = skill_classification(
            input, 
            list(self.skills.keys()), 
            verbose=verbose, 
            **kwargs)

        pipeline = self.skills[skill]
        function = pipeline["function"]
        associations = pipeline["associations"]

        output, conversation = function(
            self.conversation, 
            associations=associations, 
            verbose=verbose,
            **kwargs)

        if conversation is not None:
            self.conversation = conversation
        else:
            # The skill answered without running the conversational pipeline,
            # so the user input is still pending. `Conversation.add_user_input`
            # ignores new input while one is pending, which made the NEXT
            # call answer this old question again. Close the turn by hand.
            self.conversation.mark_processed()
            self.conversation.append_response(output)
        output = self.language_postprocessor(output)

        if verbose:
            print(f"[DEBUG] |NLP conversation |: \n{self.conversation}")
            print(f"[DEBUG] |NLP User output|: {output}")
            print("[DEBUG] |NLP __call__ END|" + "~"*20)
        return output

nlp = execute(NLP, verbose=VERBOSE, config=CONFIG)

---


# ***Speech Recognition (STT)*** 🎤💬


---


In [None]:
# @title | STT | Installation of Dependencies ⇩

#@markdown ---
VERBOSE = False # @param {type:"boolean"}

def install_sst_dependencies(**kwargs):
    # Installs the packages used by the speech-recognition cells (shell
    # magics). `**kwargs` is unused; it absorbs the `verbose` keyword that
    # `execute` always forwards.
    # The last line installs the helper package that provides
    # `dl_colab_notebooks.audio`, imported by the recording cell.
    !pip install transformers
    !pip install numpy==1.20
    !pip install numba==0.48
    !pip install ffmpeg-python
    !pip install -q https://github.com/tugstugi/dl-colab-notebooks/archive/colab_utils.zip

# Run the installation; output is hidden unless VERBOSE is enabled.
execute(install_sst_dependencies, verbose=VERBOSE)

In [22]:
# @title | STT | Audio Recording Utils
# @markdown ✋ Rerun Cell if Runtime was restarted 🔄
"""Utils for recording audio in a Google Colaboratory notebook.

This code is adapted from:
    https://ricardodeazambuja.com/deep_learning/2019/03/09/audio_and_video_google_colab/
    https://colab.research.google.com/gist/ricardodeazambuja/03ac98c31e87caf284f7b06286ebf7fd/microphone-to-numpy-array-from-your-browser-in-colab.ipynb
"""

SILENT = "&> /dev/null"

import io
import ffmpeg
import numpy as np
from base64 import b64decode
from scipy.io.wavfile import write
from dl_colab_notebooks.audio import audio_bytes_to_np

from IPython.display import display
from IPython.display import HTML
from google.colab.output import eval_js


# HTML/JS snippet that injects a <style> element into the output frame. It
# styles every <button> and the <span> inside it, including a hover effect
# that reveals an emoji after the label. `record` displays it together with
# AUDIO_HTML.
STYLES_HTML = """
<script>

var styles = `

button {
    width: 300px;
    height: 54px;

    padding: 20px;
    margin: 5px;

    display: flex;
    justify-content: center;
    align-items: center;
    border-radius: 40px;
    border: none;

    text-align: center;
    font-size: 28px;
    
    transition: all 0.5s;
    cursor: pointer;
}

button span {
    display: inline-block;
    position: relative;

    cursor: pointer;
    transition: 0.5s;
}

button span:after {
    content: '🙏';

    position: absolute;
    right: -20px;

    opacity: 0;
    transition: 0.5s;
}

button:hover span {
    padding-right: 25px;
}

button:hover span:after {
    right: 0;
    opacity: 1;
}
`

var styleSheet = document.createElement("style")
styleSheet.type = "text/css"
styleSheet.innerText = styles
document.head.appendChild(styleSheet);

</script>
"""

# HTML/JS recorder widget. It asks for microphone access and starts a
# MediaRecorder as soon as access is granted. Clicking the button stops the
# recording; the recorded blob is shown as an <audio> preview and read as a
# base64 data URL into `base64data`. The global promise `data` resolves with
# that value 2 seconds after the click, and `record` reads it via `eval_js`.
# NOTE(review): `options` is built but never passed to MediaRecorder, and
# `base64data` is still 0 if the FileReader has not finished within the
# 2 second wait, in which case `data` resolves to "0".
AUDIO_HTML = """
<script>

var container = document.createElement("div");
var button = document.createElement("button");
var span = document.createElement("span");

button.appendChild(span);
container.appendChild(button);
document.body.appendChild(container);

var base64data = 0;
var reader, recorder, gumStream;

var handleSuccess = function(stream) {
    gumStream = stream;
    var options = {
            mimeType : 'audio/webm;codecs=opus'
    };            
    recorder = new MediaRecorder(stream);
    recorder.ondataavailable = function(e) {            
        var url = URL.createObjectURL(e.data);
        var preview = document.createElement('audio');

        preview.controls = true;
        preview.src = url;
        container.appendChild(preview);

        reader = new FileReader();
        reader.readAsDataURL(e.data); 
        reader.onloadend = function() {
            base64data = reader.result;
        }
    };
    recorder.start();
};

span.innerText = "⏸︎";
button.style.verticalAlign = "middle";
navigator.mediaDevices.getUserMedia({audio: true}).then(handleSuccess);

function toggleRecording() {
    if (recorder && recorder.state == "recording") {
        recorder.stop();
        gumStream.getAudioTracks()[0].stop();
        span.innerText = "✅"
    }
}

// https://stackoverflow.com/a/951057
function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

var data = new Promise(resolve => {
    button.onclick = () => {
        toggleRecording()

        sleep(2000).then(() => {
            resolve(base64data.toString())
        });
    }
});
      
</script>
"""

def record(sample_rate: int = 16000) -> str:
    """Record audio in the browser and convert it with ``audio_bytes_to_np``.

    Shows the recorder widget, waits for the JavaScript promise ``data``,
    base64-decodes the part of the data URL after the first comma and passes
    the bytes on together with ``sample_rate``.

    NOTE(review): the return annotation says ``str`` but the value comes
    straight from ``audio_bytes_to_np``; presumably a NumPy array. Confirm.
    """
    widget = HTML(STYLES_HTML + AUDIO_HTML)
    display(widget)

    data_url = eval_js("data")
    encoded_audio = data_url.split(',')[1]

    return audio_bytes_to_np(b64decode(encoded_audio), sample_rate)

In [None]:
# @title | STT | Wav2Vec2 Speech Recognition
# @markdown ✋ Rerun Cell if Runtime was restarted 🔄

#@markdown ---
VERBOSE = False # @param {type:"boolean"}

from typing import List

import torch
import numpy as np


def speech_to_text_implementation(**kwargs):
    """Load the Wav2Vec2 model for ``LANGUAGE`` and build a transcriber.

    Args:
        **kwargs: Unused; absorbs the ``verbose`` keyword that ``execute``
            always forwards.

    Returns:
        A function ``speech_to_text(audio) -> str``.
    """
    from transformers import Wav2Vec2Tokenizer
    from transformers import Wav2Vec2ForCTC

    STT_MODEL = "facebook/wav2vec2-large-960h-lv60-self" if LANGUAGE == "en" else "facebook/wav2vec2-large-xlsr-53-german"

    # load model and tokenizer
    tokenizer = Wav2Vec2Tokenizer.from_pretrained(STT_MODEL)
    wav2vec2 = Wav2Vec2ForCTC.from_pretrained(STT_MODEL)

    def speech_to_text(audio: np.ndarray) -> str:
        """Transcribe one audio array and return the text."""
        input_values = tokenizer(
            [audio], 
            return_tensors="pt", 
            padding="longest"
        ).input_values

        # Inference only: do not record the autograd graph for the forward
        # pass. The logits, and therefore the result, are unchanged.
        with torch.no_grad():
            logits = wav2vec2(input_values).logits
        predicted_ids = torch.argmax(logits, dim=-1)

        # batch_decode yields one string per batch entry; join them into one.
        text = tokenizer.batch_decode(predicted_ids)
        return " ".join(text)

    return speech_to_text

speech_to_text = execute(speech_to_text_implementation, verbose=VERBOSE)

---


# ***Text-To-Speech (TTS)*** 💭📣


---

In [13]:
# @title | TTS | Installation of Dependencies ⇩

#@markdown ---
VERBOSE = False # @param {type:"boolean"}

def install_tts_dependencies(**kwargs):
    # Installs the text-to-speech stack for the selected LANGUAGE (shell and
    # `%cd` magics). `**kwargs` is unused; it absorbs the `verbose` keyword
    # that `execute` always forwards.
    !apt-get install -y espeak

    if LANGUAGE == "de":
        # German: download model, vocoder, their configs and the scale stats
        # with gdown, then install coqui-ai/TTS at commit 540d811.
        !gdown --id 1VG0EI7J6S1bk3h0q1VBc9ALExkdZdeVm -O tts_model.pth.tar
        !gdown --id 1s1GcSihlj58KX0LeA-FPFvdMWGMkcxKI -O config.json
        !gdown --id 1zYFHElvYW_oTeilvbZVLMLscColWRbck -O vocoder_model.pth.tar
        !gdown --id 1ye9kVDbatAKMncRMui7watrLQ_5DaJ3e -O config_vocoder.json
        !gdown --id 1QD40bU_M7CWrj9k0MEACNBRqwqVTSLDc -O scale_stats.npy
        # NOTE(review): espeak was already installed by the first line.
        !sudo apt-get install espeak
        !git clone https://github.com/coqui-ai/TTS

        %cd TTS
        !git checkout 540d811
        !pip install -r requirements.txt
        !python setup.py install

        # sometimes installation does not work
        # Fallback: put the repository directory itself on the import path.
        import os, sys
        sys.path.append(os.getcwd())
        %cd ..
    else:
        # Other languages: install the TransformerTTS fork ("package" branch)
        # in develop mode and copy the HiFi-GAN files into its vocoding folder.
        # NOTE(review): this branch never changes back out of TransformerTTS.
        !git clone https://github.com/1ucky40nc3/TransformerTTS.git
        %cd TransformerTTS
        !git checkout package
        !pip install torchaudio
        !pip install -r /content/TransformerTTS/requirements.txt
        !pip install -r /content/TransformerTTS/TransformerTTS/vocoding/extra_requirements.txt
        !python setup.py develop

        !wget https://public-asai-dl-models.s3.eu-central-1.amazonaws.com/hifigan.zip
        !unzip -q hifigan.zip
        !rsync -avq hifigan/ /content/TransformerTTS/TransformerTTS/vocoding/hifigan/

# Run the installation; output is hidden unless VERBOSE is enabled.
execute(install_tts_dependencies, verbose=VERBOSE)

In [14]:
# @title | TTS | TTS Implementation
# @markdown ✋ Rerun Cell if Runtime was restarted 🔄

# @markdown ---

# @markdown #### ❗ ***If "de" is selected as language, an error may occur.***
# @markdown #### ⏩ Just try to rerun this cell. 👻 

# @markdown ---
USE_GPU_4_GERMAN_TTS = False # @param {type:"boolean"}

#@markdown ---
VERBOSE = False # @param {type:"boolean"}

import torch
import numpy as np
from torchaudio import functional as F

def text_to_speech_implementation(**kwargs):
    # Builds and returns a function `text_to_speech(text) -> np.ndarray` for
    # the selected LANGUAGE. `**kwargs` is unused; it absorbs the `verbose`
    # keyword that `execute` always forwards.
    if LANGUAGE == "de":
        # German: Coqui TTS model plus a separate vocoder, both loaded from
        # the files under /content.
        import os
        from TTS.utils.io import load_config
        from TTS.utils.audio import AudioProcessor
        from TTS.tts.utils.io import load_checkpoint
        from TTS.tts.utils.synthesis import synthesis
        from TTS.tts.utils.text.symbols import symbols
        from TTS.tts.utils.generic_utils import setup_model
        from TTS.vocoder.utils.generic_utils import setup_generator
        from TTS.vocoder.utils.io import load_checkpoint as load_vocoder_checkpoint

        TTS_MODEL = "/content/tts_model.pth.tar"
        TTS_CONFIG = "/content/config.json"
        VOCODER_MODEL = "/content/vocoder_model.pth.tar"
        VOCODER_CONFIG = "/content/config_vocoder.json"

        # From here on TTS_CONFIG / VOCODER_CONFIG hold the loaded config
        # objects instead of the file paths.
        TTS_CONFIG = load_config(TTS_CONFIG)
        # Point the audio settings at the downloaded scale statistics.
        TTS_CONFIG.audio["stats_path"] = "/content/scale_stats.npy"

        VOCODER_CONFIG = load_config(VOCODER_CONFIG)

        audio_processor = AudioProcessor(**TTS_CONFIG.audio)

        model, _ = load_checkpoint(
            setup_model(
                num_chars=len(symbols), 
                num_speakers=0,
                c=TTS_CONFIG),
            checkpoint_path=TTS_MODEL)

        vocoder, _ = load_vocoder_checkpoint(
            setup_generator(VOCODER_CONFIG), 
            checkpoint_path=VOCODER_MODEL)
        vocoder.remove_weight_norm()
        vocoder.inference_padding = 0

        if USE_GPU_4_GERMAN_TTS:
            model.cuda()
            vocoder.cuda()

        model.eval()
        vocoder.eval()

        def text_to_speech(text: str) -> np.ndarray:
            # Only the fourth value returned by `synthesis` is used: the
            # post-net mel spectrogram.
            _, _, _, mel_postnet_spec, _, _ = synthesis(
                model, 
                text, 
                TTS_CONFIG,
                USE_GPU_4_GERMAN_TTS, 
                audio_processor)
            
            # The spectrogram is transposed and given a leading dimension of
            # size 1 before it goes to the vocoder.
            speech = vocoder.inference(
                torch.FloatTensor(
                    mel_postnet_spec.T,
                ).unsqueeze(0))
            speech = speech.flatten().cpu().numpy()

            return speech
        
        return text_to_speech
    
    # Any other language: TransformerTTS (LJSpeech) with a HiFi-GAN vocoder.
    # NOTE(review): presumably the imports and model files are resolved
    # relative to the repository directory, hence the `%cd`. Confirm.
    %cd /content/TransformerTTS

    from TransformerTTS.model.factory import tts_ljspeech
    from TransformerTTS.vocoding.predictors import HiFiGANPredictor


    folder = "/content/TransformerTTS/TransformerTTS/vocoding/hifigan/en"


    model, _ = tts_ljspeech()
    vocoder = HiFiGANPredictor.from_folder(folder)

    def text_to_speech(text: str) -> np.ndarray:
        # Predict the mel spectrogram, transpose it, and vocode a batch of
        # one; the first (only) result is the waveform.
        speech = model.predict(text)
        speech = speech["mel"].numpy().T
        speech = vocoder([speech])[0]

        return speech

    # Leave the repository directory again.
    %cd ..
    return text_to_speech

# Build the synthesiser; setup output is hidden unless VERBOSE is enabled.
text_to_speech = execute(text_to_speech_implementation, verbose=VERBOSE)

def postprocessing(wav: np.ndarray) -> np.ndarray:
    """Re-encode a waveform as float PCM WAV and resample it to 16 kHz.

    The input is treated as sampled at 22050 Hz.

    Args:
        wav: Waveform array (assumed 1-D; TODO confirm).

    Returns:
        The resampled waveform with size-1 dimensions squeezed away.
    """
    source_rate = 22050
    target_rate = 16000

    # Trailing axis + transpose: a 1-D input becomes a single-row matrix.
    waveform = torch.from_numpy(wav).unsqueeze(-1).T

    encoded = F.apply_codec(
        waveform=waveform,
        sample_rate=source_rate,
        format="wav",
        encoding="PCM_F")
    resampled = F.resample(
        waveform=encoded,
        orig_freq=source_rate,
        new_freq=target_rate)

    return resampled.squeeze().numpy()

---


# ***Avatar (PC-AVS)*** 🤗🤖


---

In [15]:
# @title | PC-AVS | Install Dependencies ⇩

#@markdown ---
VERBOSE = False # @param {type:"boolean"}

def install_avatar_dependencies(**kwargs):
    !git clone https://github.com/1ucky40nc3/Talking-Face_PC-AVS.git
    %cd /content/Talking-Face_PC-AVS

    !pip install -r requirements.txt
    !pip install lws
    !pip install face-alignment
    !pip install av
    !pip install torchaudio

    !unzip ./misc/Audio_Source.zip -d ./misc/
    !unzip ./misc/Input.zip -d ./misc/
    !unzip ./misc/Mouth_Source.zip -d ./misc/ 
    !unzip ./misc/Pose_Source.zip -d ./misc/

    !gdown https://drive.google.com/u/0/uc?id=1Zehr3JLIpzdg2S5zZrhIbpYPKF-4gKU_&export=download
    !mkdir checkpoints
    !unzip demo.zip -d ./checkpoints/

execute(install_avatar_dependencies, verbose=VERBOSE)

In [16]:
# @title | PC-AVS | PC-AVS Implementation
# @markdown ✋ Rerun Cell if Runtime was restarted 🔄
%cd /content/Talking-Face_PC-AVS

import os
import sys
import torch
import torchvision
from tqdm import tqdm

sys.path.append('..')

from data import create_dataloader
from models import create_model


torch.manual_seed(0)


class Namespace:
    """Plain attribute container: every keyword argument becomes an attribute."""

    def __init__(self, **kwargs):
        for name, value in kwargs.items():
            setattr(self, name, value)


def pc_avs_inference(opt, 
                     path_label, 
                     model, 
                     wav) -> str:
    """Run the model over all batches and write the frames plus audio to an mp4.

    Args:
        opt: Option object; `opt.path_label` is overwritten with `path_label`
            before the dataloader is created.
        path_label: Label string stored on `opt` for the dataloader.
        model: Model whose `forward(batch, mode='inference')` returns a pair;
            the second element is collected as the generated frames.
        wav: Audio samples as a NumPy array; passed to the dataloader and
            muxed into the video as a 16000 Hz AAC track.

    Returns:
        Path of the written "G_Pose_Driven_.mp4" file, located in the
        directory reported by the dataset's `get_processed_file_savepath()`.
    """
    opt.path_label = path_label
    dataloader = create_dataloader(opt, wav=wav)

    fake_image_driven_pose_as = []

    # Pure inference: without autograd the collected batches cannot keep
    # computation graphs (and their memory) alive until the final concat.
    with torch.no_grad():
        for data_i in tqdm(dataloader):
            _, fake_image_driven_pose_a = model.forward(
                data_i, mode='inference')

            fake_image_driven_pose_as.append(
                fake_image_driven_pose_a)

    filename = os.path.join(
        dataloader.dataset.get_processed_file_savepath(), 
        "G_Pose_Driven_.mp4")

    video_array = torch.cat(fake_image_driven_pose_as, dim=0)
    # Swap axes 1 and 3.
    # NOTE(review): presumably (N, C, H, W) -> (N, W, H, C) — confirm.
    video_array = video_array.cpu().transpose(1, 3)
    # NOTE(review): if the generator output lies in [-1, 1], 125.5 maps it to
    # [0, 251]; 127.5 would cover the full 8-bit range — confirm the constant.
    video_array = video_array * 125.5 + 125.5
    # Clamp before the cast so out-of-range values saturate instead of being
    # converted unpredictably by the float -> uint8 conversion.
    video_array = video_array.clamp(0, 255).type(torch.uint8)
    # Rotate in the plane of axes 1 and 2.
    # NOTE(review): presumably restores the frame orientation after the
    # transpose above — confirm.
    video_array = torch.rot90(video_array, -1, [1, 2])

    # Add a leading axis to the audio samples before handing them to write_video.
    wav = torch.from_numpy(wav)
    wav = torch.unsqueeze(wav, dim=0)

    torchvision.io.write_video(
        filename=filename, 
        video_array=video_array,
        fps=25,
        video_codec="libx264",
        audio_array=wav,
        audio_fps=16000,
        audio_codec="aac"
    )

    del dataloader
    return filename


def avatar(opt,
           path_label,
           wav) -> str:
    """Build the model in evaluation mode on CUDA and render the talking-face video.

    A new model is created from `opt` on every call; `opt.isTrain` is forced
    to False first.

    Args:
        opt: Option object passed to `create_model` and `pc_avs_inference`.
        path_label: Label string forwarded to `pc_avs_inference`.
        wav: Audio samples forwarded to `pc_avs_inference`.

    Returns:
        Whatever `pc_avs_inference` returns (the path of the written video).
    """
    opt.isTrain = False

    generator = create_model(opt).cuda()
    generator.eval()

    video_path = pc_avs_inference(opt, path_label, generator, wav)
    return video_path
    

# Option set for the PC-AVS model and dataloader. It is passed to `avatar`,
# which forwards it to `create_model` and `pc_avs_inference` (and from there to
# `create_dataloader`). `avatar` resets `isTrain` to False and
# `pc_avs_inference` adds a `path_label` attribute on every call.
# NOTE(review): looks like a dump of the repository's command-line options —
# confirm against the PC-AVS option parser before changing values.
# NOTE(review): `data_path`/`lrw_data_path` and the `./conversations/<uuid>/15`
# paths are machine- or session-specific literals — verify they are unused at
# inference time.
opt = Namespace(
    D_input='single', 
    VGGFace_pretrain_path='', 
    aspect_ratio=1.0, 
    audio_nc=256, 
    augment_target=False, 
    batchSize=16, 
    beta1=0.5, 
    beta2=0.999, 
    checkpoints_dir='./checkpoints', 
    clip_len=1, 
    crop=False, 
    crop_len=16, 
    crop_size=224, 
    data_path='/home/SENSETIME/zhouhang1/Downloads/VoxCeleb2/voxceleb2_train.csv', 
    dataset_mode='voxtest', 
    defined_driven=False, 
    dis_feat_rec=False, 
    display_winsize=224, 
    driven_type='face', 
    driving_pose=True, 
    feature_encoded_dim=2560, 
    feature_fusion='concat', 
    filename_tmpl='{:06}.jpg', 
    fitting_iterations=10, 
    frame_interval=1, 
    frame_rate=25, 
    gan_mode='hinge', 
    gen_video=True, 
    generate_from_audio_only=True, 
    generate_interval=1, 
    gpu_ids=[0], 
    has_mask=False, 
    heatmap_size=3, 
    hop_size=160, 
    how_many=1000000, 
    init_type='xavier', 
    init_variance=0.02, 
    input_id_feature=True, 
    input_path='./checkpoints/results/input_path', 
    isTrain=False, 
    label_mask=False, 
    lambda_D=1, 
    lambda_contrastive=100, 
    lambda_crossmodal=1, 
    lambda_feat=10.0, 
    lambda_image=1.0, 
    lambda_rotate_D=0.1, 
    lambda_softmax=1000000, 
    lambda_vgg=10.0, 
    lambda_vggface=5.0, 
    landmark_align=False, 
    landmark_type='min', 
    list_end=1000000, 
    list_num=0, 
    list_start=0, 
    load_from_opt_file=False, 
    load_landmark=False, 
    lr=0.001, 
    lrw_data_path='/home/SENSETIME/zhouhang1/Downloads/VoxCeleb2/voxceleb2_train.csv', 
    max_dataset_size=9223372036854775807, 
    meta_path_vox='./conversations/feaa8fc7-8fc7-4ecf-acef-f06ca221b493/15/avatar.csv', 
    # NOTE(review): `avatar` still moves the model to CUDA — confirm what this flag controls.
    mode='cpu', 
    model='av', 
    multi_gpu=False, 
    nThreads=4, 
    n_mel_T=4, 
    name='demo', 
    ndf=64, 
    nef=16, 
    netA='resseaudio', 
    netA_sync='ressesync', 
    netD='multiscale', 
    netE='fan', 
    netG='modulate', 
    netV='resnext', 
    ngf=64, 
    no_TTUR=False, 
    no_flip=True, 
    no_ganFeat_loss=False, 
    no_gaussian_landmark=False, 
    no_id_loss=False, 
    no_instance=False, 
    no_pairing_check=False, 
    no_spectrogram=False, 
    no_vgg_loss=False, 
    noise_pose=True, 
    norm_A='spectralinstance', 
    norm_D='spectralinstance', 
    norm_E='spectralinstance', 
    norm_G='spectralinstance', 
    num_bins_per_frame=4, 
    num_classes=5830, 
    num_clips=1, 
    num_frames_per_clip=5, 
    num_inputs=1, 
    onnx=False, 
    optimizer='adam', 
    output_nc=3, 
    phase='test', 
    pose_dim=12, 
    positional_encode=False, 
    preprocess_mode='resize_and_crop', 
    results_dir='./conversations/feaa8fc7-8fc7-4ecf-acef-f06ca221b493/15', 
    save_path='./conversations/feaa8fc7-8fc7-4ecf-acef-f06ca221b493/15', 
    serial_batches=False, 
    start_ind=0, 
    style_dim=2560, 
    style_feature_loss=True, 
    target_crop_len=0, 
    train_dis_pose=False, 
    train_recognition=False, 
    train_sync=False, 
    train_word=False, 
    trainer='audio', 
    use_audio=1, 
    use_audio_id=0, 
    use_transformer=False, 
    verbose=False, 
    vgg_face=False, 
    which_epoch='latest', 
    word_loss=False
)

/content/Talking-Face_PC-AVS


---
---


# ***T-REX*** 🦖


---
---

In [17]:
# @title | T-REX | Start new Conversation
# @markdown ✋ Rerun Cell if Runtime was restarted 🔄

#@markdown ---
# When False, `trex_setup` passes an empty persona list in its config.
ACTIVATE_PERSONAS = False # @param {type:"boolean"}
# Each persona is entered as free text and then prefixed with "your persona: ".
PERSONA_1 = "I work in a travel agency" # @param {type:"string"}
PERSONA_1 = f"your persona: {PERSONA_1}"
PERSONA_2 = "My name is Mia" # @param {type:"string"}
PERSONA_2 = f"your persona: {PERSONA_2}"

#@markdown ---
# Passed to `execute`, which hides the setup output when this is False.
VERBOSE = False # @param {type:"boolean"}

import uuid
import base64
from IPython.display import HTML


def trex_setup(**kwargs):
    """Create the NLP pipeline and a fresh conversation directory.

    Builds the config from the notebook globals (`LANGUAGES`, `SKILLS` and,
    when `ACTIVATE_PERSONAS` is set, the two persona strings), instantiates
    `NLP`, creates `./conversations/<uuid4>` and removes one fixed PC-AVS
    results directory if it exists.

    Args:
        **kwargs: Forwarded unchanged to `NLP` (includes `verbose` when the
            function is run through `execute`).

    Returns:
        The constructed `NLP` instance.
    """
    import os
    import shutil

    config = {
        "languages": LANGUAGES,
        "personas": [PERSONA_1, PERSONA_2] if ACTIVATE_PERSONAS else [],
        "skills": SKILLS,
    }

    nlp = NLP(config=config, **kwargs)

    conversation_id = uuid.uuid4()
    conversation_dir = f"./conversations/{conversation_id}"
    # Creates ./conversations/ as well and does not fail when it already exists.
    os.makedirs(conversation_dir, exist_ok=True)

    # The message used to be a bare expression and was never shown.
    print(f"Current Conversation is logged at: {conversation_dir}")

    # Remove this results directory if present; a missing directory is not an error.
    shutil.rmtree(
        "/content/Talking-Face_PC-AVS/results/id_input_pose_00473_audio_tts_output",
        ignore_errors=True)

    return nlp

# Build the NLP pipeline via `trex_setup`; output is captured unless VERBOSE is set.
nlp = execute(trex_setup, verbose=VERBOSE)

In [None]:
# @title # T-REX 🦖
# @markdown ✋ Rerun Cell if Runtime was restarted 🔄


#@markdown ---
#@markdown ### Default: STT Input

# When True, the query comes from `speech_to_text(record())`; otherwise TEXT_INPUT is used.
RECORD_AUDIO = True # @param {type:"boolean"}

#@markdown ---
#@markdown ### Alternative: Text Input

# Typed query, used only when RECORD_AUDIO is False.
TEXT_INPUT = "Is the ICE 77 delayed?" # @param {type:"string"}

#@markdown ---

# Passed to `execute`, which hides the pipeline output when this is False.
VERBOSE = False # @param {type:"boolean"}
# Shell redirection suffix: empty when verbose, otherwise discards command output.
# NOTE(review): not referenced in the rest of this cell — confirm it is still needed.
verbose = "" if VERBOSE else "&> /dev/null"

def trex(input: str="", **kwargs) -> str:
    """Run one full turn: text query -> NLP reply -> speech -> avatar video.

    The query and the NLP reply are each wrapped in double quotes before
    being passed on. The reply is synthesised with `text_to_speech`,
    run through `postprocessing`, and rendered by `avatar` using the global
    `opt` and a path label whose input image depends on `LANGUAGE`.

    Args:
        input: The user's query text.
        **kwargs: Forwarded unchanged to `nlp`.

    Returns:
        Whatever `avatar` returns (the path of the generated video).
    """
    print(f"[DEBUG] |T-REX STT| STT Output: {input}")

    quoted_query = f'"{input}"'
    reply = nlp(quoted_query, **kwargs)

    quoted_reply = f'"{reply}"'
    print(f"[DEBUG] |T-REX NLP| NLP Output: {quoted_reply}")

    speech = postprocessing(text_to_speech(quoted_reply))

    # Input image "1" is used for German, "2" for every other language.
    if LANGUAGE == "de":
        image_id = "1"
    else:
        image_id = "2"
    path_labels = f"./misc/Input/input {image_id} ./misc/Pose_Source/00473 158 ./misc/Audio_Source/tts_output.mp3 None 0 None"

    print(path_labels)
    return avatar(opt, path_labels, speech)

# Take the query from the microphone (via speech-to-text) or from the text field.
input = speech_to_text(record()) if RECORD_AUDIO else TEXT_INPUT
# Run the whole pipeline; `execute` hides its output unless VERBOSE is set.
video = execute(trex, input=input, verbose=VERBOSE)

# Show the final output.
# Read inside a context manager so the file handle is closed immediately.
with open(video, 'rb') as video_file:
    mp4 = video_file.read()
# Embed the video as a base64 data URL so it plays inline in the notebook.
data_url = "data:video/mp4;base64," + base64.b64encode(mp4).decode()

HTML("""
<video width=700 controls autoplay>
    <source src="%s" type="video/mp4">
</video>
""" % data_url)