In [None]:
!pip install transformers > null
!pip install -U sentence-transformers > null
!pip install scikit-learn > null

# **HIPAA Dataset**

In [None]:
import gdown

url = 'https://drive.google.com/file/d/1W_XbXGcCumLIBGuAJkCRZ6dEa6APgrds/view?usp=sharing'
gdown.download(url, fuzzy=True)

!unzip HIPAA-original-json.zip

Downloading...
From: https://drive.google.com/uc?id=1W_XbXGcCumLIBGuAJkCRZ6dEa6APgrds
To: /content/HIPAA-original-json.zip
100%|██████████| 78.1k/78.1k [00:00<00:00, 83.5MB/s]

Archive:  HIPAA-original-json.zip
   creating: HIPAA-original-json/
  inflating: HIPAA-original-json/ALLRequirements.json  
  inflating: HIPAA-original-json/ALLTraces.json  
  inflating: HIPAA-original-json/RegulatoryCodes.json  





In [None]:
import json

requirements = {}
regulations = {}
alltraces = {}

addr = "HIPAA-original-json/ALLRequirements.json"
with open(addr, 'r') as myfile:
    json_data = json.load(myfile)
artifacts = json_data["artifacts"]["artifact"]
for tmp in artifacts:
  requirements[tmp['art_id']] = tmp['art_title']


addr = "HIPAA-original-json/ALLTraces.json"
with open(addr, 'r') as myfile:
    json_data = json.load(myfile)
traces = json_data["traces"]
cnt = 0
for trace in traces:
  req_id = trace['requirement-id']
  reg_id = trace['regulatory-code']
  if req_id not in list(alltraces.keys()): alltraces[req_id] = []
  alltraces[req_id].append(reg_id)

addr = "HIPAA-original-json/RegulatoryCodes.json"
with open(addr, 'r') as myfile:
    json_data = json.load(myfile)
regulatory_codes = json_data["artifacts"]["artifact"]
for tmp in regulatory_codes:
  regulations[tmp['art_id']] = tmp['art_title']

In [None]:
print('total number of requirements: ', len(list(requirements.keys())))
print('total number of regulations: ', len(list(regulations.keys())))
print('total number of requirements that have links to regulations: ', len(list(alltraces.keys())))
print('number of all links existed: ', len(traces))

total number of requirements:  1891
total number of regulations:  10
total number of requirements that have links to regulations:  230
number of all links existed:  243


In [None]:
train, test = [], []
split = 0.8
regulation_cnt = {}

## positive samples --> requirements that have at least one link to a regulation
for _id in list(alltraces.keys()):
  for _reg_id in alltraces[_id]:
    regulation_cnt[_reg_id] = regulation_cnt.get(_reg_id, 0) + 1
print(regulation_cnt)

for reg in list(regulation_cnt.keys()):
  n = regulation_cnt[reg]
  split_cnt = int(split * n)
  c = 0
  for req_id in list(alltraces.keys()):
    regs = alltraces[req_id]
    if reg in regs:
      if c > split_cnt:
        test.append({'requirement': requirements[req_id], 'regulation': regulations[reg], 'label': 'linked'})
      else:
        train.append({'requirement': requirements[req_id], 'regulation': regulations[reg], 'label': 'linked'})
      c += 1

print('Number of Positive Instances:')
print('number of instances in train: ', len(train))
print('number of instances in test: ', len(test))

{'AC': 53, 'AL': 10, 'AUD': 86, 'PA': 42, 'SED': 7, 'TED': 5, 'EAP': 4, 'IC': 18, 'TS': 7, 'UUI': 11}
Number of Positive Instances:
number of instances in train:  200
number of instances in test:  43


In [None]:
import random

alltraces_keys = list(alltraces.keys())
regulations_texts = list(regulations.values())
N = len(train) + len(test)
cnt = 0
candidates = []
for _id in list(requirements.keys()):
  if _id not in alltraces_keys:
    if cnt > N: break
    cnt += 1
    _index = random.randint(0, len(regulations_texts)-1)
    candidates.append({'requirement': requirements[req_id], 'regulation': regulations_texts[_index], 'label': 'not_linked'})

split_point = int(0.8 * len(candidates))
train.extend(candidates[:split_point])
test.extend(candidates[split_point:])
print('Number of Instances:')
print('number of instances in train: ', len(train))
print('number of instances in test: ', len(test))

Number of Instances:
number of instances in train:  395
number of instances in test:  92


In [None]:
print(train[0])

{'requirement': 'System will implement access control list mechanism to obtain information security. ACL system will be derived from the hierarchy in hospital / healthcare environments', 'regulation': 'Access Control. Implement technical policies and procedures for electronic information systems that maintain electronic protected health information to allow access only to those persons or software programs that have been granted access rights as specified in ? 164.308(a)(4).', 'label': 'linked'}


# **Semantic Textual Similarity**

In [None]:
from sentence_transformers import SentenceTransformer, SentencesDataset, InputExample, losses
from torch.utils.data import DataLoader
import torch.nn.functional as F
import math
from sentence_transformers import SentenceTransformer, LoggingHandler, losses, InputExample, util, evaluation, CrossEncoder, models
import logging
from datetime import datetime
import json
import pandas as pd
import numpy as np
import time
import matplotlib.pyplot as plt
from sklearn import metrics
import re
import torch.nn as nn
import torch

In [None]:
model_name = 'sentence-transformers/paraphrase-mpnet-base-v2'
train_batch_size = 32
num_epochs = 4
weight_decay = 0.2
learning_rate = 5e-03
warmup_per = 0.1
save_steps = 200
# optimizer_class = torch.optim.RMSprop
optimizer_class = torch.optim.AdamW
evaluation_steps = 25


positive_label = 'linked'
negative_label = 'not_linked'

In [None]:
train_set = []
sentence1, sentence2, labels = [], [], []
for idx in range(len(train)):
        row = train[idx]
        s1 = row['requirement']
        s2 = row['regulation']
        label = row['label']

        if label == negative_label: label = np.float32(0)
        else: label = np.float32(1)

        train_set.append(InputExample(texts=[s1, s2], label=label))

        ## for evaluation
        sentence1.append(s1)
        sentence2.append(s2)
        labels.append(label)

In [None]:
word_embedding_model = models.Transformer(model_name, max_seq_length=64)
pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension(), pooling_mode_max_tokens=True)
dense_model = models.Dense(in_features=pooling_model.get_sentence_embedding_dimension(), out_features=64, activation_function=nn.Tanh())
model = SentenceTransformer(modules=[word_embedding_model, pooling_model, dense_model])
train_loss = losses.CosineSimilarityLoss(model=model)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/594 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/1.19k [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

In [None]:
train_dataloader = DataLoader(train_set, shuffle=True, batch_size=train_batch_size)
warmup_steps = math.ceil(len(train_dataloader) * num_epochs * warmup_per) #5% of train data for warm-up
evaluator = evaluation.EmbeddingSimilarityEvaluator(sentence1, sentence2, labels)
t1 = time.time()
model.fit(train_objectives=[(train_dataloader, train_loss)],
            epochs=num_epochs,
            weight_decay = weight_decay,
            optimizer_class=optimizer_class,
            optimizer_params={'lr':learning_rate},
            evaluator=evaluator,
            evaluation_steps=int((len(train)/train_batch_size)/2)-2,
            save_best_model = True)

Epoch:   0%|          | 0/4 [00:00<?, ?it/s]

Iteration:   0%|          | 0/13 [00:00<?, ?it/s]

Iteration:   0%|          | 0/13 [00:00<?, ?it/s]

Iteration:   0%|          | 0/13 [00:00<?, ?it/s]

Iteration:   0%|          | 0/13 [00:00<?, ?it/s]

In [None]:
inputs, inputs_texts = [], []
scores = []
TP, TN, FP, FN = 0, 0, 0, 0
threshold = 0.5

for idx in range(len(test)):
  s1 = test[idx]['requirement']
  s2 = test[idx]['regulation']
  label = test[idx]['label']

  embed1 = model.encode(s1, show_progress_bar=False)
  embed2 = model.encode(s2, show_progress_bar=False)
  inputs.append((embed1, embed2))
  inputs_texts.append((s1, s2))
  _s = util.cos_sim(embed1, embed2)
  scores.append(_s)

  if _s >= threshold and label == positive_label:
    TP += 1
  elif _s >= threshold and label == negative_label:
    FP += 1
  elif _s < threshold and label == positive_label:
    FN += 1
  elif _s < threshold and label == negative_label:
    TN += 1

accuracy = (TP + TN) / (TP + TN + FP + FN)
recall = TP / (TP + FN)
precision = TP / (TP + FP)
F1 = 2*recall*precision / (recall + precision)
print('Accuracy: ', accuracy)
print('Recall: ', recall)
print('Precision: ', precision)
print('F1-score: ', F1)

Accuracy:  0.9891304347826086
Recall:  0.9767441860465116
Precision:  1.0
F1-score:  0.988235294117647
