- analyses: (SecTeam) -> (SamFile)
- associatedWith: (HackOrg) -> (HackOrg)
- discovers: (SecTeam) -> (HackOrg)
- discoveredBy: (HackOrg) -> (SecTeam)
- hasAttackTime: (HackOrg | OffAct | Way)-> (Time)
- hasCharacteristics: (HackOrg | OffAct | Exp | Way | Tool | SamFile) -> (Features)
- locatedAt:  (Org) -> (Area)
- monitors:  (SecTeam) -> (Org | Area | Tool|Exp)
- monitoredBy:  (Org | Area | Tool | Exp) -> (SecTeam)
- motivates: (Purp) -> (HackOrg | OffAct | Exp | Way)
- motivatedBy: (HackOrg | OffAct | Exp | Way) -> (Purp)
- uses: (HackOrg | OffAct | Exp | Way | Tool | SamFile)-> (Tool|OffAct|Exp|SamFile|Way)
- usedBy: (Features | OffAct | Exp | Way | Tool | SamFile) -> (HackOrg | OffAct | Exp | Way | Tool | SamFile)
- targets: (HackOrg | OffAct | Exp | Way | Tool | SamFile) -> (Area | Org | SecTeam)
- targetedBy: (Area | Org | SecTeam) -> (HackOrg | OffAct | Exp | Way | Tool | SamFile)

In [None]:
# Ontology relations as (name, domain labels, range labels) triples.
# Domain and range are always lists, even when they hold a single label,
# so consumers can iterate over either side uniformly.
relations = [
    ("analyses", ["SecTeam"], ["SamFile"]),
    ("associatedWith", ["HackOrg"], ["HackOrg"]),
    ("discovers", ["SecTeam"], ["HackOrg"]),
    ("discoveredBy", ["HackOrg"], ["SecTeam"]),
    ("hasAttackTime", ["HackOrg", "OffAct", "Way"], ["Time"]),
    ("hasCharacteristics", ["HackOrg", "OffAct", "Exp", "Way", "Tool", "SamFile"], ["Features"]),
    ("locatedAt", ["Org"], ["Area"]),
    ("monitors", ["SecTeam"], ["Org", "Area", "Tool", "Exp"]),
    ("monitoredBy", ["Org", "Area", "Tool", "Exp"], ["SecTeam"]),
    ("motivates", ["Purp"], ["HackOrg", "OffAct", "Exp", "Way"]),
    ("motivatedBy", ["HackOrg", "OffAct", "Exp", "Way"], ["Purp"]),
    ("uses", ["HackOrg", "OffAct", "Exp", "Way", "Tool", "SamFile"], ["Tool", "OffAct", "Exp", "SamFile", "Way"]),
    # "Features" was previously misspelled "Feaures", which matches no entity label.
    ("usedBy", ["Features", "OffAct", "Exp", "Way", "Tool", "SamFile"], ["HackOrg", "OffAct", "Exp", "Way", "Tool", "SamFile"]),
    ("targets", ["HackOrg", "OffAct", "Exp", "Way", "Tool", "SamFile"], ["Area", "Org", "SecTeam"]),
    ("targetedBy", ["Area", "Org", "SecTeam"], ["HackOrg", "OffAct", "Exp", "Way", "Tool", "SamFile"]),
]

In [5]:
import networkx as nx
import json
import matplotlib.pyplot as plt 

# Load the STIX 2 ontology, shaped as
# {source type: {relationship type: [target types]}}.
with open("/content/drive/MyDrive/Colab Notebooks/CTI-KG/stix2_onthodology.json", "r", encoding='utf-8') as f:
    stix2_ont = json.load(f)

# Build a directed graph with one edge per (source, target) pair.
stix2_ont_g = nx.DiGraph()
# The per-source mapping is named `source_relations` so that this loop does
# not overwrite the notebook-level `relations` list.
for source, source_relations in stix2_ont.items():
    for relationship_type, targets in source_relations.items():
        for target in targets:
            if stix2_ont_g.has_edge(source, target):
                # A DiGraph keeps a single edge per ordered pair, so adding
                # the edge again would replace the earlier label. Append the
                # additional relationship type to the existing label instead.
                stix2_ont_g.edges[source, target]["label"] += f", {relationship_type}"
            else:
                stix2_ont_g.add_edge(source, target, label=relationship_type)

# Lay out and draw the graph, then overlay the relationship names on edges.
pos = nx.nx_pydot.graphviz_layout(stix2_ont_g)
plt.figure(figsize=(10, 7))
nx.draw(stix2_ont_g, pos, with_labels=True, node_size=2000,
        node_color="lightblue", font_size=10,
        font_weight="bold", arrowsize=15)
edge_labels = nx.get_edge_attributes(stix2_ont_g, 'label')
nx.draw_networkx_edge_labels(stix2_ont_g, pos, edge_labels=edge_labels, font_color="red", font_size=8)
plt.title("Knowledge Graph from Domain Ontology", fontsize=14)
plt.show()

{'attack-pattern': {'targets': ['identity', 'vulnerability'],
  'uses': ['malware', 'tool']},
 'campaign': {'attributed-to': ['intrusion-set', 'threat-actor'],
  'targets': ['identity', 'vulnerability'],
  'uses': ['attack-pattern', 'malware', 'tool']},
 'course-of-action': {'mitigates': ['attack-pattern',
   'malware',
   'tool',
   'vulnerability']},
 'identity': {},
 'indicator': {'indicates': ['attack-pattern',
   'campaign',
   'intrusion-set',
   'malware',
   'threat-actor',
   'tool']},
 'intrusion-set': {'attributed-to': ['threat-actor'],
  'targets': ['identity', 'vulnerability'],
  'uses': ['attack-pattern', 'malware', 'tool']},
 'malware': {'targets': ['identity', 'vulnerability'],
  'uses': ['tool'],
  'variant-of': ['malware']},
 'observed-data': {},
 'report': {},
 'threat-actor': {'attributed-to': ['identity'],
  'impersonates': ['identity'],
  'targets': ['identity', 'vulnerability'],
  'uses': ['attack-pattern', 'malware', 'tool']},
 'tool': {'targets': ['identity', '

In [1]:
import polars as pl
import string
import nltk
from itertools import groupby
from nltk.corpus import stopwords

# Download the NLTK stop-word corpus and keep the English entries as a set.
nltk.download("stopwords")
stop_words_en = {*stopwords.words("english")}

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/nahtra/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [2]:
# Multi-character tokens to treat as punctuation in addition to the
# single characters in string.punctuation.
other_punc = {"''"}
punc_list = other_punc | set(string.punctuation)

def read_dataset_sentence(file_path, len_threshold=5):
  """Read a file in which sentences are runs of lines separated by blank lines.

  Every line is stripped of surrounding whitespace; a line that is empty
  after stripping ends the current sentence.

  Args:
    file_path: Path of the UTF-8 text file to read.
    len_threshold: Minimum number of lines a sentence needs to be kept.

  Returns:
    A list of sentences, each a list of stripped, non-empty lines.
  """
  sentences = []
  current = []
  with open(file_path, "r", encoding="utf-8") as f:
    for raw_line in f:
      line = raw_line.strip()
      if line:
        current.append(line)
      elif current:
        # Blank line: close the sentence collected so far.
        sentences.append(current)
        current = []
  if current:
    # The file may end without a trailing blank line.
    sentences.append(current)
  return [sentence for sentence in sentences if len(sentence) >= len_threshold]


def split_tokens_and_labels(data, delimiter=" "):
  """Split "token<delimiter>label" lines into parallel token and label lists.

  A line is dropped when it does not split into exactly two fields, when its
  token is empty, or when its lower-cased token is in the module-level
  ``punc_list``.

  Args:
    data: Iterable of strings, one "token<delimiter>label" pair per item.
    delimiter: Separator between the token and its label.

  Returns:
    A two-element list ``[tokens, labels]`` where ``tokens`` holds the
    lower-cased tokens and ``labels`` the matching labels. Both lists are
    empty when no line survives filtering (previously this case raised an
    unpacking ``ValueError``).
  """
  tokens, labels = [], []
  for pair in data:
    fields = pair.split(delimiter)
    if len(fields) != 2:
      continue
    token = fields[0].lower()
    if not token or token in punc_list:
      continue
    tokens.append(token)
    labels.append(fields[1])
  return [tokens, labels]


def to_df(sentences, delimiter=" "):
  """Turn raw sentences into a polars DataFrame of tokens, labels and text.

  Each sentence is parsed with ``split_tokens_and_labels``. The resulting
  frame has the list columns ``tokens`` and ``labels`` plus a ``content``
  column holding the tokens joined by single spaces.

  Args:
    sentences: Iterable of sentences, each a list of "token<delimiter>label"
      strings.
    delimiter: Separator passed through to ``split_tokens_and_labels``.

  Returns:
    The assembled ``pl.DataFrame``.

  Raises:
    Exception: If a sentence cannot be parsed; its index and the underlying
      error are printed first.
    AssertionError: If any row has a different number of tokens and labels.
  """
  rows = []
  for position, sentence in enumerate(sentences):
    try:
      parsed = split_tokens_and_labels(sentence, delimiter)
    except Exception as ex:
      print(f"At index: {position}, exception: {ex}")
      raise Exception("Error in sentence: ", sentence)
    rows.append(parsed)

  joined_tokens = pl.col("tokens").list.join(" ").alias("content")
  df = pl.DataFrame(rows, schema=["tokens", "labels"], orient="row").with_columns(joined_tokens)

  # Sanity check: every row must pair each token with exactly one label.
  lengths_differ = pl.col("tokens").list.len() != pl.col("labels").list.len()
  mismatch_count = df.filter(lengths_differ).shape[0]
  assert mismatch_count == 0, "Mismatched tokens and labels"
  return df

def get_unique_label(df, label_column="labels"):
  """Return the distinct values found across all lists in ``label_column``.

  Args:
    df: DataFrame with a list-typed column named ``label_column``.
    label_column: Name of the column whose lists are flattened.

  Returns:
    A plain Python list of the distinct values, in whatever order polars'
    ``unique`` yields them.
  """
  distinct_labels = pl.col(label_column).list.explode().unique()
  return df.select(distinct_labels)[label_column].to_list()

In [7]:
# Parse the DNRTI corpus into sentences, then into a token/label DataFrame.
dnrti_senteces = read_dataset_sentence("datasets/dataset-TiKG/DNRTI.txt")
dnrti_df = to_df(dnrti_senteces, " ")
# Show the label inventory first, then the DataFrame itself.
print(get_unique_label(dnrti_df), dnrti_df, sep="\n")

['I-Purp', 'B-Purp', 'B-Exp', 'I-Org', 'O', 'B-Way', 'I-SecTeam', 'B-SamFile', 'I-OffAct', 'B-Time', 'B-Org', 'I-Features', 'I-HackOrg', 'I-Time', 'B-Tool', 'B-Features', 'B-SecTeam', 'I-SamFile', 'I-Tool', 'I-Area', 'I-Exp', 'I-Way', 'B-HackOrg', 'B-Area', 'B-OffAct']
shape: (6_576, 3)
┌────────────────────────────────┬────────────────────────────────┬────────────────────────────────┐
│ tokens                         ┆ labels                         ┆ content                        │
│ ---                            ┆ ---                            ┆ ---                            │
│ list[str]                      ┆ list[str]                      ┆ str                            │
╞════════════════════════════════╪════════════════════════════════╪════════════════════════════════╡
│ ["the", "admin@338", …         ┆ ["O", "B-HackOrg", … "I-Tool"] ┆ the admin@338 has largely      │
│ "backdo…                       ┆                                ┆ targ…                          │
│ ["t

In [47]:
# Domain ontology lookup: tikg_do[head label][tail label] -> relation name.
# Repeated relation names are expanded with dict.fromkeys to keep the table
# compact; the resulting mapping is the same as the fully spelled-out literal.
tikg_do = {
    "SecTeam": dict.fromkeys(("Org", "Area"), "monitors"),
    "HackOrg": {
        "HackOrg": "associatedWith",
        "SecTeam": "targets",
        "Time": "hasAttackTime",
        "Features": "hasCharacteristics",
        **dict.fromkeys(("Tool", "OffAct", "Exp", "SamFile", "Way"), "uses"),
        **dict.fromkeys(("Area", "Org"), "targets"),
    },
    "OffAct": {
        "Time": "hasAttackTime",
        "Features": "hasCharacteristics",
        **dict.fromkeys(("Area", "Org", "SecTeam"), "targets"),
    },
    "Way": {
        "Time": "hasAttackTime",
        "Features": "hasCharacteristics",
        **dict.fromkeys(("Area", "Org", "SecTeam"), "targets"),
    },
    "Exp": {
        "Features": "hasCharacteristics",
        **dict.fromkeys(("SecTeam", "Area", "Org"), "targets"),
    },
    "Tool": {
        "Features": "hasCharacteristics",
        **dict.fromkeys(("SecTeam", "Area", "Org"), "targets"),
    },
    "SamFile": {
        "Features": "hasCharacteristics",
        **dict.fromkeys(("Area", "Org", "SecTeam"), "targets"),
    },
    "Org": {"Area": "locatedAt"},
    "Purp": dict.fromkeys(("HackOrg", "OffAct", "Exp", "Way"), "motivates"),
}

In [55]:
def rela_labeling(tokens, ner_labels, ontology=None):
    """Derive (head, relation, tail) triplets from one BIO-tagged sentence.

    Entities are collected first: a tag starting with "B" opens an entity
    whose type is the text after the last "-", and every directly following
    tag starting with "I" extends it (the type suffix of the "I" tag is not
    checked). Each ordered pair of distinct entities is then looked up in the
    ontology as ``ontology[head type][tail type]``.

    Args:
        tokens: Sequence of token strings.
        ner_labels: Sequence of BIO tags aligned with ``tokens``.
        ontology: Mapping ``{head type: {tail type: relation name}}``.
            Defaults to the module-level ``tikg_do``.

    Returns:
        A set of ``(head text, relation, tail text)`` tuples. Two mentions
        with the same text and type count as the same entity, so no triplet
        relates an entity to itself.
    """
    if ontology is None:
        ontology = tikg_do

    # Pass 1: gather (text, type) for every entity span, in sentence order.
    entities = []
    i = 0
    n_tokens = len(tokens)
    while i < n_tokens:
        if not ner_labels[i].startswith("B"):
            i += 1
            continue
        label = ner_labels[i].split("-")[-1]
        start = i
        i += 1
        while i < n_tokens and ner_labels[i].startswith("I"):
            i += 1
        entities.append((" ".join(tokens[start:i]), label))

    # Pass 2: relate every ordered pair of entities the ontology allows.
    triplets = set()
    for head in entities:
        head_name, head_label = head
        allowed = ontology.get(head_label)
        if not allowed:
            continue
        for tail in entities:
            # Skips the entity itself and repeated mentions of it, which
            # would otherwise give self-referential triplets such as
            # (x, associatedWith, x).
            if tail == head:
                continue
            tail_name, tail_label = tail
            relation = allowed.get(tail_label)
            if relation is not None:
                triplets.add((head_name, relation, tail_name))

    return triplets



In [49]:
# Smoke-test the labeller on the first sentence of the corpus.
test_tokens, test_labels = (
    dnrti_df[column][0].to_list() for column in ("tokens", "labels")
)
test_triplets = rela_labeling(test_tokens, test_labels)
print(test_tokens, test_labels, test_triplets, sep="\n")

In [56]:
# Label every sentence: result[i] is the triplet set for row i of dnrti_df.
# A comprehension keeps the per-row temporaries out of the notebook's global
# namespace.
result = [
    rela_labeling(row["tokens"], row["labels"])
    for row in dnrti_df.iter_rows(named=True)
]

In [59]:
# Number of triplets found for sentence 15, times three — presumably the
# count of individual elements (head, relation, tail); confirm intent.
len(result[15]) * 3

90