In [1]:
# !pip install datasets
# !pip install evaluate
# !pip install sentence-transformers==3.1.1
# !pip install transformers==4.45.2

In [2]:
import time
import pandas as pd
import numpy as np
from torch.nn import functional as F

import torch
from sklearn.model_selection import train_test_split
from transformers import AutoModelForSequenceClassification, AutoTokenizer

from sentence_transformers import SentenceTransformer

In [3]:
step1_labels = ['loss', 'hazard', 'constraint']
step2_labels = ['valid', 'invalid']
step3_labels = ['rewrite', 'not', 'condition', 'accident', 'valid']

In [4]:
def label_to_int(df, name, labels):
  for i in range(len(df[name])):
    df.loc[i,name] = labels.index(df.loc[i,name])

In [5]:
# Define input file name here:
file_name = 'input_example labeled.csv'

# Uncomment 4 lines below if using Google Drive
# from google.colab import drive
# drive.mount('/content/drive')
# drive_path = '/content/drive/MyDrive/Colab Notebooks/'
# file_name = drive_path+file_name


# @title Select Input type and import data (with or without label)

# Unlabeled input allows the execution of the first step of the BEDS Pipeline, for a labeling error detection and a rough check of sentence ambiguity;
# Expected input: .csv dataset with a single column: ['sentence']
# Labeled input skips the first step, and assumes all sentences are correctly labeled.
# Expected input: .csv dataset with two columns: ['sentence', 'label'] #the label should be exactly 'loss', 'hazard', or 'constraint'
input_type = 'labeled' #@param ['labeled', 'unlabeled']
verbose = 'False' #@param ['True', 'False']


col_names = []
if input_type == 'unlabeled':
  col_names = ['sentence']
  df = pd.read_csv(file_name, names=col_names, index_col=False, delimiter=',', skiprows=1)#,header=None)
elif input_type == 'labeled':
  col_names = ['sentence', 'label']
  df = pd.read_csv(file_name, names=col_names, index_col=False, delimiter=',', skiprows=1)
  label_to_int(df, 'label', step1_labels)

df.head()

Unnamed: 0,sentence,label
0,A collision between the ACROBOTER robotic plat...,2
1,A non‐patient is injured or killed by radiation.,0
2,A nonpatient is injured or killed in the proce...,0
3,A pair of controlled aircraft violates minimum...,1
4,A person or worker is standing/working under o...,1


In [6]:
def step1_print_label_diff(results,df):
  predictions = np.argmax(results.logits.cpu(), axis=-1)
  probabilities = F.softmax(results.logits.cpu(), dim=-1)

  for sentence, label, prediction, probability in zip(df['sentence'], df['label'], predictions, probabilities):

    if label != prediction.item():
      print('Sentence:        ', sentence)
      print('True label:      ', step1_labels[label])
      print('Predicted label: ', step1_labels[prediction.item()])
      print('Probabilities    ')
      print('Loss:            ', probability.tolist()[0])
      print('Hazard:          ', probability.tolist()[1])
      print('Constraint:      ', probability.tolist()[2])
      print('\n')

In [7]:
model_name = "google-bert/bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained('google-bert/bert-base-uncased')

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [8]:
# Step 1

# Load models
path = 'andreyunic23/'
path_step1 = path+'beds_step1'

model_classif = AutoModelForSequenceClassification.from_pretrained(path_step1, num_labels=3)

x_test = df['sentence'].to_list()

start = time.perf_counter()

with torch.no_grad():
  encodings = tokenizer(x_test, truncation=True, padding='max_length', max_length=512,return_tensors="pt")
  results = model_classif(encodings['input_ids'], encodings['attention_mask'])
  predictions = np.argmax(results.logits.cpu(), axis=-1)

end = time.perf_counter()
print('Step 1 execution time:', end-start)

if input_type == 'unlabeled':
  # add labels to the unlabeled dataframe
  df['label'] = predictions

if input_type == 'labeled':
  # show the difference in prediction labels from the original labels
  step1_print_label_diff(results,df)


Step 1 execution time: 214.985378095
Sentence:         Collision between the ACROBOTER robotic platform and an unknown object (both human or material).
True label:       hazard
Predicted label:  loss
Probabilities    
Loss:             0.8982072472572327
Hazard:           0.10129029303789139
Constraint:       0.0005025389837101102


Sentence:         Doors close on a person in the doorway.
True label:       hazard
Predicted label:  loss
Probabilities    
Loss:             0.9973586201667786
Hazard:           0.0022139123175293207
Constraint:       0.00042743596713989973


Sentence:         During a rejected takeoff the aircraft departs runway.
True label:       loss
Predicted label:  hazard
Probabilities    
Loss:             0.00046184760867618024
Hazard:           0.999242901802063
Constraint:       0.00029530166648328304


Sentence:         Patient does not receive treatment.
True label:       hazard
Predicted label:  loss
Probabilities    
Loss:             0.9790673851966858
Hazar

In [9]:
def organize_step2_predictions(predictions, list_sentences):
  list_valid = []
  list_invalid = []

  for prediction, sentence in zip(predictions, list_sentences):
    if prediction == 0:
      list_valid.append(sentence)
    else:
      list_invalid.append(sentence)

  return list_valid, list_invalid

In [10]:
# Step 2: Sentence validity detection

# Separate dataframe sentences into lists of Losses, Hazards, and Constraints
list_loss = df[df['label'] == 0]['sentence'].to_list()
list_hazard = df[df['label'] == 1]['sentence'].to_list()
list_constraint = df[df['label'] == 2]['sentence'].to_list()

# Load models

list_valid_loss = []
list_invalid_loss = []
list_valid_hazard = []
list_invalid_hazard = []
list_valid_constraint = []
list_invalid_constraint = []

# Checks if list is not empty
if list_loss:
  path_step2_loss = path+'beds_step2_loss'
  model_step2_loss = AutoModelForSequenceClassification.from_pretrained(path_step2_loss, num_labels=2)

  start = time.perf_counter()
  with torch.no_grad():
    encodings = tokenizer(list_loss, truncation=True, padding='max_length', max_length=512,return_tensors="pt")
    results = model_step2_loss(encodings['input_ids'], encodings['attention_mask'])
    predictions = np.argmax(results.logits.cpu(), axis=-1)
  end = time.perf_counter()
  print('Step 2 (loss) execution time:', end-start)
  list_valid_loss, list_invalid_loss = organize_step2_predictions(predictions, list_loss) # Returns only the sentences, used in step 3

if list_hazard:
  path_step2_hazard = path+'beds_step2_hazard'
  model_step2_hazard = AutoModelForSequenceClassification.from_pretrained(path_step2_hazard, num_labels=2)

  start = time.perf_counter()
  with torch.no_grad():
    encodings = tokenizer(list_hazard, truncation=True, padding='max_length', max_length=512,return_tensors="pt")
    results = model_step2_hazard(encodings['input_ids'], encodings['attention_mask'])
    predictions = np.argmax(results.logits.cpu(), axis=-1)
  end = time.perf_counter()
  print('Step 2 (hazard) execution time:', end-start)

  list_valid_hazard, list_invalid_hazard = organize_step2_predictions(predictions, list_hazard) # Returns only the sentences, used in step 3

if list_constraint:
  path_step2_constraint = path+'beds_step2_constraint'
  model_step2_constraint = AutoModelForSequenceClassification.from_pretrained(path_step2_constraint, num_labels=2)

  start = time.perf_counter()
  with torch.no_grad():
    encodings = tokenizer(list_constraint, truncation=True, padding='max_length', max_length=512,return_tensors="pt")
    results = model_step2_constraint(encodings['input_ids'], encodings['attention_mask'])
    predictions = np.argmax(results.logits.cpu(), axis=-1)
  end = time.perf_counter()
  print('Step 2 (constraint) execution time:', end-start)

  list_valid_constraint, list_invalid_constraint = organize_step2_predictions(predictions, list_constraint) # Returns only the sentences, used in step 3

Step 2 (loss) execution time: 60.74411584900008
Step 2 (hazard) execution time: 85.68522143100017
Step 2 (constraint) execution time: 75.13913612499982


In [11]:
def organize_step3_predictions(results, list_sentences):
  predictions = np.argmax(results.logits.cpu(), axis=-1)
  probabilities = F.softmax(results.logits.cpu(), dim=-1)

  list_dict = []
  dict_faults = {}
  for prediction, sentence, probability in zip(predictions, list_sentences, probabilities):
    dict_faults = {'sentence': sentence, 'faults': step3_labels[prediction], 'probabilities': probability}
    list_dict.append(dict_faults)

  return list_dict

In [12]:
# Step 3: Fault type detection

dict_faults_loss = {}
dict_faults_hazard = {}
dict_faults_constraint = {}

# the inputs of this step are the lists generated in the step 2 ('list_invalid_loss', 'list_invalid_hazard', and 'list_invalid_constraint')

if list_invalid_loss:
  path_step3_loss = path+'beds_step3_loss'
  model_step3_loss = AutoModelForSequenceClassification.from_pretrained(path_step3_loss, num_labels=4)

  start = time.perf_counter()
  with torch.no_grad():
    encodings = tokenizer(list_invalid_loss, truncation=True, padding='max_length', max_length=512,return_tensors="pt")
    results = model_step3_loss(encodings['input_ids'], encodings['attention_mask'])
  end = time.perf_counter()
  print('Step 3 (loss) execution time:', end-start)

  list_dict_faults_loss = organize_step3_predictions(results, list_invalid_loss)


if list_invalid_hazard:
  path_step3_hazard = path+'beds_step3_hazard'
  model_step3_hazard = AutoModelForSequenceClassification.from_pretrained(path_step3_hazard, num_labels=4)

  start = time.perf_counter()
  with torch.no_grad():
    encodings = tokenizer(list_invalid_hazard, truncation=True, padding='max_length', max_length=512,return_tensors="pt")
    results = model_step3_hazard(encodings['input_ids'], encodings['attention_mask'])
  end = time.perf_counter()
  print('Step 3 (hazard) execution time:', end-start)

  list_dict_faults_hazard = organize_step3_predictions(results, list_invalid_hazard)


if list_invalid_constraint:
  path_step3_constraint = path+'beds_step3_constraint'
  model_step3_constraint = AutoModelForSequenceClassification.from_pretrained(path_step3_constraint, num_labels=2)

  start = time.perf_counter()
  with torch.no_grad():
    encodings = tokenizer(list_invalid_constraint, truncation=True, padding='max_length', max_length=512,return_tensors="pt")
    results = model_step3_constraint(encodings['input_ids'], encodings['attention_mask'])
  end = time.perf_counter()
  print('Step 3 (constraint) execution time:', end-start)

  list_dict_faults_constraint = organize_step3_predictions(results, list_invalid_constraint)

Step 3 (loss) execution time: 21.280700102000083
Step 3 (hazard) execution time: 28.33082945299975
Step 3 (constraint) execution time: 14.372860659999787


In [13]:
def print_dict_faults(list_dict_faults):
  for dict_faults in list_dict_faults:
    print('Sentence:            ', dict_faults['sentence'])
    print('Fault type detected: ', dict_faults['faults'])
    print('Probabilities')

    for i in range(len(dict_faults['probabilities'])):
      probability = dict_faults['probabilities'][i]
      faults = step3_labels[i]
      print(faults, ': ', probability.item())
    print('\n')

if verbose == 'True':
  print('########### LOSS')
  print_dict_faults(list_dict_faults_loss)
  print('########### HAZARD')
  print_dict_faults(list_dict_faults_hazard)
  print('########### CONSTRAINT')
  print_dict_faults(list_dict_faults_constraint)

In [14]:
def format_reference(df):
  references = []
  for sentence in df:
    references.append([sentence,sentence])
  return references

def check_similarity(list_invalid, list_valid, model):
  list_similar = []
  embeddings = model.encode(list_valid)
  for sentence in list_invalid:
    if verbose == 'True':
      print(sentence)
      print("suggestions:")
    sentence = model.encode(sentence)
    similarity = model.similarity(sentence, embeddings)
    sim_pair = []

    for sim, valid in zip(similarity[0].tolist(), list_valid):
      sim_pair.append([sim, valid[0]])

    sim_pair.sort(key=lambda x: x[0])
    sim_pair.reverse()
    list_aux = []
    for sim, valid in sim_pair[:5]:
      if verbose == 'True':
        print("{:.4f} \t\t {}".format(sim, valid))
      list_aux.append([sim, valid])
    list_similar.append([sentence, list_aux])
    if verbose == 'True':
      print("\n")
  return list_similar

In [15]:
# Step 4: Sentence suggestion

# the inputs of this step are the lists generated in the step 2 ('list_invalid_loss', 'list_invalid_hazard', and 'list_invalid_constraint')

model_step4 = SentenceTransformer(path+'beds_step4')

reference_path = "reference/"#/content/drive/MyDrive/Colab Notebooks

list_similar_loss = []
list_similar_hazard = []
list_similar_constraint = []

if list_invalid_loss:
  valid_loss_df = pd.read_csv(reference_path+'valid_loss_reference.csv').squeeze().tolist()
  reference_valid_loss = format_reference(valid_loss_df)
  print("LOSS #####################################")
  list_similar_loss = check_similarity(list_invalid_loss, reference_valid_loss, model_step4)
  print("\n")


if list_invalid_hazard:
  valid_hazard_df = pd.read_csv(reference_path+'valid_hazard_reference.csv').squeeze().tolist()
  reference_valid_hazard = format_reference(valid_hazard_df)
  print("HAZARD #####################################")
  list_similar_hazard = check_similarity(list_invalid_hazard, reference_valid_hazard, model_step4)
  print("\n")


if list_invalid_constraint:
  valid_constraint_df = pd.read_csv(reference_path+'valid_constraint_reference.csv').squeeze().tolist()
  examples_valid_constraint = format_reference(valid_constraint_df)
  print("CONSTRAINT #####################################")
  list_similar_constraint = check_similarity(list_invalid_constraint, examples_valid_constraint, model_step4)

LOSS #####################################


HAZARD #####################################


CONSTRAINT #####################################


In [16]:
def print_invalid(list_dict_faults, list_similar):

  for faults, similar in zip(list_dict_faults, list_similar):
    print('Sentence:            ', faults['sentence'])
    print('Fault type detected: ', faults['faults'])
    print('Probabilities:')
    for probability, name in zip(faults['probabilities'], step3_labels):
      print("{:.4f} \t\t {}".format(probability.item(), name))
    print('Similar Valid Sentences:')
    for sim, valid in similar[1]:
      print("{:.4f} \t\t {}".format(sim, valid))
    print('\n')

In [17]:
# print Loss results

print('########### LOSS')
print('Vaid Sentences:')
print(list_valid_loss)
print('\n')
print('Invalid Sentences:')
print_invalid(list_dict_faults_loss, list_similar_loss)

########### LOSS
Vaid Sentences:
['Aircraft (including both manned and unmanned systems) in the air are damaged or destroyed.', 'AV gets damaged.', 'Damage and/or interruption of operations of other systems (Supporting infrastructure, other satellites, all other equipment).', 'Damage of property (internal or external company).', 'Damage or loss of equipment.', 'Damage or loss of equipment.', 'Damage or loss of equipment.', 'Damage to airport infra-structure and/or proximities.', 'Damage to equipment or infrastructure.', 'damage to equipment.', 'Damage to equipment.', 'Damage to equipment.', 'Damage to or loss of aircraft.', 'Damage to patient or staff satisfaction, or hospital reputation.', 'Damage to SSRMS.', 'Damage to the aircraft or objects outside the aircraft.', 'Damage to the aircraft.', 'Damage to the Laboratorial Facility or the laboratory equipment.', 'Damage to the ship or objects outside the ship.', 'Damage to the vehicle or objects outside the vehicle.', 'Damage to the veh

In [18]:
# print Hazard results

print('########### HAZARD')
print('Valid Sentences:')
print(list_valid_hazard)
print('\n')
print('Invalid Sentences:')
print_invalid(list_dict_faults_hazard, list_similar_hazard)

########### HAZARD
Valid Sentences:


Invalid Sentences:
Sentence:             A person or worker is standing/working under or passing through the excavators’ range cycle.
Fault type detected:  rewrite
Probabilities:
0.6326 		 rewrite
0.0568 		 not
0.0753 		 condition
0.2353 		 accident
Similar Valid Sentences:
0.4481 		 Excavator within the vicinity of overhead electric lines.
0.2808 		 Robot makes contact with a person (directly or indirectly).
0.2661 		 Person is exposed to harm during testing process.
0.2471 		 Equipment Operated Beyond Limits.
0.2471 		 Equipment operated beyond limits.


Sentence:             ACC did not maintain a safe distance from the object in the front, resultng in collision.
Fault type detected:  condition
Probabilities:
0.2480 		 rewrite
0.1412 		 not
0.5833 		 condition
0.0275 		 accident
Similar Valid Sentences:
0.8977 		 ACC does not maintain a safe distance from the object in the front (resulting in a collision).
0.6072 		 ACC violates the safe distanc

In [19]:
# print Constraint results

print('########### CONSTRAINT')
print('Valid Sentences:')
print(list_valid_constraint)
print('\n')
print('Invalid Sentences:')
print_invalid(list_dict_faults_constraint, list_similar_constraint)

########### CONSTRAINT
Valid Sentences:
['A collision between the ACROBOTER robotic platform and an unknown object must be avoided at all times.', 'A/C must maintain minimum safe altitude limits.', 'A/C must maintain minimum safe separation distance.', 'ACC must not brake too abruptly.', 'ACC must not violate separation requirements with object ahead.', 'ACC should not brake too abruptly.', 'ACC should not let the vehicle gets in contact with the object ahead.', 'ACS must not continue providing attitude maneuver commands too long after attitude has stabilized.', 'ACS must not provide attitude maneuver commands in the same direction as rotation.', 'ACS must not provide attitude maneuver commands too early to achieve desired attitude.', 'ACS must not provide attitude maneuver commands too late after ASTRO-H has rotated too far.', 'ACS must not provide attitude maneuver commands when ASTRO-H is not rotating.', 'ACS must not stop providing attitude commands too soon before attitude has sta