# OpenCTI Examples Notebook

This notebook takes the examples found in the [OpenCTI client-python GitHub repo](https://github.com/OpenCTI-Platform/client-python/tree/master/examples) and stores them in one central location. This allows analysts to leverage the existing examples or modify them based on needs.

## Table of Contents

* [Set-up](#set-up)
* [Add external reference to report](#add-external-reference-to-report)
* [Add label to malware](#add-label-to-malware)
* [Add label to observable](#add-label-to-observable)
* [Add organization to sector](#add-organization-to-sector)
* [Add tool usage to intrusion set](#add-tool-usage-to-intrusion-set)
* [Ask enrichment of observable](#ask-enrichment-of-observable)
* [CMD line tag latest indicators of threat](#cmd-line-tag-latest-indicators-of-threat)
* [Create campaign attributed to intrusion set](#create-campaign-attributed-to-intrusion-set)
* [Create file with hashes](#create-file-with-hashes)
* [Create incident with TTPs and indicators](#create-incident-with-ttps-and-indicators)
* [Create indicator of campaign](#create-of-indicator-of-campaign)
* [Create intrusion set](#create-intrusion-set)
* [Create IP domain resolution](#create-ip-domain-resolution)
* [Create marking definition](#create-marking-definition)
* [Create observable relationships](#create-observable-relationships)
* [Create process observable](#create-process-observable)
* [Create report with author](#create-report-with-author)
* [Delete intrusion set](#delete-intrusion-set)
* [Delete relation](#delete-relation)
* [Export incident in stix2](#export-incident-in-stix2)
* [Export incidents in stix2](#export-incidents-in-stix2)
* [Export intrusion set in stix2](#export-intrusion-set-in-stix2)
* [Export report in stix2](#export-report-in-stix2)
* [Get all indicators using pagination](#get-all-indicators-using-pagination)
* [Get all reports using pagination](#get-all-reports-using-pagination)
* [Get attack pattern by MITRE ID](#get-attack-pattern-by-mitre-id)
* [Get entity by name or alias](#get-entity-by-name-or-alias)
* [Get malwares of intrusion set](#get-malwares-on-intrusion-set)
* [Get marking definitions](#get-marking-definitions)
* [Get observable exact match](#get-observable-exact-match)
* [Get observables search](#get-observables-search)
* [Get reports about intrusion set](#get-reports-about-intrusion-set)
* [Import stix2 file](#import-stix2-file)
* [Search attack pattern](#search-attack-pattern)
* [Search malware](#search-malware)
* [Update entity attribute](#update-entity-attribute)
* [Update observable attributes](#update-observable-attributes)
* [Upload artifacts](#upload-artifacts)
* [Upload file](#upload-file)
* [Upload file to intrusion set](#upload-file-to-intrusion-set)

### Set-up <a class="anchor" id="set-up"></a>

In [None]:
# Install pycti if not already installed
pip install pycti

In [None]:
# Import pycti and other dependencies
from pycti import OpenCTIApiClient
from dateutil.parser import parse
import argparse
import os
import magic

In [None]:
# Variables
api_url = "http://opencti:4000" # ChangeMe
api_token = "ChangeMe"

In [None]:
# OpenCTI initialization
opencti_api_client = OpenCTIApiClient(api_url, api_token)

### Add external reference to report <a class="anchor" id="add-external-reference-to-report"></a>

In [None]:
# Define the date
date = parse("2019-12-01").strftime("%Y-%m-%dT%H:%M:%SZ")

# Create the report
report = opencti_api_client.report.create(
    name="My test report",
    description="A new threat report.",
    published=date,
    report_class="Threat Report",
)

# Create the external reference
external_reference = opencti_api_client.external_reference.create(
    source_name="Wikipedia", url="https://en.wikipedia.org/wiki/Fancy_Bear"
)

# Add the external reference to the report
opencti_api_client.stix_domain_object.add_external_reference(
    id=report["id"], external_reference_id=external_reference["id"]
)

# Get the report
report = opencti_api_client.report.read(id=report["id"])

# Print
report

### Add label to malware <a class="anchor" id="add-label-to-malware"></a>

In [None]:
# Create the malware
malware = opencti_api_client.malware.create(
    name="My new malware", description="A new evil tool."
)

# Create the tag (if not exists)
label = opencti_api_client.label.create(
    value="Ransomware",
    color="#ffa500",
)

# Add the tag
opencti_api_client.stix_domain_object.add_label(id=malware["id"], label_id=label["id"])

# Print
malware = opencti_api_client.malware.read(id=malware["id"])
malware

### Add label to observable <a class="anchor" id="add-label-to-observable"></a>

In [None]:
# Create the observable
url = opencti_api_client.stix_cyber_observable.create(
    observableData={"type": "url", "value": "http://johndoe.com"}
)
# Create the tag (if not exists)
label = opencti_api_client.label.create(
    value="Suspicious",
    color="#ffa500",
)

# Add the tag
opencti_api_client.stix_cyber_observable.add_label(id=url["id"], label_id=label["id"])

# Read the observable
obs = opencti_api_client.stix_cyber_observable.read(id=url["id"])
obs

### Add organization to sector <a class="anchor" id="add-organization-to-sector"></a>

In [None]:
# Get the sector
sector = opencti_api_client.identity.create(
    type="Sector", name="Banking institutions", description="Banks"
)

# Create the organization
organization = opencti_api_client.identity.create(
    type="Organization", name="BNP Paribas", description="A french bank."
)

# Create the relation
relation = opencti_api_client.stix_core_relationship.create(
    fromType="Organization",
    fromId=organization["id"],
    toType="Sector",
    toId=sector["id"],
    relationship_type="part-of",
    description="BNP Paribas is part of the sector Banking institutions.",
)

# Print
relation

### Add tool usage to intrusion set <a class="anchor" id="add-tool-usage-to-intrusion-set"></a>

In [None]:
tool = opencti_api_client.tool.create(
    name="powashell.exe", description="A new evil tool."
)

print(tool)

intrusion_set = opencti_api_client.intrusion_set.create(name="APT_EVIL")

print(intrusion_set)

# Create the relation
relation = opencti_api_client.stix_core_relationship.create(
    fromType="IntrusionSet",
    fromId=intrusion_set["id"],
    toType="Tool",
    toId=tool["id"],
    relationship_type="uses",
    description="APT_EVIL uses the tool powashell.exe",
)

relation

### Ask enrichment of observable <a class="anchor" id="ask-enrichment-of-observable"></a>

In [None]:
# Define name of INTERNAL_ENRICHMENT Connector which can enrich IPv4 addresses
connector_name = "AbuseIPDB"

# Create the observable
observable = opencti_api_client.stix_cyber_observable.create(
    **{
        "simple_observable_key": "IPv4-Addr.value",
        "simple_observable_value": "8.8.4.4",
    }
)

# Get connector id for defined connector name
connector_list = opencti_api_client.connector.list()
connector_names = []
connector_id = ""
for connector in connector_list:
    connector_names.append(connector["name"])
    if connector["name"] == connector_name:
        connector_id = connector["id"]

if connector_id == "":
    print(f"Connector with name '{connector_name}' could not be found")
    print(f"Running connectors: {connector_names}")
    exit(0)

print("Asking for enrichment... (this might take a bit to finish)")
# Ask for enrichment
work_id = opencti_api_client.stix_cyber_observable.ask_for_enrichment(
    id=observable["id"], connector_id=connector_id
)
# Wait for connector to finish
opencti_api_client.work.wait_for_work_to_finish(work_id)

# Read the observable
obs = opencti_api_client.stix_cyber_observable.read(id=observable["id"])
obs

### CMD line tag latest indicators of threat <a class="anchor" id="cmd-line-tag-latest-indicators-of-threat"></a>

In [None]:
# Run this as a command line script
def main():
    # Parameters
    parser = argparse.ArgumentParser(description="Mandatory arguments.")
    parser.add_argument(
        "--entity-type",
        dest="entity_type",
        default="Intrusion-Set",
        required=True,
        help="Type of the threat (Threat-Actor, Intrusion-Set, Campaign, X-OpenCTI,-Incident, Malware, Tool, Attack-Pattern)",
    )
    parser.add_argument(
        "--name",
        dest="name",
        required=True,
        help="Name of the threat",
    )
    parser.add_argument(
        "--created-after",
        dest="createdAfter",
        help="Indicator created before (ISO date)",
    )
    parser.add_argument(
        "--created-before",
        dest="createdBefore",
        help="Indicator created after (ISO date)",
    )
    parser.add_argument(
        "--tags",
        dest="tags",
        required=True,
        help="Tags to add or remove (separated by ,)",
    )
    parser.add_argument(
        "--operation",
        dest="operation",
        required=True,
        default="add",
        help="Operation (add/remove)",
    )
    args = parser.parse_args()

    entity_type = args.entity_type
    name = args.name
    created_after = parse(args.createdAfter).strftime("%Y-%m-%dT%H:%M:%SZ")
    created_before = parse(args.createdBefore).strftime("%Y-%m-%dT%H:%M:%SZ")
    tags = args.tags.split(",")
    operation = args.operation

    # Resolve the entity
    threat = opencti_api_client.stix_domain_object.read(
        types=[entity_type], filters=[{"key": "name", "values": [name]}]
    )

    if not threat:
        raise ValueError("Cannot find the entity with the name " + name)

    # Resolve all tags
    labels = []
    for tag in tags:
        labels.append(opencti_api_client.label.create(value=tag))

    # Get indicators
    custom_attributes = """
        id
        created_at
    """

    data = {"pagination": {"hasNextPage": True, "endCursor": None}}
    while data["pagination"]["hasNextPage"]:
        after = data["pagination"]["endCursor"]
        data = opencti_api_client.indicator.list(
            first=50,
            after=after,
            customAttributes=custom_attributes,
            filters=[
                {"key": "indicates", "values": [threat["id"]]},
                {"key": "created_at", "values": [created_after], "operator": "gt"},
                {"key": "created_at", "values": [created_before], "operator": "lt"},
            ],
            orderBy="created_at",
            orderMode="asc",
            withPagination=True,
        )
        for indicator in data["entities"]:
            print("[" + indicator["created_at"] + "] " + indicator["id"])
            if operation == "add":
                for label in labels:
                    opencti_api_client.stix_domain_object.add_label(
                        id=indicator["id"], label_id=label["id"]
                    )
            elif operation == "remove":
                for label in labels:
                    opencti_api_client.stix_domain_object.remove_label(
                        id=indicator["id"], label_id=label["id"]
                    )


if __name__ == "__main__":
    main()

### Create campaign attributed to intrusion set <a class="anchor" id="create-campaign-attributed-to-intrusion-set"></a>

In [None]:
# Define the date
date = parse("2019-12-01").strftime("%Y-%m-%dT%H:%M:%SZ")

# Create the Intrusion Set
intrusion_set = opencti_api_client.intrusion_set.create(
    name="My new Intrusion Set",
    description="Evil Cluster",
    first_seen=date,
    last_seen=date,
    update=True,
)
print(intrusion_set)

# Create the Campaign
campaign = opencti_api_client.campaign.create(
    name="My new Campaign",
    description="Large SpearPhishing and intrusions followed by ransomware",
    objective="Financial gain",
    first_seen=date,
    last_seen=date,
    update=True,
)
print(campaign)

# Attribute the Campaign to the Intrusion Set
relation = opencti_api_client.stix_core_relationship.create(
    fromType="Campaign",
    fromId=campaign["id"],
    toType="Intrusion-Set",
    toId=intrusion_set["id"],
    relationship_type="attributed-to",
    first_seen=date,
    last_seen=date,
    description="My new campaign is attributed to my new Intrusion Set, the evil cluster.",
)

# Print
relation

### Create file with hashes <a class="anchor" id="create-file-with-hashes"></a>

In [None]:
# Create observable
observable = opencti_api_client.stix_cyber_observable.create(
    observableData={
        "type": "file",
        "hashes": {
            "md5": "fcd76de79819813b631d949b18b1e996",
            "sha-1": "e08b42d92fa579c095834909b893d49259b158be",
            "sha-256": "7cca822e0fdfeca033762213bf16a3f04d7cac8c345f84a0d740324d97f671c0",
        },
    }
)

# Print
observable

### Create incident with TTPs and indicators <a class="anchor" id="create-incident-with-ttps-and-indicators"></a>

In [None]:
# Define the date
date = parse("2019-12-01").strftime("%Y-%m-%dT%H:%M:%SZ")

# Prepare all the elements of the report
object_refs = []
observable_refs = []

# Create the incident
incident = opencti_api_client.incident.create(
    name="My new incident",
    description="We have been compromised",
    objective="Espionage",
)
object_refs.append(incident["id"])
# Create the associated report
report = opencti_api_client.report.create(
    name="Report about my new incident",
    description="Forensics and investigation report",
    published=date,
    report_class="Internal Report",
)

# Associate the TTPs to the incident

kcp_ia = opencti_api_client.kill_chain_phase.create(
    phase_name="initial-access", kill_chain_name="mitre-attack"
)

ttp1 = opencti_api_client.attack_pattern.create(
    name="Phishing: Spearphishing Attachment",
    description="Adversaries may send spearphishing emails with a malicious attachment in an attempt to gain access to victim systems. Spearphishing attachment is a specific variant of spearphishing. Spearphishing attachment is different from other forms of spearphishing in that it employs the use of malware attached to an email. All forms of spearphishing are electronically delivered social engineering targeted at a specific individual, company, or industry. In this scenario, adversaries attach a file to the spearphishing email and usually rely upon User Execution to gain execution. Spearphishing may also involve social engineering techniques, such as posing as a trusted source.",
    x_mitre_id="T1566.001",
    killChainPhases=[kcp_ia["id"]],
)
ttp1 = opencti_api_client.attack_pattern.read(id=ttp1["id"])
ttp1_relation = opencti_api_client.stix_core_relationship.create(
    fromId=incident["id"],
    toId=ttp1["id"],
    relationship_type="uses",
    description="We saw the attacker use Spearphishing Attachment.",
    start_time=date,
    stop_time=date,
)
# Add kill chain phases to the relation
for kill_chain_phase_id in ttp1["killChainPhasesIds"]:
    opencti_api_client.stix_core_relationship.add_kill_chain_phase(
        id=ttp1_relation["id"], kill_chain_phase_id=kill_chain_phase_id
    )

# Create the observable and indicator and indicates to the relation
# Create the observable
observable_ttp1 = opencti_api_client.stix_cyber_observable.create(
    simple_observable_key="Email-Addr.value",
    simple_observable_value="phishing@mail.com",
    createIndicator=True,
)
# Get the indicator
indicator_ttp1 = observable_ttp1["indicators"][0]
# Indicates the relation Incident => uses => TTP
indicator_ttp1_relation = opencti_api_client.stix_core_relationship.create(
    fromId=indicator_ttp1["id"],
    toId=ttp1_relation["id"],
    relationship_type="indicates",
    description="This email address is the sender of the spearphishing.",
    start_time=date,
    stop_time=date,
)

# Prepare elements for the report
object_refs.extend(
    [
        ttp1["id"],
        ttp1_relation["id"],
        indicator_ttp1["id"],
        indicator_ttp1_relation["id"],
    ]
)
observable_refs.append(observable_ttp1["id"])

kcp_p = opencti_api_client.kill_chain_phase.create(
    phase_name="persistence", kill_chain_name="mitre-attack"
)
kcp_pe = opencti_api_client.kill_chain_phase.create(
    phase_name="privilege-escalation", kill_chain_name="mitre-attack"
)

# Registry Run Keys / Startup Folder
ttp2 = opencti_api_client.attack_pattern.create(
    name="Boot or Logon Autostart Execution: Registry Run Keys / Startup Folder ",
    description="Adversaries may achieve persistence by adding a program to a startup folder or referencing it with a Registry run key. Adding an entry to the 'run keys' in the Registry or startup folder will cause the program referenced to be executed when a user logs in. These programs will be executed under the context of the user and will have the account's associated permissions level.",
    x_mitre_id="T1547.001",
    killChainPhases=[kcp_pe["id"], kcp_p["id"]],
)
ttp2 = opencti_api_client.attack_pattern.read(id=ttp2["id"])
# Create the relation
ttp2_relation = opencti_api_client.stix_core_relationship.create(
    fromId=incident["id"],
    toId=ttp2["id"],
    relationship_type="uses",
    description="We saw the attacker use Registry Run Keys / Startup Folder.",
    start_time=date,
    stop_time=date,
)
# Add kill chain phases to the relation
for kill_chain_phase_id in ttp2["killChainPhasesIds"]:
    opencti_api_client.stix_core_relationship.add_kill_chain_phase(
        id=ttp2_relation["id"], kill_chain_phase_id=kill_chain_phase_id
    )

# Create the observable and indicator and indicates to the relation
# Create the observable
observable_ttp2 = opencti_api_client.stix_cyber_observable.create(
    simple_observable_key="Windows-Registry-Key.key",
    simple_observable_value="HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Run",
    createIndicator=True,
)
# Get the indicator
indicator_ttp2 = observable_ttp2["indicators"][0]
# Indicates the relation Incident => uses => TTP
indicator_ttp2_relation = opencti_api_client.stix_core_relationship.create(
    fromId=indicator_ttp2["id"],
    toId=ttp2_relation["id"],
    relationship_type="indicates",
    description="This registry key is used for persistence of tools.",
    start_time=date,
    stop_time=date,
)
# Elements for the report
object_refs.extend(
    [
        ttp2["id"],
        ttp2_relation["id"],
        indicator_ttp2["id"],
        indicator_ttp2_relation["id"],
    ]
)
observable_refs.append(observable_ttp2["id"])

kcp_c = opencti_api_client.kill_chain_phase.create(
    phase_name="collection", kill_chain_name="mitre-attack"
)
# Data Encrypted
ttp3 = opencti_api_client.attack_pattern.create(
    name=" Archive Collected Data",
    description="An adversary may compress and/or encrypt data that is collected prior to exfiltration. Compressing the data can help to obfuscate the collected data and minimize the amount of data sent over the network. Encryption can be used to hide information that is being exfiltrated from detection or make exfiltration less conspicuous upon inspection by a defender.",
    x_mitre_id="T1560",
    killChainPhases=[kcp_c["id"]],
)
ttp3 = opencti_api_client.attack_pattern.read(id=ttp3["id"])
ttp3_relation = opencti_api_client.stix_core_relationship.create(
    fromId=incident["id"],
    toId=ttp3["id"],
    relationship_type="uses",
    description="We saw the attacker use Data Encrypted.",
    start_time=date,
    stop_time=date,
)
# Add kill chain phases to the relation
for kill_chain_phase_id in ttp3["killChainPhasesIds"]:
    opencti_api_client.stix_core_relationship.add_kill_chain_phase(
        id=ttp3_relation["id"], kill_chain_phase_id=kill_chain_phase_id
    )
# Elements for the report
object_refs.extend([ttp3["id"], ttp3_relation["id"]])

# Add all element to the report
for object_ref in object_refs:
    opencti_api_client.report.add_stix_object_or_stix_relationship(
        id=report["id"], stixObjectOrStixRelationshipId=object_ref
    )
for observable_ref in observable_refs:
    opencti_api_client.report.add_stix_object_or_stix_relationship(
        id=report["id"], stixObjectOrStixRelationshipId=observable_ref
    )
    opencti_api_client.stix_core_relationship.create(
        fromId=observable_ref,
        toId=incident["id"],
        relationship_type="related-to",
        description="This observable is related to the incident.",
        start_time=date,
        stop_time=date,
    )

### Create indicator of campaign <a class="anchor" id="create-of-indicator-of-campaign"></a>

In [None]:
# Define the date
date = parse("2019-12-01").strftime("%Y-%m-%dT%H:%M:%SZ")

# Create the Campaign
campaign = opencti_api_client.campaign.create(
    name="My new Campaign",
    description="Large SpearPhishing and intrusions followed by ransomware",
    objective="Financial gain",
    first_seen=date,
    last_seen=date,
    update=True,
)
campaign

# Create the indicator
indicator = opencti_api_client.indicator.create(
    name="C2 server of the new campaign",
    description="This is the C2 server of the campaign",
    pattern_type="stix",
    pattern="[domain-name:value = 'www.5z8.info' AND domain-name:resolves_to_refs[*].value = '198.51.100.1/32']",
    x_opencti_main_observable_type="IPv4-Addr",
    valid_from=date,
)
indicator

# Create the relation
relation = opencti_api_client.stix_core_relationship.create(
    fromType="Indicator",
    fromId=indicator["id"],
    toType="Campaign",
    toId=campaign["id"],
    relationship_type="indicates",
    first_seen=date,
    last_seen=date,
    description="This is the C2 server of the campaign.",
)
relation

# Create the observables (optional)
observable_1 = opencti_api_client.stix_cyber_observable.create(
    simple_observable_key="Domain-Name.value", simple_observable_value="www.5z8.info"
)
observable_2 = opencti_api_client.stix_cyber_observable.create(
    simple_observable_key="IPv4-Addr.value", simple_observable_value="198.51.100.1"
)
# Create the relation between observables and the indicator
opencti_api_client.indicator.add_stix_cyber_observable(
    id=indicator["id"], stix_cyber_observable_id=observable_1["id"]
)
opencti_api_client.indicator.add_stix_cyber_observable(
    id=indicator["id"], stix_cyber_observable_id=observable_2["id"]
)

### Create intrusion set <a class="anchor" id="create-intrusion-set"></a>

In [None]:
# Create the Intrusion Set
intrusion_set = opencti_api_client.intrusion_set.create(
    name="My new Intrusion Set",
    description="Evil Cluster",
    first_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    last_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    update=True,
)

# Print
intrusion_set

### Create IP domain resolution <a class="anchor" id="create-ip-domain-resolution"></a>

In [None]:
observable_domain = opencti_api_client.stix_cyber_observable.create(
    observableData={
        "type": "domain-name",
        "value": "dns.google",
    }
)

observable_ip = opencti_api_client.stix_cyber_observable.create(
    observableData={
        "type": "ipv4-addr",
        "value": "8.8.8.8",
    }
)

opencti_api_client.stix_cyber_observable_relationship.create(
    fromId=observable_domain["id"],
    toId=observable_ip["id"],
    relationship_type="resolves-to",
)

relationships = opencti_api_client.stix_cyber_observable_relationship.list(
    elementId=observable_domain["id"]
)

relationships

### Create marking definition <a class="anchor" id="create-marking-definition"></a>

In [None]:
# Create the marking definition
marking_definition = opencti_api_client.marking_definition.create(
    definition_type="TLP",
    definition="TLP:BLACK",
    x_opencti_order=10,
    x_opencti_color="#000000",
)

# Print
marking_definition

### Create observable relationships <a class="anchor" id="create-observable-relationships"></a>

In [None]:
observable = opencti_api_client.stix_cyber_observable.create(
    observableData={
        "type": "file",
        "hashes": {
            "md5": "16b3f663d0f0371a4706642c6ac04e42",
            "sha1": "3a1f908941311fc357051b5c35fd2a4e0c834e37",
            "sha256": "bcc70a49fab005b4cdbe0cbd87863ec622c6b2c656987d201adbb0e05ec03e56",
        },
    }
)

process = opencti_api_client.stix_cyber_observable.create(
    observableData={
        "type": "Process",
        "x_opencti_description": "A process",
        "cwd": "C:\Process.exe",
        "pid": 19000,
        "command_line": "--run exe",
        "x_opencti_score": 90,
    }
)

author = opencti_api_client.identity.create(
    name="John's Work",
    description="Automated Toolkit",
    type="Organization",
)

opencti_api_client.stix_core_relationship.create(
    toId=observable["id"],
    fromId=process["id"],
    confidence=90,
    createdBy=author["id"],
    relationship_type="related-to",
    description="Relation between the File and Process objects",
)

### Create process observable <a class="anchor" id="create-process-observable"></a>

In [None]:
process = opencti_api_client.stix_cyber_observable.create(
    observableData={
        "type": "Process",
        "x_opencti_description": "A process",
        "cwd": "C:\Process.exe",
        "pid": 19000,
        "command_line": "--run exe",
        "x_opencti_score": 90,
    }
)

process

### Create report with author <a class="anchor" id="create-report-with-author"></a>

In [None]:
# Define the date
date = parse("2019-12-01").strftime("%Y-%m-%dT%H:%M:%SZ")

# Create the author (if not exists)
organization = opencti_api_client.identity.create(
    type="Organization",
    name="My organization",
    alias=["my-organization"],
    description="A new organization.",
)

# Create the report
report = opencti_api_client.report.create(
    name="My new report of my organization",
    description="A report wrote by my organization",
    published=date,
    report_types=["internal-report"],
    createdBy=organization["id"],
)

# Print
report

### Delete intrusion set <a class="anchor" id="delete-intrusion-set"></a>

In [None]:
opencti_api_client.intrusion_set.create(
    name="EvilSET123",
    description="Evil Cluster",
    first_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    last_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    update=True,
)

# Get the intrusion set APT28
intrusion_set = opencti_api_client.intrusion_set.read(
    filters=[{"key": "name", "values": ["EvilSET123"]}]
)

# Delete the intrusion set
opencti_api_client.stix_domain_object.delete(id=intrusion_set["id"])

### Delete relation <a class="anchor" id="delete-relation"></a>

In [None]:
intrusion_set = opencti_api_client.intrusion_set.create(name="EvilSET123")

malware = opencti_api_client.malware.create(
    name="TheWorm", description="A new evil worm."
)

opencti_api_client.stix_core_relationship.create(
    fromId=intrusion_set["id"],
    fromTypes=["Intrusion-Set"],
    toId=malware["id"],
    toTypes=["Malware"],
    relationship_type="uses",
)

# Get the relations between APT28 and DealersChoice
relations = opencti_api_client.stix_core_relationship.list(
    fromId=intrusion_set["id"],
    fromTypes=["Intrusion-Set"],
    toId=malware["id"],
    toTypes=["Malware"],
    relationship_type="uses",
)

# Delete the relations
for relation in relations:
    opencti_api_client.stix_core_relationship.delete(id=relation["id"])

### Export incident in stix2 <a class="anchor" id="export-incident-in-stix2"></a>

In [None]:
opencti_api_client.incident.create(name="My new incident")

# Get the incident created in the create_incident_with_ttps_and_indicators.py
incident = opencti_api_client.incident.read(
    filters=[{"key": "name", "values": ["My new incident"]}]
)

# Create the bundle
bundle = opencti_api_client.stix2.export_entity("Incident", incident["id"], "full")
json_bundle = json.dumps(bundle, indent=4)

# Write the bundle
f = open("My new incident.json", "w")
f.write(json_bundle)
f.close()

### Export incidents in stix2 <a class="anchor" id="export-incidents-in-stix2"></a>

In [None]:
# Create the bundle
bundle = opencti_api_client.stix2.export_list("Incident")
json_bundle = json.dumps(bundle, indent=4)

# Write the bundle
f = open("Incidents.json", "w")
f.write(json_bundle)
f.close()

### Export intrusion set in stix2 <a class="anchor" id="export-intrusion-set-in-stix2"></a>

In [None]:
# Create the bundle
bundle = opencti_api_client.stix2.export_entity(
    "Intrusion-Set", "4ecc2f52-d10a-4e10-bb9b-1ab5df2b282e", "full"
)
json_bundle = json.dumps(bundle, indent=4)

# Write the bundle
f = open("intrusion-set.json", "w")
f.write(json_bundle)
f.close()

### Export report in stix2 <a class="anchor" id="export-report-in-stix2"></a>

In [None]:
# Create the bundle
bundle = opencti_api_client.stix2.export_entity(
    "Report", "report--2dc2b918-a0a3-569e-a305-f784486003c2", "full"
)
json_bundle = json.dumps(bundle, indent=4)

# Write the bundle
f = open("report.json", "w")
f.write(json_bundle)
f.close()

### Get all indicators using pagination <a class="anchor" id="get-all-indicators-using-pagination"></a>

In [None]:
# Get all reports using the pagination
custom_attributes = """
    id
    pattern_type
    created
"""

final_indicators = []
data = {"pagination": {"hasNextPage": True, "endCursor": None}}
while data["pagination"]["hasNextPage"]:
    after = data["pagination"]["endCursor"]
    if after:
        print("Listing indicators after " + after)
    data = opencti_api_client.indicator.list(
        first=50,
        after=after,
        customAttributes=custom_attributes,
        withPagination=True,
        orderBy="created_at",
        orderMode="asc",
    )
    final_indicators += data["entities"]

for indicator in final_indicators:
    print("[" + indicator["created"] + "] " + indicator["id"])

### Get all reports using pagination <a class="anchor" id="get-all-reports-using-pagination"></a>

In [None]:
# Get all reports using the pagination
custom_attributes = """
    id
    name
    published
    description
"""

final_reports = []
data = {"pagination": {"hasNextPage": True, "endCursor": None}}
while data["pagination"]["hasNextPage"]:
    after = data["pagination"]["endCursor"]
    if after:
        print("Listing reports after " + after)
    data = opencti_api_client.report.list(
        first=50,
        after=after,
        customAttributes=custom_attributes,
        withPagination=True,
        orderBy="created_at",
        orderMode="asc",
    )
    final_reports = final_reports + data["entities"]

# Print
for report in final_reports:
    print("[" + report["published"] + "] " + report["name"])

### Get attack pattern by MITRE ID <a class="anchor" id="get-attack-pattern-by-mitre-id"></a>

In [None]:
# Get the Attack-Pattern T1514
attack_pattern = opencti_api_client.attack_pattern.read(
    filters=[{"key": "x_mitre_id", "values": ["T1514"]}]
)

# Print
attack_pattern

### Get entity by name or alias <a class="anchor" id="get-entity-by-name-or-alias"></a>

In [None]:
# Get the ANSSI entity
anssi = opencti_api_client.stix_domain_object.get_by_stix_id_or_name(name="ANSSI")

# Print
anssi

### Get malwares of intrusion set <a class="anchor" id="get-malwares-on-intrusion-set"></a>

In [None]:
# Create the Intrusion Set
opencti_api_client.intrusion_set.create(
    name="APT28",
    description="Evil hackers",
    first_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    last_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    update=True,
)

# Get the intrusion set APT28
intrusion_set = opencti_api_client.intrusion_set.read(
    filters=[{"key": "name", "values": ["APT28"]}]
)

# Get the relations from APT28 to malwares
stix_relations = opencti_api_client.stix_core_relationship.list(
    fromId=intrusion_set["id"], toTypes=["Malware"]
)

# Print
for stix_relation in stix_relations:
    print("[" + stix_relation["to"]["stix_id"] + "] " + stix_relation["to"]["name"])

### Get marking definitions <a class="anchor" id="get-marking-definitions"></a>

In [None]:
# Get all marking definitions
marking_definitions = opencti_api_client.marking_definition.list()

# Print
for marking_definition in marking_definitions:
    print(
        "["
        + marking_definition["definition_type"]
        + "] "
        + marking_definition["definition"]
    )

### Get observable exact match <a class="anchor" id="get-observable-exact-match"></a>

In [None]:
# Exact IP match
opencti_api_client.stix_cyber_observable.create(
    simple_observable_key="IPv4-Addr.value", simple_observable_value="110.172.180.180"
)
print("IP ADDRESS")
observable = opencti_api_client.stix_cyber_observable.read(
    filters=[{"key": "value", "values": ["110.172.180.180"]}]
)
observable

# Exact File name match
opencti_api_client.stix_cyber_observable.create(
    simple_observable_key="File.name", simple_observable_value="activeds.dll"
)
print("FILE NAME")
observable = opencti_api_client.stix_cyber_observable.read(
    filters=[{"key": "name", "values": ["activeds.dll"]}]
)
observable

# Exact File name match
opencti_api_client.stix_cyber_observable.create(
    simple_observable_key="File.hashes.MD5",
    simple_observable_value="3aad33e025303dbae12c12b4ec5258c1",
)
print("FILE MD5")
observable = opencti_api_client.stix_cyber_observable.read(
    filters=[{"key": "hashes_MD5", "values": ["3aad33e025303dbae12c12b4ec5258c1"]}]
)
observable

### Get observables search <a class="anchor" id="get-observables-search"></a>

In [None]:
# Search
opencti_api_client.stix_cyber_observable.create(
    simple_observable_key="IPv4-Addr.value", simple_observable_value="65.89.87.4"
)
observables = opencti_api_client.stix_cyber_observable.list(search="65.89.87.4")

observables

### Get reports about intrusion set <a class="anchor" id="get-reports-about-intrusion-set"></a>

In [None]:
# Create the Intrusion Set
opencti_api_client.intrusion_set.create(
    name="Sandworm Team",
    description="Evil hackers",
    first_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    last_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    update=True,
)

# Get the intrusion set Sandworm
intrusion_set = opencti_api_client.intrusion_set.read(
    filters=[{"key": "name", "values": ["Sandworm Team"]}]
)

# Get all reports
reports = opencti_api_client.report.list(
    filters=[{"key": "objectContains", "values": [intrusion_set["id"]]}],
    orderBy="published",
    orderMode="asc",
)

# Print
if not reports:
    print(f"No {intrusion_set['name']} reports available")
else:
    for report in reports:
        print(
            "["
            + report["standard_id"]
            + "] "
            + report["name"]
            + " ("
            + report["published"]
            + ")"
        )

### Import stix2 file <a class="anchor" id="import-stix2-file"></a>

In [None]:
# File to import
file_to_import = "./test.json"

# Import the bundle
opencti_api_client.stix2.import_bundle_from_file(file_to_import, True)

### Search attack pattern <a class="anchor" id="search-attack-pattern"></a>

In [None]:
# Search
attack_patterns = opencti_api_client.attack_pattern.list(search="localgroup")

# Print
attack_patterns

### Search malware <a class="anchor" id="search-malware"></a>

In [None]:
# Search
malwares = opencti_api_client.malware.list(search="windows")

# Print
malwares

### Update entity attribute <a class="anchor" id="update-entity-attribute"></a>

In [None]:
# Create the Intrusion Set
opencti_api_client.intrusion_set.create(
    name="APT28",
    description="Evil hackers",
    first_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    last_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    update=True,
)

# Get the intrusion set APT28
intrusion_set = opencti_api_client.intrusion_set.read(
    filters=[{"key": "name", "values": ["APT28"]}]
)

# Update the description
opencti_api_client.stix_domain_object.update_field(
    id=intrusion_set["id"], input={"key": "description", "value": "This is APT28!"}
)

### Update observable attributes <a class="anchor" id="update-observable-attributes"></a>

In [None]:
# Create an observable
observable = opencti_api_client.stix_cyber_observable.create(
    observableData={
        "type": "file",
        "x_opencti_description": "A malicious file",
        "hashes": {
            "MD5": "348aefbb6142d4fff8cf26fc5dc97f8a",
            "SHA-1": "486e7e66c3a098c1c8f42e26c78f259d6b3108a6",
            "SHA-256": "42c5e1fe01e689e550ba700b3c5dd4a04a84798c1868ba53c02abcbe21491515",
        },
        #        "x_opencti_score": "90",
    }
)

# Update the fields

reference = opencti_api_client.external_reference.create(
    source_name="Jen", url="https://janedoe.com", description="Sample Report"
)

opencti_api_client.stix_cyber_observable.add_external_reference(
    id=observable["id"], external_reference_id=reference["id"]
)

label = opencti_api_client.label.create(
    value="Suspicious",
    color="#ffa500",
)

opencti_api_client.stix_cyber_observable.add_label(
    id=observable["id"], marking_definition_id=label["id"]
)

author = opencti_api_client.identity.create(
    name="John's Work",
    description="Automated Toolkit",
    type="Organization",
)

opencti_api_client.stix_cyber_observable.update_created_by(
    id=observable["id"], identity_id=author["id"]
)

### Upload artifacts <a class="anchor" id="upload-artifacts"></a>

In [None]:
# Run this as a command line script
def main():

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-f", "--file", required=True, help="The path of the Artifact(s) to upload."
    )
    parser.add_argument(
        "-d", "--description", default="", help="The description for the Artifact."
    )
    parser.add_argument(
        "-l", "--label", default="", help="Comma separated labels for the Artifact."
    )
    parser.add_argument(
        "-r",
        "--related",
        default=None,
        help="Standard id of an object related to the Artifact.",
    )

    args = parser.parse_args()

    if os.path.isdir(args.file):
        for currentpath, folders, files in os.walk(args.file):
            for filep in files:
                upload(
                    os.path.join(currentpath, filep),
                    args.description,
                    args.label,
                    args.related,
                )

    else:
        upload(args.file, args.description, args.label, args.related)


def upload(file_path, description, labels, related_standard_id):

    file_data = b""
    with open(file_path, "rb") as f:
        file_data = f.read()

    mime_type = magic.from_buffer(file_data, mime=True)

    # Upload the file, returns the query response for the file upload
    kwargs = {
        "file_name": os.path.basename(file_path),
        "data": file_data,
        "mime_type": mime_type,
        "x_opencti_description": "",
    }

    if description:
        kwargs["x_opencti_description"] = description

    response = OPENCTI_API_CLIENT.stix_cyber_observable.upload_artifact(**kwargs)
    print(response)

    for label_str in labels.split(","):
        if label_str:
            label = OPENCTI_API_CLIENT.label.create(value=label_str)
            OPENCTI_API_CLIENT.stix_cyber_observable.add_label(
                id=response["id"], label_id=label["id"]
            )

    if related_standard_id:
        OPENCTI_API_CLIENT.stix_core_relationship.create(
            fromId=related_standard_id,
            toId=response["standard_id"],
            relationship_type="related-to",
            description=f"Related to {related_standard_id}",
        )


if __name__ == "__main__":
    main()

### Upload file <a class="anchor" id="upload-file"></a>

In [None]:
# Upload the file
file = opencti_api_client.upload_file(
    file_name="./upload_file_example.pdf",
)

file

### Upload file to intrusion set <a class="anchor" id="upload-file-to-intrusion-set"></a>

In [None]:
# Create the Intrusion Set
intrusion_set = opencti_api_client.intrusion_set.create(
    name="My new Intrusion Set",
    description="Evil Cluster",
    first_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    last_seen=datetime.date.today().strftime("%Y-%m-%dT%H:%M:%S+00:00"),
    update=True,
)

# Print
intrusion_set

# Upload the file
file = opencti_api_client.stix_domain_object.add_file(
    id=intrusion_set["id"],
    file_name="./upload_file_example.pdf",
)
file