In [None]:
import json

# Load the run configuration (credentials, dataset paths, output locations).
# JSON is UTF-8 by specification, so do not depend on the platform default encoding.
with open("config.json", "r", encoding="utf-8") as f:
    config_argument = json.load(f)

access_token = config_argument["huggingface_access_token"]

task1_train_data_path_txt = config_argument["model_train_task1_data_path_txt"]
task2_train_data_path_txt = config_argument["model_train_task2_data_path_txt"]

task1_val_data_path_txt = config_argument["model_val_task1_data_path_txt"]
task2_val_data_path_txt = config_argument["model_val_task2_data_path_txt"]

answer_val_data_path_txt = config_argument["answer_val_data_path_txt"]

model_save_path = config_argument["model_save_path"]
model_logging_dir = config_argument["model_logging_dir"]


# Never echo the token itself: cell outputs are stored inside the notebook file and
# travel with it. Only report whether a token was configured.
print( "access_token: ", "<set>" if access_token else "<missing>" )
print( "task1_train_data_path: ", task1_train_data_path_txt )
print( "task2_train_data_path: ", task2_train_data_path_txt )
print( "task1_val_data_path: ", task1_val_data_path_txt )
print( "task2_val_data_path: ", task2_val_data_path_txt )
print( "answer_val_data_path: ", answer_val_data_path_txt )
print( "model_save_path: ", model_save_path )
print( "model_logging_dir: ", model_logging_dir )


In [None]:
from transformers import AutoTokenizer, AutoModelForTokenClassification, AutoConfig
from transformers import pipeline

# Baseline checkpoint (per its name, XLM-RoBERTa large fine-tuned on English CoNLL-03).
model_id = "xlm-roberta-large-finetuned-conll03-english"

# Tokenizer and baseline model share the checkpoint id and the configured access token.
tokenizer = AutoTokenizer.from_pretrained(model_id, token=access_token)
model = AutoModelForTokenClassification.from_pretrained(model_id, token=access_token)

# Smoke test: run the untouched baseline through an NER pipeline on a sample sentence.
classifier = pipeline(
    "ner",
    model=model,
    tokenizer=tokenizer,
    aggregation_strategy="simple",
)
classifier("Alya told Jasmine that Andrew could pay with cash..")

In [None]:
# Display the baseline token-classification model loaded above (rendered as the cell output).
model

In [None]:
# Entity types in label-id order. Id 0 is 'O'; after that every type owns two
# consecutive ids: the odd one is 'B-<type>' and the following even one is 'I-<type>'.
_ENTITY_TYPES = (
    'PATIENT', 'DOCTOR', 'USERNAME', 'FAMILYNAME', 'PERSONALNAME',
    'PROFESSION',
    'ROOM', 'DEPARTMENT', 'HOSPITAL', 'ORGANIZATION',
    'STREET', 'CITY', 'DISTRICT', 'COUNTY', 'STATE', 'COUNTRY', 'ZIP',
    'LOCATION-OTHER',
    'AGE',
    'DATE', 'TIME', 'DURATION', 'SET',
    'PHONE', 'FAX', 'EMAIL', 'URL', 'IPADDRESS',
    'SOCIAL_SECURITY_NUMBER', 'MEDICAL_RECORD_NUMBER', 'HEALTH_PLAN_NUMBER',
    'ACCOUNT_NUMBER', 'LICENSE_NUMBER', 'VEHICLE_ID', 'DEVICE_ID',
    'BIOMETRIC_ID', 'ID_NUMBER',
    'OTHER',
)

# id -> BIO label string, with keys inserted in ascending id order.
label_map = {
    0: 'O',
    **{
        2 * position + shift: f'{prefix}-{entity_type}'
        for position, entity_type in enumerate(_ENTITY_TYPES)
        for shift, prefix in ((1, 'B'), (2, 'I'))
    },
}

In [None]:
from transformers import XLMRobertaForTokenClassification
import torch.nn as nn
from transformers.modeling_outputs import TokenClassifierOutput
import torch.nn.functional as F # Import F



from transformers import XLMRobertaForTokenClassification, AutoConfig
import torch.nn as nn
from transformers.modeling_outputs import TokenClassifierOutput
import torch


class XLMRobertaForTokenClassificationWithClassWeight(XLMRobertaForTokenClassification):
    """XLM-RoBERTa token classifier trained with (optionally class-weighted) cross-entropy.

    Args:
        config: Model configuration; ``config.num_labels`` fixes the number of classes.
        class_weights: Optional 1-D tensor holding one weight per class. When ``None``
            an unweighted cross-entropy is used.

    Raises:
        ValueError: If ``class_weights`` does not contain exactly ``num_labels`` values.
    """

    def __init__(self, config, class_weights=None):
        super().__init__(config)
        self.num_labels = config.num_labels

        # Positions labelled -100 (special tokens / padding) are excluded from the loss.
        if class_weights is not None:
            if class_weights.numel() != self.num_labels:
                raise ValueError(
                    f"class_weights has {class_weights.numel()} entries, "
                    f"expected num_labels={self.num_labels}"
                )
            self.loss_fct = nn.CrossEntropyLoss(weight=class_weights, ignore_index=-100)
        else:
            self.loss_fct = nn.CrossEntropyLoss(ignore_index=-100)

    def forward(self, input_ids=None, attention_mask=None, labels=None):
        """Run the encoder + classification head and compute the weighted loss.

        Args:
            input_ids: Token ids, forwarded unchanged to the parent model.
            attention_mask: Attention mask, forwarded unchanged to the parent model.
            labels: Optional label ids aligned with ``input_ids``; -100 is ignored.

        Returns:
            TokenClassifierOutput whose ``loss`` is ``None`` when no labels are given.
        """
        # Labels are deliberately not handed to the parent: it would compute its own
        # unweighted loss, which was discarded anyway. Only the logits are needed here.
        outputs = super().forward(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )

        logits = outputs.logits
        loss = None
        if labels is not None:
            loss = self.loss_fct(
                logits.contiguous().view(-1, self.num_labels),
                # Keep labels on the same device as the logits before flattening.
                labels.to(logits.device).contiguous().view(-1),
            )

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

# The classification head gets one output per entry of label_map.
num_labels = len(label_map)

model_name = "FacebookAI/xlm-roberta-large-finetuned-conll03-english"
config = AutoConfig.from_pretrained(
    model_name,
    num_labels=num_labels,
    token=access_token,
)

# Attach the project's label vocabulary in both directions (id -> label, label -> id).
config.id2label = label_map
config.label2id = {label: label_id for label_id, label in label_map.items()}



In [None]:


# `model` is still the baseline checkpoint at this point (the re-labelled model is
# only instantiated further down), so the new mapping must be read from `config`.
print("新的 id2label:", config.id2label)
print("新的 label2id:", config.label2id)


# **資料準備**

---



In [None]:
def Caculate_Wav_File_Times( inputs ) :
    """Count how many lines belong to each file id.

    Each line is stripped of surrounding whitespace and split on tabs; the first
    field is used as the id.

    Args:
        inputs: Iterable of tab-separated text lines.

    Returns:
        dict mapping first field -> number of lines carrying it, in first-seen order.
    """
    occurrences = {}
    for raw_line in inputs:
        file_id = raw_line.strip().split('\t')[0]
        occurrences[file_id] = occurrences.get(file_id, 0) + 1
    return occurrences

In [None]:
# with open( "/content/task2_answer.txt", "r", encoding="utf-8" ) as f :
#   data = f.readlines()

def Prepare_Task2_NER(data, is_big=False) :
    """Group task-2 annotation lines by file id.

    Every non-blank line must hold at least five tab-separated fields; field 0 is
    the file id, field 1 the entity label and field 4 the annotated text.

    Lines are grouped in a single pass, so lines of one id no longer have to be
    contiguous, and blank lines are skipped instead of raising.

    Args:
        data: Iterable of annotation lines.
        is_big: When True, each label is first mapped through
            ``Change_small_label_to_big_label`` (not defined in this part of the
            notebook -- it must exist before calling with ``is_big=True``).

    Returns:
        dict mapping file id -> list of single-entry dicts ``{text: label}``, in file order.

    Raises:
        ValueError: If a non-blank line has fewer than five fields.
    """
    data_list = {}
    annotation_count = 0

    for line_number, raw_line in enumerate(data, start=1):
        line = raw_line.strip()
        if not line:
            continue

        line_split = line.split("\t")
        if len(line_split) < 5:
            raise ValueError(
                f"annotation line {line_number} has {len(line_split)} field(s), expected at least 5: {line!r}"
            )

        label = line_split[1]
        if is_big:
            label = Change_small_label_to_big_label(label)

        data_list.setdefault(line_split[0], []).append({line_split[4]: label})
        annotation_count += 1

    # Summary only: dumping every annotation would flood the notebook output with
    # the annotated (potentially sensitive) text.
    print(f"Prepared {annotation_count} annotations for {len(data_list)} files")

    return data_list



In [None]:
# English data: read the task-2 training annotation lines.
with open(task2_train_data_path_txt, mode="r", encoding="utf-8") as f:
    data = list(f)

print(len(data))

# Further folds (e.g. hold_2 / hold_3 answer files) can be appended to `data` here
# before the annotations are grouped.

data_list = Prepare_Task2_NER(data)

In [None]:
# English data: read the task-2 validation annotation lines.
with open(task2_val_data_path_txt, mode="r", encoding="utf-8") as f:
    val_data = list(f)

print(len(val_data))

val_data_list = Prepare_Task2_NER(val_data)


In [None]:
def _find_unused_occurrence(text, word, used_indices):
    """Return the first index where `word` occurs in `text` starting on an unclaimed character, or -1."""
    if not word:
        # An empty string "matches" everywhere and would yield a zero-length entity.
        return -1
    position = text.find(word)
    while position != -1:
        if position not in used_indices:
            return position
        position = text.find(word, position + 1)
    return -1


def Prepare_Task1_NER( data, data_list):
    """Turn transcript lines into token-classification training examples.

    Relies on the notebook-level ``tokenizer`` (must return offset mappings) and
    ``config`` (for ``label2id``).

    Args:
        data: Iterable of lines shaped ``"<file id>\\t<text>"``.
        data_list: dict mapping file id -> list of ``{entity text: tag}`` dicts, as
            produced by ``Prepare_Task2_NER``. Ids missing from it get all-"O" labels.

    Returns:
        list of dicts with ``input_ids``, ``labels`` and ``attention_mask`` (plain
        lists). The first and last position (special tokens) are labelled -100.
    """
    train_data = []
    outside_id = config.label2id["O"]

    for raw_line in data:
        line_split = raw_line.strip().split("\t")
        if len(line_split) < 2:
            # Blank line, or an id without any text: nothing to tokenize.
            if line_split[0]:
                print(f"[skipped line without text] name={line_split[0]}")
            continue

        name = line_split[0]
        # Use the same stripped string for tokenization AND for the entity search,
        # otherwise token offsets and entity character positions can disagree.
        text = line_split[1].strip()

        tokens = tokenizer(text, return_offsets_mapping=True, truncation=True, add_special_tokens=True)
        input_ids = list(tokens["input_ids"])
        attention_mask = list(tokens["attention_mask"])
        offsets = tokens["offset_mapping"]

        # Start from all-"O"; the leading/trailing special tokens are ignored by the loss.
        label = [outside_id] * len(input_ids)
        label[0] = -100
        label[-1] = -100

        # Resolve every annotation to a (start, end, tag) character span. Characters
        # already claimed cannot start another match, so repeated words map to
        # successive occurrences.
        entities = []
        used_indices = set()
        for ent in data_list.get(name, []):
            for word, tag in ent.items():
                start = _find_unused_occurrence(text, word, used_indices)
                if start == -1:
                    print(f"[未找到實體] name={name}, word='{word}', tag='{tag}'")
                    print(f"→ 原始句子：{text}")
                    continue
                end = start + len(word)
                used_indices.update(range(start, end))
                entities.append((start, end, tag))

        # A token starting exactly at an entity start gets B-<tag>; a token starting
        # strictly inside an entity gets I-<tag>. Zero-width offsets are special tokens.
        for idx, (start, end) in enumerate(offsets):
            if start == end:
                continue
            for ent_start, ent_end, tag in entities:
                if start == ent_start:
                    label[idx] = config.label2id[f"B-{tag}"]
                    break
                elif ent_start < start < ent_end:
                    label[idx] = config.label2id[f"I-{tag}"]
                    break

        train_data.append({
            "input_ids": input_ids,
            "labels": label,
            "attention_mask": attention_mask
        })

    return train_data


In [None]:
# English data: read the task-1 training transcript lines.
with open(task1_train_data_path_txt, mode="r", encoding="utf-8") as f:
    data = list(f)

print(len(data))

# Further folds (e.g. hold_2 / hold_3 transcript files) can be appended to `data`
# here before the examples are built.

train_data = Prepare_Task1_NER(data, data_list)


In [None]:
# English data: read the task-1 validation transcript lines.
# NOTE: this rebinds `val_data` (earlier it held the task-2 validation lines);
# evaluate_task2 reads this global by default.
with open(task1_val_data_path_txt, mode="r", encoding="utf-8") as f:
    val_data = list(f)

print(len(val_data))

test_data = Prepare_Task1_NER(val_data, val_data_list)

In [None]:
%pip install datasets

In [None]:
from datasets import Dataset
# Wrap the prepared example dicts (input_ids / labels / attention_mask) as Dataset objects.
data_train = Dataset.from_list(train_data)
# NOTE: `test_data` was built from the validation files, so this is the evaluation split.
data_test = Dataset.from_list(test_data)

In [None]:
# Show both Dataset objects (the cell's last expression is rendered as its output).
data_train, data_test

In [None]:
# Spot-check one encoded training example; assumes the training set has more than 100 rows.
print( data_train[100])

# **WEIGHT SETTING**

In [None]:
from collections import Counter
import numpy as np

# Alias for the training split built above.
train_dataset = data_train

# Flatten the labels of every example into one list, dropping the ignore index (-100).
all_labels = [
    label_id
    for example in train_dataset
    for label_id in example["labels"]
    if label_id != -100
]

y_all = np.array(all_labels)


In [None]:
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
import torch

# Label ids that actually occur in the training data (y_all already excludes -100).
unique_labels = np.unique(y_all)

# "Balanced" weights, computed only for the classes that were observed.
weights = compute_class_weight(class_weight='balanced', classes=unique_labels, y=y_all)

# Full-length weight vector: classes never seen in training keep the default weight 1.
class_weights = np.ones(num_labels, dtype=np.float32)
for label_id, weight in zip(unique_labels, weights):
    class_weights[label_id] = weight

class_weights = torch.tensor(class_weights)


In [None]:

# Rebuild `model` with the new label space and the class-weighted loss. The
# checkpoint's classification head has a different size, hence ignore_mismatched_sizes.
model = XLMRobertaForTokenClassificationWithClassWeight.from_pretrained(
    model_name,
    config=config,
    class_weights=class_weights,
    token=access_token,
    ignore_mismatched_sizes=True,
)

# **TRAIN**

In [None]:
import torch.nn as nn
from transformers import DataCollatorForTokenClassification
# Batch collator for token classification, built around the shared tokenizer; it is
# handed to the Trainer as `data_collator` further down.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)


from transformers import AutoModelForTokenClassification

In [None]:
def _append_entity(answer_list, entity, word, start, end):
    """Append the finished entity to `answer_list` unless its type or text is empty."""
    if entity and word:
        answer_list.append({
            "entity": entity,
            "word": word,
            "start": start,
            "end": end
        })


def Process_Predict_Ner(pre):
    """Merge token-level BIO predictions into entity spans.

    Args:
        pre: List of dicts with ``entity`` (BIO label) and ``word`` (token text, a
            leading "▁" marks a preceding space), optionally ``start`` / ``end``
            (character offsets) and ``index`` (token position in the sequence).

    Returns:
        list of dicts ``{"entity", "word", "start", "end"}``, one per merged entity.

    Merging rules:
        * ``B-X`` always starts a new entity.
        * ``I-X`` extends the current entity only if it has the same type AND, when
          both tokens carry an ``index``, directly follows the previous token.
          Callers may drop "O" tokens before calling, so without this check two
          entities separated by unlabelled tokens would be glued together.
        * Any other label closes the current entity.
    """
    answer_list = []
    current_entity = None
    current_word = ""
    start_pos = None
    end_pos = None
    previous_index = None

    for dic in pre:
        entity_type = dic['entity']
        raw_word = dic['word']
        word = raw_word.replace("▁", "")
        token_start = dic.get('start')
        token_end = dic.get('end')
        token_index = dic.get('index')
        has_space = raw_word.startswith("▁")

        is_begin = entity_type.startswith("B-")
        is_inside = entity_type.startswith("I-")

        if not (is_begin or is_inside):  # "O" (or any non-BIO label)
            _append_entity(answer_list, current_entity, current_word, start_pos, end_pos)
            current_entity = None
            current_word = ""
            start_pos = None
            end_pos = None
            previous_index = None
            continue

        # Strip only the two-character prefix; a label body may itself contain "B-"/"I-".
        ent = entity_type[2:]
        adjacent = (
            previous_index is None
            or token_index is None
            or token_index == previous_index + 1
        )

        if is_inside and current_entity == ent and adjacent:
            current_word += (" " + word) if has_space else word
            end_pos = token_end
        else:
            # B- token, type change, or a gap in the token sequence: start a new entity.
            _append_entity(answer_list, current_entity, current_word, start_pos, end_pos)
            current_entity = ent
            current_word = word
            start_pos = token_start
            end_pos = token_end

        previous_index = token_index

    # Flush the entity still open at the end of the sequence.
    _append_entity(answer_list, current_entity, current_word, start_pos, end_pos)

    return answer_list

In [None]:
def get_level2_entities_normal(model, tokenizer, sentence, label_map):
    """Predict a BIO label for every token of `sentence` and return the non-"O" ones.

    Args:
        model: Token-classification model; called with ``input_ids`` and
            ``attention_mask`` and expected to return an object with ``.logits``.
        tokenizer: Tokenizer that supports ``return_offsets_mapping``.
        sentence: Text to tag (truncated by the tokenizer if too long).
        label_map: dict mapping predicted class id -> label string; unknown ids count as "O".

    Returns:
        list of dicts with ``entity``, ``score`` (np.float32 softmax probability of
        the predicted class), ``index`` (token position), ``word`` (token string),
        ``start`` and ``end`` (character offsets). Pad/cls/sep tokens are skipped.

    The model is switched to eval mode for the forward pass and put back into
    training mode afterwards if it was training before the call.
    """
    device = next(model.parameters()).device

    # 1. Tokenize with offsets; tensors follow the model's device.
    encoding = tokenizer(sentence, return_tensors="pt", return_offsets_mapping=True, truncation=True)
    input_ids = encoding["input_ids"].to(device)
    attention_mask = encoding["attention_mask"].to(device)
    offsets = encoding["offset_mapping"][0].tolist()
    token_ids = input_ids[0].tolist()
    tokens = tokenizer.convert_ids_to_tokens(token_ids)

    # 2. Forward pass without gradients, restoring the caller's train/eval mode.
    was_training = getattr(model, "training", False)
    model.eval()
    try:
        with torch.no_grad():
            logits = model(input_ids=input_ids, attention_mask=attention_mask).logits
    finally:
        if was_training:
            model.train()

    # 3. One softmax / argmax over the whole sequence (logits: [batch, seq_len, num_labels]).
    token_logits = logits[0]
    probs = torch.softmax(token_logits, dim=-1)
    pred_ids = torch.argmax(token_logits, dim=-1).tolist()

    special_ids = {tokenizer.pad_token_id, tokenizer.cls_token_id, tokenizer.sep_token_id}

    results = []
    for idx, (pred_id, (start, end)) in enumerate(zip(pred_ids, offsets)):
        if token_ids[idx] in special_ids:
            continue

        entity = label_map.get(pred_id, "O")
        if entity == "O":
            continue

        results.append({
            "entity": entity,
            "score": np.float32(probs[idx, pred_id].item()),
            "index": idx,
            "word": tokens[idx],
            "start": start,
            "end": end
        })

    return results


In [None]:
import numpy as np
import pandas as pd

def calculate_overlap(pred_start, pred_end, gt_start, gt_end):
    """Return the length of the overlap between two intervals (0 when they are disjoint)."""
    return max(0, min(pred_end, gt_end) - max(pred_start, gt_start))

def evaluate_task2( ground_truth_file, model, tokenizer, sentences=None, id2label=None, prediction_file="model_eval.txt" ) :
    """Predict entities for every sentence and score them against a ground-truth file.

    Args:
        ground_truth_file: Tab-separated file without header, columns
            id / type / start / end / content.
        model: Token-classification model passed to ``get_level2_entities_normal``.
        tokenizer: Matching tokenizer.
        sentences: Iterable of ``"<id>\\t<text>"`` lines to predict on. Defaults to
            the notebook-level ``val_data`` (the previous, implicit behaviour).
        id2label: Class id -> label mapping. Defaults to the notebook-level ``label_map``.
        prediction_file: Where the predictions are written (same column layout as
            the ground truth) before being read back for scoring.

    Returns:
        Macro-averaged F1 over all entity types found in the ground truth or the
        predictions; 0.0 when there is no type at all.
    """
    import csv

    if sentences is None:
        sentences = val_data
    if id2label is None:
        id2label = label_map

    # ---- 1. Predict and serialise -------------------------------------------------
    prediction_lines = []
    for raw_line in sentences:
        text_split = raw_line.strip().split("\t")
        if len(text_split) < 2:
            # Blank line or id without text: nothing to predict.
            continue
        name = text_split[0]
        text = text_split[1]

        pre = get_level2_entities_normal(model, tokenizer, text, id2label)
        answer_list = Process_Predict_Ner(pre) if len(pre) != 0 else []

        for i in answer_list:
            prediction_lines.append(f"{name}\t{i['entity']}\t{i['start']}\t{i['end']}\t{i['word']}\n")

    with open( prediction_file, "w", encoding="utf-8") as f:
        f.write("".join(prediction_lines))

    # ---- 2. Load predictions and ground truth -------------------------------------
    # Predictions are read back from disk so that both frames go through the same
    # pandas type inference (e.g. numeric-looking ids).
    columns = ['id', 'type', 'start', 'end', 'content']
    try:
        pred_df = pd.read_csv(
              prediction_file,
              sep='\t',
              header=None,
              names=columns,
              quoting=csv.QUOTE_NONE,        # do not interpret quote characters
              encoding='utf-8',
              on_bad_lines='skip',           # drop malformed rows
              engine='python'                # more tolerant parser
          )
    except pd.errors.EmptyDataError:
        # The model predicted no entity at all: score against an empty frame
        # instead of aborting the training run.
        pred_df = pd.DataFrame(columns=columns)
    gt_df = pd.read_csv(ground_truth_file, sep='\t', header=None,
                       names=columns)

    # Every entity type seen on either side gets its own counters.
    all_types = sorted(set(gt_df['type'].unique()) | set(pred_df['type'].unique()))
    metrics = {shi_type: {'tp': 0, 'fp': 0, 'fn': 0} for shi_type in all_types}

    # ---- 3. Score per file id ------------------------------------------------------
    unique_ids = sorted(set(gt_df['id'].unique()) | set(pred_df['id'].unique()))

    for audio_id in unique_ids:
        gt_records = gt_df[gt_df['id'] == audio_id].copy()
        pred_records = pred_df[pred_df['id'] == audio_id].copy()

        # Track which ground-truth rows / predictions have been paired.
        gt_matched = [False] * len(gt_records)
        pred_matched = [False] * len(pred_records)

        # Pair each prediction with the same-type ground-truth row it overlaps most.
        for i, pred_row in enumerate(pred_records.itertuples()):
            pred_type = pred_row.type
            pred_start = pred_row.start
            pred_end = pred_row.end
            pred_duration = pred_end - pred_start

            best_overlap = 0
            best_gt_idx = -1

            for j, gt_row in enumerate(gt_records.itertuples()):
                if gt_row.type != pred_type:
                    continue

                overlap = calculate_overlap(pred_start, pred_end, gt_row.start, gt_row.end)
                if overlap > best_overlap:
                    best_overlap = overlap
                    best_gt_idx = j

            if best_gt_idx >= 0:  # (partial) match with a same-type row
                gt_row = gt_records.iloc[best_gt_idx]
                gt_duration = gt_row.end - gt_row.start

                # Overlapping part counts as true positive ...
                metrics[pred_type]['tp'] += best_overlap

                # ... the rest of the prediction as false positive ...
                metrics[pred_type]['fp'] += pred_duration - best_overlap

                # ... and the uncovered rest of the ground truth as false negative.
                metrics[pred_type]['fn'] += gt_duration - best_overlap

                gt_matched[best_gt_idx] = True
                pred_matched[i] = True
            else:
                # No same-type overlap: the whole prediction is a false positive.
                metrics[pred_type]['fp'] += pred_duration

        # Ground-truth rows never paired are entirely false negatives.
        for j, matched in enumerate(gt_matched):
            if not matched:
                gt_row = gt_records.iloc[j]
                gt_type = gt_row.type
                gt_duration = gt_row.end - gt_row.start
                metrics[gt_type]['fn'] += gt_duration

        # Unpaired predictions that overlap a ground-truth row of ANOTHER type.
        # NOTE(review): these predictions already received `fp += pred_duration` in
        # the else-branch above, so this adds the same amount a second time. Left
        # unchanged on purpose -- confirm whether this mirrors the reference scorer.
        for i, (matched, pred_row) in enumerate(zip(pred_matched, pred_records.itertuples())):
            if matched:
                continue

            pred_type = pred_row.type
            pred_start = pred_row.start
            pred_end = pred_row.end
            pred_duration = pred_end - pred_start

            for gt_row in gt_records.itertuples():
                if gt_row.type == pred_type:
                    continue  # same-type rows were handled in the pairing pass

                overlap = calculate_overlap(pred_start, pred_end, gt_row.start, gt_row.end)
                if overlap > 0:
                    metrics[pred_type]['fp'] += pred_duration
                    break

    # ---- 4. Precision / recall / F1 per type, then macro average --------------------
    f1_scores = []
    for shi_type in all_types:
        m = metrics[shi_type]
        precision = m['tp'] / (m['tp'] + m['fp']) if (m['tp'] + m['fp']) > 0 else 0
        recall = m['tp'] / (m['tp'] + m['fn']) if (m['tp'] + m['fn']) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        f1_scores.append(f1)

    # np.mean([]) would yield nan plus a RuntimeWarning; report 0.0 instead.
    macro_f1 = np.mean(f1_scores) if f1_scores else 0.0

    return macro_f1



In [None]:
from transformers import TrainerCallback

class CharBasedEvaluationCallback(TrainerCallback):
    """Trainer callback that runs ``evaluate_task2`` after every evaluation and prints the macro-F1."""

    def __init__(self, task2_path, tokenizer):
        # Ground-truth answer file and tokenizer handed to evaluate_task2.
        self.tokenizer = tokenizer
        self.task2_path = task2_path

    def on_evaluate(self, args, state, control, **kwargs):
        """Score the model being trained (taken from ``kwargs["model"]``) and print the result."""
        score = evaluate_task2(self.task2_path, kwargs["model"], self.tokenizer)

        print(f"[Char-based Evaluation after epoch {state.epoch}]")
        print("Macro-F1:", score)

In [None]:
# ✅ 建立 Trainer
from transformers import TrainingArguments, Trainer



training_args = TrainingArguments(
    # Output locations
    output_dir=model_save_path,
    logging_dir=model_logging_dir,
    # Evaluate, checkpoint and log once per epoch
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="epoch",
    logging_first_step=True,
    # Optimisation
    num_train_epochs=30,                   # kept below 50 on purpose
    learning_rate=3e-5,                    # slightly raised; tune as needed
    weight_decay=0.03,                     # moderate regularisation
    per_device_eval_batch_size=64,
)

# Ground-truth answers used by the custom evaluation callback.
task2_path = answer_val_data_path_txt

evaluation_callback = CharBasedEvaluationCallback(task2_path, tokenizer)

trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=data_train,
    eval_dataset=data_test,
    tokenizer=tokenizer,
    callbacks=[evaluation_callback],
)

trainer.train()