From af34109cc4b0ff875c545c910df3a8d6cb013315 Mon Sep 17 00:00:00 2001 From: Huy Le Nguyen Date: Thu, 28 Jan 2021 01:51:00 +0700 Subject: [PATCH 1/4] :writing_hand: initialize data pipeline in pure tf --- examples/conformer/config.yml | 3 +- examples/conformer/train_subword_conformer.py | 6 +- scripts/create_tfrecords.py | 16 ++- tensorflow_asr/augmentations/augments.py | 41 +++++++- tensorflow_asr/augmentations/spec_augment.py | 66 ++++++++++++- tensorflow_asr/datasets/asr_dataset.py | 98 +++++++++++++------ tensorflow_asr/datasets/base_dataset.py | 2 + tensorflow_asr/datasets/keras/asr_dataset.py | 94 ++++++++++++++++-- .../featurizers/speech_featurizers.py | 5 + tensorflow_asr/losses/rnnt_losses.py | 2 +- tensorflow_asr/utils/utils.py | 4 +- 11 files changed, 286 insertions(+), 51 deletions(-) diff --git a/examples/conformer/config.yml b/examples/conformer/config.yml index e538ab6383..e9bab34c5d 100755 --- a/examples/conformer/config.yml +++ b/examples/conformer/config.yml @@ -60,6 +60,7 @@ model_config: learning_config: augmentations: + use_tf: True after: time_masking: num_masks: 10 @@ -77,7 +78,7 @@ learning_config: - /mnt/Miscellanea/Datasets/Speech/LibriSpeech/dev-other/transcripts.tsv test_paths: - /mnt/Miscellanea/Datasets/Speech/LibriSpeech/test-clean/transcripts.tsv - tfrecords_dir: /mnt/Miscellanea/Datasets/Speech/LibriSpeech/tfrecords + tfrecords_dir: /mnt/Miscellanea/Datasets/Speech/LibriSpeech/tfrecords-test optimizer_config: warmup_steps: 40000 diff --git a/examples/conformer/train_subword_conformer.py b/examples/conformer/train_subword_conformer.py index c6a54a1339..5ff20b070c 100644 --- a/examples/conformer/train_subword_conformer.py +++ b/examples/conformer/train_subword_conformer.py @@ -59,7 +59,7 @@ strategy = setup_strategy(args.devices) from tensorflow_asr.configs.config import Config -from tensorflow_asr.datasets.asr_dataset import ASRTFRecordDataset, ASRSliceDataset +from tensorflow_asr.datasets.asr_dataset import TFASRTFRecordDataset, ASRSliceDataset from tensorflow_asr.featurizers.speech_featurizers import TFSpeechFeaturizer from tensorflow_asr.featurizers.text_featurizers import SubwordFeaturizer, SentencePieceFeaturizer from tensorflow_asr.runners.transducer_runners import TransducerTrainer @@ -84,7 +84,7 @@ text_featurizer.save_to_file(args.subwords) if args.tfrecords: - train_dataset = ASRTFRecordDataset( + train_dataset = TFASRTFRecordDataset( data_paths=config.learning_config.dataset_config.train_paths, tfrecords_dir=config.learning_config.dataset_config.tfrecords_dir, speech_featurizer=speech_featurizer, @@ -94,7 +94,7 @@ stage="train", cache=args.cache, shuffle=True, buffer_size=args.bfs, ) - eval_dataset = ASRTFRecordDataset( + eval_dataset = TFASRTFRecordDataset( data_paths=config.learning_config.dataset_config.eval_paths, tfrecords_dir=config.learning_config.dataset_config.tfrecords_dir, tfrecords_shards=args.tfrecords_shards, diff --git a/scripts/create_tfrecords.py b/scripts/create_tfrecords.py index 509d6b891b..ab34ee9ba4 100644 --- a/scripts/create_tfrecords.py +++ b/scripts/create_tfrecords.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
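# A minimal usage sketch for the updated script (all paths hypothetical):
# with this patch the tfrecords store pre-tokenized label indices instead of
# raw transcripts, so a trained subword file and its matching model config
# must be supplied before sharding:
#
#   python scripts/create_tfrecords.py \
#       --mode train \
#       --config examples/conformer/config.yml \
#       --subwords /path/to/conformer.subwords \
#       --tfrecords_dir /path/to/tfrecords \
#       /path/to/LibriSpeech/train-clean-100/transcripts.tsv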
+import os import argparse +from tensorflow_asr.configs.config import Config from tensorflow_asr.utils.utils import preprocess_paths from tensorflow_asr.datasets.asr_dataset import ASRTFRecordDataset +from tensorflow_asr.featurizers.text_featurizers import SubwordFeaturizer modes = ["train", "eval", "test"] @@ -22,12 +25,16 @@ parser.add_argument("--mode", "-m", type=str, default=None, help=f"Mode in {modes}") +parser.add_argument("--config", type=str, default=None, help="The file path of model configuration file") + parser.add_argument("--tfrecords_dir", type=str, default=None, help="Directory to tfrecords") parser.add_argument("--tfrecords_shards", type=int, default=16, help="Number of tfrecords shards") parser.add_argument("--shuffle", default=False, action="store_true", help="Shuffle data or not") +parser.add_argument("--subwords", type=str, default=None, help="Path to file that stores generated subwords") + parser.add_argument("transcripts", nargs="+", type=str, default=None, help="Paths to transcript files") args = parser.parse_args() @@ -37,8 +44,15 @@ transcripts = preprocess_paths(args.transcripts) tfrecords_dir = preprocess_paths(args.tfrecords_dir) +config = Config(args.config) +if args.subwords and os.path.exists(args.subwords): + print("Loading subwords ...") + text_featurizer = SubwordFeaturizer.load_from_file(config.decoder_config, args.subwords) +else: + raise ValueError("subwords must be set") + ASRTFRecordDataset( data_paths=transcripts, tfrecords_dir=tfrecords_dir, - speech_featurizer=None, text_featurizer=None, + speech_featurizer=None, text_featurizer=text_featurizer, stage=args.mode, shuffle=args.shuffle, tfrecords_shards=args.tfrecords_shards ).create_tfrecords() diff --git a/tensorflow_asr/augmentations/augments.py b/tensorflow_asr/augmentations/augments.py index 1cd45473d8..0874eb803a 100755 --- a/tensorflow_asr/augmentations/augments.py +++ b/tensorflow_asr/augmentations/augments.py @@ -12,11 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
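# A sketch of how the new flag is consumed (config values illustrative, drawn
# from the example config): with "use_tf: True" the parsed result is a
# TFAugmentationExecutor whose augment() runs as graph ops, so it can be
# called inside a tf.data map without tf.numpy_function; without the flag the
# augmentations remain nlpaug flows (naf.Sometimes, which applies each
# augmenter only with some probability rather than every call).
#
#   from tensorflow_asr.augmentations.augments import Augmentation
#   augment = Augmentation({
#       "use_tf": True,
#       "after": {
#           "time_masking": {"num_masks": 10, "mask_factor": 100},
#           "freq_masking": {"num_masks": 1, "mask_factor": 27},
#       },
#   })
#   features = augment.after.augment(features)  # features: [T, F, V] spectrogram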
+import tensorflow as tf import nlpaug.flow as naf from .signal_augment import SignalCropping, SignalLoudness, SignalMask, SignalNoise, \ SignalPitch, SignalShift, SignalSpeed, SignalVtlp -from .spec_augment import FreqMasking, TimeMasking +from .spec_augment import FreqMasking, TimeMasking, TFFreqMasking, TFTimeMasking AUGMENTATIONS = { @@ -32,12 +33,34 @@ "vtlp": SignalVtlp } +TFAUGMENTATIONS = { + "freq_masking": TFFreqMasking, + "time_masking": TFTimeMasking, +} + + +class TFAugmentationExecutor: + def __init__(self, augmentations: list): + self.augmentations = augmentations + + @tf.function + def augment(self, inputs): + outputs = inputs + for au in self.augmentations: + outputs = au.augment(outputs) + return outputs + class Augmentation: def __init__(self, config: dict = None): if not config: config = {} - self.before = self.parse(config.get("before", {})) - self.after = self.parse(config.get("after", {})) + use_tf = config.get("use_tf", False) + if use_tf: + self.before = self.tf_parse(config.get("before", {})) + self.after = self.tf_parse(config.get("after", {})) + else: + self.before = self.parse(config.get("before", {})) + self.after = self.parse(config.get("after", {})) @staticmethod def parse(config: dict) -> list: @@ -50,3 +73,15 @@ def parse(config: dict) -> list: aug = au(**value) if value is not None else au() augmentations.append(aug) return naf.Sometimes(augmentations) + + @staticmethod + def tf_parse(config: dict) -> list: + augmentations = [] + for key, value in config.items(): + au = TFAUGMENTATIONS.get(key, None) + if au is None: + raise KeyError(f"No tf augmentation named: {key}\n" + f"Available tf augmentations: {TFAUGMENTATIONS.keys()}") + aug = au(**value) if value is not None else au() + augmentations.append(aug) + return TFAugmentationExecutor(augmentations) diff --git a/tensorflow_asr/augmentations/spec_augment.py b/tensorflow_asr/augmentations/spec_augment.py index 25d6996b70..9e1f68726d 100755 --- a/tensorflow_asr/augmentations/spec_augment.py +++ b/tensorflow_asr/augmentations/spec_augment.py @@ -11,14 +11,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
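# The pure-tf masking classes added below cannot assign into a tensor the way
# the numpy versions do (tf.Tensor is immutable in graph mode), so each mask
# is built as a ones/zeros/ones concatenation along the masked axis and
# applied by multiplication. The core idea for a single frequency mask, with
# illustrative static shapes:
#
#   import tensorflow as tf
#   T, F, V = 100, 80, 1                  # time frames, freq bins, channels
#   f, f0 = 10, 20                        # mask width and start offset
#   mask = tf.concat([tf.ones([T, f0, V]),
#                     tf.zeros([T, f, V]),
#                     tf.ones([T, F - f0 - f, V])], axis=1)
#   masked = spectrogram * mask           # zeroes out bins [f0, f0 + f)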
+ """ Augmentation on spectrogram: http://arxiv.org/abs/1904.08779 """ + import numpy as np +import tensorflow as tf from nlpaug.flow import Sequential from nlpaug.util import Action from nlpaug.model.spectrogram import Spectrogram from nlpaug.augmenter.spectrogram import SpectrogramAugmenter +from ..utils.utils import shape_list + # ---------------------------- FREQ MASKING ---------------------------- @@ -75,6 +80,35 @@ def __init__(self, def substitute(self, data): return self.flow.augment(data) + +class TFFreqMasking: + def __init__(self, num_masks: int = 1, mask_factor: float = 27): + self.num_masks = num_masks + self.mask_factor = mask_factor + + @tf.function + def augment(self, spectrogram: tf.Tensor): + """ + Masking the frequency channels (shape[1]) + Args: + spectrogram: shape (T, num_feature_bins, V) + Returns: + frequency masked spectrogram + """ + T, F, V = shape_list(spectrogram, out_type=tf.int32) + for _ in range(self.num_masks): + f = tf.random.uniform([], minval=0, maxval=self.mask_factor, dtype=tf.int32) + f = tf.minimum(f, F) + f0 = tf.random.uniform([], minval=0, maxval=(F - f), dtype=tf.int32) + mask = tf.concat([ + tf.ones([T, f0, V], dtype=spectrogram.dtype), + tf.zeros([T, f, V], dtype=spectrogram.dtype), + tf.ones([T, F - f0 - f, V], dtype=spectrogram.dtype) + ], axis=1) + spectrogram = spectrogram * mask + return spectrogram + + # ---------------------------- TIME MASKING ---------------------------- @@ -101,9 +135,8 @@ def mask(self, data: np.ndarray) -> np.ndarray: """ spectrogram = data.copy() time = np.random.randint(0, self.mask_factor + 1) - time = min(time, spectrogram.shape[0]) - time0 = np.random.randint(0, spectrogram.shape[0] - time + 1) time = min(time, int(self.p_upperbound * spectrogram.shape[0])) + time0 = np.random.randint(0, spectrogram.shape[0] - time + 1) spectrogram[time0:time0 + time, :, :] = 0 return spectrogram @@ -139,3 +172,32 @@ def __init__(self, def substitute(self, data): return self.flow.augment(data) + + +class TFTimeMasking: + def __init__(self, num_masks: int = 1, mask_factor: float = 100, p_upperbound: float = 1.0): + self.num_masks = num_masks + self.mask_factor = mask_factor + self.p_upperbound = p_upperbound + + @tf.function + def augment(self, spectrogram: tf.Tensor): + """ + Masking the time channel (shape[0]) + Args: + spectrogram: shape (T, num_feature_bins, V) + Returns: + frequency masked spectrogram + """ + T, F, V = shape_list(spectrogram, out_type=tf.int32) + for _ in range(self.num_masks): + t = tf.random.uniform([], minval=0, maxval=self.mask_factor, dtype=tf.int32) + t = tf.minimum(t, tf.cast(tf.cast(T, dtype=tf.float32) * self.p_upperbound, dtype=tf.int32)) + t0 = tf.random.uniform([], minval=0, maxval=(T - t), dtype=tf.int32) + mask = tf.concat([ + tf.ones([t0, F, V], dtype=spectrogram.dtype), + tf.zeros([t, F, V], dtype=spectrogram.dtype), + tf.ones([T - t0 - t, F, V], dtype=spectrogram.dtype) + ], axis=0) + spectrogram = spectrogram * mask + return spectrogram diff --git a/tensorflow_asr/datasets/asr_dataset.py b/tensorflow_asr/datasets/asr_dataset.py index e0d9832e74..e42da623ad 100755 --- a/tensorflow_asr/datasets/asr_dataset.py +++ b/tensorflow_asr/datasets/asr_dataset.py @@ -16,12 +16,13 @@ import multiprocessing import os +import librosa import numpy as np import tensorflow as tf from ..augmentations.augments import Augmentation from .base_dataset import BaseDataset, BUFFER_SIZE -from ..featurizers.speech_featurizers import read_raw_audio, SpeechFeaturizer +from ..featurizers.speech_featurizers import 
read_raw_audio, tf_read_raw_audio, SpeechFeaturizer from ..featurizers.text_featurizers import TextFeaturizer from ..utils.utils import bytestring_feature, print_one_line, get_num_batches @@ -29,22 +30,18 @@ TFRECORD_SHARDS = 16 -def to_tfrecord(path, audio, transcript): - feature = { - "path": bytestring_feature([path]), - "audio": bytestring_feature([audio]), - "transcript": bytestring_feature([transcript]) - } - return tf.train.Example(features=tf.train.Features(feature=feature)) - - def write_tfrecord_file(splitted_entries): shard_path, entries = splitted_entries with tf.io.TFRecordWriter(shard_path, options='ZLIB') as out: - for audio_file, _, transcript in entries: - with open(audio_file, "rb") as f: + for audio_file, _, indices in entries: + with tf.io.gfile.GFile(audio_file, "rb") as f: audio = f.read() - example = to_tfrecord(bytes(audio_file, "utf-8"), audio, bytes(transcript, "utf-8")) + feature = { + "path": bytestring_feature([bytes(audio_file, "utf-8")]), + "audio": bytestring_feature([audio]), + "indices": bytestring_feature([bytes(indices, "utf-8")]) + } + example = tf.train.Example(features=tf.train.Features(feature=feature)) out.write(example.SerializeToString()) print_one_line("Processed:", audio_file) print(f"\nCreated {shard_path}") @@ -59,10 +56,12 @@ def __init__(self, augmentations: Augmentation = Augmentation(None), cache: bool = False, shuffle: bool = False, + use_tf: bool = False, buffer_size: int = BUFFER_SIZE): super(ASRDataset, self).__init__( data_paths=data_paths, augmentations=augmentations, - cache=cache, shuffle=shuffle, stage=stage, buffer_size=buffer_size + cache=cache, shuffle=shuffle, stage=stage, buffer_size=buffer_size, + use_tf=use_tf ) self.speech_featurizer = speech_featurizer self.text_featurizer = text_featurizer @@ -78,22 +77,57 @@ def read_entries(self): # The files is "\t" seperated lines = [line.split("\t", 2) for line in lines] lines = np.array(lines) + for i, line in enumerate(lines): + filepath = f"{os.path.splitext(line[0])[0]}.wav", + librosa.output.write_wav( + filepath, + read_raw_audio(line[0], self.speech_featurizer.sample_rate), + self.speech_featurizer.sample_rate + ) + lines[i][0] = filepath + indices = " ".join([str(x) for x in self.text_featurizer.extract(line[-1]).numpy()]) + lines[i][-1] = indices if self.shuffle: np.random.shuffle(lines) # Mix transcripts.tsv self.total_steps = len(lines) return lines - def preprocess(self, audio, transcript): + def preprocess(self, audio, indices): + def fn(_audio, _indices): + with tf.device("/CPU:0"): + signal = read_raw_audio(_audio, self.speech_featurizer.sample_rate) + + signal = self.augmentations.before.augment(signal) + + features = self.speech_featurizer.extract(signal) + + features = self.augmentations.after.augment(features) + + label = tf.strings.to_number(tf.strings.split(_indices), out_type=tf.int32) + label_length = tf.cast(tf.shape(label)[0], tf.int32) + prediction = self.text_featurizer.prepand_blank(label) + prediction_length = tf.cast(tf.shape(prediction)[0], tf.int32) + features = tf.convert_to_tensor(features, tf.float32) + input_length = tf.cast(tf.shape(features)[0], tf.int32) + + return features, input_length, label, label_length, prediction, prediction_length + + return tf.numpy_function( + fn, inp=[audio, indices], + Tout=[tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32] + ) + + def tf_preprocess(self, audio, indices): with tf.device("/CPU:0"): - signal = read_raw_audio(audio, self.speech_featurizer.sample_rate) + signal = tf_read_raw_audio(audio, 
self.speech_featurizer.sample_rate) signal = self.augmentations.before.augment(signal) - features = self.speech_featurizer.extract(signal) + features = self.speech_featurizer.tf_extract(signal) features = self.augmentations.after.augment(features) - label = self.text_featurizer.extract(transcript.decode("utf-8")) + label = tf.strings.to_number(tf.strings.split(indices), out_type=tf.int32) label_length = tf.cast(tf.shape(label)[0], tf.int32) prediction = self.text_featurizer.prepand_blank(label) prediction_length = tf.cast(tf.shape(prediction)[0], tf.int32) @@ -157,10 +191,12 @@ def __init__(self, tfrecords_shards: int = TFRECORD_SHARDS, cache: bool = False, shuffle: bool = False, + use_tf: bool = False, buffer_size: int = BUFFER_SIZE): super(ASRTFRecordDataset, self).__init__( stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer, - data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, buffer_size=buffer_size + data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, buffer_size=buffer_size, + use_tf=use_tf ) self.tfrecords_dir = tfrecords_dir if tfrecords_shards <= 0: raise ValueError("tfrecords_shards must be positive") @@ -198,15 +234,14 @@ def parse(self, record): feature_description = { "path": tf.io.FixedLenFeature([], tf.string), "audio": tf.io.FixedLenFeature([], tf.string), - "transcript": tf.io.FixedLenFeature([], tf.string) + "indices": tf.io.FixedLenFeature([], tf.string) } example = tf.io.parse_single_example(record, feature_description) - return tf.numpy_function( - self.preprocess, - inp=[example["audio"], example["transcript"]], - Tout=[tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32] - ) + if self.use_tf: + return self.tf_preprocess(example["audio"], example["indices"]) + + return self.preprocess(example["audio"], example["indices"]) def create(self, batch_size): # Create TFRecords dataset @@ -226,8 +261,8 @@ def create(self, batch_size): class ASRSliceDataset(ASRDataset): """ Dataset for ASR using Slice """ - def preprocess(self, path, transcript): - return super(ASRSliceDataset, self).preprocess(path.decode("utf-8"), transcript) + def preprocess(self, path, indices): + return super(ASRSliceDataset, self).preprocess(path.decode("utf-8"), indices) @tf.function def parse(self, record): @@ -246,7 +281,7 @@ def create(self, batch_size): class ASRTFRecordTestDataset(ASRTFRecordDataset): - def preprocess(self, path, audio, transcript): + def preprocess(self, path, audio, indices): with tf.device("/CPU:0"): signal = read_raw_audio(audio, self.speech_featurizer.sample_rate) @@ -254,8 +289,7 @@ def preprocess(self, path, audio, transcript): features = tf.convert_to_tensor(features, tf.float32) input_length = tf.cast(tf.shape(features)[0], tf.int32) - label = self.text_featurizer.extract(transcript.decode("utf-8")) - label = tf.convert_to_tensor(label, dtype=tf.int32) + label = tf.strings.to_number(tf.strings.split(indices), out_type=tf.int32) return path, features, input_length, label @@ -264,13 +298,13 @@ def parse(self, record): feature_description = { "path": tf.io.FixedLenFeature([], tf.string), "audio": tf.io.FixedLenFeature([], tf.string), - "transcript": tf.io.FixedLenFeature([], tf.string) + "indices": tf.io.FixedLenFeature([], tf.string) } example = tf.io.parse_single_example(record, feature_description) return tf.numpy_function( self.preprocess, - inp=[example["path"], example["audio"], example["transcript"]], + inp=[example["path"], example["audio"], example["indices"]], 
Tout=[tf.string, tf.float32, tf.int32, tf.int32] ) diff --git a/tensorflow_asr/datasets/base_dataset.py b/tensorflow_asr/datasets/base_dataset.py index d5e018de1a..765c697df5 100644 --- a/tensorflow_asr/datasets/base_dataset.py +++ b/tensorflow_asr/datasets/base_dataset.py @@ -27,6 +27,7 @@ def __init__(self, cache: bool = False, shuffle: bool = False, buffer_size: int = BUFFER_SIZE, + use_tf: bool = False, stage: str = "train"): self.data_paths = data_paths self.augmentations = augmentations # apply augmentation @@ -36,6 +37,7 @@ def __init__(self, raise ValueError("buffer_size must be positive when shuffle is on") self.buffer_size = buffer_size # shuffle buffer size self.stage = stage # for defining tfrecords files + self.use_tf = use_tf # whether to use only pure tf in the dataset pipeline self.total_steps = None # for better training visualization @abc.abstractmethod diff --git a/tensorflow_asr/datasets/keras/asr_dataset.py b/tensorflow_asr/datasets/keras/asr_dataset.py index a7cb310c08..3d335d3e02 100644 --- a/tensorflow_asr/datasets/keras/asr_dataset.py +++ b/tensorflow_asr/datasets/keras/asr_dataset.py @@ -17,11 +17,11 @@ import tensorflow as tf import numpy as np -from ..asr_dataset import ASRDataset, AUTOTUNE, TFRECORD_SHARDS, write_tfrecord_file +from ..asr_dataset import ASRDataset, AUTOTUNE, TFRECORD_SHARDS from ..base_dataset import BUFFER_SIZE -from ...featurizers.speech_featurizers import SpeechFeaturizer +from ...featurizers.speech_featurizers import SpeechFeaturizer, read_raw_audio, tf_read_raw_audio from ...featurizers.text_featurizers import TextFeaturizer -from ...utils.utils import get_num_batches +from ...utils.utils import get_num_batches, bytestring_feature, print_one_line from ...augmentations.augments import Augmentation @@ -95,6 +95,23 @@ def __init__(self, if not tf.io.gfile.exists(self.tfrecords_dir): tf.io.gfile.makedirs(self.tfrecords_dir) + def write_tfrecord_file(self, splitted_entries): + shard_path, entries = splitted_entries + with tf.io.TFRecordWriter(shard_path, options='ZLIB') as out: + for audio_file, _, transcript in entries: + with tf.io.gfile.GFile(audio_file, mode="rb") as f: + audio = f.read() + indices = " ".join([str(x) for x in self.text_featurizer.extract(transcript).numpy()]) + feature = { + "path": bytestring_feature([bytes(audio_file, "utf-8")]), + "audio": bytestring_feature([audio]), + "indices": bytestring_feature([bytes(indices, "utf-8")]) + } + example = tf.train.Example(features=tf.train.Features(feature=feature)) + out.write(example.SerializeToString()) + print_one_line("Processed:", audio_file) + print(f"\nCreated {shard_path}") + def create_tfrecords(self): if not tf.io.gfile.exists(self.tfrecords_dir): tf.io.gfile.makedirs(self.tfrecords_dir) @@ -116,23 +133,42 @@ def get_shard_path(shard_id): splitted_entries = np.array_split(entries, self.tfrecords_shards) with multiprocessing.Pool(self.tfrecords_shards) as pool: - pool.map(write_tfrecord_file, zip(shards, splitted_entries)) + pool.map(self.write_tfrecord_file, zip(shards, splitted_entries)) return True + def preprocess(self, audio, indices): + with tf.device("/CPU:0"): + signal = read_raw_audio(audio, self.speech_featurizer.sample_rate) + + signal = self.augmentations.before.augment(signal) + + features = self.speech_featurizer.extract(signal) + + features = self.augmentations.after.augment(features) + + label = tf.strings.to_number(tf.strings.split(indices), out_type=tf.int32) + label_length = tf.cast(tf.shape(label)[0], tf.int32) + prediction = 
self.text_featurizer.prepand_blank(label) + prediction_length = tf.cast(tf.shape(prediction)[0], tf.int32) + features = tf.convert_to_tensor(features, tf.float32) + input_length = tf.cast(tf.shape(features)[0], tf.int32) + + return features, input_length, label, label_length, prediction, prediction_length + @tf.function def parse(self, record): feature_description = { "path": tf.io.FixedLenFeature([], tf.string), "audio": tf.io.FixedLenFeature([], tf.string), - "transcript": tf.io.FixedLenFeature([], tf.string) + "indices": tf.io.FixedLenFeature([], tf.string) } example = tf.io.parse_single_example(record, feature_description) features, input_length, label, label_length, \ prediction, prediction_length = tf.numpy_function( self.preprocess, - inp=[example["audio"], example["transcript"]], + inp=[example["audio"], example["indices"]], Tout=[tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32] ) @@ -164,6 +200,52 @@ def create(self, batch_size): return self.process(dataset, batch_size) +class TFASRTFRecordDatasetKeras(ASRDatasetKeras): + def preprocess(self, audio, indices): + with tf.device("/CPU:0"): + signal = tf_read_raw_audio(audio, self.speech_featurizer.sample_rate) + + signal = self.augmentations.before.augment(signal) + + features = self.speech_featurizer.tf_extract(signal) + + features = self.augmentations.after.augment(features) + + label = tf.strings.to_number(tf.strings.split(indices), out_type=tf.int32) + label_length = tf.cast(tf.shape(label)[0], tf.int32) + prediction = self.text_featurizer.prepand_blank(label) + prediction_length = tf.cast(tf.shape(prediction)[0], tf.int32) + features = tf.convert_to_tensor(features, tf.float32) + input_length = tf.cast(tf.shape(features)[0], tf.int32) + + return features, input_length, label, label_length, prediction, prediction_length + + @tf.function + def parse(self, record): + feature_description = { + "path": tf.io.FixedLenFeature([], tf.string), + "audio": tf.io.FixedLenFeature([], tf.string), + "indices": tf.io.FixedLenFeature([], tf.string) + } + example = tf.io.parse_single_example(record, feature_description) + + features, input_length, label, label_length, \ + prediction, prediction_length = self.preprocess([example["audio"], example["indices"]]) + + return ( + { + "input": features, + "input_length": input_length, + "prediction": prediction, + "prediction_length": prediction_length + }, + { + "label": label, + "label_length": label_length + } + ) + + class ASRSliceDatasetKeras(ASRDatasetKeras): """ Keras Dataset for ASR using Slice """ diff --git a/tensorflow_asr/featurizers/speech_featurizers.py b/tensorflow_asr/featurizers/speech_featurizers.py index f8ffaf83f7..6726cfe798 100755 --- a/tensorflow_asr/featurizers/speech_featurizers.py +++ b/tensorflow_asr/featurizers/speech_featurizers.py @@ -40,6 +40,11 @@ def read_raw_audio(audio, sample_rate=16000): return wave +def tf_read_raw_audio(audio: tf.Tensor, sample_rate=16000): + wave, _ = tf.audio.decode_wav(audio, desired_channels=1, desired_samples=sample_rate) + return tf.squeeze(wave, axis=-1) + + def slice_signal(signal, window_size, stride=0.5) -> np.ndarray: """ Return windows of the given signal by sweeping in stride fractions of window """ assert signal.ndim == 1, signal.ndim diff --git a/tensorflow_asr/losses/rnnt_losses.py b/tensorflow_asr/losses/rnnt_losses.py index bebdc5b536..a598dc6365 100644 --- a/tensorflow_asr/losses/rnnt_losses.py +++ b/tensorflow_asr/losses/rnnt_losses.py @@ -208,7 +208,7 @@ def compute_rnnt_loss_and_grad_helper(logits, labels, 
label_length, logit_length a = tf.tile(tf.reshape(tf.range(target_max_len - 1, dtype=tf.int64), shape=(1, 1, target_max_len - 1, 1)), multiples=[batch_size, 1, 1, 1]) b = tf.cast(tf.reshape(labels - 1, shape=(batch_size, 1, target_max_len - 1, 1)), dtype=tf.int64) - # b = tf.where(tf.equal(b, -1), tf.zeros_like(b), b) # for cpu testing (index -1 on cpu will raise errors) + b = tf.where(tf.equal(b, -1), tf.zeros_like(b), b) # for cpu testing (index -1 on cpu will raise errors) c = tf.concat([a, b], axis=3) d = tf.tile(c, multiples=(1, input_max_len, 1, 1)) e = tf.tile(tf.reshape(tf.range(input_max_len, dtype=tf.int64), shape=(1, input_max_len, 1, 1)), diff --git a/tensorflow_asr/utils/utils.py b/tensorflow_asr/utils/utils.py index ff27ef7d2d..cbd8b7aa27 100755 --- a/tensorflow_asr/utils/utils.py +++ b/tensorflow_asr/utils/utils.py @@ -103,10 +103,10 @@ def read_bytes(path: str) -> tf.Tensor: return tf.convert_to_tensor(content, dtype=tf.string) -def shape_list(x): +def shape_list(x, out_type=tf.int32): """Deal with dynamic shape in tensorflow cleanly.""" static = x.shape.as_list() - dynamic = tf.shape(x) + dynamic = tf.shape(x, out_type=out_type) return [dynamic[i] if s is None else s for i, s in enumerate(static)] From 087179ac4d0e5b5999ae23d213bb8eace7fe046b Mon Sep 17 00:00:00 2001 From: Huy Le Nguyen Date: Thu, 28 Jan 2021 01:51:38 +0700 Subject: [PATCH 2/4] :writing_hand: initialize data pipeline in pure tf --- tensorflow_asr/datasets/keras/asr_dataset.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tensorflow_asr/datasets/keras/asr_dataset.py b/tensorflow_asr/datasets/keras/asr_dataset.py index 3d335d3e02..efa82aebb3 100644 --- a/tensorflow_asr/datasets/keras/asr_dataset.py +++ b/tensorflow_asr/datasets/keras/asr_dataset.py @@ -200,7 +200,7 @@ def create(self, batch_size): return self.process(dataset, batch_size) -class TFASRTFRecordDatasetKeras(ASRDatasetKeras): +class TFASRTFRecordDatasetKeras(ASRTFRecordDatasetKeras): def preprocess(self, audio, indices): with tf.device("/CPU:0"): signal = tf_read_raw_audio(audio, self.speech_featurizer.sample_rate) @@ -279,3 +279,6 @@ def create(self, batch_size): entries = np.delete(entries, 1, 1) # Remove unused duration dataset = tf.data.Dataset.from_tensor_slices(entries) return self.process(dataset, batch_size) + + +class TFASRSliceDatasetKeras(ASRSliceDatasetKeras): From 9d5f5260d6f6f2eb516569072f2ba2b8847b8d09 Mon Sep 17 00:00:00 2001 From: Huy Le Nguyen Date: Fri, 29 Jan 2021 02:10:56 +0700 Subject: [PATCH 3/4] :writing_hand: update datasets --- .../train_keras_subword_conformer.py | 6 +- setup.py | 1 + tensorflow_asr/datasets/__init__.py | 4 +- tensorflow_asr/datasets/asr_dataset.py | 260 ++++------------- tensorflow_asr/datasets/base_dataset.py | 2 + tensorflow_asr/datasets/keras/__init__.py | 4 +- tensorflow_asr/datasets/keras/asr_dataset.py | 273 +++++------------- .../featurizers/speech_featurizers.py | 6 + tensorflow_asr/losses/rnnt_losses.py | 2 +- tensorflow_asr/models/keras/transducer.py | 14 +- tensorflow_asr/runners/base_runners.py | 2 +- tensorflow_asr/runners/ctc_runners.py | 6 +- tensorflow_asr/runners/transducer_runners.py | 6 +- 13 files changed, 174 insertions(+), 412 deletions(-) diff --git a/examples/conformer/train_keras_subword_conformer.py b/examples/conformer/train_keras_subword_conformer.py index 2c87fa6da7..0ea4e3461f 100644 --- a/examples/conformer/train_keras_subword_conformer.py +++ b/examples/conformer/train_keras_subword_conformer.py @@ -59,7 +59,7 @@ strategy = 
setup_strategy(args.devices) from tensorflow_asr.configs.config import Config -from tensorflow_asr.datasets.keras import ASRTFRecordDatasetKeras, ASRSliceDatasetKeras +from tensorflow_asr.datasets.keras import ASRTFRecordDatasetKeras, ASRDatasetKeras from tensorflow_asr.featurizers.speech_featurizers import TFSpeechFeaturizer from tensorflow_asr.featurizers.text_featurizers import SubwordFeaturizer, SentencePieceFeaturizer from tensorflow_asr.models.keras.conformer import Conformer @@ -103,7 +103,7 @@ shuffle=True, buffer_size=args.bfs, ) else: - train_dataset = ASRSliceDatasetKeras( + train_dataset = ASRDatasetKeras( data_paths=config.learning_config.dataset_config.train_paths, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer, @@ -111,7 +111,7 @@ stage="train", cache=args.cache, shuffle=True, buffer_size=args.bfs, ) - eval_dataset = ASRSliceDatasetKeras( + eval_dataset = ASRDatasetKeras( data_paths=config.learning_config.dataset_config.eval_paths, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer, diff --git a/setup.py b/setup.py index f7a9c54213..fa3d2c0ada 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ requirements = [ "tensorflow-datasets>=3.2.1,<4.0.0", "tensorflow-addons>=0.10.0", + "tensorflow-io>=0.17.0", "setuptools>=47.1.1", "librosa>=0.8.0", "soundfile>=0.10.3", diff --git a/tensorflow_asr/datasets/__init__.py b/tensorflow_asr/datasets/__init__.py index 18f3bb5640..f5f8a8a1e8 100644 --- a/tensorflow_asr/datasets/__init__.py +++ b/tensorflow_asr/datasets/__init__.py @@ -13,5 +13,5 @@ # limitations under the License. from .base_dataset import BaseDataset -from .asr_dataset import ASRTFRecordDataset, ASRSliceDataset, ASRTFRecordTestDataset, ASRSliceTestDataset -__all__ = ['BaseDataset', 'ASRTFRecordDataset', 'ASRSliceDataset', 'ASRTFRecordTestDataset', 'ASRSliceTestDataset'] +from .asr_dataset import ASRDataset, ASRTFRecordDataset, ASRSliceDataset +__all__ = ['BaseDataset', 'ASRDataset', 'ASRTFRecordDataset', 'ASRSliceDataset'] diff --git a/tensorflow_asr/datasets/asr_dataset.py b/tensorflow_asr/datasets/asr_dataset.py index e42da623ad..3a47f16368 100755 --- a/tensorflow_asr/datasets/asr_dataset.py +++ b/tensorflow_asr/datasets/asr_dataset.py @@ -12,17 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
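# Patch 3 restructures the dataset around a generator that yields
# (path, wav_bytes, indices): transcripts are tokenized once in read_entries,
# audio is converted to wav bytes via tensorflow-io's load_and_convert_to_wav,
# and a single parse() dispatches to the numpy or pure-tf preprocess depending
# on use_tf. Condensed from the code below, the resulting pipeline is roughly:
#
#   dataset = tf.data.Dataset.from_generator(
#       self.generator,
#       output_types=(tf.string, tf.string, tf.string),
#       output_shapes=(tf.TensorShape([]), tf.TensorShape([]), tf.TensorShape([])),
#   )
#   dataset = dataset.map(self.parse, num_parallel_calls=AUTOTUNE)
#   # then cache / shuffle / padded_batch / prefetch inside process()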
-import abc import multiprocessing import os -import librosa import numpy as np import tensorflow as tf from ..augmentations.augments import Augmentation from .base_dataset import BaseDataset, BUFFER_SIZE -from ..featurizers.speech_featurizers import read_raw_audio, tf_read_raw_audio, SpeechFeaturizer +from ..featurizers.speech_featurizers import load_and_convert_to_wav, read_raw_audio, tf_read_raw_audio, SpeechFeaturizer from ..featurizers.text_featurizers import TextFeaturizer from ..utils.utils import bytestring_feature, print_one_line, get_num_batches @@ -33,21 +31,21 @@ def write_tfrecord_file(splitted_entries): shard_path, entries = splitted_entries with tf.io.TFRecordWriter(shard_path, options='ZLIB') as out: - for audio_file, _, indices in entries: - with tf.io.gfile.GFile(audio_file, "rb") as f: - audio = f.read() + for path, audio, indices in entries: feature = { - "path": bytestring_feature([bytes(audio_file, "utf-8")]), + "path": bytestring_feature([bytes(path, "utf-8")]), "audio": bytestring_feature([audio]), "indices": bytestring_feature([bytes(indices, "utf-8")]) } example = tf.train.Example(features=tf.train.Features(feature=feature)) out.write(example.SerializeToString()) - print_one_line("Processed:", audio_file) + print_one_line("Processed:", path) print(f"\nCreated {shard_path}") class ASRDataset(BaseDataset): + """ Dataset for ASR using Generator """ + def __init__(self, stage: str, speech_featurizer: SpeechFeaturizer, @@ -57,43 +55,39 @@ def __init__(self, cache: bool = False, shuffle: bool = False, use_tf: bool = False, + drop_remainder: bool = True, buffer_size: int = BUFFER_SIZE): super(ASRDataset, self).__init__( data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, stage=stage, buffer_size=buffer_size, - use_tf=use_tf + use_tf=use_tf, drop_remainder=drop_remainder ) self.speech_featurizer = speech_featurizer self.text_featurizer = text_featurizer def read_entries(self): - lines = [] + self.lines = [] for file_path in self.data_paths: print(f"Reading {file_path} ...") with tf.io.gfile.GFile(file_path, "r") as f: temp_lines = f.read().splitlines() # Skip the header of tsv file - lines += temp_lines[1:] + self.lines += temp_lines[1:] # The files is "\t" seperated - lines = [line.split("\t", 2) for line in lines] - lines = np.array(lines) - for i, line in enumerate(lines): - filepath = f"{os.path.splitext(line[0])[0]}.wav", - librosa.output.write_wav( - filepath, - read_raw_audio(line[0], self.speech_featurizer.sample_rate), - self.speech_featurizer.sample_rate - ) - lines[i][0] = filepath - indices = " ".join([str(x) for x in self.text_featurizer.extract(line[-1]).numpy()]) - lines[i][-1] = indices - if self.shuffle: - np.random.shuffle(lines) # Mix transcripts.tsv - self.total_steps = len(lines) - return lines + self.lines = [line.split("\t", 2) for line in self.lines] + self.lines = np.array(self.lines) + for i, line in enumerate(self.lines): + self.lines[i][-1] = " ".join([str(x) for x in self.text_featurizer.extract(line[-1]).numpy()]) + if self.shuffle: np.random.shuffle(self.lines) # Mix transcripts.tsv + self.total_steps = len(self.lines) + + def generator(self): + for path, _, indices in self.lines: + audio = load_and_convert_to_wav(path) + yield path, audio, indices - def preprocess(self, audio, indices): - def fn(_audio, _indices): + def preprocess(self, path, audio, indices): + def fn(_path, _audio, _indices): with tf.device("/CPU:0"): signal = read_raw_audio(_audio, self.speech_featurizer.sample_rate) @@ -110,14 +104,14 @@ def 
fn(_audio, _indices): features = tf.convert_to_tensor(features, tf.float32) input_length = tf.cast(tf.shape(features)[0], tf.int32) - return features, input_length, label, label_length, prediction, prediction_length + return _path, features, input_length, label, label_length, prediction, prediction_length return tf.numpy_function( - fn, inp=[audio, indices], - Tout=[tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32] + fn, inp=[path, audio, indices], + Tout=[tf.string, tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32] ) - def tf_preprocess(self, audio, indices): + def tf_preprocess(self, path, audio, indices): with tf.device("/CPU:0"): signal = tf_read_raw_audio(audio, self.speech_featurizer.sample_rate) @@ -134,7 +128,7 @@ def tf_preprocess(self, audio, indices): features = tf.convert_to_tensor(features, tf.float32) input_length = tf.cast(tf.shape(features)[0], tf.int32) - return features, input_length, label, label_length, prediction, prediction_length + return path, features, input_length, label, label_length, prediction, prediction_length def process(self, dataset, batch_size): dataset = dataset.map(self.parse, num_parallel_calls=AUTOTUNE) @@ -149,6 +143,7 @@ def process(self, dataset, batch_size): dataset = dataset.padded_batch( batch_size=batch_size, padded_shapes=( + tf.TensorShape([]), tf.TensorShape(self.speech_featurizer.shape), tf.TensorShape([]), tf.TensorShape([None]), @@ -156,8 +151,8 @@ def process(self, dataset, batch_size): tf.TensorShape([None]), tf.TensorShape([]), ), - padding_values=(0., 0, self.text_featurizer.blank, 0, self.text_featurizer.blank, 0), - drop_remainder=True + padding_values=("", 0., 0, self.text_featurizer.blank, 0, self.text_featurizer.blank, 0), + drop_remainder=self.drop_remainder ) # PREFETCH to improve speed of input length @@ -165,17 +160,24 @@ def process(self, dataset, batch_size): self.total_steps = get_num_batches(self.total_steps, batch_size) return dataset - @abc.abstractmethod - def parse(self, *args, **kwargs): + @tf.function + def parse(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor): """ Returns: path, features, input_lengths, labels, label_lengths, pred_inp """ - raise NotImplementedError() - - @abc.abstractmethod - def create(self, batch_size): - raise NotImplementedError() + if self.use_tf: return self.tf_preprocess(path, audio, indices) + return self.preprocess(path, audio, indices) + + def create(self, batch_size: int): + self.read_entries() + if not self.total_steps or self.total_steps == 0: return None + dataset = tf.data.Dataset.from_generator( + self.generator, + output_types=(tf.string, tf.string, tf.string), + output_shapes=(tf.TensorShape([]), tf.TensorShape([]), tf.TensorShape([])) + ) + return self.process(dataset, batch_size) class ASRTFRecordDataset(ASRDataset): @@ -201,8 +203,7 @@ def __init__(self, self.tfrecords_dir = tfrecords_dir if tfrecords_shards <= 0: raise ValueError("tfrecords_shards must be positive") self.tfrecords_shards = tfrecords_shards - if not tf.io.gfile.exists(self.tfrecords_dir): - tf.io.gfile.makedirs(self.tfrecords_dir) + if not tf.io.gfile.exists(self.tfrecords_dir): tf.io.gfile.makedirs(self.tfrecords_dir) def create_tfrecords(self): if not tf.io.gfile.exists(self.tfrecords_dir): @@ -214,9 +215,9 @@ def create_tfrecords(self): print(f"Creating {self.stage}.tfrecord ...") - entries = self.read_entries() - if len(entries) <= 0: - return False + self.read_entries() + if not self.total_steps or self.total_steps == 0: return False + entries = 
np.fromiter(self.generator(), dtype=str) def get_shard_path(shard_id): return os.path.join(self.tfrecords_dir, f"{self.stage}_{shard_id}.tfrecord") @@ -230,21 +231,17 @@ def get_shard_path(shard_id): return True @tf.function - def parse(self, record): + def parse(self, record: tf.Tensor): feature_description = { "path": tf.io.FixedLenFeature([], tf.string), "audio": tf.io.FixedLenFeature([], tf.string), "indices": tf.io.FixedLenFeature([], tf.string) } example = tf.io.parse_single_example(record, feature_description) + if self.use_tf: return self.tf_preprocess(**example) + return self.preprocess(**example) - if self.use_tf: - return self.tf_preprocess(example["audio"], example["indices"]) - - return self.preprocess(example["audio"], example["indices"]) - - def create(self, batch_size): - # Create TFRecords dataset + def create(self, batch_size: int): have_data = self.create_tfrecords() if not have_data: return None @@ -261,147 +258,14 @@ def create(self, batch_size): class ASRSliceDataset(ASRDataset): """ Dataset for ASR using Slice """ - def preprocess(self, path, indices): - return super(ASRSliceDataset, self).preprocess(path.decode("utf-8"), indices) - - @tf.function - def parse(self, record): - return tf.numpy_function( - self.preprocess, - inp=[record[0], record[1]], - Tout=[tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32] - ) - - def create(self, batch_size): - entries = self.read_entries() - if len(entries) == 0: return None - entries = np.delete(entries, 1, 1) # Remove unused duration - dataset = tf.data.Dataset.from_tensor_slices(entries) - return self.process(dataset, batch_size) - - -class ASRTFRecordTestDataset(ASRTFRecordDataset): - def preprocess(self, path, audio, indices): - with tf.device("/CPU:0"): - signal = read_raw_audio(audio, self.speech_featurizer.sample_rate) - - features = self.speech_featurizer.extract(signal) - features = tf.convert_to_tensor(features, tf.float32) - input_length = tf.cast(tf.shape(features)[0], tf.int32) - - label = tf.strings.to_number(tf.strings.split(indices), out_type=tf.int32) - - return path, features, input_length, label - - @tf.function - def parse(self, record): - feature_description = { - "path": tf.io.FixedLenFeature([], tf.string), - "audio": tf.io.FixedLenFeature([], tf.string), - "indices": tf.io.FixedLenFeature([], tf.string) - } - example = tf.io.parse_single_example(record, feature_description) - - return tf.numpy_function( - self.preprocess, - inp=[example["path"], example["audio"], example["indices"]], - Tout=[tf.string, tf.float32, tf.int32, tf.int32] - ) - - def process(self, dataset, batch_size): - dataset = dataset.map(self.parse, num_parallel_calls=AUTOTUNE) - - if self.cache: - dataset = dataset.cache() - - if self.shuffle: - dataset = dataset.shuffle(self.buffer_size, reshuffle_each_iteration=True) - - # PADDED BATCH the dataset - dataset = dataset.padded_batch( - batch_size=batch_size, - padded_shapes=( - tf.TensorShape([]), - tf.TensorShape(self.speech_featurizer.shape), - tf.TensorShape([]), - tf.TensorShape([None]), - ), - padding_values=("", 0.0, 0, self.text_featurizer.blank), - drop_remainder=True - ) - - # PREFETCH to improve speed of input length - dataset = dataset.prefetch(AUTOTUNE) - self.total_steps = get_num_batches(self.total_steps, batch_size) - return dataset - - def create(self, batch_size): - # Create TFRecords dataset - have_data = self.create_tfrecords() - if not have_data: return None - - pattern = os.path.join(self.tfrecords_dir, f"{self.stage}*.tfrecord") - files_ds = 
tf.data.Dataset.list_files(pattern) - ignore_order = tf.data.Options() - ignore_order.experimental_deterministic = False - files_ds = files_ds.with_options(ignore_order) - dataset = tf.data.TFRecordDataset(files_ds, compression_type='ZLIB', num_parallel_reads=AUTOTUNE) - - return self.process(dataset, batch_size) - - -class ASRSliceTestDataset(ASRDataset): - def preprocess(self, path, transcript): - with tf.device("/CPU:0"): - signal = read_raw_audio(path.decode("utf-8"), self.speech_featurizer.sample_rate) - - features = self.speech_featurizer.extract(signal) - features = tf.convert_to_tensor(features, tf.float32) - input_length = tf.cast(tf.shape(features)[0], tf.int32) - - label = self.text_featurizer.extract(transcript.decode("utf-8")) - label = tf.convert_to_tensor(label, dtype=tf.int32) - - return path, features, input_length, label - - @tf.function - def parse(self, record): - return tf.numpy_function( - self.preprocess, - inp=[record[0], record[1]], - Tout=[tf.string, tf.float32, tf.int32, tf.int32] - ) - - def process(self, dataset, batch_size): - dataset = dataset.map(self.parse, num_parallel_calls=AUTOTUNE) - - if self.cache: - dataset = dataset.cache() - - if self.shuffle: - dataset = dataset.shuffle(self.buffer_size, reshuffle_each_iteration=True) - - # PADDED BATCH the dataset - dataset = dataset.padded_batch( - batch_size=batch_size, - padded_shapes=( - tf.TensorShape([]), - tf.TensorShape(self.speech_featurizer.shape), - tf.TensorShape([]), - tf.TensorShape([None]), - ), - padding_values=("", 0.0, 0, self.text_featurizer.blank), - drop_remainder=True - ) - - # PREFETCH to improve speed of input length - dataset = dataset.prefetch(AUTOTUNE) - self.total_steps = get_num_batches(self.total_steps, batch_size) - return dataset + @staticmethod + def load(record: tf.Tensor): + audio = load_and_convert_to_wav(record[0]) + return record[0], audio, record[2] - def create(self, batch_size): - entries = self.read_entries() - if len(entries) == 0: return None - entries = np.delete(entries, 1, 1) # Remove unused duration - dataset = tf.data.Dataset.from_tensor_slices(entries) + def create(self, batch_size: int): + self.read_entries() + if not self.total_steps or self.total_steps == 0: return None + dataset = tf.data.Dataset.from_tensor_slices(self.lines) + dataset = dataset.map(self.load, num_parallel_calls=AUTOTUNE) return self.process(dataset, batch_size) diff --git a/tensorflow_asr/datasets/base_dataset.py b/tensorflow_asr/datasets/base_dataset.py index 765c697df5..da8a0f6872 100644 --- a/tensorflow_asr/datasets/base_dataset.py +++ b/tensorflow_asr/datasets/base_dataset.py @@ -28,6 +28,7 @@ def __init__(self, shuffle: bool = False, buffer_size: int = BUFFER_SIZE, use_tf: bool = False, + drop_remainder: bool = True, stage: str = "train"): self.data_paths = data_paths self.augmentations = augmentations # apply augmentation @@ -38,6 +39,7 @@ def __init__(self, self.buffer_size = buffer_size # shuffle buffer size self.stage = stage # for defining tfrecords files self.use_tf = use_tf # whether to use only pure tf in the dataset pipeline + self.drop_remainder = drop_remainder # whether to drop remainder for multi gpu training self.total_steps = None # for better training visualization @abc.abstractmethod diff --git a/tensorflow_asr/datasets/keras/__init__.py b/tensorflow_asr/datasets/keras/__init__.py index c0a7bc02e1..5aee10fa36 100644 --- a/tensorflow_asr/datasets/keras/__init__.py +++ b/tensorflow_asr/datasets/keras/__init__.py @@ -12,5 +12,5 @@ # See the License for the specific 
language governing permissions and # limitations under the License. -from .asr_dataset import ASRTFRecordDatasetKeras, ASRSliceDatasetKeras -__all__ = ['ASRTFRecordDatasetKeras', 'ASRSliceDatasetKeras'] +from .asr_dataset import ASRDatasetKeras, ASRTFRecordDatasetKeras, ASRSliceDatasetKeras +__all__ = ['ASRDatasetKeras', 'ASRTFRecordDatasetKeras', 'ASRSliceDatasetKeras'] diff --git a/tensorflow_asr/datasets/keras/asr_dataset.py b/tensorflow_asr/datasets/keras/asr_dataset.py index efa82aebb3..207119fa98 100644 --- a/tensorflow_asr/datasets/keras/asr_dataset.py +++ b/tensorflow_asr/datasets/keras/asr_dataset.py @@ -12,20 +12,44 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import multiprocessing import tensorflow as tf -import numpy as np -from ..asr_dataset import ASRDataset, AUTOTUNE, TFRECORD_SHARDS +from ..asr_dataset import ASRDataset, ASRTFRecordDataset, ASRSliceDataset, AUTOTUNE from ..base_dataset import BUFFER_SIZE -from ...featurizers.speech_featurizers import SpeechFeaturizer, read_raw_audio, tf_read_raw_audio +from ...featurizers.speech_featurizers import SpeechFeaturizer from ...featurizers.text_featurizers import TextFeaturizer -from ...utils.utils import get_num_batches, bytestring_feature, print_one_line +from ...utils.utils import get_num_batches from ...augmentations.augments import Augmentation class ASRDatasetKeras(ASRDataset): + """ Keras Dataset for ASR using Generator """ + + @tf.function + def parse(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor): + """ + Returns: + path, features, input_lengths, labels, label_lengths, pred_inp + """ + if self.use_tf: data = self.tf_preprocess(path, audio, indices) + else: data = self.preprocess(path, audio, indices) + + path, features, input_length, label, label_length, prediction, prediction_length = data + + return ( + { + "path": path, + "input": features, + "input_length": input_length, + "prediction": prediction, + "prediction_length": prediction_length + }, + { + "label": label, + "label_length": label_length + } + ) + def process(self, dataset, batch_size): dataset = dataset.map(self.parse, num_parallel_calls=AUTOTUNE) @@ -40,6 +64,7 @@ def process(self, dataset, batch_size): batch_size=batch_size, padded_shapes=( { + "path": tf.TensorShape([]), "input": tf.TensorShape(self.speech_featurizer.shape), "input_length": tf.TensorShape([]), "prediction": tf.TensorShape([None]), @@ -52,6 +77,7 @@ def process(self, dataset, batch_size): ), padding_values=( { + "path": "", "input": 0., "input_length": 0, "prediction": self.text_featurizer.blank, @@ -62,7 +88,7 @@ def process(self, dataset, batch_size): "label_length": 0 } ), - drop_remainder=True + drop_remainder=self.drop_remainder ) # PREFETCH to improve speed of input length @@ -71,214 +97,67 @@ def process(self, dataset, batch_size): return dataset -class ASRTFRecordDatasetKeras(ASRDatasetKeras): +class ASRTFRecordDatasetKeras(ASRDatasetKeras, ASRTFRecordDataset): """ Keras Dataset for ASR using TFRecords """ def __init__(self, - data_paths: list, - tfrecords_dir: str, + stage: str, speech_featurizer: SpeechFeaturizer, text_featurizer: TextFeaturizer, - stage: str, + data_paths: list, augmentations: Augmentation = Augmentation(None), - tfrecords_shards: int = TFRECORD_SHARDS, cache: bool = False, shuffle: bool = False, + use_tf: bool = False, + drop_remainder: bool = True, buffer_size: int = BUFFER_SIZE): - super(ASRTFRecordDatasetKeras, self).__init__( - stage=stage, 
speech_featurizer=speech_featurizer, text_featurizer=text_featurizer, - data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, buffer_size=buffer_size + ASRTFRecordDataset.__init__( + self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer, + data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, use_tf=use_tf, + drop_remainder=drop_remainder, buffer_size=buffer_size ) - self.tfrecords_dir = tfrecords_dir - if tfrecords_shards <= 0: raise ValueError("tfrecords_shards must be positive") - self.tfrecords_shards = tfrecords_shards - if not tf.io.gfile.exists(self.tfrecords_dir): - tf.io.gfile.makedirs(self.tfrecords_dir) - - def write_tfrecord_file(self, splitted_entries): - shard_path, entries = splitted_entries - with tf.io.TFRecordWriter(shard_path, options='ZLIB') as out: - for audio_file, _, transcript in entries: - with tf.io.gfile.GFile(audio_file, mode="rb") as f: - audio = f.read() - indices = " ".join([str(x) for x in self.text_featurizer.extract(transcript).numpy()]) - feature = { - "path": bytestring_feature([bytes(audio_file, "utf-8")]), - "audio": bytestring_feature([audio]), - "indices": bytestring_feature([bytes(indices, "utf-8")]) - } - example = tf.train.Example(features=tf.train.Features(feature=feature)) - out.write(example.SerializeToString()) - print_one_line("Processed:", audio_file) - print(f"\nCreated {shard_path}") - - def create_tfrecords(self): - if not tf.io.gfile.exists(self.tfrecords_dir): - tf.io.gfile.makedirs(self.tfrecords_dir) - - if tf.io.gfile.glob(os.path.join(self.tfrecords_dir, f"{self.stage}*.tfrecord")): - print(f"TFRecords're already existed: {self.stage}") - return True - - print(f"Creating {self.stage}.tfrecord ...") - - entries = self.read_entries() - if len(entries) <= 0: - return False - - def get_shard_path(shard_id): - return os.path.join(self.tfrecords_dir, f"{self.stage}_{shard_id}.tfrecord") - - shards = [get_shard_path(idx) for idx in range(1, self.tfrecords_shards + 1)] - - splitted_entries = np.array_split(entries, self.tfrecords_shards) - with multiprocessing.Pool(self.tfrecords_shards) as pool: - pool.map(self.write_tfrecord_file, zip(shards, splitted_entries)) - - return True - - def preprocess(self, audio, indices): - with tf.device("/CPU:0"): - signal = read_raw_audio(audio, self.speech_featurizer.sample_rate) - - signal = self.augmentations.before.augment(signal) - - features = self.speech_featurizer.extract(signal) - - features = self.augmentations.after.augment(features) - - label = tf.strings.to_number(tf.strings.split(indices), out_type=tf.int32) - label_length = tf.cast(tf.shape(label)[0], tf.int32) - prediction = self.text_featurizer.prepand_blank(label) - prediction_length = tf.cast(tf.shape(prediction)[0], tf.int32) - features = tf.convert_to_tensor(features, tf.float32) - input_length = tf.cast(tf.shape(features)[0], tf.int32) - - return features, input_length, label, label_length, prediction, prediction_length - - @tf.function - def parse(self, record): - feature_description = { - "path": tf.io.FixedLenFeature([], tf.string), - "audio": tf.io.FixedLenFeature([], tf.string), - "indices": tf.io.FixedLenFeature([], tf.string) - } - example = tf.io.parse_single_example(record, feature_description) - - features, input_length, label, label_length, \ - prediction, prediction_length = tf.numpy_function( - self.preprocess, - inp=[example["audio"], example["indices"]], - Tout=[tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32] - ) - - 
return ( - { - "input": features, - "input_length": input_length, - "prediction": prediction, - "prediction_length": prediction_length - }, - { - "label": label, - "label_length": label_length - } + ASRDatasetKeras.__init__( + self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer, + data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, use_tf=use_tf, + drop_remainder=drop_remainder, buffer_size=buffer_size ) - def create(self, batch_size): - # Create TFRecords dataset - have_data = self.create_tfrecords() - if not have_data: return None - - pattern = os.path.join(self.tfrecords_dir, f"{self.stage}*.tfrecord") - files_ds = tf.data.Dataset.list_files(pattern) - ignore_order = tf.data.Options() - ignore_order.experimental_deterministic = False - files_ds = files_ds.with_options(ignore_order) - dataset = tf.data.TFRecordDataset(files_ds, compression_type='ZLIB', num_parallel_reads=AUTOTUNE) - - return self.process(dataset, batch_size) - - -class TFASRTFRecordDatasetKeras(ASRTFRecordDatasetKeras): - def preprocess(self, audio, indices): - with tf.device("/CPU:0"): - signal = tf_read_raw_audio(audio, self.speech_featurizer.sample_rate) - - signal = self.augmentations.before.augment(signal) - - features = self.speech_featurizer.tf_extract(signal) - - features = self.augmentations.after.augment(features) - - label = tf.strings.to_number(tf.strings.split(indices), out_type=tf.int32) - label_length = tf.cast(tf.shape(label)[0], tf.int32) - prediction = self.text_featurizer.prepand_blank(label) - prediction_length = tf.cast(tf.shape(prediction)[0], tf.int32) - features = tf.convert_to_tensor(features, tf.float32) - input_length = tf.cast(tf.shape(features)[0], tf.int32) - - return features, input_length, label, label_length, prediction, prediction_length - @tf.function - def parse(self, record): - feature_description = { - "path": tf.io.FixedLenFeature([], tf.string), - "audio": tf.io.FixedLenFeature([], tf.string), - "indices": tf.io.FixedLenFeature([], tf.string) - } - example = tf.io.parse_single_example(record, feature_description) - - features, input_length, label, label_length, \ - prediction, prediction_length = self.preprocess([example["audio"], example["indices"]]) + def parse(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor): + return ASRDatasetKeras.parse(self, path, audio, indices) - return ( - { - "input": features, - "input_length": input_length, - "prediction": prediction, - "prediction_length": prediction_length - }, - { - "label": label, - "label_length": label_length - } - ) + def process(self, dataset: tf.data.Dataset, batch_size: int): + return ASRDatasetKeras.process(self, dataset, batch_size) -class ASRSliceDatasetKeras(ASRDatasetKeras): +class ASRSliceDatasetKeras(ASRDatasetKeras, ASRSliceDataset): """ Keras Dataset for ASR using Slice """ - def preprocess(self, path, transcript): - return super(ASRSliceDatasetKeras, self).preprocess(path.decode("utf-8"), transcript) - - @tf.function - def parse(self, record): - features, input_length, label, label_length, \ - prediction, prediction_length = tf.numpy_function( - self.preprocess, - inp=[record[0], record[1]], - Tout=[tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32] - ) - return ( - { - "input": features, - "input_length": input_length, - "prediction": prediction, - "prediction_length": prediction_length - }, - { - "label": label, - "label_length": label_length - } + def __init__(self, + stage: str, + speech_featurizer: SpeechFeaturizer, + 
text_featurizer: TextFeaturizer, + data_paths: list, + augmentations: Augmentation = Augmentation(None), + cache: bool = False, + shuffle: bool = False, + use_tf: bool = False, + drop_remainder: bool = True, + buffer_size: int = BUFFER_SIZE): + ASRSliceDataset.__init__( + self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer, + data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, use_tf=use_tf, + drop_remainder=drop_remainder, buffer_size=buffer_size + ) + ASRDatasetKeras.__init__( + self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer, + data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, use_tf=use_tf, + drop_remainder=drop_remainder, buffer_size=buffer_size ) - def create(self, batch_size): - entries = self.read_entries() - if len(entries) == 0: return None - entries = np.delete(entries, 1, 1) # Remove unused duration - dataset = tf.data.Dataset.from_tensor_slices(entries) - return self.process(dataset, batch_size) - + @tf.function + def parse(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor): + return ASRDatasetKeras.parse(self, path, audio, indices) -class TFASRSliceDatasetKeras(ASRSliceDatasetKeras): + def process(self, dataset: tf.data.Dataset, batch_size: int): + return ASRDatasetKeras.process(self, dataset, batch_size) diff --git a/tensorflow_asr/featurizers/speech_featurizers.py b/tensorflow_asr/featurizers/speech_featurizers.py index 6726cfe798..c5a65be77d 100755 --- a/tensorflow_asr/featurizers/speech_featurizers.py +++ b/tensorflow_asr/featurizers/speech_featurizers.py @@ -19,11 +19,17 @@ import librosa import soundfile as sf import tensorflow as tf +import tensorflow_io as tfio from ..utils.utils import log10 from .gammatone import fft_weights +def load_and_convert_to_wav(path: str) -> tf.Tensor: + data = tfio.audio.AudioIOTensor(path, dtype=tf.float32) + return tfio.audio.encode_wav(data.to_tensor(), rate=tf.cast(data.rate, dtype=tf.int64)) + + def read_raw_audio(audio, sample_rate=16000): if isinstance(audio, str): wave, _ = librosa.load(os.path.expanduser(audio), sr=sample_rate, mono=True) diff --git a/tensorflow_asr/losses/rnnt_losses.py b/tensorflow_asr/losses/rnnt_losses.py index a598dc6365..bebdc5b536 100644 --- a/tensorflow_asr/losses/rnnt_losses.py +++ b/tensorflow_asr/losses/rnnt_losses.py @@ -208,7 +208,7 @@ def compute_rnnt_loss_and_grad_helper(logits, labels, label_length, logit_length a = tf.tile(tf.reshape(tf.range(target_max_len - 1, dtype=tf.int64), shape=(1, 1, target_max_len - 1, 1)), multiples=[batch_size, 1, 1, 1]) b = tf.cast(tf.reshape(labels - 1, shape=(batch_size, 1, target_max_len - 1, 1)), dtype=tf.int64) - b = tf.where(tf.equal(b, -1), tf.zeros_like(b), b) # for cpu testing (index -1 on cpu will raise errors) + # b = tf.where(tf.equal(b, -1), tf.zeros_like(b), b) # for cpu testing (index -1 on cpu will raise errors) c = tf.concat([a, b], axis=3) d = tf.tile(c, multiples=(1, input_max_len, 1, 1)) e = tf.tile(tf.reshape(tf.range(input_max_len, dtype=tf.int64), shape=(1, input_max_len, 1, 1)), diff --git a/tensorflow_asr/models/keras/transducer.py b/tensorflow_asr/models/keras/transducer.py index 59e206aaa1..3ce201d189 100644 --- a/tensorflow_asr/models/keras/transducer.py +++ b/tensorflow_asr/models/keras/transducer.py @@ -62,7 +62,12 @@ def compile(self, optimizer, global_batch_size, blank=0, def train_step(self, batch): x, y_true = batch with tf.GradientTape() as tape: - y_pred = self(x, training=True) + y_pred 
diff --git a/tensorflow_asr/losses/rnnt_losses.py b/tensorflow_asr/losses/rnnt_losses.py
index a598dc6365..bebdc5b536 100644
--- a/tensorflow_asr/losses/rnnt_losses.py
+++ b/tensorflow_asr/losses/rnnt_losses.py
@@ -208,7 +208,7 @@ def compute_rnnt_loss_and_grad_helper(logits, labels, label_length, logit_length
     a = tf.tile(tf.reshape(tf.range(target_max_len - 1, dtype=tf.int64), shape=(1, 1, target_max_len - 1, 1)),
                 multiples=[batch_size, 1, 1, 1])
     b = tf.cast(tf.reshape(labels - 1, shape=(batch_size, 1, target_max_len - 1, 1)), dtype=tf.int64)
-    b = tf.where(tf.equal(b, -1), tf.zeros_like(b), b)  # for cpu testing (index -1 on cpu will raise errors)
+    # b = tf.where(tf.equal(b, -1), tf.zeros_like(b), b)  # for cpu testing (index -1 on cpu will raise errors)
     c = tf.concat([a, b], axis=3)
     d = tf.tile(c, multiples=(1, input_max_len, 1, 1))
     e = tf.tile(tf.reshape(tf.range(input_max_len, dtype=tf.int64), shape=(1, input_max_len, 1, 1)),
diff --git a/tensorflow_asr/models/keras/transducer.py b/tensorflow_asr/models/keras/transducer.py
index 59e206aaa1..3ce201d189 100644
--- a/tensorflow_asr/models/keras/transducer.py
+++ b/tensorflow_asr/models/keras/transducer.py
@@ -62,7 +62,12 @@ def compile(self, optimizer, global_batch_size, blank=0,
     def train_step(self, batch):
         x, y_true = batch
         with tf.GradientTape() as tape:
-            y_pred = self(x, training=True)
+            y_pred = self({
+                "input": x["input"],
+                "input_length": x["input_length"],
+                "prediction": x["prediction"],
+                "prediction_length": x["prediction_length"],
+            }, training=True)
             loss = self.loss(y_true, y_pred)
             scaled_loss = self.optimizer.get_scaled_loss(loss)
         scaled_gradients = tape.gradient(scaled_loss, self.trainable_weights)
@@ -72,6 +77,11 @@ def train_step(self, batch):
 
     def test_step(self, batch):
         x, y_true = batch
-        y_pred = self(x, training=False)
+        y_pred = self({
+            "input": x["input"],
+            "input_length": x["input_length"],
+            "prediction": x["prediction"],
+            "prediction_length": x["prediction_length"],
+        }, training=False)
         loss = self.loss(y_true, y_pred)
         return {"val_rnnt_loss": loss}
diff --git a/tensorflow_asr/runners/base_runners.py b/tensorflow_asr/runners/base_runners.py
index 7c7dcee83a..748f33c316 100644
--- a/tensorflow_asr/runners/base_runners.py
+++ b/tensorflow_asr/runners/base_runners.py
@@ -436,7 +436,7 @@ def _test_step(self, batch):
         Returns:
             (file_paths, groundtruth, greedy, beamsearch, beamsearch_lm) each has shape [B]
         """
-        file_paths, features, input_length, labels = batch
+        file_paths, features, input_length, labels, _, _, _ = batch
         labels = self.model.text_featurizer.iextract(labels)
         input_length = get_reduced_length(input_length, self.model.time_reduction_factor)
diff --git a/tensorflow_asr/runners/ctc_runners.py b/tensorflow_asr/runners/ctc_runners.py
index a892b3ef44..ddc2d01dbd 100644
--- a/tensorflow_asr/runners/ctc_runners.py
+++ b/tensorflow_asr/runners/ctc_runners.py
@@ -49,7 +49,7 @@ def save_model_weights(self):
     @tf.function(experimental_relax_shapes=True)
     def _train_step(self, batch):
-        features, input_length, labels, label_length, _, _ = batch
+        _, features, input_length, labels, label_length, _, _ = batch
 
         with tf.GradientTape() as tape:
             y_pred = self.model(features, training=True)
@@ -70,7 +70,7 @@ def _train_step(self, batch):
     @tf.function(experimental_relax_shapes=True)
     def _eval_step(self, batch):
-        features, input_length, labels, label_length, _, _ = batch
+        _, features, input_length, labels, label_length, _, _ = batch
 
         logits = self.model(features, training=False)
@@ -111,7 +111,7 @@ def _apply_gradients(self):
     @tf.function(experimental_relax_shapes=True)
     def _train_step(self, batch):
-        features, input_length, labels, label_length, _, _ = batch
+        _, features, input_length, labels, label_length, _, _ = batch
 
         with tf.GradientTape() as tape:
             y_pred = self.model(features, training=True)
diff --git a/tensorflow_asr/runners/transducer_runners.py b/tensorflow_asr/runners/transducer_runners.py
index b450ad7120..d8e396be56 100644
--- a/tensorflow_asr/runners/transducer_runners.py
+++ b/tensorflow_asr/runners/transducer_runners.py
@@ -47,7 +47,7 @@ def save_model_weights(self):
     @tf.function(experimental_relax_shapes=True)
     def _train_step(self, batch):
-        features, input_length, labels, label_length, prediction, prediction_length = batch
+        _, features, input_length, labels, label_length, prediction, prediction_length = batch
 
         with tf.GradientTape() as tape:
             logits = self.model([features, input_length, prediction, prediction_length], training=True)
@@ -67,7 +67,7 @@ def _train_step(self, batch):
     @tf.function(experimental_relax_shapes=True)
     def _eval_step(self, batch):
-        features, input_length, labels, label_length, prediction, prediction_length = batch
+        _, features, input_length, labels, label_length, prediction, prediction_length = batch
 
         logits = self.model([features, input_length, prediction, prediction_length], training=False)
 
         eval_loss = rnnt_loss(
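All four runners change the same way because the element spec itself changed: every dataset now yields the utterance path first, so a batch is a 7-tuple. A sketch of what a runner receives per step, assuming the layout assembled by tf_preprocess in patch 4 below (the blank-prepended prediction is my reading of the transducer runners, and feature shapes follow TFSpeechFeaturizer):

    # batch layout after this series, batch size B:
    # path               [B]           tf.string
    # features           [B, T, F, 1]  spectral features
    # input_length       [B]           frames before time reduction
    # labels             [B, U]        token ids
    # label_length       [B]
    # prediction         [B, U + 1]    labels with blank prepended, for the RNNT predictor
    # prediction_length  [B]
    _, features, input_length, labels, label_length, prediction, prediction_length = batch
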
@@ -106,7 +106,7 @@ def _apply_gradients(self):
     @tf.function(experimental_relax_shapes=True)
     def _train_step(self, batch):
-        features, input_length, labels, label_length, prediction, prediction_length = batch
+        _, features, input_length, labels, label_length, prediction, prediction_length = batch
 
         with tf.GradientTape() as tape:
             logits = self.model([features, input_length, prediction, prediction_length], training=True)

From 66a18fd1d9c779feaf4f489ad6e74adee4ce8136 Mon Sep 17 00:00:00 2001
From: Huy Le Nguyen
Date: Sat, 30 Jan 2021 19:02:25 +0700
Subject: [PATCH 4/4] :rocket: finish adding option to choose numpy fn or pure tf in dataset

---
 .../train_keras_subword_conformer.py          |  6 +-
 examples/conformer/train_subword_conformer.py |  6 +-
 setup.py                                      |  2 +-
 tensorflow_asr/augmentations/augments.py      | 12 +--
 tensorflow_asr/configs/config.py              |  1 +
 tensorflow_asr/datasets/asr_dataset.py        | 78 +++++++++----------
 tensorflow_asr/datasets/base_dataset.py       |  3 +-
 tensorflow_asr/datasets/keras/asr_dataset.py  | 30 ++++---
 tensorflow_asr/losses/rnnt_losses.py          |  8 +-
 tensorflow_asr/utils/utils.py                 |  7 ++
 10 files changed, 85 insertions(+), 68 deletions(-)

diff --git a/examples/conformer/train_keras_subword_conformer.py b/examples/conformer/train_keras_subword_conformer.py
index 0ea4e3461f..2c87fa6da7 100644
--- a/examples/conformer/train_keras_subword_conformer.py
+++ b/examples/conformer/train_keras_subword_conformer.py
@@ -59,7 +59,7 @@
 strategy = setup_strategy(args.devices)
 
 from tensorflow_asr.configs.config import Config
-from tensorflow_asr.datasets.keras import ASRTFRecordDatasetKeras, ASRDatasetKeras
+from tensorflow_asr.datasets.keras import ASRTFRecordDatasetKeras, ASRSliceDatasetKeras
 from tensorflow_asr.featurizers.speech_featurizers import TFSpeechFeaturizer
 from tensorflow_asr.featurizers.text_featurizers import SubwordFeaturizer, SentencePieceFeaturizer
 from tensorflow_asr.models.keras.conformer import Conformer
@@ -103,7 +103,7 @@
         shuffle=True, buffer_size=args.bfs,
     )
 else:
-    train_dataset = ASRDatasetKeras(
+    train_dataset = ASRSliceDatasetKeras(
         data_paths=config.learning_config.dataset_config.train_paths,
         speech_featurizer=speech_featurizer,
         text_featurizer=text_featurizer,
@@ -111,7 +111,7 @@
         stage="train", cache=args.cache,
         shuffle=True, buffer_size=args.bfs,
     )
-    eval_dataset = ASRDatasetKeras(
+    eval_dataset = ASRSliceDatasetKeras(
         data_paths=config.learning_config.dataset_config.eval_paths,
         speech_featurizer=speech_featurizer,
         text_featurizer=text_featurizer,
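The else-branch above now builds the generator-backed ASRSliceDatasetKeras, and whether it runs the numpy path or the pure-tf path follows the augmentation config instead of a constructor flag. A sketch of the same construction in isolation, assuming the Config and featurizer objects the script creates earlier:

    train_dataset = ASRSliceDatasetKeras(
        stage="train",
        speech_featurizer=speech_featurizer,
        text_featurizer=text_featurizer,
        data_paths=config.learning_config.dataset_config.train_paths,
        augmentations=config.learning_config.augmentations,  # use_tf rides along here
        cache=True, shuffle=True,
    )
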
diff --git a/examples/conformer/train_subword_conformer.py b/examples/conformer/train_subword_conformer.py
index 5ff20b070c..c6a54a1339 100644
--- a/examples/conformer/train_subword_conformer.py
+++ b/examples/conformer/train_subword_conformer.py
@@ -59,7 +59,7 @@
 strategy = setup_strategy(args.devices)
 
 from tensorflow_asr.configs.config import Config
-from tensorflow_asr.datasets.asr_dataset import TFASRTFRecordDataset, ASRSliceDataset
+from tensorflow_asr.datasets.asr_dataset import ASRTFRecordDataset, ASRSliceDataset
 from tensorflow_asr.featurizers.speech_featurizers import TFSpeechFeaturizer
 from tensorflow_asr.featurizers.text_featurizers import SubwordFeaturizer, SentencePieceFeaturizer
 from tensorflow_asr.runners.transducer_runners import TransducerTrainer
@@ -84,7 +84,7 @@
     text_featurizer.save_to_file(args.subwords)
 
 if args.tfrecords:
-    train_dataset = TFASRTFRecordDataset(
+    train_dataset = ASRTFRecordDataset(
         data_paths=config.learning_config.dataset_config.train_paths,
         tfrecords_dir=config.learning_config.dataset_config.tfrecords_dir,
         speech_featurizer=speech_featurizer,
@@ -94,7 +94,7 @@
         stage="train", cache=args.cache,
         shuffle=True, buffer_size=args.bfs,
     )
-    eval_dataset = TFASRTFRecordDataset(
+    eval_dataset = ASRTFRecordDataset(
         data_paths=config.learning_config.dataset_config.eval_paths,
         tfrecords_dir=config.learning_config.dataset_config.tfrecords_dir,
         tfrecords_shards=args.tfrecords_shards,
diff --git a/setup.py b/setup.py
index fa3d2c0ada..56c89ad7e0 100644
--- a/setup.py
+++ b/setup.py
@@ -36,7 +36,7 @@
 setuptools.setup(
     name="TensorFlowASR",
-    version="0.7.0",
+    version="0.7.1",
     author="Huy Le Nguyen",
     author_email="nlhuy.cs.16@gmail.com",
     description="Almost State-of-the-art Automatic Speech Recognition using Tensorflow 2",
diff --git a/tensorflow_asr/augmentations/augments.py b/tensorflow_asr/augmentations/augments.py
index 0874eb803a..ff791c1f5e 100755
--- a/tensorflow_asr/augmentations/augments.py
+++ b/tensorflow_asr/augmentations/augments.py
@@ -54,13 +54,13 @@ def augment(self, inputs):
 class Augmentation:
     def __init__(self, config: dict = None):
         if not config: config = {}
-        use_tf = config.get("use_tf", False)
-        if use_tf:
-            self.before = self.tf_parse(config.get("before", {}))
-            self.after = self.tf_parse(config.get("after", {}))
+        self.use_tf = config.pop("use_tf", False)
+        if self.use_tf:
+            self.before = self.tf_parse(config.pop("before", {}))
+            self.after = self.tf_parse(config.pop("after", {}))
         else:
-            self.before = self.parse(config.get("before", {}))
-            self.after = self.parse(config.get("after", {}))
+            self.before = self.parse(config.pop("before", {}))
+            self.after = self.parse(config.pop("after", {}))
 
     @staticmethod
     def parse(config: dict) -> list:
diff --git a/tensorflow_asr/configs/config.py b/tensorflow_asr/configs/config.py
index 3613a56568..414a515f77 100644
--- a/tensorflow_asr/configs/config.py
+++ b/tensorflow_asr/configs/config.py
@@ -40,6 +40,7 @@ def __init__(self, config: dict = None):
         self.eval_paths = preprocess_paths(config.pop("eval_paths", None))
         self.test_paths = preprocess_paths(config.pop("test_paths", None))
         self.tfrecords_dir = preprocess_paths(config.pop("tfrecords_dir", None))
+        self.use_tf = config.pop("use_tf", False)
         for k, v in config.items(): setattr(self, k, v)
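Switching get for pop matters here: Augmentation now owns the use_tf flag, and popping the keys keeps them from being mistaken for augmentation names by the parsers. A minimal sketch of the resulting behaviour, with made-up mask values:

    from tensorflow_asr.augmentations.augments import Augmentation

    # "use_tf" is consumed via pop(), so tf_parse/parse never see it
    aug = Augmentation({
        "use_tf": True,                                # pick tf_parse over parse
        "after": {"time_masking": {"num_masks": 10}},  # hypothetical values
    })
    print(aug.use_tf)  # True; datasets read this attribute instead of taking a use_tf argument
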
diff --git a/tensorflow_asr/datasets/asr_dataset.py b/tensorflow_asr/datasets/asr_dataset.py
index 3a47f16368..008103d28d 100755
--- a/tensorflow_asr/datasets/asr_dataset.py
+++ b/tensorflow_asr/datasets/asr_dataset.py
@@ -28,21 +28,6 @@
 TFRECORD_SHARDS = 16
 
-def write_tfrecord_file(splitted_entries):
-    shard_path, entries = splitted_entries
-    with tf.io.TFRecordWriter(shard_path, options='ZLIB') as out:
-        for path, audio, indices in entries:
-            feature = {
-                "path": bytestring_feature([bytes(path, "utf-8")]),
-                "audio": bytestring_feature([audio]),
-                "indices": bytestring_feature([bytes(indices, "utf-8")])
-            }
-            example = tf.train.Example(features=tf.train.Features(feature=feature))
-            out.write(example.SerializeToString())
-            print_one_line("Processed:", path)
-    print(f"\nCreated {shard_path}")
-
-
 class ASRDataset(BaseDataset):
     """ Dataset for ASR using Generator """
 
@@ -54,40 +39,39 @@ def __init__(self,
                  augmentations: Augmentation = Augmentation(None),
                  cache: bool = False,
                  shuffle: bool = False,
-                 use_tf: bool = False,
                  drop_remainder: bool = True,
                  buffer_size: int = BUFFER_SIZE):
         super(ASRDataset, self).__init__(
             data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, stage=stage, buffer_size=buffer_size,
-            use_tf=use_tf, drop_remainder=drop_remainder
+            drop_remainder=drop_remainder
         )
         self.speech_featurizer = speech_featurizer
         self.text_featurizer = text_featurizer
 
     def read_entries(self):
-        self.lines = []
+        self.entries = []
         for file_path in self.data_paths:
             print(f"Reading {file_path} ...")
             with tf.io.gfile.GFile(file_path, "r") as f:
                 temp_lines = f.read().splitlines()
                 # Skip the header of tsv file
-                self.lines += temp_lines[1:]
+                self.entries += temp_lines[1:]
         # The files are "\t" separated
-        self.lines = [line.split("\t", 2) for line in self.lines]
-        self.lines = np.array(self.lines)
-        for i, line in enumerate(self.lines):
-            self.lines[i][-1] = " ".join([str(x) for x in self.text_featurizer.extract(line[-1]).numpy()])
-        if self.shuffle: np.random.shuffle(self.lines)  # Mix transcripts.tsv
-        self.total_steps = len(self.lines)
+        self.entries = [line.split("\t", 2) for line in self.entries]
+        for i, line in enumerate(self.entries):
+            self.entries[i][-1] = " ".join([str(x) for x in self.text_featurizer.extract(line[-1]).numpy()])
+        self.entries = np.array(self.entries)
+        if self.shuffle: np.random.shuffle(self.entries)  # Mix transcripts.tsv
+        self.total_steps = len(self.entries)
 
     def generator(self):
-        for path, _, indices in self.lines:
-            audio = load_and_convert_to_wav(path)
-            yield path, audio, indices
+        for path, _, indices in self.entries:
+            audio = load_and_convert_to_wav(path).numpy()
+            yield bytes(path, "utf-8"), audio, bytes(indices, "utf-8")
 
-    def preprocess(self, path, audio, indices):
-        def fn(_path, _audio, _indices):
+    def preprocess(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor):
+        def fn(_path: bytes, _audio: bytes, _indices: bytes):
             with tf.device("/CPU:0"):
                 signal = read_raw_audio(_audio, self.speech_featurizer.sample_rate)
@@ -111,7 +95,7 @@ def fn(_path, _audio, _indices):
             Tout=[tf.string, tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32]
         )
 
-    def tf_preprocess(self, path, audio, indices):
+    def tf_preprocess(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor):
         with tf.device("/CPU:0"):
             signal = tf_read_raw_audio(audio, self.speech_featurizer.sample_rate)
@@ -130,7 +114,7 @@ def tf_preprocess(self, path, audio, indices):
 
         return path, features, input_length, label, label_length, prediction, prediction_length
 
-    def process(self, dataset, batch_size):
+    def process(self, dataset: tf.data.Dataset, batch_size: int):
         dataset = dataset.map(self.parse, num_parallel_calls=AUTOTUNE)
 
         if self.cache:
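preprocess and tf_preprocess are now the two interchangeable halves of the pipeline: the former hides the numpy featurizers behind tf.numpy_function, the latter stays in graph ops end to end. The dispatch between them is not shown in this hunk, but presumably reduces to something like the following sketch, with use_tf inherited from the augmentation config:

    def parse(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor):
        # sketch only: choose the pure-tf path when the config asks for it
        if self.use_tf:
            return self.tf_preprocess(path, audio, indices)
        return self.preprocess(path, audio, indices)
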
@@ -193,18 +177,34 @@ def __init__(self,
                  tfrecords_shards: int = TFRECORD_SHARDS,
                  cache: bool = False,
                  shuffle: bool = False,
-                 use_tf: bool = False,
+                 drop_remainder: bool = True,
                  buffer_size: int = BUFFER_SIZE):
         super(ASRTFRecordDataset, self).__init__(
             stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer,
             data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, buffer_size=buffer_size,
-            use_tf=use_tf
+            drop_remainder=drop_remainder
         )
         self.tfrecords_dir = tfrecords_dir
         if tfrecords_shards <= 0: raise ValueError("tfrecords_shards must be positive")
         self.tfrecords_shards = tfrecords_shards
         if not tf.io.gfile.exists(self.tfrecords_dir): tf.io.gfile.makedirs(self.tfrecords_dir)
 
+    @staticmethod
+    def write_tfrecord_file(splitted_entries):
+        shard_path, entries = splitted_entries
+        with tf.io.TFRecordWriter(shard_path, options='ZLIB') as out:
+            for path, _, indices in entries:
+                audio = load_and_convert_to_wav(path).numpy()
+                feature = {
+                    "path": bytestring_feature([bytes(path, "utf-8")]),
+                    "audio": bytestring_feature([audio]),
+                    "indices": bytestring_feature([bytes(indices, "utf-8")])
+                }
+                example = tf.train.Example(features=tf.train.Features(feature=feature))
+                out.write(example.SerializeToString())
+                print_one_line("Processed:", path)
+        print(f"\nCreated {shard_path}")
+
     def create_tfrecords(self):
         if not tf.io.gfile.exists(self.tfrecords_dir): tf.io.gfile.makedirs(self.tfrecords_dir)
@@ -217,16 +217,15 @@ def create_tfrecords(self):
         self.read_entries()
         if not self.total_steps or self.total_steps == 0: return False
-        entries = np.fromiter(self.generator(), dtype=str)
 
         def get_shard_path(shard_id):
             return os.path.join(self.tfrecords_dir, f"{self.stage}_{shard_id}.tfrecord")
 
         shards = [get_shard_path(idx) for idx in range(1, self.tfrecords_shards + 1)]
 
-        splitted_entries = np.array_split(entries, self.tfrecords_shards)
+        splitted_entries = np.array_split(self.entries, self.tfrecords_shards)
         with multiprocessing.Pool(self.tfrecords_shards) as pool:
-            pool.map(write_tfrecord_file, zip(shards, splitted_entries))
+            pool.map(self.write_tfrecord_file, zip(shards, splitted_entries))
 
         return True
@@ -260,12 +259,13 @@ class ASRSliceDataset(ASRDataset):
 
     @staticmethod
     def load(record: tf.Tensor):
-        audio = load_and_convert_to_wav(record[0])
+        def fn(path: bytes): return load_and_convert_to_wav(path.decode("utf-8")).numpy()
+        audio = tf.numpy_function(fn, inp=[record[0]], Tout=tf.string)
         return record[0], audio, record[2]
 
     def create(self, batch_size: int):
         self.read_entries()
         if not self.total_steps or self.total_steps == 0: return None
-        dataset = tf.data.Dataset.from_tensor_slices(self.lines)
+        dataset = tf.data.Dataset.from_tensor_slices(self.entries)
         dataset = dataset.map(self.load, num_parallel_calls=AUTOTUNE)
         return self.process(dataset, batch_size)
diff --git a/tensorflow_asr/datasets/base_dataset.py b/tensorflow_asr/datasets/base_dataset.py
index da8a0f6872..93ea112c95 100644
--- a/tensorflow_asr/datasets/base_dataset.py
+++ b/tensorflow_asr/datasets/base_dataset.py
@@ -27,7 +27,6 @@ def __init__(self,
                  cache: bool = False,
                  shuffle: bool = False,
                  buffer_size: int = BUFFER_SIZE,
-                 use_tf: bool = False,
                  drop_remainder: bool = True,
                  stage: str = "train"):
         self.data_paths = data_paths
@@ -38,7 +37,7 @@ def __init__(self,
             raise ValueError("buffer_size must be positive when shuffle is on")
         self.buffer_size = buffer_size  # shuffle buffer size
         self.stage = stage  # for defining tfrecords files
-        self.use_tf = use_tf  # whether to use only pure tf in the dataset pipeline
+        self.use_tf = self.augmentations.use_tf
         self.drop_remainder = drop_remainder  # whether to drop remainder for multi gpu training
         self.total_steps = None  # for better training visualization
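Deriving use_tf from the Augmentation object removes a whole class of misconfiguration: a dataset running pure-tf ops can no longer be paired with numpy-only augmentations, because both read the same flag. Sketched with hypothetical featurizer objects and a made-up transcript path:

    aug = Augmentation({"use_tf": True})
    ds = ASRSliceDataset(stage="train",
                         speech_featurizer=speech_featurizer,
                         text_featurizer=text_featurizer,
                         data_paths=["/data/transcripts.tsv"],  # hypothetical path
                         augmentations=aug)
    assert ds.use_tf == aug.use_tf  # one flag, one source of truth
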
diff --git a/tensorflow_asr/datasets/keras/asr_dataset.py b/tensorflow_asr/datasets/keras/asr_dataset.py
index 207119fa98..91fd27cf7c 100644
--- a/tensorflow_asr/datasets/keras/asr_dataset.py
+++ b/tensorflow_asr/datasets/keras/asr_dataset.py
@@ -14,7 +14,7 @@
 
 import tensorflow as tf
 
-from ..asr_dataset import ASRDataset, ASRTFRecordDataset, ASRSliceDataset, AUTOTUNE
+from ..asr_dataset import ASRDataset, ASRTFRecordDataset, ASRSliceDataset, AUTOTUNE, TFRECORD_SHARDS
 from ..base_dataset import BUFFER_SIZE
 from ...featurizers.speech_featurizers import SpeechFeaturizer
 from ...featurizers.text_featurizers import TextFeaturizer
@@ -101,30 +101,37 @@ class ASRTFRecordDatasetKeras(ASRDatasetKeras, ASRTFRecordDataset):
     """ Keras Dataset for ASR using TFRecords """
 
     def __init__(self,
-                 stage: str,
+                 data_paths: list,
+                 tfrecords_dir: str,
                  speech_featurizer: SpeechFeaturizer,
                  text_featurizer: TextFeaturizer,
-                 data_paths: list,
+                 stage: str,
                  augmentations: Augmentation = Augmentation(None),
+                 tfrecords_shards: int = TFRECORD_SHARDS,
                  cache: bool = False,
                  shuffle: bool = False,
-                 use_tf: bool = False,
                  drop_remainder: bool = True,
                  buffer_size: int = BUFFER_SIZE):
         ASRTFRecordDataset.__init__(
             self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer,
-            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, use_tf=use_tf,
-            drop_remainder=drop_remainder, buffer_size=buffer_size
+            data_paths=data_paths, tfrecords_dir=tfrecords_dir, augmentations=augmentations, cache=cache, shuffle=shuffle,
+            tfrecords_shards=tfrecords_shards, drop_remainder=drop_remainder, buffer_size=buffer_size
         )
         ASRDatasetKeras.__init__(
             self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer,
-            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, use_tf=use_tf,
+            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle,
             drop_remainder=drop_remainder, buffer_size=buffer_size
         )
 
     @tf.function
-    def parse(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor):
-        return ASRDatasetKeras.parse(self, path, audio, indices)
+    def parse(self, record: tf.Tensor):
+        feature_description = {
+            "path": tf.io.FixedLenFeature([], tf.string),
+            "audio": tf.io.FixedLenFeature([], tf.string),
+            "indices": tf.io.FixedLenFeature([], tf.string)
+        }
+        example = tf.io.parse_single_example(record, feature_description)
+        return ASRDatasetKeras.parse(self, **example)
 
     def process(self, dataset: tf.data.Dataset, batch_size: int):
         return ASRDatasetKeras.process(self, dataset, batch_size)
@@ -141,17 +148,16 @@ def __init__(self,
                  augmentations: Augmentation = Augmentation(None),
                  cache: bool = False,
                  shuffle: bool = False,
-                 use_tf: bool = False,
                  drop_remainder: bool = True,
                  buffer_size: int = BUFFER_SIZE):
         ASRSliceDataset.__init__(
             self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer,
-            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, use_tf=use_tf,
+            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle,
             drop_remainder=drop_remainder, buffer_size=buffer_size
         )
         ASRDatasetKeras.__init__(
             self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer,
-            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, use_tf=use_tf,
+            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle,
             drop_remainder=drop_remainder, buffer_size=buffer_size
         )
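The new parse is the exact mirror of write_tfrecord_file: three bytes features written, three scalar string tensors read back, with dict keys that line up so the `ASRDatasetKeras.parse(self, **example)` call works. A self-contained round trip under that schema (bytestring_feature re-declared here for the sketch; all values made up):

    import tensorflow as tf

    def bytestring_feature(values):
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=values))

    example = tf.train.Example(features=tf.train.Features(feature={
        "path": bytestring_feature([b"/data/a.flac"]),       # hypothetical entry
        "audio": bytestring_feature([b"RIFF...wav-bytes"]),
        "indices": bytestring_feature([b"7 12 5"]),
    }))
    record = example.SerializeToString()

    feature_description = {
        "path": tf.io.FixedLenFeature([], tf.string),
        "audio": tf.io.FixedLenFeature([], tf.string),
        "indices": tf.io.FixedLenFeature([], tf.string),
    }
    parsed = tf.io.parse_single_example(record, feature_description)
    # parsed["indices"] == b"7 12 5"; keys match the **example expansion above
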
diff --git a/tensorflow_asr/losses/rnnt_losses.py b/tensorflow_asr/losses/rnnt_losses.py
index bebdc5b536..4d72b95083 100644
--- a/tensorflow_asr/losses/rnnt_losses.py
+++ b/tensorflow_asr/losses/rnnt_losses.py
@@ -14,12 +14,16 @@
 # RNNT loss implementation in pure TensorFlow is borrowed from [iamjanvijay's repo](https://github.com/iamjanvijay/rnnt)
 
 import tensorflow as tf
+
+from ..utils.utils import has_gpu_or_tpu
+
+use_cpu = not has_gpu_or_tpu()
+
 try:
     from warprnnt_tensorflow import rnnt_loss as warp_rnnt_loss
     use_warprnnt = True
 except ImportError:
     print("Cannot import RNNT loss in warprnnt. Falls back to RNNT in TensorFlow")
-    print("Note: The RNNT in Tensorflow is not supported for CPU yet")
     from tensorflow.python.ops.gen_array_ops import matrix_diag_part_v2
     use_warprnnt = False
@@ -208,7 +212,7 @@ def compute_rnnt_loss_and_grad_helper(logits, labels, label_length, logit_length
     a = tf.tile(tf.reshape(tf.range(target_max_len - 1, dtype=tf.int64), shape=(1, 1, target_max_len - 1, 1)),
                 multiples=[batch_size, 1, 1, 1])
     b = tf.cast(tf.reshape(labels - 1, shape=(batch_size, 1, target_max_len - 1, 1)), dtype=tf.int64)
-    # b = tf.where(tf.equal(b, -1), tf.zeros_like(b), b)  # for cpu testing (index -1 on cpu will raise errors)
+    if use_cpu: b = tf.where(tf.equal(b, -1), tf.zeros_like(b), b)  # for cpu testing (index -1 on cpu will raise errors)
     c = tf.concat([a, b], axis=3)
     d = tf.tile(c, multiples=(1, input_max_len, 1, 1))
     e = tf.tile(tf.reshape(tf.range(input_max_len, dtype=tf.int64), shape=(1, input_max_len, 1, 1)),
diff --git a/tensorflow_asr/utils/utils.py b/tensorflow_asr/utils/utils.py
index cbd8b7aa27..1d813261bb 100755
--- a/tensorflow_asr/utils/utils.py
+++ b/tensorflow_asr/utils/utils.py
@@ -160,3 +160,10 @@ def get_reduced_length(length, reduction_factor):
 
 def count_non_blank(tensor: tf.Tensor, blank: int or tf.Tensor = 0, axis=None):
     return tf.reduce_sum(tf.where(tf.not_equal(tensor, blank), x=tf.ones_like(tensor), y=tf.zeros_like(tensor)), axis=axis)
+
+
+def has_gpu_or_tpu():
+    gpus = tf.config.list_logical_devices("GPU")
+    tpus = tf.config.list_logical_devices("TPU")
+    if len(gpus) == 0 and len(tpus) == 0: return False
+    return True
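Gating the clamp on use_cpu restores CPU support without touching the GPU/TPU fast path: CPU gather kernels raise on out-of-range indices, while GPU kernels write zeros, and `labels - 1` legitimately produces -1 at blank positions. A standalone illustration of the device difference the guard works around (not part of the patch):

    import tensorflow as tf

    params = tf.reshape(tf.range(12, dtype=tf.float32), (3, 4))
    indices = tf.constant([[0, 1], [2, -1]], dtype=tf.int64)  # -1 mimics labels - 1 at a blank

    safe = tf.where(tf.equal(indices, -1), tf.zeros_like(indices), indices)
    print(tf.gather_nd(params, safe))  # works on every device

    # tf.gather_nd(params, indices) would raise InvalidArgumentError on CPU,
    # whereas a GPU kernel returns 0 for the out-of-range index instead.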