# Python Practice 481-490

## Here are Python Codes

### 481. Develop a Reinforcement Learning Agent using Truncated Importance Sampling with Custom Importance Sampling Ratio
Importance sampling corrects the bias that arises when an agent learns about a target policy from data collected by a different behavior policy. Each sample is reweighted by the ratio of the target policy's action probability to the behavior policy's action probability. Truncated importance sampling caps this ratio at a fixed limit to keep the variance of the updates under control.
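As a minimal sketch of the ratio itself, assuming an epsilon-greedy behavior policy and a greedy target policy over two actions (the helper name `truncated_is_ratio` is hypothetical and used only for illustration):

```python
def truncated_is_ratio(target_prob, behavior_prob, limit):
    """Importance sampling ratio pi(a|s) / b(a|s), capped at `limit`."""
    return min(limit, target_prob / behavior_prob)

epsilon, n_actions, limit = 0.5, 2, 2.0

# Probability of each kind of action under the epsilon-greedy behavior policy
b_greedy = 1.0 - epsilon + epsilon / n_actions  # 0.75
b_other = epsilon / n_actions                   # 0.25

# A greedy target policy puts probability 1 on the greedy action and 0 elsewhere
rho_greedy = truncated_is_ratio(1.0, b_greedy, limit)
rho_other = truncated_is_ratio(0.0, b_other, limit)

# A ratio that would be 4.0 without truncation is capped at the limit
rho_capped = truncated_is_ratio(1.0, b_other, limit)

print(rho_greedy, rho_other, rho_capped)
```

The cap only matters when the behavior policy assigns a small probability to an action the target policy favors; that is exactly the case in which the untruncated ratio would produce very large updates.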

Below is a simple Python program to demonstrate the concept with a toy problem. The RL agent will try to maximize its reward in a random walk environment:

Environment: The agent starts in the middle of a 5-state line (states: A, B, C, D, E). The agent can move left or right. Moving out of state A or state E ends the episode.
Reward: The agent receives a reward of +1 if it moves out of state E and a reward of -1 if it moves out of state A.
We'll use Q-learning with truncated importance sampling. The agent will have a behavior policy (epsilon-greedy) different from the target policy (greedy).

Expected Output:
You should see a 2D array (Q-values) which should have higher values on the right-side actions (moving towards state E) as it leads to a reward of +1.

Note:

This is a toy example to demonstrate the truncated importance sampling concept. In more complex environments, additional considerations (like neural networks for approximation, replay buffers, etc.) would be necessary.
EPSILON is set to 0.5 here for demonstration. In practice, it might be useful to decay the EPSILON over time to reduce exploration and exploit more as the agent learns.

In [None]:
import numpy as np

# Parameters
ALPHA = 0.1  # Learning rate
EPSILON = 0.5  # Exploration rate for behavior policy
GAMMA = 1.0  # Discount factor
EPISODES = 5000
TRUNCATION_LIMIT = 2.0

# States A, B, C, D, E
state_space = ['A', 'B', 'C', 'D', 'E']
n_states = len(state_space)

# Actions: 0 (left), 1 (right)
n_actions = 2

# Q-values
Q = np.zeros((n_states, n_actions))

# Transitions and rewards
transitions = {'A': {'0': (-1, -1), '1': (1, 0)},
               'B': {'0': (0, 0), '1': (2, 0)},
               'C': {'0': (1, 0), '1': (3, 0)},
               'D': {'0': (2, 0), '1': (4, 0)},
               'E': {'0': (3, 0), '1': (5, 1)}}

def behavior_policy(state):
    if np.random.rand() < EPSILON:
        return np.random.choice([0, 1])
    return np.argmax(Q[state, :])

def target_policy(state):
    return np.argmax(Q[state, :])

for episode in range(EPISODES):
    state = 2  # Start at C
    done = False
    
    while not done:
        action = behavior_policy(state)
        next_state, reward = transitions[state_space[state]][str(action)]
        
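        # Truncated Importance Sampling
        # The weight used here is 1 / b(a|s), the inverse probability of the action under
        # the epsilon-greedy behavior policy. For a strictly greedy target policy the exact
        # ratio pi(a|s) / b(a|s) would be 0 for non-greedy actions; using 1 / b(a|s) instead
        # keeps a positive weight so that exploratory actions still update Q.
        if action == target_policy(state):
            rho = 1.0 / (1.0 - EPSILON + EPSILON/n_actions)
        else:
            rho = 1.0 / (EPSILON/n_actions)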
        
        # Truncate the importance sampling ratio
        rho = min(TRUNCATION_LIMIT, rho)

        # Q-learning update
        # Terminal states (-1 and 5) have no row in Q, so bootstrap only from non-terminal states
        done = next_state == -1 or next_state == 5
        next_value = 0.0 if done else np.max(Q[next_state, :])
        td_target = reward + GAMMA * next_value
        td_delta = td_target - Q[state, action]
        Q[state, action] += ALPHA * rho * td_delta

        state = next_state

print(Q)


### 482. Build a Recommender System with Cross-Domain Recommendation and Custom Domain Alignment
Cross-domain recommendation is the task of transferring the knowledge learned in a source domain to improve the recommendation performance in a target domain. One way to perform cross-domain recommendation is to utilize shared latent factors or embeddings that are common across the domains. These shared embeddings can align the two domains, which can help in making better recommendations.

For simplicity, let's build a recommender system using matrix factorization and align the embeddings for two domains (books and movies). We'll simulate the data for the two domains:

1. Utilize matrix factorization to get user and item embeddings for both domains.
2. Align the embeddings by minimizing the distance between source and target domain embeddings.
3. Use the aligned embeddings for recommendation in the target domain.
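Step 2 can be sketched in isolation with NumPy. This is a minimal illustration under stated assumptions, not the approach used in the cell below: the user embeddings are synthetic, and the alignment is a plain least-squares linear map (the names `fit_alignment`, `book_user_emb`, and `movie_user_emb` are hypothetical).

```python
import numpy as np

def fit_alignment(source_emb, target_emb):
    """Least-squares linear map W that minimizes ||source_emb @ W - target_emb||^2."""
    W, *_ = np.linalg.lstsq(source_emb, target_emb, rcond=None)
    return W

rng = np.random.default_rng(0)
n_users, k = 100, 10

# Synthetic embeddings for the same users in two domains (assumed data)
book_user_emb = rng.normal(size=(n_users, k))
hidden_map = rng.normal(size=(k, k))
movie_user_emb = book_user_emb @ hidden_map + 0.1 * rng.normal(size=(n_users, k))

W = fit_alignment(book_user_emb, movie_user_emb)
before = np.linalg.norm(book_user_emb - movie_user_emb)
after = np.linalg.norm(book_user_emb @ W - movie_user_emb)
print(f"distance before alignment: {before:.2f}, after: {after:.2f}")
```

Once `W` is fitted, a user known only in the book domain can be mapped into the movie embedding space with `book_vector @ W`.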


Expected Output:
The output will show embeddings for a new user in the book and movie domain. They should be somewhat similar because of the alignment layer, but won't be identical due to the randomness in our synthetic data and embeddings.

Note:

This is a simplistic demonstration, and in practice, more sophisticated methods and a lot more data are used.
The embeddings are aligned by forcing them through the same transformation layer. This simplistic approach ensures that the embeddings for users are somewhat aligned across domains.
Cross-domain recommendation is a complex topic, and this example serves just as an introduction.

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras

# Simulated data
users = 100
books = 50
movies = 60
latent_factors = 10

# Random user-book interactions (source domain)
book_ratings = np.random.randint(0, 6, (users, books))

# Random user-movie interactions (target domain)
movie_ratings = np.random.randint(0, 6, (users, movies))

# Per-domain encoders: each maps a user's rating vector to a latent embedding
# (a simple dense stand-in for matrix factorization)
book_input = keras.layers.Input(shape=(books,))
book_emb = keras.layers.Dense(latent_factors, activation='relu')(book_input)

movie_input = keras.layers.Input(shape=(movies,))
movie_emb = keras.layers.Dense(latent_factors, activation='relu')(movie_input)

# Domain alignment layer - here, we'll just use a dense layer
alignment_layer = keras.layers.Dense(latent_factors)

aligned_book_emb = alignment_layer(book_emb)
aligned_movie_emb = alignment_layer(movie_emb)

# Model
book_model = keras.models.Model(inputs=book_input, outputs=aligned_book_emb)
movie_model = keras.models.Model(inputs=movie_input, outputs=aligned_movie_emb)

book_model.compile(optimizer='adam', loss='mse')
movie_model.compile(optimizer='adam', loss='mse')

# Simulated training (just to create embeddings)
book_model.fit(book_ratings, np.random.random((users, latent_factors)), epochs=5, verbose=0)
movie_model.fit(movie_ratings, np.random.random((users, latent_factors)), epochs=5, verbose=0)

# After training, you can utilize these models to make recommendations in each domain.
# With the embeddings aligned, knowledge from the book domain can assist in making better recommendations in the movie domain.

# Example: Get embeddings for a new user
new_user_books = np.random.randint(0, 6, (1, books))
new_user_movies = np.random.randint(0, 6, (1, movies))

book_embedding = book_model.predict(new_user_books)
movie_embedding = movie_model.predict(new_user_movies)

print("Book Embedding for New User:", book_embedding)
print("Movie Embedding for New User:", movie_embedding)



### 483. Implement a Transfer Learning Model with Zero-Shot Learning and Custom Semantic Embeddings
Zero-Shot Learning (ZSL) is the task of making predictions for classes that have not been observed during training. One popular method for ZSL is to utilize semantic embeddings (like word vectors) to bridge the source and target classes.

Below is an illustrative approach on how one might utilize Zero-Shot Learning with word embeddings using the Word2Vec model provided by gensim. We'll create a simple synthetic dataset for demonstration:

1. Train a model on a source task.
2. Create semantic embeddings for source and target classes.
3. During inference, use the semantic similarity between source and target classes to make predictions.

For the sake of simplicity, we'll assume a binary classification problem where the source task distinguishes between cat and dog, and the target task is to predict wolf.
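The inference step (step 3) can be tried without the large Word2Vec download. The sketch below uses small hand-made vectors in place of real word embeddings; the vectors and the helper names are assumptions made for illustration only.

```python
import numpy as np

def cosine(a, b):
    """Cosine similarity between two vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def reweight_by_similarity(probs, class_vectors, target_vector):
    """Scale source-class probabilities by similarity to the unseen class, then renormalize."""
    sims = np.array([cosine(v, target_vector) for v in class_vectors])
    sims = np.clip(sims, 1e-6, None)  # cosine similarity can be negative; keep weights positive
    adjusted = probs * sims
    return adjusted / adjusted.sum(axis=1, keepdims=True)

# Toy 3-dimensional "embeddings" (not real Word2Vec vectors)
toy_vectors = {
    'cat': np.array([1.0, 0.2, 0.0]),
    'dog': np.array([0.2, 1.0, 0.3]),
    'wolf': np.array([0.1, 0.9, 0.5]),
}

# Pretend outputs of the source classifier for two test samples: [P(cat), P(dog)]
source_probs = np.array([[0.5, 0.5], [0.6, 0.4]])

adjusted = reweight_by_similarity(
    source_probs, [toy_vectors['cat'], toy_vectors['dog']], toy_vectors['wolf'])
predicted = adjusted.argmax(axis=1)
print(predicted)
```

Because the toy "wolf" vector is much closer to "dog" than to "cat", both samples are assigned to the dog class, including the one the source classifier leaned towards cat.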

Expected Output:
You'll get an array of class predictions for the "wolf" test samples based on the semantic similarity between "cat", "dog", and "wolf". Depending on the semantic similarities and the model's predictions, it will either lean towards the cat class or the dog class for the predictions.

Note:

1. Ensure you have TensorFlow and Gensim installed.
2. You'll need to download the GoogleNews-vectors-negative300.bin Word2Vec model, which is a sizeable download (~3.5 GB). You can adjust the code to use other embeddings if desired.
3. This is a simple demonstration. In practice, more sophisticated methods and more data are used.

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from gensim.models import KeyedVectors

# Load word vectors (ensure you have downloaded GoogleNews-vectors-negative300.bin)
word_vectors = KeyedVectors.load_word2vec_format('GoogleNews-vectors-negative300.bin', binary=True)

# Synthetic dataset
# Labels: 0 = cat, 1 = dog
X_train = np.array([[1, 0], [0, 1], [1.1, 0.2], [0.2, 1.1]])
y_train = np.array([0, 1, 0, 1])

# Simple Feedforward Neural Network
model = keras.models.Sequential([
    keras.layers.Dense(5, activation='relu', input_shape=(2,)),
    keras.layers.Dense(2, activation='softmax')
])

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=10)

# Assume we've not seen "wolf" during training
def zero_shot_predict(model, word_vectors, input_data, target_word):
    predictions = model.predict(input_data)
    # Compute semantic similarities
    cat_similarity = word_vectors.similarity('cat', target_word)
    dog_similarity = word_vectors.similarity('dog', target_word)
    # Cosine similarity can be negative; clip so the weights stay positive
    similarities = np.clip(np.array([cat_similarity, dog_similarity]), 1e-6, None)
    # Scale predictions by similarities
    adjusted_predictions = predictions * similarities
    # Normalize to turn back into probability distribution
    adjusted_predictions /= adjusted_predictions.sum(axis=1, keepdims=True)
    return np.argmax(adjusted_predictions, axis=1)

# Synthetic test data for "wolf"
X_test = np.array([[0.5, 0.5], [0.6, 0.4]])
predictions = zero_shot_predict(model, word_vectors, X_test, 'wolf')
print(predictions) # Expected to print either class (cat or dog) based on semantic similarity



### 484. Create a Reinforcement Learning Agent using Distributional Reinforcement Learning with Custom Distributional Q-Values
Distributional Reinforcement Learning represents the return of each action as a probability distribution rather than a single scalar Q-value. A popular algorithm that employs this technique is C51, which models each distribution as a categorical distribution over a fixed set of 51 values ("atoms"). In this example, I'll show a basic CartPole agent using a custom distributional Q-value setup with TensorFlow.
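The core of a categorical approach is the projection step: the next-state distribution is shifted by the reward, scaled by the discount factor, and its probability mass is redistributed onto the fixed atoms. Here is a minimal NumPy sketch of that step, assuming evenly spaced atoms (the function name `project_distribution` is hypothetical):

```python
import numpy as np

def project_distribution(next_probs, reward, gamma, z, done):
    """Project the distribution of (reward + gamma * z) back onto the fixed atoms z."""
    v_min, v_max = z[0], z[-1]
    delta_z = z[1] - z[0]
    target = np.zeros(len(z))
    # A terminal transition puts all probability mass on the reward itself
    atoms = [reward] if done else reward + gamma * z
    masses = [1.0] if done else next_probs
    for tz, mass in zip(atoms, masses):
        tz = min(max(tz, v_min), v_max)        # clamp to the supported range
        b = (tz - v_min) / delta_z             # fractional atom index
        l, u = int(np.floor(b)), int(np.ceil(b))
        if l == u:                             # lands exactly on an atom
            target[l] += mass
        else:                                  # split between the two neighbours
            target[l] += mass * (u - b)
            target[u] += mass * (b - l)
    return target

z = np.linspace(0.0, 4.0, 5)   # atoms 0, 1, 2, 3, 4
uniform = np.full(5, 0.2)

terminal = project_distribution(uniform, reward=1.5, gamma=0.9, z=z, done=True)
nonterminal = project_distribution(uniform, reward=1.0, gamma=0.5, z=z, done=False)
print(terminal)
print(nonterminal)
```

The `l == u` branch matters: without it, mass that lands exactly on an atom would be multiplied by zero on both sides and silently lost.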

Expected Output:
You should see the cumulative reward of the agent for each episode. With enough episodes and proper hyperparameter settings, the agent should achieve a near-optimal policy for the CartPole environment.

Notes:

1. Ensure TensorFlow and Gym are installed.
2. The code employs a basic version of distributional RL, and there are more advanced techniques and algorithms in the literature.
3. This example uses the CartPole environment for simplicity. Adjustments might be necessary for other environments.
4. The model might need more episodes, some exploration (the agent below always acts greedily), and further tweaks for stable and high performance.

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import gym

# Define environment
env = gym.make('CartPole-v1')
num_actions = env.action_space.n
state_dim = env.observation_space.shape[0]

# Define hyperparameters
gamma = 0.99
learning_rate = 0.001
num_atoms = 51
v_min = -10.0
v_max = 10.0
delta_z = (v_max - v_min) / (num_atoms - 1)
z = np.linspace(v_min, v_max, num_atoms).astype(np.float32)

# Distributional Q-network: one categorical distribution over the atoms per action
def create_model():
    inputs = layers.Input(shape=(state_dim,))
    x = layers.Dense(24, activation='relu')(inputs)
    x = layers.Dense(24, activation='relu')(x)
    x = layers.Dense(num_actions * num_atoms)(x)
    x = layers.Reshape((num_actions, num_atoms))(x)
    outputs = layers.Softmax(axis=-1)(x)  # normalize over atoms, separately for each action
    return models.Model(inputs=inputs, outputs=outputs)

model = create_model()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

# Custom Q-value function: the expected value of each action's return distribution
def get_distributional_q_values(state):
    probs = model(np.asarray(state, dtype=np.float32))  # (batch, num_actions, num_atoms)
    return tf.reduce_sum(probs * z, axis=2)

# Put probability mass located at return value tz onto the two nearest atoms
def project(target_probs, i, a, tz, mass):
    tz = min(max(tz, v_min), v_max)
    bj = (tz - v_min) / delta_z
    l = int(np.floor(bj))
    u = min(int(np.ceil(bj)), num_atoms - 1)
    if l == u:  # tz lies exactly on an atom; otherwise both weights below would be 0
        target_probs[i, a, l] += mass
    else:
        target_probs[i, a, l] += mass * (u - bj)
        target_probs[i, a, u] += mass * (bj - l)

# Update model
def update(state, action, reward, next_state, done):
    # The target distribution is a constant, so it is built outside the gradient tape
    next_probs = model(np.asarray(next_state, dtype=np.float32)).numpy()
    next_actions = np.argmax(np.sum(next_probs * z, axis=2), axis=1)

    target_probs = np.zeros((len(action), num_actions, num_atoms), dtype=np.float32)
    for i in range(len(action)):
        if done[i]:
            project(target_probs, i, action[i], reward[i], 1.0)
        else:
            for j in range(num_atoms):
                project(target_probs, i, action[i], reward[i] + gamma * z[j],
                        next_probs[i, next_actions[i], j])

    with tf.GradientTape() as tape:
        probs = model(np.asarray(state, dtype=np.float32))
        # Cross-entropy between the projected target and the predicted distribution
        loss = -tf.reduce_sum(target_probs * tf.math.log(probs + 1e-10))

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

# Train
num_episodes = 200
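for episode in range(num_episodes):
    reset_out = env.reset()
    # Gym >= 0.26 returns (observation, info); older versions return only the observation
    state = reset_out[0] if isinstance(reset_out, tuple) else reset_out
    state = np.asarray(state, dtype=np.float32)
    episode_reward = 0
    while True:
        action = int(np.argmax(get_distributional_q_values(state[None, :])))
        step_out = env.step(action)
        next_state = np.asarray(step_out[0], dtype=np.float32)
        reward = step_out[1]
        # Gym >= 0.26 returns (obs, reward, terminated, truncated, info)
        done = bool(step_out[2]) if len(step_out) == 4 else bool(step_out[2] or step_out[3])
        episode_reward += reward
        update(state[None, :], [action], [reward], next_state[None, :], [done])
        state = next_state
        if done:
            break
    print(f"Episode {episode + 1}: {episode_reward}")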

env.close()


### 485. Develop a Generative Adversarial Network (GAN) with Spectral Normalization for Image Generation
Spectral normalization is a technique to stabilize the training of the discriminator in a GAN by constraining its Lipschitz constant. It's particularly useful to prevent the discriminator from becoming too powerful.
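The quantity being constrained is the largest singular value of a weight matrix, which is usually estimated with a few steps of power iteration instead of a full SVD. A minimal NumPy sketch of that estimate follows; the function name and the small test matrix are assumptions made for illustration.

```python
import numpy as np

def estimate_spectral_norm(w, n_iter=50, seed=0):
    """Estimate the largest singular value of matrix w by power iteration."""
    rng = np.random.default_rng(seed)
    u = rng.normal(size=w.shape[0])
    for _ in range(n_iter):
        v = w.T @ u
        v /= np.linalg.norm(v)
        u = w @ v
        u /= np.linalg.norm(u)
    return float(u @ w @ v)

w = np.diag([3.0, 1.0, 0.5])      # singular values are 3.0, 1.0, 0.5
sigma = estimate_spectral_norm(w)
w_bar = w / sigma                 # normalized weights
top_singular_value = np.linalg.svd(w_bar, compute_uv=False)[0]
print(sigma, top_singular_value)
```

Dividing the weights by `sigma` leaves a matrix whose largest singular value is approximately 1, which is what bounds the layer's Lipschitz constant.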


Expected Output: 
The training will print the completed epochs.
At the end of the training, 9 generated images from the generator will be displayed. These images should resemble handwritten digits if the training was successful.

Notes:
1. Ensure TensorFlow is installed.
2. This is a basic example and may need further tuning and improvements to generate high-quality images.
3. Spectral normalization is applied only to the Dense layer of the discriminator in this example. In a more comprehensive model, it would typically be applied to other layers as well.

Here's a simple implementation of a GAN with spectral normalization using TensorFlow and Keras:

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Sample data: MNIST
(x_train, _), (_, _) = keras.datasets.mnist.load_data()
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1).astype('float32')
x_train = (x_train - 127.5) / 127.5  # Normalize the images to [-1, 1]

BUFFER_SIZE = 60000
BATCH_SIZE = 256

# Batch and shuffle the data
train_dataset = tf.data.Dataset.from_tensor_slices(x_train).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

# Create the Generator
def make_generator_model():
    model = tf.keras.Sequential()
    model.add(layers.Dense(7*7*256, use_bias=False, input_shape=(100,)))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    model.add(layers.Reshape((7, 7, 256)))
    model.add(layers.Conv2DTranspose(128, (5, 5), strides=(1, 1), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    model.add(layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    model.add(layers.Conv2DTranspose(1, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))
    return model

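# Spectral Normalization: a Dense layer whose kernel is divided by an estimate of its
# largest singular value, obtained by power iteration and refreshed on every forward pass
class SpectralNormDense(layers.Layer):
    def __init__(self, units, iteration=1, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.iteration = iteration

    def build(self, input_shape):
        in_dim = int(input_shape[-1])
        self.kernel = self.add_weight(name='kernel', shape=(in_dim, self.units),
                                      initializer='glorot_uniform', trainable=True)
        self.bias = self.add_weight(name='bias', shape=(self.units,),
                                    initializer='zeros', trainable=True)
        # Running estimate of the dominant singular vector (not trained by gradients)
        self.u = self.add_weight(name='u', shape=(1, self.units),
                                 initializer='random_normal', trainable=False)

    def call(self, inputs, training=None):
        u = self.u
        for _ in range(self.iteration):
            v = tf.math.l2_normalize(tf.matmul(u, self.kernel, transpose_b=True))  # (1, in_dim)
            u = tf.math.l2_normalize(tf.matmul(v, self.kernel))                    # (1, units)
        u, v = tf.stop_gradient(u), tf.stop_gradient(v)
        if training:
            self.u.assign(u)
        # sigma = v W u^T approximates the largest singular value of the kernel
        sigma = tf.matmul(tf.matmul(v, self.kernel), u, transpose_b=True)
        return tf.matmul(inputs, self.kernel / sigma) + self.bias

# Create the Discriminator with Spectral Normalization
def make_discriminator_model():
    model = keras.Sequential()
    model.add(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=[28, 28, 1]))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    model.add(layers.Flatten())
    # Apply spectral normalization to the final Dense layer
    model.add(SpectralNormDense(1))
    return model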

# Define loss and optimizers
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)

generator = make_generator_model()
discriminator = make_discriminator_model()

# Define the training loop
@tf.function
def train_step(images):
    noise = tf.random.normal([BATCH_SIZE, 100])

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(noise, training=True)
        real_output = discriminator(images, training=True)
        fake_output = discriminator(generated_images, training=True)
        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

def train(dataset, epochs):
    for epoch in range(epochs):
        for image_batch in dataset:
            train_step(image_batch)
        print(f"Epoch {epoch + 1} completed")

# Train
train(train_dataset, 10)

# Display generated images
import matplotlib.pyplot as plt

noise = tf.random.normal([9, 100])
generated_images = generator(noise, training=False)

fig, axs = plt.subplots(3, 3, figsize=(6,6))
for i, img in enumerate(generated_images):
    ax = axs[i // 3, i % 3]
    ax.imshow(img.numpy().reshape(28,28) * 127.5 + 127.5, cmap='gray')
    ax.axis('off')
plt.show()


### 486. Build a Multi-Objective Optimization Algorithm with Evolutionary Constrained Optimization and Custom Constraint Handling

Multi-objective optimization is about finding solutions that balance multiple objectives. Evolutionary algorithms can be adapted to these scenarios. In this code, we will employ an evolutionary algorithm to handle multi-objective optimization with constraints.

Problem Definition:

1. Objectives: We'll consider a two-objective optimization problem for demonstration.
- Objective 1: Minimize f1(x) = x^2
- Objective 2: Minimize f2(x) = (x - 2)^2
2. Constraint: The solution must satisfy x > 1.

The ideal Pareto front for this problem lies between 1 and 2 on the x-axis.
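To make the notion of a Pareto front concrete, the following sketch filters a handful of candidate x values down to the feasible, non-dominated ones. It is a brute-force illustration under the problem definition above, not the evolutionary algorithm itself; the helper names are hypothetical.

```python
def objectives(x):
    """Both objectives of the problem, to be minimized."""
    return (x ** 2, (x - 2) ** 2)

def dominates(a, b):
    """True if a is no worse than b in every objective and strictly better in at least one."""
    return all(ai <= bi for ai, bi in zip(a, b)) and any(ai < bi for ai, bi in zip(a, b))

def pareto_front(candidates):
    """Keep feasible candidates (x > 1) that no other feasible candidate dominates."""
    feasible = [x for x in candidates if x > 1]
    scored = [(x, objectives(x)) for x in feasible]
    return [x for x, fx in scored if not any(dominates(fy, fx) for _, fy in scored)]

front = pareto_front([0.5, 1.2, 1.5, 2.0, 2.5, 3.0])
print(front)
```

Here 0.5 is rejected as infeasible, while 2.5 and 3.0 are dominated by 2.0, which is better on both objectives. The values between 1 and 2 survive because improving one objective worsens the other.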

Expected Output:
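The output will be a list of x values representing the final population. Feasible Pareto-optimal solutions lie in the interval (1, 2]: values must be strictly greater than 1 because of the constraint, and x = 2 is included because it minimizes the second objective. Because this simple algorithm ranks solutions by the sum of the two objectives, the population tends to concentrate just above x = 1 (where the sum is smallest) rather than spreading across the whole front. The exact numbers will vary between runs because of the stochastic nature of the algorithm.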

Notes:

This is a very basic version of an MOEA. Advanced techniques like NSGA-II, SPEA2, etc., are used in practice.
The constraint handling done here is a basic penalty method, where infeasible solutions are penalized by increasing their objective values. In practice, sophisticated constraint-handling techniques might be applied.

Code:

In [None]:
import numpy as np

# Define objectives
def objective1(x):
    return x**2

def objective2(x):
    return (x - 2)**2

# Define constraints
def constraint(x):
    if x > 1:
        return 0
    else:
        return 1

# Multi-objective Evolutionary Algorithm
def moea(num_generations=1000, pop_size=100, mutation_rate=0.02):
    # Initial population
    population = np.random.rand(pop_size) * 10 - 5  # Random initialization between -5 and 5

    for generation in range(num_generations):
        # Evaluate objectives and constraints
        obj1_values = np.array([objective1(x) for x in population])
        obj2_values = np.array([objective2(x) for x in population])
        constraint_values = np.array([constraint(x) for x in population])

        # Select parents - preference given to feasible solutions (those satisfying constraints)
        combined_scores = obj1_values + obj2_values + 5 * constraint_values  # Penalize infeasible solutions (lower score is better)
        selected_parents = population[np.argsort(combined_scores)[:pop_size // 2]]

        # Crossover
        children = []
        while len(children) < pop_size // 2:
            parent_choices = np.random.choice(selected_parents, 2, replace=False)
            crossover_point = np.random.rand()
            child = crossover_point * parent_choices[0] + (1 - crossover_point) * parent_choices[1]
            children.append(child)
        children = np.array(children)

        # Mutation
        mutation_mask = np.random.rand(pop_size // 2) < mutation_rate
        children[mutation_mask] += np.random.randn(np.sum(mutation_mask)) * mutation_rate

        # Combine parents and children and move to next generation
        population = np.concatenate([selected_parents, children])

    return population

# Run MOEA
population = moea()

# Display final solutions
print("Final solutions (x-values):", population)

# Note: You should find solutions inside the interval (1, 2], typically clustered just above 1.


### 487. Implement an Autoencoder with Denoising Autoencoder for Anomaly Detection and Custom Noise Level
A denoising autoencoder is designed to learn how to encode the primary features of data in such a way that it can ignore noise. Here's an example using TensorFlow and Keras on the Digits dataset from sklearn. We'll consider the "9" digit as an anomaly and all others as normal data points.

The example will:

1. Load the dataset.
2. Introduce noise to the data.
3. Train a denoising autoencoder.
4. Test for anomaly detection, with the assumption that anomalies will have higher reconstruction errors.
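The last step, flagging samples whose reconstruction error exceeds a quantile-based threshold, can be sketched on its own with NumPy. The arrays below are made-up stand-ins for clean inputs and autoencoder outputs, and `flag_anomalies` is a hypothetical helper.

```python
import numpy as np

def flag_anomalies(clean, reconstructed, quantile=0.8):
    """Flag samples whose mean squared reconstruction error exceeds the given quantile."""
    errors = np.mean((clean - reconstructed) ** 2, axis=1)
    threshold = np.quantile(errors, quantile)
    return errors > threshold, errors, threshold

# Five samples with four features each; the fourth is reconstructed poorly
clean = np.zeros((5, 4))
reconstructed = np.array([[0.1] * 4, [0.2] * 4, [0.1] * 4, [0.9] * 4, [0.2] * 4])

flags, errors, threshold = flag_anomalies(clean, reconstructed, quantile=0.8)
print(flags)
```

The choice of quantile directly controls how many samples are flagged, so it is worth setting it with the expected share of anomalies in mind.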

pip install tensorflow scikit-learn matplotlib

Expected Output:
You will see two rows of images: the first row contains noisy versions of test images, and the second row contains the denoised versions produced by the autoencoder. If the digits in the second row are recognizable, the autoencoder has learned to remove much of the noise.

The line "Number of '9' digits detected as anomalies" reports how many of the test images of the digit "9" had a reconstruction error above the threshold.

Notes:

1. Here we used the mean squared error as a simple metric for anomaly detection. In practice, more sophisticated methods might be applied.
2. Adjusting the noise level, adding more layers to the autoencoder, or changing the encoding dimensions can influence the model's ability to denoise the input and detect anomalies.

In [None]:
import numpy as np
import tensorflow as tf
from sklearn import datasets
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

# Load the dataset
digits = datasets.load_digits()
X = digits.data / 16.0
y = digits.target

# Introduce noise
def add_noise(data, noise_factor=0.5):
    noisy_data = data + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=data.shape) 
    return np.clip(noisy_data, 0., 1.)

X_noisy = add_noise(X, noise_factor=0.5)

# Split the dataset; noisy inputs and clean targets share the same split
X_train, X_test, X_train_clean, X_test_clean, y_train, y_test = train_test_split(
    X_noisy, X, y, test_size=0.2, random_state=42)

# Train only on "normal" digits so that the anomaly class (9) is unseen during training
normal = y_train != 9
X_train, X_train_clean = X_train[normal], X_train_clean[normal]

# Define the autoencoder
input_layer = tf.keras.layers.Input(shape=(X.shape[1],))
encoded = tf.keras.layers.Dense(32, activation='relu')(input_layer)
decoded = tf.keras.layers.Dense(X.shape[1], activation='sigmoid')(encoded)

autoencoder = tf.keras.models.Model(input_layer, decoded)
autoencoder.compile(optimizer='adam', loss='mean_squared_error')

# Train the autoencoder to map noisy inputs to their clean originals
autoencoder.fit(X_train, X_train_clean, epochs=100, batch_size=256,
                validation_data=(X_test, X_test_clean), verbose=1)

# Use autoencoder for anomaly detection: reconstruction error against the clean image
X_test_denoised = autoencoder.predict(X_test)
mse = np.mean(np.power(X_test_clean - X_test_denoised, 2), axis=1)
# The 99.9% quantile flags only the largest error among the ~360 test samples;
# lower it (e.g. 0.9) to flag more samples
mse_threshold = np.quantile(mse, 0.999)

# Check if number 9s have high reconstruction errors
anomalies = y_test[mse > mse_threshold]
anomaly_count = np.sum(anomalies == 9)

print(f"Number of '9' digits detected as anomalies: {anomaly_count} out of {np.sum(y_test == 9)}")

# Visualize
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
    # Original noisy images
    ax = plt.subplot(2, n, i + 1)
    plt.imshow(X_test[i].reshape(8, 8))
    plt.gray()
    ax.set_title("Original Noisy")
    ax.axis('off')

    # Reconstruction
    ax = plt.subplot(2, n, i + 1 + n)
    plt.imshow(X_test_denoised[i].reshape(8, 8))
    plt.gray()
    ax.set_title("Denoised")
    ax.axis('off')
plt.show()


### 488. Create a Reinforcement Learning Agent using Conservative Policy Optimization with Custom Trust Region Size
Conservative Policy Optimization (CPO) is an algorithm in the family of Trust Region Policy Optimization (TRPO) and Proximal Policy Optimization (PPO), which is designed to handle constraints more robustly. For our example, we'll create a simple implementation on the CartPole environment from OpenAI's Gym. The code is a policy-gradient agent that approximates the trust region by clipping the norm of each gradient to trust_region_size; it is much simpler than a full trust-region method.

Setup:

First, ensure you have the necessary libraries:
pip install gym tensorflow


Expected Output:
You should see an episode count with the corresponding reward. If the agent is learning, the reward should trend upward as episodes progress, although individual runs are noisy. The values below are illustrative only:

Episode: 1, Reward: 21.0
Episode: 2, Reward: 19.0
...
Episode: 199, Reward: 200.0
Episode: 200, Reward: 200.0
CartPole-v1 ends an episode after at most 500 steps with a reward of 1 per step, so 500 is the maximum reward per episode.

Note:

1. The trust_region_size hyperparameter controls the size of the region within which the updated policy is considered trustworthy. Adjusting this value can lead to more stable or faster learning.
2. This is a simple implementation for illustrative purposes. Enhancements such as adding an actor-critic structure, adjusting learning rate, and using more advanced methods to handle the trust region can further improve performance.
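In this example the trust region is approximated by capping the norm of each gradient. What that cap does to an update can be seen in a few lines of NumPy (a sketch; `clip_by_norm` here is a hypothetical stand-alone helper that mirrors the behavior of norm clipping):

```python
import numpy as np

def clip_by_norm(gradient, max_norm):
    """Rescale the gradient so its L2 norm does not exceed max_norm; keep its direction."""
    norm = np.linalg.norm(gradient)
    if norm <= max_norm:
        return gradient
    return gradient * (max_norm / norm)

large = np.array([3.0, 4.0])       # norm 5.0
small = np.array([0.003, 0.004])   # norm 0.005

clipped_large = clip_by_norm(large, 1.0)
clipped_small = clip_by_norm(small, 1.0)
print(clipped_large, clipped_small)
```

A smaller cap limits how far a single update can move the policy parameters, which trades learning speed for stability.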

In [None]:
import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.models import Model

# Environment Setup
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

# Neural Network Policy
inputs = Input(shape=(state_dim,))
fc1 = Dense(24, activation='relu')(inputs)
fc2 = Dense(24, activation='relu')(fc1)
probs = Dense(action_dim, activation='softmax')(fc2)

model = Model(inputs=inputs, outputs=probs)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

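def get_action(state):
    state = np.asarray(state, dtype=np.float32).reshape([1, state_dim])
    probs = model(state).numpy()[0].astype(np.float64)
    probs /= probs.sum()  # guard against float32 rounding so the probabilities sum to 1
    return int(np.random.choice(action_dim, p=probs))

# Conservative Policy Optimization (CPO) Training
def train(states, actions, rewards, trust_region_size=0.01):
    # Discounted returns, computed backwards from the end of the episode
    discounted_rewards = []
    cumulative_reward = 0
    for reward in rewards[::-1]:
        cumulative_reward = reward + cumulative_reward * 0.99
        discounted_rewards.insert(0, cumulative_reward)

    states = np.asarray(states, dtype=np.float32)
    actions = np.asarray(actions, dtype=np.int32)
    returns = np.asarray(discounted_rewards, dtype=np.float32)

    with tf.GradientTape() as tape:
        probs = model(states)
        # Probability the policy assigned to each action that was actually taken
        taken = tf.reduce_sum(probs * tf.one_hot(actions, action_dim), axis=1)
        # Negative log likelihood weighted by the return, summed over the whole episode
        loss = -tf.reduce_sum(tf.math.log(taken + 1e-10) * returns)
    gradients = tape.gradient(loss, model.trainable_variables)

    # Trust Region Modification: cap each gradient's norm to limit the policy change per update
    gradients = [tf.clip_by_norm(g, trust_region_size) for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

# Main training loop
for episode in range(200):
    reset_out = env.reset()
    # Gym >= 0.26 returns (observation, info); older versions return only the observation
    state = reset_out[0] if isinstance(reset_out, tuple) else reset_out
    states, actions, rewards = [], [], []
    episode_reward = 0

    while True:
        action = get_action(state)
        step_out = env.step(action)
        next_state, reward = step_out[0], step_out[1]
        # Gym >= 0.26 returns (obs, reward, terminated, truncated, info)
        done = step_out[2] if len(step_out) == 4 else (step_out[2] or step_out[3])
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        state = next_state
        episode_reward += reward
        if done:
            train(states, actions, rewards)
            print(f"Episode: {episode + 1}, Reward: {episode_reward}")
            break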


### 489. Develop a Hybrid Recommender System with Collaborative Filtering and Sequential Recommendation
The idea behind a hybrid recommender system that combines collaborative filtering and sequential recommendation is to leverage both general user-item interactions and specific sequential patterns to improve recommendation accuracy.

In this example:

1. We'll use collaborative filtering via matrix factorization (using a simple neural network) to learn latent user and item embeddings.
2. We'll use a recurrent neural network (RNN) to capture sequential patterns of item interactions.
3. We'll merge the two models to produce a final recommendation score.

Let's use a simple dataset for this demonstration.
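Merging two recommenders can also be done at the score level. The sketch below shows that simpler alternative, a weighted blend of a collaborative-filtering score and a sequential score, using made-up numbers. It is not the feature-concatenation approach of the model in this section, and `hybrid_scores` is a hypothetical helper.

```python
import numpy as np

def hybrid_scores(user_vec, item_vecs, seq_probs, weight=0.5):
    """Blend a collaborative-filtering score with a sequential next-item probability."""
    cf = item_vecs @ user_vec                              # dot-product affinity per item
    cf = (cf - cf.min()) / (cf.max() - cf.min() + 1e-12)   # rescale to [0, 1]
    return weight * cf + (1 - weight) * seq_probs

user_vec = np.array([1.0, 0.0])                            # latent user embedding
item_vecs = np.array([[1.0, 0.0], [0.0, 1.0], [0.5, 0.5]]) # latent item embeddings
seq_probs = np.array([0.1, 0.7, 0.2])                      # sequential model's next-item probabilities

scores = hybrid_scores(user_vec, item_vecs, seq_probs, weight=0.5)
best_item = int(np.argmax(scores))
print(scores, best_item)
```

The `weight` parameter sets how much the long-term preference signal counts relative to the short-term sequence signal.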

Expected Output:
Epoch 1/3
...
Epoch 2/3
...
Epoch 3/3
...
Note:
1. This code sets up the hybrid model structure. The dataset is randomly generated for the sake of demonstration. In a real-world scenario, you would replace it with a genuine dataset.
2. The collaborative filtering component is a basic matrix factorization technique implemented using embeddings.
3. The sequential component uses an LSTM to capture the sequence of items. It's a very simple version, and there's room for enhancing it, such as considering timestamp, adding more LSTM layers, or using more sophisticated models like Transformers.
4. The models are not fine-tuned. Hyperparameters like the number of embeddings, dense layer sizes, batch sizes, and learning rates should be adjusted based on the dataset and problem specifics.

Code:

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model

# Generate random data: 1000 users, 500 items, and 10000 interactions
n_users = 1000
n_items = 500
n_interactions = 10000

user_ids = np.random.randint(0, n_users, n_interactions)
item_ids = np.random.randint(0, n_items, n_interactions)
ratings = np.random.randint(1, 6, n_interactions)  # ratings between 1 to 5
sequences = [np.random.choice(n_items, 5, replace=False) for _ in range(n_interactions)]  # sequences of 5 items

# Collaborative Filtering Model
user_input = layers.Input(shape=(1,))
item_input = layers.Input(shape=(1,))

user_embedding = layers.Embedding(n_users, 50)(user_input)
item_embedding = layers.Embedding(n_items, 50)(item_input)

user_flatten = layers.Flatten()(user_embedding)
item_flatten = layers.Flatten()(item_embedding)

merged_layer = layers.Concatenate()([user_flatten, item_flatten])
dense_layer = layers.Dense(10, activation='relu')(merged_layer)
output_cf = layers.Dense(1)(dense_layer)

CF_Model = Model(inputs=[user_input, item_input], outputs=output_cf)

# Sequential Model
sequence_input = layers.Input(shape=(5,))
sequence_embedding = layers.Embedding(n_items, 50)(sequence_input)
sequence_lstm = layers.LSTM(50)(sequence_embedding)
output_seq = layers.Dense(n_items, activation='softmax')(sequence_lstm)

Seq_Model = Model(inputs=sequence_input, outputs=output_seq)

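# Hybrid Model: combine the user and item embeddings with the sequence representation
# (the user embedding must be included, otherwise user_input has no effect on the output)
combined_embedding = layers.Concatenate()([user_flatten, item_flatten, sequence_lstm])
dense_combined = layers.Dense(10, activation='relu')(combined_embedding)
output_hybrid = layers.Dense(1)(dense_combined)

Hybrid_Model = Model(inputs=[user_input, item_input, sequence_input], outputs=output_hybrid)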

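# Compile and train
# Only Hybrid_Model is trained; CF_Model and Seq_Model share its layers and are defined for reference
Hybrid_Model.compile(optimizer='adam', loss='mean_squared_error')
Hybrid_Model.fit([user_ids, item_ids, np.array(sequences)], ratings, epochs=3, batch_size=32)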
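
Once trained, a model of this shape can rank candidate items for one user by scoring every item in a single batch. The sketch below shows only that input construction, using NumPy. `recommend_top_k` and `score_fn` are hypothetical names; `score_fn` stands in for something like `lambda u, i, s: Hybrid_Model.predict([u, i, s]).ravel()`, and the dummy scorer exists only so the snippet runs without TensorFlow.

```python
import numpy as np

def recommend_top_k(score_fn, user_id, recent_items, n_items, k=3):
    """Score every item for one user and return the ids of the k best items.

    score_fn(users, items, sequences) is a hypothetical callable that
    returns one score per row.
    """
    items = np.arange(n_items)
    users = np.full(n_items, user_id)
    # Repeat the user's recent-item sequence once per candidate item
    sequences = np.tile(np.asarray(recent_items), (n_items, 1))
    scores = np.asarray(score_fn(users, items, sequences), dtype=float)
    # Exclude items the user has just interacted with
    scores = np.where(np.isin(items, recent_items), -np.inf, scores)
    return np.argsort(-scores)[:k]

def dummy_score_fn(users, items, sequences):
    # Stand-in scorer: prefers items whose id is close to the last item in the sequence
    return -np.abs(items - sequences[:, -1]).astype(float)

print(recommend_top_k(dummy_score_fn, user_id=7, recent_items=[1, 2, 3, 4, 5], n_items=20, k=3))
```

Because the example model is trained on random data, its rankings carry no meaning; the pattern matters only once real interactions are used.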



### 490. Build a Transfer Learning Model with Unsupervised Domain Adaptation and Custom Adversarial Loss
Unsupervised Domain Adaptation (UDA) aims to leverage labeled data from a source domain to learn a model that performs well on an unlabeled target domain. A popular approach to UDA is to use adversarial training, where a domain discriminator tries to distinguish between source and target samples while the feature extractor tries to fool the discriminator.
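
One common way to make the feature extractor "fool" the discriminator is to negate the domain-loss gradient before it reaches the extractor (gradient reversal). The toy NumPy sketch below is an illustration under stated assumptions: it uses a linear feature extractor `W`, a fixed linear discriminator `v`, and synthetic data, and it compares one ordinary gradient step on `W` with one reversed step. All names and values are hypothetical.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def domain_loss(x, d, W, v):
    """Binary cross-entropy of a linear discriminator v on linear features x @ W."""
    p = sigmoid(x @ W @ v)
    return -np.mean(d * np.log(p) + (1 - d) * np.log(1 - p))

def feature_gradient(x, d, W, v):
    """Gradient of the domain loss with respect to the feature-extractor weights W."""
    p = sigmoid(x @ W @ v)
    # dL/dlogit = (p - d) / n and logit = x @ W @ v, so dL/dW = x^T (p - d) v^T / n
    return x.T @ np.outer(p - d, v) / len(d)

rng = np.random.default_rng(0)
# Toy data: source samples (domain label 0) and shifted target samples (domain label 1)
x = np.vstack([rng.normal(0.0, 1.0, (50, 4)), rng.normal(1.0, 1.0, (50, 4))])
d = np.concatenate([np.zeros(50), np.ones(50)])
W = rng.normal(0.0, 0.5, (4, 3))    # feature extractor weights
v = np.array([0.5, -0.3, 0.8])      # discriminator weights, held fixed here

lr = 0.1
grad_W = feature_gradient(x, d, W, v)
loss_before = domain_loss(x, d, W, v)
loss_plain = domain_loss(x, d, W - lr * grad_W, v)     # ordinary descent helps the discriminator
loss_reversed = domain_loss(x, d, W + lr * grad_W, v)  # reversed gradient makes its job harder

print(f"before={loss_before:.4f} plain={loss_plain:.4f} reversed={loss_reversed:.4f}")
```

The reversed step raises the discriminator's loss while the plain step lowers it, which is the tension adversarial training relies on; in a full setup the discriminator is updated at the same time with the ordinary gradient.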

Here, we will build a Transfer Learning model with UDA using a custom adversarial loss.

Outline:
1. Setup: Import necessary libraries.
2. Dataset: For simplicity, we'll use MNIST as the source domain and a modified version (e.g., color-inverted) as the target domain.
3. Model Architecture: Create a feature extractor, a classifier, and a domain discriminator.
4. Adversarial Loss: Custom loss to train the model adversarially.
5. Training: Train using labeled source data and unlabeled target data.
6. Evaluation: Test the domain-adapted model on target data.
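
Step 5 pairs labeled source data with unlabeled target data. A minimal sketch of how such batches and their domain labels could be assembled is shown here. `domain_batches` is a hypothetical helper, the arrays are random placeholders, and the convention of 0 for source and 1 for target follows the code later in this section. Mixing both domains in one batch is an assumption of this sketch, not what that code does.

```python
import numpy as np

def domain_batches(source_x, source_y, target_x, batch_size, rng):
    """Yield (images, class_labels, domain_labels) with half source, half target.

    Target samples get a class label of -1 as a placeholder, marking them as
    unlabeled so that a classification loss can skip them.
    """
    half = batch_size // 2
    n = min(len(source_x), len(target_x))
    src_order = rng.permutation(len(source_x))[:n]
    tgt_order = rng.permutation(len(target_x))[:n]
    for start in range(0, n - half + 1, half):
        s = src_order[start:start + half]
        t = tgt_order[start:start + half]
        images = np.concatenate([source_x[s], target_x[t]])
        class_labels = np.concatenate([source_y[s], np.full(half, -1)])
        domain_labels = np.concatenate([np.zeros(half), np.ones(half)])
        yield images, class_labels, domain_labels

rng = np.random.default_rng(0)
source_x = rng.random((20, 28, 28, 1))     # placeholder source images
source_y = rng.integers(0, 10, 20)         # placeholder source labels
target_x = 1.0 - source_x[:16]             # inverted copies stand in for the target domain

batches = list(domain_batches(source_x, source_y, target_x, batch_size=8, rng=rng))
print(len(batches), batches[0][0].shape, batches[0][2])
```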

Expected Output:
...
Epoch 5/5
...
Target domain accuracy: ...

Note:

1. This example uses a basic UDA setup with a simple adversarial loss for illustration purposes. In practice, more complex models and more sophisticated losses can be employed.
2. Using the full MNIST dataset as the source domain and inverted MNIST as the target domain is for demonstration purposes. In practice, you would likely use two different datasets or stronger modifications to simulate domain shift.
3. This example assumes the source and target domains share the same classes, and it passes source labels as dummy labels for target batches during adversarial training. Because the target images are inverted copies of the source images in the same order, these dummy labels are in fact the true target labels, so the setup is not strictly unsupervised. With a genuinely unlabeled target set, the classification loss should be applied to source batches only. Careful handling is also needed if the domains have different class distributions.


Code: 

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.datasets import mnist

# Load datasets
(source_train_images, source_train_labels), (source_test_images, source_test_labels) = mnist.load_data()
target_train_images = 255 - source_train_images  # Inverted MNIST as target
target_test_images = 255 - source_test_images

# Preprocessing
source_train_images = source_train_images.astype('float32') / 255
source_test_images = source_test_images.astype('float32') / 255
target_train_images = target_train_images.astype('float32') / 255
target_test_images = target_test_images.astype('float32') / 255

source_train_images = np.expand_dims(source_train_images, axis=-1)
source_test_images = np.expand_dims(source_test_images, axis=-1)
target_train_images = np.expand_dims(target_train_images, axis=-1)
target_test_images = np.expand_dims(target_test_images, axis=-1)

# Define the feature extractor
input_img = layers.Input(shape=(28, 28, 1))
x = layers.Conv2D(32, (3, 3), activation='relu')(input_img)
x = layers.MaxPooling2D((2, 2))(x)
x = layers.Conv2D(64, (3, 3), activation='relu')(x)
features = layers.Flatten()(x)

feature_extractor = Model(inputs=input_img, outputs=features)

# Define the classifier
classifier_input = layers.Input(shape=(features.shape[1],))
x = layers.Dense(64, activation='relu')(classifier_input)
classifier_output = layers.Dense(10, activation='softmax')(x)
classifier = Model(inputs=classifier_input, outputs=classifier_output)

# Define the domain discriminator
discriminator_input = layers.Input(shape=(features.shape[1],))
x = layers.Dense(64, activation='relu')(discriminator_input)
discriminator_output = layers.Dense(1, activation='sigmoid')(x)  # Binary classification: source vs. target
domain_discriminator = Model(inputs=discriminator_input, outputs=discriminator_output)

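# Gradient reversal (one common way to make the training adversarial): identity on the
# forward pass, negated gradient on the backward pass, so the feature extractor is
# pushed to fool the domain discriminator instead of helping it
@tf.custom_gradient
def reverse_gradient(x):
    def grad(dy):
        return -dy
    return tf.identity(x), grad

class GradientReversal(layers.Layer):
    def call(self, inputs):
        return reverse_gradient(inputs)

# Combined model
combined_features = feature_extractor(input_img)
combined_class_predictions = classifier(combined_features)
combined_domain_predictions = domain_discriminator(GradientReversal()(combined_features))
combined_model = Model(inputs=input_img, outputs=[combined_class_predictions, combined_domain_predictions])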

# Custom adversarial loss
def adversarial_loss(y_true, y_pred):
    return tf.keras.losses.binary_crossentropy(y_true, y_pred)

# Compile models
classifier.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
domain_discriminator.compile(optimizer='adam', loss='binary_crossentropy')
combined_model.compile(optimizer='adam', loss=['sparse_categorical_crossentropy', adversarial_loss], loss_weights=[1, 0.5])

# Pre-train the classifier on source features
# (the feature extractor is still at its random initialization at this point)
classifier.fit(feature_extractor.predict(source_train_images), source_train_labels, epochs=5)

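# Adversarial training on mini-batches
# (a single batch containing all 60,000 images is likely to exhaust memory)
batch_size = 128
n_samples = source_train_images.shape[0]
for epoch in range(5):
    order = np.random.permutation(n_samples)
    for start in range(0, n_samples, batch_size):
        idx = order[start:start + batch_size]
        # Domain labels: 0 for source, 1 for target
        source_domain_labels = np.zeros((len(idx), 1), dtype='float32')
        target_domain_labels = np.ones((len(idx), 1), dtype='float32')

        # Train on source
        combined_model.train_on_batch(source_train_images[idx], [source_train_labels[idx], source_domain_labels])

        # Train on target, using source labels as dummy labels for the classification loss
        # (the same indices are used so that labels and inverted images stay aligned)
        combined_model.train_on_batch(target_train_images[idx], [source_train_labels[idx], target_domain_labels])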

# Evaluate on target domain
# (the inverted test images keep the labels of the original test images)
target_features = feature_extractor.predict(target_test_images)
accuracy = classifier.evaluate(target_features, source_test_labels)[1]
print(f"Target domain accuracy: {accuracy:.4f}")

