In [84]:
import pprint
import tempfile
from absl import logging

import numpy as np

import tensorflow as tf
import tensorflow_transform as tft

import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils
import apache_beam as beam

tf.compat.v1.enable_eager_execution()
from tensorflow.keras.preprocessing.sequence import skipgrams

import warnings
warnings.filterwarnings("ignore")

In [67]:
features = tf.constant([[1,2, 3], [4, 5, 6], [1, 2, 3], [6, 7, 8], [2, 3, 5], [3, 5, 7]], dtype="int64")
dataset = tf.data.Dataset.from_tensor_slices({f"s{i}": features[:, i] for i in range(features.shape[1])})
dataset_schema = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec({
        f's{i}': tf.io.FixedLenFeature([], tf.int64)\
        for i in range(features.shape[1])
    }))
dataset_schema

{'_schema': feature {
  name: "s0"
  type: INT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "s1"
  type: INT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "s2"
  type: INT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
}

In [74]:
def make_preproc_func(vocabulary_size, window_size, negative_samples):
    """Returns a preprocessing_fn to make skipgrams given the parameters."""
    def _make_skipgrams(s):
        """Numpy function to make skipgrams."""
        pairs, labels = skipgrams(
                s, vocabulary_size=vocabulary_size, window_size=window_size, negative_samples=0,
            )
        samples = np.concatenate([np.asarray(pairs), np.asarray(labels)[:, None]], axis=1)
        return samples
    
    @tf.function
    def tf_make_skipgrams(s):
        """tf nump / function wrapper."""
        y = tf.numpy_function(_make_skipgrams, [s], tf.int64)
        return y
    
    def _fn(inputs):
        """Preprocess input columns into transformed columns."""
        S = tf.stack(list(inputs.values()), axis=1) # tf tensor
        
        out = tf.map_fn(tf_make_skipgrams, S)
        out = tf.reshape(out, (-1, 3))
        
        output = {}
        output["target"] = out[:, 0]
        output["context"] = out[:, 1]
        output["label"] = out[:, 2]

        return output
    
    return _fn

preprocessing_fn = make_preproc_func(12, 2, 0.2)
output = preprocessing_fn(list(dataset.batch(5).as_numpy_iterator())[0])
output



{'target': <tf.Tensor: shape=(30,), dtype=int64, numpy=
 array([3, 2, 2, 1, 1, 3, 4, 5, 6, 4, 6, 5, 2, 1, 1, 2, 3, 3, 8, 8, 7, 6,
        7, 6, 2, 3, 5, 5, 2, 3])>,
 'context': <tf.Tensor: shape=(30,), dtype=int64, numpy=
 array([1, 1, 3, 2, 3, 2, 6, 6, 5, 5, 4, 4, 1, 2, 3, 3, 1, 2, 7, 6, 6, 8,
        8, 7, 5, 5, 2, 3, 3, 2])>,
 'label': <tf.Tensor: shape=(30,), dtype=int64, numpy=
 array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1])>}

In [None]:
def make_preproc_func(vocabulary_size, window_size, negative_samples):
    def _make_skipgrams(s):
        pairs, labels = skipgrams(
                s, vocabulary_size=vocabulary_size, window_size=window_size, negative_samples=0,
            )
        samples = np.concatenate([np.asarray(pairs), np.asarray(labels)[:, None]], axis=1)
        
        return samples
    
    def _fn(inputs):
        """Preprocess input columns into transformed columns."""
        S = np.stack(list(inputs.values()), axis=1) # tf tensor
        print(inputs)
        
        out = np.apply_along_axis(_make_skipgrams, axis=1, arr=S).reshape((-1, 3))
        
        output = {}
        output["target"] = out[:, 0]
        output["context"] = out[:, 1]
        output["label"] = out[:, 2]

        return output
    
    return _fn

preprocessing_fn = make_preproc_func(12, 2, 0.2)
output = preprocessing_fn(list(dataset.batch(5).as_numpy_iterator())[0])
output


In [75]:
# Run the beam pipeline
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset, transform_fn = (  # pylint: disable=unused-variable
        (dataset.as_numpy_iterator(), dataset_schema) | tft_beam.AnalyzeAndTransformDataset(
            preprocessing_fn))

transformed_data, transformed_metadata = transformed_dataset  # pylint: disable=unused-variable

print('\nRaw data:\n{}\n'.format(pprint.pformat(dataset)))
print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))













INFO:tensorflow:Assets added to graph.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:No assets to write.


INFO:tensorflow:No assets to write.


INFO:tensorflow:SavedModel written to: /tmp/tmpzlg47iob/tftransform_tmp/4664b7dad2bf4bfd85e19aaf27b3e058/saved_model.pb


INFO:tensorflow:SavedModel written to: /tmp/tmpzlg47iob/tftransform_tmp/4664b7dad2bf4bfd85e19aaf27b3e058/saved_model.pb










INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore



Raw data:
<TensorSliceDataset shapes: {s0: (), s1: (), s2: ()}, types: {s0: tf.int64, s1: tf.int64, s2: tf.int64}>

Transformed data:
[{'context': 1, 'label': 1, 'target': 3},
 {'context': 3, 'label': 1, 'target': 2},
 {'context': 2, 'label': 1, 'target': 3},
 {'context': 1, 'label': 1, 'target': 2},
 {'context': 2, 'label': 1, 'target': 1},
 {'context': 3, 'label': 1, 'target': 1},
 {'context': 5, 'label': 1, 'target': 6},
 {'context': 4, 'label': 1, 'target': 5},
 {'context': 5, 'label': 1, 'target': 4},
 {'context': 6, 'label': 1, 'target': 5},
 {'context': 4, 'label': 1, 'target': 6},
 {'context': 6, 'label': 1, 'target': 4},
 {'context': 2, 'label': 1, 'target': 1},
 {'context': 3, 'label': 1, 'target': 2},
 {'context': 1, 'label': 1, 'target': 2},
 {'context': 1, 'label': 1, 'target': 3},
 {'context': 3, 'label': 1, 'target': 1},
 {'context': 2, 'label': 1, 'target': 3},
 {'context': 6, 'label': 1, 'target': 8},
 {'context': 8, 'label': 1, 'target': 7},
 {'context': 8, 'label': 

In [198]:
# Putting everything together
def make_preproc_func(vocabulary_size, window_size, negative_samples):
    """Returns a preprocessing_fn to make skipgrams given the parameters."""
    def _make_skipgrams(s):
        """Numpy function to make skipgrams."""
        pairs, labels = skipgrams(
                s, vocabulary_size=100, window_size=window_size, 
                negative_samples=negative_samples, seed=42,
            )
        samples = np.concatenate([np.asarray(pairs), np.asarray(labels)[:, None]], axis=1)
        return samples
    
    @tf.function
    def _tf_make_skipgrams(s):
        """tf nump / function wrapper."""
        y = tf.numpy_function(_make_skipgrams, [s], tf.int64)
        y.set_shape([None, 3])
        return y
    
    def _fn(inputs):
        """Preprocess input columns into transformed columns."""
        S = tf.stack(list(inputs.values()), axis=1) # tf tensor
        
        if False: # taking care of variable size tensors
            out = tf.map_fn(_tf_make_skipgrams, S, 
                            fn_output_signature=tf.RaggedTensorSpec(shape=[None, 3], ragged_rank=0, 
                                                                    dtype=tf.int64))
            
            out = out.to_tensor(default_value=-1)
            out = tf.reshape(out, (-1, 3))
            index = tf.reduce_all(tf.greater(out, -1), axis=1)
            out = tf.boolean_mask(out, index, axis=0)
        else:
            out = tf.map_fn(_tf_make_skipgrams, S)
            out = tf.reshape(out, (-1, 3))
        
        output = {}
        output["target"] = out[:, 0]
        output["context"] = out[:, 1]
        output["label"] = out[:, 2]

        return output
    
    return _fn


def generate_skipgrams(features, vocabulary_size=10, window_size=2, negative_samples=0., feature_names=None, save_path="temp"):
    if feature_names is None:
        feature_names = [f"f{i}" for i in range(features.shape[1])]
    assert len(feature_names) == features.shape[1]
    
    # Convert to list of dict dataset
    dataset = tf.data.Dataset.from_tensor_slices({f"s{i}": features[:, i] for i in range(features.shape[1])})
    dataset_schema = dataset_metadata.DatasetMetadata(
        schema_utils.schema_from_feature_spec({
            f's{i}': tf.io.FixedLenFeature([], tf.int64)\
            for i in range(features.shape[1])
        }))
    
    # Make the preprocessing_fn
    preprocessing_fn = make_preproc_func(vocabulary_size, window_size, negative_samples)
    
    # Run the beam pipeline
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
        transformed_dataset, transform_fn = (  # pylint: disable=unused-variable
            (dataset.as_numpy_iterator(), dataset_schema) 
            | "Make Skipgrams " >> tft_beam.AnalyzeAndTransformDataset(preprocessing_fn)
        )

    # pylint: disable=unused-variable
    transformed_data, transformed_metadata = transformed_dataset  
    saved_results = (transformed_data
        | "Write to TFRecord" >> beam.io.tfrecordio.WriteToTFRecord(
            file_path_prefix=save_path, file_name_suffix=".tfrecords",
            coder=tft.coders.example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
        )
    print('\nRaw data:\n{}\n'.format(pprint.pformat(dataset)))
    print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))
    # Return the list of paths of tfrecords
    num_rows_saved = len(transformed_data)
    
    return saved_results, num_rows_saved
    

features = tf.constant([[1,2, 3], [4, 5, 6], [1, 2, 3], [6, 7, 8], [2, 3, 5], [3, 5, 7], [1, 1, 1]], dtype="int64")

saved_results, n = generate_skipgrams(features)
    














(None, 3)
INFO:tensorflow:Assets added to graph.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:No assets to write.


INFO:tensorflow:No assets to write.


INFO:tensorflow:SavedModel written to: /tmp/tmp3noze5na/tftransform_tmp/3765f6302a544f5289b5554e75fb8cf5/saved_model.pb


INFO:tensorflow:SavedModel written to: /tmp/tmp3noze5na/tftransform_tmp/3765f6302a544f5289b5554e75fb8cf5/saved_model.pb










INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore



Raw data:
<TensorSliceDataset shapes: {s0: (), s1: (), s2: ()}, types: {s0: tf.int64, s1: tf.int64, s2: tf.int64}>

Transformed data:
[{'context': 3, 'label': 1, 'target': 2},
 {'context': 3, 'label': 1, 'target': 1},
 {'context': 1, 'label': 1, 'target': 2},
 {'context': 1, 'label': 1, 'target': 3},
 {'context': 2, 'label': 1, 'target': 1},
 {'context': 2, 'label': 1, 'target': 3},
 {'context': 6, 'label': 1, 'target': 5},
 {'context': 6, 'label': 1, 'target': 4},
 {'context': 4, 'label': 1, 'target': 5},
 {'context': 4, 'label': 1, 'target': 6},
 {'context': 5, 'label': 1, 'target': 4},
 {'context': 5, 'label': 1, 'target': 6},
 {'context': 3, 'label': 1, 'target': 2},
 {'context': 3, 'label': 1, 'target': 1},
 {'context': 1, 'label': 1, 'target': 2},
 {'context': 1, 'label': 1, 'target': 3},
 {'context': 2, 'label': 1, 'target': 1},
 {'context': 2, 'label': 1, 'target': 3},
 {'context': 8, 'label': 1, 'target': 7},
 {'context': 8, 'label': 1, 'target': 6},
 {'context': 6, 'label': 

In [199]:
n

42

36

In [120]:
# Read the skipgrams
# Read

def tfrecord2dataset(file_pattern, feature_spec, label_key, batch_size=5, 
                       num_epochs=2):
    """Returns:
        A dataset that contains (features, indices) tuple where features is a
        dictionary of Tensors, and indices is a single Tensor of label indices.
    """
    dataset = tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      num_epochs=num_epochs,
      features=feature_spec,
      label_key=label_key)
    #dataset = tf.data.TFRecord()
    return dataset


def skipgram_loader_map_fn(x, y):
    return (x["target"], x["context"]), y


feature_spec = {
    "target":    tf.io.FixedLenFeature([], dtype=tf.int64),
    "context":    tf.io.FixedLenFeature([], dtype=tf.int64),
    "label":    tf.io.FixedLenFeature([], dtype=tf.int64),
}    
loaded_dataset = tfrecord2dataset(saved_results, 
                                    feature_spec, 
                                    label_key="label",
                                    batch_size=7
                                    ).map(skipgram_loader_map_fn)
for i in loaded_dataset:
    print(i)

((<tf.Tensor: shape=(7,), dtype=int64, numpy=array([2, 2, 6, 4, 2, 3, 8])>, <tf.Tensor: shape=(7,), dtype=int64, numpy=array([5, 3, 8, 5, 3, 1, 7])>), <tf.Tensor: shape=(7,), dtype=int64, numpy=array([1, 1, 1, 1, 1, 1, 1])>)
((<tf.Tensor: shape=(7,), dtype=int64, numpy=array([5, 7, 5, 5, 1, 1, 6])>, <tf.Tensor: shape=(7,), dtype=int64, numpy=array([6, 8, 3, 3, 3, 3, 4])>), <tf.Tensor: shape=(7,), dtype=int64, numpy=array([1, 1, 1, 1, 1, 1, 1])>)
((<tf.Tensor: shape=(7,), dtype=int64, numpy=array([3, 2, 3, 7, 2, 6, 5])>, <tf.Tensor: shape=(7,), dtype=int64, numpy=array([5, 3, 5, 6, 1, 5, 2])>), <tf.Tensor: shape=(7,), dtype=int64, numpy=array([1, 1, 1, 1, 1, 1, 1])>)
((<tf.Tensor: shape=(7,), dtype=int64, numpy=array([5, 3, 1, 3, 4, 3, 5])>, <tf.Tensor: shape=(7,), dtype=int64, numpy=array([4, 2, 2, 7, 6, 1, 7])>), <tf.Tensor: shape=(7,), dtype=int64, numpy=array([1, 1, 1, 1, 1, 1, 1])>)
((<tf.Tensor: shape=(7,), dtype=int64, numpy=array([3, 3, 7, 2, 8, 6, 1])>, <tf.Tensor: shape=(7,), 