In [None]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

In [None]:
import torch
import warnings
import pandas as pd
import os
import io
import numpy

from tqdm.notebook import tqdm
from torch.utils.data import Dataset, DataLoader
from transformers import (set_seed,
                          GPT2Config,
                          GPT2Tokenizer,
                          GPT2ForSequenceClassification)

# Suppress deprecation and future warnings so they do not clutter the notebook output.
warnings.filterwarnings('ignore', category=DeprecationWarning)
warnings.filterwarnings('ignore', category=FutureWarning)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Locations of the model directory, the tokenizer directory and the evaluation file.
# NOTE(review): these are absolute, machine-specific paths; adjust them for your environment.
model_name_or_path = '/data/forta/ethereum/model_128_stride/'
tokenizer_name_or_path = '/data/forta/ethereum/tokenizer'
evaluation_data_file = "/data/forta/ethereum/text/evaluation/malicious-eval.csv"

# Class name -> class id. `n_labels` is passed to the model configuration as `num_labels`.
# NOTE(review): `evaluate` treats a predicted class id of 0 as malicious, which
# matches this mapping; keep the two in sync.
labels_ids = {'malicious': 0, 'normal': 1}
n_labels = len(labels_ids)

# Set seed for reproducibility.
set_seed(4444)

In [None]:
class SmartContractOpcodeDataset(Dataset):
  r""" PyTorch Dataset that serves the lines of a text file as samples.

  Every line of the file becomes one sample. Lines are stored verbatim, i.e.
  including their trailing newline. All samples receive the placeholder
  label 0.

  Arguments:
    file_path (:obj:`str`): Path of a UTF-8 encoded text file holding one
      sample per line.
    use_tokenizer: Unused. Accepted (and now optional) so that existing
      callers keep working.
  """

  def __init__(self, file_path, use_tokenizer=None):
    # The context manager closes the file handle as soon as reading is done,
    # even when reading raises.
    with io.open(file_path, mode='r', encoding='utf-8') as current_file:
      self.texts = list(current_file)
    # No ground truth is read from the file: every sample gets label 0.
    self.labels = [0] * len(self.texts)
    self.n_examples = len(self.labels)

  def __len__(self):
    r""" Return the number of samples (lines) that were loaded. """
    return self.n_examples

  def __getitem__(self, item):
    r""" Return the sample at position `item` as a dict with the keys
    'text' and 'labels'. Raises IndexError when `item` is out of range. """
    return {'text': self.texts[item],
            'labels': self.labels[item]}

In [None]:
def extract_sequences(data, stride, pad_token_id=0):
    r"""Tokenize `data` and cut it into fixed-length, left-padded windows.

    The text is encoded with the notebook-level `tokenizer`. Windows of at most
    `model.config.n_positions` tokens are taken every `stride` tokens, stopping
    as soon as a window reaches the end of the encoding. Windows shorter than
    that limit are left-padded with `pad_token_id`.

    Arguments:
        data: Text handed to `tokenizer`. Only the first row of the resulting
            `input_ids` is used.
        stride (:obj:`int`): Distance, in tokens, between the starts of two
            consecutive windows.
        pad_token_id (:obj:`int`, `optional`, defaults to 0): Token id used for
            left padding. The default keeps the previously hard-coded value.

    Returns:
        :obj:`torch.Tensor` of dtype `torch.long` and shape
        `(n_windows, model.config.n_positions)`. It is not moved to `device`;
        the caller is responsible for that.
    """
    max_length = model.config.n_positions
    # Pull the token ids out once as a plain Python list. The windows are
    # assembled as lists anyway, so no per-window device transfer is needed.
    token_ids = tokenizer(data, return_tensors="pt").input_ids[0].tolist()
    seq_len = len(token_ids)

    sequences = []
    for begin_loc in tqdm(range(0, seq_len, stride)):
        end_loc = min(begin_loc + max_length, seq_len)
        window = token_ids[begin_loc:end_loc]
        # Left-pad so that every row has exactly `max_length` entries.
        padding = [pad_token_id] * (max_length - len(window))
        sequences.append(padding + window)
        # Once a window reaches the end of the text, later windows would only
        # repeat its tail.
        if end_loc == seq_len:
            break
    # The reshape keeps the result two-dimensional when there are no windows.
    return torch.tensor(sequences, dtype=torch.long).reshape(-1, max_length)


In [None]:
# Load the evaluation file: one sample per line, each with the placeholder label 0.
# `use_tokenizer` is not used by the dataset class, hence None.
eval_dataset = SmartContractOpcodeDataset(file_path=evaluation_data_file, use_tokenizer=None)

In [None]:
# Load the model configuration from the local model directory, with one output
# per entry of `labels_ids`.
print('Loading configuration...')
model_config = GPT2Config.from_pretrained(pretrained_model_name_or_path=model_name_or_path,
                                          num_labels=n_labels, local_files_only=True,
                                         use_safetensors=True)

# Load the tokenizer from its own local directory.
print('Loading tokenizer...')
tokenizer = GPT2Tokenizer.from_pretrained(pretrained_model_name_or_path=tokenizer_name_or_path,
                                         local_files_only=True, use_safetensors=True)
# Pad on the left.
tokenizer.padding_side = "left"
# Reuse the EOS token as the PAD token.
# NOTE(review): `extract_sequences` pads with token id 0, which assumes the EOS
# token id of this tokenizer is 0 — confirm.
tokenizer.pad_token = tokenizer.eos_token

# Load the sequence-classification model itself.
print('Loading model...')
model = GPT2ForSequenceClassification.from_pretrained(pretrained_model_name_or_path=model_name_or_path,
                                                      config=model_config, local_files_only=True,
                                                      use_safetensors=True)

# Resize the model's token embeddings to the tokenizer's vocabulary size.
model.resize_token_embeddings(len(tokenizer))
# Make the model use its EOS id as the padding id as well.
model.config.pad_token_id = model.config.eos_token_id

# Move the model to the selected device.
model.to(device)
print('Model loaded to `%s`'%device)

In [None]:
def evaluate(dataloader, device_, stride=128):
    r"""Classify every sample of `dataloader` and print a malicious/normal summary.

    Each sample's text is split into windows with `extract_sequences`, and all
    windows of that sample are run through the notebook-level `model` in a
    single forward pass. A sample counts as malicious as soon as at least one
    of its windows is predicted as class 0; otherwise it counts as normal.
    The per-window predictions and the verdict are printed for every sample,
    followed by the totals.

    Arguments:
        dataloader: Sized iterable yielding dicts with a "text" entry, e.g. a
            `SmartContractOpcodeDataset`.
        device_ (:obj:`torch.device`): Device the input ids are moved to. It
            should be the device `model` lives on.
        stride (:obj:`int`, `optional`, defaults to 128): Window stride passed
            to `extract_sequences`. The default keeps the previously
            hard-coded value.

    Raises:
        KeyError: If a yielded item has no "text" entry.
    """
    malicious_id = 0  # class id of 'malicious' in `labels_ids`

    model.eval()
    total = len(dataloader)
    normal = 0
    malicious = 0

    for index, batch in enumerate(tqdm(dataloader, total=total)):
        input_ids = extract_sequences(batch["text"], stride).to(device_)
        with torch.no_grad():
            outputs = model(input_ids=input_ids)
        logits = outputs.logits.detach().cpu().numpy()
        # One predicted class id per window of this sample.
        predict_content = logits.argmax(axis=-1).flatten()
        print(predict_content)
        if (predict_content == malicious_id).any():
            print(str(index)+": Malicious Trace detected!!!")
            malicious = malicious + 1
        else:
            print(str(index)+": Normal")
            normal = normal + 1

    # An empty input must not crash the summary with a division by zero.
    proportion = malicious/total if total else 0.0
    print("Amount of normal samples: "+str(normal))
    print("Amount of malicious samples: "+str(malicious))
    print("Proportion of malicious over total: "+str(proportion))

In [None]:
# Run the evaluation. The dataset itself (not a DataLoader) is passed, so
# `evaluate` iterates over it one sample at a time.
evaluate(eval_dataset, device)