In [1]:
import sys
import os

# Add the /app/src directory to the Python path
sys.path.append(os.path.abspath('../app/src'))

import gym
from gym import spaces
from gym import Space
import numpy as np
from typing import List
import random
from datetime import datetime
from sklearn.preprocessing import MinMaxScaler
import numpy as np

# Now try importing again
from spark.data import loader
from spark.data.models import Customer, Product, Category, Interaction, InteractionType
from spark import utils


In [2]:
from spark.agent.environment import RecommendationEnv

In [3]:

"""
Custom gym environment for product recommendations where states represent customer interactions
actions are products;

Each customer interaction is a state. In the step function, transition of states will be the same 
customer with the new interaction for a product. The transition ends when the customer made a purchase or session ends.

The aim is to maximise rewards that can lead to a purchase.

It is better to have a customer interaction to represent a state rather than a time series of steps leading to a purchase. 
The latter method may have an incomplete where customer exits the application. Also, even if the customer did not purchase,
this information is still valuable for recommendations. Hence every state is a customer interaction.

An addition meta data for each customer will be stored to understand the context of the user. For example, if a product is 
purchase many times, this may factor into preference of the states.
---
Personalization: By incorporating the user ID, the model can tailor recommendations specifically to individual users, allowing 
it to learn unique user preferences and behaviors over time.

VS

Overfitting: If the model learns too much from the user ID directly, it might overfit to individual user patterns, potentially 
missing out on broader trends that could be useful for all users.

SOLUTION
Use Embeddings: Instead of directly using user IDs, consider using an embedding layer that transforms the user ID into a dense vector representation. This approach reduces dimensionality while capturing user-specific features.

Combine Features: Use user ID embeddings in conjunction with other features like user demographics, interaction history, and product attributes. This can create a more holistic view of user preferences.

Regularization: Implement techniques like dropout or weight regularization to mitigate overfitting when using user IDs or their embeddings.

Batch Normalization: Use batch normalization to stabilize learning, especially if the user ID leads to a wide range of outputs.
----
"""
class RecommendationEnv(gym.Env):
    def __init__(self, users:List[Customer], products:List[Product], top_k:int):
        super().__init__()
        
        self.users = users                  # list of users as states
        self.products = products            # products as actions, potential recommendations
        self.top_k = top_k                  # number of recommendations
        self.user_idx = 0                   # index of users list, not user_id
        self.current_step = 0               # step is also the interactions list index
        self.categories = loader.load_categories()
        
        self.action_space = spaces.MultiDiscrete([len(products)] * 10) 
        
        # number of customers as states
        # states are derived from customer profiles and interactioms
        # Users list will keep track of unique users
        # States include subset of features including product, interaction, ratings, and time in one-hot-encoding format
        # States exclude user_ids for policy network generalisation. But internal users list will be used as reference        
        self.observation_space = spaces.Dict({
            'pref_prod': spaces.Box(low=0, high=1, shape=(len(self.products),), dtype=np.float32),
            'pref_cat': spaces.Box(low=0, high=1, shape=(len(self.categories),), dtype=np.float32),
            'buys': spaces.Box(low=0, high=1, shape=(len(self.products),), dtype=np.uint8),
            'views': spaces.Box(low=0, high=1, shape=(len(self.products),), dtype=np.uint8),
            'likes': spaces.Box(low=0, high=1, shape=(len(self.products),), dtype=np.uint8),
            'ratings': spaces.Box(low=0, high=1, shape=(len(self.products),), dtype=np.uint8),
            'product': spaces.Box(low=0, high=1, shape=(len(self.products),), dtype=np.uint8),
            'interaction': spaces.Box(low=0, high=1, shape=(len(list(InteractionType)),), dtype=np.uint8),
            'rating': spaces.Discrete(6)
            }) 
            ## add more features like time, ignored recommendtions, engagement etc
        
    def reset(self):
        self.user_idx = np.random.randint(len(self.users)) # may run throught users one by one
        user = self.users[self.user_idx]
        self.current_step = 0
        return self._get_observation(user) # get current user features as states

    def step(self, rec_products):
        """ randomly interacting with product to mimick real user unpredictable behavious """
        self.current_step += 1
        user = self.users[self.user_idx]
        
        reward = 0
        done = False
        
        # simulate selected recommended product and interaction
        seleted_pid, interaction_type = self._simulate_interaction(rec_products) # generate random interaction
        # seleted_pid = random.choice(rec_products)
        # interaction_type = random.choice(list(InteractionType))
        
        random_rating = 0
        
        if interaction_type == InteractionType.NONE:
            reward = -1 # no interaction, customers not interested in recommendations
        elif interaction_type ==  InteractionType.VIEW:
            reward = 3
        elif interaction_type ==  InteractionType.LIKE:
            reward = 10
        elif interaction_type ==  InteractionType.BUY:
            reward = 50
        elif interaction_type ==  InteractionType.RATE:
            # generate rating, reward 1-2 is negative 3 neutral and 5 positive
            random_rating = random.randint(0, 5)
            reward = random_rating -1        
        elif interaction_type ==  InteractionType.SESSION_START:
            reward = 0
        elif interaction_type ==  InteractionType.SESSION_CLOSE:
            done = True
            reward = 0 # TODO: check if engament is too short
        else:
            reward = 0
        
        # generate random interaction
        new_interaction = Interaction(self.current_step, datetime.now(), user.idx, seleted_pid, interaction_type, random_rating)
        # reward = self._calculate_reward(user, product)
        
        return self._update_observation(new_interaction), reward, done, {}

    def _update_observation(self, interaction:Interaction):   
        # update user data     
        user = self.users[self.user_idx]   
        pid = interaction.product_idx    
         
        if interaction == InteractionType.VIEW:
            user.views[pid] += 1
        elif interaction == InteractionType.LIKE:
            user.likes[pid] += 1
        elif interaction == InteractionType.BUY:
            user.buys[pid] += 1
        elif interaction == InteractionType.RATE:
            user.rates[pid] = interaction.value
          
        # update observation based on new data  
        obs = {
                'pref_prod': self._get_product_preferences(user),
                'pref_cat': self._get_category_preferences(user), 
                'buys': utils.normalise(user.buys),
                'views': utils.normalise(user.views),
                'likes': utils.normalise(user.likes),
                'ratings': user.ratings,
                'product': utils.one_hot_encode(pid, len(self.products)),
                'interaction': self._get_interaction_observation(interaction),
                'rating': interaction.value if interaction.type == InteractionType.RATE else 0 
            }
        
        return obs

    def _get_observation(self, user:Customer): 
        
        obs = {
                'pref_prod': self._get_product_preferences(user),
                'pref_cat': self._get_category_preferences(user), 
                'buys': utils.normalise(user.buys),
                'views': utils.normalise(user.views),
                'likes': utils.normalise(user.likes),
                'ratings': user.ratings,
                'product': np.zeros(len(self.products)),
                'interaction': np.zeros(len(list(InteractionType))),
                'rating': 0 
            }
        
        return obs
        
    def _get_interaction_observation(self, interaction:Interaction):
        idx = list(InteractionType).index(interaction.type)
        size = len(InteractionType)
        
        return utils.one_hot_encode(idx, size)
    
    # calculate preferences based on past interactions
    def _get_product_preferences(self, user:Customer):
        view_prefs = user.views / 20
        purchase_prefs = user.buys
        like_prefs = user.likes / 15

        rating_prefs = user.ratings.copy()
        rating_prefs[rating_prefs > 0] -= 2
        
        product_prefs = view_prefs + purchase_prefs + like_prefs+ rating_prefs
        
        return product_prefs    # calculate preferences based on past interactions
    
    def _get_category_preferences(self, user:Customer):
        prod_prefs = self._get_product_preferences(user)
        cat_prefs = np.zeros(len(self.categories), np.float32)
        
        for idx, prod_pref in enumerate(prod_prefs):
            if prod_pref > 0:
                product = self.products[idx]
                cat_idx = product.category.idx
                cat_prefs[cat_idx] += prod_pref # accumulation of fav products for this cat
                # print(f"added pf {prod_pref} to cat {cat_idx}")
                
        cat_prefs = cat_prefs / 5 # reduce space     
           
        return cat_prefs

    def _simulate_interaction(self, product_ids):        
        user = self.users[self.user_idx]
        product_list = []
        
        # simulate selection
        num_products = len(product_ids)
        prod_scores = np.zeros(num_products, np.uint8)
        product_prefs = self._get_product_preferences(user)
        category_prefs = self._get_category_preferences(user)
        product_probs = np.full((num_products,), 1.0 / num_products) # equal probs by default
        product_probs[-1] = 0.1 # lower ending epsidoe flag to encourage longer training
        
        for idx, pid in enumerate(product_ids):
            product_list.append(self.products[pid]) # get the product objects
            prod_scores[idx] = product_prefs[pid]
            
        # combining category prefs to calculate probabilities
        for idx, product in enumerate(product_list):
            cid = product.category.idx
            prod_scores[idx] = category_prefs[cid] 
    
        # Ensure the probabilities sum to 1 for a valid probability distribution
        if np.argmax(prod_scores) > 0: # the product is in the preferences
            product_probs = np.array(prod_scores) / sum(prod_scores)

        # Randomly select a product based on the defined probabilities
        selected_product_id = np.random.choice(product_ids, p=product_probs)
        
        # simulate interaction for the selected product
        inter_types = list(InteractionType)
        inter_scores = np.zeros(len(inter_types), np.uint8)
        inter_probs = np.full((len(inter_types),), 1.0 / len(inter_types)) # equal probs by default
        
        
        for idx, inter_type in enumerate(inter_types):
            if inter_type == InteractionType.VIEW:
                inter_scores[idx] = user.views[selected_product_id]
            if inter_type == InteractionType.LIKE:
                inter_scores[idx] = user.likes[selected_product_id]
            if inter_type == InteractionType.BUY:
                inter_scores[idx] = user.buys[selected_product_id]
            if inter_type == InteractionType.RATE:
                inter_scores[idx] = user.ratings[selected_product_id]
        
        if np.argmax(inter_scores) > 0:
            inter_scores[inter_scores == 0] = 1 # default score for interaction that are 0
            inter_probs = np.array(inter_scores) / sum(inter_scores)

        # Randomly select a product based on the defined probabilities
        selected_interaction_type = np.random.choice(inter_types, p=inter_probs)
        
        return selected_product_id, selected_interaction_type
        

    def render(self, mode='human'):
        if hasattr(self, 'last_action'):
            print(f"Recommended Product ID (Last Action): {self.last_action}")
        else:
            print("No product recommended yet.")

        # Optionally, print the reward received for the last action
        if hasattr(self, 'last_reward'):
            print(f"Reward for Last Action: {self.last_reward}")

        print("-----")

    def close(self):
        pass

In [4]:
list(InteractionType)

[<InteractionType.NONE: 'none'>,
 <InteractionType.VIEW: 'view'>,
 <InteractionType.LIKE: 'like'>,
 <InteractionType.BUY: 'buy'>,
 <InteractionType.RATE: 'rate'>,
 <InteractionType.EXIT: 'exit'>,
 <InteractionType.SESSION_START: 'session_start'>,
 <InteractionType.SESSION_CLOSE: 'session_close'>]

In [5]:
products = loader.load_products()
customers = loader.load_customers(include_interactions=True)

In [6]:
for customer in customers:
    customer.views = np.zeros(len(products), dtype=np.int8)
    customer.likes = np.zeros(len(products), dtype=np.int8)
    customer.buys = np.zeros(len(products), dtype=np.int8)
    customer.ratings = np.zeros(len(products), dtype=np.int8)
    for interaction in customer.interactions:
        i_type = interaction.type.value    
        product_idx = interaction.product_idx  
        # print(f"customer {customer.idx} interaction {type} product {product_idx}")
        if i_type == InteractionType.VIEW.value:
            customer.views[product_idx] += 1
            # print(f"customer {customer.idx} view", customer.views)
        elif i_type == InteractionType.LIKE.value:
            customer.likes[product_idx] += 1
            # print(f"customer {customer.idx} like", customer.likes)
        elif i_type == InteractionType.BUY.value:
            customer.buys[product_idx] += 1
            # print(f"customer {customer.idx} buy", customer.buys)
        elif i_type == InteractionType.RATE.value:
            customer.ratings[product_idx] = interaction.value
            # print(f"customer {customer.idx} rate", customer.rates)

In [7]:
# len(InteractionType)
cats = loader.load_categories()
len(cats)

38

In [8]:
random.choice(list(InteractionType))

<InteractionType.VIEW: 'view'>

In [20]:
# test the env
from gym.wrappers import FlattenObservation
env = RecommendationEnv(customers, products, top_k=10)
env.seed(100)

  deprecation(


[1000]

In [10]:
# env.reset()
# env.step()

In [11]:

# env.test_simiulate_interaction(products[:10])
# env.observation_space.n
pids = [product.idx for product in products]
# env.test_simulate_interaction(pids[30:40])

In [12]:

# env = FlattenObservation(env)
# env.observation_space
random.choice(list(InteractionType))

<InteractionType.NONE: 'none'>

In [13]:

# print(type(env.observation_space))  # Output: <class 'int'>

# def get_interaction_observation(interaction:Interaction):
#     idx = list(InteractionType).index(interaction.type)
#     size = len(InteractionType)
#     # print(idx)
    
def get_interaction_observation(interaction:Interaction):
        idx = list(InteractionType).index(interaction.type)
        size = len(InteractionType)
        return utils.one_hot_encode(idx, size)
    
get_interaction_observation(customers[0].interactions[0])

array([0, 0, 0, 1, 0, 0, 0, 0], dtype=uint8)

In [22]:
from stable_baselines3 import PPO
from torch import nn

ppo_log_dir = './logs'

policy_kwargs = dict(
    net_arch=[dict(pi=[256, 256], vf=[256, 256])],
    activation_fn=nn.ReLU,
)

model = PPO(
    policy="MultiInputPolicy",
    env=env,
    verbose=0,
    learning_rate=3e-4,
    batch_size=64,
    n_steps=10,
    gamma=0.99,
    gae_lambda=0.95,
    ent_coef=0.0,
    tensorboard_log=ppo_log_dir,
    policy_kwargs=policy_kwargs,
)

model.learn(total_timesteps=30000)  # Adjust based on your needs

We recommend using a `batch_size` that is a factor of `n_steps * n_envs`.
Info: (n_steps=10 and n_envs=1)


<stable_baselines3.ppo.ppo.PPO at 0x7feaef3fde80>

In [18]:
%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [19]:
%tensorboard --logdir={ppo_log_dir}

Reusing TensorBoard on port 6011 (pid 17230), started 0:03:21 ago. (Use '!kill 17230' to kill it.)

In [24]:
obs = env.reset()
for _ in range(20):  # Make 10 recommendations
    action, _states = model.predict(obs)
    obs, rewards, done, _ = env.step(action)
    print("Recommended products:", action)

Recommended products: [262 224  48 279 209 279  99 113 241 106]
Recommended products: [ 53 274  47  52 211 163  99 113 289 191]
Recommended products: [162 213 272 263 286 154  99 255 114 191]
Recommended products: [262  44  18 109 209 122  99 113 222 191]
Recommended products: [262  48  33 279 232 253  99 113 188 131]
Recommended products: [262 224 290 279  11 186  99  91 241 191]
Recommended products: [262  22 151  52 291  17  99  91 290 191]
Recommended products: [262 274 172 279  37 279  99 113 241 191]
Recommended products: [262 277 272 284 286 110  99 113 263  69]
Recommended products: [235 274  47 238 163  17  99 113 263 191]
Recommended products: [262 135 272  59 264 279  99  91  22 191]
Recommended products: [144 213 219 279 286  35  96  91 241 191]
Recommended products: [236 274 293 279 286 202  99  91 111 191]
Recommended products: [262  71  47 135 232 141  99 113  51 148]
Recommended products: [262 277 140 115 286 279 167  35 171 191]
Recommended products: [183  44 198 279 1