In [4]:
%pwd

'/workspace'

In [19]:
import os
import pandas as pd

# データの読み込み

capec = pd.read_csv(f"/work/security_knowledge_graph/data/raw/capec.csv")
cwe = pd.read_csv(f"/work/security_knowledge_graph/data/raw/cwe.csv")
cve = pd.read_csv(f"/work/security_knowledge_graph/data/raw/cve.csv")

# IDを一意に設定
capec["ID"] = ["CAPEC-" + str(id) for id in capec["ID"]]
cwe["ID"] = ["CWE-" + str(id) for id in cwe["CWE-ID"]]
cve["ID"] = [str(id) for id in cve["CVE-ID"]]

# IDを数字にマッピング
dataset = pd.concat([capec[["ID", "Description"]], cwe[["ID", "Description"]], cve[["ID", "Description"]]]).reset_index(drop=True)
mapped_id = pd.DataFrame(
    data={
        "ID": dataset["ID"],
        "Description": dataset["Description"],
        "mappedID": range(len(dataset))
    }
)

In [20]:
print(mapped_id["ID"])

0              CAPEC-1
1             CAPEC-10
2            CAPEC-100
3            CAPEC-101
4            CAPEC-102
             ...      
3524     CVE-2022-0002
3525     CVE-2018-3615
3526     CVE-2019-1135
3527     CVE-2022-0001
3528    CVE-2021-33149
Name: ID, Length: 3529, dtype: object


In [24]:
import os
import re
import math
import numpy as np
import pandas as pd
from typing import Union, Literal, NewType



class Cwe:
    def __init__(self, value: Union[str, int]):
        self.value = value

class Capec:
    def __init__(self, value: Union[str, int]):
        self.value = value

class Cve:
    def __init__(self, value: Union[str, int]):
        self.value = value
    


rel_type = Literal[
    "ParentOf",
    "ChildOf",
    "CanPrecede",
    "CanFollow",
    "PeerOf",
    "TragetOf",
    "AttackOf",
    "ExampleOf"
]

def isnan(input: any) -> bool:
    if isinstance(input, (int, float)):
        return math.isnan(input)
    else:
        return isinstance(input, float)


def delete_duplicate(triples: list[tuple[Union[Capec, Cve], rel_type, Union[Capec,Cve]]]):
    new_triples = []
    tmp_pair = []
    for triple in triples:
        id1 = triple[0]
        id2 = triple[2]
        if (id1.value, id2.value) not in tmp_pair:
            tmp_pair.append((id1.value, id2.value))
            tmp_pair.append((id2.value, id1.value))
            new_triples.append(triple)
    
    return new_triples


def create_capec_triples() -> list[tuple[Capec, rel_type, Capec]]:
    triples = []
    for data in capec.to_dict(orient="records"):
        data_id: int = data["ID"]
        relations: Union[str, float] = data["Related Attack Patterns"]
        if isnan(relations):
            continue
        relations: list[str] = re.findall(r"::NATURE:(\w+):CAPEC ID:(\d+)", relations)
        for rel_type, id in relations:
            triples.append((Capec(data_id), rel_type, Capec(int(id))))
    triples = delete_duplicate(triples)
    return triples


def create_cwe_triples() -> list[tuple[Cwe, rel_type, Cwe]]:
    triples = []
    for data in cwe.to_dict(orient="records"):
        data_id: int = data["CWE-ID"]
        relations: Union[str, float] = data["Related Weaknesses"]
        if isnan(relations):
            continue
        relations: list[str] = re.findall(r"::NATURE:(\w+):CWE ID:(\d+):", relations)
        for rel_type, id in relations:
            triples.append((Cwe(data_id), rel_type, Cwe(int(id))))
    delete_duplicate(triples)
    return triples


def create_capec_cwe_triples() -> list[tuple[Cwe, rel_type, Capec]]:
    triples = []
    for data in capec.to_dict(orient="records"):
        capec_id: int = data["ID"]
        weaknesses: Union[str, float] = data["Related Weaknesses"]
        if isnan(weaknesses):
            continue
        weaknesses: list[str] = re.findall(r"::(\d+)", weaknesses)
        for cwe_id in weaknesses:
            triples.append((Cwe(int(cwe_id)), "TargetOf", Capec(capec_id)))
    return triples


def create_cwe_cve_triples() -> list[tuple[Cve, rel_type, Cwe]]:
    triples = []
    cves = []
    cve_ids = []
    for data in cwe.to_dict(orient="records"):
        cwe_id: int = data["CWE-ID"]
        examples: Union[str, float] = data["Observed Examples"]
        if isnan(examples):
            continue
        examples: list[str] = re.findall(r"::REFERENCE:(CVE-\d+-\d+):DESCRIPTION:([^:]+(?::(?!:)[^:]+)*)", examples)
        for cve_id, description in examples:
            triples.append((Cve(cve_id), "ExampleOf", Cwe(str(cwe_id))))
            if cve_id not in cve_ids:  
               cves.append([cve_id, description])
               cve_ids.append(cve_id)
    cve_df = pd.DataFrame(data=cves, columns=["CVE-ID", "Description"])
    return triples



In [30]:
import os
import re
import torch
import math
import numpy as np
import pandas as pd
from typing import Union, Literal, NewType



class Cwe:
    def __init__(self, value: Union[str, int]):
        self.value = value

class Capec:
    def __init__(self, value: Union[str, int]):
        self.value = value

class Cve:
    def __init__(self, value: Union[str, int]):
        self.value = value
    

mapped_relation = {
    "ParentOf": 0,
    "ChildOf": 1,
    "CanPrecede": 2,
    "CanFollow": 3,
    "PeerOf": 4,
    "TargetOf": 5,
    "AttackOf": 6,
    "ExampleOf": 7
}


def get_mapped_id(
    id: Union[Capec, Cwe, Cve]
) -> int:
    if isinstance(id, Capec):
        id = "CAPEC-"+str(id.value)
    elif isinstance(id, Cwe):
        id = "CWE-"+str(id.value)
    else:
        id: str = str(id.value)
        
        print(id)
        
    
    return mapped_id[mapped_id["ID"] == id]["mappedID"].item()


def create_postive_triples() -> torch.tensor:
    triples = []
    capec_triples = create_capec_triples()
    cwe_triples = create_cwe_triples()
    cwe_capec_triples = create_capec_cwe_triples()
    cve_cwe_triples = create_cwe_cve_triples()
    
    
    for id1, rel, id2 in capec_triples:
        try:
            id1 = get_mapped_id(id1)
            id2 = get_mapped_id(id2)
            rel = mapped_relation[rel]
        except:
            continue
        triples.append([id1, rel, id2])
    
    capec_rel_num = len(triples)

    for id1, rel, id2 in cwe_triples:
        try:
            id1 = get_mapped_id(id1)
            id2 = get_mapped_id(id2)
            rel = mapped_relation[rel]
        except:
            continue
        triples.append([id1, rel, id2])
        
    cwe_rel_num = len(triples) - capec_rel_num

    for id1, rel, id2 in cwe_capec_triples:
        id1 = get_mapped_id(id1)
        id2 = get_mapped_id(id2)
        rel = mapped_relation[rel]
        triples.append([id1, rel, id2])
    
    cwe_capec_rel_num = len(triples) - capec_rel_num - cwe_rel_num

    for id1, rel, id2 in cve_cwe_triples:
        id1 = get_mapped_id(id1)
        id2 = get_mapped_id(id2)
        rel = mapped_relation[rel]
        triples.append([id1, rel, id2])
    
    cve_cwe_rel_num = len(triples) - capec_rel_num - cwe_rel_num - cwe_capec_rel_num
    
    return torch.tensor(triples)
    

def add_reverse_edge(triples: torch.Tensor) -> torch.Tensor:
    new_triples = []
    for triple in triples:
        id1 = triple[0].item()
        rel = triple[1].item()
        id2 = triple[2].item()
        if rel == 0:
            new_triples.append([id1, 0, id2])
            new_triples.append([id2, 1, id1])
        elif rel == 1:
            new_triples.append([id1, 1, id2])
            new_triples.append([id2, 0, id1])
        elif rel == 2:
            new_triples.append([id2, 2, id1])
            new_triples.append([id1, 3, id2])
        elif rel == 3:
            new_triples.append((id1, 3, id2))
            new_triples.append((id2, 2 ,id1))
        elif rel == 4:
            new_triples.append([id1, 4, id2])
            new_triples.append([id2, 4, id1])
        elif rel == 5:
            new_triples.append([id1, 5, id2])
            new_triples.append([id2, 6, id1])
        elif rel == 6:
            new_triples.append([id1, 6, id2])
            new_triples.append([id2, 5, id1])
    
    return torch.tensor(new_triples)


create_postive_triples()
       
    
    
    
    
    
    

ValueError: can only convert an array of size 1 to a Python scalar

In [29]:
create_postive_triples()

ValueError: can only convert an array of size 1 to a Python scalar