In [None]:
# src.wav2vec2.config
import json
import os
from dataclasses import asdict, dataclass, field

@dataclass
class Wav2Vec2Config:
    """Hyperparameter container for the Wav2Vec2 model.

    Every field has a default, so `Wav2Vec2Config()` is a valid configuration.
    `__post_init__` validates that the three per-layer feature-extractor lists
    have equal length, that `hidden_size` is divisible by `num_heads`, and that
    the two norm-type strings hold a supported value.

    Note: the field name `kernal_sizes` is misspelled but is part of the public
    interface (constructor keyword and JSON key), so it is kept as is.
    """

    vocab_size: int = 32
    dropout: float = 0.1  # a rate, so float (was mis-annotated as int)
    hidden_size: int = 768
    num_heads: int = 12
    num_layers: int = 12
    intermediate_size: int = 3072
    is_gelu_approx: bool = False
    layer_norm_eps: float = 1e-5
    survival_prob: float = 1.0
    pad_id: int = 0

    # positional embedding
    num_conv_pos_embeddings: int = 128
    num_conv_pos_embedding_groups: int = 16

    # feature extractor (one entry per conv layer; lengths must match)
    filter_sizes: list = field(
        default_factory=lambda: [512, 512, 512, 512, 512, 512, 512]
    )
    kernal_sizes: list = field(default_factory=lambda: [10, 3, 3, 3, 3, 2, 2])
    strides: list = field(default_factory=lambda: [5, 2, 2, 2, 2, 2, 2])
    conv_bias: bool = False

    # spec augmentation arguments
    apply_spec_augment: bool = True
    mask_time_prob: float = 0.05
    mask_time_length: int = 10

    attention_norm_type: str = "postnorm"  # "prenorm" or "postnorm"
    # "group" or "layer" -- a string (was mis-annotated as bool)
    feature_extractor_norm_type: str = "group"
    is_robust: bool = False

    def __post_init__(self):
        """Validate cross-field constraints right after construction.

        Raises:
            ValueError: if `filter_sizes`, `kernal_sizes` and `strides` differ in
                length, or if `hidden_size` is not divisible by `num_heads`.
            AssertionError: if a norm-type field holds an unsupported value.
        """
        if not (len(self.filter_sizes) == len(self.kernal_sizes) == len(self.strides)):
            raise ValueError(
                "Length of filter_sizes, kernal_sizes, strides must match."
            )
        if self.hidden_size % self.num_heads != 0:
            raise ValueError("Hidden size must be perfect multiple of num_heads.")

        assert self.feature_extractor_norm_type in ["group", "layer"], "Only `group` / `layer` are supported"
        assert self.attention_norm_type in ["prenorm", "postnorm"], "Only `prenorm` / `postnorm` are supported"

    def save_pretrained(self, save_dir):
        """Write this config as `config.json` inside `save_dir`.

        The directory is created if it does not exist.
        """
        os.makedirs(save_dir, exist_ok=True)
        with open(os.path.join(save_dir, "config.json"), "w") as f:
            json.dump(asdict(self), f)

    @classmethod
    def from_json(cls, path: str):
        """Build a config from a JSON file whose keys are field names.

        Args:
            path: Path to a JSON file such as the one written by `save_pretrained`.

        Returns:
            An instance of `cls` constructed from the loaded key/value pairs.
        """
        with open(path, "r") as f:
            config_dict = json.load(f)
        return cls(**config_dict)


@dataclass
class RobustWav2Vec2Config(Wav2Vec2Config):
    """`Wav2Vec2Config` with a different set of defaults.

    Only defaults are overridden here; every field already exists on the base
    class, so the constructor signature and field order are unchanged.
    """

    # model size
    hidden_size: int = 1024
    num_heads: int = 16
    num_layers: int = 24
    intermediate_size: int = 4096

    # architecture switches
    is_robust: bool = True
    conv_bias: bool = True
    attention_norm_type: str = "prenorm"
    feature_extractor_norm_type: str = "layer"

In [None]:
#                                          tensorflow_addons.py
import tensorflow as tf
from typeguard import typechecked


class Conv1DWithWeightNorm(tf.keras.layers.Conv1D):
    """`Conv1D` whose kernel is re-parameterised as a direction and a magnitude.

    Adapted from `tensorflow_addons.layers.WeightNormalization`. The original
    author notes that `torch.nn.WeightNorm` behaves slightly differently, which
    is why the layer is implemented here rather than reused.

    The effective kernel is `l2_normalize(weight_v) * weight_g`, recomputed on
    every call. The norm is taken over every kernel axis except `filter_axis`
    (axis 0).

    Args:
        padding: Required keyword argument. It is removed from `kwargs` before
            they reach `Conv1D`, and is used in `call` as the number of zero
            frames added to both ends of axis 1 of the input.
        *args, **kwargs: Forwarded unchanged to `tf.keras.layers.Conv1D`.
    """

    def __init__(self, *args, **kwargs):
        """Store the explicit padding amount and build the underlying `Conv1D`.

        Raises:
            KeyError: if `padding` is not passed as a keyword argument.
        """
        # `padding` is consumed here, so the parent never sees it and keeps its
        # own default padding mode; the actual padding is done by hand in `call`.
        self._padding = kwargs.pop("padding")
        # Kernel axis that is kept (not reduced) when computing the norm.
        self.filter_axis = 0
        super().__init__(*args, **kwargs)

    def _compute_kernel(self):
        """Rebuild `self.kernel` from the direction `weight_v` and magnitude `weight_g`."""
        self.kernel = (
            tf.nn.l2_normalize(self.weight_v, axis=self.kernel_norm_axes)
            * self.weight_g
        )

    def build(self, input_shape):
        """Create the parent's weights, then set up `weight_v` and `weight_g`."""
        super().build(input_shape)

        # Reduce over every kernel axis except `filter_axis`.
        kernel_norm_axes = list(range(self.kernel.shape.rank))
        kernel_norm_axes.pop(self.filter_axis)
        self.kernel_norm_axes = kernel_norm_axes

        # Re-create the kernel as a variable named "weight_v" so the variable
        # names resemble torch's weight-norm parameters.
        # NOTE(review): the variable created by the parent `build` is only
        # replaced on the attribute; confirm whether it still shows up in
        # `self.weights`.
        self.kernel = tf.Variable(self.kernel, name="weight_v", trainable=True)
        self.weight_v = self.kernel

        self._init_weight_g()

    def _init_weight_g(self):
        """Create `weight_g` and initialise it to the current norm of `weight_v`.

        `weight_g` has shape `(n, 1, 1)`, where `n` is the size of `weight_v`
        along `filter_axis`. Because it starts at the norm of `weight_v`, the
        first recomputed kernel equals `weight_v` (up to numerical error).
        """
        self.weight_g = self.add_weight(
            name="weight_g",
            shape=(int(self.weight_v.shape[self.filter_axis]), 1, 1),
            initializer="ones",
            dtype=self.weight_v.dtype,
            trainable=True,
        )
        kernel_norm = tf.sqrt(
            tf.reduce_sum(tf.square(self.weight_v), axis=self.kernel_norm_axes)
        )
        self.weight_g.assign(kernel_norm[:, tf.newaxis, tf.newaxis])

    def call(self, inputs):
        """Recompute the kernel, pad axis 1 of `inputs`, and run the convolution.

        The pad spec has three entries, so `inputs` must be rank 3.
        """
        self._compute_kernel()
        output = tf.pad(inputs, ((0, 0), (self._padding, self._padding), (0, 0)))
        return super().call(output)

    def get_config(self):
        """Return the `Conv1D` config with `padding` replaced by the integer pad amount."""
        config = super().get_config()
        config.update({"padding": self._padding})
        return config


# THE FOLLOWING CODE IS TAKEN DIRECTLY FROM THE OFFICIAL TENSORFLOW_ADDONS PACKAGE.
# IT IS COPIED HERE TO MAKE THIS PROJECT INDEPENDENT OF THE TENSORFLOW VERSION:
# CURRENTLY, TENSORFLOW_ADDONS DOESN'T WORK WITH TF>=2.5


class GroupNormalization(tf.keras.layers.Layer):
    """Group normalization layer.
    Source: "Group Normalization" (Yuxin Wu & Kaiming He, 2018)
    https://arxiv.org/abs/1803.08494
    Group Normalization divides the channels into groups and computes
    within each group the mean and variance for normalization.
    Empirically, its accuracy is more stable than batch norm in a wide
    range of small batch sizes, if learning rate is adjusted linearly
    with batch sizes.
    Relation to Layer Normalization:
    If the number of groups is set to 1, then this operation becomes identical
    to Layer Normalization.
    Relation to Instance Normalization:
    If the number of groups is set to the
    input dimension (number of groups is equal
    to number of channels), then this operation becomes
    identical to Instance Normalization.
    Args:
        groups: Integer, the number of groups for Group Normalization.
            Can be in the range [1, N] where N is the input dimension.
            The input dimension must be divisible by the number of groups.
            Passing -1 sets it to the size of `axis` at build time.
            Defaults to 32.
        axis: Integer, the axis that should be normalized. Must not be 0
            (the batch axis).
        epsilon: Small float added to variance to avoid dividing by zero.
        center: If True, add offset of `beta` to normalized tensor.
            If False, `beta` is ignored.
        scale: If True, multiply by `gamma`.
            If False, `gamma` is not used.
        beta_initializer: Initializer for the beta weight.
        gamma_initializer: Initializer for the gamma weight.
        beta_regularizer: Optional regularizer for the beta weight.
        gamma_regularizer: Optional regularizer for the gamma weight.
        beta_constraint: Optional constraint for the beta weight.
        gamma_constraint: Optional constraint for the gamma weight.
    Input shape:
        Arbitrary. Use the keyword argument `input_shape`
        (tuple of integers, does not include the samples axis)
        when using this layer as the first layer in a model.
    Output shape:
        Same shape as input.
    Raises:
        ValueError: at construction if `axis` is 0; at build time if the size of
            `axis` is undefined, smaller than `groups`, or not divisible by
            `groups`.
    """

    @typechecked
    def __init__(
        self,
        groups: int = 32,
        axis: int = -1,
        epsilon: float = 1e-3,
        center: bool = True,
        scale: bool = True,
        beta_initializer="zeros",
        gamma_initializer="ones",
        beta_regularizer=None,
        gamma_regularizer=None,
        beta_constraint=None,
        gamma_constraint=None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.supports_masking = True
        self.groups = groups
        self.axis = axis
        self.epsilon = epsilon
        self.center = center
        self.scale = scale
        self.beta_initializer = tf.keras.initializers.get(beta_initializer)
        self.gamma_initializer = tf.keras.initializers.get(gamma_initializer)
        self.beta_regularizer = tf.keras.regularizers.get(beta_regularizer)
        self.gamma_regularizer = tf.keras.regularizers.get(gamma_regularizer)
        self.beta_constraint = tf.keras.constraints.get(beta_constraint)
        self.gamma_constraint = tf.keras.constraints.get(gamma_constraint)
        self._check_axis()

    def build(self, input_shape):
        """Validate `input_shape` and create the `gamma` / `beta` weights."""
        self._check_if_input_shape_is_none(input_shape)
        self._set_number_of_groups_for_instance_norm(input_shape)
        self._check_size_of_dimensions(input_shape)
        self._create_input_spec(input_shape)

        self._add_gamma_weight(input_shape)
        self._add_beta_weight(input_shape)
        self.built = True
        super().build(input_shape)

    def call(self, inputs):
        """Normalize `inputs` per group and return a tensor of the same shape."""
        input_shape = tf.keras.backend.int_shape(inputs)
        tensor_input_shape = tf.shape(inputs)

        reshaped_inputs, _ = self._reshape_into_groups(
            inputs, input_shape, tensor_input_shape
        )

        normalized_inputs = self._apply_normalization(reshaped_inputs, input_shape)

        # In the instance-norm case no reshape was done, so none has to be undone.
        is_instance_norm = (input_shape[self.axis] // self.groups) == 1
        if not is_instance_norm:
            outputs = tf.reshape(normalized_inputs, tensor_input_shape)
        else:
            outputs = normalized_inputs

        return outputs

    def get_config(self):
        """Return the serialisable constructor arguments merged into the base config."""
        config = {
            "groups": self.groups,
            "axis": self.axis,
            "epsilon": self.epsilon,
            "center": self.center,
            "scale": self.scale,
            "beta_initializer": tf.keras.initializers.serialize(self.beta_initializer),
            "gamma_initializer": tf.keras.initializers.serialize(
                self.gamma_initializer
            ),
            "beta_regularizer": tf.keras.regularizers.serialize(self.beta_regularizer),
            "gamma_regularizer": tf.keras.regularizers.serialize(
                self.gamma_regularizer
            ),
            "beta_constraint": tf.keras.constraints.serialize(self.beta_constraint),
            "gamma_constraint": tf.keras.constraints.serialize(self.gamma_constraint),
        }
        base_config = super().get_config()
        return {**base_config, **config}

    def compute_output_shape(self, input_shape):
        """The output has the same shape as the input."""
        return input_shape

    def _reshape_into_groups(self, inputs, input_shape, tensor_input_shape):
        """Split `axis` into `(groups, channels_per_group)` unless doing instance norm.

        Returns:
            `(reshaped_inputs, group_shape)`. In the instance-norm case (one
            channel per group) `inputs` is returned untouched.
        """
        group_shape = [tensor_input_shape[i] for i in range(len(input_shape))]
        is_instance_norm = (input_shape[self.axis] // self.groups) == 1
        if not is_instance_norm:
            group_shape[self.axis] = input_shape[self.axis] // self.groups
            group_shape.insert(self.axis, self.groups)
            group_shape = tf.stack(group_shape)
            reshaped_inputs = tf.reshape(inputs, group_shape)
            return reshaped_inputs, group_shape
        else:
            return inputs, group_shape

    def _apply_normalization(self, reshaped_inputs, input_shape):
        """Normalize over every axis except the batch axis and the group axis."""
        group_shape = tf.keras.backend.int_shape(reshaped_inputs)
        # Candidate reduction axes are 1..rank-1; entry `i` holds axis `i + 1`.
        group_reduction_axes = list(range(1, len(group_shape)))
        is_instance_norm = (input_shape[self.axis] // self.groups) == 1
        if is_instance_norm and self.axis < 0:
            # No group axis was inserted, so a negative axis addresses the same
            # position from the end of the list. The previous `self.axis - 1`
            # was only right for -1 (special-cased) and broke for e.g. -2.
            axis = self.axis
        else:
            # Positive axis: shift by one because the list starts at axis 1.
            # Negative axis with grouping: shift by one because the group axis
            # was inserted in front of the per-group channel axis.
            axis = self.axis - 1
        group_reduction_axes.pop(axis)

        mean, variance = tf.nn.moments(
            reshaped_inputs, group_reduction_axes, keepdims=True
        )

        gamma, beta = self._get_reshaped_weights(input_shape)
        normalized_inputs = tf.nn.batch_normalization(
            reshaped_inputs,
            mean=mean,
            variance=variance,
            scale=gamma,
            offset=beta,
            variance_epsilon=self.epsilon,
        )
        return normalized_inputs

    def _get_reshaped_weights(self, input_shape):
        """Return `(gamma, beta)` reshaped for broadcasting; disabled ones are None."""
        broadcast_shape = self._create_broadcast_shape(input_shape)
        gamma = None
        beta = None
        if self.scale:
            gamma = tf.reshape(self.gamma, broadcast_shape)

        if self.center:
            beta = tf.reshape(self.beta, broadcast_shape)
        return gamma, beta

    def _check_if_input_shape_is_none(self, input_shape):
        """Raise ValueError if the normalized axis has no static size."""
        dim = input_shape[self.axis]
        if dim is None:
            raise ValueError(
                "Axis " + str(self.axis) + " of "
                "input tensor should have a defined dimension "
                "but the layer received an input with shape " + str(input_shape) + "."
            )

    def _set_number_of_groups_for_instance_norm(self, input_shape):
        """Resolve `groups == -1` to the size of the normalized axis."""
        dim = input_shape[self.axis]

        if self.groups == -1:
            self.groups = dim

    def _check_size_of_dimensions(self, input_shape):
        """Raise ValueError unless the channel count is a positive multiple of `groups`."""
        dim = input_shape[self.axis]
        if dim < self.groups:
            raise ValueError(
                "Number of groups (" + str(self.groups) + ") cannot be "
                "more than the number of channels (" + str(dim) + ")."
            )

        if dim % self.groups != 0:
            # The check is channels % groups, so it is the channel count that
            # has to be a multiple of the group count (the message used to say
            # the opposite).
            raise ValueError(
                "Number of channels (" + str(dim) + ") must be a "
                "multiple of the number of groups (" + str(self.groups) + ")."
            )

    def _check_axis(self):
        """Reject normalization over the batch axis."""
        if self.axis == 0:
            raise ValueError(
                "You are trying to normalize your batch axis. Do you want to "
                "use tf.layer.batch_normalization instead"
            )

    def _create_input_spec(self, input_shape):
        """Pin the expected rank and the size of the normalized axis."""
        dim = input_shape[self.axis]
        self.input_spec = tf.keras.layers.InputSpec(
            ndim=len(input_shape), axes={self.axis: dim}
        )

    def _add_gamma_weight(self, input_shape):
        """Create the per-channel scale weight, or set it to None if `scale` is off."""
        dim = input_shape[self.axis]
        shape = (dim,)

        if self.scale:
            self.gamma = self.add_weight(
                shape=shape,
                name="gamma",
                initializer=self.gamma_initializer,
                regularizer=self.gamma_regularizer,
                constraint=self.gamma_constraint,
            )
        else:
            self.gamma = None

    def _add_beta_weight(self, input_shape):
        """Create the per-channel offset weight, or set it to None if `center` is off."""
        dim = input_shape[self.axis]
        shape = (dim,)

        if self.center:
            self.beta = self.add_weight(
                shape=shape,
                name="beta",
                initializer=self.beta_initializer,
                regularizer=self.beta_regularizer,
                constraint=self.beta_constraint,
            )
        else:
            self.beta = None

    def _create_broadcast_shape(self, input_shape):
        """Shape that lets `gamma` / `beta` broadcast against the (grouped) input."""
        broadcast_shape = [1] * len(input_shape)
        is_instance_norm = (input_shape[self.axis] // self.groups) == 1
        if not is_instance_norm:
            broadcast_shape[self.axis] = input_shape[self.axis] // self.groups
            broadcast_shape.insert(self.axis, self.groups)
        else:
            broadcast_shape[self.axis] = self.groups
        return broadcast_shape


class StochasticDepth(tf.keras.layers.Layer):
    """Merge a residual branch into its shortcut with stochastic depth.

    Based on [Deep Networks with Stochastic Depth](https://arxiv.org/abs/1603.09382)
    and on `tensorflow_addons.layers.StochasticDepth`. It is used in place of a
    plain addition of `[shortcut, residual]`.

    At train time the layer returns
    $$
    x[0] + b_l * x[1],
    $$
    where $b_l$ is a single Bernoulli sample with $P(b_l = 1) = p_l$.

    At test time the layer returns
    $$
    x[0] + x[1]
    $$
    Unlike the `tensorflow_addons` version, the residual is NOT rescaled by
    $p_l$ at test time: doing so would disturb fine-tuned weights that are
    loaded directly into this architecture.

    Args:
        survival_probability: float, the probability of the residual branch being kept.
    Call Args:
        inputs: List of `[shortcut, residual]`, combined by element-wise addition.
    Output shape:
        Equal to the shape of `shortcut`.
    """

    @typechecked
    def __init__(self, survival_probability: float = 0.5, **kwargs):
        super().__init__(**kwargs)
        self.survival_probability = survival_probability

    def call(self, x, training=None):
        """Combine `[shortcut, residual]`, dropping the residual at random in training.

        Raises:
            ValueError: if `x` is not a list of exactly two elements.
        """
        is_pair = isinstance(x, list) and len(x) == 2
        if not is_pair:
            raise ValueError("input must be a list of length 2.")

        shortcut = x[0]
        residual = x[1]

        # One scalar keep/drop decision, sampled regardless of the phase.
        keep = tf.keras.backend.random_bernoulli([], p=self.survival_probability)

        def train_branch():
            return shortcut + keep * residual

        def eval_branch():
            # Plain residual addition, intentionally without survival-probability scaling.
            return shortcut + residual

        return tf.keras.backend.in_train_phase(
            train_branch, eval_branch, training=training
        )

    def compute_output_shape(self, input_shape):
        """The output takes the shape of the first input (the shortcut)."""
        shortcut_shape = input_shape[0]
        return shortcut_shape

    def get_config(self):
        """Return the base layer config extended with `survival_probability`."""
        config = dict(super().get_config())
        config["survival_probability"] = self.survival_probability
        return config

In [None]:
#                          encoder.py
import tensorflow as tf

class Conv1DWithWeightNorm(tf.keras.layers.Conv1D):
    """`Conv1D` whose kernel is re-parameterised as a direction and a magnitude.

    Adapted from `tensorflow_addons.layers.WeightNormalization`. The original
    author notes that `torch.nn.WeightNorm` behaves slightly differently, which
    is why the layer is implemented here rather than reused.

    The effective kernel is `l2_normalize(weight_v) * weight_g`, recomputed on
    every call. The norm is taken over every kernel axis except `filter_axis`
    (axis 0).

    NOTE(review): a class with the same name is also defined in the earlier
    `tensorflow_addons.py` cell; when both cells run in one kernel this
    definition replaces that one.

    Args:
        padding: Required keyword argument. It is removed from `kwargs` before
            they reach `Conv1D`, and is used in `call` as the number of zero
            frames added to both ends of axis 1 of the input.
        *args, **kwargs: Forwarded unchanged to `tf.keras.layers.Conv1D`.
    """

    def __init__(self, *args, **kwargs):
        """Store the explicit padding amount and build the underlying `Conv1D`.

        Raises:
            KeyError: if `padding` is not passed as a keyword argument.
        """
        # `padding` is consumed here, so the parent never sees it and keeps its
        # own default padding mode; the actual padding is done by hand in `call`.
        self._padding = kwargs.pop("padding")
        # Kernel axis that is kept (not reduced) when computing the norm.
        self.filter_axis = 0
        super().__init__(*args, **kwargs)

    def _compute_kernel(self):
        """Rebuild `self.kernel` from the direction `weight_v` and magnitude `weight_g`."""
        self.kernel = (
            tf.nn.l2_normalize(self.weight_v, axis=self.kernel_norm_axes)
            * self.weight_g
        )

    def build(self, input_shape):
        """Create the parent's weights, then set up `weight_v` and `weight_g`."""
        super().build(input_shape)

        # Reduce over every kernel axis except `filter_axis`.
        kernel_norm_axes = list(range(self.kernel.shape.rank))
        kernel_norm_axes.pop(self.filter_axis)
        self.kernel_norm_axes = kernel_norm_axes

        # Re-create the kernel as a variable named "weight_v" so the variable
        # names resemble torch's weight-norm parameters.
        # NOTE(review): the variable created by the parent `build` is only
        # replaced on the attribute; confirm whether it still shows up in
        # `self.weights`.
        self.kernel = tf.Variable(self.kernel, name="weight_v", trainable=True)
        self.weight_v = self.kernel

        self._init_weight_g()

    def _init_weight_g(self):
        """Create `weight_g` and initialise it to the current norm of `weight_v`.

        `weight_g` has shape `(n, 1, 1)`, where `n` is the size of `weight_v`
        along `filter_axis`. Because it starts at the norm of `weight_v`, the
        first recomputed kernel equals `weight_v` (up to numerical error).
        """
        self.weight_g = self.add_weight(
            name="weight_g",
            shape=(int(self.weight_v.shape[self.filter_axis]), 1, 1),
            initializer="ones",
            dtype=self.weight_v.dtype,
            trainable=True,
        )
        kernel_norm = tf.sqrt(
            tf.reduce_sum(tf.square(self.weight_v), axis=self.kernel_norm_axes)
        )
        self.weight_g.assign(kernel_norm[:, tf.newaxis, tf.newaxis])

    def call(self, inputs):
        """Recompute the kernel, pad axis 1 of `inputs`, and run the convolution.

        The pad spec has three entries, so `inputs` must be rank 3.
        """
        self._compute_kernel()
        output = tf.pad(inputs, ((0, 0), (self._padding, self._padding), (0, 0)))
        return super().call(output)

    def get_config(self):
        """Return the `Conv1D` config with `padding` replaced by the integer pad amount."""
        config = super().get_config()
        config.update({"padding": self._padding})
        return config



class StochasticDepth(tf.keras.layers.Layer):
    """Merge a residual branch into its shortcut with stochastic depth.

    Based on [Deep Networks with Stochastic Depth](https://arxiv.org/abs/1603.09382)
    and on `tensorflow_addons.layers.StochasticDepth`. It is used in place of a
    plain addition of `[shortcut, residual]`.

    At train time the layer returns
    $$
    x[0] + b_l * x[1],
    $$
    where $b_l$ is a single Bernoulli sample with $P(b_l = 1) = p_l$.

    At test time the layer returns
    $$
    x[0] + x[1]
    $$
    Unlike the `tensorflow_addons` version, the residual is NOT rescaled by
    $p_l$ at test time: doing so would disturb fine-tuned weights that are
    loaded directly into this architecture.

    NOTE(review): `typechecked` is not imported in this cell; it is imported
    in the earlier `tensorflow_addons.py` cell, which must run first.

    Args:
        survival_probability: float, the probability of the residual branch being kept.
    Call Args:
        inputs: List of `[shortcut, residual]`, combined by element-wise addition.
    Output shape:
        Equal to the shape of `shortcut`.
    """

    @typechecked
    def __init__(self, survival_probability: float = 0.5, **kwargs):
        super().__init__(**kwargs)
        self.survival_probability = survival_probability

    def call(self, x, training=None):
        """Combine `[shortcut, residual]`, dropping the residual at random in training.

        Raises:
            ValueError: if `x` is not a list of exactly two elements.
        """
        is_pair = isinstance(x, list) and len(x) == 2
        if not is_pair:
            raise ValueError("input must be a list of length 2.")

        shortcut = x[0]
        residual = x[1]

        # One scalar keep/drop decision, sampled regardless of the phase.
        keep = tf.keras.backend.random_bernoulli([], p=self.survival_probability)

        def train_branch():
            return shortcut + keep * residual

        def eval_branch():
            # Plain residual addition, intentionally without survival-probability scaling.
            return shortcut + residual

        return tf.keras.backend.in_train_phase(
            train_branch, eval_branch, training=training
        )

    def compute_output_shape(self, input_shape):
        """The output takes the shape of the first input (the shortcut)."""
        shortcut_shape = input_shape[0]
        return shortcut_shape

    def get_config(self):
        """Return the base layer config extended with `survival_probability`."""
        config = dict(super().get_config())
        config["survival_probability"] = self.survival_probability
        return config

class TransformerAttention(tf.keras.layers.Layer):
    """Multi-head self-attention layer from `Attention Is All You Need`.

    Args:
        hidden_size: Output width of the q/k/v and output projections.
        num_heads: Number of attention heads; the last input dimension is split
            evenly across them.
        dropout: Dropout rate applied to the attention probabilities.
        name: Layer name.
        **kwargs: Forwarded to `tf.keras.layers.Layer` (e.g. `trainable`,
            `dtype`) so that `from_config(get_config())` works.

    Attributes:
        dropout: The `tf.keras.layers.Dropout` layer (kept under this name for
            backward compatibility).
        dropout_rate: The float rate passed to the constructor; this is the
            value that `get_config` serialises.
    """

    def __init__(self, hidden_size, num_heads, dropout=0.1, name="attention", **kwargs):
        super().__init__(name=name, **kwargs)
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        # Keep the raw rate separately: `self.dropout` holds the Dropout layer,
        # which must not end up in the serialised config.
        self.dropout_rate = dropout

        self.q = tf.keras.layers.Dense(hidden_size, name="q_proj")
        self.k = tf.keras.layers.Dense(hidden_size, name="k_proj")
        self.v = tf.keras.layers.Dense(hidden_size, name="v_proj")
        self.projection = tf.keras.layers.Dense(hidden_size, name="out_proj")

        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, batch, attention_mask=None, training=False):
        """Apply self-attention to a rank-3 `batch`.

        Args:
            batch: Rank-3 tensor; its static last dimension is divided by
                `num_heads` to obtain the per-head size.
            attention_mask: Optional tensor that is ADDED to the raw attention
                scores before the softmax (an additive mask).
            training: Whether dropout is active.

        Returns:
            The output projection of the attention context.
        """
        head_size = batch.shape[2] // self.num_heads
        q_out = self._prepare_either_qkv(self.q(batch), head_size)
        k_out = self._prepare_either_qkv(self.k(batch), head_size)
        v_out = self._prepare_either_qkv(self.v(batch), head_size)

        # Scale the queries instead of the score matrix.
        q_out = q_out * head_size ** (-0.5)

        batch = self.get_context(q_out, k_out, v_out, attention_mask=attention_mask, training=training)
        batch = self.projection(batch)
        return batch

    def get_context(self, q_out, k_out, v_out, attention_mask=None, training=False):
        """Compute softmax attention and merge the heads back together.

        Inputs are per-head tensors as produced by `_prepare_either_qkv`; the
        result is reshaped to `(-1, seqlen, num_heads * head_size)`.
        """
        # The static shape is used; the batch entry is not needed because the
        # final reshape leaves it as -1.
        _, h, l, d = q_out.shape
        attn_scores = tf.matmul(q_out, k_out, transpose_b=True)  # "bhqd,bhkd->bhqk"

        if attention_mask is not None:
            attn_scores = attn_scores + attention_mask

        attn_scores = self.dropout(
            tf.nn.softmax(attn_scores, axis=-1), training=training
        )
        context = tf.matmul(attn_scores, v_out)  # "bhll,bhld->bhld"
        context = tf.transpose(context, perm=(0, 2, 1, 3))
        return tf.reshape(context, (-1, l, h * d))

    def _prepare_either_qkv(self, tensor, head_size):
        """Split the last axis into heads and move the head axis in front of the sequence axis."""
        _, seqlen, _ = tensor.shape
        tensor = tf.reshape(tensor, (-1, seqlen, self.num_heads, head_size))
        return tf.transpose(
            tensor, perm=(0, 2, 1, 3)
        )  # -> bsz, num_heads, seqlen, head_size

    def get_config(self):
        """Return the constructor arguments, with `dropout` as the float rate."""
        config = super().get_config()
        config.update(
            {
                "hidden_size": self.hidden_size,
                "num_heads": self.num_heads,
                "dropout": self.dropout_rate,
            }
        )
        return config


class TransformerLayer(tf.keras.layers.Layer):
    """One transformer block: self-attention followed by a feed-forward network.

    Args:
        hidden_size: Model width (attention and feed-forward output size).
        num_heads: Number of attention heads.
        intermediate_size: Width of the feed-forward hidden layer.
        survival_prob: Survival probability passed to `StochasticDepth`, which
            merges the feed-forward branch with its residual.
        layer_norm_eps: Epsilon of both layer normalizations.
        is_gelu_approx: Passed as `approximate` to `tf.nn.gelu`.
        dropout: Dropout rate used inside attention and in this block.
        attention_norm_type: `"prenorm"` normalizes before each sub-block,
            `"postnorm"` after the residual merge. Any other value skips the
            layer norms entirely (no validation happens here).
        name: Layer name.
        **kwargs: Forwarded to `tf.keras.layers.Layer` (e.g. `trainable`,
            `dtype`) so that `from_config(get_config())` works.

    Attributes:
        dropout: The shared `tf.keras.layers.Dropout` layer (name kept for
            backward compatibility).
        dropout_rate: The float rate passed to the constructor; this is the
            value that `get_config` serialises.
    """

    def __init__(
        self,
        hidden_size,
        num_heads,
        intermediate_size,
        survival_prob=0.9,
        layer_norm_eps=1e-5,
        is_gelu_approx=False,
        dropout=0.1,
        attention_norm_type="postnorm",
        name=None,
        **kwargs,
    ):
        super().__init__(name=name, **kwargs)
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.intermediate_size = intermediate_size
        self.survival_prob = survival_prob
        self.layer_norm_eps = layer_norm_eps
        self.is_gelu_approx = is_gelu_approx
        # Keep the raw rate separately: `self.dropout` holds the Dropout layer,
        # which must not end up in the serialised config.
        self.dropout_rate = dropout
        self.attention_norm_type = attention_norm_type

        self.attention = TransformerAttention(
            hidden_size, num_heads, dropout=dropout, name="attention"
        )
        self.dropout = tf.keras.layers.Dropout(dropout)

        self.layer_norm = tf.keras.layers.LayerNormalization(
            epsilon=layer_norm_eps, name="layer_norm"
        )
        self.intermediate = tf.keras.layers.Dense(
            intermediate_size, name="feed_forward/intermediate_dense"
        )
        self.attn_output = tf.keras.layers.Dense(
            hidden_size, name="feed_forward/output_dense"
        )
        self.final_layer_norm = tf.keras.layers.LayerNormalization(
            epsilon=layer_norm_eps,
            name="final_layer_norm",
        )
        self.stochastic_depth = StochasticDepth(survival_prob)

    def call(self, batch, attention_mask=None, training=False):
        """Run the attention sub-block and then the feed-forward sub-block.

        Args:
            batch: Input tensor passed to the attention layer.
            attention_mask: Optional mask forwarded to the attention layer.
            training: Enables dropout and stochastic depth.
        """
        # self_attn
        residual = batch
        if self.attention_norm_type == "prenorm":
            batch = self.layer_norm(batch)
        batch = self.attention(batch, attention_mask=attention_mask, training=training)
        batch = self.dropout(batch, training=training)
        batch = batch + residual
        if self.attention_norm_type == "postnorm":
            batch = self.layer_norm(batch)

        # ffn
        residual = batch
        if self.attention_norm_type == "prenorm":
            batch = self.final_layer_norm(batch)
        batch = tf.nn.gelu(self.intermediate(batch), approximate=self.is_gelu_approx)
        batch = self.attn_output(self.dropout(batch, training=training))
        # stochastic depth from `paper <https://arxiv.org/abs/1603.09382> __`
        # (only the feed-forward branch is merged this way; the attention
        # branch above uses a plain addition)
        batch = self.stochastic_depth([residual, batch], training=training)
        if self.attention_norm_type == "postnorm":
            batch = self.final_layer_norm(batch)

        return batch

    def get_config(self):
        """Return the constructor arguments, with `dropout` as the float rate."""
        config = super().get_config()
        config.update(
            {
                "hidden_size": self.hidden_size,
                "num_heads": self.num_heads,
                "intermediate_size": self.intermediate_size,
                "survival_prob": self.survival_prob,
                "layer_norm_eps": self.layer_norm_eps,
                "is_gelu_approx": self.is_gelu_approx,
                "dropout": self.dropout_rate,
                "attention_norm_type": self.attention_norm_type,
            }
        )
        return config


class PositionalConvEmbedding(tf.keras.layers.Layer):
    """Convolutional positional embedding: a weight-normalized grouped conv plus GELU.

    Args:
        hidden_size: Number of filters of the convolution.
        num_conv_pos_embeddings: Kernel size of the convolution; half of it
            (integer division) is used as the padding on each side.
        num_conv_pos_embedding_groups: Number of convolution groups.
        is_gelu_approx: Passed as `approximate` to `tf.nn.gelu`.
        name: Layer name.
        **kwargs: Forwarded to `tf.keras.layers.Layer` (e.g. `trainable`,
            `dtype`) so that `from_config(get_config())` works.
    """

    def __init__(
        self,
        hidden_size,
        num_conv_pos_embeddings,
        num_conv_pos_embedding_groups,
        is_gelu_approx=False,
        name="pos_conv_embed",
        **kwargs,
    ):
        super().__init__(name=name, **kwargs)
        self.hidden_size = hidden_size
        self.num_conv_pos_embeddings = num_conv_pos_embeddings
        self.num_conv_pos_embedding_groups = num_conv_pos_embedding_groups
        self.is_gelu_approx = is_gelu_approx

        self.conv = Conv1DWithWeightNorm(
            hidden_size,
            num_conv_pos_embeddings,
            padding=num_conv_pos_embeddings // 2,
            groups=num_conv_pos_embedding_groups,
            name="conv",
        )
        # With an even kernel size, padding k // 2 on both sides yields one
        # extra output frame, which `call` trims.
        self.is_padding_wrong = num_conv_pos_embeddings % 2 == 0

    def call(self, batch):
        """Convolve `batch`, drop the surplus last frame if needed, and apply GELU."""
        batch = self.conv(batch)
        if self.is_padding_wrong:
            batch = batch[:, :-1, :]
        return tf.nn.gelu(batch, approximate=self.is_gelu_approx)

    def get_config(self):
        """Return the constructor arguments merged into the base layer config."""
        config = super().get_config()
        config.update(
            {
                "hidden_size": self.hidden_size,
                "num_conv_pos_embeddings": self.num_conv_pos_embeddings,
                "num_conv_pos_embedding_groups": self.num_conv_pos_embedding_groups,
                "is_gelu_approx": self.is_gelu_approx,
            }
        )
        return config


class Wav2Vec2Encoder(tf.keras.layers.Layer):
    """Positional conv embedding followed by a stack of `TransformerLayer`s.

    Args:
        hidden_size: Model width.
        num_heads: Number of attention heads per layer.
        num_layers: Number of transformer layers.
        intermediate_size: Feed-forward hidden width of each layer.
        num_conv_pos_embeddings: Kernel size of the positional convolution.
        num_conv_pos_embedding_groups: Groups of the positional convolution.
        survival_prob: Stochastic-depth survival probability of each layer.
        dropout: Dropout rate used here and in every layer.
        layer_norm_eps: Epsilon of the layer normalizations.
        is_gelu_approx: Passed as `approximate` to `tf.nn.gelu`.
        attention_norm_type: `"postnorm"` applies the encoder layer norm before
            the layer stack, `"prenorm"` after it.
        name: Layer name.
        **kwargs: Forwarded to `tf.keras.layers.Layer` (e.g. `trainable`,
            `dtype`) so that `from_config(get_config())` works.

    Attributes:
        dropout: The `tf.keras.layers.Dropout` layer (name kept for backward
            compatibility).
        dropout_rate: The float rate passed to the constructor; this is the
            value that `get_config` serialises.
    """

    def __init__(
        self,
        hidden_size,
        num_heads,
        num_layers,
        intermediate_size,
        num_conv_pos_embeddings,
        num_conv_pos_embedding_groups,
        survival_prob=0.9,
        dropout=0.1,
        layer_norm_eps=1e-5,
        is_gelu_approx=False,
        attention_norm_type="postnorm",
        name="encoder",
        **kwargs,
    ):
        super().__init__(name=name, **kwargs)
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.num_layers = num_layers
        self.intermediate_size = intermediate_size
        self.num_conv_pos_embeddings = num_conv_pos_embeddings
        self.num_conv_pos_embedding_groups = num_conv_pos_embedding_groups
        self.survival_prob = survival_prob
        # Keep the raw rate separately: `self.dropout` holds the Dropout layer,
        # which must not end up in the serialised config.
        self.dropout_rate = dropout
        self.layer_norm_eps = layer_norm_eps
        self.is_gelu_approx = is_gelu_approx
        self.attention_norm_type = attention_norm_type

        self.pos_conv_embed = PositionalConvEmbedding(
            hidden_size,
            num_conv_pos_embeddings,
            num_conv_pos_embedding_groups,
            is_gelu_approx=is_gelu_approx,
            name="pos_conv_embed",
        )
        self.layer_norm = tf.keras.layers.LayerNormalization(
            epsilon=layer_norm_eps, name="layer_norm"
        )
        self.dropout = tf.keras.layers.Dropout(dropout)
        self.layers = [
            TransformerLayer(
                hidden_size,
                num_heads,
                intermediate_size,
                survival_prob=survival_prob,
                layer_norm_eps=layer_norm_eps,
                is_gelu_approx=is_gelu_approx,
                dropout=dropout,
                attention_norm_type=attention_norm_type,
                name=f"layers/{i}",
            )
            for i in range(num_layers)
        ]

    def call(self, batch, attention_mask=None, training=False):
        """Encode `batch`.

        Args:
            batch: Rank-3 input whose axis 1 has a static length (it is read
                from `batch.shape[1]` when a mask is given).
            attention_mask: Optional rank-2 mask indexed as (batch, position).
                It is used as a `tf.where` condition, so it has to be boolean;
                positions where it is False are zeroed in `batch` and receive
                an additive score of -10000 in attention.
            training: Enables dropout and stochastic depth.
        """
        if attention_mask is not None:
            batch = tf.where(attention_mask[:, :, tf.newaxis], batch, 0.0)
            seqlen = batch.shape[1]

            attention_mask = tf.cast(attention_mask, dtype=batch.dtype)
            attention_mask = (1.0 - attention_mask) * tf.constant(-10000.0)

            # tf.broadcast_to doesn't work when batch size is unknown (especially with TFSavedModel)
            # so the mask is expanded to (batch, 1, seqlen, seqlen) by hand:
            # (1, batch, 1, seqlen) -> repeat -> (seqlen, batch, 1, seqlen) -> transpose.
            attention_mask = attention_mask[tf.newaxis, :, tf.newaxis, :]
            attention_mask = tf.repeat(attention_mask, seqlen, axis=0)
            attention_mask = tf.reshape(attention_mask, (seqlen, -1, 1, seqlen))
            attention_mask = tf.transpose(attention_mask, perm=[1, 2, 0, 3])

        batch = batch + self.pos_conv_embed(batch)

        if self.attention_norm_type == "postnorm":
            batch = self.layer_norm(batch)

        batch = self.dropout(batch, training=training)
        for layer in self.layers:
            batch = layer(batch, attention_mask=attention_mask, training=training)

        if self.attention_norm_type == "prenorm":
            batch = self.layer_norm(batch)
        return batch

    def get_config(self):
        """Return the constructor arguments, with `dropout` as the float rate."""
        config = super().get_config()
        config.update(
            {
                "hidden_size": self.hidden_size,
                "num_heads": self.num_heads,
                "num_layers": self.num_layers,
                "intermediate_size": self.intermediate_size,
                "num_conv_pos_embeddings": self.num_conv_pos_embeddings,
                "num_conv_pos_embedding_groups": self.num_conv_pos_embedding_groups,
                "survival_prob": self.survival_prob,
                "dropout": self.dropout_rate,
                "layer_norm_eps": self.layer_norm_eps,
                "is_gelu_approx": self.is_gelu_approx,
                "attention_norm_type": self.attention_norm_type,
            }
        )
        return config

In [None]:
#                             feature_extractor.py
import tensorflow as tf

class GroupNormalization(tf.keras.layers.Layer):
    """Group normalization layer.
    Source: "Group Normalization" (Yuxin Wu & Kaiming He, 2018)
    https://arxiv.org/abs/1803.08494
    Group Normalization divides the channels into groups and computes
    within each group the mean and variance for normalization.
    Empirically, its accuracy is more stable than batch norm in a wide
    range of small batch sizes, if learning rate is adjusted linearly
    with batch sizes.
    Relation to Layer Normalization:
    If the number of groups is set to 1, then this operation becomes identical
    to Layer Normalization.
    Relation to Instance Normalization:
    If the number of groups is set to the
    input dimension (number of groups is equal
    to number of channels), then this operation becomes
    identical to Instance Normalization.
    Args:
        groups: Integer, the number of groups for Group Normalization.
            Can be in the range [1, N] where N is the input dimension.
            The input dimension must be divisible by the number of groups.
            Defaults to 32. A value of -1 is replaced at build time by the
            size of the normalized axis (instance normalization).
        axis: Integer, the axis that should be normalized.
        epsilon: Small float added to variance to avoid dividing by zero.
        center: If True, add offset of `beta` to normalized tensor.
            If False, `beta` is ignored.
        scale: If True, multiply by `gamma`.
            If False, `gamma` is not used.
        beta_initializer: Initializer for the beta weight.
        gamma_initializer: Initializer for the gamma weight.
        beta_regularizer: Optional regularizer for the beta weight.
        gamma_regularizer: Optional regularizer for the gamma weight.
        beta_constraint: Optional constraint for the beta weight.
        gamma_constraint: Optional constraint for the gamma weight.
    Input shape:
        Arbitrary. Use the keyword argument `input_shape`
        (tuple of integers, does not include the samples axis)
        when using this layer as the first layer in a model.
    Output shape:
        Same shape as input.
    """

    # NOTE(review): `typechecked` is not imported in this cell; confirm that an
    # earlier cell provides it (e.g. `from typeguard import typechecked`).
    @typechecked
    def __init__(
        self,
        groups: int = 32,
        axis: int = -1,
        epsilon: float = 1e-3,
        center: bool = True,
        scale: bool = True,
        beta_initializer="zeros",
        gamma_initializer="ones",
        beta_regularizer=None,
        gamma_regularizer=None,
        beta_constraint=None,
        gamma_constraint=None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.supports_masking = True
        self.groups = groups
        self.axis = axis
        self.epsilon = epsilon
        self.center = center
        self.scale = scale
        self.beta_initializer = tf.keras.initializers.get(beta_initializer)
        self.gamma_initializer = tf.keras.initializers.get(gamma_initializer)
        self.beta_regularizer = tf.keras.regularizers.get(beta_regularizer)
        self.gamma_regularizer = tf.keras.regularizers.get(gamma_regularizer)
        self.beta_constraint = tf.keras.constraints.get(beta_constraint)
        self.gamma_constraint = tf.keras.constraints.get(gamma_constraint)
        self._check_axis()

    def build(self, input_shape):
        """Validate `input_shape` and create the `gamma` / `beta` weights."""
        self._check_if_input_shape_is_none(input_shape)
        self._set_number_of_groups_for_instance_norm(input_shape)
        self._check_size_of_dimensions(input_shape)
        self._create_input_spec(input_shape)

        self._add_gamma_weight(input_shape)
        self._add_beta_weight(input_shape)
        self.built = True
        super().build(input_shape)

    def call(self, inputs):
        """Normalize `inputs` group-wise and return a tensor of the same shape."""
        input_shape = tf.keras.backend.int_shape(inputs)
        tensor_input_shape = tf.shape(inputs)

        reshaped_inputs, _ = self._reshape_into_groups(
            inputs, input_shape, tensor_input_shape
        )

        normalized_inputs = self._apply_normalization(reshaped_inputs, input_shape)

        # With one channel per group nothing was reshaped, so there is nothing
        # to undo; otherwise fold the extra group axis back into the channels.
        is_instance_norm = (input_shape[self.axis] // self.groups) == 1
        if not is_instance_norm:
            outputs = tf.reshape(normalized_inputs, tensor_input_shape)
        else:
            outputs = normalized_inputs

        return outputs

    def get_config(self):
        """Return the serialisable constructor arguments of this layer."""
        config = {
            "groups": self.groups,
            "axis": self.axis,
            "epsilon": self.epsilon,
            "center": self.center,
            "scale": self.scale,
            "beta_initializer": tf.keras.initializers.serialize(self.beta_initializer),
            "gamma_initializer": tf.keras.initializers.serialize(
                self.gamma_initializer
            ),
            "beta_regularizer": tf.keras.regularizers.serialize(self.beta_regularizer),
            "gamma_regularizer": tf.keras.regularizers.serialize(
                self.gamma_regularizer
            ),
            "beta_constraint": tf.keras.constraints.serialize(self.beta_constraint),
            "gamma_constraint": tf.keras.constraints.serialize(self.gamma_constraint),
        }
        base_config = super().get_config()
        return {**base_config, **config}

    def compute_output_shape(self, input_shape):
        """The layer never changes the shape of its input."""
        return input_shape

    def _reshape_into_groups(self, inputs, input_shape, tensor_input_shape):
        """Split the normalized axis into (groups, channels_per_group).

        Returns the (possibly reshaped) tensor together with the shape used.
        When every group holds a single channel the input is returned as is.
        """
        group_shape = [tensor_input_shape[i] for i in range(len(input_shape))]
        is_instance_norm = (input_shape[self.axis] // self.groups) == 1
        if not is_instance_norm:
            group_shape[self.axis] = input_shape[self.axis] // self.groups
            group_shape.insert(self.axis, self.groups)
            group_shape = tf.stack(group_shape)
            reshaped_inputs = tf.reshape(inputs, group_shape)
            return reshaped_inputs, group_shape
        else:
            return inputs, group_shape

    def _apply_normalization(self, reshaped_inputs, input_shape):
        """Normalize with per-group moments, then apply `gamma` / `beta`."""
        group_shape = tf.keras.backend.int_shape(reshaped_inputs)
        # Reduce over every non-batch axis except the one that indexes groups.
        group_reduction_axes = list(range(1, len(group_shape)))
        is_instance_norm = (input_shape[self.axis] // self.groups) == 1
        if not is_instance_norm:
            axis = -2 if self.axis == -1 else self.axis - 1
        else:
            axis = -1 if self.axis == -1 else self.axis - 1
        group_reduction_axes.pop(axis)

        mean, variance = tf.nn.moments(
            reshaped_inputs, group_reduction_axes, keepdims=True
        )

        gamma, beta = self._get_reshaped_weights(input_shape)
        normalized_inputs = tf.nn.batch_normalization(
            reshaped_inputs,
            mean=mean,
            variance=variance,
            scale=gamma,
            offset=beta,
            variance_epsilon=self.epsilon,
        )
        return normalized_inputs

    def _get_reshaped_weights(self, input_shape):
        """Return `gamma` and `beta` reshaped for broadcasting (None if unused)."""
        broadcast_shape = self._create_broadcast_shape(input_shape)
        gamma = None
        beta = None
        if self.scale:
            gamma = tf.reshape(self.gamma, broadcast_shape)

        if self.center:
            beta = tf.reshape(self.beta, broadcast_shape)
        return gamma, beta

    def _check_if_input_shape_is_none(self, input_shape):
        """Raise ValueError when the normalized axis has no static size."""
        dim = input_shape[self.axis]
        if dim is None:
            raise ValueError(
                "Axis " + str(self.axis) + " of "
                "input tensor should have a defined dimension "
                "but the layer received an input with shape " + str(input_shape) + "."
            )

    def _set_number_of_groups_for_instance_norm(self, input_shape):
        """Resolve `groups == -1` to the size of the normalized axis."""
        dim = input_shape[self.axis]

        if self.groups == -1:
            self.groups = dim

    def _check_size_of_dimensions(self, input_shape):
        """Raise ValueError unless the channel count is a multiple of `groups`."""
        dim = input_shape[self.axis]
        if dim < self.groups:
            raise ValueError(
                "Number of groups (" + str(self.groups) + ") cannot be "
                "more than the number of channels (" + str(dim) + ")."
            )

        if dim % self.groups != 0:
            # The channels are what get divided into groups, so it is the
            # channel count that has to be a multiple of the group count.
            raise ValueError(
                "Number of channels (" + str(dim) + ") must be a "
                "multiple of the number of groups (" + str(self.groups) + ")."
            )

    def _check_axis(self):
        """Reject normalization over axis 0 (the batch axis)."""
        if self.axis == 0:
            raise ValueError(
                "You are trying to normalize your batch axis. Do you want to "
                "use tf.layer.batch_normalization instead"
            )

    def _create_input_spec(self, input_shape):
        """Pin the input rank and the size of the normalized axis."""
        dim = input_shape[self.axis]
        self.input_spec = tf.keras.layers.InputSpec(
            ndim=len(input_shape), axes={self.axis: dim}
        )

    def _add_gamma_weight(self, input_shape):
        """Create the per-channel scale weight, or set it to None if unused."""
        dim = input_shape[self.axis]
        shape = (dim,)

        if self.scale:
            self.gamma = self.add_weight(
                shape=shape,
                name="gamma",
                initializer=self.gamma_initializer,
                regularizer=self.gamma_regularizer,
                constraint=self.gamma_constraint,
            )
        else:
            self.gamma = None

    def _add_beta_weight(self, input_shape):
        """Create the per-channel offset weight, or set it to None if unused."""
        dim = input_shape[self.axis]
        shape = (dim,)

        if self.center:
            self.beta = self.add_weight(
                shape=shape,
                name="beta",
                initializer=self.beta_initializer,
                regularizer=self.beta_regularizer,
                constraint=self.beta_constraint,
            )
        else:
            self.beta = None

    def _create_broadcast_shape(self, input_shape):
        """Shape that lets the 1-D weights broadcast against the grouped tensor."""
        broadcast_shape = [1] * len(input_shape)
        is_instance_norm = (input_shape[self.axis] // self.groups) == 1
        if not is_instance_norm:
            broadcast_shape[self.axis] = input_shape[self.axis] // self.groups
            broadcast_shape.insert(self.axis, self.groups)
        else:
            broadcast_shape[self.axis] = self.groups
        return broadcast_shape

class FeatureExtractorLayer(tf.keras.layers.Layer):
    """One stage of the convolutional feature extractor: Conv1D, optional norm, GELU.

    Args:
        filter_sizes: Output channels of every stage; entry `layer_id` is used.
        kernal_sizes: Kernel sizes of every stage; entry `layer_id` is used.
        strides: Strides of every stage; entry `layer_id` is used.
        conv_bias: Whether the convolution has a bias term.
        is_gelu_approx: Passed to `tf.nn.gelu` as `approximate`.
        layer_id: Index of this stage within the three lists above.
        feature_extractor_norm_type: "group" (a `GroupNormalization` on stage 0
            only, no norm on the other stages) or "layer" (a
            `LayerNormalization` on every stage).
        name: Optional layer name.

    Raises:
        NotImplementedError: For any other `feature_extractor_norm_type`.
    """

    def __init__(
        self,
        filter_sizes,
        kernal_sizes,
        strides,
        conv_bias=False,
        is_gelu_approx=False,
        layer_id=0,
        feature_extractor_norm_type="group",
        name=None,
    ):
        super().__init__(name=name)
        # Kept verbatim so that `get_config` can reproduce the constructor call.
        self.filter_sizes = filter_sizes
        self.kernal_sizes = kernal_sizes
        self.strides = strides
        self.conv_bias = conv_bias
        self.is_gelu_approx = is_gelu_approx
        self.layer_id = layer_id
        self.feature_extractor_norm_type = feature_extractor_norm_type

        num_filters = filter_sizes[layer_id]
        self.conv_layer = tf.keras.layers.Conv1D(
            num_filters,
            kernal_sizes[layer_id],
            strides=strides[layer_id],
            use_bias=conv_bias,
            name="conv",
        )

        self.layer_norm = None
        norm_type = self.feature_extractor_norm_type
        if norm_type == "layer":
            # TODO: check value of axis
            self.layer_norm = tf.keras.layers.LayerNormalization(
                axis=-1, epsilon=1e-5, name="layer_norm"
            )
        elif norm_type != "group":
            raise NotImplementedError
        elif layer_id == 0:
            # groups == channels, i.e. one channel per group.
            self.layer_norm = GroupNormalization(
                num_filters,
                axis=-1,
                name="layer_norm",
                epsilon=1e-5,
            )

    def call(self, batch):
        """Apply convolution, the optional normalization, then GELU."""
        features = self.conv_layer(batch)
        if self.layer_norm is not None:
            features = self.layer_norm(features)
        return tf.nn.gelu(features, approximate=self.is_gelu_approx)

    def get_config(self):
        """Return the base-layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "filter_sizes": self.filter_sizes,
            "kernal_sizes": self.kernal_sizes,
            "strides": self.strides,
            "conv_bias": self.conv_bias,
            "is_gelu_approx": self.is_gelu_approx,
            "layer_id": self.layer_id,
            "feature_extractor_norm_type": self.feature_extractor_norm_type,
        }


class FeatureProjection(tf.keras.layers.Layer):
    """Layer-norm the extracted features, project them to `hidden_size`, apply dropout.

    Args:
        hidden_size: Number of units of the dense projection.
        layer_norm_eps: Epsilon of the layer normalization.
        dropout: Dropout rate applied after the projection.
        name: Layer name.
    """

    def __init__(
        self, hidden_size, layer_norm_eps=1e-5, dropout=0.1, name="feature_projection"
    ):
        super().__init__(name=name)
        self.hidden_size = hidden_size
        self.layer_norm_eps = layer_norm_eps
        # `self.dropout` is the Dropout *layer* used in `call`; the raw rate is
        # kept separately so `get_config` can return a serialisable value.
        self.dropout_rate = dropout

        self.layer_norm = tf.keras.layers.LayerNormalization(
            epsilon=layer_norm_eps, name="layer_norm"
        )
        self.projection = tf.keras.layers.Dense(hidden_size, name="projection")
        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, batch, training=False):
        """Return the projected features; dropout receives the `training` flag."""
        batch = self.layer_norm(batch)
        batch = self.projection(batch)
        return self.dropout(batch, training=training)

    def get_config(self):
        """Return the base-layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            {
                "hidden_size": self.hidden_size,
                "layer_norm_eps": self.layer_norm_eps,
                "dropout": self.dropout_rate,
            }
        )
        return config

In [None]:
#             losses.py
import tensorflow as tf


class CTCLoss(tf.keras.losses.Loss):
    """Keras loss wrapping `tf.nn.ctc_loss` for fixed-size model inputs.

    Args:
        config: Object providing `kernal_sizes`, `strides` and `pad_id`.
        model_input_shape: Shape of the model input; element 0 is used as the
            batch size and element 1 as the input length.
        division_factor: Value the per-example losses are divided by.
    """

    def __init__(self, config, model_input_shape, division_factor=1):
        super().__init__(reduction=tf.keras.losses.Reduction.SUM)
        self.kernal_sizes = config.kernal_sizes
        self.strides = config.strides
        self.pad_id = config.pad_id
        self.division_factor = division_factor

        self.model_input_shape = model_input_shape

    def call(self, labels, hidden_states):
        """
        Compute the CTC loss of a batch with `tf.nn.ctc_loss`.
        Args:
            labels (:obj: `tf.Tensor`):
                Batch of tokenized text labels; entries equal to `pad_id` are
                treated as padding when the label lengths are counted.
            hidden_states (:obj: `tf.Tensor`):
                Batch-major logits, e.g. the output of the LM head of
                `Wav2Vec2ForCTC.call(...)`.
        Returns:
            loss (:obj: `tf.Tensor`):
                Per-example CTC loss divided by `division_factor`. The loss
                object itself is constructed with SUM reduction.
        """
        # Every example is assumed to span the full input length.
        batch_size = self.model_input_shape[0]
        num_samples = self.model_input_shape[1]
        logit_length = self._get_logit_length(tf.ones(batch_size) * num_samples)

        # Label length = number of non-pad tokens in each row.
        is_token = tf.cast(labels != self.pad_id, tf.int32)
        label_length = tf.reduce_sum(is_token, axis=-1)

        per_example_loss = tf.nn.ctc_loss(
            labels=labels,
            logits=hidden_states,
            label_length=label_length,
            logit_length=logit_length,
            logits_time_major=False,
            blank_index=self.pad_id,
            name="ctc-loss",
        )
        return per_example_loss / self.division_factor

    def _get_logit_length(self, input_length):
        """
        Map a raw input length to the sequence length left after the
        convolutional layers, i.e. the seqlen fed to the transformer encoder.
        """
        length = input_length
        for kernel, step in zip(self.kernal_sizes, self.strides):
            # Output size of an un-padded 1-D convolution.
            length = 1 + (length - kernel) // step
        return length

In [None]:
!pip3 install huggingface_hub

Collecting huggingface_hub
  Downloading huggingface_hub-0.4.0-py3-none-any.whl (67 kB)
[?25l[K     |█████                           | 10 kB 21.5 MB/s eta 0:00:01[K     |█████████▉                      | 20 kB 25.3 MB/s eta 0:00:01[K     |██████████████▊                 | 30 kB 18.7 MB/s eta 0:00:01[K     |███████████████████▋            | 40 kB 15.5 MB/s eta 0:00:01[K     |████████████████████████▌       | 51 kB 7.3 MB/s eta 0:00:01[K     |█████████████████████████████▍  | 61 kB 8.6 MB/s eta 0:00:01[K     |████████████████████████████████| 67 kB 3.6 MB/s 
Installing collected packages: huggingface-hub
Successfully installed huggingface-hub-0.4.0


In [None]:
#                   modeling.py
import json
import logging
import os
import shutil
import subprocess
from dataclasses import asdict, dataclass, field, replace
from typing  import Optional

import tensorflow as tf

from huggingface_hub import ModelHubMixin

# from .config import Wav2Vec2Config

@dataclass
class Wav2Vec2Config:
    """Hyper-parameters of the Wav2Vec2 model.

    The class must be a dataclass: the list defaults are declared through
    `field(default_factory=...)`, validation lives in `__post_init__`, and
    `save_pretrained` relies on `asdict`.

    Raises:
        ValueError: If `filter_sizes`, `kernal_sizes` and `strides` differ in
            length, or `hidden_size` is not a multiple of `num_heads`.
        AssertionError: If a norm type is not one of the supported values.
    """

    vocab_size: int = 32
    dropout: float = 0.1
    hidden_size: int = 768
    num_heads: int = 12
    num_layers: int = 12
    intermediate_size: int = 3072
    is_gelu_approx: bool = False
    layer_norm_eps: float = 1e-5
    survival_prob: float = 1.0
    pad_id: int = 0

    # positional embedding
    num_conv_pos_embeddings: int = 128
    num_conv_pos_embedding_groups: int = 16

    # feature extractor
    filter_sizes: list = field(
        default_factory=lambda: [512, 512, 512, 512, 512, 512, 512]
    )
    kernal_sizes: list = field(default_factory=lambda: [10, 3, 3, 3, 3, 2, 2])
    strides: list = field(default_factory=lambda: [5, 2, 2, 2, 2, 2, 2])
    conv_bias: bool = False

    # spec augmentation arguments
    apply_spec_augment: bool = True
    mask_time_prob: float = 0.05
    mask_time_length: int = 10

    attention_norm_type: str = "postnorm"
    feature_extractor_norm_type: str = "group"
    is_robust: bool = False

    def __post_init__(self):
        """Validate the relationships between fields after construction."""
        if not (len(self.filter_sizes) == len(self.kernal_sizes) == len(self.strides)):
            raise ValueError(
                "Length of filter_sizes, kernal_sizes, strides must match."
            )
        if self.hidden_size % self.num_heads != 0:
            raise ValueError("Hidden size must be perfect multiple of num_heads.")

        assert self.feature_extractor_norm_type in ["group", "layer"], "Only `group` / `layer` are supported"
        assert self.attention_norm_type in ["prenorm", "postnorm"], "Only `prenorm` / `postnorm` are supported"

    def save_pretrained(self, save_dir):
        """Write the config as `config.json` inside `save_dir` (created if missing)."""
        os.makedirs(save_dir, exist_ok=True)
        with open(os.path.join(save_dir, "config.json"), "w") as f:
            json.dump(asdict(self), f)

    @classmethod
    def from_json(cls, path: str):
        """Build a config from the JSON file at `path`."""
        with open(path, "r") as f:
            config_dict = json.load(f)
        return cls(**config_dict)

# from .encoder import Wav2Vec2Encoder

class Wav2Vec2Encoder(tf.keras.layers.Layer):
    """Transformer encoder: positional conv embedding, dropout and `num_layers` blocks.

    Args:
        hidden_size: Passed to the positional embedding and every transformer layer.
        num_heads: Passed to every transformer layer.
        num_layers: Number of `TransformerLayer` blocks.
        intermediate_size: Passed to every transformer layer.
        num_conv_pos_embeddings: Passed to `PositionalConvEmbedding`.
        num_conv_pos_embedding_groups: Passed to `PositionalConvEmbedding`.
        survival_prob: Passed to every transformer layer.
        dropout: Rate of the encoder-level dropout; also passed to every layer.
        layer_norm_eps: Epsilon of the layer normalizations.
        is_gelu_approx: Passed to the positional embedding and every layer.
        attention_norm_type: "postnorm" applies the encoder layer norm before
            the stack, "prenorm" applies it after the stack.
        name: Layer name.
    """

    def __init__(
        self,
        hidden_size,
        num_heads,
        num_layers,
        intermediate_size,
        num_conv_pos_embeddings,
        num_conv_pos_embedding_groups,
        survival_prob=0.9,
        dropout=0.1,
        layer_norm_eps=1e-5,
        is_gelu_approx=False,
        attention_norm_type="postnorm",
        name="encoder",
    ):
        super().__init__(name=name)
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.num_layers = num_layers
        self.intermediate_size = intermediate_size
        self.num_conv_pos_embeddings = num_conv_pos_embeddings
        self.num_conv_pos_embedding_groups = num_conv_pos_embedding_groups
        self.survival_prob = survival_prob
        # `self.dropout` is the Dropout *layer* used in `call`; the raw rate is
        # kept separately so `get_config` can return a serialisable value.
        self.dropout_rate = dropout
        self.layer_norm_eps = layer_norm_eps
        self.is_gelu_approx = is_gelu_approx
        self.attention_norm_type = attention_norm_type

        self.pos_conv_embed = PositionalConvEmbedding(
            hidden_size,
            num_conv_pos_embeddings,
            num_conv_pos_embedding_groups,
            is_gelu_approx=is_gelu_approx,
            name="pos_conv_embed",
        )
        self.layer_norm = tf.keras.layers.LayerNormalization(
            epsilon=layer_norm_eps, name="layer_norm"
        )
        self.dropout = tf.keras.layers.Dropout(dropout)
        self.layers = [
            TransformerLayer(
                hidden_size,
                num_heads,
                intermediate_size,
                survival_prob=survival_prob,
                layer_norm_eps=layer_norm_eps,
                is_gelu_approx=is_gelu_approx,
                dropout=dropout,
                attention_norm_type=attention_norm_type,
                name=f"layers/{i}",
            )
            for i in range(num_layers)
        ]

    def call(self, batch, attention_mask=None, training=False):
        """Add position embeddings and run the transformer stack.

        Args:
            batch: Hidden states. Axis 1 is used as the sequence length;
                presumably shaped (batch_size, seqlen, hidden_size) - TODO confirm.
            attention_mask: Optional rank-2 mask aligned with the first two
                axes of `batch`. It is used directly as the `tf.where`
                condition, so it is expected to be boolean (True = keep).
            training: Forwarded to dropout and to every transformer layer.

        Returns:
            The hidden states produced by the last transformer layer.
        """
        if attention_mask is not None:
            # Zero the hidden states wherever the mask is False.
            batch = tf.where(attention_mask[:, :, tf.newaxis], batch, 0.0)
            seqlen = batch.shape[1]

            # Additive mask: 0.0 where the mask is set, -10000.0 elsewhere.
            attention_mask = tf.cast(attention_mask, dtype=batch.dtype)
            attention_mask = (1.0 - attention_mask) * tf.constant(-10000.0)

            # tf.broadcast_to doesn't work when batch size is unknown (especially with TFSavedModel)
            # (1, B, 1, T) -> (T, B, 1, T) -> (B, 1, T, T)
            attention_mask = attention_mask[tf.newaxis, :, tf.newaxis, :]
            attention_mask = tf.repeat(attention_mask, seqlen, axis=0)
            attention_mask = tf.reshape(attention_mask, (seqlen, -1, 1, seqlen))
            attention_mask = tf.transpose(attention_mask, perm=[1, 2, 0, 3])

        batch = batch + self.pos_conv_embed(batch)

        if self.attention_norm_type == "postnorm":
            batch = self.layer_norm(batch)

        batch = self.dropout(batch, training=training)
        for layer in self.layers:
            batch = layer(batch, attention_mask=attention_mask, training=training)

        if self.attention_norm_type == "prenorm":
            batch = self.layer_norm(batch)
        return batch

    def get_config(self):
        """Return the base-layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            {
                "hidden_size": self.hidden_size,
                "num_heads": self.num_heads,
                "num_layers": self.num_layers,
                "intermediate_size": self.intermediate_size,
                "num_conv_pos_embeddings": self.num_conv_pos_embeddings,
                "num_conv_pos_embedding_groups": self.num_conv_pos_embedding_groups,
                "survival_prob": self.survival_prob,
                "dropout": self.dropout_rate,
                "layer_norm_eps": self.layer_norm_eps,
                "is_gelu_approx": self.is_gelu_approx,
                "attention_norm_type": self.attention_norm_type,
            }
        )
        return config

# from .feature_extractor import FeatureExtractorLayer, FeatureProjection

#     FeatureExtractorLayer
class FeatureExtractorLayer(tf.keras.layers.Layer):
    """One stage of the convolutional feature extractor: Conv1D, optional norm, GELU.

    Args:
        filter_sizes: Output channels of every stage; entry `layer_id` is used.
        kernal_sizes: Kernel sizes of every stage; entry `layer_id` is used.
        strides: Strides of every stage; entry `layer_id` is used.
        conv_bias: Whether the convolution has a bias term.
        is_gelu_approx: Passed to `tf.nn.gelu` as `approximate`.
        layer_id: Index of this stage within the three lists above.
        feature_extractor_norm_type: "group" (a `GroupNormalization` on stage 0
            only, no norm on the other stages) or "layer" (a
            `LayerNormalization` on every stage).
        name: Optional layer name.

    Raises:
        NotImplementedError: For any other `feature_extractor_norm_type`.
    """

    def __init__(
        self,
        filter_sizes,
        kernal_sizes,
        strides,
        conv_bias=False,
        is_gelu_approx=False,
        layer_id=0,
        feature_extractor_norm_type="group",
        name=None,
    ):
        super().__init__(name=name)
        # Kept verbatim so that `get_config` can reproduce the constructor call.
        self.filter_sizes = filter_sizes
        self.kernal_sizes = kernal_sizes
        self.strides = strides
        self.conv_bias = conv_bias
        self.is_gelu_approx = is_gelu_approx
        self.layer_id = layer_id
        self.feature_extractor_norm_type = feature_extractor_norm_type

        num_filters = filter_sizes[layer_id]
        self.conv_layer = tf.keras.layers.Conv1D(
            num_filters,
            kernal_sizes[layer_id],
            strides=strides[layer_id],
            use_bias=conv_bias,
            name="conv",
        )

        self.layer_norm = None
        norm_type = self.feature_extractor_norm_type
        if norm_type == "layer":
            # TODO: check value of axis
            self.layer_norm = tf.keras.layers.LayerNormalization(
                axis=-1, epsilon=1e-5, name="layer_norm"
            )
        elif norm_type != "group":
            raise NotImplementedError
        elif layer_id == 0:
            # groups == channels, i.e. one channel per group.
            self.layer_norm = GroupNormalization(
                num_filters,
                axis=-1,
                name="layer_norm",
                epsilon=1e-5,
            )

    def call(self, batch):
        """Apply convolution, the optional normalization, then GELU."""
        features = self.conv_layer(batch)
        if self.layer_norm is not None:
            features = self.layer_norm(features)
        return tf.nn.gelu(features, approximate=self.is_gelu_approx)

    def get_config(self):
        """Return the base-layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "filter_sizes": self.filter_sizes,
            "kernal_sizes": self.kernal_sizes,
            "strides": self.strides,
            "conv_bias": self.conv_bias,
            "is_gelu_approx": self.is_gelu_approx,
            "layer_id": self.layer_id,
            "feature_extractor_norm_type": self.feature_extractor_norm_type,
        }

#   FeatureProjection

class FeatureProjection(tf.keras.layers.Layer):
    """Layer-norm the extracted features, project them to `hidden_size`, apply dropout.

    Args:
        hidden_size: Number of units of the dense projection.
        layer_norm_eps: Epsilon of the layer normalization.
        dropout: Dropout rate applied after the projection.
        name: Layer name.
    """

    def __init__(
        self, hidden_size, layer_norm_eps=1e-5, dropout=0.1, name="feature_projection"
    ):
        super().__init__(name=name)
        self.hidden_size = hidden_size
        self.layer_norm_eps = layer_norm_eps
        # `self.dropout` is the Dropout *layer* used in `call`; the raw rate is
        # kept separately so `get_config` can return a serialisable value.
        self.dropout_rate = dropout

        self.layer_norm = tf.keras.layers.LayerNormalization(
            epsilon=layer_norm_eps, name="layer_norm"
        )
        self.projection = tf.keras.layers.Dense(hidden_size, name="projection")
        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, batch, training=False):
        """Return the projected features; dropout receives the `training` flag."""
        batch = self.layer_norm(batch)
        batch = self.projection(batch)
        return self.dropout(batch, training=training)

    def get_config(self):
        """Return the base-layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            {
                "hidden_size": self.hidden_size,
                "layer_norm_eps": self.layer_norm_eps,
                "dropout": self.dropout_rate,
            }
        )
        return config

# from .spec_augment import apply_spec_augmentation

def apply_spec_augmentation(features, masked_spec_augment, mask_prob, mask_length):
    """
    Replace randomly chosen time spans of `features` with a mask embedding.
    Args:
        features (:obj: `tf.Tensor`) of shape (batch_size, seqlen, hidden_size):
            hidden states to be masked.
        masked_spec_augment (:obj: `tf.Tensor`) of shape (hidden_size,):
            vector written at every masked position.
        mask_prob (:obj: `float`):
            forwarded to `_compute_mask_indices`; controls how many positions
            get masked.
        mask_length (:obj: `int`):
            forwarded to `_compute_mask_indices`; length of each masked span.
    Return:
        features (:obj: `tf.Tensor`) of shape (batch_size, seqlen, hidden_size):
            the input with the selected positions replaced.
    """
    # Pick the (batch, time) positions to mask.
    batch_and_time = features.shape[:2]
    span_mask = _compute_mask_indices(
        batch_and_time, mask_prob, mask_length, min_masks=2
    )

    # `tf.where` needs a boolean condition; the trailing axis lets it
    # broadcast over the hidden dimension.
    replace_here = tf.cast(span_mask[:, :, tf.newaxis], tf.bool)

    # Both branches of `tf.where` must share a dtype, so cast the mask
    # embedding to the dtype of `features` and make it broadcastable.
    mask_embedding = tf.cast(masked_spec_augment, features.dtype)
    mask_embedding = mask_embedding[tf.newaxis, tf.newaxis, :]

    # True -> mask embedding, False -> original feature.
    return tf.where(replace_here, mask_embedding, features)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

class TFKerasModel(tf.keras.Model):
    """Base model adding save / load / hub helpers on top of `tf.keras.Model`.

    Subclasses are expected to set `self.config` (an object offering
    `save_pretrained`) and to accept `(config, input_shape=...)` in `__init__`.
    """

    def save_pretrained(self, save_dir):
        """
        This method will save model weights and config in `save_dir`.
        """
        self.config.save_pretrained(save_dir)
        self.save_weights(os.path.join(save_dir, "tf_model.h5"))

    def push_to_hub(self, directory: str, model_id: str):
        """
        Use this method to push your model weights to HuggingFace Hub.
        Args:
            directory (:obj: `str`):
                directory where model weights are present.
            model_id (:obj: `str`):
                Name of the repository in HuggingFace Hub you want to push to.
        """
        # NOTE(review): `ModelHubMixin.push_to_hub` is called on the class with
        # `directory` as first positional argument; confirm this matches the
        # signature of the installed `huggingface_hub` release.
        return ModelHubMixin.push_to_hub(directory, model_id=model_id)

    @classmethod
    def from_pretrained(cls, model_id, **config_kwargs) -> tf.keras.Model:
        """
        This will load model weights from the directory specified or download them from
        HuggingFace Hub if that directory does not exist locally.
        Args:
            model_id (:obj: `str`):
                Directory where weights are present or model_id if needs to be downloaded from HuggingFace Hub.
            config_kwargs (:obj: `dict`)
                Extra arguments overriding fields of the loaded `Wav2Vec2Config`.
                The special key `input_shape` (default `(1, 2048)`) is passed
                to the model constructor instead.
        Returns:
            Instance of `tf.keras.Model` initialized from trained weights.
        Raises:
            ValueError: If the files cannot be downloaded.
        """

        save_dir = model_id
        if not os.path.isdir(save_dir):
            os.makedirs(save_dir, exist_ok=True)
            hub_url = f"https://huggingface.co/{model_id}"

            print(
                f"Downloading model weights from https://huggingface.co/{model_id} ... ",
                end="",
            )
            downloaded = False
            try:
                for filename in ("config.json", "tf_model.h5"):
                    # Argument list instead of a split string, so paths with
                    # whitespace stay a single argument.
                    subprocess.run(
                        ["wget", f"{hub_url}/resolve/main/{filename}", "-P", save_dir],
                        check=True,
                        stderr=subprocess.PIPE,
                    )
                downloaded = True
            except Exception as err:
                raise ValueError(
                    f"Couldn't download model weights from https://huggingface.co/{model_id}"
                ) from err
            finally:
                if not downloaded:
                    # Remove the directory created above; otherwise the next
                    # call would mistake it for a local checkpoint.
                    shutil.rmtree(save_dir, ignore_errors=True)
            print("Done")
        else:
            print(f"Loading weights locally from `{save_dir}`")

        input_shape = config_kwargs.pop("input_shape", (1, 2048))
        config = Wav2Vec2Config.from_json(os.path.join(save_dir, "config.json"))
        config = replace(config, **config_kwargs)
        model = cls(config, input_shape=input_shape)
        model.load_weights(os.path.join(save_dir, "tf_model.h5"))
        print("Total number of loaded variables:", len(model.variables))
        return model

    def _init(self, input_shape=None, is_robust=False, for_export=False):
        """Build Model weights using dummy inputs"""
        # call this at the end only
        if input_shape is None:
            input_shape = (1, 2048)
        dummy_input = tf.ones(input_shape, dtype=tf.float32)
        attention_mask = tf.ones(input_shape) if is_robust else None

        if for_export:
            self((dummy_input, attention_mask))
        else:
            try:
                # this operation doesn't work on CPU
                # NOTE(review): confirm that `predict` accepts an
                # `attention_mask` keyword; if not, the fallback always runs.
                self.predict(dummy_input, attention_mask=attention_mask)
            except Exception:
                # this operation will hang the TPU VM, hence prefer `.predict`
                self(dummy_input, attention_mask=attention_mask)


class Wav2Vec2Model(TFKerasModel):
    """Wav2Vec2 backbone: conv feature extractor -> feature projection -> encoder.

    Args:
        config: hyper-parameters used to size every sub-layer.
        input_shape: shape of the dummy input used to build the weights at
            construction time; pass `None` to skip building here.
        name: Keras name given to the model.

    Raises:
        ValueError: if `config` is not a `Wav2Vec2Config` instance.
    """

    def __init__(self, config: Wav2Vec2Config, input_shape=(1, 246000), name="wav2vec2"):
        super().__init__(name=name)
        if not isinstance(config, Wav2Vec2Config):
            raise ValueError("`config` must be an instance of `Wav2Vec2Config`.")

        self.config = config
        self.hidden_size = config.hidden_size
        self.is_robust = config.is_robust
        # kept on the model because `call` re-derives the downsampled lengths from them
        self.kernal_sizes = config.kernal_sizes
        self.strides = config.strides

        # spec-augmentation
        self.apply_spec_augment = config.apply_spec_augment
        self.mask_time_prob = config.mask_time_prob
        self.mask_time_length = config.mask_time_length

        num_feature_extractor_layers = len(config.filter_sizes)

        # one conv layer per entry of `filter_sizes`; each layer receives the full
        # lists plus its own index `layer_id`
        self.feature_extractor = [
            FeatureExtractorLayer(
                config.filter_sizes,
                config.kernal_sizes,
                config.strides,
                conv_bias=config.conv_bias,
                is_gelu_approx=config.is_gelu_approx,
                feature_extractor_norm_type=config.feature_extractor_norm_type,
                layer_id=i,
                name=f"feature_extractor/conv_layers/{i}",
            )
            for i in range(num_feature_extractor_layers)
        ]
        self.feature_projection = FeatureProjection(
            config.hidden_size,
            layer_norm_eps=config.layer_norm_eps,
            dropout=config.dropout,
            name="feature_projection",
        )
        self.encoder = Wav2Vec2Encoder(
            config.hidden_size,
            config.num_heads,
            config.num_layers,
            config.intermediate_size,
            config.num_conv_pos_embeddings,
            config.num_conv_pos_embedding_groups,
            survival_prob=config.survival_prob,
            dropout=config.dropout,
            layer_norm_eps=config.layer_norm_eps,
            is_gelu_approx=config.is_gelu_approx,
            attention_norm_type=config.attention_norm_type,
            name="encoder",
        )

        if input_shape is not None:
            self._init(input_shape=input_shape, is_robust=config.is_robust)

    def build(self, input_shape):
        """Create the trainable `masked_spec_embed` vector of size `hidden_size`."""
        self.masked_spec_augment = self.add_weight(
            name="masked_spec_embed",
            shape=(self.hidden_size,),
            initializer="uniform",
            trainable=True,
        )

    def call(self, batch, attention_mask: Optional[tf.Tensor] = None, training=False):
        """
        Args:
            batch (:obj: `tf.Tensor`) of shape (batch_size, seqlen):
                Sound tensor obtained from `Wav2Vec2Processor.__call__`.
            attention_mask (:obj: `tf.Tensor`, `optional`) of shape (batch_size, seqlen):
                Don't pass `attention_mask` when working with checkpoints based on `wav2vec2-base`
                otherwise you should pass this argument.
            training (:obj: `bool`, `optional`):
                Whether to use model for training.
        Returns:
            Hidden states produced by the encoder, of shape (batch_size, seqlen, hidden_dim).
        """
        # only warn on a mismatch between the checkpoint type and the mask; never fail
        if self.is_robust and attention_mask is None:
            logger.warning("You should pass `attention_mask` when working with Wav2Vec2 new checkpoints")
        elif not self.is_robust and attention_mask is not None:
            logger.warning("You should not pass `attention_mask` when working with checkpoints based on `wav2vec2-base`")

        # add a trailing axis before the conv stack
        batch = tf.expand_dims(batch, axis=-1)
        for feature_extractor_layer in self.feature_extractor:
            batch = feature_extractor_layer(batch)
        batch = self.feature_projection(batch, training=training)

        if training and self.apply_spec_augment:
            batch = apply_spec_augmentation(
                batch,
                self.masked_spec_augment,
                self.mask_time_prob,
                self.mask_time_length,
            )

        if attention_mask is not None:
            # number of unmasked positions per example, shrunk once per conv layer
            # with the same kernel/stride arithmetic the feature extractor applies
            input_length = tf.reduce_sum(attention_mask, axis=-1)
            for kernal_size, stride in zip(self.kernal_sizes, self.strides):
                input_length = 1 + (input_length - kernal_size) // stride

            # rebuild the mask at the length of the extracted features (axis 1 of `batch`)
            attention_mask = tf.sequence_mask(input_length, maxlen=batch.shape[1])

        batch = self.encoder(batch, attention_mask=attention_mask, training=training)
        return batch

    def freeze_feature_extractor(self):
        """This will freeze the feature extractor layers (Recommended to use for fine-tuning)."""
        for layer in self.feature_extractor:
            layer.trainable = False


class Wav2Vec2ForCTC(TFKerasModel):
    """Wav2Vec2 model with a CTC head.

    Wraps a `Wav2Vec2Model` and applies dropout followed by a dense projection
    to `config.vocab_size` on top of its output.

    Args:
        config: hyper-parameters for the backbone and the head.
        input_shape: shape of the dummy input used to build all weights.
        name: Keras name given to the model.

    Raises:
        ValueError: if `config` is not a `Wav2Vec2Config` instance.
    """

    def __init__(
        self, config: Wav2Vec2Config, input_shape=(1, 246000), name="wav2vec2-ctc"
    ):
        super().__init__(name=name)
        if not isinstance(config, Wav2Vec2Config):
            raise ValueError("`config` must be an instance of `Wav2Vec2Config`.")
        self.config = config
        self.pad_id = config.pad_id

        # `input_shape=None` keeps the backbone from building itself; the single
        # `_init` call below builds backbone and head together
        self.model = Wav2Vec2Model(config, input_shape=None, name="wav2vec2")
        self.dropout = tf.keras.layers.Dropout(config.dropout)
        self.lm_head = tf.keras.layers.Dense(config.vocab_size, name="lm_head")

        self._init(input_shape=input_shape, is_robust=config.is_robust)

    def freeze_feature_extractor(self):
        """This will freeze the feature extractor layers (Recommended to use for fine-tuning)."""
        self.model.freeze_feature_extractor()

    def call(self, batch: tf.Tensor, attention_mask: Optional[tf.Tensor] = None, training=False):
        """
        Args:
            batch (:obj: `tf.Tensor`) of shape (batch_size, seqlen):
                Sound tensor obtained from `Wav2Vec2Processor.__call__`.
            attention_mask (:obj: `tf.Tensor`, `optional`) of shape (batch_size, seqlen):
                Don't pass `attention_mask` when working with checkpoints based on `wav2vec2-base`
                otherwise you should pass this argument.
            training (:obj: `bool`, `optional`):
                Whether to use model for training.
        Returns:
            Logits from the model of shape (batch_size, seqlen, vocab_size).
        """
        batch = self.model(batch, attention_mask=attention_mask, training=training)
        batch = self.dropout(batch, training=training)
        batch = self.lm_head(batch)
        return batch

In [None]:
#                   processor.py

import json
import os
import re
import subprocess
from itertools import groupby

class Wav2Vec2Processor:
    """Character-level tokenizer or input normalizer, selected by `is_tokenizer`.

    Args:
        is_tokenizer: True to encode/decode text with the vocabulary file,
            False to act as a feature extractor that only normalizes its input.
        do_normalize: in feature-extractor mode, whether `__call__` normalizes.
        vocab_path: JSON file mapping token -> id; downloaded into the current
            directory when the file does not exist (tokenizer mode only).

    Raises:
        ValueError: if the vocabulary has to be downloaded and the download fails.
    """

    def __init__(
        self, is_tokenizer, do_normalize=True, vocab_path="./vocab.json"
    ):
        # whether to use as `feature_extractor` or `tokenizer`

        self.is_tokenizer = is_tokenizer
        self.do_normalize = do_normalize
        self.vocab_path = vocab_path

        if self.is_tokenizer:
            self._setup_vocab()

            self.token_to_id_mapping = self.get_vocab()
            self.id_to_token_mapping = {
                v: k for k, v in self.token_to_id_mapping.items()
            }
            self.unk_token = "<unk>"
            self.unk_id = self.token_to_id_mapping[self.unk_token]

            # word separator used in place of spaces inside the vocabulary
            self.dimiliter_token = "|"
            self.dimiliter_id = self.token_to_id_mapping[self.dimiliter_token]

            special_tokens = ["<pad>"]
            self.special_ids = [self.token_to_id_mapping[k] for k in special_tokens]

    def _setup_vocab(self):
        """This method will download & setup the vocab file if it's not on the `vocab_path`

        Raises:
            ValueError: if `wget` cannot be started or exits with a non-zero status.
        """
        if not os.path.isfile(self.vocab_path):
            url = "https://github.com/vasudevgupta7/gsoc-wav2vec2/raw/main/data/vocab.json"

            print(f"Downloading `vocab.json` from {url} ... ", end="")
            try:
                # `check=True` turns a failed download into an exception; without it
                # a non-zero exit was ignored and "DONE" was printed regardless
                subprocess.run(
                    ["wget", url],
                    check=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
            except (subprocess.CalledProcessError, OSError) as exc:
                raise ValueError(f"Couldn't download `vocab.json` from {url}") from exc
            print("DONE")

            # wget was run without a target, so the file lands in the working directory
            self.vocab_path = "./vocab.json"

    def __call__(self, input_values):
        """
        if is_tokenizer:
            input_values (:obj: `str`):
                Single string you want to encode to ids
        else:
            input_values (:obj: `tf.Tensor`):
                Tensor which needs to be fed into `model.call()`
        """
        if self.is_tokenizer:
            input_values = self._tokenize(input_values)
            # characters missing from the vocabulary map to the `<unk>` id
            input_values = [
                self.token_to_id_mapping.get(k, self.unk_id) for k in input_values
            ]
        else:
            if self.do_normalize:
                input_values = self._normalize(input_values)
        return input_values

    def decode(self, input_ids: list, skip_special_tokens=True, group_tokens=True):
        """
        Use this method to decode your ids back to string.
        Args:
            input_ids (:obj: `list`):
                input_ids you want to decode to string.
            skip_special_tokens (:obj: `bool`, `optional`):
                Whether to remove special tokens (like `<pad>`) from string.
            group_tokens (:obj: `bool`, `optional`):
                Whether to group repeated characters.
        Returns:
            The decoded string with the delimiter token replaced by spaces and
            surrounding whitespace stripped.
        """
        # grouping happens before special ids are dropped, so a `<pad>` between two
        # identical ids keeps both of them
        if group_tokens:
            input_ids = [t[0] for t in groupby(input_ids)]
        if skip_special_tokens:
            input_ids = [k for k in input_ids if k not in self.special_ids]
        tokens = [self.id_to_token_mapping.get(k, self.unk_token) for k in input_ids]
        tokens = [k if k != self.dimiliter_token else " " for k in tokens]
        return "".join(tokens).strip()

    def _tokenize(self, string: str):
        """Upper-case `string`, keep only `A-Z`, `'` and spaces, and split into characters.

        Hyphens become spaces first; spaces are finally replaced by the delimiter token.
        """
        string = re.sub("-", " ", string)
        string = re.sub("[^A-Z' ]", "", string.upper())
        return list(string.replace(" ", self.dimiliter_token))

    def get_vocab(self):
        """Load and return the token -> id mapping stored at `self.vocab_path`."""
        with open(self.vocab_path, "r", encoding="utf-8") as f:
            vocab = json.load(f)
        return vocab

    def _normalize(self, x):
        """You must call this before padding.

        Standardizes `x` along its last axis (zero mean, unit variance with a
        1e-5 stabilizer) and then squeezes every size-1 dimension.
        """
        # -> (1, seqlen)  (original author's note on the expected input shape)
        mean = tf.reduce_mean(x, axis=-1, keepdims=True)
        var = tf.math.reduce_variance(x, axis=-1, keepdims=True)
        return tf.squeeze((x - mean) / tf.sqrt(var + 1e-5))


# Manual smoke test of both processor modes; prints results instead of asserting.
if __name__ == "__main__":
    """Testing Area"""

    # Feature-extractor mode: `__call__` only normalizes its input.
    feature_extractor = Wav2Vec2Processor(is_tokenizer=False)
    # NOTE(review): this cell does not import `tf`; it relies on an earlier cell
    # having done so, and on `../data/sample.wav` existing.
    batch, _ = tf.audio.decode_wav(tf.io.read_file("../data/sample.wav"))
    # swap the two axes — presumably (samples, channels) -> (channels, samples); confirm
    batch = tf.transpose(batch, perm=(1, 0))
    # stack the same clip twice along axis 0
    batch = tf.concat([batch, batch], axis=0)

    out = feature_extractor(batch)
    print(out)

    print("\n\n")

    # Tokenizer mode: encode a sentence, then decode it with and without
    # collapsing consecutive repeated ids.
    tokenizer = Wav2Vec2Processor(is_tokenizer=True)
    ids = tokenizer("vasudev guptaa is a data scientist.")
    print(ids)
    print(tokenizer.decode(ids))
    print(tokenizer.decode(ids, group_tokens=False))

    # Second sample with punctuation, an apostrophe and a hyphen.
    ids = tokenizer("how is life gooing? what's up.. yayy i got results. it's awe-some")
    print(ids)
    print(tokenizer.decode(ids))
    print(tokenizer.decode(ids, group_tokens=False))

In [None]:
import os
os.getcwd()


'/content'

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip3 install -q git+https://github.com/vasudevgupta7/gsoc-wav2vec2@main
!sudo apt-get install -y libsndfile1-dev
!pip3 install -q SoundFile

In [None]:
!pip3 install -q git+https://github.com/vasudevgupta7/gsoc-wav2vec2@main

In [None]:
import json
import os
from dataclasses import asdict, dataclass, field


@dataclass
class Wav2Vec2Config:
    """Hyper-parameters of the Wav2Vec2 model, with JSON save/load helpers.

    Raises:
        ValueError: if `filter_sizes`, `kernal_sizes` and `strides` differ in
            length, or `hidden_size` is not a multiple of `num_heads`.
        AssertionError: if `feature_extractor_norm_type` or `attention_norm_type`
            holds an unsupported value.
    """

    vocab_size: int = 32
    dropout: float = 0.1
    hidden_size: int = 768
    num_heads: int = 12
    num_layers: int = 12
    intermediate_size: int = 3072
    is_gelu_approx: bool = False
    layer_norm_eps: float = 1e-5
    survival_prob: float = 1.0
    pad_id: int = 0

    # positional embedding
    num_conv_pos_embeddings: int = 128
    num_conv_pos_embedding_groups: int = 16

    # feature extractor (one entry per conv layer; the three lists must match in length)
    filter_sizes: list = field(
        default_factory=lambda: [512, 512, 512, 512, 512, 512, 512]
    )
    kernal_sizes: list = field(default_factory=lambda: [10, 3, 3, 3, 3, 2, 2])
    strides: list = field(default_factory=lambda: [5, 2, 2, 2, 2, 2, 2])
    conv_bias: bool = False

    # spec augmentation arguments
    apply_spec_augment: bool = True
    mask_time_prob: float = 0.05
    mask_time_length: int = 10

    attention_norm_type: str = "postnorm"  # "prenorm" or "postnorm"
    feature_extractor_norm_type: str = "group"  # "group" or "layer"
    is_robust: bool = False

    def __post_init__(self):
        """Validate cross-field constraints right after dataclass initialization."""
        if not (len(self.filter_sizes) == len(self.kernal_sizes) == len(self.strides)):
            raise ValueError(
                "Length of filter_sizes, kernal_sizes, strides must match."
            )
        if self.hidden_size % self.num_heads != 0:
            raise ValueError("Hidden size must be perfect multiple of num_heads.")

        # kept as asserts so callers that expect AssertionError keep working;
        # note these checks are skipped when Python runs with -O
        assert self.feature_extractor_norm_type in ["group", "layer"], "Only `group` / `layer` are supported"
        assert self.attention_norm_type in ["prenorm", "postnorm"], "Only `prenorm` / `postnorm` are supported"

    def save_pretrained(self, save_dir):
        """Write all fields as JSON to `<save_dir>/config.json`, creating `save_dir` if needed."""
        os.makedirs(save_dir, exist_ok=True)
        with open(os.path.join(save_dir, "config.json"), "w") as f:
            json.dump(asdict(self), f)

    @classmethod
    def from_json(cls, path: str):
        """Build a config from the JSON file at `path`; every key must be a field name."""
        with open(path, "r") as f:
            config_dict = json.load(f)
        return cls(**config_dict)


@dataclass
class RobustWav2Vec2Config(Wav2Vec2Config):
    """`Wav2Vec2Config` with different defaults for the fields overridden below.

    Only defaults change; field order and validation are inherited from the base.
    """

    # encoder dimensions
    hidden_size: int = 1024
    num_heads: int = 16
    num_layers: int = 24
    intermediate_size: int = 4096

    # feature extractor
    conv_bias: bool = True
    feature_extractor_norm_type: str = "layer"

    # attention / checkpoint flavour
    attention_norm_type: str = "prenorm"
    is_robust: bool = True

In [None]:
import os
!pip3 install tensorflow==2.7.0
import tensorflow as tf
import tensorflow_hub as hub
print("TF version", tf.__version__)

TF version 2.7.0


In [None]:
from zipfile import ZipFile

In [None]:
# Extract every member of `drive/MyDrive/ezyZip.zip` into `drive/MyDrive/test`.
with ZipFile('drive/MyDrive/ezyZip.zip', 'r') as zipObj:
  zipObj.extractall('drive/MyDrive/test')

In [None]:
!pip3 install -q git+https://github.com/vasudevgupta7/gsoc-wav2vec2@main
!sudo apt-get install -y libsndfile1-dev
!pip3 install -q SoundFile

[K     |████████████████████████████████| 1.7 MB 11.1 MB/s 
[K     |████████████████████████████████| 67 kB 4.6 MB/s 
[K     |████████████████████████████████| 50 kB 5.8 MB/s 
[K     |████████████████████████████████| 144 kB 50.3 MB/s 
[K     |████████████████████████████████| 181 kB 53.7 MB/s 
[K     |████████████████████████████████| 63 kB 1.7 MB/s 
[?25h  Building wheel for wav2vec2 (setup.py) ... [?25l[?25hdone
  Building wheel for python-Levenshtein (setup.py) ... [?25l[?25hdone
  Building wheel for pathtools (setup.py) ... [?25l[?25hdone
Reading package lists... Done
Building dependency tree       
Reading state information... Done
libsndfile1-dev is already the newest version (1.0.28-4ubuntu0.18.04.2).
The following package was automatically installed and is no longer required:
  libnvidia-common-470
Use 'sudo apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 39 not upgraded.


In [None]:
import os

import tensorflow as tf
import tensorflow_hub as hub
from wav2vec2 import Wav2Vec2Config

# Instantiate the config with no arguments, i.e. every field at its default.
config = Wav2Vec2Config()

# Report the TensorFlow version the kernel actually imported.
print("TF version:", tf.__version__)

Collecting tensorflow==2.7.0
  Downloading tensorflow-2.7.0-cp37-cp37m-manylinux2010_x86_64.whl (489.6 MB)
[K     |████████████████████████████████| 489.6 MB 13 kB/s 
Collecting keras<2.8,>=2.7.0rc0
  Downloading keras-2.7.0-py2.py3-none-any.whl (1.3 MB)
[K     |████████████████████████████████| 1.3 MB 43.3 MB/s 
[?25hCollecting tensorflow-estimator<2.8,~=2.7.0rc0
  Downloading tensorflow_estimator-2.7.0-py2.py3-none-any.whl (463 kB)
[K     |████████████████████████████████| 463 kB 4.6 MB/s 
Collecting gast<0.5.0,>=0.2.1
  Downloading gast-0.4.0-py3-none-any.whl (9.8 kB)
Installing collected packages: tensorflow-estimator, keras, gast, tensorflow
  Attempting uninstall: tensorflow-estimator
    Found existing installation: tensorflow-estimator 2.8.0
    Uninstalling tensorflow-estimator-2.8.0:
      Successfully uninstalled tensorflow-estimator-2.8.0
  Attempting uninstall: keras
    Found existing installation: keras 2.8.0
    Uninstalling keras-2.8.0:
      Successfully uninstall

TF version: 2.8.0


In [None]:
# Wrap the TF-Hub module at this URL as a Keras layer; `trainable=True` is passed to the layer.
pretrained_layer = hub.KerasLayer("https://tfhub.dev/vasudevgupta7/wav2vec2/1", trainable=True)

In [None]:
AUDIO_MAXLEN = 246000  # length of the model's input axis (used for `tf.keras.Input` and the random test batch)
LABEL_MAXLEN = 256  # NOTE(review): not referenced by any cell visible here — confirm it is still needed
BATCH_SIZE = 2  # batch dimension of the random test input

In [None]:
from tensorflow_hub import keras_layer

In [None]:
# Functional model: fixed-length input -> pretrained hub layer -> Dense head.
inputs = tf.keras.Input(shape=(AUDIO_MAXLEN,))
hidden_states = pretrained_layer(inputs)
# NOTE(review): the head width is read from the class attribute
# `Wav2Vec2Config.vocab_size`, not from the `config` instance — confirm this is intended.
outputs = tf.keras.layers.Dense(Wav2Vec2Config.vocab_size)(hidden_states)

model = tf.keras.Model(inputs=inputs, outputs=outputs)

In [None]:
# Run one forward pass on a random (BATCH_SIZE, AUDIO_MAXLEN) batch, then print the layer summary.
model(tf.random.uniform(shape=(BATCH_SIZE, AUDIO_MAXLEN)))
model.summary()

In [None]:
# Tensorflow.com: tokenization with `texts_to_sequences`
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [None]:
# Toy corpus used by the tokenizer cells below.
sentences = [
    'I love my dog',
    'I love my cat',
    'You love my dog!',
    'Do you think my dog is amazing?',
    'i am not that it is best way to find out it',
]

# `rus.json` is a file (listing it raised NotADirectoryError), so list the
# Drive folder that contains it instead.
data_dir = "/content/drive/MyDrive/"
all_files = os.listdir(data_dir)

json_files = [f for f in all_files if f.endswith(".json")]

print("Transcription files:", json_files)


NotADirectoryError: ignored

In [None]:
# Fit a word-level tokenizer on `sentences`; `<OOV>` is the out-of-vocabulary token.
tokenizer = Tokenizer(num_words=100, oov_token="<OOV>")
tokenizer.fit_on_texts(sentences)
word_index = tokenizer.word_index

sequences = tokenizer.texts_to_sequences(sentences)

# NOTE(review): `test_data` is not defined in any cell visible here — confirm it
# exists before this cell runs on a fresh kernel.
test_seq = tokenizer.texts_to_sequences(test_data)

# default padding settings
padded = pad_sequences(sequences)

# pad and truncate at the end, to a fixed length of 5
padded1 = pad_sequences(sequences, padding="post", truncating="post", maxlen=5)

In [None]:
# Show the vocabulary, the encoded sentences and both padded arrays, one per line.
print(word_index, sequences, padded, padded1, sep="\n")


{'<OOV>': 1, 'my': 2, 'i': 3, 'love': 4, 'dog': 5, 'you': 6, 'is': 7, 'it': 8, 'cat': 9, 'do': 10, 'think': 11, 'amazing': 12, 'am': 13, 'not': 14, 'that': 15, 'best': 16, 'way': 17, 'to': 18, 'find': 19, 'out': 20}
[[3, 4, 2, 5], [3, 4, 2, 9], [6, 4, 2, 5], [10, 6, 11, 2, 5, 7, 12], [3, 13, 14, 15, 8, 7, 16, 17, 18, 19, 20, 8]]
[[ 0  0  0  0  0  0  0  0  3  4  2  5]
 [ 0  0  0  0  0  0  0  0  3  4  2  9]
 [ 0  0  0  0  0  0  0  0  6  4  2  5]
 [ 0  0  0  0  0 10  6 11  2  5  7 12]
 [ 3 13 14 15  8  7 16 17 18 19 20  8]]
[[ 3  4  2  5  0]
 [ 3  4  2  9  0]
 [ 6  4  2  5  0]
 [10  6 11  2  5]
 [ 3 13 14 15  8]]


In [None]:
import json

# Parse the JSON file on Drive into `datastore`.
with open("./drive/MyDrive/rus.json", 'r') as f:
  datastore = json.load(f)

# NOTE(review): this resets `sentences` to an empty list, and no visible cell
# fills these lists from `datastore` — confirm the population step is not missing.
sentences =   []
labels = []
urls = []



# New Section

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
# Re-fit a tokenizer (no `num_words` limit) on the current `sentences`.
tokenizer = Tokenizer(oov_token="<OOV>")
tokenizer.fit_on_texts(sentences)
word_index = tokenizer.word_index

# Encode the sentences and pad them at the end.
sequences = tokenizer.texts_to_sequences(sentences)
padded = pad_sequences(sequences, padding='post')
# Inspect the first padded row and the overall array shape.
print(padded[0])
print(padded.shape)

In [None]:
training_sentences = sentences[0:training_size]
testing