<a href="https://colab.research.google.com/github/blt-tsp/Fine-tuning-BERT-and-summarization-/blob/main/RoBERTa_finetuning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Fine-tuning RoBERTa and preprocessing data


Pipeline for converting binary files to tensors

In [5]:
import os
import struct

import librosa
import numpy as np
import torch
from transformers import RobertaTokenizer

# Define the PCM parameters
bit_depth = 16  # bits per output sample; binary_to_pcm scales by 2 ** (bit_depth - 1) - 1
sample_rate = 8000  # target rate handed to the resampler in binary_to_pcm (presumably Hz)
frame_size = 0.01  # 10 milliseconds
frame_step = 0.005  # 5 milliseconds
# NOTE(review): frame_size and frame_step are not referenced by any code visible in
# this notebook — confirm whether framing is still planned or these can be dropped.

# Load the RoBERTa tokenizer
# (module-level: tokenize_text reads this global rather than taking it as an argument)
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# Define a function to convert binary data to PCM
def binary_to_pcm(binary_data, source_sample_rate=44100):
    """Convert raw 16-bit PCM bytes to int16 samples at the module-level ``sample_rate``.

    Args:
        binary_data (bytes): Raw signed 16-bit samples in native byte order.
            A trailing odd byte cannot form a sample and is dropped.
        source_sample_rate (int): Sample rate of ``binary_data``. Defaults to
            44100, the value that was previously hard-coded.

    Returns:
        numpy.ndarray: 1-D ``int16`` array of resampled samples (empty when the
        input holds no complete sample).
    """
    # np.frombuffer raises ValueError when the buffer is not a whole number of
    # int16 items, so keep only complete 2-byte samples.
    usable_length = len(binary_data) - (len(binary_data) % 2)
    int_data = np.frombuffer(binary_data[:usable_length], dtype=np.int16)
    if int_data.size == 0:
        return np.zeros(0, dtype=np.int16)

    # Normalize the data to the range [-1, 1)
    float_data = int_data / 32768.0

    # orig_sr / target_sr are passed by keyword: librosa >= 0.10 rejects them
    # positionally, while older releases accept the same keyword names.
    resampled_data = librosa.resample(
        float_data, orig_sr=source_sample_rate, target_sr=sample_rate
    )

    # Resampling filters can overshoot slightly beyond [-1, 1]; clip before the
    # integer cast so those samples saturate instead of wrapping around.
    full_scale = 2 ** (bit_depth - 1) - 1
    pcm_data = np.clip(resampled_data * full_scale, -full_scale - 1, full_scale).astype(np.int16)
    return pcm_data

# Define a function to convert PCM data to text
def pcm_to_text(pcm_data):
    """Pack 16-bit PCM samples into bytes and return their ASCII characters as a string.

    Args:
        pcm_data: Iterable of integers that each fit in a signed 16-bit value
            (for example the ``int16`` array returned by ``binary_to_pcm``).

    Returns:
        str: The packed bytes decoded as ASCII. Bytes outside the ASCII range
        (>= 0x80) are silently dropped by the ``'ignore'`` error handler, so the
        conversion is lossy.

    Raises:
        struct.error: If a sample does not fit in a signed 16-bit integer.
    """
    samples = [int(sample) for sample in pcm_data]
    # Native byte order and size, matching the original 'h' * n format string.
    binary_data = struct.pack(f'{len(samples)}h', *samples)
    # struct.pack returns bytes; bytes has no .encode(), so the text form is
    # obtained by decoding (the previous .encode call raised AttributeError).
    return binary_data.decode('ascii', 'ignore')

# Define a function to tokenize text data
def tokenize_text(text_data):
    """Encode ``text_data`` into token ids with the module-level ``tokenizer``.

    Special tokens are added and the sequence is truncated to at most 512 ids.

    Returns:
        The value returned by ``tokenizer.encode`` for these options.
    """
    encode_options = {
        'add_special_tokens': True,
        'max_length': 512,
        'truncation': True,
    }
    return tokenizer.encode(text_data, **encode_options)

# Define a function to prepare the data for a single binary file
def prepare_data_for_file(file_path):
    """Read one binary file and run it through the bytes -> PCM -> text -> tokens chain.

    Args:
        file_path: Path of the file to open in binary mode.

    Returns:
        The tokenized data produced by ``tokenize_text`` for this file.
    """
    with open(file_path, 'rb') as handle:
        raw_bytes = handle.read()

    # Each stage feeds the next: binary_to_pcm -> pcm_to_text -> tokenize_text.
    return tokenize_text(pcm_to_text(binary_to_pcm(raw_bytes)))

# Define a function to prepare the data for all binary files in a directory
def prepare_data_for_directory(directory_path):
    """Tokenize every ``.bin`` file in a directory and stack the results into one tensor.

    Args:
        directory_path: Directory to scan (not recursive) for regular files
            whose name ends with ``.bin``.

    Returns:
        torch.Tensor: ``long`` tensor with one row per file, rows ordered by
        sorted file name. Shorter token sequences are right-padded with the
        module-level tokenizer's ``pad_token_id`` so the rows share one length.
        When no ``.bin`` file is found the result has shape ``(0, 0)``.
    """
    # os.listdir order is arbitrary; sorting keeps row order reproducible so the
    # rows can be paired with labels that are listed the same way.
    file_names = sorted(
        name
        for name in os.listdir(directory_path)
        if name.endswith('.bin') and os.path.isfile(os.path.join(directory_path, name))
    )
    if not file_names:
        return torch.empty((0, 0), dtype=torch.long)

    sequences = [
        torch.tensor(prepare_data_for_file(os.path.join(directory_path, name)), dtype=torch.long)
        for name in file_names
    ]

    # Token sequences differ in length from file to file, and torch.tensor()
    # rejects a ragged list of lists, so pad them to a common length instead.
    return torch.nn.utils.rnn.pad_sequence(
        sequences, batch_first=True, padding_value=tokenizer.pad_token_id
    )


Downloading (…)olve/main/vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

Downloading (…)olve/main/merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

Fine-tuning on some of our XML files


In [6]:
import torch
from transformers import RobertaForSequenceClassification, RobertaTokenizer
import xml.etree.ElementTree as ET

# Load the labeled dataset
# NOTE: torch.load unpickles the file, so only load 'input_tensors.pt' from a trusted source.
input_tensors = torch.load('input_tensors.pt')

# os.listdir returns bare file names, so each name is joined back onto the
# directory before parsing (parsing the bare name looks in the working
# directory and raises FileNotFoundError). Sorting makes the target order
# reproducible between runs.
TARGET_DIR = 'target_outputs'
target_outputs = [
    ET.parse(os.path.join(TARGET_DIR, xml_file)).getroot()
    for xml_file in sorted(os.listdir(TARGET_DIR))
    if xml_file.endswith('.xml')
]

# Load the pre-trained RoBERTa model and tokenizer
model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=5)
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# Define the optimizer and loss function
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
loss_fn = torch.nn.CrossEntropyLoss()

# Define a function to train the model
def train_model(input_tensors, target_outputs, model, tokenizer, optimizer, loss_fn, num_epochs=10):
    """Fine-tune a sequence-classification model on (token ids, target tree) pairs.

    The model produces one logit per class, so every target tree is turned into
    a class index: the distinct serialized trees are sorted and numbered from 0.
    (The previous version tokenized the XML and stacked the variable-length
    token tensors, which fails in ``torch.stack`` and does not match the
    ``(batch, num_labels)`` logits expected by a classification loss.)

    NOTE(review): this assumes each distinct target tree is one protocol class,
    and that any ``protocol_<id>.xml`` lookup done at inference time follows the
    order of the returned list — confirm against the intended label scheme.

    Args:
        input_tensors (torch.Tensor): Token ids, one row per example. Assumed to
            be padded with ``tokenizer.pad_token_id`` — TODO confirm.
        target_outputs (list): One ``xml.etree.ElementTree.Element`` per example.
        model: Callable as ``model(input_ids, attention_mask=...)`` returning an
            object with a ``logits`` attribute.
        tokenizer: Only ``pad_token_id`` is read, to build the attention mask.
        optimizer: Optimizer over the model parameters.
        loss_fn: Called as ``loss_fn(logits, class_indices)``.
        num_epochs (int): Number of passes over the data.

    Returns:
        list[str]: Serialized XML of every class, indexed by class id.

    Raises:
        ValueError: If there are no examples, if inputs and targets differ in
            count, or if there are more distinct trees than ``model.config.num_labels``.
    """
    if len(input_tensors) == 0 or len(target_outputs) == 0:
        raise ValueError("train_model needs at least one (input, target) pair")
    if len(input_tensors) != len(target_outputs):
        raise ValueError(
            f"got {len(input_tensors)} inputs but {len(target_outputs)} target outputs"
        )

    # Map every distinct tree to a class id (sorted for a reproducible numbering).
    serialized_targets = [ET.tostring(target_output).decode('utf-8') for target_output in target_outputs]
    class_names = sorted(set(serialized_targets))
    num_labels = getattr(getattr(model, 'config', None), 'num_labels', None)
    if num_labels is not None and len(class_names) > num_labels:
        raise ValueError(
            f"found {len(class_names)} distinct target trees but the model has only {num_labels} labels"
        )
    class_ids = {xml_text: index for index, xml_text in enumerate(class_names)}
    labels = torch.tensor([class_ids[xml_text] for xml_text in serialized_targets], dtype=torch.long)

    # Mask padded positions so they do not take part in attention.
    pad_token_id = getattr(tokenizer, 'pad_token_id', None)
    if pad_token_id is None:
        attention_mask = torch.ones_like(input_tensors)
    else:
        attention_mask = (input_tensors != pad_token_id).long()

    # Combine inputs, masks and labels into a Dataset and batch it
    dataset = torch.utils.data.TensorDataset(input_tensors, attention_mask, labels)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=4, shuffle=True)

    # Set the model to training mode
    model.train()

    for epoch in range(num_epochs):
        epoch_loss = 0.0
        for input_ids, batch_mask, target_ids in dataloader:
            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask=batch_mask)
            loss = loss_fn(outputs.logits, target_ids)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        # Report the mean loss over the epoch rather than the last batch only.
        print(f"Epoch {epoch + 1}: Loss = {epoch_loss / len(dataloader)}")

    return class_names

# Fine-tune on the labeled dataset (every argument spelled out by name)
train_model(
    input_tensors=input_tensors,
    target_outputs=target_outputs,
    model=model,
    tokenizer=tokenizer,
    optimizer=optimizer,
    loss_fn=loss_fn,
    num_epochs=10,
)

# Persist the fine-tuned weights and config to the 'fine_tuned_model' directory
model.save_pretrained('fine_tuned_model')


FileNotFoundError: ignored

Distance between two trees

In [2]:
def tree_distance(tree1, tree2):
    """
    Calculates a positional distance between two telecom trees.

    The distance is the sum of three parts:

    * 1 if the two nodes differ in tag, attributes or whitespace-stripped text,
      otherwise 0;
    * the distance between each pair of children at the same position;
    * 1 for every child that has no counterpart at its position in the other tree.

    Identical trees therefore score 0, and any difference in tag, attributes,
    text or child count scores at least 1. (The previous version returned 0 for
    different leaves such as ``<a/>`` and ``<b/>``, and ignored the shared
    children whenever the child counts differed.)

    Args:
        tree1 (xml.etree.ElementTree.Element): The root node of the first tree.
        tree2 (xml.etree.ElementTree.Element): The root node of the second tree.

    Returns:
        int: The distance between the two trees (0 when they match).
    """
    def _node_text(node):
        # Treat missing text and pure indentation whitespace as equal.
        return (node.text or '').strip()

    same_node = (
        tree1.tag == tree2.tag
        and tree1.attrib == tree2.attrib
        and _node_text(tree1) == _node_text(tree2)
    )
    distance = 0 if same_node else 1

    children1, children2 = list(tree1), list(tree2)

    # zip stops at the shorter child list; the surplus children are charged below.
    distance += sum(tree_distance(child1, child2) for child1, child2 in zip(children1, children2))
    distance += abs(len(children1) - len(children2))
    return distance



This script loads the fine-tuned RoBERTa model and tokenizer, then loads the reference telecom trees from a directory containing XML files. It prepares an input tensor for prediction, tokenizes it, makes a prediction with the model, converts the output to a softmax probability distribution, and takes the predicted class. It then loads the predicted telecom protocol from the XML file for that class, calculates the distance between the predicted protocol and each reference telecom tree, and chooses the reference tree with the smallest distance. Finally, the script prints the XML representation of the matched telecom tree.




In [3]:
!pip install transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting transformers
  Downloading transformers-4.28.1-py3-none-any.whl (7.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.0/7.0 MB[0m [31m62.5 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.11.0
  Downloading huggingface_hub-0.14.1-py3-none-any.whl (224 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m224.5/224.5 kB[0m [31m28.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting tokenizers!=0.11.3,<0.14,>=0.11.1
  Downloading tokenizers-0.13.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m99.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tokenizers, huggingface-hub, transformers
Successfully installed huggingface-hub-0.14.1 tokenizers-0.13.3 transformers-4.28.1


In [4]:
import os
import torch
from transformers import RobertaForSequenceClassification, RobertaTokenizer
import xml.etree.ElementTree as ET

# Load the fine-tuned model and tokenizer
model = RobertaForSequenceClassification.from_pretrained('fine_tuned_model')
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
model.eval()  # inference only: switch off dropout so predictions are deterministic

# Load the reference telecom trees from XML files
# (sorted so that ties between equally distant trees resolve the same way every run)
ref_trees = [
    ET.parse(os.path.join('reference_trees', file)).getroot()
    for file in sorted(os.listdir('reference_trees'))
    if file.endswith('.xml')
]
if not ref_trees:
    raise FileNotFoundError("no .xml reference trees found in 'reference_trees'")

# Prepare the input tensor for prediction
input_data = b'\x00\x01\x02\x03'
# torch.tensor cannot convert a bytes object, so expand it into its byte values first.
input_tensor = torch.tensor([list(input_data)])

# Tokenize the input tensor
# NOTE(review): a list of ints is taken by the tokenizer as ready-made token ids, so the
# raw byte values are used as ids here. The training pipeline above goes through
# binary -> PCM -> text -> tokens instead — confirm which representation is intended.
input_ids = tokenizer.encode(input_tensor[0].tolist(), add_special_tokens=True, truncation=True, padding=True, max_length=512, return_tensors='pt')

# Make a prediction with the model (no gradients are needed for inference)
with torch.no_grad():
    output = model(input_ids)[0]

# Convert the output tensor to a softmax probability distribution
probs = torch.nn.functional.softmax(output, dim=-1)

# Get the predicted class
predicted_class = torch.argmax(probs, dim=-1).item()

# Load the predicted telecom protocol from an XML file
predicted_protocol = ET.parse(os.path.join('predicted_protocols', f'protocol_{predicted_class}.xml')).getroot()

# Calculate the distance between the predicted telecom protocol and each reference telecom tree
# (tree_distance is defined in the "distance between 2 trees" cell above)
distances = [tree_distance(predicted_protocol, ref_tree) for ref_tree in ref_trees]
min_distance = min(distances)
matched_tree = ref_trees[distances.index(min_distance)]  # first tree with the smallest distance

# Print the matched telecom tree as text rather than as a bytes repr
print(ET.tostring(matched_tree, encoding='unicode'))


OSError: ignored

Implementing some classes to manipulate data


In [None]:
class TelecomTreeMatcher:
    """Classify binary input with a fine-tuned RoBERTa model and match it to a reference tree.

    The predicted class selects ``protocol_<class>.xml`` from ``protocol_dir``;
    that tree is then compared with every reference tree loaded from
    ``tree_dir`` and the closest one is returned.
    """

    def __init__(self, model_dir, tree_dir, protocol_dir='predicted_protocols', max_len=512, device=None):
        """Load the model, the tokenizer and the reference trees.

        Args:
            model_dir: Directory (or model id) given to ``from_pretrained``.
            tree_dir: Directory whose ``.xml`` files are the reference trees.
            protocol_dir: Directory holding the ``protocol_<class>.xml`` files.
                Defaults to the previously hard-coded ``'predicted_protocols'``.
            max_len (int): Maximum token length used when encoding.
            device: Torch device (or device string). Defaults to CUDA when
                available, otherwise CPU.
        """
        self.model = RobertaForSequenceClassification.from_pretrained(model_dir)
        self.tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
        self.protocol_dir = protocol_dir
        self.max_len = max_len
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = torch.device(device)
        self.is_trained = False
        self.model.to(self.device)
        # Sorted so that ties between equally distant trees resolve reproducibly.
        self.ref_trees = [
            ET.parse(os.path.join(tree_dir, file)).getroot()
            for file in sorted(os.listdir(tree_dir))
            if file.endswith('.xml')
        ]

    def preprocess(self, data):
        """Encode every item of ``data`` separately.

        Args:
            data: Iterable of ``bytes`` objects or integer sequences.

        Returns:
            list: One tensor per item, as returned by ``tokenizer.encode`` with
            ``return_tensors='pt'``.

        NOTE(review): the integers are handed to the tokenizer as ready-made
        token ids (plus special tokens) — confirm this is the intended
        representation of the binary data.
        """
        input_ids = []
        for item in data:
            # Iterating bytes yields ints; torch.tensor([bytes]) would raise.
            token_ids = [int(value) for value in item]
            input_id = self.tokenizer.encode(token_ids, add_special_tokens=True, truncation=True,
                                             max_length=self.max_len, return_tensors='pt')
            input_ids.append(input_id)
        return input_ids

    def predict(self, input_ids):
        """Return the predicted class index for every input sequence.

        Args:
            input_ids: Either a batch tensor of token ids, or the list of
                per-item tensors produced by ``preprocess``. A list is
                right-padded into a single batch with a matching attention mask.

        Returns:
            torch.Tensor: 1-D tensor of predicted class indices.
        """
        attention_mask = None
        if isinstance(input_ids, (list, tuple)):
            if not input_ids:
                return torch.empty(0, dtype=torch.long)
            rows = [ids.reshape(-1) for ids in input_ids]
            lengths = torch.tensor([row.numel() for row in rows])
            input_ids = torch.nn.utils.rnn.pad_sequence(
                rows, batch_first=True, padding_value=self.tokenizer.pad_token_id
            )
            # Build the mask from the true lengths, not from the pad id, because a
            # real token may carry the same id as the padding token.
            positions = torch.arange(input_ids.shape[1]).unsqueeze(0)
            attention_mask = (positions < lengths.unsqueeze(1)).long()
            attention_mask = attention_mask.to(self.device)

        input_ids = input_ids.to(self.device)
        self.model.eval()
        with torch.no_grad():
            outputs = self.model(input_ids, attention_mask=attention_mask)
        probs = torch.nn.functional.softmax(outputs[0], dim=-1)
        predicted_classes = torch.argmax(probs, dim=-1)
        return predicted_classes

    def match(self, data):
        """Return the serialized reference tree closest to the predicted protocol(s).

        Every predicted protocol is compared with every reference tree. (The
        previous version zipped the two lists, so the i-th prediction was only
        ever compared with the i-th reference tree.) When ``data`` holds several
        items, the single closest (protocol, reference) pair wins.

        Returns:
            bytes: ``ET.tostring`` of the matched reference tree.

        Raises:
            ValueError: If no reference trees are loaded or ``data`` is empty.
        """
        if not self.ref_trees:
            raise ValueError('no reference trees are loaded')

        predicted_classes = self.predict(self.preprocess(data))

        matched_tree = None
        min_distance = float('inf')
        # .tolist() yields plain ints; formatting a 0-dim tensor into the file
        # name would produce 'protocol_tensor(3).xml'.
        for class_id in predicted_classes.tolist():
            protocol = ET.parse(os.path.join(self.protocol_dir, f'protocol_{class_id}.xml')).getroot()
            for ref_tree in self.ref_trees:
                distance = self.tree_distance(protocol, ref_tree)
                if distance < min_distance:
                    min_distance = distance
                    matched_tree = ref_tree

        if matched_tree is None:
            raise ValueError('data is empty, nothing to match')
        return ET.tostring(matched_tree)

    @staticmethod
    def tree_distance(t1, t2):
        """Positional distance between two element trees (0 when they match).

        Adds 1 when the two nodes differ in tag, attributes or stripped text,
        the distances of children paired by position, and 1 for every child
        without a counterpart in the other tree.
        """
        def _node_text(node):
            return (node.text or '').strip()

        same_node = t1.tag == t2.tag and t1.attrib == t2.attrib and _node_text(t1) == _node_text(t2)
        distance = 0 if same_node else 1
        children1, children2 = list(t1), list(t2)
        distance += sum(
            TelecomTreeMatcher.tree_distance(child1, child2)
            for child1, child2 in zip(children1, children2)
        )
        distance += abs(len(children1) - len(children2))
        return distance

    def train(self, train_data, val_data, epochs=10, batch_size=16, lr=2e-5, patience=3):
        """
        Train the model on the given train data, with early stopping on validation loss.

        Args:
            train_data (list): A list of tuples containing binary data and their corresponding telecom tree.
            val_data (list): A list of tuples containing binary data and their corresponding telecom tree for validation.
            epochs (int): The number of epochs to train for.
            batch_size (int): The batch size to use during training.
            lr (float): The learning rate to use during training.
            patience (int): The number of epochs to wait before stopping training if validation loss doesn't improve.

        Raises:
            ValueError: If either data loader yields no batches.

        NOTE(review): ``TelecomTreeDataset`` is not defined in the visible part of
        this notebook; it must exist before this method is called and yield
        ``(input_ids, attention_mask, labels)`` tensors.
        """
        train_dataset = TelecomTreeDataset(train_data, self.tokenizer, self.max_len)
        val_dataset = TelecomTreeDataset(val_data, self.tokenizer, self.max_len)

        train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)
        # The mean losses below divide by the batch counts.
        if len(train_dataloader) == 0 or len(val_dataloader) == 0:
            raise ValueError('train_data and val_data must each contain at least one example')

        optimizer = torch.optim.AdamW(self.model.parameters(), lr=lr)

        self.model.to(self.device)

        best_val_loss = float('inf')
        epochs_without_improvement = 0

        for epoch in range(epochs):
            self.model.train()
            train_loss = 0

            for batch in train_dataloader:
                batch = tuple(t.to(self.device) for t in batch)
                inputs = {"input_ids": batch[0], "attention_mask": batch[1], "labels": batch[2]}
                outputs = self.model(**inputs)
                loss = outputs.loss
                train_loss += loss.item()
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

            train_loss /= len(train_dataloader)

            self.model.eval()
            val_loss = 0

            with torch.no_grad():
                for batch in val_dataloader:
                    batch = tuple(t.to(self.device) for t in batch)
                    inputs = {"input_ids": batch[0], "attention_mask": batch[1], "labels": batch[2]}
                    outputs = self.model(**inputs)
                    loss = outputs.loss
                    val_loss += loss.item()

            val_loss /= len(val_dataloader)

            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                epochs_without_improvement = 0
            else:
                epochs_without_improvement += 1
                if epochs_without_improvement >= patience:
                    print(f"Validation loss hasn't improved in {patience} epochs. Training stopped early.")
                    break

        self.is_trained = True
        return
