In [None]:
import os
import io
import json
os.environ["CUDA_VISIBLE_DEVICES"]="0"
import torch
import torch.nn as nn
from torch.utils.data import Dataset


from peft import get_peft_model, AdaLoraConfig, TaskType
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments, DataCollatorForLanguageModeling

In [None]:
# Token length every training example is padded/truncated to; it is handed to
# SFTDataset as ``max_length`` when the training split is built.
max_input_length = 512

In [None]:
def _make_r_io_base(f, mode: str):
    if not isinstance(f, io.IOBase):
        f = open(f, mode=mode)
    return f


def jload(f, mode="r"):
    """Load a .json file into a dictionary.

    Args:
        f: Path to a JSON file, or an already-open ``io.IOBase`` file object.
        mode: Mode used to open ``f`` when it is a path.

    Returns:
        Whatever ``json.load`` produces for the file's top-level JSON value.

    Raises:
        json.JSONDecodeError: If the content is not valid JSON.
        OSError: If ``f`` is a path that cannot be opened.

    Note:
        The file object is always closed before returning, including one the
        caller passed in already open.
    """
    f = _make_r_io_base(f, mode)
    # try/finally so the handle is released even when json.load raises
    # (e.g. malformed JSON) instead of leaking the open file.
    try:
        return json.load(f)
    finally:
        f.close()


# Prompt templates, filled in with ``str.format_map`` on each data record.
# "prompt_input" uses the {instruction} and {input} fields; "prompt_no_input"
# uses only {instruction}.
PROMPT_DICT = {
    "prompt_input": (
        "Below is an instruction that describes a task, "
        "paired with an input that provides further context. "
        "Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n"
        "### Input:\n{input}\n\n"
        "### Response:"
    ),
    "prompt_no_input": (
        "Below is an instruction that describes a task. "
        "Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n"
        "### Response:"
    ),
}


def get_dataset_from_jsonl(jsonl_file, tokenizer=None):
    """Build (prompt, target) text pairs from an instruction-tuning data file.

    Despite the name, the file is read with ``jload`` (i.e. ``json.load``), so
    it has to contain one JSON document -- expected to be a list of records --
    rather than one JSON object per line.

    Args:
        jsonl_file: Path or open file object accepted by ``jload``.
        tokenizer: Optional object whose ``eos_token`` attribute is appended to
            every target. If it is ``None`` or has no EOS token, targets are
            left without a suffix.

    Returns:
        A ``zip`` iterator (single pass) yielding ``(source, target)`` string
        pairs: the formatted prompt and the record's "output" plus EOS.

    Raises:
        KeyError: If a record lacks a field the chosen template needs
            ("instruction", or "output").
    """
    list_data_dict = jload(jsonl_file)

    # Resolve the suffix once. A missing tokenizer used to raise
    # AttributeError, and an eos_token of None used to be rendered into every
    # target as the literal text "None".
    eos_token = getattr(tokenizer, "eos_token", None) or ""

    prompt_input, prompt_no_input = PROMPT_DICT["prompt_input"], PROMPT_DICT["prompt_no_input"]
    sources = [
        # A JSON null "input" counts as absent, same as an empty string, so it
        # is not formatted into the prompt as "None".
        (prompt_input if example.get("input") not in (None, "") else prompt_no_input).format_map(example)
        for example in list_data_dict
    ]
    targets = [f"{example['output']}{eos_token}" for example in list_data_dict]

    return zip(sources, targets)


class SFTDataset(Dataset):
    """Supervised fine-tuning dataset of prompt+response texts.

    Every example is the prompt and target text from
    ``get_dataset_from_jsonl`` joined into one string. Tokenization happens
    lazily in ``__getitem__`` and always yields ``max_length`` tokens.
    """

    def __init__(self, train_path, tokenizer, split, max_length=2048, max_data_size=0, valid_ratio=0.01):
        """Read the data file and keep the raw texts.

        Args:
            train_path: Data file passed to ``get_dataset_from_jsonl``.
            tokenizer: Tokenizer; called as ``tokenizer(text, truncation=True,
                max_length=..., padding="max_length")`` and expected to return
                "input_ids" and "attention_mask".
            split: Split name. If it contains "valid", only the first 10
                examples are kept.
            max_length: Sequence length every example is padded/truncated to.
            max_data_size: If non-zero, keep only the first ``max_data_size``
                examples.
            valid_ratio: Currently unused; kept for interface compatibility.
        """
        pairs = get_dataset_from_jsonl(train_path, tokenizer=tokenizer)
        self.post_list = [source + target for source, target in pairs]

        if max_data_size != 0:
            self.post_list = self.post_list[:max_data_size]

        # NOTE(review): the "valid" split is just the head of the same data the
        # "train" split uses, so the two overlap; valid_ratio is not applied.
        if "valid" in split:
            self.post_list = self.post_list[0:10]
        self.tokenizer = tokenizer
        self.max_length = max_length
        # Not read anywhere in this class; retained so existing attribute
        # access keeps working.
        self.input_ids = []
        self.attn_masks = []

    def __len__(self):
        """Number of examples in this split."""
        return len(self.post_list)

    def __getitem__(self, idx):
        """Tokenize example ``idx``.

        Returns:
            Dict with "input_ids", "attention_mask" and "labels" tensors.
            "labels" is an independent copy of "input_ids" in which padded
            positions (attention mask 0) are set to -100.
        """
        text = self.post_list[idx]
        encoded = self.tokenizer(text, truncation=True, max_length=self.max_length, padding="max_length")
        input_ids = torch.tensor(encoded["input_ids"])
        attention_mask = torch.tensor(encoded["attention_mask"])

        # Labels must not alias input_ids (an in-place edit of one would change
        # the other), and padding must not be scored: -100 is the index the
        # cross-entropy loss ignores.
        labels = input_ids.clone()
        labels[attention_mask == 0] = -100

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
        }

In [None]:
# Training data location, plus tokenizer and base model loaded from one
# checkpoint id so the two cannot drift apart.
data_path = "./lora_test/niuyeye.json"
base_checkpoint = "facebook/opt-125m"
tokenizer = AutoTokenizer.from_pretrained(base_checkpoint)
model = AutoModelForCausalLM.from_pretrained(base_checkpoint)

In [None]:
# AdaLoRA adapter configuration for causal-LM fine-tuning.
# See the PEFT AdaLoraConfig documentation for the meaning of each rank
# schedule field (init_r/target_r, tinit/tfinal/deltaT, beta1/beta2).
lora_config = AdaLoraConfig(
    task_type=TaskType.CAUSAL_LM,
    init_r=6,
    target_r=4,
    tinit=50,
    tfinal=100,
    deltaT=5,
    beta1=0.3,
    beta2=0.3,
    # Total number of optimizer steps of the run. The AdaLoRA budget schedule
    # is computed from it and recent PEFT releases raise ValueError when it is
    # missing. Keep it equal to TrainingArguments.max_steps (200 in this
    # notebook); it must stay larger than tinit + tfinal.
    total_step=200,
    orth_reg_weight=0.2,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
)

In [None]:
# Wrap the loaded base model with the adapter configuration and print the
# trainable-parameter summary.
# NOTE: this rebinds `model`, so re-running this cell without re-running the
# model-loading cell first would wrap an already-wrapped model.
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

In [None]:
# Training split; every example is padded/truncated to max_input_length tokens.
train_dataset = SFTDataset(
    train_path=data_path,
    tokenizer=tokenizer,
    split="train",
    max_length=max_input_length,
)

In [None]:
# Run settings: batch size 1 per device with 4 accumulation steps, 200
# optimizer steps in total (50 of them warm-up), fp16 mixed precision,
# logging every 10 steps.
training_args = TrainingArguments(
    per_device_train_batch_size=1,
    gradient_accumulation_steps=4,
    warmup_steps=50,
    max_steps=200,
    learning_rate=2e-4,
    fp16=True,
    logging_steps=10,
    output_dir='outputs',
)
# mlm=False selects causal-LM batches rather than masked-LM ones.
causal_lm_collator = DataCollatorForLanguageModeling(tokenizer, mlm=False)

# NOTE(review): AdaLoRA's rank re-allocation is driven by
# `update_and_allocate`, which nothing in this notebook calls -- confirm
# whether the stock Trainer loop triggers it; otherwise the tinit/tfinal/deltaT
# schedule has no effect.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    data_collator=causal_lm_collator,
)
model.config.use_cache = False  # silence the warnings. Please re-enable for inference!
trainer.train()

In [None]:
# Persist the fine-tuned model under the run's output folder.
# NOTE(review): for a PEFT-wrapped model this presumably writes only the
# adapter weights/config, not the full base model -- verify before relying on it.
model.save_pretrained("outputs/niuyeye/")