# **`Setup Cells (REQUIRED)`**

In [None]:
#@title ### **(LOGIN) Set up Hugging Face Token and Connect to Google Drive**

from google.colab import auth
auth.authenticate_user()

from google.colab import drive
drive.mount('/content/drive')

#@markdown Enter your Hugging Face Access Token (Write)
token_name = 'NaN' #@param {type:"string"}
token_value = 'NaN' #@param {type:"string"}

import os
# Later cells read the token back with os.environ.get(token_name).
os.environ[token_name] = token_value

from huggingface_hub import HfApi
# Bind the entered token to the client explicitly. A bare HfApi() only sees
# tokens stored under names the hub library looks up itself, so the check
# below would otherwise ignore what was typed into the form.
api = HfApi(token=token_value)

try:
    user_info = api.whoami(token=token_value)
    print(f"Successfully authenticated as: {user_info['name']}")
except Exception as e:
    # Best-effort check: report the failure but let the notebook continue.
    print(f"Authentication failed: {str(e)}")

In [None]:
#@title ### **Import Libraries and Set Up Environment**

!pip install transformers datasets torch sentencepiece accelerate evaluate
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import json
import os
import torch
from datasets import load_dataset
import numpy as np
import pandas as pd
from accelerate import Accelerator
from IPython.display import clear_output

# Build the shared Accelerator (fp16 mixed precision), then wipe the noisy
# install/import output so only the completion message stays visible.
accelerator = Accelerator(
    mixed_precision="fp16",
)

clear_output(wait=True)
print("\nSetup Finished.")

# **`Base Chatbot Training (INITIAL TRAINING)`**
Initial chatbot fine-tuning (starting from a pretrained checkpoint) and Hugging Face repository creation.

In [None]:
#@title ### Step 1: Load and Preprocess Dataset
#@markdown ## Enter your Dataset Here
HF_dataset = "iZELX1/Comsci-Concepts-25k" #@param {type:"string"}
#@markdown Leave blank to use default.
select_file = "" #@param {type:"string"}
if select_file:
    dataset = load_dataset(HF_dataset, data_files=select_file)
else:
    dataset = load_dataset(HF_dataset)

print(dataset['train'].column_names)
# Preview up to three real rows. Printing the Dataset object itself only shows
# its schema and row count, and select(range(3)) fails on fewer than 3 rows.
preview_count = min(3, len(dataset['train']))
sample_rows = dataset['train'].select(range(preview_count))
for sample_row in sample_rows:
    print(sample_row)

#@markdown Input Model to Use:
model_name = "gpt2-medium" #@param ["gpt2-medium", "facebook/opt-350m", "EleutherAI/gpt-neo-350m", "microsoft/DialoGPT-medium", "allenai/led-base-16384", "google/umt5-base", "microsoft/prophetnet-large-uncased"]
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Only borrow EOS as the padding token when the checkpoint does not define
# one; tokenizers that ship a dedicated pad token keep it.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

#@markdown Two Column Structured Dataset Configuration:
Input_Column = 'input' #@param {type:"string"}
Output_Column = 'output' #@param {type:"string"}
# Adjust max_length based on token limit and sample size
def preprocess_function(examples):
    """Tokenize a batch of prompt/answer pairs for causal-LM fine-tuning.

    Each pair is rendered as ``"Human: <input>\\nAI: <output>"`` followed by the
    tokenizer's EOS token, then padded/truncated to 512 tokens.

    Args:
        examples: Batched mapping containing the ``Input_Column`` and
            ``Output_Column`` columns.

    Returns:
        The tokenizer output with an added ``labels`` entry. Labels equal
        ``input_ids`` on real tokens and ``-100`` on padding.
    """
    # Terminate every sample so the end of an answer is itself a training target.
    eos = tokenizer.eos_token or ""
    texts = [
        f"Human: {q1}\nAI: {q2}{eos}"
        for q1, q2 in zip(examples[Input_Column], examples[Output_Column])
    ]
    model_inputs = tokenizer(texts, truncation=True, padding='max_length', max_length=512)
    # Mask padding with -100 (the label value the loss skips). The mask comes
    # from attention_mask rather than the token id because the pad token may be
    # the same id as EOS.
    model_inputs["labels"] = [
        [token_id if keep else -100 for token_id, keep in zip(ids, mask)]
        for ids, mask in zip(model_inputs["input_ids"], model_inputs["attention_mask"])
    ]
    return model_inputs

tokenized_dataset = dataset.map(preprocess_function, batched=True, remove_columns=dataset["train"].column_names)

In [None]:
#@title ### Step 2: Prepare Data for Training
#@markdown ### Set the dataset parameters:
#@markdown Enter the number of dataset to use for training (randomized/shuffled)
train_range = 20000 #@param {type:"number"}
#@markdown Enter the index range of dataset to use for evaluation (indices into the same seeded shuffle; these rows are held out of training | Usually 10-20%)
eval_index_start_range = 16000 #@param {type:"number"}
eval_index_end_range = 20000 #@param {type:"number"}

# Shuffle once so both splits index into the same ordering.
shuffled_dataset = tokenized_dataset["train"].shuffle(seed=42)
total_rows = len(shuffled_dataset)

# Clamp the requested ranges so a small dataset does not raise an index error.
eval_start = max(0, min(eval_index_start_range, total_rows))
eval_end = max(eval_start, min(eval_index_end_range, total_rows))
eval_indices = range(eval_start, eval_end)

# Keep evaluation rows out of the training split. Previously the eval range
# was selected from the same shuffled rows as training, so with the default
# values every evaluation example had also been trained on.
train_indices = [
    row for row in range(min(train_range, total_rows)) if row not in eval_indices
]

train_dataset = shuffled_dataset.select(train_indices)
eval_dataset = shuffled_dataset.select(eval_indices)
print(f"Training rows: {len(train_dataset)} | Evaluation rows: {len(eval_dataset)}")

train_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
eval_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])

In [None]:
#@title ### Step 3: Initialize the Selected Model
# Resolve the target device from the shared Accelerator first, then load the
# checkpoint chosen in Step 1 and place it on that device.
device = accelerator.device

model = AutoModelForCausalLM.from_pretrained(model_name)
model.to(device)


In [None]:
#@title ### Step 4: Fine-tune the Model
import inspect

import torch
from transformers import Trainer, TrainingArguments
#@markdown # Training Arguments
# Set parameters for gradient accumulation
#@markdown - ### Output Directory (Default ./results)
output_directory = './results' #@param {type:"string"}
#@markdown - ### Accumulate Gradients Over N Steps (Default 4)
gradient_accumulation_steps = 4  #@param {type:"number"}
#@markdown - ### Base Batch Size (Default 2)
base_batch_size = 4  #@param {type:"number"}
#@markdown - ### Original Learning Rate (Before Linear Scaling Rule | Default 1e-5)
base_learning_rate = 1e-5  #@param {type:"number"}
#@markdown - ### Number of Train Epochs (Default 5)
train_epochs = 5 #@param {type:"number"}
#@markdown - ### Warmup Steps (Default 500)
warmup_steps = 500 #@param {type:"number"}
#@markdown - ### Weight Decay (Default 0.02)
weight_decay = 0.02 #@param {type:"number"}
#@markdown - ### Logging Directory (Default ./logs)
logging_directory = './logs' #@param {type:"string"}
#@markdown - ### Logging Steps (Default 10)
logging_steps = 10 #@param {type:"number"}
#@markdown - ### Evaluation Strategy (Default epoch)
evaluation_strategy = "epoch" #@param {type:"string"}
#@markdown - ### Saving Strategy (Default epoch)
save_strategy = "epoch" #@param {type:"string"}
#@markdown - ### Load the Best Model at the end (Default True)
load_best_model = True #@param {type:"boolean"}
#@markdown - ### Enable Mixed-Precision Training (FP16 | Default True)
mixed_precision = True #@param {type:"boolean"}
#@markdown - ### Logging First Step (Default True)
lfs = True #@param {type:"boolean"}
#@markdown - ### Keep the last N checkpoints (Default 2)
save_checkpoint = 2 #@param {type:"number"}
#@markdown - ### Warmup Ratio (Default 0.1)
warmup_r = 0.1 #@param {type:"number"}
#@markdown - ### Adam Epsilon (Default 1e-8)
markdown_e = 1e-8 #@param {type:"number"}
#@markdown - ### Gradient Clipping (Max Grad Norm | Default 1.0)
grad_clip = 1.0 #@param {type:"number"}

# Calculate effective batch size
effective_batch_size = base_batch_size * gradient_accumulation_steps

# Calculate new learning rate based on linear scaling rule
new_learning_rate = base_learning_rate * (effective_batch_size / base_batch_size)

# FP16 training needs a CUDA device; fall back to full precision on CPU
# runtimes instead of failing while building the training arguments.
use_fp16 = bool(mixed_precision) and torch.cuda.is_available()

training_kwargs = dict(
    output_dir=output_directory,
    num_train_epochs=train_epochs,
    per_device_train_batch_size=base_batch_size,
    per_device_eval_batch_size=base_batch_size,
    warmup_steps=warmup_steps,
    weight_decay=weight_decay,
    logging_dir=logging_directory,
    logging_steps=logging_steps,
    save_strategy=save_strategy,
    load_best_model_at_end=load_best_model,
    fp16=use_fp16,
    learning_rate=new_learning_rate,
    gradient_accumulation_steps=gradient_accumulation_steps,
    logging_first_step=lfs,
    save_total_limit=save_checkpoint,
    # NOTE(review): both warmup_steps and warmup_ratio are supplied; confirm
    # which one the installed transformers version honours.
    warmup_ratio=warmup_r,
    adam_epsilon=markdown_e,
    max_grad_norm=grad_clip,
)

# transformers is installed unpinned, and newer releases renamed
# `evaluation_strategy` to `eval_strategy`; use whichever name this version accepts.
training_arg_names = inspect.signature(TrainingArguments.__init__).parameters
eval_strategy_key = "eval_strategy" if "eval_strategy" in training_arg_names else "evaluation_strategy"
training_kwargs[eval_strategy_key] = evaluation_strategy

training_args = TrainingArguments(**training_kwargs)

trainer_kwargs = dict(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
)

# Likewise, Trainer's `tokenizer` argument was renamed to `processing_class`.
trainer_arg_names = inspect.signature(Trainer.__init__).parameters
tokenizer_key = "processing_class" if "processing_class" in trainer_arg_names else "tokenizer"
trainer_kwargs[tokenizer_key] = tokenizer

trainer = Trainer(**trainer_kwargs)


#@markdown ## Save the Model and Tokenizer
model_name = 'fine_tuned_drei_model' #@param {type:"string"}

# Run fine-tuning, then write the weights followed by the tokenizer files into
# the directory named above.
trainer.train()
for artifact in (model, tokenizer):
    artifact.save_pretrained(model_name)

#@markdown ---
#@markdown # Save the Base Model To Hugging Face
save_to_hf = True #@param {type:"boolean"}

import os
from huggingface_hub import HfApi, Repository, login

def save_initial_model(model, tokenizer, repo_name, commit_message="Initial model"):
    """Upload the fine-tuned model and tokenizer to a Hugging Face repository.

    The repository is created if it does not exist. Files are staged in a
    temporary directory and uploaded over the Hub HTTP API, so no local git
    clone is needed and no ``<user>/<repo>`` directory is left behind.

    Args:
        model: Object exposing ``save_pretrained(directory)``.
        tokenizer: Object exposing ``save_pretrained(directory)``.
        repo_name: Target repository id, e.g. ``"user/repo"``.
        commit_message: Commit message recorded for the upload.

    Returns:
        None. Returns early, after printing the error, if the repository
        cannot be created.

    Raises:
        ValueError: If no token is stored in the environment variable named by
            the notebook-level ``token_name``.
    """
    import tempfile

    # Set up authentication
    hf_token = os.environ.get(token_name)
    if not hf_token:
        raise ValueError("Hugging Face token not found in environment variables.")

    # Login to Hugging Face
    login(token=hf_token)

    # Initialize Hugging Face API
    api = HfApi()

    # Check if the repository exists, if not, create it
    try:
        repo_url = api.create_repo(repo_name, exist_ok=True, token=hf_token)
    except Exception as e:
        print(f"Error creating repository: {e}")
        return

    # Stage the files in a throwaway directory and push them in one commit.
    # This replaces the deprecated git-based `Repository` clone/push flow.
    with tempfile.TemporaryDirectory() as staging_dir:
        model.save_pretrained(staging_dir)
        tokenizer.save_pretrained(staging_dir)
        api.upload_folder(
            folder_path=staging_dir,
            repo_id=repo_name,
            commit_message=commit_message,
            token=hf_token,
        )

    print(f"Initial model saved to {repo_url}")

#@markdown ## Enter Hugging Face Repository Details (Write existing repository or create one)
HF_Username = "iZELX1" #@param {type:"string"}
HF_Repository = "CodePath" #@param {type:"string"}

# Upload only when the form toggle is on; the repository id is "<username>/<repository>".
if save_to_hf:
    hf_repo_id = f"{HF_Username}/{HF_Repository}"
    save_initial_model(model, tokenizer, hf_repo_id)

In [None]:
#@title ### Step 5: Create Chatbot Function for Base Model (Beta)
from huggingface_hub import HfApi
from transformers import AutoModelForCausalLM, AutoTokenizer
import logging
import json
import os
import torch
import numpy as np
from typing import List, Dict, Union, Tuple
from datetime import datetime
from collections import Counter
import re
from gensim import corpora
from gensim.models import LdaModel
from sklearn.metrics.pairwise import cosine_similarity
from scipy.spatial.distance import cosine

# Configure root logging at INFO with a "timestamp - level - message" line
# format; the chatbot class below reports problems through logging.warning/error.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class AdvancedChatbotManager:
    """Conversation manager wrapped around a causal language model.

    Builds ``Human:``/``AI:`` prompts from recent history, samples replies with
    retries, filters them by simple quality and repetition heuristics, persists
    the history as JSON, and provides a few utility commands (reset, analyze,
    help, feedback).
    """

    # Intent keywords, checked in insertion order; the first intent with a
    # matching keyword wins.
    _INTENT_KEYWORDS = {
        "greeting": ["hello", "hi", "hey", "greetings"],
        "farewell": ["bye", "goodbye", "see you", "farewell"],
        "question": ["what", "why", "how", "when", "where", "who"],
        "command": ["do", "please", "can you", "could you"],
        "opinion": ["think", "believe", "feel", "opinion"],
    }

    def __init__(self, model, tokenizer, history_file: str = "chat_history.json"):
        """Store the model/tokenizer, set heuristics, and move the model to the device.

        Args:
            model: Causal LM exposing ``config``, ``to`` and ``generate``.
            tokenizer: Tokenizer matching ``model``.
            history_file: Path of the JSON file used to persist the chat history.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.model.config.pad_token_id = self.model.config.eos_token_id
        self.history_file = history_file
        self.max_history = 15
        self.max_repetition_threshold = 0.7
        self.min_response_length = 15
        self.max_response_length = 150
        # Upper bound on prompt tokens fed to generate(); older context is dropped.
        self.max_prompt_tokens = 512
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

        self.topic_model = self._initialize_topic_model()
        self.user_feedback = []

    def _initialize_topic_model(self):
        """Create the LDA topic model from a fixed three-word placeholder corpus."""
        dictionary = corpora.Dictionary([["topic", "model", "initialization"]])
        corpus = [dictionary.doc2bow(["topic", "model", "initialization"])]
        return LdaModel(corpus=corpus, id2word=dictionary, num_topics=5, passes=1)

    def load_chat_history(self) -> List[Dict]:
        """Load the history from ``history_file``.

        Returns:
            The stored entries, or ``[]`` when the file is missing, unreadable,
            not valid JSON, or fails ``_validate_history``.
        """
        try:
            if os.path.exists(self.history_file):
                with open(self.history_file, 'r') as f:
                    history = json.load(f)
                if self._validate_history(history):
                    return history
        except (json.JSONDecodeError, OSError) as e:
            # OSError also covers FileNotFoundError and unreadable files.
            logging.warning(f"Error loading chat history: {e}")
        return []

    def _validate_history(self, history: List[Dict]) -> bool:
        """Return True when ``history`` is a list of dicts with string ``human``, ``ai`` and ``timestamp`` fields."""
        if not isinstance(history, list):
            return False
        for entry in history:
            if not isinstance(entry, dict) or 'human' not in entry or 'ai' not in entry or 'timestamp' not in entry:
                return False
            if not isinstance(entry['human'], str) or not isinstance(entry['ai'], str) or not isinstance(entry['timestamp'], str):
                return False
        return True

    def save_chat_history(self, history: List[Dict]) -> None:
        """Write ``history`` to ``history_file`` as indented JSON.

        An existing file is first renamed to a timestamped ``.backup`` sibling.
        Failures are logged rather than raised.
        """
        try:
            if os.path.exists(self.history_file):
                backup_file = f"{self.history_file}.{datetime.now().strftime('%Y%m%d%H%M%S')}.backup"
                os.replace(self.history_file, backup_file)

            with open(self.history_file, 'w') as f:
                json.dump(history, f, indent=2)
        except Exception as e:
            logging.error(f"Error saving chat history: {e}")

    def detect_repetition(self, response: str, history: List[Dict]) -> bool:
        """Return True when ``response`` is too similar to any of the last five AI replies."""
        if not history:
            return False

        recent_responses = [entry['ai'] for entry in history[-5:]]
        return any(
            self._calculate_similarity(response, past_response) > self.max_repetition_threshold
            for past_response in recent_responses
        )

    def _calculate_similarity(self, text1: str, text2: str) -> float:
        """Jaccard similarity of the lower-cased whitespace-split word sets (0.0 if either text is empty)."""
        if not text1 or not text2:
            return 0.0

        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())

        intersection = len(words1.intersection(words2))
        union = len(words1.union(words2))

        return intersection / union if union > 0 else 0.0

    def check_response_quality(self, response: str, user_input: str) -> bool:
        """Apply the length, punctuation, vocabulary-diversity and relevance heuristics to a reply."""
        words = response.split()
        if len(words) < self.min_response_length:
            return False
        if len(words) > self.max_response_length:
            return False
        if response.count('.') > 15:
            return False
        # Reject replies in which fewer than 40% of the words are distinct.
        if len(set(words)) < len(words) * 0.4:
            return False
        if not self._check_relevance(response, user_input):
            return False
        return True

    def _check_relevance(self, response: str, user_input: str) -> bool:
        """Require at least one shared word (two when the input has three or more distinct words)."""
        words1 = set(user_input.lower().split())
        words2 = set(response.lower().split())

        intersection = len(words1.intersection(words2))
        min_overlap = 1 if len(words1) < 3 else 2

        return intersection >= min_overlap

    @staticmethod
    def _extract_reply(generated_text: str) -> str:
        """Keep only the assistant's own turn from the generated continuation.

        Everything from the first ``Human:`` marker onwards is discarded, since
        it would be a further turn produced by the model itself.
        """
        return generated_text.split("Human:", 1)[0].strip()

    def generate_response(self, user_input: str, history: List[Dict], max_attempts: int = 5) -> str:
        """Sample a reply for ``user_input`` given the recent ``history``.

        Args:
            user_input: The user's latest message.
            history: Previous exchanges (dicts with ``human`` and ``ai`` keys).
            max_attempts: Number of sampling attempts; the temperature rises
                with each attempt.

        Returns:
            The first sampled reply that passes the quality and repetition
            checks, otherwise a canned fallback message.
        """
        history_text = self._format_history(history)
        input_text = f"{history_text}\nHuman: {user_input}\nAI:"

        try:
            # Tokenize once without padding, then keep only the LAST
            # max_prompt_tokens tokens. Truncating from the right would drop
            # the newest user turn and the trailing "AI:" cue on long histories.
            encoded = self.tokenizer(input_text, return_tensors="pt")
            prompt_limit = getattr(self, "max_prompt_tokens", 512)
            input_ids = encoded["input_ids"][:, -prompt_limit:].to(self.device)
            attention_mask = encoded["attention_mask"][:, -prompt_limit:].to(self.device)
        except Exception as e:
            logging.error(f"Error preparing prompt: {e}")
            return self._generate_fallback_response(user_input, history)

        prompt_length = input_ids.shape[-1]

        for attempt in range(max_attempts):
            try:
                with torch.no_grad():
                    outputs = self.model.generate(
                        input_ids,
                        attention_mask=attention_mask,
                        max_new_tokens=150,
                        num_return_sequences=1,
                        no_repeat_ngram_size=3,
                        top_k=50,
                        top_p=0.92,
                        temperature=self._dynamic_temperature(attempt),
                        do_sample=True,
                        pad_token_id=self.model.config.pad_token_id,
                    )

                # Decode only the newly generated tokens rather than splitting
                # the full text on "AI:", which picks up hallucinated later turns.
                generated_text = self.tokenizer.decode(
                    outputs[0][prompt_length:], skip_special_tokens=True
                )
                response = self._extract_reply(generated_text)

                if (self.check_response_quality(response, user_input) and
                    not self.detect_repetition(response, history)):
                    return response

            except Exception as e:
                logging.error(f"Error generating response (attempt {attempt+1}): {e}")

        return self._generate_fallback_response(user_input, history)

    def _dynamic_temperature(self, attempt: int) -> float:
        """Sampling temperature for a retry: 0.7 plus 0.05 per previous attempt."""
        return 0.7 + (attempt * 0.05)

    def _generate_fallback_response(self, user_input: str, history: List[Dict]) -> str:
        """Return a randomly chosen canned reply for when no generated one was accepted."""
        fallback_responses = [
            "I apologize, but I'm having trouble understanding. Could you please rephrase your question?",
            "I'm not sure I have enough information to answer that. Can you provide more context?",
            "That's an interesting question. To better assist you, could you clarify what specific aspect you're most interested in?",
            "I want to make sure I give you the most accurate information. Could you break down your question into smaller parts?",
            "I'm still learning and evolving. Could you try asking your question in a different way?"
        ]
        # np.random.choice yields a numpy string; hand back a plain str.
        return str(np.random.choice(fallback_responses))

    def _format_history(self, history: List[Dict]) -> str:
        """Render the last ``max_history`` exchanges as alternating ``Human:``/``AI:`` lines."""
        formatted = []
        for entry in history[-self.max_history:]:
            formatted.extend([
                f"Human: {entry['human']}",
                f"AI: {entry['ai']}"
            ])
        return "\n".join(formatted)

    def reset_history(self) -> List[Dict]:
        """Return a fresh, empty history list."""
        return []

    def analyze_conversation(self, history: List[Dict]) -> Dict:
        """Summarize ``history``: counts, frequent words, mean lengths and top topics.

        Returns:
            A JSON-serializable dict. For an empty history all counts and
            averages are zero and the lists are empty.
        """
        total_exchanges = len(history)
        if total_exchanges == 0:
            # Nothing to average; avoids dividing by zero right after a start or reset.
            return {
                'total_exchanges': 0,
                'top_human_words': [],
                'top_ai_words': [],
                'average_human_length': 0.0,
                'average_ai_length': 0.0,
                'main_topics': [],
            }

        human_words = Counter()
        ai_words = Counter()
        for entry in history:
            human_words.update(entry['human'].lower().split())
            ai_words.update(entry['ai'].lower().split())

        all_text = " ".join([entry['human'] + " " + entry['ai'] for entry in history])
        bow_corpus = self.topic_model.id2word.doc2bow(all_text.lower().split())
        topics = self.topic_model[bow_corpus]
        # Cast to built-in types so the result survives json.dumps in
        # handle_special_commands.
        ranked_topics = sorted(
            ((int(topic_id), float(weight)) for topic_id, weight in topics),
            key=lambda pair: pair[1],
            reverse=True,
        )

        return {
            'total_exchanges': total_exchanges,
            'top_human_words': human_words.most_common(5),
            'top_ai_words': ai_words.most_common(5),
            'average_human_length': sum(len(entry['human'].split()) for entry in history) / total_exchanges,
            'average_ai_length': sum(len(entry['ai'].split()) for entry in history) / total_exchanges,
            'main_topics': ranked_topics[:3],
        }

    def handle_special_commands(self, user_input: str, history: List[Dict]) -> Tuple[bool, str]:
        """Handle the built-in commands.

        Returns:
            ``(True, message)`` when ``user_input`` is a recognised command,
            otherwise ``(False, "")``. Resetting the history itself is left to
            the caller.
        """
        command = user_input.lower()
        if command in ['reset', 'clear']:
            return True, "Conversation history has been reset. How can I help you?"
        elif command in ['analyze', 'stats']:
            analysis = self.analyze_conversation(history)
            return True, f"Conversation Analysis:\n{json.dumps(analysis, indent=2)}"
        elif command in ['help', 'commands']:
            return True, "Available commands: reset/clear, analyze/stats, help/commands, feedback"
        elif command == 'feedback':
            return True, self._get_user_feedback()
        return False, ""

    def _get_user_feedback(self) -> str:
        """Prompt for a rating and comment on stdin and record them in ``user_feedback``."""
        feedback = input("Please rate the chatbot's performance (1-5, 5 being best): ")
        comment = input("Any additional comments? ")
        self.user_feedback.append({"rating": feedback, "comment": comment, "timestamp": datetime.now().isoformat()})
        return "Thank you for your feedback!"

    def detect_user_intent(self, user_input: str) -> str:
        """Classify ``user_input`` by keyword.

        Keywords must match whole words or phrases, so "this" no longer counts
        as the greeting "hi" and "window" no longer counts as the command "do".

        Returns:
            One of ``greeting``, ``farewell``, ``question``, ``command``,
            ``opinion``, or ``general`` when nothing matches.
        """
        user_input_lower = user_input.lower()
        for intent, keywords in self._INTENT_KEYWORDS.items():
            for keyword in keywords:
                if re.search(rf"\b{re.escape(keyword)}\b", user_input_lower):
                    return intent
        return "general"

def load_base_model():
    """Load the saved base model and tokenizer from a Hugging Face repository.

    Returns:
        ``(model, tokenizer)`` on success, or ``(None, None)`` when the
        repository cannot be listed or loading fails; the error is printed.
    """
  #@markdown Enter your Hugging Face Repository with the saved Base Model
    repo_name = "iZELX1/CodePath" #@param {type:"string"}
    hub_client = HfApi()

    # Listing the files acts as an access probe; the listing itself is not used.
    try:
        hub_client.list_repo_files(repo_name)
    except Exception as e:
        print(f"Error accessing repository: {e}")
        return None, None

    try:
        loaded_model = AutoModelForCausalLM.from_pretrained(repo_name)
        loaded_tokenizer = AutoTokenizer.from_pretrained(repo_name)
        print(f"Base model and tokenizer loaded successfully from {repo_name}")
    except Exception as e:
        print(f"Error loading base model: {e}")
        return None, None
    else:
        return loaded_model, loaded_tokenizer

def main():
    """Run the interactive console chat loop against the saved base model.

    Loads the model via ``load_base_model``, restores any saved history, then
    reads user input until ``quit``/``exit``/``bye`` is entered or the input
    stream ends or is interrupted. Each accepted exchange is appended to the
    history, which is trimmed to ``chatbot.max_history`` and saved every turn.
    """
    model, tokenizer = load_base_model()
    if model is None or tokenizer is None:
        print("Failed to load the base model. Exiting.")
        return

    chatbot = AdvancedChatbotManager(model=model, tokenizer=tokenizer)
    chat_history = chatbot.load_chat_history()
    print("Chatbot: Hello! How can I assist you today?")

    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            # A closed input stream or an interrupted cell is treated like an
            # explicit quit so the session ends cleanly instead of with a traceback.
            print()
            user_input = "quit"

        command = user_input.lower()

        if command in ['quit', 'exit', 'bye']:
            print("Chatbot: Goodbye! It was a pleasure assisting you.")
            chatbot.save_chat_history(chat_history)
            break

        try:
            special_command, special_response = chatbot.handle_special_commands(user_input, chat_history)
        except Exception as e:
            # A failing utility command must not end the whole chat session.
            print(f"Error in main loop: {e}")
            print("Chatbot: I apologize, but I encountered an error. Could you please try again?")
            continue

        if special_command:
            print(f"Chatbot: {special_response}")
            if command in ['reset', 'clear']:
                chat_history = chatbot.reset_history()
            continue

        if len(user_input) < 2:
            print("Chatbot: Could you please provide more details or ask a specific question?")
            continue

        intent = chatbot.detect_user_intent(user_input)
        print(f"Detected user intent: {intent}")

        try:
            response = chatbot.generate_response(user_input, chat_history)
            print("Chatbot:", response)

            chat_history.append({
                "human": user_input,
                "ai": response,
                "timestamp": datetime.now().isoformat(),
                "intent": intent
            })

            # Keep only the most recent exchanges.
            if len(chat_history) > chatbot.max_history:
                chat_history = chat_history[-chatbot.max_history:]

            chatbot.save_chat_history(chat_history)

        except Exception as e:
            print(f"Error in main loop: {e}")
            print("Chatbot: I apologize, but I encountered an error. Could you please try again?")

if __name__ == "__main__":
    main()

In [None]:
#@title GPU Cleaning (Optional)
import torch
import gc

class GPUMemoryManager:
    """Helper for releasing cached GPU memory between notebook steps."""

    @staticmethod
    def clear_gpu_memory():
        """Collect garbage, release PyTorch's cached CUDA memory, and print usage.

        Does nothing beyond garbage collection when CUDA is unavailable.
        Memory still referenced by live tensors is not freed; delete those
        references (e.g. ``del model``) before calling this.
        """
        # Collect unreachable objects first so the memory they held can be
        # handed back by empty_cache(). The previous `del obj` loop over
        # gc.get_objects() only unbound a local name and released nothing.
        gc.collect()

        if not torch.cuda.is_available():
            return

        torch.cuda.empty_cache()

        # Print memory stats
        print(f"GPU Memory allocated: {torch.cuda.memory_allocated(0) / 1024**2:.2f} MB")
        print(f"GPU Memory cached: {torch.cuda.memory_reserved(0) / 1024**2:.2f} MB")

def main():
    """Run one GPU cleanup pass and report the outcome.

    Errors are printed rather than raised. The earlier ``finally`` block
    repeated the same cleanup, which printed the stats twice and re-raised any
    cleanup failure uncaught.
    """
    try:
        GPUMemoryManager.clear_gpu_memory()
        print("Finished Cleaning.")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

if __name__ == "__main__":
    # Imported here so this optional cell also runs on its own.
    import os

    # NOTE(review): this allocator setting is presumably only read when CUDA
    # is first initialised; confirm it has any effect this late in a session.
    os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:512'
    main()