In [24]:
#Parsing STiX 1.x Data and creating a common format
import xml.etree.ElementTree as ET
import json

tree = ET.parse("./stix1/STIX_URL_Watchlist.xml")
root = tree.getroot()

stix1_schema = {

 "STIX_Package": [
     "version",
     "id",
     "timestamp",
     "Indicators",
     "Incidents",
     "Campaigns",
     "Threat_Actors",
 ],

 "Indicator": [
     "id",
     "timestamp",
     "Type",
     "Description",
     "Observable",
 ],

 "Observable": [
     "id",
     "Object",
     "Title",
     "Description"
 ],

 "Object": [
     "Properties"
 ],

 "URIObject": [
     "Value",
     "condition",
     "apply_condition",
     "type"
 ],

 "Incident": [
     "id",
     "timestamp",
     "Description",
     "Time"
 ],

 "TTP": [
     "id",
     "Title",
     "Description",
 ],

 "Campaign": [
     "id",
     "Title",
     "Description",
 ],

 "ThreatActor": [
     "id",
     "Title",
     "Description",
 ]
}

ns = {
    "stix": "http://docs.oasis-open.org/cti/ns/stix/core-1",
    "indicator": "http://docs.oasis-open.org/cti/ns/stix/indicator-1",
    "URIObject": "http://docs.oasis-open.org/cti/ns/cybox/objects/uri-2"
}

def strip_ns(tag):
    return tag.split("}")[-1]

top_idn = root.get("id").split(":")[-1] or "ID not found"
version = root.get("version")

all_outputs = []
modified_colln = []
for indicator_elem in root.findall(".//stix:Indicator", ns):
    modified_dict = {
        "stix_version": "",
        "entity_type": "",
        "indicator_type": "",
        "description": "",
        "timestamp":"",
        "values": None,
        "validation" : False
    }

    valid_tags = []
    valid_attributes = []

    for inx in root.iter():
        clean = strip_ns(inx.tag)
        valid_tags.append(clean)
        for a in inx.attrib.keys():
            valid_attributes.append(a)

    tags_val = False
    attr_val = False

    for key, attributes in stix1_schema.items():
        if key in valid_tags:
            tags_val = True
            for at in attributes:
                if at in valid_attributes:
                   attr_val = True
                else:
                    attr_val = False
        else:
            continue

    if tags_val == True or attr_val == True:
        modified_dict["validation"] = True

    modified_dict["stix_version"] = version
    raw_id = indicator_elem.get("id") or ""
    idn = raw_id.split(":")[-1] if ":" in raw_id else raw_id

    timestamp = indicator_elem.get("timestamp") or ""

    modified_dict["entity_type"] = strip_ns(indicator_elem.tag)

    type_elem = indicator_elem.find(".//indicator:Type", ns)
    desc_elem = indicator_elem.find(".//indicator:Description", ns)
    value_elem = indicator_elem.find(".//URIObject:Value", ns)

    indicator_type = type_elem.text if type_elem is not None else ""
    description = desc_elem.text if desc_elem is not None else ""
    values = value_elem.text.split("##comma##") if value_elem is not None else []

    modified_dict["indicator_type"] = indicator_type
    modified_dict["description"] = description
    modified_dict["values"] = values
    modified_dict["timestamp"] = timestamp

    modified_colln.append(modified_dict)

    pattern_parts = []
    for u in values:
        pattern_parts.append(f"url:value = '{u}'")

    pattern = "[ " + " OR ".join(pattern_parts) + " ]" if pattern_parts else ""

    stix2_output = {
        "type": "bundle",
        "spec_version": "2.1",
        "id": top_idn,
        "objects": [
            {
                "type": modified_dict["entity_type"].lower(),
                "id": idn,
                "created": timestamp,
                "modified": timestamp,
                "labels": [indicator_type] if indicator_type else [],
                "description": description,
                "pattern": pattern
            }
        ]
    }

    all_outputs.append(stix2_output)

print(json.dumps(all_outputs, indent=4))

[
    {
        "type": "bundle",
        "spec_version": "2.1",
        "id": "package-4cc56b6b-748f-440b-9f01-03bcf3ce7c68",
        "objects": [
            {
                "type": "indicator",
                "id": "Indicator-db4a6ffe-61f0-488d-85a1-20bd5e360f37",
                "created": "2015-05-15T09:00:00.000000Z",
                "modified": "2015-05-15T09:00:00.000000Z",
                "labels": [
                    "URL Watchlist"
                ],
                "description": "Sample URL Indicator for this watchlist",
                "pattern": "[ url:value = 'http://example.com/foo/malicious1.html' OR url:value = 'http://example.com/foo/malicious2.html' OR url:value = 'http://example.com/foo/malicious3.html' ]"
            }
        ]
    }
]


In [25]:
#STiX 2.x Parsing
import json
with open("./stix20/enterprise-attack.json") as f:
    item = json.load(f)

stix2_validation_schema = {
    "bundle": {
        "required": ["type", "id", "objects"]
    },
    "common": {
        "required": ["type", "id", "created"]
    },
    "malware": {
        "required": ["name", "labels"]
    },
    "indicator": {
        "required": ["pattern", "valid_from", "labels"]
    },
    "relationship": {
        "required": ["source_ref", "target_ref", "relationship_type"]
    },
    "attack-pattern": {
        "required": ["name"]
    },
    "tool": {
        "required": ["name", "labels"]
    },
    "identity": {
        "required": ["name", "identity_class"]
    },
    "report": {
        "required": ["name", "published", "object_refs"]
    },
    "course-of-action": {
        "required": ["name"]
    }
}

root_obj = item["type"]
is_field = True if item.get("objects") else False
typeof_obj = isinstance(item.get("objects"), list)

ph1 = False
ph2 = False
ph3 = False
ph0 = False

if (root_obj == "bundle") and (is_field == True) and (typeof_obj == True):
    ph1 = True
    if ("type" in item.keys()) and ("id" in item.keys()) and ("objects" in item.keys()):
        ph0 = True

        for _ in range(len(item["objects"])):

            if ("type" in item["objects"][_].keys()) and ("id" in item["objects"][_].keys()):
                ph2 = True

                obj_type = item["objects"][_]["type"]
                if obj_type in stix2_validation_schema:
                    ph3 = True
                    for vals in stix2_validation_schema[obj_type]["required"]:
                        if vals not in item["objects"][_].keys():
                            ph3 = False
                            break
            else:
                ph2 = False
    else:
        ph0 = False
else:
    ph1 = False

validation = False
if ph0 == True and ph1 == True and ph2 == True and ph3 == True:
    validation = True

arr_of_data = []

def version_definition(data):
    try:
        item_out = list(data.keys())
        item_in = list(data["objects"][0].keys())
        merged_list = item_in + item_out

        for indice in range(len(data["objects"])):

            modified_dict_20 = {
                "stix_version": "",
                "entity_type": "",
                "indicator_type": "",
                "description": "",
                "timestamp":"",
                "values": None,
                "validation": False
            }

            if validation == False:
                continue
            else:
                modified_dict_20["validation"] = validation

            if ("stix_version" in merged_list) or ("spec_version" in merged_list) or ("version" in merged_list):
                modified_dict_20["stix_version"] = "2.1"
            else:
                modified_dict_20["stix_version"] = "2.0"

            obj = data["objects"][indice]

            modified_dict_20["entity_type"] = obj.get("type", "Couldn't find entity type")
            modified_dict_20["description"] = obj.get("description", "Description not found")
            modified_dict_20["timestamp"] = obj.get("created", None)

            # -------- MINIMAL CHANGE START --------
            name = obj.get("name")
            alias = obj.get("x_mitre_aliases")
            labels = obj.get("labels")

            if name:
                final = [name]
            elif alias:
                final = alias
            elif labels:
                final = labels if isinstance(labels, list) else [labels]
            else:
                final = [obj.get("type")]

            modified_dict_20["indicator_type"] = final

            refs = obj.get("external_references") or []

            er = []
            for r in refs:
                if "external_id" in r:
                    er.append(r["external_id"])
                elif "url" in r:
                    er.append(r["url"])

            name_list = [name] if name else []

            merged_values = name_list + (alias or []) + er
            modified_dict_20["values"] = merged_values

            arr_of_data.append(modified_dict_20)

    except Exception as e:
        print("Error accessing STiX data!", e)

version_definition(item)
idn = item.get("id")
modi_colln = []

if len(arr_of_data) > 0:
    for inl in range(len(arr_of_data)):

        modified_dict = arr_of_data[inl]
        if inl < len(item["objects"]):
            nest_id = item["objects"][inl].get("id")
        else:
            nest_id = None

        urls = modified_dict["values"] or []

        pattern_parts = []
        for u in urls:
            pattern_parts.append(f"url:value = '{u}'")

        pattern = "[ " + " OR ".join(pattern_parts) + " ]" if pattern_parts else ""

        stix2_output = {
            "type": "bundle",
            "spec_version": modified_dict["stix_version"],
            "id": idn,
            "objects": [
                {
                    "type": modified_dict["entity_type"],
                    "id": nest_id,
                    "created": modified_dict["timestamp"],
                    "modified": modified_dict["timestamp"],
                    "labels": modified_dict["indicator_type"],
                    "description": modified_dict["description"],
                    "pattern": pattern
                }
            ]
        }

        modi_colln.append(stix2_output)

print(json.dumps(modi_colln, indent=4))

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [26]:
#STiX 2.x Parsing
import json
with open("./stix21/enterprise-attack.json") as f:
    item = json.load(f)

stix2_validation_schema = {
    "bundle": {
        "required": ["type", "id", "objects"]
    },
    "common": {
        "required": ["type", "id", "created"]
    },
    "malware": {
        "required": ["name"]
    },
    "indicator": {
        "required": ["pattern", "valid_from", "labels"]
    },
    "relationship": {
        "required": ["source_ref", "target_ref", "relationship_type"]
    },
    "attack-pattern": {
        "required": ["name"]
    },
    "tool": {
        "required": ["name", "labels"]
    },
    "identity": {
        "required": ["name", "identity_class"]
    },
    "report": {
        "required": ["name", "published", "object_refs"]
    },
    "course-of-action": {
        "required": ["name"]
    }
}

root_obj = item["type"]
is_field = True if item.get("objects") else False
typeof_obj = isinstance(item.get("objects"), list)

ph1 = False
ph2 = False
ph3 = False
ph0 = False

if (root_obj == "bundle") and (is_field == True) and (typeof_obj == True):
    ph1 = True
    if ("type" in item.keys()) and ("id" in item.keys()) and ("objects" in item.keys()):
        ph0 = True

        for _ in range(len(item["objects"])):

            if ("type" in item["objects"][_].keys()) and ("id" in item["objects"][_].keys()):
                ph2 = True

                obj_type = item["objects"][_]["type"]

                if obj_type in stix2_validation_schema:
                    ph3 = True
                    for vals in stix2_validation_schema[obj_type]["required"]:
                        if vals not in item["objects"][_].keys():
                            ph3 = False
                            break
            else:
                ph2 = False
    else:
        ph0 = False
else:
    ph1 = False

validation = False
if ph0 == True and ph1 == True and ph2 == True and ph3 == True:
    validation = True

arr_of_data2 = []

def version_definition(data):
    try:
        item_out = list(data.keys())
        item_in = list(data["objects"][0].keys())
        merged_list = item_in + item_out

        for indice in range(len(data["objects"])):

            modified_dict_20 = {
                "stix_version": "",
                "entity_type": "",
                "indicator_type": "",
                "description": "",
                "timestamp":"",
                "values": None,
                "validation": False
            }

            if validation == False:
                continue
            else:
                modified_dict_20["validation"] = validation

            if ("stix_version" in merged_list) or ("spec_version" in merged_list) or ("version" in merged_list):
                modified_dict_20["stix_version"] = "2.1"
            else:
                modified_dict_20["stix_version"] = "2.0"

            obj = data["objects"][indice]

            modified_dict_20["entity_type"] = obj.get("type", "Couldn't find entity type")
            modified_dict_20["description"] = obj.get("description", "Description not found")
            modified_dict_20["timestamp"] = obj.get("created", None)
            name = obj.get("name")
            alias = obj.get("x_mitre_aliases")

            if name:
                final = [name]
            elif alias:
                final = alias
            else:
                final = [obj.get("type")]

            modified_dict_20["indicator_type"] = final
            refs = obj.get("external_references") or []

            er = []
            for r in refs:
                if "external_id" in r:
                    er.append(r["external_id"])
                elif "url" in r:
                    er.append(r["url"])

            name_list = [name] if name else []

            merged_values = name_list + (alias or []) + er
            modified_dict_20["values"] = merged_values

            arr_of_data2.append(modified_dict_20)

    except Exception as e:
        print("Error accessing STiX data!", e)

version_definition(item)

idn = item.get("id")
modi_colln2 = []

if len(arr_of_data2) > 0:
    for inl in range(len(arr_of_data2)):

        modified_dict = arr_of_data2[inl]

        if inl < len(item["objects"]):
            nest_id = item["objects"][inl].get("id")
        else:
            nest_id = None

        urls = modified_dict["values"] or []

        pattern_parts = []
        for u in urls:
            pattern_parts.append(f"url:value = '{u}'")

        pattern = "[ " + " OR ".join(pattern_parts) + " ]" if pattern_parts else ""
        obj_type = modified_dict["entity_type"]

        obj_dict = {
            "type": obj_type,
            "id": nest_id,
            "created": modified_dict["timestamp"],
            "modified": modified_dict["timestamp"],
            "labels": modified_dict["indicator_type"],
            "description": modified_dict["description"]
        }

        if obj_type == "indicator":
            obj_dict["pattern"] = pattern

        stix2_output = {
            "type": "bundle",
            "spec_version": modified_dict["stix_version"],
            "id": idn,
            "objects": [obj_dict]
        }

        modi_colln2.append(stix2_output)

print(json.dumps(modi_colln2, indent=4))

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [27]:
import pandas as pd

stix_1_df = pd.DataFrame(data=modified_colln)
stix_2_df = pd.DataFrame(data=arr_of_data)
stix_3_df = pd.DataFrame(data=arr_of_data2)

In [28]:
stix_1_df

Unnamed: 0,stix_version,entity_type,indicator_type,description,timestamp,values,validation
0,stix-1.2.1,Indicator,URL Watchlist,Sample URL Indicator for this watchlist,2015-05-15T09:00:00.000000Z,"[http://example.com/foo/malicious1.html, http:...",True


In [29]:
stix_2_df

Unnamed: 0,stix_version,entity_type,indicator_type,description,timestamp,values,validation
0,2.1,x-mitre-matrix,[Enterprise ATT&CK],Below are the tactics and technique representi...,2018-10-17T00:14:20.652Z,"[Enterprise ATT&CK, enterprise-attack]",True
1,2.1,course-of-action,[Password Filter DLL Mitigation],Ensure only valid password filters are registe...,2018-10-17T00:14:20.652Z,"[Password Filter DLL Mitigation, T1174, https:...",True
2,2.1,course-of-action,[Space after Filename Mitigation],Prevent files from having a trailing space aft...,2018-10-17T00:14:20.652Z,"[Space after Filename Mitigation, T1151]",True
3,2.1,course-of-action,[HISTCONTROL Mitigation],Prevent users from changing the <code>HISTCONT...,2018-10-17T00:14:20.652Z,"[HISTCONTROL Mitigation, T1148, http://www.aky...",True
4,2.1,course-of-action,[Credentials in Files Mitigation],Establish an organizational policy that prohib...,2018-10-17T00:14:20.652Z,"[Credentials in Files Mitigation, T1081, http:...",True
...,...,...,...,...,...,...,...
24766,2.1,relationship,[relationship],[ToddyCat](https://attack.mitre.org/groups/G10...,2024-01-23T20:36:19.677Z,[https://securelist.com/toddycat-keep-calm-and...,True
24767,2.1,relationship,[relationship],[Bisonal](https://attack.mitre.org/software/S...,2022-01-27T18:04:46.484Z,[https://blog.talosintelligence.com/2020/03/bi...,True
24768,2.1,relationship,[relationship],[ZxShell](https://attack.mitre.org/software/S0...,2019-09-24T14:19:05.322Z,[https://blogs.cisco.com/security/talos/openin...,True
24769,2.1,identity,[The MITRE Corporation],,2017-06-01T00:00:00.000Z,[The MITRE Corporation],True


In [30]:
stix_3_df

Unnamed: 0,stix_version,entity_type,indicator_type,description,timestamp,values,validation
0,2.1,x-mitre-collection,[Enterprise ATT&CK],ATT&CK for Enterprise provides a knowledge bas...,2018-01-17T12:56:55.080Z,[Enterprise ATT&CK],True
1,2.1,x-mitre-matrix,[Enterprise ATT&CK],Below are the tactics and technique representi...,2018-10-17T00:14:20.652Z,"[Enterprise ATT&CK, enterprise-attack]",True
2,2.1,course-of-action,[Password Filter DLL Mitigation],Ensure only valid password filters are registe...,2018-10-17T00:14:20.652Z,"[Password Filter DLL Mitigation, T1174, https:...",True
3,2.1,course-of-action,[Space after Filename Mitigation],Prevent files from having a trailing space aft...,2018-10-17T00:14:20.652Z,"[Space after Filename Mitigation, T1151]",True
4,2.1,course-of-action,[HISTCONTROL Mitigation],Prevent users from changing the <code>HISTCONT...,2018-10-17T00:14:20.652Z,"[HISTCONTROL Mitigation, T1148, http://www.aky...",True
...,...,...,...,...,...,...,...
24767,2.1,relationship,[relationship],[ToddyCat](https://attack.mitre.org/groups/G10...,2024-01-23T20:36:19.677Z,[https://securelist.com/toddycat-keep-calm-and...,True
24768,2.1,relationship,[relationship],[Bisonal](https://attack.mitre.org/software/S...,2022-01-27T18:04:46.484Z,[https://blog.talosintelligence.com/2020/03/bi...,True
24769,2.1,relationship,[relationship],[ZxShell](https://attack.mitre.org/software/S0...,2019-09-24T14:19:05.322Z,[https://blogs.cisco.com/security/talos/openin...,True
24770,2.1,identity,[The MITRE Corporation],,2017-06-01T00:00:00.000Z,[The MITRE Corporation],True


In [31]:
stix_1_df['stix_version'] = "1.2"
stix_1_df["values"] = stix_1_df["values"].apply(lambda x: ",".join(x) if isinstance(x, list) else x)
stix_1_df = stix_1_df.drop_duplicates()
stix_1_df = stix_1_df.dropna()

if len(stix_1_df) == stix_1_df.index.nunique():
    print("All unique rows after removal of duplicates!")
else:
    print("Some duplicate rows existed!")

stix_1_df["validation_flag"] = stix_1_df["validation"].apply(lambda x: 1 if x == True else 0)

feature_array = ["entity_type", "description"]

for feature in feature_array:
    stix_1_df[feature] = stix_1_df[feature].astype(str).str.lower()

stix_1_df["datetime"] = pd.to_datetime(stix_1_df["timestamp"])

stix_1_df["Time"] = stix_1_df["datetime"].dt.time
stix_1_df["Date"] = stix_1_df["datetime"].dt.date

stix_1_df = stix_1_df.drop(columns=["datetime"])

All unique rows after removal of duplicates!


In [32]:
stix_2_df['stix_version'] = "2.0"
stix_2_df["values"] = stix_2_df["values"].apply(lambda x: ",".join(x) if isinstance(x, list) else x)
stix_2_df["indicator_type"] = stix_2_df["indicator_type"].apply(lambda x: ",".join(x) if isinstance(x, list) else x)
stix_2_df = stix_2_df.drop_duplicates()
stix_2_df = stix_2_df.dropna()

if len(stix_2_df) == stix_2_df.index.nunique():
    print("STIX2: All unique rows!")
else:
    print("STIX2: Duplicates existed!")

stix_2_df["validation_flag"] = stix_2_df["validation"].apply(lambda x: 1 if x == True else 0)

feature_array = ["entity_type", "description"]

for feature in feature_array:
    stix_2_df[feature] = stix_2_df[feature].astype(str).str.lower()

stix_2_df["datetime"] = pd.to_datetime(stix_2_df["timestamp"])

stix_2_df["Time"] = stix_2_df["datetime"].dt.time
stix_2_df["Date"] = stix_2_df["datetime"].dt.date

stix_2_df = stix_2_df.drop(columns=["datetime"])

STIX2: All unique rows!


In [33]:
stix_3_df['stix_version'] = "2.0"
stix_3_df["values"] = stix_3_df["values"].apply(lambda x: ",".join(x) if isinstance(x, list) else x)
stix_3_df["indicator_type"] = stix_3_df["indicator_type"].apply(lambda x: ",".join(x) if isinstance(x, list) else x)
stix_3_df = stix_3_df.drop_duplicates()
stix_3_df = stix_3_df.dropna()

if len(stix_3_df) == stix_3_df.index.nunique():
    print("STIX2: All unique rows!")
else:
    print("STIX2: Duplicates existed!")

stix_3_df["validation_flag"] = stix_3_df["validation"].apply(lambda x: 1 if x == True else 0)

feature_array = ["entity_type", "description"]

for feature in feature_array:
    stix_3_df[feature] = stix_3_df[feature].astype(str).str.lower()

stix_3_df["datetime"] = pd.to_datetime(stix_3_df["timestamp"])

stix_3_df["Time"] = stix_3_df["datetime"].dt.time
stix_3_df["Date"] = stix_3_df["datetime"].dt.date

stix_3_df = stix_3_df.drop(columns=["datetime"])

STIX2: All unique rows!


In [34]:
merged_df = pd.concat([stix_1_df, stix_2_df, stix_3_df])

In [35]:
merged_df

Unnamed: 0,stix_version,entity_type,indicator_type,description,timestamp,values,validation,validation_flag,Time,Date
0,1.2,indicator,URL Watchlist,sample url indicator for this watchlist,2015-05-15T09:00:00.000000Z,"http://example.com/foo/malicious1.html,http://...",True,1,09:00:00,2015-05-15
0,2.0,x-mitre-matrix,Enterprise ATT&CK,below are the tactics and technique representi...,2018-10-17T00:14:20.652Z,"Enterprise ATT&CK,enterprise-attack",True,1,00:14:20.652000,2018-10-17
1,2.0,course-of-action,Password Filter DLL Mitigation,ensure only valid password filters are registe...,2018-10-17T00:14:20.652Z,"Password Filter DLL Mitigation,T1174,https://m...",True,1,00:14:20.652000,2018-10-17
2,2.0,course-of-action,Space after Filename Mitigation,prevent files from having a trailing space aft...,2018-10-17T00:14:20.652Z,"Space after Filename Mitigation,T1151",True,1,00:14:20.652000,2018-10-17
3,2.0,course-of-action,HISTCONTROL Mitigation,prevent users from changing the <code>histcont...,2018-10-17T00:14:20.652Z,"HISTCONTROL Mitigation,T1148,http://www.akyl.n...",True,1,00:14:20.652000,2018-10-17
...,...,...,...,...,...,...,...,...,...,...
24767,2.0,relationship,relationship,[toddycat](https://attack.mitre.org/groups/g10...,2024-01-23T20:36:19.677Z,https://securelist.com/toddycat-keep-calm-and-...,True,1,20:36:19.677000,2024-01-23
24768,2.0,relationship,relationship,[bisonal](https://attack.mitre.org/software/s...,2022-01-27T18:04:46.484Z,https://blog.talosintelligence.com/2020/03/bis...,True,1,18:04:46.484000,2022-01-27
24769,2.0,relationship,relationship,[zxshell](https://attack.mitre.org/software/s0...,2019-09-24T14:19:05.322Z,https://blogs.cisco.com/security/talos/opening...,True,1,14:19:05.322000,2019-09-24
24770,2.0,identity,The MITRE Corporation,,2017-06-01T00:00:00.000Z,The MITRE Corporation,True,1,00:00:00,2017-06-01


In [36]:
# merged_df = merged_df.drop(columns=["timestamp"])
# merged_df = merged_df.drop(columns=["validation"])

In [37]:
merged_df

Unnamed: 0,stix_version,entity_type,indicator_type,description,timestamp,values,validation,validation_flag,Time,Date
0,1.2,indicator,URL Watchlist,sample url indicator for this watchlist,2015-05-15T09:00:00.000000Z,"http://example.com/foo/malicious1.html,http://...",True,1,09:00:00,2015-05-15
0,2.0,x-mitre-matrix,Enterprise ATT&CK,below are the tactics and technique representi...,2018-10-17T00:14:20.652Z,"Enterprise ATT&CK,enterprise-attack",True,1,00:14:20.652000,2018-10-17
1,2.0,course-of-action,Password Filter DLL Mitigation,ensure only valid password filters are registe...,2018-10-17T00:14:20.652Z,"Password Filter DLL Mitigation,T1174,https://m...",True,1,00:14:20.652000,2018-10-17
2,2.0,course-of-action,Space after Filename Mitigation,prevent files from having a trailing space aft...,2018-10-17T00:14:20.652Z,"Space after Filename Mitigation,T1151",True,1,00:14:20.652000,2018-10-17
3,2.0,course-of-action,HISTCONTROL Mitigation,prevent users from changing the <code>histcont...,2018-10-17T00:14:20.652Z,"HISTCONTROL Mitigation,T1148,http://www.akyl.n...",True,1,00:14:20.652000,2018-10-17
...,...,...,...,...,...,...,...,...,...,...
24767,2.0,relationship,relationship,[toddycat](https://attack.mitre.org/groups/g10...,2024-01-23T20:36:19.677Z,https://securelist.com/toddycat-keep-calm-and-...,True,1,20:36:19.677000,2024-01-23
24768,2.0,relationship,relationship,[bisonal](https://attack.mitre.org/software/s...,2022-01-27T18:04:46.484Z,https://blog.talosintelligence.com/2020/03/bis...,True,1,18:04:46.484000,2022-01-27
24769,2.0,relationship,relationship,[zxshell](https://attack.mitre.org/software/s0...,2019-09-24T14:19:05.322Z,https://blogs.cisco.com/security/talos/opening...,True,1,14:19:05.322000,2019-09-24
24770,2.0,identity,The MITRE Corporation,,2017-06-01T00:00:00.000Z,The MITRE Corporation,True,1,00:00:00,2017-06-01


In [38]:
merged_df["Threat_Label"] = 0
merged_df
#merged_df.drop(columns=["Threat Label"])

Unnamed: 0,stix_version,entity_type,indicator_type,description,timestamp,values,validation,validation_flag,Time,Date,Threat_Label
0,1.2,indicator,URL Watchlist,sample url indicator for this watchlist,2015-05-15T09:00:00.000000Z,"http://example.com/foo/malicious1.html,http://...",True,1,09:00:00,2015-05-15,0
0,2.0,x-mitre-matrix,Enterprise ATT&CK,below are the tactics and technique representi...,2018-10-17T00:14:20.652Z,"Enterprise ATT&CK,enterprise-attack",True,1,00:14:20.652000,2018-10-17,0
1,2.0,course-of-action,Password Filter DLL Mitigation,ensure only valid password filters are registe...,2018-10-17T00:14:20.652Z,"Password Filter DLL Mitigation,T1174,https://m...",True,1,00:14:20.652000,2018-10-17,0
2,2.0,course-of-action,Space after Filename Mitigation,prevent files from having a trailing space aft...,2018-10-17T00:14:20.652Z,"Space after Filename Mitigation,T1151",True,1,00:14:20.652000,2018-10-17,0
3,2.0,course-of-action,HISTCONTROL Mitigation,prevent users from changing the <code>histcont...,2018-10-17T00:14:20.652Z,"HISTCONTROL Mitigation,T1148,http://www.akyl.n...",True,1,00:14:20.652000,2018-10-17,0
...,...,...,...,...,...,...,...,...,...,...,...
24767,2.0,relationship,relationship,[toddycat](https://attack.mitre.org/groups/g10...,2024-01-23T20:36:19.677Z,https://securelist.com/toddycat-keep-calm-and-...,True,1,20:36:19.677000,2024-01-23,0
24768,2.0,relationship,relationship,[bisonal](https://attack.mitre.org/software/s...,2022-01-27T18:04:46.484Z,https://blog.talosintelligence.com/2020/03/bis...,True,1,18:04:46.484000,2022-01-27,0
24769,2.0,relationship,relationship,[zxshell](https://attack.mitre.org/software/s0...,2019-09-24T14:19:05.322Z,https://blogs.cisco.com/security/talos/opening...,True,1,14:19:05.322000,2019-09-24,0
24770,2.0,identity,The MITRE Corporation,,2017-06-01T00:00:00.000Z,The MITRE Corporation,True,1,00:00:00,2017-06-01,0


In [39]:
import re

def threat_label(row):
    threats = {"malware", "trojan", "attack", "malicious", "phish", "ransom"}
    ind_type = str(row.get("indicator_type", "")).lower()
    ent_type = str(row.get("entity_type", "")).lower()
    
    if ind_type in threats or ent_type in threats:
        return 1
    return 0

def credibility_score(row):
    if row["validation_flag"] != 1:
        return 0

    val_string = str(row.get("values", "")) 
    val_arr = val_string.split(",")
    
    if len(val_arr) > 3:
        description = str(row.get("description", ""))
        if description:
            url_pattern = r'^https?://[^\s/$.?#].[^\s]*'
            if re.match(url_pattern, description):
                return 1
                
    return 0

merged_df["Threat_Label"] = merged_df.apply(threat_label, axis=1)
merged_df["Credibility_Score"] = merged_df.apply(credibility_score, axis=1)

In [40]:
merged_df

Unnamed: 0,stix_version,entity_type,indicator_type,description,timestamp,values,validation,validation_flag,Time,Date,Threat_Label,Credibility_Score
0,1.2,indicator,URL Watchlist,sample url indicator for this watchlist,2015-05-15T09:00:00.000000Z,"http://example.com/foo/malicious1.html,http://...",True,1,09:00:00,2015-05-15,0,0
0,2.0,x-mitre-matrix,Enterprise ATT&CK,below are the tactics and technique representi...,2018-10-17T00:14:20.652Z,"Enterprise ATT&CK,enterprise-attack",True,1,00:14:20.652000,2018-10-17,0,0
1,2.0,course-of-action,Password Filter DLL Mitigation,ensure only valid password filters are registe...,2018-10-17T00:14:20.652Z,"Password Filter DLL Mitigation,T1174,https://m...",True,1,00:14:20.652000,2018-10-17,0,0
2,2.0,course-of-action,Space after Filename Mitigation,prevent files from having a trailing space aft...,2018-10-17T00:14:20.652Z,"Space after Filename Mitigation,T1151",True,1,00:14:20.652000,2018-10-17,0,0
3,2.0,course-of-action,HISTCONTROL Mitigation,prevent users from changing the <code>histcont...,2018-10-17T00:14:20.652Z,"HISTCONTROL Mitigation,T1148,http://www.akyl.n...",True,1,00:14:20.652000,2018-10-17,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...
24767,2.0,relationship,relationship,[toddycat](https://attack.mitre.org/groups/g10...,2024-01-23T20:36:19.677Z,https://securelist.com/toddycat-keep-calm-and-...,True,1,20:36:19.677000,2024-01-23,0,0
24768,2.0,relationship,relationship,[bisonal](https://attack.mitre.org/software/s...,2022-01-27T18:04:46.484Z,https://blog.talosintelligence.com/2020/03/bis...,True,1,18:04:46.484000,2022-01-27,0,0
24769,2.0,relationship,relationship,[zxshell](https://attack.mitre.org/software/s0...,2019-09-24T14:19:05.322Z,https://blogs.cisco.com/security/talos/opening...,True,1,14:19:05.322000,2019-09-24,0,0
24770,2.0,identity,The MITRE Corporation,,2017-06-01T00:00:00.000Z,The MITRE Corporation,True,1,00:00:00,2017-06-01,0,0


In [41]:
#merged_df = merged_df.drop(columns=["Threat Label"])

In [47]:
def merge_values(row):
    return (
        str(row.get("indicator_type", "")).lower() +
        str(row.get("description", "")).lower() +
        str(row.get("values", "")).lower()
    )

merged_df["Combined_Attribute"] = merged_df.apply(merge_values, axis=1)

import re

combined_attribute_list = merged_df["Combined_Attribute"].tolist()
url_pattern = r'^https?://[^\s/$.?#].[^\s]*'
combined_attribute_list_new = [["URL_LINK" if re.match(url_pattern, word) else word for word in line.split()]for line in combined_attribute_list]

In [48]:
#!pip install gensim
#!pip install "numpy<2"
from gensim.models import Word2Vec
model = Word2Vec(sentences=combined_attribute_list_new, vector_size=100, window=5, min_count=1)
merged_df["vector"] = merged_df["entity_type"].apply(lambda x: model.wv[x] if x in model.wv else None)

In [49]:
merged_df["ml_text"] = (
    merged_df["indicator_type"].astype(str) + " " +
    merged_df["description"].astype(str) + " " +
    merged_df["values"].astype(str)
)

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

vectorizer = TfidfVectorizer(lowercase=True, stop_words="english")
X = vectorizer.fit_transform(merged_df["ml_text"])
y = merged_df["Threat_Label"]

train_x, test_x, train_y, test_y = train_test_split(
    X, y, test_size=0.2, random_state=42
)
model = LogisticRegression(max_iter=1000)
model.fit(train_x, train_y)

prediction = model.predict(test_x)
print("Accuracy:", accuracy_score(test_y, prediction))

sample = merged_df["ml_text"].iloc[0]
sample_vec = vectorizer.transform([sample])

result = model.predict(sample_vec)[0]

if result == 1:
    print("Sample classified as: THREAT")
else:
    print("Sample classified as: BENIGN")
    
import pickle

pickle.dump(model, open("model.pkl", "wb"))
pickle.dump(vectorizer, open("vectorizer.pkl", "wb"))

Accuracy: 0.9725297681219971
Sample classified as: BENIGN


In [45]:
import nest_asyncio
nest_asyncio.apply()

import uvicorn
from fastapi import FastAPI, UploadFile, File
import pickle
import json
from threading import Thread

app = FastAPI()

model = pickle.load(open("model.pkl", "rb"))
vectorizer = pickle.load(open("vectorizer.pkl", "rb"))

@app.post("/analyze")
async def analyze(file: UploadFile = File(...)):

    data = await file.read()
    stix = json.loads(data)

    text = str(stix.get("indicator_type","")) + " " + \
           str(stix.get("description","")) + " " + \
           str(stix.get("values",""))

    vec = vectorizer.transform([text])
    result = model.predict(vec)[0]

    return {
        "prediction": int(result),
        "meaning": "THREAT" if result == 1 else "BENIGN"
    }

def run():
    uvicorn.run(app, host="127.0.0.1", port=8000)

Thread(target=run).start()

INFO:     Started server process [6169]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
ERROR:    [Errno 98] error while attempting to bind on address ('127.0.0.1', 8000): address already in use
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
