In [None]:
!pip install -q openai stable_baselines3 huggingface_hub bitsandbytes datasets peft accelerate safetensors sentencepiece gymnasium shimmy wandb
!pip install -q git+https://github.com/huggingface/diffusers
!pip install -q git+https://github.com/huggingface/transformers

Prompt generation using an LLM

In [None]:
import json
import os
from pprint import pprint
import bitsandbytes as bnb
import torch
import torch.nn as nn
import transformers
#from datasets import load_dataset
from huggingface_hub import notebook_login
from peft import (
    LoraConfig,
    PeftConfig,
    PeftModel,
    get_peft_model,
    prepare_model_for_kbit_training
)
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig
)

# Restrict CUDA-aware libraries in this process to the first GPU.
# NOTE(review): this runs after `import torch`/`bitsandbytes`; confirm that nothing
# has initialised CUDA before this line, otherwise the setting may be ignored.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# Hugging Face Hub login is currently disabled.
#notebook_login()

import gymnasium
import numpy as np
import pandas as pd
import torch
from gymnasium import spaces
import time
from PIL import Image
import os
import stable_baselines3
import pandas as pd

import wandb
# Authenticate this session with Weights & Biases (may prompt for an API key).
wandb.login()


In [None]:
# Hugging Face repo id of the causal LM that writes the ad prompts.
MODEL_NAME = "jeong-jasonji/CSE574_prompter"

# 4-bit NF4 quantisation with double quantisation; matmuls are computed in bfloat16.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
)

# Quantised generator model; placement across devices is left to `device_map="auto"`.
model_gen = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True,
)

# The tokenizer reuses the end-of-sequence token for padding.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token

# Adjust the model's own generation config in place.
# NOTE(review): temperature/top_p presumably only matter when sampling is enabled
# (`do_sample`), which is not set here — confirm against the model's default config.
generation_config = model_gen.generation_config
for _option, _value in {
    "max_new_tokens": 200,
    "temperature": 0.1,
    "top_p": 0.7,
    "num_return_sequences": 1,
    "pad_token_id": tokenizer.eos_token_id,
    "eos_token_id": tokenizer.eos_token_id,
}.items():
    setattr(generation_config, _option, _value)

# Device that tokenised inputs are moved to before generation.
device = "cuda:0"

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer
import huggingface_hub


def generate_prompt_transformer(company, strategy, max_new_tokens=77):
    """Ask the generator LLM for an ad prompt about ``company`` themed on ``strategy``.

    Reads the module-level ``tokenizer``, ``model_gen``, ``generation_config`` and
    ``device`` at call time.

    Args:
        company: Brand / product name inserted into the instruction (str).
        strategy: Theme (persuasion strategy) inserted into the instruction (str).
        max_new_tokens: Upper bound on generated tokens; passed explicitly to
            ``generate`` alongside ``generation_config``. Defaults to 77, the value
            that used to be hard-coded here.

    Returns:
        The decoded first returned sequence with special tokens removed.
        NOTE(review): for a causal LM this presumably still contains the
        instruction text in front of the continuation — confirm before feeding
        it to a downstream model.
    """
    action = f"make a convincing ad for {company} with themes of {strategy}"
    encoding = tokenizer(action, return_tensors="pt").to(device)

    # inference_mode disables autograd bookkeeping for the generation call.
    with torch.inference_mode():
        outputs = model_gen.generate(
            input_ids=encoding.input_ids,
            attention_mask=encoding.attention_mask,
            generation_config=generation_config,
            max_new_tokens=max_new_tokens,
        )

    return tokenizer.decode(outputs[0], skip_special_tokens=True)


# Smoke-test the generator on one brand/theme pair and show the text as the cell output.
prompt = generate_prompt_transformer(company="h&m", strategy="elegance")
prompt

In [None]:
# Ratings table; its 'names' and 'Average' columns are read by AdsEnv.get_reward.
RATINGS_CSV_PATH = "/kaggle/input/cse574/ratings_images_1.csv"
df = pd.read_csv(RATINGS_CSV_PATH)
df

In [None]:
import torch.nn as nn
from transformers import BertModel, BertConfig, BertTokenizer,DistilBertConfig
from transformers import DistilBertTokenizer, DistilBertModel

from transformers import AutoTokenizer
import torch
from transformers import BertModel, BertConfig, AutoTokenizer

class TextEncoder(torch.nn.Module):
    """Frozen BERT-based encoder that maps text to a small fixed-size vector.

    The encoder loads ``bert-base-uncased`` with ``num_hidden_layers`` set to 3,
    takes the hidden state at the first token position ([CLS]) and keeps only
    its first ``output_dim`` components.

    Args:
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors (e.g. a Hugging Face tokenizer).
        bert_model: Model *class* exposing ``from_pretrained`` (e.g. ``BertModel``),
            not an instance.
        device: Device the model and the tokenised inputs are moved to.
            Defaults to ``"cuda"``, the previously hard-coded value.
        max_length: Truncation length, in tokens, applied by the tokenizer.
        output_dim: Number of leading [CLS] components returned.
    """

    def __init__(self, tokenizer, bert_model, device="cuda", max_length=10, output_dim=64):
        super().__init__()
        self.tokenizer = tokenizer
        self.device = device
        self.max_length = max_length
        self.output_dim = output_dim

        config = BertConfig.from_pretrained("bert-base-uncased", output_hidden_states=True)
        config.num_hidden_layers = 3
        self.bert = bert_model.from_pretrained("bert-base-uncased", config=config).to(device)

    def forward(self, inputs):
        """Encode one string or a sequence of strings.

        Args:
            inputs: A single ``str`` or a list of strings.

        Returns:
            Tensor of shape ``(batch, output_dim)`` on ``self.device``; a single
            string yields a batch of one.
        """
        texts = [inputs] if isinstance(inputs, str) else inputs

        # Calling the tokenizer directly handles batches; it replaces the
        # older batch_encode_plus entry point with the same arguments.
        encoding = self.tokenizer(
            texts,
            truncation=True,
            return_tensors='pt',
            padding='longest',
            max_length=self.max_length,
            return_attention_mask=True,
        )

        input_ids = encoding["input_ids"].to(self.device)
        attention_mask = encoding["attention_mask"].to(self.device)

        # The encoder is used as a fixed feature extractor, so no gradients are tracked.
        with torch.no_grad():
            outputs = self.bert(input_ids, attention_mask=attention_mask)
            last_hidden_states = outputs.last_hidden_state

        # Hidden state at position 0 is the [CLS] token.
        cls_representation = last_hidden_states[:, 0, :]

        # Plain truncation of the feature vector, not a learned projection.
        return cls_representation[:, :self.output_dim]

# The BERT tokenizer gets its own name: binding it to `tokenizer` would overwrite
# the causal-LM tokenizer that generate_prompt_transformer reads at call time.
bert_tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
# TextEncoder expects the model class (it calls `.from_pretrained` itself).
bert_model = BertModel

# Text encoder used by the environment to turn product names into observations.
model = TextEncoder(bert_tokenizer, bert_model)

In [None]:
# Load the TensorBoard notebook extension (IPython line magic).
%load_ext tensorboard
from torch.utils.tensorboard import SummaryWriter
# Event writer rooted at the same directory PPO is later told to log to.
writer = SummaryWriter('./tensorboard_logs')
# Ask wandb to mirror TensorBoard event files found under this directory.
# NOTE(review): the writer is created before the patch is applied — confirm wandb
# still picks up its events, or move the patch above the writer.
wandb.tensorboard.patch(root_logdir="./tensorboard_logs")

In [None]:
# Start a Weights & Biases run under the "cse474" project.
# NOTE(review): the model repo and dataset path in this notebook use "CSE574"/"cse574" —
# confirm the project name is not a typo.
wandb.init(project="cse474")

In [None]:
%tensorboard --logdir /content/tensorboard_logs


In [None]:
import gym
from gym import spaces
import numpy as np
import torch
import pandas as pd

class AdsEnv(gym.Env):
    """One-step environment for choosing a persuasion strategy for a product.

    Each episode samples a product at random; the observation is the encoded
    product name, the action is an index into ``persuasion_strategies``, and the
    reward is the mean ``'Average'`` score of the rows in ``df`` whose ``'names'``
    value mentions both the product and the chosen strategy. Every episode ends
    after a single step.

    Args:
        persuasion_strategies: Sequence of strategy labels; defines the discrete
            action space.
        model: Callable text encoder; ``model(text)`` must return a tensor with
            64 values for a single string (e.g. shape ``(1, 64)``).
        products: Sequence of product names to sample from.
        df: DataFrame with a string column ``'names'`` and a numeric column
            ``'Average'``.
    """

    def __init__(self, persuasion_strategies, model, products, df):
        super().__init__()
        self.persuasion_strategies = persuasion_strategies
        self.action_space = spaces.Discrete(len(self.persuasion_strategies))
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(64,), dtype=np.float32)
        self.model = model
        self.products = products
        self.df = df
        self.current_product_index = 0
        self.current_strategy_index = 0
        self.current_strategy = None
        self.current_product = None
        # Sample an initial product so the environment is usable right after construction.
        self.reset()

    def encode_observation(self, text):
        """Encode ``text`` into a float32 vector matching ``observation_space``.

        Raises:
            ValueError: If the encoder output cannot be reshaped to the declared
                observation shape.
        """
        with torch.no_grad():
            encoded_text = self.model(text).cpu().numpy()
        # The encoder returns a batch of one; drop the batch axis so the array has
        # the shape declared in observation_space instead of (1, 64).
        encoded_text = encoded_text.astype(np.float32, copy=False)
        return encoded_text.reshape(self.observation_space.shape)

    def get_reward(self, product_name, strategy):
        """Return the mean 'Average' score for rows naming both product and strategy.

        Matching is a literal, case-sensitive substring test against the
        lower-cased arguments. Returns ``0.0`` when no row matches.
        """
        product_name = product_name.lower()
        strategy = strategy.lower()
        names = self.df['names']
        # regex=False: labels such as 'emotion(eager)' contain regex metacharacters
        # and would otherwise never match their own literal text.
        # na=False: rows without a name simply do not match.
        mask = (
            names.str.contains(product_name, regex=False, na=False)
            & names.str.contains(strategy, regex=False, na=False)
        )
        filtered_df = self.df[mask]

        if filtered_df.empty:
            return 0.0
        return float(filtered_df['Average'].mean())

    def reset(self):
        """Start a new episode with a randomly chosen product and return its observation."""
        self.current_product_index = np.random.choice(len(self.products))
        self.current_product = self.products[self.current_product_index]

        # Sampled but not used for the observation; kept so the random stream is unchanged.
        self.current_strategy_index = np.random.choice(len(self.persuasion_strategies))
        observation = self.encode_observation(self.current_product)
        return observation

    def step(self, action):
        """Apply the chosen strategy to the current product.

        Returns:
            ``(observation, reward, done, info)`` where ``done`` is always ``True``
            and ``info`` carries the ``'product'`` and ``'strategy'`` used.
        """
        # Wrap out-of-range indices instead of failing.
        action = action % len(self.persuasion_strategies)
        self.current_strategy = self.persuasion_strategies[action]
        reward = self.get_reward(self.current_product, self.current_strategy)
        done = True
        observation = self.encode_observation(self.current_product)
        info = {'product': self.current_product, 'strategy': self.current_strategy}
        return observation, reward, done, info

    def render(self, mode="human"):
        """Rendering is not implemented for this environment."""
        pass


In [None]:
# Distinct values of the second '_'-separated field of each name.
df['names'].map(lambda name: name.split('_')[1]).unique()

In [None]:
# Earlier candidate list, kept for reference:
# persuasion_strategies = ["Liking", "SocialProof", "Scarcity", "Authority", "Consistency", "Reciprocity", "Contrast", "Unity", "Consensus", "Emotion", "Scarcity", "Urgency", "Exclusivity", "Curiosity", "Anticipation", "Mystery", "Surprise", "Humor"]

# Strategy labels used as the discrete actions; AdsEnv looks them up as
# substrings of df['names'].
persuasion_strategies = [
    'authority',
    'emotion(eager)',
    'guarantees',
    'socialproof',
    'trustworthiness',
    'anchoring&comparison',
    'concreteness',
    'fashionable',
    'reciprocity',
    'socialidentity',
    'scarcity',
]
products = ["kfc", "h&m", "cocacola"]

env = AdsEnv(persuasion_strategies, model, products, df)

# PPO with an MLP policy; training metrics go to the shared TensorBoard directory.
agent = stable_baselines3.PPO(
    'MlpPolicy',
    env,
    verbose=0,
    tensorboard_log="./tensorboard_logs/",
)

In [None]:
# Training budget in environment steps.
TOTAL_TIMESTEPS = 128 * 1000
agent.learn(total_timesteps=TOTAL_TIMESTEPS, progress_bar=True)

In [None]:
# Display the agent object's repr as the cell output.
agent

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
from tqdm import tqdm
import pandas as pd

# Evaluate the agent: run `num_episodes` episodes, log every step, and
# summarise the rewards and the strategies chosen.
ACTION_LOG_COLUMNS = ['Episode', 'Step', 'Product', 'Strategy', 'Reward', 'Done']

num_episodes = 200
total_rewards = []
action_counts = {strategy: 0 for strategy in env.persuasion_strategies}
# Rows are collected here and turned into a DataFrame once after the loop,
# instead of enlarging the frame one row at a time.
action_log_records = []

for episode in tqdm(range(num_episodes)):
    obs = env.reset()
    done = False
    episode_reward = 0
    step_count = 0

    while not done:
        step_count += 1
        action, _ = agent.predict(obs)
        # predict may hand back a one-element array; reduce it to a plain int
        # so it can index the strategy list.
        if isinstance(action, np.ndarray) and action.size == 1:
            action = action.item()
        chosen_strategy = env.persuasion_strategies[action]
        action_counts[chosen_strategy] += 1
        obs, reward, done, info = env.step(action)
        episode_reward += reward

        action_log_records.append({
            'Episode': episode,
            'Step': step_count,
            'Product': info['product'],
            'Strategy': chosen_strategy,
            'Reward': reward,
            'Done': done,
        })

    total_rewards.append(episode_reward)

action_log_df = pd.DataFrame(action_log_records, columns=ACTION_LOG_COLUMNS)

average_reward = np.mean(total_rewards)

print(f"Average reward over {num_episodes} episodes: {average_reward:.2f}")

action_log_df.to_csv('action_log.csv', index=False)

print("Strategy distribution:")
for strategy, count in action_counts.items():
    print(f"Strategy '{strategy}': chosen {count} times")


In [None]:
# Logged steps that earned a positive reward, best first.
has_positive_reward = action_log_df['Reward'] > 0
positive_reward_rows = action_log_df[has_positive_reward]
positive_reward_rows.sort_values(by="Reward", ascending=False)

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Mean logged reward per (product, strategy) pair, reshaped to products x strategies.
grouped_data = (
    action_log_df
    .groupby(['Product', 'Strategy'])['Reward']
    .mean()
    .reset_index()
)
pivot_data = grouped_data.pivot(index='Product', columns='Strategy', values='Reward')

fig, ax = plt.subplots(figsize=(12, 6))
bar_plot = pivot_data.plot(kind='bar', ax=ax)

ax.set_title('Average Reward for Product-Strategy Combinations')
ax.set_xlabel('Product')
ax.set_ylabel('Average Reward')
ax.legend(title='Strategy')

# One bar container per strategy column; write the strategy name above each bar.
for i, strategy in enumerate(pivot_data.columns):
    for bar in bar_plot.containers[i].patches:
        bar_center_x = bar.get_x() + bar.get_width() / 2
        bar_top_y = bar.get_height()
        ax.annotate(
            f'{strategy}',
            xy=(bar_center_x, bar_top_y),
            xytext=(0, 3),  # 3 points vertical offset
            textcoords="offset points",
            ha='center',
            va='bottom',
            rotation=90,
        )

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [None]:
# First logged occurrence of each rewarded (product, strategy) pair, highest reward first.
rewarded_rows = action_log_df[action_log_df['Reward'] > 0]
unique_combinations = (
    rewarded_rows
    .drop_duplicates(subset=['Product', 'Strategy'])
    .sort_values(by="Reward", ascending=False)
)
unique_combinations