In [9]:
import pgmpy
import stix2
import json

In [10]:
from typing import Any
def read_flow_file(name)-> Any:
    """
    Read a flow file and return the data.

    Parameters:
    name (str): The name of the flow file to read.

    Returns:
    bundle (stix2.Bundle): The parsed STIX bundle data.

    """
    with open(name, "r", encoding="utf-8") as file:
        data = json.load(file)
        return data

In [None]:
custom_properties={
                        "x_foo": "bar"
                    }

In [11]:
data = read_flow_file("attack-flow-example.json")

In [15]:
bundle = stix2.parse(data)#allow_custom=True)
bundle

InvalidValueError: Invalid value for Bundle 'objects': customized attack-flow object found

In [22]:
import json
from stix2 import CustomObject, properties, parse

# Load the Attack Flow schema
schema_path = "attack-flow-schema-2.0.0.json"
with open(schema_path, 'r') as f:
    attack_flow_schema = json.load(f)

import stix2
import stix2.properties

# Define the extension ID
ATTACK_FLOW_EXTENSION_DEFINITION_ID = 'extension-definition--fb9c968a-745b-4ade-9b25-c324172197f4'

# Custom Observable for Attack Flow
@stix2.v21.CustomObservable(
    'attack-flow', [
        ('name', stix2.properties.StringProperty(required=True)),
        ('description', stix2.properties.StringProperty(required=False)),
        ('scope', stix2.properties.StringProperty(required=True)),
        ('start_refs', stix2.properties.ListProperty(stix2.properties.StringProperty(), required=True))
    ], ['name', 'scope', 'start_refs'], ATTACK_FLOW_EXTENSION_DEFINITION_ID,
)
class AttackFlow:
    pass

# Custom Observable for Attack Action
@stix2.v21.CustomObservable(
    'attack-action', [
        ('name', stix2.properties.StringProperty(required=True)),
        ('tactic_id', stix2.properties.StringProperty(required=False)),
        ('tactic_ref', stix2.properties.StringProperty(required=False)),
        ('technique_id', stix2.properties.StringProperty(required=False)),
        ('technique_ref', stix2.properties.StringProperty(required=False)),
        ('description', stix2.properties.StringProperty(required=False)),
        ('execution_start', stix2.properties.TimestampProperty(required=False)),
        ('execution_end', stix2.properties.TimestampProperty(required=False)),
        ('command_ref', stix2.properties.StringProperty(required=False)),
        ('asset_refs', stix2.properties.ListProperty(stix2.properties.StringProperty(), required=False)),
        ('effect_refs', stix2.properties.ListProperty(stix2.properties.StringProperty(), required=False))
    ], ['name'], ATTACK_FLOW_EXTENSION_DEFINITION_ID,
)
class AttackAction:
    pass

# Custom Observable for Attack Condition
@stix2.v21.CustomObservable(
    'attack-condition', [
        ('description', stix2.properties.StringProperty(required=True)),
        ('pattern', stix2.properties.StringProperty(required=False)),
        ('pattern_type', stix2.properties.StringProperty(required=False)),
        ('pattern_version', stix2.properties.StringProperty(required=False)),
        ('on_true_refs', stix2.properties.ListProperty(stix2.properties.StringProperty(), required=False)),
        ('on_false_refs', stix2.properties.ListProperty(stix2.properties.StringProperty(), required=False))
    ], ['description'], ATTACK_FLOW_EXTENSION_DEFINITION_ID,
)
class AttackCondition:
    pass

# Custom Observable for Attack Operator
@stix2.v21.CustomObservable(
    'attack-operator', [
        ('operator', stix2.properties.StringProperty(required=True)),
        ('effect_refs', stix2.properties.ListProperty(stix2.properties.StringProperty(), required=True))
    ], ['operator', 'effect_refs'], ATTACK_FLOW_EXTENSION_DEFINITION_ID,
)
class AttackOperator:
    pass

# Custom Observable for Attack Asset
@stix2.v21.CustomObservable(
    'attack-asset', [
        ('name', stix2.properties.StringProperty(required=True)),
        ('description', stix2.properties.StringProperty(required=False)),
        ('object_ref', stix2.properties.StringProperty(required=False))
    ], ['name'], ATTACK_FLOW_EXTENSION_DEFINITION_ID,
)
class AttackAsset:
    pass

# Create an instance of Attack Flow
attack_flow = AttackFlow(
    id='attack-flow--e9ec3a4b-f787-4e81-a3d9-4cfe017ebc2f',
    name='Example Attack Flow',
    description='This is an example of an Attack Flow.',
    scope='incident',
    start_refs=[
        'attack-action--37345417-3ee0-4e11-b421-1d4be68e6f15'
    ]
)

# Create an instance of Attack Action
attack_action = AttackAction(
    id='attack-action--37345417-3ee0-4e11-b421-1d4be68e6f15',
    name='Example Attack Action',
    tactic_id='TA0001',
    technique_id='T0001',
    description='This is an example of an Attack Action.',
    execution_start='2022-01-01T00:00:00Z',
    execution_end='2022-01-01T01:00:00Z'
)

# Create an instance of Attack Condition
attack_condition = AttackCondition(
    id='attack-condition--7e809f5b-319a-4b3f-82fe-e4dc09af5088',
    description='Example Condition',
    pattern='[file:hashes.md5 = "d41d8cd98f00b204e9800998ecf8427e"]',
    pattern_type='stix',
    pattern_version='2.1'
)

# Create an instance of Attack Operator
attack_operator = AttackOperator(
    id='attack-operator--609d7adf-a3d2-44e8-82de-4b30e3fb97be',
    operator='AND',
    effect_refs=[
        'attack-condition--7e809f5b-319a-4b3f-82fe-e4dc09af5088'
    ]
)

# Create an instance of Attack Asset
attack_asset = AttackAsset(
    id='attack-asset--f7edf4aa-29ec-47aa-b4f6-c42dfbe2ac20',
    name='Example Asset',
    description='This is an example of an Attack Asset.',
    object_ref='identity--b1da8c3e-34d8-470f-9d2b-392e275f1f7d'
)

# Serialize and print the instances
print(attack_flow.serialize(pretty=True))
print(attack_action.serialize(pretty=True))
print(attack_condition.serialize(pretty=True))
print(attack_operator.serialize(pretty=True))
print(attack_asset.serialize(pretty=True))




DuplicateRegistrationError: A(n) STIX Object with type 'attack-flow' already exists and cannot be registered again