In [20]:
import numpy as np
import tensorflow as tf
import tensorflow_recommenders as tfrs
from typing import Dict, Text
import os
import pprint
import tempfile
from tensorflow.keras import Sequential, regularizers
from tensorflow.keras.layers import Embedding, Dense, GlobalAveragePooling1D, Flatten, Reshape, Concatenate,BatchNormalization
from tensorflow.keras.layers.experimental.preprocessing import StringLookup, Normalization, TextVectorization

from tensorflow.keras.constraints import Constraint
import tensorflow_hub as hub

In [21]:
genders=['Non-binary', 'Female', 'Prefer not to say', 'Male']
sexuality=['Lesbian', 'Straight', 'Queer', 'Bisexual', 'Asexual', 'Pansexual','Gay', 'Demisexual', 'Heterosexual', 'Homosexual']

class WeightClip(Constraint):
    def __init__(self, clip_value):
        self.clip_value = clip_value

    def __call__(self, w):
        return tf.clip_by_value(w, -self.clip_value, self.clip_value)

    def get_config(self):
        return {'clip_value': self.clip_value}

# Preprocessing layers
gender_lookup = StringLookup()
gender_lookup.adapt(genders)
sexuality_lookup = StringLookup()
sexuality_lookup.adapt(sexuality)
age_normalizer = Normalization()
score_normalizer = Normalization()
# TextVectorization layer for stim names
max_tokens = 2000  # Maximum number of unique tokens, adjust this value according to your dataset
output_sequence_length = 16 # Adjust this value based on the expected number of tokens per stim_name
embedding_dim = 32  # Dimension of the dense vectors
# Create a Normalization layer for harmfulness_score
harmfulness_score_normalizer = Normalization()

regul=5e-5   #regularization
def create_user_input_layers():
    age_input = tf.keras.Input(shape=(1,), name="age", dtype=tf.int32)
    gender_input = tf.keras.Input(shape=(1,), name="gender", dtype=tf.string)
    sexuality_input = tf.keras.Input(shape=(1,), name="sexuality", dtype=tf.string)
    neurodivergent_conditions_input = tf.keras.Input(shape=(1,512) , name="nd_conditions_embedding", dtype=tf.float32)
    hobbies_embedding_input = tf.keras.Input(shape=(1,512) , name="hobbies_embedding", dtype=tf.float32)
    stimming_essentiality_score_input = tf.keras.Input(shape=(1,), name="stimming_essentiality_score", dtype=tf.int32)
    current_stims_input = tf.keras.Input(shape=(None, 512), name="current_stims", dtype=tf.float32)
    return {
        "age": age_input,
        "gender": gender_input,
        "sexuality": sexuality_input,
        "nd_conditions_embedding": neurodivergent_conditions_input,
        "hobbies_embedding": hobbies_embedding_input,
        "stimming_essentiality_score": stimming_essentiality_score_input,
        "current_stims": current_stims_input,
    }
def create_stim_input_layers():
    name_input = tf.keras.Input(shape=(1,512), name="name_embedding", dtype=tf.float32)
    description_input = tf.keras.Input(shape=(1,512), name="description_embedding", dtype=tf.float32)
    harmfulness_score_input = tf.keras.Input(shape=(1,), name="harmfulness_score", dtype=tf.int32)
    associated_conditions_hobbies_input= tf.keras.Input(shape=(2, 512), dtype=tf.float32, name="associated_conditions_hobbies")
    return {"name_embedding": name_input, "description_embedding": description_input, "harmfulness_score": harmfulness_score_input, 
            'associated_conditions_hobbies':associated_conditions_hobbies_input}

def create_user_model(user_input_layers):
    clip_value = 0.2
    clip_constraint = WeightClip(clip_value)
    # Preprocessing layers
    age_normalized = tf.expand_dims(age_normalizer(user_input_layers["age"]), -1)
    gender_embedded = gender_lookup(user_input_layers["gender"])
    sexuality_embedded = sexuality_lookup(user_input_layers["sexuality"])
    neurodivergent_conditions_embedded = user_input_layers["nd_conditions_embedding"]
    hobbies_embedded = user_input_layers["hobbies_embedding"]
    stimming_essentiality_score_normalized = tf.expand_dims(score_normalizer(user_input_layers["stimming_essentiality_score"]),-1)
    current_stims_embeddings = user_input_layers["current_stims"]

    # Embedding layers
    embedding_dim = 8
    gender_embedding = Embedding(input_dim=len(gender_lookup.get_vocabulary()), output_dim=embedding_dim)(gender_embedded)
    sexuality_embedding = Embedding(input_dim=len(sexuality_lookup.get_vocabulary()), output_dim=embedding_dim)(sexuality_embedded)
    #neurodivergent_conditions_dense = Embedding(input_dim=len(nd_condition_lookup.get_vocabulary()), output_dim=embedding_dim)(neurodivergent_conditions_embedded)
    
    
    # Dense layers
    age_dense = Dense(32, kernel_initializer='random_normal',activation='swish', kernel_constraint=clip_constraint,kernel_regularizer=tf.keras.regularizers.l1(regul))(tf.reshape(age_normalized, (-1, 1)))
    stimming_essentiality_dense = Dense(128, kernel_initializer='random_normal',kernel_regularizer=tf.keras.regularizers.l1(regul),activation='swish', kernel_constraint=clip_constraint)(tf.reshape(stimming_essentiality_score_normalized, (-1, 1)))

    gender_dense = Dense(32, kernel_initializer='random_normal',activation=tf.keras.activations.selu,kernel_regularizer=tf.keras.regularizers.l1(regul), kernel_constraint=clip_constraint)(Flatten()(gender_embedding))
    gender_dense = tf.debugging.check_numerics(gender_dense, message="NaN or Inf found in gender_dense")
    
    sexuality_dense = Dense(128, kernel_initializer='random_normal',activation='swish',kernel_regularizer=tf.keras.regularizers.l1(regul), kernel_constraint=clip_constraint)(Flatten()(sexuality_embedded))
    sexuality_dense = tf.debugging.check_numerics(sexuality_dense, message="NaN or Inf found in sexuality_dense")
    
    #neurodivergent_conditions_pooled = GlobalAveragePooling1D()(neurodivergent_conditions_dense)
    nd_conditions_normalizer=BatchNormalization()
    nd_condition_dense = nd_conditions_normalizer(Dense(2048, kernel_initializer='random_normal',kernel_regularizer=tf.keras.regularizers.l1(regul),activation='swish', kernel_constraint=clip_constraint)(Flatten()(neurodivergent_conditions_embedded)))
    nd_condition_dense = tf.debugging.check_numerics(nd_condition_dense, message="NaN or Inf found in nd_condition_dense")
    
    hobbies_normalizer=BatchNormalization()
    hobbies_dense = hobbies_normalizer(Dense(2048, kernel_initializer='random_normal',kernel_regularizer=tf.keras.regularizers.l1(regul),activation='swish', kernel_constraint=clip_constraint)(Flatten()(hobbies_embedded)))
    hobbies_dense = tf.debugging.check_numerics(hobbies_dense, message="NaN or Inf found in hobbies_dense")
    
    avg_pooled_stims = tf.reduce_mean(current_stims_embeddings, axis=1)
    stim_normalizer=BatchNormalization()
    current_stims_embeddings_dense = stim_normalizer(Dense(1024, kernel_initializer='random_normal',kernel_regularizer=tf.keras.regularizers.l1(regul),activation='swish', kernel_constraint=clip_constraint)(avg_pooled_stims))
    current_stims_embeddings_dense = tf.debugging.check_numerics(current_stims_embeddings_dense, message="NaN or Inf found in current_stims_embeddings_dense")

    flattened_stims_dense=Flatten()(current_stims_embeddings_dense)

    # Concatenate dense layers
    concatenated = Concatenate(axis=-1)([
        age_dense,
        gender_dense,
        sexuality_dense,
        nd_condition_dense,
        hobbies_dense,
        stimming_essentiality_dense,
        current_stims_embeddings_dense
    ])

    flat_embeddings = Flatten()(concatenated)
    # User Dense layers
    layer1_norm=BatchNormalization()
    dense_1 = layer1_norm(Dense(8192, kernel_initializer='random_normal',activation='swish', kernel_constraint=clip_constraint,kernel_regularizer=tf.keras.regularizers.l1(regul))(flat_embeddings))
    layer2_norm=BatchNormalization()
    dense_2 = layer2_norm(Dense(4096, kernel_initializer='random_normal',activation='swish', kernel_constraint=clip_constraint,kernel_regularizer=tf.keras.regularizers.l1(regul))(dense_1))
    layer3_norm=BatchNormalization()
    dense_3 = layer3_norm(Dense(2048, kernel_initializer='random_normal',activation='swish',kernel_regularizer=tf.keras.regularizers.l1(regul), kernel_constraint=clip_constraint)(dense_2))
    layer4_norm=BatchNormalization()
    dense_4 = layer4_norm(Dense(1024, kernel_initializer='random_normal',activation='swish',kernel_regularizer=tf.keras.regularizers.l1(regul), kernel_constraint=clip_constraint)(dense_3))
    model = tf.keras.Model(inputs=user_input_layers, outputs=dense_4, name="user_model")
    return model

def create_stim_model(stim_input_layers):
    clip_value = 0.2
    clip_constraint = WeightClip(clip_value)
    embedded_name = stim_input_layers["name_embedding"]
    description_embedded = stim_input_layers["description_embedding"]

    harmfulness_score_normalized = tf.expand_dims(harmfulness_score_normalizer(stim_input_layers["harmfulness_score"]), -1)
    associated_conditions_hobbies_embedded= stim_input_layers["associated_conditions_hobbies"]

    name_dense=Dense(1024, kernel_initializer='random_normal',activation='swish', kernel_regularizer=tf.keras.regularizers.l1(regul), kernel_constraint=clip_constraint)(Flatten()(embedded_name))
    description_dense=Dense(1024, kernel_initializer='random_normal',activation='swish', kernel_regularizer=tf.keras.regularizers.l1(regul), kernel_constraint=clip_constraint)(Flatten()(description_embedded))
    harmfulness_score_dense=Dense(16, kernel_initializer='random_normal',activation=tf.keras.activations.selu, kernel_regularizer=tf.keras.regularizers.l1(regul), kernel_constraint=clip_constraint)(tf.reshape(harmfulness_score_normalized,(-1,1)))
    avg_pooled_hobbies = tf.reduce_mean(associated_conditions_hobbies_embedded, axis=1)
    hobby_norm=BatchNormalization()
    associated_conditions_hobbies_embedded_dense = hobby_norm(Dense(2048, kernel_initializer='random_normal', kernel_regularizer=tf.keras.regularizers.l1(regul),activation='swish', kernel_constraint=clip_constraint)(avg_pooled_hobbies))
    flattened_hobbies_dense = Flatten()(associated_conditions_hobbies_embedded_dense)

    concatenated = Concatenate(axis=-1)([
        name_dense,
        description_dense,
        harmfulness_score_dense,
        flattened_hobbies_dense
    ])
    flat_embeddings = Flatten()(concatenated)
    # Flatten and Dense layers
    layer1_norm=BatchNormalization()
    dense_1 = layer1_norm(Dense(8192, kernel_initializer='random_normal',activation='swish',kernel_regularizer=tf.keras.regularizers.l1(regul), kernel_constraint=clip_constraint)(flat_embeddings))
    layer2_norm=BatchNormalization()
    dense_2 = layer2_norm(Dense(4096, kernel_initializer='random_normal', activation='selu',kernel_regularizer=tf.keras.regularizers.l1(regul), kernel_constraint=clip_constraint)(dense_1))
    layer3_norm=BatchNormalization()
    dense_3 = layer3_norm(Dense(2048, kernel_initializer='random_normal',activation='swish', kernel_regularizer=tf.keras.regularizers.l1(regul), kernel_constraint=clip_constraint)(dense_2))
    layer4_norm=BatchNormalization()
    dense_4 = layer4_norm(Dense(1024, kernel_initializer='random_normal', kernel_regularizer=tf.keras.regularizers.l1(regul),activation='selu', kernel_constraint=clip_constraint)(dense_3))
    return tf.keras.Model(inputs=stim_input_layers, outputs=dense_4, name="stim_model")

user_input_layers = create_user_input_layers()
stim_input_layers = create_stim_input_layers()
user_model = create_user_model( user_input_layers)
stim_model = create_stim_model( stim_input_layers)

In [22]:
relug=5e-5
class ScoreEstimator(tf.keras.Model):
    def __init__(self, user_model, stim_model):
        super().__init__()
        self.user_model = user_model
        self.stim_model = stim_model
        self.concat_layer = Concatenate(axis=-1)
        clip_value = 0.5
        clip_constraint = WeightClip(clip_value)

        self.dense_1a = Dense(4096, activation=tf.keras.activations.selu, kernel_initializer='random_normal', kernel_regularizer=regularizers.l1(relug), kernel_constraint=clip_constraint)
        self.dense_1b = Dense(2048, activation=tf.keras.activations.selu, kernel_initializer='random_normal', kernel_regularizer=regularizers.l1(relug), kernel_constraint=clip_constraint)
        self.dense_2a = Dense(512, activation=tf.keras.activations.selu, kernel_initializer='random_normal', kernel_regularizer=regularizers.l1(relug), kernel_constraint=clip_constraint)
        self.dense_2b = Dense(256, activation=tf.keras.activations.selu, kernel_initializer='random_normal', kernel_regularizer=regularizers.l1(relug), kernel_constraint=clip_constraint)
        self.dense_3a = Dense(128, activation=tf.keras.activations.selu, kernel_initializer='random_normal', kernel_regularizer=regularizers.l1(relug), kernel_constraint=clip_constraint)
        self.dense_3b = Dense(64, activation=tf.keras.activations.selu, kernel_initializer='random_normal', kernel_regularizer=regularizers.l1(relug), kernel_constraint=clip_constraint)
        self.dense_4a = Dense(32, activation=tf.keras.activations.selu, kernel_initializer='random_normal', kernel_regularizer=regularizers.l1(relug), kernel_constraint=clip_constraint)
        self.dense_4b = Dense(16, activation=tf.keras.activations.selu, kernel_initializer='random_normal', kernel_regularizer=regularizers.l1(relug), kernel_constraint=clip_constraint)
        self.dense_5a = Dense(8, activation=tf.keras.activations.selu, kernel_initializer='random_normal', kernel_regularizer=regularizers.l1(relug), kernel_constraint=clip_constraint)
        self.dense_5b = Dense(4, activation=tf.keras.activations.selu, kernel_initializer='random_normal', kernel_regularizer=regularizers.l1(relug), kernel_constraint=clip_constraint)
        self.output_layer = Dense(1, activation=tf.keras.activations.selu, kernel_regularizer=tf.keras.regularizers.l1(regul), kernel_constraint=clip_constraint)

        self.batch_norm1 = BatchNormalization()
        self.batch_norm2 = BatchNormalization()
        self.batch_norm3 = BatchNormalization()
        self.batch_norm4 = BatchNormalization()
        self.batch_norm5 = BatchNormalization()
        
        self.loss_fn = tf.keras.losses.MeanSquaredError()

    def call(self, inputs, training=False):
        user_inputs, stim_inputs = inputs
        user_output = self.user_model(user_inputs)
        stim_output = self.stim_model(stim_inputs)
        concatenated = self.concat_layer([user_output, stim_output])
        dense_1a_output = self.batch_norm1(self.dense_1a(concatenated))
        dense_1b_output = self.dense_1b(dense_1a_output)
        dense_2a_output = self.batch_norm2(self.dense_2a(dense_1b_output))
        dense_2b_output = self.dense_2b(dense_2a_output)
        dense_3a_output = self.batch_norm3(self.dense_3a(dense_2b_output))
        dense_3b_output = self.dense_3b(dense_3a_output)
        dense_4a_output = self.batch_norm4(self.dense_4a(dense_3b_output))
        dense_4b_output = self.dense_4b(dense_4a_output)
        dense_5a_output = self.batch_norm5(self.dense_5a(dense_4b_output))
        dense_5b_output = self.dense_5b(dense_5a_output)
        output = self.output_layer(dense_5b_output)
        return output


In [23]:
class RankingModel(tfrs.models.Model):

  def __init__(self):
    super().__init__()

    self.ranking_model: tf.keras.Model = ScoreEstimator(user_model,stim_model)
    self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
      loss = tf.keras.losses.MeanSquaredError(),
      metrics=[tf.keras.metrics.RootMeanSquaredError()]
    )

  def call(self, inputs):
      #tensor_user_inputs, tensor_stim_inputs, scores = inputs[0]
      return self.ranking_model((inputs['user_inputs'], inputs['stim_inputs']))

  def compute_loss(self, inputs, training=False) -> tf.Tensor:
      rating_predictions = self(inputs)
      # The task computes the loss and the metrics.
      return self.task(labels=inputs['labels'], predictions=rating_predictions)

In [24]:
#Restore to use model
def create_recommender_model(user_input_layers, stim_input_layers):
    user_model = create_user_model(user_input_layers)
    stim_model = create_stim_model(stim_input_layers)
    recommender_model = RankingModel()
    recommender_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=3e-4))
    return recommender_model
user_input_layers = create_user_input_layers()
stim_input_layers = create_stim_input_layers()

# Create the model architecture
loaded_model = create_recommender_model(user_input_layers, stim_input_layers)

# Load the saved weights
model_checkpoint = tf.train.Checkpoint(model=loaded_model)
model_checkpoint.restore("stimming_recommender/weights.ckpt-1").expect_partial()

user_checkpoint=tf.train.Checkpoint(model=user_model)
user_checkpoint.restore("user_model_weights/weights.ckpt").expect_partial()

stim_checkpoint=tf.train.Checkpoint(model=stim_model)
stim_checkpoint.restore("stim_model_weights/weights.ckpt").expect_partial()

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x226a5f4f670>

In [25]:
# Load the Universal Sentence Encoder
embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")

def text_embedder(text):
    return embed(text)
def pad_or_truncate(arr, target_shape):
    if arr.shape[0] < target_shape[0]:
        padding = np.zeros((target_shape[0] - arr.shape[0], target_shape[1]))
        arr = np.vstack((arr, padding))
    elif arr.shape[0] > target_shape[0]:
        arr = arr[:target_shape[0], :]
    return arr
def create_prediction_input(user_data, stim_data):
    user_inputs = {
        "age": tf.constant([user_data["age"]], dtype=tf.int32),
        "gender": tf.constant([user_data["gender"]], dtype=tf.string),
        "sexuality": tf.constant([user_data["sexuality"]], dtype=tf.string),
        "nd_conditions_embedding": tf.expand_dims(tf.expand_dims(text_embedder([" and ".join(user_data["neurodivergent_conditions"])])[0], axis=0), axis=1),
        "hobbies_embedding": tf.expand_dims(tf.expand_dims(text_embedder([user_data["hobbies"]])[0], axis=0), axis=1),
        "stimming_essentiality_score": tf.constant([user_data["stimming_essentiality_score"]], dtype=tf.int32),
        "current_stims": tf.expand_dims(tf.expand_dims(text_embedder([user_data["current_stims"]])[0], axis=0), axis=1),
    }

    stim_inputs = {
        "name_embedding": tf.expand_dims(tf.expand_dims(text_embedder([stim_data["name"]])[0], axis=0), axis=1),
        "description_embedding": tf.expand_dims(tf.expand_dims(text_embedder([stim_data["description"]])[0], axis=0), axis=1),
        "harmfulness_score": tf.constant([stim_data["harmfulness_score"]], dtype=tf.int32),
        "associated_conditions_hobbies": tf.expand_dims(pad_or_truncate(text_embedder(stim_data["associated_conditions_hobbies"]), (2, 512)), axis=0),
    }


    return user_inputs, stim_inputs

user_data = {
    "age": 20,
    "gender": "Female",
    "sexuality": "Bisexual",
    "neurodivergent_conditions": ['Social Anxiety Disorder', 'Depression','Epilepsy'],
    "hobbies": "Drawing, Music",
    "stimming_essentiality_score": 5,
    "current_stims": "Tapping Feet",
}

stim_data = {
    "name": "Fidgeting hands",
    "description": "I start tearing random bits of paper whenever I'm mindless",
    "harmfulness_score": 1,
    "associated_conditions_hobbies": ["Dyslexia", "Masturbation"],
}


user_inputs, stim_inputs = create_prediction_input(user_data, stim_data)

# Make a prediction
prediction = loaded_model({ 'user_inputs' : user_inputs, 'stim_inputs': stim_inputs, 'labels': 0})
print(prediction)

tf.Tensor([[84.297874]], shape=(1, 1), dtype=float32)
