# Train Large Vectorized Model

In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
import shutil
import time
import tarfile
import json
import math
from fastprogress.fastprogress import progress_bar
import pathlib
from pathlib import Path
from tensorflow import keras
from tensorflow.keras.layers import TextVectorization
from sklearn.utils import shuffle
from sklearn.metrics import mean_absolute_error, mean_squared_error

## Connect to Colab GPU

Note: Can not use TF Records with TPU

In [2]:
# Check if Google Colab Instance for Setup
print("Tensorflow version " + tf.__version__)

# Get correct path if on Google Colab
try:
	from google.colab import drive
	drive.mount("/content/drive")

	# Get RAM Info
	from psutil import virtual_memory
	ram_gb = virtual_memory().total / 1e9
	print('Your runtime has {:.1f} gigabytes of available RAM'.format(ram_gb))

	if ram_gb < 20:
		print('Not using a high-RAM runtime')
	else:
		print('You are using a high-RAM runtime!')

except ModuleNotFoundError:
	print("Not connected to Google Colab")

AUTO = tf.data.AUTOTUNE

Tensorflow version 2.7.0
Not connected to Google Colab


## Create TF Records from Review JSON

In [49]:
# https://keras.io/examples/keras_recipes/creating_tfrecords/

def bytes_feature(value):
	"""Returns a bytes_list from a string / byte."""
	return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value.encode()]))

def int64_feature(value):
	"""Returns an int64_list from a bool / enum / int / uint."""
	return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def create_example(text, stars):
	feature = {
		"text": bytes_feature(text),
		"stars": int64_feature(stars)
	}
	return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()

def parse_tfrecord_fn(example):
	feature_description = {
		"text": tf.io.FixedLenFeature([], tf.string),
		"stars": tf.io.FixedLenFeature([], tf.int64),
	}
	example = tf.io.parse_single_example(example, feature_description)
	return example


In [48]:
class ShardedWriter():
	def __init__(self, parent_dir, num_example_per_record, num_classes, class_labels=None):
		self.parent_dir = pathlib.Path(parent_dir)
		self.num_example_per_record = num_example_per_record
		self.num_classes = num_classes
		if class_labels is None:
			self.class_labels = list(range(self.num_classes))
		else:
			assert num_classes == len(class_labels), "num_classes does not match the number of class_labels"
			self.class_labels = class_labels
		self.record_counter = [1 for _ in range(self.num_classes)]
		self.element_counter = [0 for _ in range(self.num_classes)]
		self._init_paths()
		self.writers = self._init_writers()

	def _init_paths(self):
		# Make sure all the directories exist before making writers. 
		for label in self.class_labels:
			path_name = self.parent_dir / str(label)
			path_name.mkdir(parents=True, exist_ok=True)

	def _make_writer(self, path_name):
		# Create a TFRecord writer.
		path_name = str(path_name)
		return tf.io.TFRecordWriter(path_name)

	def _init_writers(self):
		# Initialize a new set of writers. Should only be called during class init. 
		writers = []
		for i in range(self.num_classes):
			record_name = f"01.tfrecord"
			path_name = self.parent_dir / str(self.class_labels[i]) / record_name
			writer = self._make_writer(path_name)
			writers.append(writer)
		return writers
	
	def _get_new_writer(self, index):
		# Close the current writer and create a new one, incrementing the file name
		# Close previous writer
		self.writers[index].close()

		# Increment file name counter
		self.record_counter[index] += 1
		record_name = f"{self.record_counter[index]:02}.tfrecord"
		path_name = self.parent_dir / str(self.class_labels[index]) / record_name

		# Make writer
		self.writers[index] = self._make_writer(path_name)
	
	def write_example(self, example, label):
		# Know which class the example belongs to based on label
		if isinstance(label, int):
			class_index = label
		else:
			class_index = self.class_labels.index(label)

		# Check element_counter
		if self.element_counter[class_index] >= self.num_example_per_record:
			self._get_new_writer(class_index)
			self.element_counter[class_index] = 0
		else:
			self.element_counter[class_index] += 1

		self.writers[class_index].write(example)

	def get_record_paths(self):
		# Get a list of all the record paths for each class. 
		record_paths = []
		for i, label in enumerate(self.class_labels):
			label_paths = []
			for j in range(1, self.record_counter[j] + 1):
				record_name = f"{j:02}.tfrecord"
				path_name = self.parent_dir / str(self.class_labels[i]) / record_name
				label_paths.append(path_name)
			record_paths.append(label_paths)
		return record_paths

	def get_counts(self):
		# Get total counts of each class encountered and saved. 
		counts = [(self.record_counter[i] - 1) * self.num_example_per_record + self.element_counter[i] for i in range(self.num_classes)]
		return counts

	def close_all_writers(self):
		for writer in self.writers:
			writer.close()

In [17]:
read_path = "yelp_dataset/yelp_academic_dataset_review.json"
write_path = "yelp_dataset/all_reviews.zip"
colab_path = "drive/MyDrive/Colab Notebooks/yelp_dataset/all_reviews.zip"

if Path("yelp_dataset/all_reviews").exists():
	# Data should already be in place
	pass
elif Path(colab_path).exists():
	Path("yelp_dataset").mkdir(exist_ok=True)
	shutil.copy(colab_path, "yelp_dataset/")
	Path("yelp_dataset/all_reviews").mkdir(exist_ok=True)
	shutil.unpack_archive(write_path, "yelp_dataset/all_reviews", "zip")
else:
	start_time = time.perf_counter()
	# Unzip yelp reviews and make all reviews
	if Path("yelp_dataset").exists():
		pass
	else:
		# Extract tar file
		Path("yelp_dataset").mkdir(exist_ok=True)
		with tarfile.open("drive/MyDrive/Colab Notebooks/yelp_dataset.tgz") as tar:
			tar.extractall("yelp_dataset/")

	# Number of Reviews is known
	NUM_REVIEWS = 8_635_403

	# Make a writer for each star category
	parent_dir = Path("yelp_dataset/all_reviews")
	parent_dir.mkdir(exist_ok=True)
	# writers = [tf.io.TFRecordWriter(f"yelp_dataset/all_reviews/all_{i}_star_reviews.tfrecord") for i in range(1, 6)]
	class_labels = ["1_star", "2_star", "3_star", "4_star", "5_star"]
	sharded_writer = ShardedWriter(parent_dir, num_example_per_record=2**16, num_classes=5, class_labels=class_labels)

	# Read line by line as json, extract just the "text" and "stars", then write line by line to new json
	with open(read_path, "r") as yelp_review:
		for i in progress_bar(range(NUM_REVIEWS)):
			# Get next line of JSON
			review = next(yelp_review)
			# Parse JSON
			line = json.loads(review)
			# Create tf.train.Example
			example = create_example(line["text"], int(line["stars"]))
			# Write TF Record to correct record
			sharded_writer.write_example(example, label=int(line["stars"])-1)

	# Close all TF Record writers
	sharded_writer.close_all_writers()

	# Get counts
	counts = sharded_writer.get_counts()
	for i, count in enumerate(counts):
		print(f"Saved {count} elements of rating {i + 1}.")

	# Zip all review files
	shutil.make_archive("yelp_dataset/all_reviews", "zip", "yelp_dataset/all_reviews")

	Path("drive/MyDrive/Colab Notebooks/yelp_dataset").mkdir(parents=True, exist_ok=True)
	shutil.copy(write_path, "drive/MyDrive/Colab Notebooks/yelp_dataset")
	print("All reviews saved to disk.")

	print(f"Reviews processed in {round(time.perf_counter() - start_time)} seconds.")

Saved 1262781 elements of rating 1.
Saved 711368 elements of rating 2.
Saved 926642 elements of rating 3.
Saved 1920008 elements of rating 4.
Saved 3814474 elements of rating 5.
All reviews saved to disk.
Reviews processed in 1118 seconds.


## Load All TF Records into balanced dataset

In [24]:
# Get pathnames
record_paths = {}
parent_dir = Path("yelp_dataset/all_reviews")
for class_dir in parent_dir.iterdir():
	if class_dir.is_dir():
		record_paths[class_dir.parts[-1]] = []
		for file_name in class_dir.iterdir():
			if file_name.suffix == ".tfrecord":
				record_paths[file_name.parts[-2]].append(file_name)

In [41]:
# Select Train Val Test
# Test and Val have 20% each
test_paths = {}
val_paths = {}
train_paths = {}
for key, value in record_paths.items():
	value.sort(key=lambda path: int(path.stem))
	test_paths[key] = value[4::5]
	val_paths[key] = value[3::5]
	train_paths[key] = value[2::5] + value[1::5] + value[0::5]

In [57]:
# Methods to construct datasets
def prepare_sample(example):
	# Map to star index
	return example["text"], int(example["stars"] - 1)

def construct_dataset(path_dict):
	# Construct Datasets
	raw_datasets = [tf.data.TFRecordDataset(path_dict[f"{i}_star"], num_parallel_reads=AUTO) for i in range(1, 6)]

	# Create a balanced dataset by evenly selecting from each dataset
	reviews_ds = tf.data.Dataset.sample_from_datasets(
		datasets=raw_datasets, 
		weights=[0.2, 0.2, 0.2, 0.2, 0.2], 
		seed=0, 
		stop_on_empty_dataset=True
	)

	# Parse all records
	reviews_ds = reviews_ds.map(parse_tfrecord_fn, num_parallel_calls=AUTO)
	# Convert ratings to zero index
	reviews_ds = reviews_ds.map(prepare_sample, num_parallel_calls=AUTO)

	return reviews_ds

In [59]:
# Create datasets
train_ds = construct_dataset(train_paths)
val_ds = construct_dataset(val_paths)
test_ds = construct_dataset(test_paths)

In [None]:
num_reviews = [1262801, 711379, 926657, 1920038, 3814533]

## Sequence Embedded Categorical

In [None]:
start_time = time.perf_counter()
# Create TextVectorization
max_tokens = 30000
max_length = 500
batch_size = 2 ** 12
text_vectorization = TextVectorization(max_tokens=max_tokens, output_mode="int", output_sequence_length=max_length)

# Train Vectorizer on train text
text_vectorization.adapt(train_ds.map(lambda x, y: x, num_parallel_calls=AUTO).batch(batch_size))

print(f"Created text vectorization in {round(time.perf_counter() - start_time)} seconds. ")

In [None]:
# Vectorize Datasets
train_dataset_vectorized = train_ds.batch(batch_size).map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=AUTO)
val_dataset_vectorized = val_ds.batch(batch_size).map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=AUTO)
test_dataset_vectorized = test_ds.batch(batch_size).map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=AUTO)

# Repeat train dataset, and prefetch all datasets
train_dataset_vectorized = train_dataset_vectorized.prefetch(AUTO)
val_dataset_vectorized = val_dataset_vectorized.prefetch(AUTO)
test_dataset_vectorized = test_dataset_vectorized.prefetch(AUTO)

## Train Model

In [None]:
# Build Model
def create_embedding_model_categorical(max_tokens, model_name):
	inputs = keras.Input(shape=(None,), dtype="int64")
	embedded = keras.layers.Embedding(input_dim=max_tokens, output_dim=512, mask_zero=True)(inputs)
	x = keras.layers.Bidirectional(keras.layers.LSTM(64, return_sequences=True))(embedded)
	x = keras.layers.Dropout(0.25)(x)
	x = keras.layers.Bidirectional(keras.layers.LSTM(32))(x)
	x = keras.layers.Dropout(0.25)(x)
	outputs = keras.layers.Dense(5, activation="softmax")(x)

	model = keras.Model(inputs, outputs, name=model_name)

	model.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["sparse_categorical_accuracy"])

	return model

In [None]:
model_name = "large_sequence_embedded_model"

model = create_embedding_model_categorical(max_tokens, model_name)

# Create callback to save model with a given name
model_path = f"models/{model_name}.keras"
callbacks = [
	keras.callbacks.ModelCheckpoint(model_path, monitor='val_loss', save_best_only=True),
	keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0.01, patience=5, verbose=1, restore_best_weights=False)
]

start_time = time.perf_counter()

# Train Model
model.fit(train_dataset_vectorized.cache(), 
	validation_data=val_dataset_vectorized.cache(), 
	# steps_per_epoch=num_train_epochs, 
	epochs=1, 
	callbacks=callbacks
)

# Evaluate Model after training
model = keras.models.load_model(model_path)
predictions = model.predict(test_dataset_vectorized)
predictions = np.argmax(predictions, axis = -1)
true_labels = np.concatenate([y for _, y in test_dataset_vectorized], axis=0)
mae = mean_absolute_error(true_labels, predictions)
mse = mean_squared_error(true_labels, predictions)

mins, secs = divmod(time.perf_counter() - start_time, 60)
# Output Model Metrics
print(f"Trained model in {mins} minutes, {round(secs)} seconds")
metrics_text = f"Model {model_name} with MAE {mae:.3f} and MSE {mse:.3f}\n"
print(metrics_text)
with open("model_metrics.txt", "a") as f:
	f.write(metrics_text)

In [None]:
# Export model with Text Vectorization layer
inputs = keras.Inputs(shape=(1,), dtype="string")
vectorized_inputs = text_vectorization(inputs)
outputs = model(vectorized_inputs)

inference_model = keras.Model(inputs, outputs)

keras.models.save_model(inference_model, "models/full_text_model.keras")

In [None]:
# Zip Models
!zip -r "models.zip" "models"

try: 
	from google.colab import files
	files.download("models.zip")
	files.download("model_metrics.txt")
except:
	pass

In [None]:
# Test model
review_model = keras.models.load_model("models/full_text_model.keras")

review_text = [
	["I think my meal was decent, but I have had better. I would recommend other places in the area."],
	["My meal was excellent, and I had a really great time dining at this restaurant tonight. I will be back!"], 
	["Horrible experience. The food was awful and I wish to never return to this restaurant again."]
]

raw_text_data = tf.convert_to_tensor(review_text)

predictions = review_model(raw_text_data)
predictions = np.argmax(predictions, axis = -1) + 1
for text, star in zip(review_text, predictions):
	print(f"{star}: {review_text}")