diff --git a/examples/conformer/config.yml b/examples/conformer/config.yml
index e538ab6383..e9bab34c5d 100755
--- a/examples/conformer/config.yml
+++ b/examples/conformer/config.yml
@@ -60,6 +60,7 @@ model_config:
 
 learning_config:
   augmentations:
+    use_tf: True
     after:
       time_masking:
         num_masks: 10
@@ -77,7 +78,7 @@ learning_config:
     - /mnt/Miscellanea/Datasets/Speech/LibriSpeech/dev-other/transcripts.tsv
   test_paths:
     - /mnt/Miscellanea/Datasets/Speech/LibriSpeech/test-clean/transcripts.tsv
-  tfrecords_dir: /mnt/Miscellanea/Datasets/Speech/LibriSpeech/tfrecords
+  tfrecords_dir: /mnt/Miscellanea/Datasets/Speech/LibriSpeech/tfrecords-test
 
   optimizer_config:
     warmup_steps: 40000
diff --git a/scripts/create_tfrecords.py b/scripts/create_tfrecords.py
index 509d6b891b..ab34ee9ba4 100644
--- a/scripts/create_tfrecords.py
+++ b/scripts/create_tfrecords.py
@@ -12,9 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 import argparse
 
+from tensorflow_asr.configs.config import Config
 from tensorflow_asr.utils.utils import preprocess_paths
 from tensorflow_asr.datasets.asr_dataset import ASRTFRecordDataset
+from tensorflow_asr.featurizers.text_featurizers import SubwordFeaturizer
 
 modes = ["train", "eval", "test"]
 
@@ -22,12 +25,16 @@
 
 parser.add_argument("--mode", "-m", type=str, default=None, help=f"Mode in {modes}")
 
+parser.add_argument("--config", type=str, default=None, help="The file path of model configuration file")
+
 parser.add_argument("--tfrecords_dir", type=str, default=None, help="Directory to tfrecords")
 
 parser.add_argument("--tfrecords_shards", type=int, default=16, help="Number of tfrecords shards")
 
 parser.add_argument("--shuffle", default=False, action="store_true", help="Shuffle data or not")
 
+parser.add_argument("--subwords", type=str, default=None, help="Path to file that stores generated subwords")
+
 parser.add_argument("transcripts", nargs="+", type=str, default=None, help="Paths to transcript files")
 
 args = parser.parse_args()
@@ -37,8 +44,15 @@
 transcripts = preprocess_paths(args.transcripts)
 tfrecords_dir = preprocess_paths(args.tfrecords_dir)
 
+config = Config(args.config)
+if args.subwords and os.path.exists(args.subwords):
+    print("Loading subwords ...")
+    text_featurizer = SubwordFeaturizer.load_from_file(config.decoder_config, args.subwords)
+else:
+    raise ValueError("subwords must be set")
+
 ASRTFRecordDataset(
     data_paths=transcripts, tfrecords_dir=tfrecords_dir,
-    speech_featurizer=None, text_featurizer=None,
+    speech_featurizer=None, text_featurizer=text_featurizer,
     stage=args.mode, shuffle=args.shuffle, tfrecords_shards=args.tfrecords_shards
 ).create_tfrecords()
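Note: with the new `--config` and `--subwords` flags, the script needs a text featurizer before it can pre-tokenize transcripts into TFRecords. A minimal sketch of the equivalent programmatic call (paths here are placeholders, not from this patch):

```python
# Hypothetical paths; this mirrors what scripts/create_tfrecords.py now does.
from tensorflow_asr.configs.config import Config
from tensorflow_asr.featurizers.text_featurizers import SubwordFeaturizer
from tensorflow_asr.datasets.asr_dataset import ASRTFRecordDataset

config = Config("examples/conformer/config.yml")
text_featurizer = SubwordFeaturizer.load_from_file(config.decoder_config, "subwords.corpus")

ASRTFRecordDataset(
    data_paths=["train-clean-100/transcripts.tsv"], tfrecords_dir="tfrecords",
    speech_featurizer=None, text_featurizer=text_featurizer,
    stage="train", shuffle=True, tfrecords_shards=16
).create_tfrecords()
```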
diff --git a/setup.py b/setup.py
index f7a9c54213..56c89ad7e0 100644
--- a/setup.py
+++ b/setup.py
@@ -20,6 +20,7 @@
 requirements = [
     "tensorflow-datasets>=3.2.1,<4.0.0",
     "tensorflow-addons>=0.10.0",
+    "tensorflow-io>=0.17.0",
     "setuptools>=47.1.1",
     "librosa>=0.8.0",
     "soundfile>=0.10.3",
@@ -35,7 +36,7 @@
 
 setuptools.setup(
     name="TensorFlowASR",
-    version="0.7.0",
+    version="0.7.1",
     author="Huy Le Nguyen",
     author_email="nlhuy.cs.16@gmail.com",
     description="Almost State-of-the-art Automatic Speech Recognition using Tensorflow 2",
diff --git a/tensorflow_asr/augmentations/augments.py b/tensorflow_asr/augmentations/augments.py
index 1cd45473d8..ff791c1f5e 100755
--- a/tensorflow_asr/augmentations/augments.py
+++ b/tensorflow_asr/augmentations/augments.py
@@ -12,11 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import tensorflow as tf
 import nlpaug.flow as naf
 
 from .signal_augment import SignalCropping, SignalLoudness, SignalMask, SignalNoise, \
     SignalPitch, SignalShift, SignalSpeed, SignalVtlp
-from .spec_augment import FreqMasking, TimeMasking
+from .spec_augment import FreqMasking, TimeMasking, TFFreqMasking, TFTimeMasking
 
 
 AUGMENTATIONS = {
@@ -32,12 +33,34 @@
     "vtlp": SignalVtlp
 }
 
+TFAUGMENTATIONS = {
+    "freq_masking": TFFreqMasking,
+    "time_masking": TFTimeMasking,
+}
+
+
+class TFAugmentationExecutor:
+    def __init__(self, augmentations: list):
+        self.augmentations = augmentations
+
+    @tf.function
+    def augment(self, inputs):
+        outputs = inputs
+        for au in self.augmentations:
+            outputs = au.augment(outputs)
+        return outputs
+
 
 class Augmentation:
     def __init__(self, config: dict = None):
         if not config: config = {}
-        self.before = self.parse(config.get("before", {}))
-        self.after = self.parse(config.get("after", {}))
+        self.use_tf = config.pop("use_tf", False)
+        if self.use_tf:
+            self.before = self.tf_parse(config.pop("before", {}))
+            self.after = self.tf_parse(config.pop("after", {}))
+        else:
+            self.before = self.parse(config.pop("before", {}))
+            self.after = self.parse(config.pop("after", {}))
 
     @staticmethod
     def parse(config: dict) -> list:
@@ -50,3 +73,15 @@ def parse(config: dict) -> list:
             aug = au(**value) if value is not None else au()
             augmentations.append(aug)
         return naf.Sometimes(augmentations)
+
+    @staticmethod
+    def tf_parse(config: dict) -> list:
+        augmentations = []
+        for key, value in config.items():
+            au = TFAUGMENTATIONS.get(key, None)
+            if au is None:
+                raise KeyError(f"No tf augmentation named: {key}\n"
+                               f"Available tf augmentations: {TFAUGMENTATIONS.keys()}")
+            aug = au(**value) if value is not None else au()
+            augmentations.append(aug)
+        return TFAugmentationExecutor(augmentations)
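For reference, a `use_tf: True` block in `config.yml` now routes straight to `TFAugmentationExecutor`, so masking runs inside the `tf.data` graph instead of going through `nlpaug`. A small usage sketch (shapes and parameter values are illustrative only):

```python
import tensorflow as tf
from tensorflow_asr.augmentations.augments import Augmentation

# Mirrors the augmentations section of config.yml; parameter values are made up.
augment = Augmentation({
    "use_tf": True,
    "after": {
        "time_masking": {"num_masks": 10, "mask_factor": 100, "p_upperbound": 0.05},
        "freq_masking": {"num_masks": 1, "mask_factor": 27},
    },
})

features = tf.random.normal([200, 80, 1])  # (T, num_feature_bins, V)
masked = augment.after.augment(features)   # TFAugmentationExecutor, graph-compatible
```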
+ """ Augmentation on spectrogram: http://arxiv.org/abs/1904.08779 """ + import numpy as np +import tensorflow as tf from nlpaug.flow import Sequential from nlpaug.util import Action from nlpaug.model.spectrogram import Spectrogram from nlpaug.augmenter.spectrogram import SpectrogramAugmenter +from ..utils.utils import shape_list + # ---------------------------- FREQ MASKING ---------------------------- @@ -75,6 +80,35 @@ def __init__(self, def substitute(self, data): return self.flow.augment(data) + +class TFFreqMasking: + def __init__(self, num_masks: int = 1, mask_factor: float = 27): + self.num_masks = num_masks + self.mask_factor = mask_factor + + @tf.function + def augment(self, spectrogram: tf.Tensor): + """ + Masking the frequency channels (shape[1]) + Args: + spectrogram: shape (T, num_feature_bins, V) + Returns: + frequency masked spectrogram + """ + T, F, V = shape_list(spectrogram, out_type=tf.int32) + for _ in range(self.num_masks): + f = tf.random.uniform([], minval=0, maxval=self.mask_factor, dtype=tf.int32) + f = tf.minimum(f, F) + f0 = tf.random.uniform([], minval=0, maxval=(F - f), dtype=tf.int32) + mask = tf.concat([ + tf.ones([T, f0, V], dtype=spectrogram.dtype), + tf.zeros([T, f, V], dtype=spectrogram.dtype), + tf.ones([T, F - f0 - f, V], dtype=spectrogram.dtype) + ], axis=1) + spectrogram = spectrogram * mask + return spectrogram + + # ---------------------------- TIME MASKING ---------------------------- @@ -101,9 +135,8 @@ def mask(self, data: np.ndarray) -> np.ndarray: """ spectrogram = data.copy() time = np.random.randint(0, self.mask_factor + 1) - time = min(time, spectrogram.shape[0]) - time0 = np.random.randint(0, spectrogram.shape[0] - time + 1) time = min(time, int(self.p_upperbound * spectrogram.shape[0])) + time0 = np.random.randint(0, spectrogram.shape[0] - time + 1) spectrogram[time0:time0 + time, :, :] = 0 return spectrogram @@ -139,3 +172,32 @@ def __init__(self, def substitute(self, data): return self.flow.augment(data) + + +class TFTimeMasking: + def __init__(self, num_masks: int = 1, mask_factor: float = 100, p_upperbound: float = 1.0): + self.num_masks = num_masks + self.mask_factor = mask_factor + self.p_upperbound = p_upperbound + + @tf.function + def augment(self, spectrogram: tf.Tensor): + """ + Masking the time channel (shape[0]) + Args: + spectrogram: shape (T, num_feature_bins, V) + Returns: + frequency masked spectrogram + """ + T, F, V = shape_list(spectrogram, out_type=tf.int32) + for _ in range(self.num_masks): + t = tf.random.uniform([], minval=0, maxval=self.mask_factor, dtype=tf.int32) + t = tf.minimum(t, tf.cast(tf.cast(T, dtype=tf.float32) * self.p_upperbound, dtype=tf.int32)) + t0 = tf.random.uniform([], minval=0, maxval=(T - t), dtype=tf.int32) + mask = tf.concat([ + tf.ones([t0, F, V], dtype=spectrogram.dtype), + tf.zeros([t, F, V], dtype=spectrogram.dtype), + tf.ones([T - t0 - t, F, V], dtype=spectrogram.dtype) + ], axis=0) + spectrogram = spectrogram * mask + return spectrogram diff --git a/tensorflow_asr/configs/config.py b/tensorflow_asr/configs/config.py index 3613a56568..414a515f77 100644 --- a/tensorflow_asr/configs/config.py +++ b/tensorflow_asr/configs/config.py @@ -40,6 +40,7 @@ def __init__(self, config: dict = None): self.eval_paths = preprocess_paths(config.pop("eval_paths", None)) self.test_paths = preprocess_paths(config.pop("test_paths", None)) self.tfrecords_dir = preprocess_paths(config.pop("tfrecords_dir", None)) + self.use_tf = config.pop("use_tf", False) for k, v in config.items(): setattr(self, k, v) diff 
diff --git a/tensorflow_asr/configs/config.py b/tensorflow_asr/configs/config.py
index 3613a56568..414a515f77 100644
--- a/tensorflow_asr/configs/config.py
+++ b/tensorflow_asr/configs/config.py
@@ -40,6 +40,7 @@ def __init__(self, config: dict = None):
         self.eval_paths = preprocess_paths(config.pop("eval_paths", None))
         self.test_paths = preprocess_paths(config.pop("test_paths", None))
         self.tfrecords_dir = preprocess_paths(config.pop("tfrecords_dir", None))
+        self.use_tf = config.pop("use_tf", False)
         for k, v in config.items():
             setattr(self, k, v)
diff --git a/tensorflow_asr/datasets/__init__.py b/tensorflow_asr/datasets/__init__.py
index 18f3bb5640..f5f8a8a1e8 100644
--- a/tensorflow_asr/datasets/__init__.py
+++ b/tensorflow_asr/datasets/__init__.py
@@ -13,5 +13,5 @@
 # limitations under the License.
 
 from .base_dataset import BaseDataset
-from .asr_dataset import ASRTFRecordDataset, ASRSliceDataset, ASRTFRecordTestDataset, ASRSliceTestDataset
-__all__ = ['BaseDataset', 'ASRTFRecordDataset', 'ASRSliceDataset', 'ASRTFRecordTestDataset', 'ASRSliceTestDataset']
+from .asr_dataset import ASRDataset, ASRTFRecordDataset, ASRSliceDataset
+__all__ = ['BaseDataset', 'ASRDataset', 'ASRTFRecordDataset', 'ASRSliceDataset']
diff --git a/tensorflow_asr/datasets/asr_dataset.py b/tensorflow_asr/datasets/asr_dataset.py
index e0d9832e74..008103d28d 100755
--- a/tensorflow_asr/datasets/asr_dataset.py
+++ b/tensorflow_asr/datasets/asr_dataset.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import abc
 import multiprocessing
 import os
 
@@ -21,7 +20,7 @@
 
 from ..augmentations.augments import Augmentation
 from .base_dataset import BaseDataset, BUFFER_SIZE
-from ..featurizers.speech_featurizers import read_raw_audio, SpeechFeaturizer
+from ..featurizers.speech_featurizers import load_and_convert_to_wav, read_raw_audio, tf_read_raw_audio, SpeechFeaturizer
 from ..featurizers.text_featurizers import TextFeaturizer
 from ..utils.utils import bytestring_feature, print_one_line, get_num_batches
 
@@ -29,28 +28,9 @@
 TFRECORD_SHARDS = 16
 
 
-def to_tfrecord(path, audio, transcript):
-    feature = {
-        "path": bytestring_feature([path]),
-        "audio": bytestring_feature([audio]),
-        "transcript": bytestring_feature([transcript])
-    }
-    return tf.train.Example(features=tf.train.Features(feature=feature))
-
-
-def write_tfrecord_file(splitted_entries):
-    shard_path, entries = splitted_entries
-    with tf.io.TFRecordWriter(shard_path, options='ZLIB') as out:
-        for audio_file, _, transcript in entries:
-            with open(audio_file, "rb") as f:
-                audio = f.read()
-            example = to_tfrecord(bytes(audio_file, "utf-8"), audio, bytes(transcript, "utf-8"))
-            out.write(example.SerializeToString())
-            print_one_line("Processed:", audio_file)
-    print(f"\nCreated {shard_path}")
-
-
 class ASRDataset(BaseDataset):
+    """ Dataset for ASR using Generator """
+
     def __init__(self,
                  stage: str,
                  speech_featurizer: SpeechFeaturizer,
@@ -59,50 +39,82 @@ def __init__(self,
                  augmentations: Augmentation = Augmentation(None),
                  cache: bool = False,
                  shuffle: bool = False,
+                 drop_remainder: bool = True,
                  buffer_size: int = BUFFER_SIZE):
         super(ASRDataset, self).__init__(
             data_paths=data_paths, augmentations=augmentations,
-            cache=cache, shuffle=shuffle, stage=stage, buffer_size=buffer_size
+            cache=cache, shuffle=shuffle, stage=stage, buffer_size=buffer_size,
+            drop_remainder=drop_remainder
         )
         self.speech_featurizer = speech_featurizer
         self.text_featurizer = text_featurizer
 
     def read_entries(self):
-        lines = []
+        self.entries = []
         for file_path in self.data_paths:
             print(f"Reading {file_path} ...")
             with tf.io.gfile.GFile(file_path, "r") as f:
                 temp_lines = f.read().splitlines()
                 # Skip the header of tsv file
-                lines += temp_lines[1:]
+                self.entries += temp_lines[1:]
         # The files are "\t" separated
-        lines = [line.split("\t", 2) for line in lines]
-        lines = np.array(lines)
-        if self.shuffle:
-            np.random.shuffle(lines)  # Mix transcripts.tsv
-        self.total_steps = len(lines)
-        return lines
+        self.entries = [line.split("\t", 2) for line in self.entries]
+        for i, line in enumerate(self.entries):
+            self.entries[i][-1] = " ".join([str(x) for x in self.text_featurizer.extract(line[-1]).numpy()])
+        self.entries = np.array(self.entries)
+        if self.shuffle: np.random.shuffle(self.entries)  # Mix transcripts.tsv
+        self.total_steps = len(self.entries)
+
+    def generator(self):
+        for path, _, indices in self.entries:
+            audio = load_and_convert_to_wav(path).numpy()
+            yield bytes(path, "utf-8"), audio, bytes(indices, "utf-8")
+
+    def preprocess(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor):
+        def fn(_path: bytes, _audio: bytes, _indices: bytes):
+            with tf.device("/CPU:0"):
+                signal = read_raw_audio(_audio, self.speech_featurizer.sample_rate)
+
+                signal = self.augmentations.before.augment(signal)
+
+                features = self.speech_featurizer.extract(signal)
+
+                features = self.augmentations.after.augment(features)
+
+                label = tf.strings.to_number(tf.strings.split(_indices), out_type=tf.int32)
+                label_length = tf.cast(tf.shape(label)[0], tf.int32)
+                prediction = self.text_featurizer.prepand_blank(label)
+                prediction_length = tf.cast(tf.shape(prediction)[0], tf.int32)
+                features = tf.convert_to_tensor(features, tf.float32)
+                input_length = tf.cast(tf.shape(features)[0], tf.int32)
+
+                return _path, features, input_length, label, label_length, prediction, prediction_length
+
+        return tf.numpy_function(
+            fn, inp=[path, audio, indices],
+            Tout=[tf.string, tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32]
+        )
 
-    def preprocess(self, audio, transcript):
+    def tf_preprocess(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor):
         with tf.device("/CPU:0"):
-            signal = read_raw_audio(audio, self.speech_featurizer.sample_rate)
+            signal = tf_read_raw_audio(audio, self.speech_featurizer.sample_rate)
 
             signal = self.augmentations.before.augment(signal)
 
-            features = self.speech_featurizer.extract(signal)
+            features = self.speech_featurizer.tf_extract(signal)
 
             features = self.augmentations.after.augment(features)
 
-            label = self.text_featurizer.extract(transcript.decode("utf-8"))
+            label = tf.strings.to_number(tf.strings.split(indices), out_type=tf.int32)
             label_length = tf.cast(tf.shape(label)[0], tf.int32)
             prediction = self.text_featurizer.prepand_blank(label)
             prediction_length = tf.cast(tf.shape(prediction)[0], tf.int32)
             features = tf.convert_to_tensor(features, tf.float32)
             input_length = tf.cast(tf.shape(features)[0], tf.int32)
 
-            return features, input_length, label, label_length, prediction, prediction_length
+            return path, features, input_length, label, label_length, prediction, prediction_length
 
-    def process(self, dataset, batch_size):
+    def process(self, dataset: tf.data.Dataset, batch_size: int):
         dataset = dataset.map(self.parse, num_parallel_calls=AUTOTUNE)
 
         if self.cache:
@@ -115,6 +127,7 @@ def process(self, dataset, batch_size):
         dataset = dataset.padded_batch(
             batch_size=batch_size,
             padded_shapes=(
+                tf.TensorShape([]),
                 tf.TensorShape(self.speech_featurizer.shape),
                 tf.TensorShape([]),
                 tf.TensorShape([None]),
@@ -122,8 +135,8 @@ def process(self, dataset, batch_size):
                 tf.TensorShape([None]),
                 tf.TensorShape([]),
             ),
-            padding_values=(0., 0, self.text_featurizer.blank, 0, self.text_featurizer.blank, 0),
-            drop_remainder=True
+            padding_values=("", 0., 0, self.text_featurizer.blank, 0, self.text_featurizer.blank, 0),
+            drop_remainder=self.drop_remainder
         )
 
         # PREFETCH to improve speed of input length
@@ -131,17 +144,24 @@ def process(self, dataset, batch_size):
         self.total_steps = get_num_batches(self.total_steps, batch_size)
         return dataset
 
-    @abc.abstractmethod
-    def parse(self, *args, **kwargs):
+    @tf.function
+    def parse(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor):
         """
         Returns:
             path, features, input_lengths, labels, label_lengths, pred_inp
         """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def create(self, batch_size):
-        raise NotImplementedError()
+        if self.use_tf: return self.tf_preprocess(path, audio, indices)
+        return self.preprocess(path, audio, indices)
+
+    def create(self, batch_size: int):
+        self.read_entries()
+        if not self.total_steps or self.total_steps == 0: return None
+        dataset = tf.data.Dataset.from_generator(
+            self.generator,
+            output_types=(tf.string, tf.string, tf.string),
+            output_shapes=(tf.TensorShape([]), tf.TensorShape([]), tf.TensorShape([]))
+        )
+        return self.process(dataset, batch_size)
 
 
 class ASRTFRecordDataset(ASRDataset):
@@ -157,16 +177,33 @@ def __init__(self,
                  tfrecords_shards: int = TFRECORD_SHARDS,
                  cache: bool = False,
                  shuffle: bool = False,
+                 drop_remainder: bool = True,
                  buffer_size: int = BUFFER_SIZE):
         super(ASRTFRecordDataset, self).__init__(
             stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer,
-            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, buffer_size=buffer_size
+            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, buffer_size=buffer_size,
+            drop_remainder=drop_remainder
         )
         self.tfrecords_dir = tfrecords_dir
         if tfrecords_shards <= 0: raise ValueError("tfrecords_shards must be positive")
         self.tfrecords_shards = tfrecords_shards
-        if not tf.io.gfile.exists(self.tfrecords_dir):
-            tf.io.gfile.makedirs(self.tfrecords_dir)
+        if not tf.io.gfile.exists(self.tfrecords_dir): tf.io.gfile.makedirs(self.tfrecords_dir)
+
+    @staticmethod
+    def write_tfrecord_file(splitted_entries):
+        shard_path, entries = splitted_entries
+        with tf.io.TFRecordWriter(shard_path, options='ZLIB') as out:
+            for path, _, indices in entries:
+                audio = load_and_convert_to_wav(path).numpy()
+                feature = {
+                    "path": bytestring_feature([bytes(path, "utf-8")]),
+                    "audio": bytestring_feature([audio]),
+                    "indices": bytestring_feature([bytes(indices, "utf-8")])
+                }
+                example = tf.train.Example(features=tf.train.Features(feature=feature))
+                out.write(example.SerializeToString())
+                print_one_line("Processed:", path)
+        print(f"\nCreated {shard_path}")
 
     def create_tfrecords(self):
         if not tf.io.gfile.exists(self.tfrecords_dir):
@@ -178,38 +215,32 @@ def create_tfrecords(self):
 
         print(f"Creating {self.stage}.tfrecord ...")
 
-        entries = self.read_entries()
-        if len(entries) <= 0:
-            return False
+        self.read_entries()
+        if not self.total_steps or self.total_steps == 0: return False
 
         def get_shard_path(shard_id):
             return os.path.join(self.tfrecords_dir, f"{self.stage}_{shard_id}.tfrecord")
 
         shards = [get_shard_path(idx) for idx in range(1, self.tfrecords_shards + 1)]
 
-        splitted_entries = np.array_split(entries, self.tfrecords_shards)
+        splitted_entries = np.array_split(self.entries, self.tfrecords_shards)
         with multiprocessing.Pool(self.tfrecords_shards) as pool:
-            pool.map(write_tfrecord_file, zip(shards, splitted_entries))
+            pool.map(self.write_tfrecord_file, zip(shards, splitted_entries))
 
         return True
 
     @tf.function
-    def parse(self, record):
+    def parse(self, record: tf.Tensor):
         feature_description = {
             "path": tf.io.FixedLenFeature([], tf.string),
             "audio": tf.io.FixedLenFeature([], tf.string),
-            "transcript": tf.io.FixedLenFeature([], tf.string)
+            "indices": tf.io.FixedLenFeature([], tf.string)
         }
         example = tf.io.parse_single_example(record, feature_description)
+        if self.use_tf: return self.tf_preprocess(**example)
+        return self.preprocess(**example)
 
-        return tf.numpy_function(
-            self.preprocess,
-            inp=[example["audio"], example["transcript"]],
-            Tout=[tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32]
-        )
-
-    def create(self, batch_size):
-        # Create TFRecords dataset
+    def create(self, batch_size: int):
         have_data = self.create_tfrecords()
         if not have_data: return None
 
@@ -226,148 +257,15 @@ def create(self, batch_size: int):
 
 class ASRSliceDataset(ASRDataset):
     """ Dataset for ASR using Slice """
 
-    def preprocess(self, path, transcript):
-        return super(ASRSliceDataset, self).preprocess(path.decode("utf-8"), transcript)
-
-    @tf.function
-    def parse(self, record):
-        return tf.numpy_function(
-            self.preprocess,
-            inp=[record[0], record[1]],
-            Tout=[tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32]
-        )
-
-    def create(self, batch_size):
-        entries = self.read_entries()
-        if len(entries) == 0: return None
-        entries = np.delete(entries, 1, 1)  # Remove unused duration
-        dataset = tf.data.Dataset.from_tensor_slices(entries)
-        return self.process(dataset, batch_size)
-
-
-class ASRTFRecordTestDataset(ASRTFRecordDataset):
-    def preprocess(self, path, audio, transcript):
-        with tf.device("/CPU:0"):
-            signal = read_raw_audio(audio, self.speech_featurizer.sample_rate)
-
-            features = self.speech_featurizer.extract(signal)
-            features = tf.convert_to_tensor(features, tf.float32)
-            input_length = tf.cast(tf.shape(features)[0], tf.int32)
-
-            label = self.text_featurizer.extract(transcript.decode("utf-8"))
-            label = tf.convert_to_tensor(label, dtype=tf.int32)
-
-            return path, features, input_length, label
-
-    @tf.function
-    def parse(self, record):
-        feature_description = {
-            "path": tf.io.FixedLenFeature([], tf.string),
-            "audio": tf.io.FixedLenFeature([], tf.string),
-            "transcript": tf.io.FixedLenFeature([], tf.string)
-        }
-        example = tf.io.parse_single_example(record, feature_description)
-
-        return tf.numpy_function(
-            self.preprocess,
-            inp=[example["path"], example["audio"], example["transcript"]],
-            Tout=[tf.string, tf.float32, tf.int32, tf.int32]
-        )
-
-    def process(self, dataset, batch_size):
-        dataset = dataset.map(self.parse, num_parallel_calls=AUTOTUNE)
-
-        if self.cache:
-            dataset = dataset.cache()
-
-        if self.shuffle:
-            dataset = dataset.shuffle(self.buffer_size, reshuffle_each_iteration=True)
-
-        # PADDED BATCH the dataset
-        dataset = dataset.padded_batch(
-            batch_size=batch_size,
-            padded_shapes=(
-                tf.TensorShape([]),
-                tf.TensorShape(self.speech_featurizer.shape),
-                tf.TensorShape([]),
-                tf.TensorShape([None]),
-            ),
-            padding_values=("", 0.0, 0, self.text_featurizer.blank),
-            drop_remainder=True
-        )
-
-        # PREFETCH to improve speed of input length
-        dataset = dataset.prefetch(AUTOTUNE)
-        self.total_steps = get_num_batches(self.total_steps, batch_size)
-        return dataset
-
-    def create(self, batch_size):
-        # Create TFRecords dataset
-        have_data = self.create_tfrecords()
-        if not have_data: return None
-
-        pattern = os.path.join(self.tfrecords_dir, f"{self.stage}*.tfrecord")
-        files_ds = tf.data.Dataset.list_files(pattern)
-        ignore_order = tf.data.Options()
-        ignore_order.experimental_deterministic = False
-        files_ds = files_ds.with_options(ignore_order)
-        dataset = tf.data.TFRecordDataset(files_ds, compression_type='ZLIB', num_parallel_reads=AUTOTUNE)
-
-        return self.process(dataset, batch_size)
-
-
-class ASRSliceTestDataset(ASRDataset):
-    def preprocess(self, path, transcript):
-        with tf.device("/CPU:0"):
-            signal = read_raw_audio(path.decode("utf-8"), self.speech_featurizer.sample_rate)
-
-            features = self.speech_featurizer.extract(signal)
-            features = tf.convert_to_tensor(features, tf.float32)
-            input_length = tf.cast(tf.shape(features)[0], tf.int32)
-
-            label = self.text_featurizer.extract(transcript.decode("utf-8"))
-            label = tf.convert_to_tensor(label, dtype=tf.int32)
-
-            return path, features, input_length, label
-
-    @tf.function
-    def parse(self, record):
-        return tf.numpy_function(
-            self.preprocess,
-            inp=[record[0], record[1]],
-            Tout=[tf.string, tf.float32, tf.int32, tf.int32]
-        )
-
-    def process(self, dataset, batch_size):
-        dataset = dataset.map(self.parse, num_parallel_calls=AUTOTUNE)
-
-        if self.cache:
-            dataset = dataset.cache()
-
-        if self.shuffle:
-            dataset = dataset.shuffle(self.buffer_size, reshuffle_each_iteration=True)
-
-        # PADDED BATCH the dataset
-        dataset = dataset.padded_batch(
-            batch_size=batch_size,
-            padded_shapes=(
-                tf.TensorShape([]),
-                tf.TensorShape(self.speech_featurizer.shape),
-                tf.TensorShape([]),
-                tf.TensorShape([None]),
-            ),
-            padding_values=("", 0.0, 0, self.text_featurizer.blank),
-            drop_remainder=True
-        )
-
-        # PREFETCH to improve speed of input length
-        dataset = dataset.prefetch(AUTOTUNE)
-        self.total_steps = get_num_batches(self.total_steps, batch_size)
-        return dataset
-
-    def create(self, batch_size):
-        entries = self.read_entries()
-        if len(entries) == 0: return None
-        entries = np.delete(entries, 1, 1)  # Remove unused duration
-        dataset = tf.data.Dataset.from_tensor_slices(entries)
+    @staticmethod
+    def load(record: tf.Tensor):
+        def fn(path: bytes): return load_and_convert_to_wav(path.decode("utf-8")).numpy()
+        audio = tf.numpy_function(fn, inp=[record[0]], Tout=tf.string)
+        return record[0], audio, record[2]
+
+    def create(self, batch_size: int):
+        self.read_entries()
+        if not self.total_steps or self.total_steps == 0: return None
+        dataset = tf.data.Dataset.from_tensor_slices(self.entries)
+        dataset = dataset.map(self.load, num_parallel_calls=AUTOTUNE)
         return self.process(dataset, batch_size)
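A consequence of the `read_entries()` rework above: transcripts are tokenized once, up front, and stored in `self.entries` as space-joined index strings; both `preprocess` and `tf_preprocess` then recover the labels in-graph. A tiny sketch with made-up indices:

```python
import tensorflow as tf

# Third column of self.entries after read_entries(); values are made up.
indices = tf.constant("12 7 441 7 62")
label = tf.strings.to_number(tf.strings.split(indices), out_type=tf.int32)
print(label.numpy())  # [ 12   7 441   7  62]
```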
diff --git a/tensorflow_asr/datasets/base_dataset.py b/tensorflow_asr/datasets/base_dataset.py
index d5e018de1a..93ea112c95 100644
--- a/tensorflow_asr/datasets/base_dataset.py
+++ b/tensorflow_asr/datasets/base_dataset.py
@@ -27,6 +27,7 @@ def __init__(self,
                  cache: bool = False,
                  shuffle: bool = False,
                  buffer_size: int = BUFFER_SIZE,
+                 drop_remainder: bool = True,
                  stage: str = "train"):
         self.data_paths = data_paths
         self.augmentations = augmentations  # apply augmentation
@@ -36,6 +37,8 @@ def __init__(self,
             raise ValueError("buffer_size must be positive when shuffle is on")
         self.buffer_size = buffer_size  # shuffle buffer size
         self.stage = stage  # for defining tfrecords files
+        self.use_tf = self.augmentations.use_tf
+        self.drop_remainder = drop_remainder  # whether to drop remainder for multi gpu training
         self.total_steps = None  # for better training visualization
 
     @abc.abstractmethod
diff --git a/tensorflow_asr/datasets/keras/__init__.py b/tensorflow_asr/datasets/keras/__init__.py
index c0a7bc02e1..5aee10fa36 100644
--- a/tensorflow_asr/datasets/keras/__init__.py
+++ b/tensorflow_asr/datasets/keras/__init__.py
@@ -12,5 +12,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .asr_dataset import ASRTFRecordDatasetKeras, ASRSliceDatasetKeras
-__all__ = ['ASRTFRecordDatasetKeras', 'ASRSliceDatasetKeras']
+from .asr_dataset import ASRDatasetKeras, ASRTFRecordDatasetKeras, ASRSliceDatasetKeras
+__all__ = ['ASRDatasetKeras', 'ASRTFRecordDatasetKeras', 'ASRSliceDatasetKeras']
diff --git a/tensorflow_asr/datasets/keras/asr_dataset.py b/tensorflow_asr/datasets/keras/asr_dataset.py
index a7cb310c08..91fd27cf7c 100644
--- a/tensorflow_asr/datasets/keras/asr_dataset.py
+++ b/tensorflow_asr/datasets/keras/asr_dataset.py
@@ -12,12 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
-import multiprocessing
 import tensorflow as tf
-import numpy as np
 
-from ..asr_dataset import ASRDataset, AUTOTUNE, TFRECORD_SHARDS, write_tfrecord_file
+from ..asr_dataset import ASRDataset, ASRTFRecordDataset, ASRSliceDataset, AUTOTUNE, TFRECORD_SHARDS
 from ..base_dataset import BUFFER_SIZE
 from ...featurizers.speech_featurizers import SpeechFeaturizer
 from ...featurizers.text_featurizers import TextFeaturizer
@@ -26,6 +23,33 @@
 
 class ASRDatasetKeras(ASRDataset):
+    """ Keras Dataset for ASR using Generator """
+
+    @tf.function
+    def parse(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor):
+        """
+        Returns:
+            path, features, input_lengths, labels, label_lengths, pred_inp
+        """
+        if self.use_tf: data = self.tf_preprocess(path, audio, indices)
+        else: data = self.preprocess(path, audio, indices)
+
+        path, features, input_length, label, label_length, prediction, prediction_length = data
+
+        return (
+            {
+                "path": path,
+                "input": features,
+                "input_length": input_length,
+                "prediction": prediction,
+                "prediction_length": prediction_length
+            },
+            {
+                "label": label,
+                "label_length": label_length
+            }
+        )
+
     def process(self, dataset, batch_size):
         dataset = dataset.map(self.parse, num_parallel_calls=AUTOTUNE)
 
@@ -40,6 +64,7 @@ def process(self, dataset, batch_size):
             batch_size=batch_size,
             padded_shapes=(
                 {
+                    "path": tf.TensorShape([]),
                     "input": tf.TensorShape(self.speech_featurizer.shape),
                     "input_length": tf.TensorShape([]),
                     "prediction": tf.TensorShape([None]),
@@ -52,6 +77,7 @@ def process(self, dataset, batch_size):
             ),
             padding_values=(
                 {
+                    "path": "",
                     "input": 0.,
                     "input_length": 0,
                     "prediction": self.text_featurizer.blank,
@@ -62,7 +88,7 @@ def process(self, dataset, batch_size):
                     "label_length": 0
                 }
             ),
-            drop_remainder=True
+            drop_remainder=self.drop_remainder
         )
 
         # PREFETCH to improve speed of input length
@@ -71,7 +97,7 @@ def process(self, dataset, batch_size):
         return dataset
 
 
-class ASRTFRecordDatasetKeras(ASRDatasetKeras):
+class ASRTFRecordDatasetKeras(ASRDatasetKeras, ASRTFRecordDataset):
     """ Keras Dataset for ASR using TFRecords """
 
     def __init__(self,
@@ -84,116 +110,60 @@ def __init__(self,
                  tfrecords_shards: int = TFRECORD_SHARDS,
                  cache: bool = False,
                  shuffle: bool = False,
+                 drop_remainder: bool = True,
                  buffer_size: int = BUFFER_SIZE):
-        super(ASRTFRecordDatasetKeras, self).__init__(
-            stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer,
-            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle, buffer_size=buffer_size
+        ASRTFRecordDataset.__init__(
+            self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer,
+            data_paths=data_paths, tfrecords_dir=tfrecords_dir, augmentations=augmentations, cache=cache, shuffle=shuffle,
+            tfrecords_shards=tfrecords_shards, drop_remainder=drop_remainder, buffer_size=buffer_size
+        )
+        ASRDatasetKeras.__init__(
+            self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer,
+            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle,
+            drop_remainder=drop_remainder, buffer_size=buffer_size
         )
-        self.tfrecords_dir = tfrecords_dir
-        if tfrecords_shards <= 0: raise ValueError("tfrecords_shards must be positive")
-        self.tfrecords_shards = tfrecords_shards
-        if not tf.io.gfile.exists(self.tfrecords_dir):
-            tf.io.gfile.makedirs(self.tfrecords_dir)
-
-    def create_tfrecords(self):
-        if not tf.io.gfile.exists(self.tfrecords_dir):
-            tf.io.gfile.makedirs(self.tfrecords_dir)
-
-        if tf.io.gfile.glob(os.path.join(self.tfrecords_dir, f"{self.stage}*.tfrecord")):
-            print(f"TFRecords're already existed: {self.stage}")
-            return True
-
-        print(f"Creating {self.stage}.tfrecord ...")
-
-        entries = self.read_entries()
-        if len(entries) <= 0:
-            return False
-
-        def get_shard_path(shard_id):
-            return os.path.join(self.tfrecords_dir, f"{self.stage}_{shard_id}.tfrecord")
-
-        shards = [get_shard_path(idx) for idx in range(1, self.tfrecords_shards + 1)]
-
-        splitted_entries = np.array_split(entries, self.tfrecords_shards)
-        with multiprocessing.Pool(self.tfrecords_shards) as pool:
-            pool.map(write_tfrecord_file, zip(shards, splitted_entries))
-
-        return True
 
     @tf.function
-    def parse(self, record):
+    def parse(self, record: tf.Tensor):
         feature_description = {
             "path": tf.io.FixedLenFeature([], tf.string),
             "audio": tf.io.FixedLenFeature([], tf.string),
-            "transcript": tf.io.FixedLenFeature([], tf.string)
+            "indices": tf.io.FixedLenFeature([], tf.string)
         }
         example = tf.io.parse_single_example(record, feature_description)
+        return ASRDatasetKeras.parse(self, **example)
 
-        features, input_length, label, label_length, \
-            prediction, prediction_length = tf.numpy_function(
-                self.preprocess,
-                inp=[example["audio"], example["transcript"]],
-                Tout=[tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32]
-            )
+    def process(self, dataset: tf.data.Dataset, batch_size: int):
+        return ASRDatasetKeras.process(self, dataset, batch_size)
 
-        return (
-            {
-                "input": features,
-                "input_length": input_length,
-                "prediction": prediction,
-                "prediction_length": prediction_length
-            },
-            {
-                "label": label,
-                "label_length": label_length
-            }
-        )
-
-    def create(self, batch_size):
-        # Create TFRecords dataset
-        have_data = self.create_tfrecords()
-        if not have_data: return None
-
-        pattern = os.path.join(self.tfrecords_dir, f"{self.stage}*.tfrecord")
-        files_ds = tf.data.Dataset.list_files(pattern)
-        ignore_order = tf.data.Options()
-        ignore_order.experimental_deterministic = False
-        files_ds = files_ds.with_options(ignore_order)
-        dataset = tf.data.TFRecordDataset(files_ds, compression_type='ZLIB', num_parallel_reads=AUTOTUNE)
-
-        return self.process(dataset, batch_size)
-
-class ASRSliceDatasetKeras(ASRDatasetKeras):
+
+class ASRSliceDatasetKeras(ASRDatasetKeras, ASRSliceDataset):
     """ Keras Dataset for ASR using Slice """
 
-    def preprocess(self, path, transcript):
-        return super(ASRSliceDatasetKeras, self).preprocess(path.decode("utf-8"), transcript)
+    def __init__(self,
+                 stage: str,
+                 speech_featurizer: SpeechFeaturizer,
+                 text_featurizer: TextFeaturizer,
+                 data_paths: list,
+                 augmentations: Augmentation = Augmentation(None),
+                 cache: bool = False,
+                 shuffle: bool = False,
+                 drop_remainder: bool = True,
+                 buffer_size: int = BUFFER_SIZE):
+        ASRSliceDataset.__init__(
+            self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer,
+            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle,
+            drop_remainder=drop_remainder, buffer_size=buffer_size
+        )
+        ASRDatasetKeras.__init__(
+            self, stage=stage, speech_featurizer=speech_featurizer, text_featurizer=text_featurizer,
+            data_paths=data_paths, augmentations=augmentations, cache=cache, shuffle=shuffle,
+            drop_remainder=drop_remainder, buffer_size=buffer_size
+        )
 
     @tf.function
-    def parse(self, record):
-        features, input_length, label, label_length, \
-            prediction, prediction_length = tf.numpy_function(
-                self.preprocess,
-                inp=[record[0], record[1]],
-                Tout=[tf.float32, tf.int32, tf.int32, tf.int32, tf.int32, tf.int32]
-            )
-        return (
-            {
-                "input": features,
-                "input_length": input_length,
-                "prediction": prediction,
-                "prediction_length": prediction_length
-            },
-            {
-                "label": label,
-                "label_length": label_length
-            }
-        )
+    def parse(self, path: tf.Tensor, audio: tf.Tensor, indices: tf.Tensor):
+        return ASRDatasetKeras.parse(self, path, audio, indices)
 
-    def create(self, batch_size):
-        entries = self.read_entries()
-        if len(entries) == 0: return None
-        entries = np.delete(entries, 1, 1)  # Remove unused duration
-        dataset = tf.data.Dataset.from_tensor_slices(entries)
-        return self.process(dataset, batch_size)
+    def process(self, dataset: tf.data.Dataset, batch_size: int):
+        return ASRDatasetKeras.process(self, dataset, batch_size)
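The diamond inheritance here leans on Python's MRO: `ASRDatasetKeras` is listed first, so its `parse`/`process` win attribute lookup, while the explicit `__init__` calls wire up both bases. A toy illustration (class names below are illustrative, not from the repo):

```python
class Base:
    def parse(self): return "base"

class KerasMixin(Base):
    def parse(self): return "keras"

class TFRecordBase(Base):
    def parse(self): return "tfrecord"

class Combined(KerasMixin, TFRecordBase):
    pass

print([c.__name__ for c in Combined.__mro__])
# ['Combined', 'KerasMixin', 'TFRecordBase', 'Base', 'object']
print(Combined().parse())  # "keras": the mixin listed first wins the lookup
```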
diff --git a/tensorflow_asr/featurizers/speech_featurizers.py b/tensorflow_asr/featurizers/speech_featurizers.py
index f8ffaf83f7..c5a65be77d 100755
--- a/tensorflow_asr/featurizers/speech_featurizers.py
+++ b/tensorflow_asr/featurizers/speech_featurizers.py
@@ -19,11 +19,17 @@
 import librosa
 import soundfile as sf
 import tensorflow as tf
+import tensorflow_io as tfio
 
 from ..utils.utils import log10
 from .gammatone import fft_weights
 
 
+def load_and_convert_to_wav(path: str) -> tf.Tensor:
+    data = tfio.audio.AudioIOTensor(path, dtype=tf.float32)
+    return tfio.audio.encode_wav(data.to_tensor(), rate=tf.cast(data.rate, dtype=tf.int64))
+
+
 def read_raw_audio(audio, sample_rate=16000):
     if isinstance(audio, str):
         wave, _ = librosa.load(os.path.expanduser(audio), sr=sample_rate, mono=True)
@@ -40,6 +46,11 @@ def read_raw_audio(audio, sample_rate=16000):
     return wave
 
 
+def tf_read_raw_audio(audio: tf.Tensor, sample_rate=16000):
+    wave, _ = tf.audio.decode_wav(audio, desired_channels=1, desired_samples=sample_rate)
+    return tf.squeeze(wave, axis=-1)
+
+
 def slice_signal(signal, window_size, stride=0.5) -> np.ndarray:
     """ Return windows of the given signal by sweeping in stride fractions of window """
     assert signal.ndim == 1, signal.ndim
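`load_and_convert_to_wav()` normalizes any tensorflow-io-supported audio file to WAV bytes, which `tf_read_raw_audio()` can then decode inside the graph. A sketch of that round trip using plain `tf.audio` (the tensorflow-io loader is stubbed out with synthetic audio):

```python
import tensorflow as tf

sample_rate = 16000
wave = tf.random.uniform([sample_rate, 1], -1.0, 1.0)  # 1s of fake mono audio
wav_bytes = tf.audio.encode_wav(wave, sample_rate)     # stands in for load_and_convert_to_wav
decoded, _ = tf.audio.decode_wav(wav_bytes, desired_channels=1, desired_samples=sample_rate)
signal = tf.squeeze(decoded, axis=-1)                  # shape [16000], as tf_read_raw_audio returns
```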
Falls back to RNNT in TensorFlow") - print("Note: The RNNT in Tensorflow is not supported for CPU yet") from tensorflow.python.ops.gen_array_ops import matrix_diag_part_v2 use_warprnnt = False @@ -208,7 +212,7 @@ def compute_rnnt_loss_and_grad_helper(logits, labels, label_length, logit_length a = tf.tile(tf.reshape(tf.range(target_max_len - 1, dtype=tf.int64), shape=(1, 1, target_max_len - 1, 1)), multiples=[batch_size, 1, 1, 1]) b = tf.cast(tf.reshape(labels - 1, shape=(batch_size, 1, target_max_len - 1, 1)), dtype=tf.int64) - # b = tf.where(tf.equal(b, -1), tf.zeros_like(b), b) # for cpu testing (index -1 on cpu will raise errors) + if use_cpu: b = tf.where(tf.equal(b, -1), tf.zeros_like(b), b) # for cpu testing (index -1 on cpu will raise errors) c = tf.concat([a, b], axis=3) d = tf.tile(c, multiples=(1, input_max_len, 1, 1)) e = tf.tile(tf.reshape(tf.range(input_max_len, dtype=tf.int64), shape=(1, input_max_len, 1, 1)), diff --git a/tensorflow_asr/models/keras/transducer.py b/tensorflow_asr/models/keras/transducer.py index 59e206aaa1..3ce201d189 100644 --- a/tensorflow_asr/models/keras/transducer.py +++ b/tensorflow_asr/models/keras/transducer.py @@ -62,7 +62,12 @@ def compile(self, optimizer, global_batch_size, blank=0, def train_step(self, batch): x, y_true = batch with tf.GradientTape() as tape: - y_pred = self(x, training=True) + y_pred = self({ + "input": x["input"], + "input_length": x["input_length"], + "prediction": x["prediction"], + "prediction_length": x["prediction_length"], + }, training=True) loss = self.loss(y_true, y_pred) scaled_loss = self.optimizer.get_scaled_loss(loss) scaled_gradients = tape.gradient(scaled_loss, self.trainable_weights) @@ -72,6 +77,11 @@ def train_step(self, batch): def test_step(self, batch): x, y_true = batch - y_pred = self(x, training=False) + y_pred = self({ + "input": x["input"], + "input_length": x["input_length"], + "prediction": x["prediction"], + "prediction_length": x["prediction_length"], + }, training=False) loss = self.loss(y_true, y_pred) return {"val_rnnt_loss": loss} diff --git a/tensorflow_asr/runners/base_runners.py b/tensorflow_asr/runners/base_runners.py index 7c7dcee83a..748f33c316 100644 --- a/tensorflow_asr/runners/base_runners.py +++ b/tensorflow_asr/runners/base_runners.py @@ -436,7 +436,7 @@ def _test_step(self, batch): Returns: (file_paths, groundtruth, greedy, beamsearch, beamsearch_lm) each has shape [B] """ - file_paths, features, input_length, labels = batch + file_paths, features, input_length, labels, _, _, _ = batch labels = self.model.text_featurizer.iextract(labels) input_length = get_reduced_length(input_length, self.model.time_reduction_factor) diff --git a/tensorflow_asr/runners/ctc_runners.py b/tensorflow_asr/runners/ctc_runners.py index a892b3ef44..ddc2d01dbd 100644 --- a/tensorflow_asr/runners/ctc_runners.py +++ b/tensorflow_asr/runners/ctc_runners.py @@ -49,7 +49,7 @@ def save_model_weights(self): @tf.function(experimental_relax_shapes=True) def _train_step(self, batch): - features, input_length, labels, label_length, _, _ = batch + _, features, input_length, labels, label_length, _, _ = batch with tf.GradientTape() as tape: y_pred = self.model(features, training=True) @@ -70,7 +70,7 @@ def _train_step(self, batch): @tf.function(experimental_relax_shapes=True) def _eval_step(self, batch): - features, input_length, labels, label_length, _, _ = batch + _, features, input_length, labels, label_length, _, _ = batch logits = self.model(features, training=False) @@ -111,7 +111,7 @@ def _apply_gradients(self): 
diff --git a/tensorflow_asr/models/keras/transducer.py b/tensorflow_asr/models/keras/transducer.py
index 59e206aaa1..3ce201d189 100644
--- a/tensorflow_asr/models/keras/transducer.py
+++ b/tensorflow_asr/models/keras/transducer.py
@@ -62,7 +62,12 @@ def compile(self, optimizer, global_batch_size, blank=0,
     def train_step(self, batch):
         x, y_true = batch
         with tf.GradientTape() as tape:
-            y_pred = self(x, training=True)
+            y_pred = self({
+                "input": x["input"],
+                "input_length": x["input_length"],
+                "prediction": x["prediction"],
+                "prediction_length": x["prediction_length"],
+            }, training=True)
             loss = self.loss(y_true, y_pred)
             scaled_loss = self.optimizer.get_scaled_loss(loss)
         scaled_gradients = tape.gradient(scaled_loss, self.trainable_weights)
@@ -72,6 +77,11 @@ def train_step(self, batch):
 
     def test_step(self, batch):
         x, y_true = batch
-        y_pred = self(x, training=False)
+        y_pred = self({
+            "input": x["input"],
+            "input_length": x["input_length"],
+            "prediction": x["prediction"],
+            "prediction_length": x["prediction_length"],
+        }, training=False)
         loss = self.loss(y_true, y_pred)
         return {"val_rnnt_loss": loss}
diff --git a/tensorflow_asr/runners/base_runners.py b/tensorflow_asr/runners/base_runners.py
index 7c7dcee83a..748f33c316 100644
--- a/tensorflow_asr/runners/base_runners.py
+++ b/tensorflow_asr/runners/base_runners.py
@@ -436,7 +436,7 @@ def _test_step(self, batch):
         Returns:
             (file_paths, groundtruth, greedy, beamsearch, beamsearch_lm) each has shape [B]
         """
-        file_paths, features, input_length, labels = batch
+        file_paths, features, input_length, labels, _, _, _ = batch
         labels = self.model.text_featurizer.iextract(labels)
         input_length = get_reduced_length(input_length, self.model.time_reduction_factor)
diff --git a/tensorflow_asr/runners/ctc_runners.py b/tensorflow_asr/runners/ctc_runners.py
index a892b3ef44..ddc2d01dbd 100644
--- a/tensorflow_asr/runners/ctc_runners.py
+++ b/tensorflow_asr/runners/ctc_runners.py
@@ -49,7 +49,7 @@ def save_model_weights(self):
 
     @tf.function(experimental_relax_shapes=True)
     def _train_step(self, batch):
-        features, input_length, labels, label_length, _, _ = batch
+        _, features, input_length, labels, label_length, _, _ = batch
 
         with tf.GradientTape() as tape:
             y_pred = self.model(features, training=True)
@@ -70,7 +70,7 @@ def _train_step(self, batch):
 
     @tf.function(experimental_relax_shapes=True)
     def _eval_step(self, batch):
-        features, input_length, labels, label_length, _, _ = batch
+        _, features, input_length, labels, label_length, _, _ = batch
 
         logits = self.model(features, training=False)
 
@@ -111,7 +111,7 @@ def _apply_gradients(self):
 
     @tf.function(experimental_relax_shapes=True)
     def _train_step(self, batch):
-        features, input_length, labels, label_length, _, _ = batch
+        _, features, input_length, labels, label_length, _, _ = batch
 
         with tf.GradientTape() as tape:
             y_pred = self.model(features, training=True)
diff --git a/tensorflow_asr/runners/transducer_runners.py b/tensorflow_asr/runners/transducer_runners.py
index b450ad7120..d8e396be56 100644
--- a/tensorflow_asr/runners/transducer_runners.py
+++ b/tensorflow_asr/runners/transducer_runners.py
@@ -47,7 +47,7 @@ def save_model_weights(self):
 
     @tf.function(experimental_relax_shapes=True)
     def _train_step(self, batch):
-        features, input_length, labels, label_length, prediction, prediction_length = batch
+        _, features, input_length, labels, label_length, prediction, prediction_length = batch
 
         with tf.GradientTape() as tape:
             logits = self.model([features, input_length, prediction, prediction_length], training=True)
@@ -67,7 +67,7 @@ def _train_step(self, batch):
 
     @tf.function(experimental_relax_shapes=True)
     def _eval_step(self, batch):
-        features, input_length, labels, label_length, prediction, prediction_length = batch
+        _, features, input_length, labels, label_length, prediction, prediction_length = batch
 
         logits = self.model([features, input_length, prediction, prediction_length], training=False)
 
         eval_loss = rnnt_loss(
@@ -106,7 +106,7 @@ def _apply_gradients(self):
 
     @tf.function(experimental_relax_shapes=True)
     def _train_step(self, batch):
-        features, input_length, labels, label_length, prediction, prediction_length = batch
+        _, features, input_length, labels, label_length, prediction, prediction_length = batch
 
         with tf.GradientTape() as tape:
             logits = self.model([features, input_length, prediction, prediction_length], training=True)
diff --git a/tensorflow_asr/utils/utils.py b/tensorflow_asr/utils/utils.py
index ff27ef7d2d..1d813261bb 100755
--- a/tensorflow_asr/utils/utils.py
+++ b/tensorflow_asr/utils/utils.py
@@ -103,10 +103,10 @@ def read_bytes(path: str) -> tf.Tensor:
     return tf.convert_to_tensor(content, dtype=tf.string)
 
 
-def shape_list(x):
+def shape_list(x, out_type=tf.int32):
     """Deal with dynamic shape in tensorflow cleanly."""
     static = x.shape.as_list()
-    dynamic = tf.shape(x)
+    dynamic = tf.shape(x, out_type=out_type)
     return [dynamic[i] if s is None else s for i, s in enumerate(static)]
 
 
@@ -160,3 +160,10 @@ def get_reduced_length(length, reduction_factor):
 
 def count_non_blank(tensor: tf.Tensor, blank: int or tf.Tensor = 0, axis=None):
     return tf.reduce_sum(tf.where(tf.not_equal(tensor, blank), x=tf.ones_like(tensor), y=tf.zeros_like(tensor)), axis=axis)
+
+
+def has_gpu_or_tpu():
+    gpus = tf.config.list_logical_devices("GPU")
+    tpus = tf.config.list_logical_devices("TPU")
+    if len(gpus) == 0 and len(tpus) == 0: return False
+    return True
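A quick usage sketch for the two `utils` additions, assuming the package is installed: `shape_list` can now hand back dynamic dims in a caller-chosen integer dtype, and `has_gpu_or_tpu` drives the CPU fallback in the RNNT loss:

```python
import tensorflow as tf
from tensorflow_asr.utils.utils import shape_list, has_gpu_or_tpu

@tf.function(input_signature=[tf.TensorSpec([None, 80, 1], tf.float32)])
def fn(x):
    # T is a dynamic scalar of the requested dtype; F=80 and V=1 stay static ints.
    T, F, V = shape_list(x, out_type=tf.int32)
    return tf.ones([T, F, V])

y = fn(tf.zeros([3, 80, 1]))
print(has_gpu_or_tpu())  # False on a CPU-only host, enabling the RNNT CPU guard
```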