In [None]:
from langchain_huggingface.llms import HuggingFacePipeline

# Build the local text-generation LLM used by every prediction below.
# Generation is capped at 10 new tokens per call.
llm = HuggingFacePipeline.from_model_id(
    task="text-generation",
    model_id="google/gemma-2b",
    pipeline_kwargs=dict(max_new_tokens=10),
    # Swap this for device_map="auto" to let the accelerate library place the model.
    device=0,
)

In [2]:
import pandas as pd
import os

# Load the dataset (path is resolved relative to the current working directory).
df = pd.read_csv(
    os.getcwd() + '/../../data/edge-iiot/Edge-IIoTset dataset/Selected dataset for ML and DL/ML-EdgeIIoT-dataset.csv',
    low_memory=False,
)

# Attack rows (Attack_label == 1) with both label columns removed.
# 80% are sampled with a fixed seed for training; the remaining rows form the test split.
attack_df = df.loc[df['Attack_label'] == 1].drop(columns=['Attack_label', 'Attack_type'])
attack_df_train = attack_df.sample(frac=0.8, random_state=42)
attack_df_test = attack_df.drop(index=attack_df_train.index)

# Normal rows (Attack_label == 0), prepared and split the same way.
normal_df = df.loc[df['Attack_label'] == 0].drop(columns=['Attack_label', 'Attack_type'])
normal_df_train = normal_df.sample(frac=0.8, random_state=42)
normal_df_test = normal_df.drop(index=normal_df_train.index)

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

def predict(llm, x, n_examples=10):
    """Ask the LLM to label one network log as ATTACK or BENIGN (few-shot).

    The system prompt lists the column names of ``normal_df_test`` and the
    first ``n_examples`` rows of ``normal_df_train`` and ``attack_df_train``
    (module-level frames) as labelled examples. ``x`` is sent as the user
    message.

    Args:
        llm: Runnable placed between the prompt and the string output parser.
        x: String form of the log row to classify.
        n_examples: Number of labelled rows taken from each training frame,
            capped at the frame length. Defaults to 10.

    Returns:
        The text produced by the chain, as returned by ``StrOutputParser``.
    """
    def _escape_braces(text):
        # ChatPromptTemplate treats "{name}" as a template variable, so any
        # literal brace coming from the data must be doubled to survive
        # formatting unchanged.
        return text.replace("{", "{{").replace("}", "}}")

    def _render_examples(frame, label):
        # One "<row values>--><label>" line per example row.
        count = min(n_examples, len(frame))
        return "".join(
            str(frame.iloc[i].to_list()) + "-->" + label + "\n"
            for i in range(count)
        )

    benign_samples = _render_examples(normal_df_train, "BENIGN")
    attack_samples = _render_examples(attack_df_train, "ATTACK")
    system_prompt = (
        "You are intelligent network log analyzer."
        "You will be given a network log to predict ATTACK or BENIGN."
        "Use the example network logs given to predict the label."
        "Output the label ATTACK or BENIGN, nothing else."
        "\n\n"
        "Fields:" + str(normal_df_test.columns.to_list()) + "\n"
        "Examples:\n```\n" + benign_samples + attack_samples + "\n```"
    )
    messages = [
        # Only the system text is escaped; "{input}" must stay a real variable.
        ("system", _escape_braces(system_prompt)),
        ("user", "{input}"),
    ]
    chain = ChatPromptTemplate.from_messages(messages) | llm | StrOutputParser()
    return chain.invoke({"input": x})
    

In [None]:
from sklearn.metrics import classification_report

# Predict for attack entries
sample_size = 10 # attack_df_test.shape[0]
# Never index past the end of the test split.
sample_size = min(sample_size, len(attack_df_test))
y_pred = []
y_true = []
for i in range(sample_size):
    y = predict(llm, str(attack_df_test.iloc[i].to_list()))
    # The generated text may carry surrounding whitespace or extra trailing
    # tokens, so match on the normalized leading label rather than requiring
    # the whole output to equal "ATTACK".
    if y.strip().upper().startswith("ATTACK"):
        y_pred.append(1)
    else:
        y_pred.append(0)
    # Every row in this split is an attack (label 1).
    y_true.append(1)

print("Classification report for attack entries")
print(classification_report(y_true, y_pred))

In [None]:
from sklearn.metrics import classification_report

# Predict for normal entries
# Label encoding follows the dataset's Attack_label: 0 = benign, 1 = attack.
sample_size = 10 # normal_df_test.shape[0]
# Never index past the end of the test split.
sample_size = min(sample_size, len(normal_df_test))
y_pred = []
y_true = []
for i in range(sample_size):
    y = predict(llm, str(normal_df_test.iloc[i].to_list()))
    # The generated text may carry surrounding whitespace or extra trailing
    # tokens, so match on the normalized leading label rather than requiring
    # the whole output to equal "BENIGN".
    if y.strip().upper().startswith("BENIGN"):
        y_pred.append(0)
    else:
        # Anything not recognized as BENIGN counts as a miss for this split.
        y_pred.append(1)
    # Every row in this split is normal traffic (label 0).
    y_true.append(0)

print("Classification report for normal entries")
print(classification_report(y_true, y_pred))