<a href="https://colab.research.google.com/github/MehrdadDastouri/music_recommender_rl/blob/main/music_recommender_rl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple
from sklearn.preprocessing import StandardScaler
from collections import defaultdict
import torch
import torch.nn as nn
import torch.optim as optim

class MusicEnvironment:
    """Simulated environment for music recommendation."""

    def __init__(self, music_data: pd.DataFrame):
        self.music_data = music_data
        self.current_state = None
        self.scaler = StandardScaler()
        self.features = self._prepare_features()

    def _prepare_features(self) -> np.ndarray:
        """Prepare and normalize music features."""
        features = self.music_data[['danceability', 'energy', 'valence', 'tempo']].values
        return self.scaler.fit_transform(features)

    def reset(self) -> np.ndarray:
        """Reset environment to initial state."""
        self.current_state = np.random.randint(len(self.features))
        return self.features[self.current_state]

    def step(self, action: int) -> Tuple[np.ndarray, float, bool]:
        """Take action and return new state, reward and done flag."""
        next_state = action
        reward = self._calculate_reward(self.current_state, next_state)
        self.current_state = next_state
        return self.features[next_state], reward, False

    def _calculate_reward(self, current: int, next: int) -> float:
        """Calculate reward based on music similarity and user preferences."""
        similarity = self._compute_similarity(self.features[current], self.features[next])
        return float(similarity)

    def _compute_similarity(self, state1: np.ndarray, state2: np.ndarray) -> float:
        """Compute similarity between two music states."""
        return 1.0 / (1.0 + np.linalg.norm(state1 - state2))

class DQNNetwork(nn.Module):
    """Deep Q-Network for music recommendation."""

    def __init__(self, state_size: int, action_size: int):
        super(DQNNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, action_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

class MusicRecommender:
    """Music recommendation system using reinforcement learning."""

    def __init__(self, music_data: pd.DataFrame):
        self.env = MusicEnvironment(music_data)
        self.state_size = 4  # number of features
        self.action_size = len(music_data)
        self.memory = []
        self.batch_size = 32
        self.gamma = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.model = DQNNetwork(self.state_size, self.action_size).to(self.device)
        self.target_model = DQNNetwork(self.state_size, self.action_size).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters())
        self.update_target_model()

    def update_target_model(self):
        """Update target network with weights from main network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def remember(self, state: np.ndarray, action: int, reward: float,
                next_state: np.ndarray, done: bool):
        """Store experience in memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state: np.ndarray) -> int:
        """Choose action using epsilon-greedy policy."""
        if np.random.rand() <= self.epsilon:
            return np.random.randint(self.action_size)

        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            action_values = self.model(state_tensor)
        return action_values.argmax().item()

    def replay(self, batch_size: int):
        """Train the model using experience replay."""
        if len(self.memory) < batch_size:
            return

        minibatch = np.random.choice(len(self.memory), batch_size, replace=False)
        states, actions, rewards, next_states, dones = [], [], [], [], []

        for idx in minibatch:
            s, a, r, ns, d = self.memory[idx]
            states.append(s)
            actions.append(a)
            rewards.append(r)
            next_states.append(ns)
            dones.append(d)

        states = torch.FloatTensor(states).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)

        current_q_values = self.model(states)
        next_q_values = self.target_model(next_states).detach()

        target_q_values = current_q_values.clone()
        for i in range(batch_size):
            target_q_values[i][actions[i]] = rewards[i] + \
                (1 - dones[i]) * self.gamma * next_q_values[i].max()

        loss = nn.MSELoss()(current_q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def train(self, episodes: int = 1000):
        """Train the recommendation system."""
        for episode in range(episodes):
            state = self.env.reset()
            total_reward = 0

            for _ in range(100):  # max steps per episode
                action = self.act(state)
                next_state, reward, done = self.env.step(action)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                total_reward += reward

                self.replay(self.batch_size)
                if done:
                    break

            if episode % 10 == 0:
                self.update_target_model()
                print(f"Episode: {episode}, Total Reward: {total_reward}, Epsilon: {self.epsilon}")

    def recommend(self, current_song_idx: int, n_recommendations: int = 5) -> List[int]:
        """Get music recommendations for a given song."""
        state = self.env.features[current_song_idx]
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)

        with torch.no_grad():
            action_values = self.model(state_tensor)

        return action_values.squeeze().argsort(descending=True)[:n_recommendations].cpu().numpy()

# Example usage
if __name__ == "__main__":
    # Create sample music data
    music_data = pd.DataFrame({
        'danceability': np.random.random(100),
        'energy': np.random.random(100),
        'valence': np.random.random(100),
        'tempo': np.random.random(100),
        'title': [f'Song_{i}' for i in range(100)]
    })

    # Initialize and train recommender
    recommender = MusicRecommender(music_data)
    recommender.train(episodes=100)

    # Get recommendations
    current_song = 0
    recommendations = recommender.recommend(current_song)
    print("\nRecommendations for", music_data.iloc[current_song]['title'])
    for idx in recommendations:
        print(music_data.iloc[idx]['title'])

Episode: 0, Total Reward: 29.1053869128023, Epsilon: 0.7076077347272662
Episode: 10, Total Reward: 97.95737142637364, Epsilon: 0.00998645168764533
Episode: 20, Total Reward: 99.45622802606616, Epsilon: 0.00998645168764533
Episode: 30, Total Reward: 96.23949164185713, Epsilon: 0.00998645168764533
Episode: 40, Total Reward: 99.28784503446326, Epsilon: 0.00998645168764533
Episode: 50, Total Reward: 96.5823917727028, Epsilon: 0.00998645168764533
Episode: 60, Total Reward: 97.09234997302008, Epsilon: 0.00998645168764533
Episode: 70, Total Reward: 95.68002766183787, Epsilon: 0.00998645168764533
Episode: 80, Total Reward: 90.42511847923537, Epsilon: 0.00998645168764533
Episode: 90, Total Reward: 98.05456995741442, Epsilon: 0.00998645168764533

Recommendations for Song_0
Song_0
Song_12
Song_43
Song_61
Song_26
