# E5 training script for all classification scenarios

# Classification

In [14]:
%%writefile train_script_optimized_Qwen_1.py
# %%writefile train_script_optimized_e5_2.py
# %%writefile train_script_optimized_e5_3.py

import os
import argparse
import json
from datetime import timedelta
from tqdm import tqdm
import multiprocessing
import shutil
import torch
import torch.distributed as dist
import torch.optim as optim

from torch.utils.data import DistributedSampler,Dataset, DataLoader
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.fully_sharded_data_parallel import MixedPrecision
from transformers.models.mixtral.modeling_mixtral import MixtralDecoderLayer

from transformers import AutoTokenizer, AutoModel, AutoModelForSequenceClassification, AutoModelForCausalLM, default_data_collator, get_cosine_schedule_with_warmup, AutoConfig, BitsAndBytesConfig
import gc
from transformers import MistralForCausalLM
from torch.utils.tensorboard import SummaryWriter

import subprocess
import utils
import copy
import pandas as pd

from torch.nn import functional as F

class TrainingDataset(Dataset):
    """Map-style dataset that turns one DataFrame row into a classification sample.

    For every row the Url, UrlTitle, UrlSnippet and FullBody columns are joined
    with single spaces, tokenized (truncated to ``max_seq_length - 1`` tokens),
    extended with the tokenizer's EOS id and finally padded to
    ``max_seq_length``. The Label column is cast to ``int``.
    """

    def __init__(self, df, tokenizer, max_seq_length):
        self.df = df
        self.tokenizer = tokenizer
        self.max_seq_length = max_seq_length

    def __len__(self):
        """Number of rows in the backing DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for row ``idx``."""
        row = self.df.iloc[idx]

        pieces = [str(row[column]) for column in ('Url', 'UrlTitle', 'UrlSnippet', 'FullBody')]
        text = ' '.join(pieces)
        label = int(row['Label'])

        # Tokenize without padding and leave one slot free for the EOS token
        # that is appended by hand right below.
        encoded = self.tokenizer(
            text,
            max_length=self.max_seq_length - 1,
            return_attention_mask=False,
            padding=False,
            truncation=True,
        )
        encoded['input_ids'] = encoded['input_ids'] + [self.tokenizer.eos_token_id]

        # Pad to the fixed length; the attention mask is produced at this point.
        padded = self.tokenizer.pad(
            encoded,
            max_length=self.max_seq_length,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )

        # Per-item tensors; the original notes give the batched shapes as
        # [bs, 1024] for the two sequences and [bs] for the labels.
        return {
            'input_ids': padded['input_ids'],
            'attention_mask': padded['attention_mask'],
            'labels': torch.tensor(label),
        }

def setup(args):
    """Initialise the distributed job and build everything ``train`` needs.

    Reads WORLD_SIZE / RANK / LOCAL_RANK from the environment, initialises the
    NCCL process group, loads the tokenizer and the model selected by
    ``args.model``, wraps the model in FSDP, and builds the optimizer, the
    distributed dataloader and the cosine LR scheduler.

    Args:
        args: parsed command-line namespace. Fields read here: ``load_from``,
            ``model``, ``lr``, ``training_data_from``, ``columns``,
            ``max_seq_length``, ``batch_size``, ``num_workers``, ``warmup``,
            ``num_epochs`` and, when present, ``neftune_alpha`` and
            ``gradient_accumulation_steps``.

    Returns:
        Tuple ``(model, train_dataloader, optimizer, scheduler, local_rank,
        rank, world_size, tokenizer)``.

    Raises:
        NotImplementedError: if ``args.model`` is not a supported model name.
    """
    # --- distributed environment -------------------------------------------
    # WORLD_SIZE: total number of participating processes (one per GPU).
    world_size = int(os.environ["WORLD_SIZE"])
    # RANK: globally unique id of this process, from 0 to world_size - 1.
    rank = int(os.environ["RANK"])
    # LOCAL_RANK: id of this process inside its own node. Example: 2 servers
    # with 4 GPUs each give world_size 8, ranks 0..7 and local ranks 0..3 on
    # each server.
    local_rank = int(os.environ["LOCAL_RANK"])
    print(f"World size: {world_size}, rank: {rank}, local rank: {local_rank}")

    # Timeout handed to the process group (5 hours).
    timeout = timedelta(hours=5)
    dist.init_process_group("nccl", timeout=timeout, rank=rank, world_size=world_size)
    assert torch.distributed.is_initialized()

    # Bind this process to its GPU and release cached GPU memory.
    torch.cuda.set_device(local_rank)
    torch.cuda.empty_cache()

    if is_master(rank):  # only the master process prints the arguments
        print(args)

    # --- tokenizer ------------------------------------------------------------
    tokenizer = AutoTokenizer.from_pretrained(args.load_from)
    tokenizer.pad_token = tokenizer.eos_token

    # --- model ----------------------------------------------------------------
    if args.model == 'llama2':
        # Imported here because the file-level imports do not provide it.
        from transformers import LlamaForCausalLM

        model = LlamaForCausalLM.from_pretrained(args.load_from, load_in_8bit=False, device_map=None, torch_dtype=torch.bfloat16, use_cache=True)
        # The option may be absent from the parser; treat that as "disabled".
        neftune_alpha = getattr(args, 'neftune_alpha', None)
        if neftune_alpha is not None:
            print('NEFTUNE enabled')
            # Save the old forward function as a class attribute
            torch.nn.Embedding.old_forward = model.model.embed_tokens.forward

            # New forward: original embedding output plus uniform noise whose
            # magnitude is neftune_alpha / sqrt(seq_len * hidden_dim).
            def new_forward(self, x):
                out = self.old_forward(x)
                dims = torch.tensor(out.size(1) * out.size(2))
                mag_norm = neftune_alpha / torch.sqrt(dims)
                return out + torch.zeros_like(out).uniform_(-mag_norm, mag_norm)

            # Replace the forward function of the embedding object with the new one
            model.model.embed_tokens.forward = new_forward.__get__(model.model.embed_tokens, torch.nn.Embedding)
    elif args.model in ("mistral", "qwen"):
        # Binary sequence classifier (used for E5-Mistral and Qwen), loaded
        # straight onto this process's GPU.
        model = AutoModelForSequenceClassification.from_pretrained(args.load_from, num_labels=2, load_in_8bit=False, device_map=f'cuda:{local_rank}', torch_dtype=torch.float16)
    elif args.model == "mixtral":
        model = AutoModelForCausalLM.from_pretrained(
            args.load_from,
            load_in_8bit=False,          # do not load the weights in 8-bit precision
            device_map=None,             # no custom device map is passed
            torch_dtype=torch.bfloat16,  # 16-bit bfloat weights
            use_cache=True,              # let the model use its cache
        )
    else:
        print(f"model {args.model} not supported")
        raise NotImplementedError(f"model {args.model} not supported")

    if is_master(rank):
        print(f"model config: {model.config}")
        print(model)

    # Padding uses the EOS token (see tokenizer above), so tell the model too.
    model.config.pad_token_id = model.config.eos_token_id
    model.to(dtype=torch.bfloat16)

    # --- FSDP -----------------------------------------------------------------
    # Wrap policy and sharding strategy come from the project-local `utils`
    # module (not visible here).
    model = FSDP(
        model,
        auto_wrap_policy=utils.get_mistral_wrapper(),
        mixed_precision=None,
        sharding_strategy=utils.fsdp_config.sharding_strategy,
        device_id=torch.cuda.current_device(),
        limit_all_gathers=True,
        sync_module_states=False,
        param_init_fn=None
    )

    # Project helper; presumably enables activation checkpointing on the
    # wrapped layers -- confirm in utils.
    utils.apply_fsdp_checkpointing(model)

    # --- optimizer ------------------------------------------------------------
    optimizer = optim.AdamW(
        model.parameters(),
        lr=args.lr,
        weight_decay=0.0,
    )

    # --- data -----------------------------------------------------------------
    df = pd.read_csv(args.training_data_from, sep='\t', usecols=args.columns)
    df = df[df['Label'].notna()]  # rows without a label cannot be trained on
    print(f'training dataset length: {len(df)}')
    print(f'training data columns: {df.columns}')
    print(f'training data pos/neg counts: {df.Label.value_counts()}')

    train_ds = TrainingDataset(df, tokenizer, args.max_seq_length)

    train_sampler = DistributedSampler(
        train_ds,
        rank=rank,
        num_replicas=world_size,
        shuffle=True,
    )

    train_dataloader = torch.utils.data.DataLoader(
        train_ds,
        batch_size=args.batch_size,
        num_workers=args.num_workers,
        pin_memory=True,
        sampler=train_sampler,
        drop_last=True,
        collate_fn=default_data_collator,
    )

    # --- LR schedule ----------------------------------------------------------
    # The scheduler is stepped once per optimizer update, not once per
    # micro-batch, so its horizon must be counted in optimizer updates.
    # Otherwise the cosine decay never completes when gradient accumulation
    # is enabled.
    accumulation = max(1, int(getattr(args, 'gradient_accumulation_steps', 1)))
    updates_per_epoch = (len(train_dataloader) + accumulation - 1) // accumulation
    scheduler = get_cosine_schedule_with_warmup(
        optimizer,
        # Warm-up length is a fraction (args.warmup) of one epoch.
        num_warmup_steps=int(updates_per_epoch * args.warmup),
        num_training_steps=updates_per_epoch * args.num_epochs,
    )

    return model, train_dataloader, optimizer, scheduler, local_rank, rank, world_size, tokenizer


def is_master(rank):
    """Return True when ``rank`` is the master process, i.e. global rank 0."""
    master_rank = 0  # in a multi-node setup the master process is rank 0
    return rank == master_rank
        
class RunningMean(object):
    """Keeps recorded values and reports the mean of the last ``N`` across ranks."""

    def __init__(self, local_rank, N=100):
        self.N = N
        self.local_rank = local_rank
        self.data = []

    def add(self, x):
        """Record one more value."""
        self.data.append(x)

    def mean_all_rank(self):
        """Average the local window mean over every process in the group.

        Each process averages its own last ``N`` recorded values (0 when
        nothing was recorded yet); these per-process means are gathered from
        all ranks and their mean is returned as a Python scalar.
        """
        window = self.data[-self.N:]
        local_mean = sum(window) / len(window) if len(window) > 0 else 0

        device = self.local_rank
        # One-element tensor holding this rank's mean, placed on its device.
        local_t = torch.tensor([local_mean]).to(device)
        # Receive buffer with one slot per process in the group.
        gathered = torch.zeros(dist.get_world_size(), dtype=local_t.dtype).to(device)
        dist.all_gather_into_tensor(gathered, local_t)
        return gathered.mean().cpu().item()
            
def train(args, model, train_dataloader, optimizer, scheduler, local_rank, rank, world_size, tokenizer):
    """Run the training loop with gradient accumulation and periodic checkpoints.

    Args:
        args: parsed command-line namespace; reads ``experiment_name``,
            ``log_dir``, ``gradient_accumulation_steps``, ``num_epochs``,
            ``save_checkpoint_steps`` and ``output_dir``.
        model: the (FSDP-wrapped) model; ``model(**batch).loss`` must exist.
        train_dataloader: dataloader yielding keyword batches for the model.
        optimizer: optimizer over the model parameters.
        scheduler: LR scheduler, stepped once per optimizer update.
        local_rank: device index used for the cross-rank loss average.
        rank: global rank; only rank 0 writes TensorBoard logs.
        world_size: unused here, kept for signature compatibility.
        tokenizer: unused here, kept for signature compatibility.
    """
    rm = RunningMean(local_rank)
    global_step = 0
    master = is_master(rank)

    writer = None
    if master:
        # Initialize TensorBoard writer only on the master process
        print('init tensorboard,', args.experiment_name)
        writer = SummaryWriter(args.log_dir)

    gradient_accumulation_steps = max(1, args.gradient_accumulation_steps)
    steps_per_epoch = len(train_dataloader)
    sampler = getattr(train_dataloader, 'sampler', None)

    optimizer.zero_grad()
    for epoch in range(args.num_epochs):
        # DistributedSampler reuses the same permutation every epoch unless
        # it is told which epoch is running.
        if hasattr(sampler, 'set_epoch'):
            sampler.set_epoch(epoch)

        for step, data in tqdm(enumerate(train_dataloader), total=steps_per_epoch, disable=not master, desc=f'Epoch {epoch}/{args.num_epochs}'):
            model.train()

            loss = model(**data).loss
            # Log the unscaled loss so curves stay comparable across different
            # accumulation settings.
            rm.add(loss.item())

            # Scale so the accumulated gradient is the mean over micro-batches.
            (loss / gradient_accumulation_steps).backward()

            # Update after every k-th micro-batch (counting from 1), and at
            # the end of the epoch so leftover gradients do not leak into the
            # next epoch.
            is_last_step = step + 1 == steps_per_epoch
            if (step + 1) % gradient_accumulation_steps == 0 or is_last_step:
                if hasattr(model, 'clip_grad_norm_'):
                    # FSDP shards parameters; its own method computes the
                    # norm over all shards.
                    model.clip_grad_norm_(1.0)
                else:
                    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                optimizer.step()
                optimizer.zero_grad()
                scheduler.step()

            # Collective call: every rank must take part, not just the master.
            mean_loss = rm.mean_all_rank()
            if master:
                print(f'loss: {mean_loss}')
                writer.add_scalar('Loss/train', mean_loss, global_step)

            if global_step > 0 and global_step % (args.save_checkpoint_steps * gradient_accumulation_steps) == 0:
                checkpoint_dir = os.path.join(args.output_dir, 'model_' + str(global_step))
                os.makedirs(checkpoint_dir, exist_ok=True)
                print("saving checkpoint...")
                utils.save_model_checkpoint(model, checkpoint_dir, rank)

            dist.barrier()
            global_step += 1

    if writer is not None:
        writer.flush()
        writer.close()

if __name__ == '__main__':
    # Command-line entry point: parse the options, build everything with
    # setup(), then run the training loop.
    parser = argparse.ArgumentParser()
    # Model / checkpoint selection.
    parser.add_argument('--load_from', type=str, default='intfloat/e5-mistral-7b-instruct')
    parser.add_argument('--model_path', type=str, default='/cosmos/local/IndexQuality/FinetuneLLM/FullTrainTest/Mixtral_New_ym_e5/current_best_1200/pytorch_model.bin')
    parser.add_argument('--model', type=str, default='mixtral')
    # setup() reads args.neftune_alpha in its llama2 branch; without this
    # option that attribute would not exist. None keeps NEFTune disabled.
    parser.add_argument('--neftune_alpha', type=float, default=None)
    # Data loading.
    parser.add_argument('--num_workers', type=int, default=4)
    parser.add_argument('--training_data_from', type=str, default="/cosmos/local/IndexQuality/FinetuneLLM/TrainingData/O1_A3_crowd_llm_training_data_no_eval_dataset_overlap.tsv")
    parser.add_argument('--columns', '-c', nargs='+', required=True, help="Names of columns to read")
    parser.add_argument('--batch_size', type=int, default=10) # 8->10
    parser.add_argument('--max_seq_length', type=int, default=1024)
    # Optimisation.
    parser.add_argument('--warmup', type=float, default=0.1)  # warm-up fraction
    parser.add_argument('--gradient_accumulation_steps', type=int, default=2)
    parser.add_argument('--num_epochs', type=int, default=2)
    parser.add_argument('--lr', type=float, default=1e-5)
    parser.add_argument('--gpu_counts', type=float, default=16)
    # Checkpointing and logging.
    # Disabled idea: evaluate every 200 steps and save immediately when the
    # result improves, otherwise save every save_checkpoint_steps.
    # parser.add_argument('--eval_steps', type=int, default=200)
    parser.add_argument('--save_checkpoint_steps', type=int, default=100)
    parser.add_argument('--output_dir', type=str, required=True)
    parser.add_argument('--experiment_name', type=str, required=True)
    parser.add_argument('--log_dir', type=str, required=True)
    args = parser.parse_args()
    train(args, *setup(args))
    
# singularity Command
# pip install transformers[torch]==4.38.1 datasets scikit-learn dataclasses lightgbm matplotlib mlflow tensorboard && cd /cosmos/local/IndexQuality/ContentModels/DataAugmentation/data/CBSpam_v3/Code/FinetuneLLM-US/new/ && CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15 torchrun --nnodes 1 --nproc_per_node 16 train_script_optimized_test.py --load_from /cosmos/local/IndexQuality/FinetuneLLM/Mixtral-8x7B-Instruct-v0.1/ --model mixtral --num_workers 4 --training_data_from /cosmos/local/IndexQuality/FinetuneLLM/TrainingData/O1_A3_crowd_llm_training_data_no_eval_dataset_overlap.tsv --batch_size 10 --gradient_accumulation_steps 2 --num_epochs 2 --save_checkpoint_steps 200 --lr 1e-5 --output_dir /cosmos/local/IndexQuality/FinetuneLLM/FullTrainTest/Mixtral_New_ym_bs_10_test --experiment_name MixtralTrain_ym --disable_tensorboard False --log_dir /cosmos/local/IndexQuality/FinetuneLLM/FullTrainTest/Mixtral_New_ym_bs_10_test/log
# current running: Mixtral_again: https://ml.azure.com/runs/Mixtral_again?wsid=/subscriptions/7972af26-e54d-410e-a755-20e582a46de0/resourceGroups/singularity-webdata/providers/Microsoft.MachineLearningServices/workspaces/singularity-webdata-ws01-eastus2&flight=1ptraining&tid=72f988bf-86f1-41af-91ab-2d7cd011db47

Overwriting train_script_optimized_Qwen_1.py


In [5]:
!pip install transformers[torch]==4.38.1 datasets scikit-learn dataclasses lightgbm matplotlib mlflow tensorboard

Collecting transformers==4.38.1 (from transformers[torch]==4.38.1)
  Downloading transformers-4.38.1-py3-none-any.whl.metadata (131 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m131.1/131.1 kB[0m [31m8.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting datasets
  Downloading datasets-3.0.1-py3-none-any.whl.metadata (20 kB)
Collecting scikit-learn
  Downloading scikit_learn-1.5.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Collecting dataclasses
  Downloading dataclasses-0.6-py3-none-any.whl.metadata (3.0 kB)
Collecting lightgbm
  Downloading lightgbm-4.5.0-py3-none-manylinux_2_28_x86_64.whl.metadata (17 kB)
Collecting matplotlib
  Downloading matplotlib-3.9.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Collecting mlflow
  Downloading mlflow-2.16.2-py3-none-any.whl.metadata (29 kB)
Collecting tensorboard
  Downloading tensorboard-2.18.0-py3-none-any.whl.metadata (1.6 kB)
Collecting huggingface-hub<1.0,>

In [15]:
# Each `!` line runs in its own throw-away subshell, so a separate `!export`
# never reaches the launch command; set the variable inline instead.
!CUDA_VISIBLE_DEVICES="1,2,3" NCCL_DEBUG=WARN python -m torch.distributed.run  \
--master_port 29501 --nnodes 1 --nproc_per_node 3 train_script_optimized_Qwen_1.py \
--load_from Qwen/Qwen2.5-0.5B-Instruct \
--model qwen --num_workers 4 \
--training_data_from /cosmos/local/IndexQuality/FinetuneLLM/TrainingData/No_overlap_complete_O1_A3_crowd_training.tsv \
--columns Url UrlTitle UrlSnippet FullBody Label \
--batch_size 20 \
--gradient_accumulation_steps 1 \
--num_epochs 6 \
--max_seq_length 1024 \
--save_checkpoint_steps 200 \
--lr 1e-5 \
--gpu_counts 3 \
--output_dir /cosmos/local/IndexQuality/FinetuneLLM/FullTrainTest/qwen_no_overlap_o1_a3_v1 \
--experiment_name qwen_no_overlap_v1 \
--log_dir /cosmos/local/IndexQuality/FinetuneLLM/FullTrainTest/qwen_no_overlap_o1_a3_v1/logs

*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
[2024-10-10 04:16:57,275] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2024-10-10 04:16:57,327] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2024-10-10 04:16:57,338] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to cuda (auto detect)
World size: 3, rank: 0, local rank: 0
World size: 3, rank: 1, local rank: 1
World size: 3, rank: 2, local rank: 2
Namespace(load_from='Qwen/Qwen2.5-0.5B-Instruct', model_path='/cosmos/local/IndexQuality/FinetuneLLM/FullTrainTest/Mixtral_New_ym_e5/current_best_1200/pytorch_model.bin', model='qwen', num_workers=4, warmup=0.1, training_data_from='

# Model inference: embedding and score

In [None]:
# Scratch example: load checkpoints and read a classification score from the
# E5 sequence-classification model.
# for example
# 'intfloat/e5-mistral-7b-instruct'

from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModelForSequenceClassification
model_name = "Qwen/Qwen2.5-0.5B-Instruct"

# Qwen checkpoint loaded as a causal LM (not used further in this cell).
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map="auto"
)

# E5-Mistral checkpoint loaded as a causal LM (not used further in this cell).
e5_model = AutoModelForCausalLM.from_pretrained(
    'intfloat/e5-mistral-7b-instruct',
    torch_dtype="auto",
    device_map="auto"
)

# E5-Mistral checkpoint loaded with a 2-label classification head.
e5_model_classification = AutoModelForSequenceClassification.from_pretrained(
    'intfloat/e5-mistral-7b-instruct', num_labels=2,
    torch_dtype="auto",
    device_map="auto"
)

# NOTE(review): `model_input` is not defined in this cell -- presumably a
# tokenizer output created in another cell; confirm before running.
outputs = e5_model_classification(input_ids=model_input["input_ids"], attention_mask=model_input["attention_mask"])

# Materialise the (name, tensor) pairs of every model parameter.
param_generator = e5_model_classification.named_parameters()
all_params = list(param_generator)

# NOTE(review): this picks the second-to-last *parameter* of the model, i.e. a
# weight tensor that does not depend on `model_input`. If a per-input
# embedding was intended, it has to come from the forward pass -- confirm.
e5_embedding_name = all_params[-2][0]
e5_embedding = all_params[-2][1]
e5_embedding_result = '|'.join(map(str, e5_embedding.tolist())) # should pass through an MLP first, otherwise the values (around 5) are too large

# Softmax over the label axis, then keep column 1 (presumably the positive /
# spam class -- confirm against the training labels).
# NOTE(review): `F` is not imported in this cell; the training scripts use
# `from torch.nn import functional as F` -- confirm it exists in the kernel.
output_logits = outputs.logits
output_prob = F.softmax(output_logits, dim=1)
output_prob = output_prob[:, 1]




# Switch the embedding-output model to prompt-based classification

In [1]:
%%writefile train_script_prompt_e5_0.py
# %%writefile train_script_optimized_e5_2.py
# %%writefile train_script_optimized_e5_3.py

import os
import argparse
import json
from datetime import timedelta
from tqdm import tqdm
import multiprocessing
import shutil
import torch
import torch.distributed as dist
import torch.optim as optim

from torch.utils.data import DistributedSampler,Dataset, DataLoader
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.fully_sharded_data_parallel import MixedPrecision
from transformers.models.mixtral.modeling_mixtral import MixtralDecoderLayer

from transformers import AutoTokenizer, AutoModel, AutoModelForSequenceClassification, AutoModelForCausalLM, default_data_collator, get_cosine_schedule_with_warmup, AutoConfig, BitsAndBytesConfig
import gc
from transformers import MistralForCausalLM
from torch.utils.tensorboard import SummaryWriter

import subprocess
import utils
import copy
import pandas as pd

from torch.nn import functional as F

# First half of the prompt: task instructions followed by the page fields.
# The {Url}, {UrlTitle}, {UrlSnippet} and {FullBody} placeholders are filled
# in later with str.format().
prompt_part1 = (
    "You are a website spam expert. You are given information about a webpage to judge whether or not it is spam. 0 means nonspam and 1 means spam. Give your prediction after the <ANS>: tag.\n"
    "    Url: {Url}\n"
    "    UrlTitle: {UrlTitle}\n"
    "    UrlSnippet: {UrlSnippet} \n"
    "    Site Content: {FullBody}\n"
)

# Second half of the prompt: the question plus the {Label} placeholder that
# follows the <ANS>: tag.
prompt_part2 = "\nWhat is your prediction <ANS>:{Label}"

# Label value used to mask positions out of the loss (the default
# ignore_index of CrossEntropyLoss).
IGNORE_INDEX = -100
# MAX_LENGTH_EVAL = 1024

class TrainingDataset(Dataset):
    """Map-style dataset producing prompt-style causal-LM samples.

    Each row is rendered into ``prompt_part1`` (page fields) and
    ``prompt_part2`` (question plus label), tokenized as a pair and padded to
    ``max_seq_length``; only the first part may be truncated. Only the last
    real token of the sequence is supervised; every other position is set to
    ``IGNORE_INDEX``.
    """

    def __init__(self, df, tokenizer, max_seq_length):
        self.df = df
        self.tokenizer = tokenizer
        self.max_seq_length = max_seq_length

    def __len__(self):
        """Number of rows in the backing DataFrame."""
        return len(self.df)

    @staticmethod
    def _normalize_label(value):
        """Render integral labels without a fractional part.

        A Label column that pandas stores as float would otherwise be
        formatted as "1.0" and make the supervised last token always "0".
        Values that are not integral numbers are returned unchanged.
        """
        try:
            as_float = float(value)
        except (TypeError, ValueError):
            return value
        return int(as_float) if as_float.is_integer() else value

    @staticmethod
    def _build_labels(input_ids, attention_mask, ignore_index=None):
        """Return labels that keep only the last attended token.

        The position is taken from the attention mask itself, so the result
        is correct for both left- and right-padded sequences.

        Args:
            input_ids: sequence of token ids.
            attention_mask: sequence of 0/1 flags, same length as ``input_ids``.
            ignore_index: fill value for masked positions; defaults to the
                module-level ``IGNORE_INDEX``.

        Returns:
            int64 tensor of the same length as ``input_ids``.
        """
        if ignore_index is None:
            ignore_index = IGNORE_INDEX
        ids = torch.tensor(list(input_ids), dtype=torch.int64)
        mask = torch.tensor(list(attention_mask), dtype=torch.int64)
        labels = torch.full_like(ids, ignore_index)
        attended = torch.nonzero(mask, as_tuple=False).flatten()
        if attended.numel() > 0:
            last = int(attended[-1])
            labels[last] = ids[last]
        return labels

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for row ``idx``."""
        row = self.df.iloc[idx]
        text_part_1 = prompt_part1.format(
            Url=row['Url'],
            UrlTitle=row['UrlTitle'],
            UrlSnippet=row['UrlSnippet'],
            FullBody=row['FullBody'],
        )
        text_part_2 = prompt_part2.format(Label=self._normalize_label(row['Label']))

        # Special tokens are disabled, so the BOS token is prepended by hand.
        # 'only_first' truncation keeps the question/label part intact.
        res = self.tokenizer(
            f"{self.tokenizer.bos_token} {text_part_1}",
            text_part_2,
            add_special_tokens=False,
            max_length=self.max_seq_length,
            padding='max_length',
            truncation='only_first',
        )

        return {
            'input_ids': torch.tensor(res['input_ids']),
            'attention_mask': torch.tensor(res['attention_mask']),
            'labels': self._build_labels(res['input_ids'], res['attention_mask']),
        }



def setup(args):
    """Initialise the distributed job and build everything ``train`` needs.

    Reads WORLD_SIZE / RANK / LOCAL_RANK from the environment, initialises the
    NCCL process group, loads a left-padding tokenizer and the causal LM
    selected by ``args.model``, wraps the model in FSDP, and builds the
    optimizer, the distributed dataloader and the cosine LR scheduler.

    Args:
        args: parsed command-line namespace. Fields read here: ``load_from``,
            ``model``, ``lr``, ``training_data_from``, ``columns``,
            ``max_seq_length``, ``batch_size``, ``num_workers``, ``warmup``,
            ``num_epochs`` and, when present, ``neftune_alpha`` and
            ``gradient_accumulation_steps``.

    Returns:
        Tuple ``(model, train_dataloader, optimizer, scheduler, local_rank,
        rank, world_size, tokenizer)``.

    Raises:
        NotImplementedError: if ``args.model`` is not a supported model name.
    """
    # --- distributed environment -------------------------------------------
    # WORLD_SIZE: total number of participating processes (one per GPU).
    world_size = int(os.environ["WORLD_SIZE"])
    # RANK: globally unique id of this process, from 0 to world_size - 1.
    rank = int(os.environ["RANK"])
    # LOCAL_RANK: id of this process inside its own node. Example: 2 servers
    # with 4 GPUs each give world_size 8, ranks 0..7 and local ranks 0..3 on
    # each server.
    local_rank = int(os.environ["LOCAL_RANK"])
    print(f"World size: {world_size}, rank: {rank}, local rank: {local_rank}")

    # Timeout handed to the process group (5 hours).
    timeout = timedelta(hours=5)
    dist.init_process_group("nccl", timeout=timeout, rank=rank, world_size=world_size)
    assert torch.distributed.is_initialized()

    # Bind this process to its GPU and release cached GPU memory.
    torch.cuda.set_device(local_rank)
    torch.cuda.empty_cache()

    if is_master(rank):  # only the master process prints the arguments
        print(args)

    # --- tokenizer ------------------------------------------------------------
    tokenizer = AutoTokenizer.from_pretrained(args.load_from, add_bos_token=True)
    tokenizer.padding_side = 'left'
    tokenizer.pad_token = tokenizer.eos_token

    # --- model ----------------------------------------------------------------
    if args.model == 'llama2':
        # Imported here because the file-level imports do not provide it.
        from transformers import LlamaForCausalLM

        model = LlamaForCausalLM.from_pretrained(args.load_from, load_in_8bit=False, device_map=None, torch_dtype=torch.bfloat16, use_cache=True)
        # The option may be absent from the parser; treat that as "disabled".
        neftune_alpha = getattr(args, 'neftune_alpha', None)
        if neftune_alpha is not None:
            print('NEFTUNE enabled')
            # Save the old forward function as a class attribute
            torch.nn.Embedding.old_forward = model.model.embed_tokens.forward

            # New forward: original embedding output plus uniform noise whose
            # magnitude is neftune_alpha / sqrt(seq_len * hidden_dim).
            def new_forward(self, x):
                out = self.old_forward(x)
                dims = torch.tensor(out.size(1) * out.size(2))
                mag_norm = neftune_alpha / torch.sqrt(dims)
                return out + torch.zeros_like(out).uniform_(-mag_norm, mag_norm)

            # Replace the forward function of the embedding object with the new one
            model.model.embed_tokens.forward = new_forward.__get__(model.model.embed_tokens, torch.nn.Embedding)
    elif args.model == "mistral":
        # The dataset of this script supplies token-level `labels` and the
        # training loop reads `.loss`, so a model with a language-modelling
        # head is required; the bare AutoModel backbone accepts no labels and
        # returns no loss.
        # NOTE(review): if the checkpoint ships without LM-head weights, that
        # head is newly initialised on load -- confirm this is acceptable.
        model = AutoModelForCausalLM.from_pretrained(args.load_from, load_in_8bit=False, device_map=f'cuda:{local_rank}', torch_dtype=torch.float16)
    elif args.model == "mixtral":
        model = AutoModelForCausalLM.from_pretrained(
            args.load_from,
            load_in_8bit=False,          # do not load the weights in 8-bit precision
            device_map=None,             # no custom device map is passed
            torch_dtype=torch.bfloat16,  # 16-bit bfloat weights
            use_cache=True,              # let the model use its cache
        )
    else:
        print(f"model {args.model} not supported")
        raise NotImplementedError(f"model {args.model} not supported")

    if is_master(rank):
        print(f"model config: {model.config}")
        print(model)

    # Overriding model.config.pad_token_id is intentionally left disabled here
    # (it was commented out as an experiment in the original).
    model.to(dtype=torch.bfloat16)

    # --- FSDP -----------------------------------------------------------------
    # Wrap policy and sharding strategy come from the project-local `utils`
    # module (not visible here).
    model = FSDP(
        model,
        auto_wrap_policy=utils.get_mistral_wrapper(),
        mixed_precision=None,
        sharding_strategy=utils.fsdp_config.sharding_strategy,
        device_id=torch.cuda.current_device(),
        limit_all_gathers=True,
        sync_module_states=False,
        param_init_fn=None
    )

    # Project helper; presumably enables activation checkpointing on the
    # wrapped layers -- confirm in utils.
    utils.apply_fsdp_checkpointing(model)

    # --- optimizer ------------------------------------------------------------
    optimizer = optim.AdamW(
        model.parameters(),
        lr=args.lr,
        weight_decay=0.0,
    )

    # --- data -----------------------------------------------------------------
    df = pd.read_csv(args.training_data_from, sep='\t', usecols=args.columns)
    df = df[df['Label'].notna()]  # rows without a label cannot be trained on
    print(f'training dataset length: {len(df)}')
    print(f'training data columns: {df.columns}')
    print(f'training data pos/neg counts: {df.Label.value_counts()}')

    train_ds = TrainingDataset(df, tokenizer, args.max_seq_length)

    train_sampler = DistributedSampler(
        train_ds,
        rank=rank,
        num_replicas=world_size,
        shuffle=True,
    )

    train_dataloader = torch.utils.data.DataLoader(
        train_ds,
        batch_size=args.batch_size,
        num_workers=args.num_workers,
        pin_memory=True,
        sampler=train_sampler,
        drop_last=True,
        collate_fn=default_data_collator,
    )

    # --- LR schedule ----------------------------------------------------------
    # The scheduler is stepped once per optimizer update, not once per
    # micro-batch, so its horizon must be counted in optimizer updates.
    # Otherwise the cosine decay never completes when gradient accumulation
    # is enabled.
    accumulation = max(1, int(getattr(args, 'gradient_accumulation_steps', 1)))
    updates_per_epoch = (len(train_dataloader) + accumulation - 1) // accumulation
    scheduler = get_cosine_schedule_with_warmup(
        optimizer,
        # Warm-up length is a fraction (args.warmup) of one epoch.
        num_warmup_steps=int(updates_per_epoch * args.warmup),
        num_training_steps=updates_per_epoch * args.num_epochs,
    )

    return model, train_dataloader, optimizer, scheduler, local_rank, rank, world_size, tokenizer


def is_master(rank):
    """Return True when ``rank`` is the master process, i.e. global rank 0."""
    master_rank = 0  # in a multi-node setup the master process is rank 0
    return rank == master_rank
        
class RunningMean(object):
    """Keeps recorded values and reports the mean of the last ``N`` across ranks."""

    def __init__(self, local_rank, N=100):
        self.N = N
        self.local_rank = local_rank
        self.data = []

    def add(self, x):
        """Record one more value."""
        self.data.append(x)

    def mean_all_rank(self):
        """Average the local window mean over every process in the group.

        Each process averages its own last ``N`` recorded values (0 when
        nothing was recorded yet); these per-process means are gathered from
        all ranks and their mean is returned as a Python scalar.
        """
        recent = self.data[-self.N:]
        if len(recent) > 0:
            own_mean = sum(recent) / len(recent)
        else:
            own_mean = 0

        target = self.local_rank
        # One-element tensor holding this rank's mean, placed on its device.
        own_tensor = torch.tensor([own_mean]).to(target)
        # Receive buffer with one slot per process in the group.
        every_rank = torch.zeros(dist.get_world_size(), dtype=own_tensor.dtype).to(target)
        dist.all_gather_into_tensor(every_rank, own_tensor)
        return every_rank.mean().cpu().item()
            
def train(args, model, train_dataloader, optimizer, scheduler, local_rank, rank, world_size, tokenizer):
    """Run the distributed training loop.

    Args:
        args: Parsed command-line namespace. Reads ``experiment_name``,
            ``log_dir``, ``gradient_accumulation_steps``, ``num_epochs``,
            ``save_checkpoint_steps`` and ``output_dir``.
        model: Model whose forward call returns an object with a ``loss``
            attribute.
        train_dataloader: Loader yielding keyword-argument dicts for the model.
        optimizer: Optimizer stepped once per accumulation window.
        scheduler: LR scheduler stepped together with the optimizer.
        local_rank: Device index used for the cross-rank loss averaging.
        rank: Global rank; rank 0 does the logging.
        world_size: Unused here; kept for the ``train(args, *setup(args))`` call.
        tokenizer: Unused here; kept for the same reason.
    """
    rm = RunningMean(local_rank)
    global_step = 0
    master = is_master(rank)

    # TensorBoard writer exists only on the master process; other ranks keep
    # None so the name is always bound.
    writer = None
    if master:
        print('init tensorboard,', args.experiment_name)
        writer = SummaryWriter(args.log_dir)

    gradient_accumulation_steps = args.gradient_accumulation_steps
    steps_per_epoch = len(train_dataloader)
    try:
        for epoch in range(args.num_epochs):
            # A DistributedSampler reshuffles only when told the epoch number;
            # without this call every epoch sees the same ordering.
            set_epoch = getattr(getattr(train_dataloader, 'sampler', None), 'set_epoch', None)
            if callable(set_epoch):
                set_epoch(epoch)

            for step, data in tqdm(enumerate(train_dataloader), total=steps_per_epoch, disable=not master, desc=f'Epoch {epoch}/{args.num_epochs}'):
                model.train()

                loss = model(**data).loss
                # Scale so the gradients summed over one window form an average.
                if gradient_accumulation_steps > 1:
                    loss = loss / gradient_accumulation_steps
                loss.backward()

                # Step once a full window of micro-batches has been accumulated
                # ((step + 1), not step: step 0 is only the first micro-batch),
                # and also on the last batch of the epoch so leftover gradients
                # are applied instead of leaking into the next epoch.
                window_full = (step + 1) % gradient_accumulation_steps == 0
                last_batch = (step + 1) == steps_per_epoch
                if window_full or last_batch:
                    if isinstance(model, FSDP):
                        # Sharded gradients: the FSDP method computes the norm
                        # over all ranks rather than over the local shard.
                        model.clip_grad_norm_(1.0)
                    else:
                        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                    optimizer.step()
                    optimizer.zero_grad()
                    scheduler.step()

                # NOTE: the recorded value is the loss after division by
                # gradient_accumulation_steps.
                rm.add(loss.item())
                mean_loss = rm.mean_all_rank()
                if master:
                    print(f'loss: {mean_loss}')
                    writer.add_scalar('Loss/train', mean_loss, global_step)

                # Checkpoint cadence is expressed in optimizer steps, hence the
                # multiplication by the accumulation factor.
                if global_step % (args.save_checkpoint_steps * gradient_accumulation_steps) == 0 and global_step > 0:
                    dir_name = 'model_' + str(global_step)
                    checkpoint_dir = os.path.join(args.output_dir, dir_name)
                    os.makedirs(checkpoint_dir, exist_ok=True)
                    print("saving checkpoint...")
                    utils.save_model_checkpoint(model, checkpoint_dir, rank)

                # Keep all ranks in lockstep before the next iteration.
                dist.barrier()
                global_step += 1
    finally:
        # Flush pending events even when training stops with an exception.
        if writer is not None:
            writer.flush()
            writer.close()

if __name__ == '__main__':
    # Command-line entry point: declare the options, parse them, then build
    # the training objects with setup() and pass them on to train().
    parser = argparse.ArgumentParser()

    # (flags, keyword options) pairs, registered in this order.
    # Alternative default once used for --load_from:
    #   '/data/local/IndexQuality/FinetuneLLM/Phi-3-medium'
    # Disabled option: ('--eval_steps',), dict(type=int, default=200)
    #   (evaluate every 200 steps and save right away when the result beats
    #   the previous best; otherwise save according to save_checkpoint_steps)
    option_table = [
        (('--load_from',), dict(type=str, default='intfloat/e5-mistral-7b-instruct')),
        (('--model_path',), dict(type=str, default='/cosmos/local/IndexQuality/FinetuneLLM/FullTrainTest/Mixtral_New_ym_e5/current_best_1200/pytorch_model.bin')),
        (('--model',), dict(type=str, default='mixtral')),
        (('--num_workers',), dict(type=int, default=4)),
        (('--warmup',), dict(type=float, default=0.1)),
        (('--training_data_from',), dict(type=str, default="/cosmos/local/IndexQuality/FinetuneLLM/TrainingData/O1_A3_crowd_llm_training_data_no_eval_dataset_overlap.tsv")),
        (('--columns', '-c'), dict(nargs='+', required=True, help="Names of columns to read")),
        (('--batch_size',), dict(type=int, default=10)),
        (('--gradient_accumulation_steps',), dict(type=int, default=2)),
        (('--num_epochs',), dict(type=int, default=2)),
        (('--max_seq_length',), dict(type=int, default=1024)),
        (('--save_checkpoint_steps',), dict(type=int, default=100)),
        (('--lr',), dict(type=float, default=1e-5)),
        (('--gpu_counts',), dict(type=float, default=16)),
        (('--output_dir',), dict(type=str, required=True)),
        (('--experiment_name',), dict(type=str, required=True)),
        (('--log_dir',), dict(type=str, required=True)),
    ]
    for flags, options in option_table:
        parser.add_argument(*flags, **options)

    args = parser.parse_args()
    train(args, *setup(args))
    
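# Singularity command (historical example: it passes --disable_tensorboard, which this script's parser does not define, and omits the required --columns)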
# pip install transformers[torch]==4.38.1 datasets scikit-learn dataclasses lightgbm matplotlib mlflow tensorboard && cd /cosmos/local/IndexQuality/ContentModels/DataAugmentation/data/CBSpam_v3/Code/FinetuneLLM-US/new/ && CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15 torchrun --nnodes 1 --nproc_per_node 16 train_script_optimized_test.py --load_from /cosmos/local/IndexQuality/FinetuneLLM/Mixtral-8x7B-Instruct-v0.1/ --model mixtral --num_workers 4 --training_data_from /cosmos/local/IndexQuality/FinetuneLLM/TrainingData/O1_A3_crowd_llm_training_data_no_eval_dataset_overlap.tsv --batch_size 10 --gradient_accumulation_steps 2 --num_epochs 2 --save_checkpoint_steps 200 --lr 1e-5 --output_dir /cosmos/local/IndexQuality/FinetuneLLM/FullTrainTest/Mixtral_New_ym_bs_10_test --experiment_name MixtralTrain_ym --disable_tensorboard False --log_dir /cosmos/local/IndexQuality/FinetuneLLM/FullTrainTest/Mixtral_New_ym_bs_10_test/log
# current running: Mixtral_again: https://ml.azure.com/runs/Mixtral_again?wsid=/subscriptions/7972af26-e54d-410e-a755-20e582a46de0/resourceGroups/singularity-webdata/providers/Microsoft.MachineLearningServices/workspaces/singularity-webdata-ws01-eastus2&flight=1ptraining&tid=72f988bf-86f1-41af-91ab-2d7cd011db47

Writing train_script_prompt_e5_0.py


In [9]:
!export CUDA_VISIBLE_DEVICES="0,1,2,3"
!NCCL_DEBUG=WARN python -m torch.distributed.run  \
--master_port 29501 --nnodes 1 --nproc_per_node 4 train_script_optimized_Qwen_1.py \
--load_from Qwen/Qwen2.5-0.5B-Instruct \
--model qwen --num_workers 4 \
--training_data_from /cosmos/local/IndexQuality/FinetuneLLM/TrainingData/No_overlap_complete_O1_A3_crowd_training.tsv \
--columns Url UrlTitle UrlSnippet FullBody Label \
--batch_size 20 \
--gradient_accumulation_steps 1 \
--num_epochs 5 \
--max_seq_length 1024 \
--save_checkpoint_steps 200 \
--lr 1e-5 \
--gpu_counts 3 \
--output_dir /cosmos/local/IndexQuality/FinetuneLLM/FullTrainTest/E5_prompt_v0 \
--experiment_name mistral_e5_v7 \
--log_dir /cosmos/local/IndexQuality/FinetuneLLM/FullTrainTest/E5_prompt_v0/logs



*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
[2024-10-10 04:08:02,231] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2024-10-10 04:08:02,276] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2024-10-10 04:08:02,300] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2024-10-10 04:08:02,310] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to cuda (auto detect)
World size: 4, rank: 3, local rank: 3
World size: 4, rank: 2, local rank: 2
World size: 4, rank: 0, local rank: 0
World size: 4, rank: 1, local rank: 1
Namespace(load_from='Qwen/Qwen2.5-0.5B-Instruct', model_path='/cosmos/loca