In [1]:
!pip install -r requirements.txt

[0m





















[0m

In [2]:
import math
import os
import tarfile
import timeit

import tensorflow as tf
from minio import Minio, S3Error

In [3]:
# Config parameters.
# MinIO connection settings are read from the environment when set, so real
# credentials can be injected (e.g. from a Kubernetes secret) instead of living
# in the notebook; the fallbacks keep the original defaults working.
minio_address = os.environ.get("MINIO_ENDPOINT", "minio.ns-1.svc.cluster.local")
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "kubeflow")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "kubeflow123")
datasets_bucket = "datasets"                    # bucket holding the raw dataset, models and outputs
preprocessed_data_folder = "preprocessed-data"  # object prefix for the uploaded TFRecord files
tf_record_file_size = 500                       # max number of examples per TFRecord file
output_text_path = "preprocess.output"          # local marker file written when preprocessing finishes

In [4]:
# MinIO client shared by every download/upload cell in this notebook; it talks
# plain HTTP (secure=False) using the credentials from the config cell.
minioClient = Minio(
    minio_address,
    access_key=minio_access_key,
    secret_key=minio_secret_key,
    secure=False,
)

In [5]:
# Download the aclImdb review archive from the datasets bucket.
try:
    minioClient.fget_object(
        datasets_bucket,
        "aclImdb_v1.tar.gz",
        "/tmp/dataset.tar.gz")
except S3Error as err:
    # Fail loudly: swallowing the error would let the next cell extract a stale
    # /tmp/dataset.tar.gz left by an earlier run (or fail with a misleading
    # FileNotFoundError instead of the real cause).
    print(err)
    raise


In [6]:
# Extract the dataset archive under /tmp/<datasets_bucket>/.
extract_folder = f"/tmp/{datasets_bucket}/"

with tarfile.open("/tmp/dataset.tar.gz", "r:gz") as tar:
    # The archive is downloaded from object storage, so when the interpreter
    # supports extraction filters use the "data" filter, which rejects absolute
    # paths, ".." components and links escaping the target folder.
    if hasattr(tarfile, "data_filter"):
        tar.extractall(path=extract_folder, filter="data")
    else:
        tar.extractall(path=extract_folder)


In [7]:
# Load and structure the data.
# Each review is one text file under aclImdb/<split>/<label>/; the split
# ("train" / "test") and the label ("pos" / "neg") come from the directory path.
train = []
test = []
records_by_split = {"train": train, "test": test}

dirs_to_read = [
    "aclImdb/train/pos",
    "aclImdb/train/neg",
    "aclImdb/test/pos",
    "aclImdb/test/neg",
]

for dir_name in dirs_to_read:
    _, dataset, label = dir_name.split("/")
    dir_path = os.path.join(extract_folder, dir_name)
    # os.listdir order is arbitrary; sorting makes the record order (and hence
    # the contents of each TFRecord file) reproducible between runs.
    for filename in sorted(os.listdir(dir_path)):
        # Explicit encoding: the default is locale-dependent and reviews may
        # contain non-ASCII characters.
        with open(os.path.join(dir_path, filename), "r", encoding="utf-8") as f:
            records_by_split[dataset].append({
                "text": f.read(),
                "label": label
            })

# NOTE(review): records are appended grouped by label (all "pos", then all
# "neg"), so consecutive TFRecord files are likely single-class — make sure the
# consumer shuffles. TODO confirm.


In [8]:
# Since we encode the data using the Universal Sentence Encoder model let's download it
try:
    minioClient.fget_object(
        datasets_bucket,
        "models/universal-sentence-encoder_4.tar.gz",
        "/tmp/universal-sentence-encoder_4.tar.gz")
except S3Error as err:
    # Fail loudly: swallowing the error would let the next cell load a stale
    # model archive left in /tmp by an earlier run (or fail with a misleading
    # FileNotFoundError instead of the real cause).
    print(err)
    raise

In [9]:
# Unpack the Universal Sentence Encoder SavedModel and load it as `embed`,
# which process_examples calls with a one-element list of strings.
se_model_prefix = "universal-sentence-encoder/4"
# Use a dedicated name: reusing `extract_folder` here would overwrite the
# dataset path that the data-loading cell reads, breaking it on re-run.
se_model_folder = f"/tmp/{se_model_prefix}/"

with tarfile.open("/tmp/universal-sentence-encoder_4.tar.gz", "r:gz") as tar:
    # The archive comes from object storage; use the "data" extraction filter
    # (blocks absolute paths, ".." and escaping links) when available.
    if hasattr(tarfile, "data_filter"):
        tar.extractall(path=se_model_folder, filter="data")
    else:
        tar.extractall(path=se_model_folder)
embed = tf.saved_model.load(se_model_folder)

In [10]:


def _embedded_sentence_feature(value):
    """Wrap a tensor of embedding values as a tf.train.Feature (FloatList).

    `value` must expose `.numpy()`; the array is flattened before being stored,
    so any shape is accepted.
    """
    flat_values = value.numpy().ravel().tolist()
    float_list = tf.train.FloatList(value=flat_values)
    return tf.train.Feature(float_list=float_list)

def _label_feature(value):
    """Wrap an integer label tensor as a tf.train.Feature (Int64List).

    `value` must expose `.numpy()`; the array is flattened to a plain list of
    Python ints before being stored.
    """
    int_values = value.numpy().ravel().tolist()
    int_list = tf.train.Int64List(value=int_values)
    return tf.train.Feature(int64_list=int_list)

def encode_label(label):
    """One-hot encode a sentiment label: "pos" -> [1, 0], "neg" -> [0, 1].

    Returns:
        A `tf.constant` holding the one-hot vector.

    Raises:
        ValueError: for any other label. Returning None would only fail later,
            with an unhelpful AttributeError inside `_label_feature`.
    """
    one_hot_by_label = {"pos": [1, 0], "neg": [0, 1]}
    if label not in one_hot_by_label:
        raise ValueError(f"Unknown label {label!r}; expected 'pos' or 'neg'")
    return tf.constant(one_hot_by_label[label])

def serialize_example(label, sentence_tensor):
    """Build a tf.train.Example with a "sentence" and a "label" feature.

    Args:
        label: tensor produced by `encode_label`.
        sentence_tensor: result of `embed([...])`; only row [0] is stored.

    Returns:
        The tf.train.Example proto (not yet serialized to bytes).
    """
    sentence_feature = _embedded_sentence_feature(sentence_tensor[0])
    label_feature = _label_feature(label)
    features = tf.train.Features(
        feature={"sentence": sentence_feature, "label": label_feature}
    )
    return tf.train.Example(features=features)

def _write_and_upload(examples, file_name):
    """Write `examples` to one GZIP TFRecord file, upload it to MinIO, delete it locally.

    The object key is `<preprocessed_data_folder>/<file_name>` in `datasets_bucket`.
    The local file is removed even if the upload fails; upload errors are
    logged and re-raised so a failed file is never reported as uploaded.
    """
    with tf.io.TFRecordWriter(file_name, options="GZIP") as writer:
        for example in examples:
            writer.write(example.SerializeToString())
    try:
        minioClient.fput_object(datasets_bucket, f"{preprocessed_data_folder}/{file_name}", file_name)
    except S3Error as err:
        print(err)
        raise
    finally:
        os.remove(file_name)


def process_examples(records, prefix=""):
    """Embed, serialize and upload `records` to MinIO as GZIP TFRecord files.

    Records are embedded one at a time with `embed`, labels are one-hot encoded
    with `encode_label`, and the resulting Examples are grouped into files of
    at most `tf_record_file_size` examples named `<prefix>_file<N>.tfrecord`
    (N starts at 1). Every file, including a final partial one, uses the same
    naming scheme and GZIP compression.

    Args:
        records: sequence of dicts with "text" (str) and "label" ("pos"/"neg").
        prefix: file-name prefix, e.g. "train" or "test".

    Returns:
        List of the uploaded file names (without the bucket folder prefix).

    Raises:
        S3Error: if an upload to MinIO fails.
    """
    print(f"Process examples for prefix: {prefix}")
    starttime = timeit.default_timer()
    total_records = len(records)
    print(f"Total of {total_records} elements")
    total_batches = math.ceil(total_records / tf_record_file_size)
    print(f"Total of {total_batches} files of {tf_record_file_size} records - {timeit.default_timer() - starttime}")

    file_list = []
    batch_starts = range(0, total_records, tf_record_file_size)
    for batch_number, start in enumerate(batch_starts, start=1):
        batch = records[start:start + tf_record_file_size]
        examples = [
            serialize_example(encode_label(record["label"]), embed([record["text"]]))
            for record in batch
        ]
        file_name = f"{prefix}_file{batch_number}.tfrecord"
        _write_and_upload(examples, file_name)
        file_list.append(file_name)
        print(f"Done with batch {batch_number}/{total_batches} - {timeit.default_timer() - starttime}")

    print("total time is :", timeit.default_timer() - starttime)
    return file_list



In [11]:
# Embed, serialize and upload the training split to MinIO. This is slow: the
# logged run below took roughly 30-75 s per 500-record file.
process_examples(train, prefix="train")

Process examples for prefix: train
Total of 25000 elements
Total of 50 files of 500 records - 0.003891207277774811


Done with batch 1/50 - 69.78354430524632


Done with batch 2/50 - 118.11841360805556


Done with batch 3/50 - 162.00976834632456


Done with batch 4/50 - 235.0743087893352


Done with batch 5/50 - 279.11819615308195


Done with batch 6/50 - 330.2023416701704


Done with batch 7/50 - 361.97559532523155


Done with batch 8/50 - 409.2729815039784


Done with batch 9/50 - 456.77326090820134


Done with batch 10/50 - 508.4792569312267


Done with batch 11/50 - 568.6093635512516


Done with batch 12/50 - 600.870526635088


Done with batch 13/50 - 640.274657165166


Done with batch 14/50 - 684.7047643181868


Done with batch 15/50 - 717.8217672011815


Done with batch 16/50 - 773.6030188701116


In [None]:
# Embed, serialize and upload the test split to MinIO.
process_examples(test, prefix="test")

In [None]:
# Write the completion marker file (presumably read by the surrounding pipeline
# to detect that this step finished — TODO confirm), then report completion.
marker_text = "done!"
with open(output_text_path, mode="w") as marker_file:
    marker_file.write(marker_text)

print("Done!")