<a href="https://colab.research.google.com/github/Aditi-Yadav-19/ColabTest/blob/main/ady_lora.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import os
import logging
import re
import numpy as np
import torch
import argparse
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    DataCollatorForLanguageModeling,
    Trainer,
    Pipeline,
)
from datasets import load_dataset, Dataset, DatasetDict
from peft import prepare_model_for_int8_training, LoraConfig, get_peft_model

# Set up logging
logger = logging.getLogger(__name__)

# Constants for keys and format
INSTRUCTION_KEY = "### Instruction:"
RESPONSE_KEY = "### Response:"
END_KEY = "### End"
INTRO_BLURB = "Below is an instruction that describes a task. Write a response that appropriately completes the request."
PROMPT_FOR_GENERATION_FORMAT = """{intro}
{instruction_key}
{instruction}
{response_key}
""".format(
    intro=INTRO_BLURB,
    instruction_key=INSTRUCTION_KEY,
    instruction="{instruction}",
    response_key=RESPONSE_KEY,
)


class InstructionTextGenerationPipeline(Pipeline):
    def __init__(
        self,
        *args,
        do_sample: bool = True,
        max_new_tokens: int = 256,
        top_p: float = 0.92,
        top_k: int = 0,
        **kwargs,
    ):
        super().__init__(
            *args,
            do_sample=do_sample,
            max_new_tokens=max_new_tokens,
            top_p=top_p,
            top_k=top_k,
            **kwargs,
        )

    def _sanitize_parameters(self, return_instruction_text=False, **generate_kwargs):
        preprocess_params = {}

        # newer versions of the tokenizer configure the response key as a special token.  newer versions still may
        # append a newline to yield a single token.  find whatever token is configured for the response key.
        tokenizer_response_key = next(
            (
                token
                for token in self.tokenizer.additional_special_tokens
                if token.startswith(RESPONSE_KEY)
            ),
            None,
        )

        response_key_token_id = None
        end_key_token_id = None
        if tokenizer_response_key:
            try:
                response_key_token_id = get_special_token_id(
                    self.tokenizer, tokenizer_response_key
                )
                end_key_token_id = get_special_token_id(self.tokenizer, END_KEY)

                # Ensure generation stops once it generates "### End"
                generate_kwargs["eos_token_id"] = end_key_token_id
            except ValueError:
                pass

        forward_params = generate_kwargs
        postprocess_params = {
            "response_key_token_id": response_key_token_id,
            "end_key_token_id": end_key_token_id,
            "return_instruction_text": return_instruction_text,
        }

        return preprocess_params, forward_params, postprocess_params

    def preprocess(self, instruction_text, **generate_kwargs):
        prompt_text = PROMPT_FOR_GENERATION_FORMAT.format(instruction=instruction_text)
        inputs = self.tokenizer(
            prompt_text,
            return_tensors="pt",
        )
        inputs["prompt_text"] = prompt_text
        inputs["instruction_text"] = instruction_text
        return inputs

    def _forward(self, model_inputs, **generate_kwargs):
        input_ids = model_inputs["input_ids"]
        attention_mask = model_inputs.get("attention_mask", None)
        generated_sequence = self.model.generate(
            input_ids=input_ids.to(self.model.device),
            attention_mask=attention_mask,
            pad_token_id=self.tokenizer.pad_token_id,
            **generate_kwargs,
        )[0].cpu()
        instruction_text = model_inputs.pop("instruction_text")
        return {
            "generated_sequence": generated_sequence,
            "input_ids": input_ids,
            "instruction_text": instruction_text,
        }

    def postprocess(
        self,
        model_outputs,
        response_key_token_id,
        end_key_token_id,
        return_instruction_text,
    ):
        sequence = model_outputs["generated_sequence"]
        instruction_text = model_outputs["instruction_text"]

        # The response will be set to this variable if we can identify it.
        decoded = None

        # If we have token IDs for the response and end, then we can find the tokens and only decode between them.
        if response_key_token_id and end_key_token_id:
            # Find where "### Response:" is first found in the generated tokens.  Considering this is part of the
            # prompt, we should definitely find it.  We will return the tokens found after this token.
            response_pos = None
            response_positions = np.where(sequence == response_key_token_id)[0]
            if len(response_positions) == 0:
                logger.warn(
                    f"Could not find response key {response_key_token_id} in: {sequence}"
                )
            else:
                response_pos = response_positions[0]

            if response_pos:
                # Next find where "### End" is located.  The model has been trained to end its responses with this
                # sequence (or actually, the token ID it maps to, since it is a special token).  We may not find
                # this token, as the response could be truncated.  If we don't find it then just return everything
                # to the end.  Note that even though we set eos_token_id, we still see the this token at the end.
                end_pos = None
                end_positions = np.where(sequence == end_key_token_id)[0]
                if len(end_positions) > 0:
                    end_pos = end_positions[0]

                decoded = self.tokenizer.decode(
                    sequence[response_pos + 1 : end_pos]
                ).strip()
        else:
            # Otherwise we'll decode everything and use a regex to find the response and end.

            fully_decoded = self.tokenizer.decode(sequence)

            # The response appears after "### Response:".  The model has been trained to append "### End" at the
            # end.
            m = re.search(
                r"#+\s*Response:\s*(.+?)#+\s*End", fully_decoded, flags=re.DOTALL
            )

            if m:
                decoded = m.group(1).strip()
            else:
                # The model might not generate the "### End" sequence before reaching the max tokens.  In this case,
                # return everything after "### Response:".
                m = re.search(r"#+\s*Response:\s*(.+)", fully_decoded, flags=re.DOTALL)
                if m:
                    decoded = m.group(1).strip()
                else:
                    logger.warn(f"Failed to find response in:\n{fully_decoded}")

        if return_instruction_text:
            return {"instruction_text": instruction_text, "generated_text": decoded}

        return decoded


def main():
    """Main function to run ALPaCA LORA training script."""
    parser = argparse.ArgumentParser(description="Run ALPaCA LORA training script")
    parser.add_argument(
        "--sample_size", type=int, default=100, help="Number of samples"
    )
    parser.add_argument(
        "--model_name",
        type=str,
        default="Maykeye/TinyLLama-v0",
        help="Pretrained model name",
    )
    parser.add_argument("--batch_size", type=int, default=128, help="Batch size")
    parser.add_argument(
        "--output_dir", type=str, default="AlpacaWeights", help="Output Directory"
    )

    args = parser.parse_args()

    # Load tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(args.model_name, padding_side="left")
    model = AutoModelForCausalLM.from_pretrained(args.model_name, device_map="auto")

    # Load dataset
    data = load_dataset("tatsu-lab/alpaca")

    # Create a sample of 1000 items from the 'train' split
    sampled_data = data["train"].shuffle(seed=42).select(range(args.sample_size))

    # Create a new DatasetDict with the updated 'train' split
    sampled_dataset_dict = DatasetDict({"train": sampled_data})

    data = sampled_dataset_dict

    # Generate prompt for each data point
    def generate_prompt(data_point):
        # taken from https://github.com/tloen/alpaca-lora
        if data_point["instruction"]:
            return f"""Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

    ### Instruction:
    {data_point["instruction"]}

    ### Input:
    {data_point["input"]}

    ### Response:
    {data_point["output"]}"""
        else:
            return f"""Below is an instruction that describes a task. Write a response that appropriately completes the request.

    ### Instruction:
    {data_point["instruction"]}

    ### Response:
    {data_point["output"]}"""

    data = data.map(
        lambda data_point: {"prompt": tokenizer(generate_prompt(data_point))}
    )

    # Settings for A100 - For 3090
    MICRO_BATCH_SIZE = 4  # change to 4 for 3090
    BATCH_SIZE = args.batch_size
    GRADIENT_ACCUMULATION_STEPS = BATCH_SIZE // MICRO_BATCH_SIZE
    EPOCHS = 2  # paper uses 3
    LEARNING_RATE = 2e-5
    CUTOFF_LEN = 256
    LORA_R = 4
    LORA_ALPHA = 16
    LORA_DROPOUT = 0.05

    model = prepare_model_for_int8_training(model, use_gradient_checkpointing=True)

    config = LoraConfig(
        r=LORA_R,
        lora_alpha=LORA_ALPHA,
        lora_dropout=LORA_DROPOUT,
        bias="none",
        task_type="CAUSAL_LM",
    )
    model = get_peft_model(model, config)
    tokenizer.pad_token_id = 0  # unk. we want this to be different from the eos token

    data = data.shuffle().map(
        lambda data_point: tokenizer(
            generate_prompt(data_point),
            truncation=True,
            max_length=CUTOFF_LEN,
            padding="max_length",
        )
    )

    trainer = Trainer(
        model=model,
        train_dataset=data["train"],
        args=TrainingArguments(
            per_device_train_batch_size=MICRO_BATCH_SIZE,
            gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
            warmup_steps=100,
            num_train_epochs=EPOCHS,
            learning_rate=LEARNING_RATE,
            logging_steps=100,
            output_dir="lora-weights",
            save_total_limit=3,
        ),
        data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
    )
    model.config.use_cache = False
    trainer.train(resume_from_checkpoint=False)

    model.save_pretrained("lora-weights")


if __name__ == "__main__":
    main()

usage: ipykernel_launcher.py [-h] [--sample_size SAMPLE_SIZE]
                             [--model_name MODEL_NAME]
                             [--batch_size BATCH_SIZE]
                             [--output_dir OUTPUT_DIR]
ipykernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-f7660b2d-fde1-4980-b1f0-ce96661fe528.json


SystemExit: ignored

In [1]:


!pip install accelerate
!pip install appdirs
!pip install loralib
!pip install bitsandbytes
!pip install black
!pip install black[jupyter]
!pip install datasets
!pip install fire
!pip install git+https://github.com/huggingface/peft.git
!pip install git+https://github.com/huggingface/transformers.git
!pip install sentencepiece
!pip install gradio

Collecting git+https://github.com/huggingface/peft.git
  Cloning https://github.com/huggingface/peft.git to /tmp/pip-req-build-nq3p438k
  Running command git clone --filter=blob:none --quiet https://github.com/huggingface/peft.git /tmp/pip-req-build-nq3p438k
  Resolved https://github.com/huggingface/peft.git to commit 369a0fba85a94e99738f832877c665a25793da1f
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting git+https://github.com/huggingface/transformers.git
  Cloning https://github.com/huggingface/transformers.git to /tmp/pip-req-build-nlbmrxex
  Running command git clone --filter=blob:none --quiet https://github.com/huggingface/transformers.git /tmp/pip-req-build-nlbmrxex
  Resolved https://github.com/huggingface/transformers.git to commit 3f9cb335047315edfd4b6ad666ef148e98cc4850
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements

In [6]:
import os
import logging
import re
import numpy as np
import torch
import argparse
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    DataCollatorForLanguageModeling,
    Trainer,
    Pipeline,
)
from datasets import load_dataset, Dataset, DatasetDict
from peft import prepare_model_for_int8_training, LoraConfig, get_peft_model

# Rest of the code remains the same until the main() function

# Define the main() function as a cell in Jupyter Notebook
def main():
    sample_size = 100  # Set your desired sample size here
    model_name = "Maykeye/TinyLLama-v0"
    batch_size = 128
    output_dir = "AlpacaWeights"

    # Load tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side="left")
    model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto")

    # Load dataset
    data = load_dataset("tatsu-lab/alpaca")

    # Create a sample of 1000 items from the 'train' split
    sampled_data = data["train"].shuffle(seed=42).select(range(sample_size))
    sampled_dataset_dict = DatasetDict({"train": sampled_data})
    data = sampled_dataset_dict

    # Generate prompt for each data point
    def generate_prompt(data_point):
        # taken from https://github.com/tloen/alpaca-lora
        if data_point["instruction"]:
            return f"""Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

    ### Instruction:
    {data_point["instruction"]}

    ### Input:
    {data_point["input"]}

    ### Response:
    {data_point["output"]}"""
        else:
            return f"""Below is an instruction that describes a task. Write a response that appropriately completes the request.

    ### Instruction:
    {data_point["instruction"]}

    ### Response:
    {data_point["output"]}"""

    data = data.map(
        lambda data_point: {"prompt": tokenizer(generate_prompt(data_point))}
    )

    # Settings for A100 - For 3090
    MICRO_BATCH_SIZE = 4  # change to 4 for 3090
    BATCH_SIZE = 16
    GRADIENT_ACCUMULATION_STEPS = BATCH_SIZE // MICRO_BATCH_SIZE
    EPOCHS = 2  # paper uses 3
    LEARNING_RATE = 2e-5
    CUTOFF_LEN = 256
    LORA_R = 4
    LORA_ALPHA = 16
    LORA_DROPOUT = 0.05

    model = prepare_model_for_int8_training(model, use_gradient_checkpointing=True)

    config = LoraConfig(
        r=LORA_R,
        lora_alpha=LORA_ALPHA,
        lora_dropout=LORA_DROPOUT,
        bias="none",
        task_type="CAUSAL_LM",
    )
    model = get_peft_model(model, config)
    tokenizer.pad_token_id = 0  # unk. we want this to be different from the eos token

    data = data.shuffle().map(
        lambda data_point: tokenizer(
            generate_prompt(data_point),
            truncation=True,
            max_length=CUTOFF_LEN,
            padding="max_length",
        )
    )

    # Trainer initialization
    trainer = Trainer(
        model=model,
        train_dataset=data["train"],
        args=TrainingArguments(
            per_device_train_batch_size=MICRO_BATCH_SIZE,
            gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
            warmup_steps=100,
            num_train_epochs=EPOCHS,
            learning_rate=LEARNING_RATE,
            logging_steps=100,
            output_dir="lora-weights",
            save_total_limit=3,
        ),
        data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
    )
    model.config.use_cache = False
    trainer.train(resume_from_checkpoint=False)

    model.save_pretrained("lora-weights")

if __name__ == "__main__":
    main()


Map:   0%|          | 0/100 [00:00<?, ? examples/s]

You're using a LlamaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


Step,Training Loss


In [7]:
import argparse
import transformers
import torch
from peft import PeftModel





In [13]:


# def model_fn(model_dir):
#     model = transformers.AutoModelForCausalLM.from_pretrained(model_dir)
#     adapters_name = "lora-weights"
#     model = PeftModel.from_pretrained(model, adapters_name)
#     model.eval()

#     tokenizer = transformers.AutoTokenizer.from_pretrained(model_dir)

#     return model, tokenizer

# def predict_fn(data, model_and_tokenizer):
#     model, tokenizer = model_and_tokenizer

#     batch = tokenizer(data, return_tensors="pt", add_special_tokens=False)

#     input_ids = batch["input_ids"].to(model.device)

#     with torch.no_grad():
#         generated = model.generate(
#             input_ids=input_ids,
#             min_length=10,
#             max_length=60,
#             do_sample=True,
#             top_k=50,
#             temperature=1.0,
#             top_p=1.0,
#             repetition_penalty=1.1,
#         )
#         response = tokenizer.decode(generated[0], skip_special_tokens=True)

#     return response

# # Set your model directory and input sentence here
# model_dir = "Maykeye/TinyLLama-v0"
# input_sentence = "Joe Biden is"

# # Load model and tokenizer
# model_and_tokenizer = model_fn(model_dir)

# # Generate text
# prediction = predict_fn(input_sentence, model_and_tokenizer)

# # Print the generated text
# print("Generated text:", prediction)


import transformers
import torch
from peft import PeftModel

def model_fn(model_identifier):
    model = transformers.AutoModelForCausalLM.from_pretrained(model_identifier)
    adapters_name = "lora-weights"
    model = PeftModel.from_pretrained(model, adapters_name)
    model.eval()

    tokenizer = transformers.AutoTokenizer.from_pretrained(model_identifier)

    return model, tokenizer

def predict_fn(question, model_and_tokenizer):
    model, tokenizer = model_and_tokenizer

    prompt = f"Question: {question}\nAnswer:"

    input_ids = tokenizer.encode(prompt, return_tensors="pt")

    with torch.no_grad():
        generated = model.generate(
            input_ids=input_ids,
            max_length=100,  # Adjust the max length as needed
            do_sample=True,
            top_k=50,
            temperature=1.0,
            top_p=1.0,
            repetition_penalty=1.1,
        )
        response = tokenizer.decode(generated[0], skip_special_tokens=True)

    return response

# Set your model identifier and input question here
model_identifier = "Maykeye/TinyLLama-v0"
input_question = "What is the capital of France?"

# Load model and tokenizer
model_and_tokenizer = model_fn(model_identifier)

# Generate answer to the question
answer = predict_fn(input_question, model_and_tokenizer)

# Print the generated answer
print("Generated answer:", answer)


Generated answer: Question: What is the capital of France?
Answer: "Now you can fly!"
He walks through the park and then suddenly was all over with a big tree when he heard a bit of other animals. It was so bright!
The lions were walking closer to them and laughed. "Let's go up at the park.” 
They started sitting in the tree, just like a few times he noticed it looked very soft.
The farmer said, "What


In [15]:
import transformers
import torch
from peft import PeftModel
from datasets import load_dataset

# Define your fine-tuned finance model directory
finance_model_identifier = "Maykeye/TinyLLama-v0"

def model_fn(model_identifier):
    model = transformers.AutoModelForCausalLM.from_pretrained(model_identifier)
    adapters_name = "lora-weights"
    model = PeftModel.from_pretrained(model, adapters_name)
    model.eval()

    tokenizer = transformers.AutoTokenizer.from_pretrained(model_identifier)

    return model, tokenizer

def predict_fn(question, model_and_tokenizer):
    model, tokenizer = model_and_tokenizer

    prompt = f"Finance Question: {question}\nAnswer:"

    input_ids = tokenizer.encode(prompt, return_tensors="pt")

    with torch.no_grad():
        generated = model.generate(
            input_ids=input_ids,
            max_length=100,  # Adjust the max length as needed
            do_sample=True,
            top_k=50,
            temperature=1.0,
            top_p=1.0,
            repetition_penalty=1.1,
        )
        response = tokenizer.decode(generated[0], skip_special_tokens=True)

    return response

# Set your finance model identifier and input finance question here
finance_model_and_tokenizer = model_fn(finance_model_identifier)
input_finance_question = "What is a stock market?"

# Generate answer to the finance question
finance_answer = predict_fn(input_finance_question, finance_model_and_tokenizer)

# Print the generated finance answer
print("Generated finance answer:", finance_answer)


Generated finance answer: Finance Question: What is a stock market?
Answer: "This is this place to be! If you put on your own." 
Mummy said yes. She decided to go outside and play. As it went away, she was glad he had always made. She had such something special for her that was not a better thing to do.
Soon, the most exciting job of her friends who came by they stopped her. 
Maya would have a great
