<a href="https://colab.research.google.com/github/Jeremy-su1/ai-algorithm/blob/main/ebinna_llama3_2_1b_instrunct_tag.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%capture
!pip install -U bitsandbytes
!pip install -U transformers
!pip install -U accelerate
!pip install -U peft
!pip install -U trl

In [None]:
!pip install kaggle --upgrade
!pip install huggingface_hub
!pip install wandb



In [None]:
!pip install evaluate



In [None]:
import numpy as np
import pandas as pd
import os
from tqdm import tqdm
import bitsandbytes as bnb
import torch
import torch.nn as nn
import transformers
from datasets import Dataset, load_dataset, DatasetDict
from peft import LoraConfig, PeftConfig
from trl import SFTTrainer
from trl import setup_chat_format
from transformers import (AutoModelForCausalLM,
                          AutoTokenizer,
                          BitsAndBytesConfig,
                          TrainingArguments,
                          pipeline,
                          logging)
from sklearn.metrics import (accuracy_score,
                             classification_report,
                             confusion_matrix)
from sklearn.model_selection import train_test_split
import ast
import evaluate
from sklearn.metrics import accuracy_score, f1_score, precision_recall_fscore_support

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from huggingface_hub import login

# Prompt interactively for a Hugging Face access token.
# NOTE(review): presumably needed because the meta-llama checkpoint loaded later is gated — confirm.
login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [None]:
# Load the training and validation CSVs from the mounted Google Drive.
# A single file passed as `data_files` is exposed under the 'train' split,
# which is why split='train' is requested for the validation file as well.
dataset_train = load_dataset(
    'csv',
    data_files='/content/drive/My Drive/AiExpertCource/project/dataset/rev_tag_training_samples.csv',
    split='train'
)
dataset_valid = load_dataset(
    'csv',
    data_files='/content/drive/My Drive/AiExpertCource/project/dataset/rev_tag_validation_samples.csv',
    split='train'
)

In [None]:
# Bundle both splits into one DatasetDict, keeping only the three columns the
# rest of the notebook reads: Title, Body and Tags_new.
dataset = DatasetDict({
    'train': Dataset.from_dict({'Title': dataset_train['Title'], 'Body': dataset_train['Body'], 'Tags_new': dataset_train['Tags_new']}),
    'val': Dataset.from_dict({'Title': dataset_valid['Title'], 'Body': dataset_valid['Body'], 'Tags_new': dataset_valid['Tags_new']}),
})

In [None]:
# Target categories. A category's position in this list is its index in the
# multi-hot label / prediction vectors built later in the notebook.
classes = [
    'Algorithms',
    'Backend',
    'Data Science',
    'Databases',
    'Dev Tools',
    'Frontend',
    'Mobile',
    'Systems',
    'iOS/macOS',
]
# Name -> index and index -> name lookups, derived from `classes` so the three
# structures cannot drift apart.
class2id = {name: idx for idx, name in enumerate(classes)}
id2class = {idx: name for idx, name in enumerate(classes)}

In [None]:
# Define the prompt generation functions
def generate_prompt(data_point):
    """Attach a training prompt (instruction, post text and gold labels) to a row.

    Args:
        data_point: Mapping with "Title", "Body" and "Tags_new" entries, where
            "Tags_new" is the string form of a Python list of tag names.

    Returns:
        The same mapping, with a "text" entry holding the prompt. The tags are
        joined with "," after "label: " and the whole prompt is stripped of
        leading/trailing whitespace.
    """
    parsed_tags = ast.literal_eval(data_point['Tags_new'])
    label = ','.join(str(tag) for tag in parsed_tags)
    # The template's leading newline/indentation is removed by the strip() below.
    prompt = f"""
            Classify the text into Algorithms, Backend, Data Science, Databases, Dev Tools, Frontend, Mobile, Systems, iOS/macOS, and return the answer as the corresponding Software Development and Engineering label.
text: {data_point["Title"]} {data_point["Body"]}
label: {label}"""
    data_point["text"] = prompt.strip()
    return data_point

def generate_test_prompt(data_point):
    """Attach an inference prompt (instruction and post text, no answer) to a row.

    Args:
        data_point: Mapping with "Title" and "Body" entries.

    Returns:
        The same mapping, with a "text" entry holding the prompt. Because the
        prompt is stripped, it ends with "label:" (no trailing space).
    """
    # The template's leading newline/indentation and the trailing space after
    # "label:" are removed by the strip() below.
    prompt = f"""
            Classify the text into Algorithms, Backend, Data Science, Databases, Dev Tools, Frontend, Mobile, Systems, iOS/macOS, and return the answer as the corresponding Software Development and Engineering label.
text: {data_point["Title"]} {data_point["Body"]}
label: """
    data_point["text"] = prompt.strip()
    return data_point

In [None]:
# Training rows get the full prompt, gold labels included, in a new "text" column.
dataset['train'] = dataset['train'].map(generate_prompt)

Map:   0%|          | 0/20000 [00:00<?, ? examples/s]

In [None]:
# Validation rows get the label-free prompt: their "text" column stops at "label:"
# so the model has to generate the answer.
dataset['val'] = dataset['val'].map(generate_test_prompt)

Map:   0%|          | 0/6500 [00:00<?, ? examples/s]

In [None]:
def generate_label(data_point):
    """Attach a multi-hot encoding of the row's tags.

    Args:
        data_point: Mapping whose "Tags_new" entry is the string form of a
            Python list of tag names, each of which must be a key of `class2id`.

    Returns:
        The same mapping, with a "labels" entry: a list of floats, one per
        entry of `classes`, set to 1.0 at the index of every listed tag.

    Raises:
        KeyError: If a tag is not present in `class2id`.
    """
    tag_ids = [class2id[tag] for tag in ast.literal_eval(data_point['Tags_new'])]
    multi_hot = [0.] * len(classes)
    for tag_id in tag_ids:
        multi_hot[tag_id] = 1.
    data_point['labels'] = multi_hot
    return data_point

In [None]:
# Add the ground-truth multi-hot "labels" column used for metric computation.
dataset['val'] = dataset['val'].map(generate_label)

Map:   0%|          | 0/6500 [00:00<?, ? examples/s]

In [None]:
dataset['val'][0]

{'Title': 'how to add target blank to an href.location hyperlink',
 'Body': '<p>I\'m trying to get a link in javascript to open a url in a new tab. I\'ve found a number of posts for target="blank" using attribute and a couple other ways but can\'t seem to get it to work. Basically, if v_virt = "invoices" I just need the url to open in a new tab. Does anyone know the proper syntax? </p>\n\n<p><div class="snippet" data-lang="js" data-hide="false" data-console="true" data-babel="false">\r\n<div class="snippet-code">\r\n<pre class="snippet-code-js lang-js prettyprint-override"><code>if(v_virt=="invoices"){\r\nlocation.href=(\'https://www.example.com/invoices/invoice?ProjectID=[@field:ProjectID]&amp;InvoiceID=[@field:InvoiceID]\', \'_blank\');\r\n}</code></pre>\r\n</div>\r\n</div>\r\n</p>\n',
 'Tags_new': "['Frontend']",
 'text': 'Classify the text into Algorithms, Backend, Data Science, Databases, Dev Tools, Frontend, Mobile, Systems, iOS/macOS, and return the answer as the corresponding S

In [None]:
# Checkpoint that is quantized, evaluated as a baseline and then fine-tuned.
base_model_name = "meta-llama/Llama-3.2-1B-Instruct"

# 4-bit NF4 weight quantization (no double quantization) with fp16 compute.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=False,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype="float16",
)

# Load the quantized model; device_map="auto" leaves device placement to the library.
model = AutoModelForCausalLM.from_pretrained(
    base_model_name,
    device_map="auto",
    torch_dtype="float16",
    quantization_config=bnb_config,
)

# NOTE(review): the KV cache is presumably disabled because training below runs
# with gradient_checkpointing=True — confirm.
model.config.use_cache = False
# NOTE(review): pretraining_tp = 1 presumably disables tensor-parallel emulation
# in the linear layers — confirm it is still needed for this architecture.
model.config.pretraining_tp = 1

In [None]:
# Tokenizer that matches the base checkpoint.
tokenizer = AutoTokenizer.from_pretrained(base_model_name)

# Reuse the end-of-sequence token id as the padding token id.
# NOTE(review): presumably the tokenizer ships without a pad token — confirm.
tokenizer.pad_token_id = tokenizer.eos_token_id

In [None]:
def predict(test, model, tokenizer, max_new_tokens=16):
    """Generate labels for one example and store them as a multi-hot vector.

    Args:
        test: Mapping whose "text" entry is the prompt (ending in "label:").
            It is updated in place with a "predict" entry and returned.
        model: Causal language model handed to the text-generation pipeline.
        tokenizer: Tokenizer handed to the text-generation pipeline.
        max_new_tokens: Generation budget. The training targets are
            comma-joined tag names, so the budget must be large enough for
            several multi-token names; a budget of 2 cannot produce them.

    Returns:
        The same ``test`` mapping, with "predict" set to a list of floats (one
        per entry of ``classes``) holding 1.0 for every recognised label.
        Unrecognised names are reported with a printed warning and ignored.
    """
    # NOTE(review): the pipeline is rebuilt for every example; building it once
    # outside of `predict` would avoid repeated setup cost.
    pipe = pipeline(task="text-generation",
                    model=model,
                    tokenizer=tokenizer,
                    max_new_tokens=max_new_tokens,
                    # Greedy decoding keeps evaluation reproducible; a low
                    # temperature alone does not select greedy decoding.
                    do_sample=False,
                    # Return only the continuation so that a "label:" occurring
                    # in the post or in later generated text cannot confuse parsing.
                    return_full_text=False)

    result = pipe(test["text"])
    completion = result[0]['generated_text']

    # The answer is the first non-empty generated line; anything after it is
    # the model continuing past the label list.
    answer = next((line.strip() for line in completion.splitlines() if line.strip()), "")
    predicted_names = [item.strip() for item in answer.split(",")]

    labels = [0.] * len(classes)
    for name in predicted_names:
        if not name:
            continue
        label_id = class2id.get(name)
        if label_id is None:  # 0 is a valid id, so compare against None explicitly
            print(f"Warning: Label '{name}' not found in class2id")
        else:
            labels[label_id] = 1.0

    test['predict'] = labels

    return test

In [None]:
# Baseline: run the not-yet-fine-tuned quantized model over the validation split
# and store its multi-hot output in a "predict" column.
dataset['val'] = dataset['val'].map(predict, fn_kwargs={'model': model, 'tokenizer': tokenizer})

Map:   0%|          | 0/6500 [00:00<?, ? examples/s]



In [None]:
print(dataset['val'][0]['predict'])
dataset['val'][0]['labels']

[1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]


[0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]

In [None]:
# define which metrics to compute for evaluation
import evaluate
import numpy as np

clf_metrics = evaluate.combine(["accuracy", "f1", "precision", "recall"])

def sigmoid(x):
    """Return the element-wise logistic function 1 / (1 + exp(-x)) of ``x``."""
    decay = np.exp(-x)
    return 1 / (1 + decay)

def compute_metrics(predictions, labels):
    """Score multi-label predictions against multi-hot ground truth.

    ``predictions`` is passed through ``sigmoid`` and thresholded at 0.5
    (strictly greater), so 0/1 inputs map back to themselves: sigmoid(0) is
    exactly 0.5 and therefore not above the threshold.

    Args:
        predictions: NumPy array of per-class scores.
        labels: NumPy array of multi-hot ground-truth vectors.
            # assumes both arrays are (n_samples, n_classes) — TODO confirm

    Returns:
        Dict with 'flat_accuracy' (accuracy over the flattened arrays),
        'accuracy' (accuracy over the un-flattened arrays) and micro-averaged
        'precision', 'recall' and 'f1_score'.
    """
    binarized = (sigmoid(predictions) > 0.5).astype(int)
    micro_precision, micro_recall, micro_f1, _ = precision_recall_fscore_support(
        labels, binarized, average='micro'
    )
    return {
        'flat_accuracy': accuracy_score(labels.reshape(-1), binarized.reshape(-1)),
        'accuracy': accuracy_score(labels, binarized),
        'precision': micro_precision,
        'recall': micro_recall,
        'f1_score': micro_f1,
    }

In [None]:
y_pred = np.array([item['predict'] for item in dataset['val']])
y_true = np.array([item['labels'] for item in dataset['val']])

In [None]:
print(compute_metrics(y_pred, y_true))

{'flat_accuracy': 0.8174358974358974, 'accuracy': 0.12584615384615386, 'precision': 0.24487179487179486, 'recall': 0.16936914111983786, 'f1_score': 0.20023962857570765}


In [None]:
import bitsandbytes as bnb

def find_all_linear_names(model):
    """Collect the leaf names of all 4-bit linear layers in ``model``.

    Args:
        model: Object exposing ``named_modules()`` that yields
            (qualified_name, module) pairs.

    Returns:
        List of unique final name components (the part after the last ".") of
        every module that is a ``bnb.nn.Linear4bit``, with 'lm_head' excluded.
        The order of the list is unspecified because it comes from a set.
    """
    quantized_linear = bnb.nn.Linear4bit
    leaf_names = set()
    for qualified_name, submodule in model.named_modules():
        if not isinstance(submodule, quantized_linear):
            continue
        # For a name without dots the last component is the whole name.
        leaf_names.add(qualified_name.rsplit('.', 1)[-1])
    # needed for 16 bit: the output head is never offered as a LoRA target
    leaf_names.discard('lm_head')
    return list(leaf_names)

In [None]:
modules = find_all_linear_names(model)
modules

['gate_proj', 'down_proj', 'q_proj', 'up_proj', 'k_proj', 'o_proj', 'v_proj']

In [None]:
# Google Drive folder where the trained adapter and tokenizer are saved after
# training. Note that TrainingArguments below is given a different, relative
# output_dir for its own outputs.
output_dir="/content/drive/My Drive/AiExpertCource/project/llama-3.2-1b-instrunct-tag-lora"

# LoRA adapter settings: rank-64 adapters with alpha 16 and no dropout, attached
# to every module name collected in `modules`; bias terms are left untouched.
peft_config = LoraConfig(
    lora_alpha=16,
    lora_dropout=0,
    r=64,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=modules,
)

# Trainer hyper-parameters. With a per-device batch of 1 and 8 accumulation
# steps, each optimizer update sees 8 examples per device.
training_arguments = TrainingArguments(
    output_dir="llama-3.2-1b-instrunct-tag-lora",                    # relative directory for trainer outputs (not the Drive `output_dir` variable)
    num_train_epochs=5,                       # number of training epochs
    per_device_train_batch_size=1,            # batch size per device during training
    gradient_accumulation_steps=8,            # micro-batches accumulated before each optimizer update
    gradient_checkpointing=True,              # use gradient checkpointing to save memory
    optim="paged_adamw_32bit",
    logging_steps=1,                          # log the training loss at every optimizer step
    learning_rate=2e-4,                       # learning rate, based on QLoRA paper
    weight_decay=0.001,
    fp16=True,
    bf16=False,
    max_grad_norm=0.3,                        # max gradient norm based on QLoRA paper
    max_steps=-1,                             # -1: the step count is derived from num_train_epochs
    warmup_ratio=0.03,                        # warmup ratio based on QLoRA paper
    group_by_length=False,
    lr_scheduler_type="cosine",               # use cosine learning rate scheduler
    report_to="wandb",                  # report metrics to w&b
    eval_strategy="steps",              # run evaluation on a step schedule (see eval_steps)
    eval_steps = 0.2                    # a value below 1 is a fraction of total training steps: evaluate every 20%
)

# The "text" column of dataset['val'] was built by generate_test_prompt and stops
# at "label:", so a validation loss computed on it never scores the answer
# tokens. Rebuild the text with the gold labels for loss computation; .map
# returns a new dataset, so dataset['val'] keeps its label-free prompts for the
# generation-based evaluation.
eval_dataset_with_labels = dataset['val'].map(generate_prompt)

# NOTE(review): texts are truncated to max_seq_length tokens and the label sits
# at the very end of the prompt, so long posts presumably lose their label in
# both training and evaluation — confirm and consider shortening the body instead.
# NOTE(review): the library warns that these trainer keyword arguments are
# deprecated in favour of SFTConfig; migrate once the installed trl version is pinned.
trainer = SFTTrainer(
    model=model,
    args=training_arguments,
    train_dataset=dataset['train'],
    eval_dataset=eval_dataset_with_labels,
    peft_config=peft_config,
    dataset_text_field="text",
    tokenizer=tokenizer,
    max_seq_length=512,
    packing=False,
    dataset_kwargs={
        "add_special_tokens": False,
        "append_concat_token": False,
    }
)


Deprecated positional argument(s) used in SFTTrainer, please use the SFTConfig to set these arguments instead.


Map:   0%|          | 0/20000 [00:00<?, ? examples/s]

Map:   0%|          | 0/6500 [00:00<?, ? examples/s]

  super().__init__(


In [None]:
print(dataset['train'][0]["text"])

Classify the text into Algorithms, Backend, Data Science, Databases, Dev Tools, Frontend, Mobile, Systems, iOS/macOS, and return the answer as the corresponding Software Development and Engineering label.
text: Why threads are needed in my given assignment in java? <p><strong>I'm not asking to do my assignment. Read carefully</strong></p>

<blockquote>
  <p>Write a program to simulate a bus traveling between 5 different stations and
  repeats the cycle, the bus can take up to a maximum of 50 persons, at each
  station random number of persons get off the bus and random number of
  persons get on the bus, consider these cases.</p>
  
  <ul>
  <li>If bus does not have enough space for all persons, persons will have to
  stay in station for next cycle</li>
  <li>Persons cannot mount on bus until persons on bus dismount first.</li>
  <li>You can simulate bus trip with a fixed delay between each stop to
  simulate travel time.</li>
  <li>Persons can not mount/dismount the bus until bus arri

In [None]:
# Train model
trainer.train()

  return fn(*args, **kwargs)


Step,Training Loss,Validation Loss
2500,1.4815,1.600047
5000,1.4337,1.612491


  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)


In [None]:
# Re-score the validation split after training; `predict` overwrites the
# existing "predict" column.
# NOTE(review): this passes `model`, the object that was handed to SFTTrainer —
# presumably it now carries the trained LoRA layers; confirm, or use trainer.model.
dataset['val'] = dataset['val'].map(predict, fn_kwargs={'model': model, 'tokenizer': tokenizer})
y_pred = np.array([item['predict'] for item in dataset['val']])
y_true = np.array([item['labels'] for item in dataset['val']])
print(compute_metrics(y_pred, y_true))

In [None]:
# Save trained model and tokenizer
trainer.save_model(output_dir)
tokenizer.save_pretrained(output_dir)

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from peft import PeftModel
import torch


# Reload tokenizer and model
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
tokenizer.pad_token_id = tokenizer.eos_token_id

# The base checkpoint is loaded in fp16 with no quantization_config here; the
# saved adapter is merged into this copy in the following cell.
base_model_reload = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        return_dict=True,
        low_cpu_mem_usage=True,
        torch_dtype=torch.float16,
        device_map="auto",
        trust_remote_code=True,
)

In [None]:
# Attach the adapter saved in `output_dir` to the fp16 base model, then fold the
# adapter weights into the base weights and drop the PEFT wrapper.
new_model = PeftModel.from_pretrained(base_model_reload, output_dir)
new_model = new_model.merge_and_unload()

In [None]:
# Score the merged fp16 model on the validation split; the "predict" column is
# overwritten again.
dataset['val'] = dataset['val'].map(predict, fn_kwargs={'model': new_model, 'tokenizer': tokenizer})
y_pred = np.array([item['predict'] for item in dataset['val']])
y_true = np.array([item['labels'] for item in dataset['val']])
print(compute_metrics(y_pred, y_true))

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from peft import PeftModel
import torch


# Reload tokenizer and model
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
tokenizer.pad_token_id = tokenizer.eos_token_id

# A fresh copy of the base checkpoint, quantized with the same bnb_config as the
# model that was trained, with no adapter attached; it is scored in the next
# cell as a reference point.
basic_model = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        return_dict=True,
        low_cpu_mem_usage=True,
        torch_dtype=torch.float16,
        device_map="auto",
        trust_remote_code=True,
        quantization_config=bnb_config,

)

In [None]:
# Score the untouched quantized base model on the validation split; the
# "predict" column is overwritten again.
dataset['val'] = dataset['val'].map(predict, fn_kwargs={'model': basic_model, 'tokenizer': tokenizer})
y_pred = np.array([item['predict'] for item in dataset['val']])
y_true = np.array([item['labels'] for item in dataset['val']])
print(compute_metrics(y_pred, y_true))