#### Import Packages

In [4]:
import json
import numpy as np
import pandas as pd
from transformers import BertTokenizer, BertForTokenClassification, AdamW
from torch.utils.data import DataLoader, Dataset
import torch
from typing import List, Dict, Any, Tuple
import numpy as np
import torch.nn.functional as F
import torch.nn as nn

#### Read Data

In [5]:
# Load the training data (a JSON document).
with open(r'E:\NYCU-Project\Class\NLP\Dimensional ASTE\NYCU_NLP_113A_Dataset\NYCU_NLP_113A_TrainingSet.json', 'r', encoding='utf-8') as f:
    training_data = json.load(f)

# Load the test data; read_csv already returns a DataFrame, the extra
# DataFrame() wrapper is kept only so the `test_df` name stays available.
test_data = pd.read_csv(r'E:\NYCU-Project\Class\NLP\Dimensional ASTE\NYCU_NLP_113A_Dataset\NYCU_NLP_113A_Validation.txt')
test_df = pd.DataFrame(test_data)
# Join the first two columns of every row into one "<id> <sentence>" style string.
# `.iloc` is used for positional access: `row[0]` on a label-indexed Series is
# deprecated integer-key fallback and triggers a pandas FutureWarning.
testing_data = [f"{row.iloc[0]}{row.iloc[1]}" for _, row in test_df.iterrows()]
print(testing_data)

['R3530:S002 肉片有厚實的口感。', 'R3453:S019 而且蝦子肉質好又鮮甜。', 'R2774:S014 起司牛肉炊飯個人覺得普普同時也認為這一道放這裡真的會太飽。', 'R3634:S011 喝羹湯也會有飽足感。', 'R3674:S021 用餐體驗良好。', 'R3601:S021 蝦子外皮經高溫油炸香脆連殼都可以食用唷，鹹蛋黃完全沒有腥味，金沙口感細緻滑順。', 'R0920:S016 最喜歡這個小魚炒得酥酥又甜甜的，非常下飯。', 'R0713:S001 平價好吃的雞肉飯，嫩雞肉搭配半熟蛋太銷魂。', 'R0982:S001 可頌烤得酥脆，我覺得很加分如果他有單賣可頌我會買單。', 'R3985:S028 北海道昆布柴魚湯一碗沒有料的清湯，喝起來十分清淡也蠻空虛。', 'R3593:S011 蠔油芥蘭菜很脆。', 'R3645:S016 酥炸臭豆腐就夠水準可推。', 'R3941:S048 用料也真的超級海派除了五隻蝦還有魚肉小海鮮丸等，蛤蠣也放得不少。', 'R3554:S006 叉燒及牛肉肉片約三片，肉片很薄吃起了來有點失望。', 'R3955:S030 拿鐵口感溫順，咖啡香濃和綿密奶泡完美結合是好喝的。', 'R3337:S004 還有火腿也很乾沒什麼好吃。', 'R3681:S012 至於配菜也就是一般。', 'R3996:S045 第一次喝到這樣的湯十分特別。', 'R3962:S011 戰斧豬排的肉咬起來意外的軟嫩搭上附的黑蒜醬超好吃。', 'R1036:S006 蛋塔則中規中矩，不錯吃。', 'R3334:S049 麵包區的歐式麵包還蠻好吃的但單獨吃太乾了......', 'R3993:S021 三塊牛肉肉質完全軟爛好吃湯頭清甜。', 'R4025:S005 麻醬麵濃厚的麻醬香我很愛。', 'R3731:S006 主菜椒麻雞看起來就很好吃。', 'R3543:S013 抹茶拿鐵的味道有一點像粉泡的，不是抹茶加上牛奶。', 'R3603:S001 因為上次跟朋友在這兒聚餐吃到了超級無敵好吃的臘味煲仔飯!!', 'R3981:S040 沒想到蛋黃的部分超黏稠味道濃郁很好吃。', 'R3579:S009 馬斯卡邦乳酪餅乾太濕，信玄餅吃起來是很奇怪，可麗餅還拿到沒熟的。', 'R3970:S015 蔥爆牛肉偏甜我們覺得不是很對味。'

  testing_data = [f"{row[0]}{row[1]}" for index, row in test_df.iterrows()]


#### Code

In [6]:
def convert_to_position_aware_bio(data):
    """Turn annotated records into per-character tags plus side features.

    Each record in ``data`` is read through the keys ``'Sentence'``,
    ``'AspectFromTo'``, ``'OpinionFromTo'`` and ``'Intensity'``. Span entries
    are ``"start#end"`` strings (1-based, inclusive, as implied by the
    ``start - 1`` / ``end - 1`` indexing below); intensity entries are
    ``"value1#value2"`` strings.

    Returns:
        A 4-tuple of parallel lists, one element per record:
        ``(sentences, bio_tags, position_info, intensity)`` where
        * ``bio_tags[k]`` is a list with one tag per character of the sentence
          (``'O'`` or ``'A-'``/``'O-'`` followed by ``B``/``I``/``E``/``S``),
        * ``position_info[k]`` is a list of
          ``(opinion_start - aspect_start, opinion_end - opinion_start)``,
        * ``intensity[k]`` is a list of ``(float, float)`` pairs.
    """

    def apply_bio_tags(positions, tags, tag_prefix=''):
        # Writes the span markers into `tags` in place and hands the list back.
        for span in positions:
            first, last = (int(part) for part in span.split('#'))
            if first == last:
                # One-character span.
                tags[first - 1] = f'{tag_prefix}S'
                continue
            tags[first - 1] = f'{tag_prefix}B'
            for inner in range(first, last - 1):
                tags[inner] = f'{tag_prefix}I'
            tags[last - 1] = f'{tag_prefix}E'
        return tags

    def parse_intensity(raw_pair):
        # "a#b" -> (float(a), float(b)); anything but two parts raises ValueError.
        left, right = raw_pair.split('#')
        return float(left), float(right)

    sentences = []
    bio_tags = []
    position_info = []
    intensity = []

    for record in data:
        text = record['Sentence']
        aspect_spans = record['AspectFromTo']
        opinion_spans = record['OpinionFromTo']
        raw_intensity = record['Intensity']

        # Everything starts as 'O'; aspects are written first, opinions second,
        # so an opinion overwrites an aspect on any shared character.
        labels = apply_bio_tags(aspect_spans, ['O'] * len(text), 'A-')
        labels = apply_bio_tags(opinion_spans, labels, 'O-')

        # The i-th opinion is paired with the i-th aspect.
        offsets = []
        for idx in range(len(opinion_spans)):
            op_start, op_end = (int(part) for part in opinion_spans[idx].split('#'))
            asp_start = int(aspect_spans[idx].split('#')[0])
            offsets.append((op_start - asp_start, op_end - op_start))

        scores = [parse_intensity(raw_pair) for raw_pair in raw_intensity]

        sentences.append(text)
        bio_tags.append(labels)
        position_info.append(offsets)
        intensity.append(scores)

    return sentences, bio_tags, position_info, intensity


In [7]:
# Run the converter over the full training set. The four results are parallel
# lists with one entry per training record.
sentences, tags, positions, intensity = convert_to_position_aware_bio(training_data)

# Pretty print results
def print_results(sentences, tags, positions, intensity, limit=None):
    """Print converted records in a readable, line-per-field layout.

    The four arguments are parallel sequences; records are paired with
    ``zip``, so output stops at the shortest one.

    Args:
        sentences: Sentence strings.
        tags: Per-sentence lists of tag strings (joined with spaces).
        positions: Per-sentence position info, printed as-is.
        intensity: Per-sentence intensity info, printed as-is.
        limit: Optional maximum number of records to print. ``None``
            (the default) prints every record, matching the old behaviour;
            a value <= 0 prints nothing. Useful to preview a large dataset
            without flooding the notebook output.
    """
    for shown, (s, t, p, i) in enumerate(zip(sentences, tags, positions, intensity)):
        if limit is not None and shown >= limit:
            break
        print("Sentence:", s)
        print("BIO Tags:", ' '.join(t))
        print("Position Info:", p)
        print("Intensity:", i)
        print("-" * 50)

# Print every converted record (one five-line group per sentence).
print_results(sentences, tags, positions, intensity)

Sentence: 肉粿沒有很焦脆。
BIO Tags: A-B A-E O-B O-I O-I O-I O-E O
Position Info: [(2, 4)]
Intensity: [(4.0, 5.0)]
--------------------------------------------------
Sentence: 肉粿每一塊都好脆好恰好喜歡。
BIO Tags: A-B A-E O O O O O-B O-E O-B O-E O-B O-I O-E O
Position Info: [(6, 1), (8, 1), (10, 2)]
Intensity: [(6.25, 6.0), (6.12, 6.0), (6.62, 6.62)]
--------------------------------------------------
Sentence: 口感有點微妙。
BIO Tags: A-B A-E O-B O-I O-I O-E O
Position Info: [(2, 3)]
Intensity: [(4.75, 4.75)]
--------------------------------------------------
Sentence: 很夠味起司也很香。
BIO Tags: O-B O-I O-E A-B A-E O O-B O-E O
Position Info: [(-3, 2), (3, 1)]
Intensity: [(6.67, 6.5), (6.5, 6.33)]
--------------------------------------------------
Sentence: 這款沙拉真是我的愛。
BIO Tags: O O A-B A-E O O O-B O-I O-E O
Position Info: [(4, 2)]
Intensity: [(7.5, 7.25)]
--------------------------------------------------
Sentence: 味道是真的很好。
BIO Tags: A-B A-E O O-B O-I O-I O-E O
Position Info: [(3, 3)]
Intensity: [(6.17, 6.17)]
----------

In [8]:
class AspectOpinionDataset(Dataset):
    """Dataset yielding tokenized sentences with tag ids and padded side features.

    Every item is a dict with the keys ``input_ids``, ``attention_mask``,
    ``labels``, ``position_info`` and ``intensity``. Labels, position info and
    intensity are all padded/truncated to ``max_length`` rows.

    NOTE(review): label ids are laid out by tag index (tag 0 -> label slot 0)
    and are not shifted for any special tokens the tokenizer may prepend —
    confirm this is the intended alignment with ``input_ids``.
    """

    def __init__(
        self,
        sentences: List[str],
        tags: List[List[str]],
        position_info: List[List[tuple]],
        intensity: List[List[tuple]],
        tokenizer: Any,
        max_length: int = 50
    ):
        """Store the parallel input lists, the tokenizer and the fixed length.

        Args:
            sentences: Raw sentence strings.
            tags: One list of tag strings per sentence.
            position_info: One list of 2-tuples per sentence.
            intensity: One list of 2-tuples per sentence.
            tokenizer: Callable invoked as
                ``tokenizer(sentence, truncation=..., padding=..., max_length=..., return_tensors="pt")``.
            max_length: Length every returned sequence is padded/truncated to.
        """
        self.sentences = sentences
        self.tags = tags
        self.position_info = position_info
        self.intensity = intensity
        self.tokenizer = tokenizer
        self.max_length = max_length

        # Tag string -> class id; the id is the position in this tuple.
        tag_names = ("O", "A-B", "A-I", "A-E", "A-S", "O-B", "O-I", "O-E", "O-S")
        self.tag2id = {name: index for index, name in enumerate(tag_names)}

    def __len__(self) -> int:
        """Number of sentences in the dataset."""
        return len(self.sentences)

    def _pad_or_truncate_sequence(self, sequence: List[Any], pad_value: Any) -> List[Any]:
        """Return ``sequence`` cut to, or right-padded up to, ``max_length`` items."""
        overflow = len(sequence) - self.max_length
        if overflow > 0:
            # Too long: keep only the leading max_length items.
            return sequence[:self.max_length]
        # Short enough: append pad_value until max_length is reached.
        return sequence + [pad_value] * (-overflow)

    def _pairs_to_tensor(self, pairs: List[tuple]) -> torch.Tensor:
        """Write up to ``max_length`` 2-tuples into a zero-filled (max_length, 2) float32 tensor."""
        table = np.zeros((self.max_length, 2))
        for row, (first, second) in enumerate(pairs[:self.max_length]):
            table[row, 0] = first
            table[row, 1] = second
        return torch.tensor(table, dtype=torch.float32)

    def _process_position_info(self, positions: List[tuple]) -> torch.Tensor:
        """Pad (relative_position, span_length) pairs to a (max_length, 2) tensor."""
        return self._pairs_to_tensor(positions)

    def _process_intensity(self, intensity_values: List[tuple]) -> torch.Tensor:
        """Pad intensity value pairs to a (max_length, 2) tensor."""
        return self._pairs_to_tensor(intensity_values)

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """Build the feature dict for the sentence at ``idx``."""
        # Tokenize to exactly max_length positions; tensors carry a leading
        # batch axis of size 1 that is squeezed away below.
        encoding = self.tokenizer(
            self.sentences[idx],
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors="pt"
        )

        # Unknown tag strings fall back to id 0, which is also the pad id.
        label_ids = self._pad_or_truncate_sequence(
            [self.tag2id.get(tag, 0) for tag in self.tags[idx]], 0
        )

        return {
            "input_ids": encoding['input_ids'].squeeze(0),
            "attention_mask": encoding['attention_mask'].squeeze(0),
            "labels": torch.tensor(label_ids, dtype=torch.long),
            "position_info": self._process_position_info(self.position_info[idx]),
            "intensity": self._process_intensity(self.intensity[idx])
        }

class PositionAwareBertForTokenClassification(BertForTokenClassification):
    """BERT token classifier that fuses sentence-level position/intensity features.

    On top of the parent model this adds a linear fusion layer
    (``hidden_size + 4 -> hidden_size``) and a 2-output linear intensity head.

    NOTE(review): ``intensity`` is used both as an input feature (its mean is
    concatenated to every token) and as the MSE regression target, so the
    target is visible to the network during training — confirm this is intended.
    """

    def __init__(self, config):
        super().__init__(config)
        # +4 = 2 averaged position features + 2 averaged intensity features.
        self.position_intensity_fusion = nn.Linear(config.hidden_size + 4, config.hidden_size)
        self.intensity_predictor = nn.Linear(config.hidden_size, 2)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    @staticmethod
    def _sequence_average(side_features, sequence_output):
        """Average ``side_features`` over dim 1 and repeat it for every token.

        Returns zeros of shape (batch, seq_len, 2) when ``side_features`` is
        ``None`` so that ``forward`` honours its ``None`` defaults.
        """
        batch_size, seq_len = sequence_output.size(0), sequence_output.size(1)
        if side_features is None:
            return sequence_output.new_zeros((batch_size, seq_len, 2))
        return side_features.mean(dim=1).unsqueeze(1).expand(-1, seq_len, -1)

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        labels=None,
        position_info=None,
        intensity=None,
        **kwargs
    ):
        """Run BERT, fuse the side features and compute tag/intensity outputs.

        Args:
            input_ids, attention_mask: Forwarded to the underlying BERT encoder.
            labels: Optional tag ids; when given, a loss is returned.
            position_info: Optional tensor averaged over dim 1 and concatenated
                to every token representation; zeros are used when ``None``.
            intensity: Optional tensor treated like ``position_info`` and, when
                ``labels`` is given, also used as the MSE target for the
                intensity head (so it must match the shape of
                ``intensity_logits``). Zeros are used as features and the MSE
                term is skipped when ``None``.
            **kwargs: Extra keyword arguments passed through to BERT.

        Returns:
            dict with ``"loss"`` (``None`` without labels), ``"logits"`` and
            ``"intensity_logits"``.
        """
        # BERT output
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, **kwargs)
        sequence_output = outputs.last_hidden_state

        # Sentence-level averages, broadcast to every token position.
        avg_position = self._sequence_average(position_info, sequence_output)
        avg_intensity = self._sequence_average(intensity, sequence_output)

        # Concatenate and project back to hidden_size.
        enhanced_features = torch.cat([sequence_output, avg_position, avg_intensity], dim=-1)
        fused_output = F.relu(self.position_intensity_fusion(enhanced_features))

        # Tag classifier (with dropout) and intensity head (without dropout).
        logits = self.classifier(self.dropout(fused_output))
        intensity_logits = self.intensity_predictor(fused_output)

        loss = None
        if labels is not None:
            flat_logits = logits.view(-1, self.num_labels)
            flat_labels = labels.view(-1)
            if attention_mask is not None:
                # Only positions with attention_mask == 1 contribute to the loss.
                active = attention_mask.view(-1) == 1
                flat_logits = flat_logits[active]
                flat_labels = flat_labels[active]
            loss = nn.CrossEntropyLoss()(flat_logits, flat_labels)

            if intensity is not None:
                intensity_loss = nn.MSELoss()(intensity_logits, intensity)
                loss = loss + 0.5 * intensity_loss

        return {"loss": loss, "logits": logits, "intensity_logits": intensity_logits}

def train_model(
    model: PositionAwareBertForTokenClassification,
    train_loader: DataLoader,
    optimizer: AdamW,
    device: torch.device,
    num_epochs: int = 30
) -> List[float]:
    """Train ``model`` for ``num_epochs`` passes over ``train_loader``.

    Each batch is expected to be a mapping holding the tensors listed in
    ``batch_keys``; they are moved to ``device`` and passed to the model as
    keyword arguments. The model's output must expose the loss under ``'loss'``.

    Returns:
        The mean batch loss of every epoch, in order.
    """
    batch_keys = ('input_ids', 'attention_mask', 'labels', 'position_info', 'intensity')

    model.train()
    epoch_losses = []

    for epoch in range(num_epochs):
        running_loss = 0
        for batch in train_loader:
            # Move the tensors the model consumes onto the target device.
            model_inputs = {key: batch[key].to(device) for key in batch_keys}

            # Forward pass
            optimizer.zero_grad()
            batch_loss = model(**model_inputs)['loss']
            running_loss += batch_loss.item()

            # Backward pass
            batch_loss.backward()
            optimizer.step()

        avg_loss = running_loss / len(train_loader)
        epoch_losses.append(avg_loss)
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {avg_loss:.4f}")

    return epoch_losses

In [None]:
class AspectOpinionExtractor:
    """Inference helper: tags a sentence, extracts spans and formats triplets.

    NOTE(review): predicted tags are read from model output positions
    ``0 .. len(text) - 1`` and paired with the characters of ``text`` in order.
    If the tokenizer prepends a special token or merges several characters into
    one token, positions and characters do not line up one-to-one — confirm
    this matches how the training labels were laid out.
    """

    def __init__(self, model, tokenizer, device, max_length=50):
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        self.max_length = max_length
        # Class id -> tag string; the id is the position in this list.
        self.id2tag = dict(enumerate(
            ["O", "A-B", "A-I", "A-E", "A-S", "O-B", "O-I", "O-E", "O-S"]
        ))

    def _calculate_position_info(self, text: str, aspects: List[str], opinions: List[str]) -> Tuple[List[tuple], List[tuple]]:
        """Build padded position and placeholder-intensity lists for the spans.

        For every span found in ``text`` (first occurrence via ``str.find``)
        the pair ``(start / len(text), len(span) / len(text))`` is recorded
        together with a ``(0.5, 0.5)`` placeholder intensity. Both lists are
        then padded with ``(0.0, 0.0)`` and cut to ``max_length`` entries.
        """
        position_info = []
        intensity_info = []
        text_length = len(text)

        # Aspects first, then opinions; spans not present in the text are skipped.
        for span in aspects + opinions:
            start_idx = text.find(span)
            if start_idx == -1:
                continue
            position_info.append((start_idx / text_length, len(span) / text_length))
            intensity_info.append((0.5, 0.5))

        # Both lists always have equal length, so one count pads both.
        missing = self.max_length - len(position_info)
        if missing > 0:
            position_info.extend([(0.0, 0.0)] * missing)
            intensity_info.extend([(0.0, 0.0)] * missing)

        return position_info[:self.max_length], intensity_info[:self.max_length]

    def extract_spans(self, text: str, tags: List[str]) -> Tuple[List[str], List[str]]:
        """Collect aspect and opinion strings from per-character tags.

        A ``*-B`` or ``*-S`` tag closes any open span and opens a new one of
        its own kind; every other ``A-``/``O-`` tag just extends the open span
        (whatever its kind); a plain ``'O'`` closes the open span. A span whose
        kind is not ``'aspect'`` (including a span that was never opened by a
        ``B``/``S`` tag) is stored as an opinion.
        """
        aspects = []
        opinions = []

        def store(chars, kind):
            # Route the finished span to the list matching its kind.
            bucket = aspects if kind == 'aspect' else opinions
            bucket.append(''.join(chars))

        pending = []
        pending_kind = None

        for char, tag in zip(text, tags):
            if tag.startswith('A-'):
                opens_span = tag in ('A-B', 'A-S')
                new_kind = 'aspect'
            elif tag.startswith('O-'):
                opens_span = tag in ('O-B', 'O-S')
                new_kind = 'opinion'
            else:
                # Plain 'O' ends the running span; any other tag is ignored.
                if tag == 'O' and pending:
                    store(pending, pending_kind)
                    pending = []
                    pending_kind = None
                continue

            if opens_span:
                if pending:
                    store(pending, pending_kind)
                    pending = []
                pending_kind = new_kind
            pending.append(char)

        if pending:
            store(pending, pending_kind)

        return aspects, opinions

    def predict_tags_and_intensity(self, text: str) -> Tuple[List[str], torch.Tensor]:
        """Two-pass prediction: tags without side features, then with them.

        Returns the tag strings for the first ``len(text)`` output positions
        and the intensity head output for the single batch element.
        """
        # First pass: rough spans used only to derive position features.
        rough_tags = self.predict_initial_tags(text)
        rough_aspects, rough_opinions = self.extract_spans(text, rough_tags)
        position_info, initial_intensity = self._calculate_position_info(
            text, rough_aspects, rough_opinions)

        # Second pass inputs, padded to max_length.
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors="pt"
        ).to(self.device)

        # Wrap in a list to add the batch axis.
        position_tensor = torch.tensor([position_info], dtype=torch.float32).to(self.device)
        intensity_tensor = torch.tensor([initial_intensity], dtype=torch.float32).to(self.device)

        with torch.no_grad():
            outputs = self.model(
                input_ids=encoding['input_ids'],
                attention_mask=encoding['attention_mask'],
                position_info=position_tensor,
                intensity=intensity_tensor
            )
            tag_predictions = outputs['logits'].argmax(dim=-1)
            intensity_predictions = outputs['intensity_logits']

        tag_ids = tag_predictions[0][:len(text)].tolist()
        predicted_tags = [self.id2tag[tag_id] for tag_id in tag_ids]

        return predicted_tags, intensity_predictions[0]

    def predict_initial_tags(self, text: str) -> List[str]:
        """Predict tags with all-zero position and intensity features."""
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding=True,
            return_tensors="pt"
        ).to(self.device)

        with torch.no_grad():
            outputs = self.model(
                **encoding,
                position_info=torch.zeros((1, self.max_length, 2)).to(self.device),
                intensity=torch.zeros((1, self.max_length, 2)).to(self.device)
            )
            predictions = outputs['logits'].argmax(dim=-1)

        tag_ids = predictions[0][:len(text)].tolist()
        return [self.id2tag[tag_id] for tag_id in tag_ids]

    def _format_intensity(self, intensity_predictions, i) -> str:
        """Render row ``i`` of the predictions as ``"x.xx#y.yy"`` (or the fallback)."""
        if i < len(intensity_predictions):
            return f"{intensity_predictions[i][0]:.2f}#{intensity_predictions[i][1]:.2f}"
        return "5.00#5.00"  # default fallback

    def process_text(self, id_text_pairs: List[Tuple[str, str]]) -> List[str]:
        """Turn ``(id, text)`` pairs into ``"<id> (aspect,opinion,v#a)..."`` lines.

        The i-th aspect is paired with the i-th opinion (extra spans of the
        longer list are dropped) and labelled with row ``i`` of the intensity
        predictions.
        """
        results = []

        for text_id, text in id_text_pairs:
            predicted_tags, intensity_predictions = self.predict_tags_and_intensity(text)
            aspects, opinions = self.extract_spans(text, predicted_tags)

            triplets = []
            for i, (aspect, opinion) in enumerate(zip(aspects, opinions)):
                intensity_scores = self._format_intensity(intensity_predictions, i)
                triplets.append(f"({aspect},{opinion},{intensity_scores})")

            results.append(f"{text_id} {''.join(triplets)}")

        return results

def process_test_data(test_data: List[str], model, tokenizer, device) -> List[str]:
    """Split ``"<id> <text>"`` lines and run them through the extractor.

    Each line is split once on the first space; a line without a space raises
    ``ValueError`` during parsing. The extractor is created with its default
    ``max_length``.

    Returns:
        The formatted result lines produced by ``AspectOpinionExtractor.process_text``.
    """
    # Unpacking inside the comprehension enforces exactly two parts per line.
    id_text_pairs = [
        (text_id, text)
        for text_id, text in (line.split(' ', 1) for line in test_data)
    ]

    extractor = AspectOpinionExtractor(model, tokenizer, device)
    return extractor.process_text(id_text_pairs)

def main():
    """Train the tagger on ``training_data``, predict on ``testing_data`` and save the results.

    Reads the notebook-level globals ``training_data`` and ``testing_data``,
    prints the final training loss and every result line, and writes
    ``submission.txt`` (header line followed by one result per line).
    """
    # Prefer the GPU when one is available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')

    # num_labels=9 matches the nine tag ids used by the dataset and extractor.
    model = PositionAwareBertForTokenClassification.from_pretrained(
        'bert-base-chinese',
        num_labels=9
    )
    model.to(device)

    # Build the training loader from the converted annotations.
    train_loader = DataLoader(
        AspectOpinionDataset(*convert_to_position_aware_bio(training_data), tokenizer),
        batch_size=16,
        shuffle=True
    )

    # Fit the model.
    optimizer = AdamW(model.parameters(), lr=5e-5, weight_decay=1e-2)
    losses = train_model(model, train_loader, optimizer, device)
    print(f"Final training loss: {losses[-1]:.4f}")

    # Switch to inference mode before predicting.
    model.eval()
    results = process_test_data(testing_data, model, tokenizer, device)

    for result in results:
        print(result)

    # Persist the predictions: a header line, then one line per test sentence.
    with open('submission.txt', 'w', encoding='utf-8') as f:
        f.write('ID Triplets\n')
        f.writelines(line + '\n' for line in results)

# Run the full train / predict / write pipeline when this code is executed
# directly rather than imported.
if __name__ == "__main__":
    main()


Some weights of PositionAwareBertForTokenClassification were not initialized from the model checkpoint at bert-base-chinese and are newly initialized: ['classifier.bias', 'classifier.weight', 'intensity_predictor.bias', 'intensity_predictor.weight', 'position_intensity_fusion.bias', 'position_intensity_fusion.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  attn_output = torch.nn.functional.scaled_dot_product_attention(


Epoch 1/30, Loss: 0.5585
Epoch 2/30, Loss: 0.2848
Epoch 3/30, Loss: 0.1975
Epoch 4/30, Loss: 0.1474
Epoch 5/30, Loss: 0.1078
Epoch 6/30, Loss: 0.0843
Epoch 7/30, Loss: 0.0792
Epoch 8/30, Loss: 0.0683
Epoch 9/30, Loss: 0.0560
Epoch 10/30, Loss: 0.0480
Epoch 11/30, Loss: 0.0381
Epoch 12/30, Loss: 0.0469
Epoch 13/30, Loss: 0.0378
Epoch 14/30, Loss: 0.0370
Epoch 15/30, Loss: 0.0260
Epoch 16/30, Loss: 0.0297
Epoch 17/30, Loss: 0.0285
Epoch 18/30, Loss: 0.0299
Epoch 19/30, Loss: 0.0261
Epoch 20/30, Loss: 0.0295
Epoch 21/30, Loss: 0.0270
Epoch 22/30, Loss: 0.0219
Epoch 23/30, Loss: 0.0209
Epoch 24/30, Loss: 0.0240
Epoch 25/30, Loss: 0.0241
Epoch 26/30, Loss: 0.0214
