
# CYTAXII2, Cyware's TAXII 2.x Client

cytaxii2 is a pip library (for Python 3) by Cyware that can be used to connect to a TAXII 2.x server, perform discovery, get collections, poll data from the server and inbox data to it. It supports STIX/TAXII 2.0 and 2.1.

This library supports Python 3 and can be installed with "pip3 install cytaxii2". After installation, the library can be imported as follows:

In [62]:
!pip3 install requests



In [63]:
import json

In [64]:
import requests

"""
Cyware TAXII 2.0/ 2.1 Client
TAXII Supported Version: 2.0 & 2.1
"""


class cytaxii2(object):
    """
    Small TAXII 2.0 / 2.1 client built on top of ``requests``.

    Every request method returns a dict with three keys:
    ``response`` (parsed JSON on success, otherwise the raw body or an error message),
    ``status`` (bool) and ``status_code`` (the HTTP status code, or the string
    ``'EXCEPTION'`` when the request could not be completed).
    """

    def __init__(self, discovery_url, username, password, version=2.1, **kwargs):
        """
        This method is used to initialize values throughout the class
        :param discovery_url: Enter the TAXII URL. It is requested as-is by discovery_request()
            and is also the base that the collection endpoints are appended to
        :param username: Enter the username to authenticate with
        :param password: Enter the password to authenticate with
        :param version: Enter the TAXII version, 2.0 or 2.1 (a number or a numeric string such as "2.1")
        :param kwargs: Optional settings:
            verify - forwarded to requests as its ``verify`` argument (defaults to True)
            timeout - forwarded to requests as its ``timeout`` argument (defaults to None)
        :raises SyntaxError: If the version is neither 2.0 nor 2.1
        """
        # Normalise so that "2.1", 2.1, "2.0", 2.0 and 2 are all understood.
        try:
            taxii_version = float(version)
        except (TypeError, ValueError):
            taxii_version = None

        if taxii_version == 2.1:
            self.headers = {
                'Content-Type': 'application/taxii+json;version=2.1',
                'User-Agent': 'cyware.taxii2client',
                'Accept': 'application/taxii+json;version=2.1'
            }
        elif taxii_version == 2.0:
            self.headers = {
                'Content-Type': 'application/vnd.oasis.stix+json; version=2.0',
                'User-Agent': 'cyware.httpclient',
                'Accept': 'application/vnd.oasis.taxii+json; version=2.0'
            }
        else:
            # SyntaxError is kept (rather than ValueError) so existing callers that catch it still work.
            raise SyntaxError("Invalid version entered. Only TAXII versions 2.0 and 2.1 are supported")

        self.discovery_url = discovery_url
        self.auth = (username, password)
        self.collections = "collections"
        self.objects = "objects"
        # Previously these keyword arguments were accepted but silently ignored.
        self.verify = kwargs.get('verify', True)
        self.timeout = kwargs.get('timeout')

    def request_handler(self, method, url, json_data=None, query_params=None, **kwargs):
        """
        This method is used to handle all TAXII requests
        :param method: Enter the HTTP method to use, 'GET' or 'POST'
        :param url: Enter the URL to make the request to
        :param json_data: Enter the json data to pass as a payload. A dict or list is
            serialised to a JSON string; any other value is sent unchanged
        :param query_params: Any query params to pass
        :return: A dict with the keys 'response', 'status' and 'status_code'
        """
        if method not in ('GET', 'POST'):
            return {
                'response': 'Unsupported Method requested',
                'status': False,
                'status_code': 405
            }

        try:
            payload = json_data
            if isinstance(json_data, (dict, list)):
                # The Content-Type header announces JSON, so the body must not be form-encoded.
                payload = json.dumps(json_data)

            response = requests.request(method=method, url=url, data=payload, headers=self.headers,
                                        auth=self.auth, params=query_params, verify=self.verify,
                                        timeout=self.timeout)
            status_code = response.status_code

            if response.ok:
                response_json = response.json()
                status = True
            else:
                response_json = response.text
                status = False

        except Exception as e:
            # Any failure (connection error, timeout, undecodable body, ...) is reported
            # to the caller through the result dict instead of being raised.
            status_code = 'EXCEPTION'
            response_json = str(e)
            status = False

        return {
            'response': response_json,
            'status': status,
            'status_code': status_code
        }

    def discovery_request(self, **kwargs):
        """
        This method is used to make a request to the TAXII discovery URL
        :return: The result dict produced by request_handler
        """
        return self.request_handler(method='GET', url=self.discovery_url)

    def root_discovery(self, **kwargs):
        """
        This method is used to make a root discovery request to the TAXII URL.
        The API root is taken from the 'default' key of the discovery response, falling
        back to the first entry of 'api_roots' when no default is advertised.
        :return: The API root result dict, the failed discovery result dict, or an error
            dict with status_code 'EXCEPTION' when the discovery response names no API root
        """
        from urllib.parse import urljoin

        discover_response = self.discovery_request()
        if discover_response['status_code'] != 200:
            return discover_response

        discovery = discover_response['response']
        api_root = None
        if isinstance(discovery, dict):
            api_root = discovery.get('default')
            if not api_root:
                api_roots = discovery.get('api_roots')
                if isinstance(api_roots, list) and api_roots:
                    api_root = api_roots[0]

        if not api_root:
            return {
                'response': 'Discovery response does not contain a default API root or any api_roots',
                'status': False,
                'status_code': 'EXCEPTION'
            }

        # Absolute URLs pass through urljoin unchanged; relative ones are resolved
        # against the discovery URL.
        return self.request_handler(method='GET', url=urljoin(self.discovery_url, api_root))

    def collection_request(self, **kwargs):
        """
        This method is used to make a request to the TAXII server to get all collections on the server
        :return: The result dict produced by request_handler
        """
        collection_url = "{0}/{1}/".format(self.discovery_url.rstrip("/"), self.collections)
        return self.request_handler(method='GET', url=collection_url)

    def collection_data_request(self, collection_id, **kwargs):
        """
        This method is used to get data about a particular collection on the TAXII server
        :param collection_id: Enter the collection ID
        :return: The result dict produced by request_handler
        """
        poll_url = "{0}/{1}/{2}/".format(self.discovery_url.rstrip("/"), self.collections, collection_id)
        return self.request_handler(method='GET', url=poll_url)

    def poll_request(self, collection_id, added_after=None, limit=None, object_id=None, next=None, object_type=None,
                     **kwargs):
        """
        This method is used to poll data from a particular collection
        :param collection_id: Enter the collection ID
        :param added_after: Enter the date to poll from, polls all data if left blank
        :param limit: Enter response limit
        :param object_id: Enter a specific object to retrieve
        :param next: Enter the next value if the data has been paginated
        :param object_type: Enter the object type to retrieve
        :return: The result dict produced by request_handler
        """
        filters = {
            'added_after': added_after,
            'limit': limit,
            'next': next,
            'match[id]': object_id,
            'match[type]': object_type
        }
        # Only send the filters the caller actually supplied.
        params = {key: value for key, value in filters.items() if value is not None}
        poll_url = "{0}/{1}/{2}/{3}/".format(self.discovery_url.rstrip("/"), self.collections, collection_id,
                                             self.objects)
        return self.request_handler(method='GET', url=poll_url, query_params=params)


After importing, we have to create an object with the TAXII URL, the username, the password and the TAXII version (2.0 or 2.1), as follows:

In [65]:
# Replace the placeholder strings with your own server URL and credentials.
cytaxii_object = cytaxii2(
    discovery_url='discovery_url',
    username='username',
    password='password',
    version=2.1,
    verify=False,
)

After the object is created, we can call the methods provided by the library. Each of the methods is described below.

The first method performs a TAXII discovery request and can be called as follows.

##### Arguments
None

In [66]:
# Send a GET request to the configured discovery URL and pretty-print the result dict.
discovery_response = cytaxii_object.discovery_request()
print(json.dumps(discovery_response, indent=4))

{
    "response": {
        "title": "ThreatFeed TAXII 2.1 API Root",
        "description": "TAXII 2.1 service for ThreatFeed user",
        "versions": [
            "application/taxii+json;version=2.1"
        ],
        "max_content_length": 10000000
    },
    "status": true,
    "status_code": 200
}


Next, we can perform a collection request, where we request all STIX data collections from the TAXII server. Note that in TAXII all data is held inside collections; the list of collections can be requested as follows.

##### Arguments
None

In [67]:
# Request the list of collections from the server and pretty-print the result dict.
collections = cytaxii_object.collection_request()
print(json.dumps(collections, indent=4))

{
    "response": {
        "collections": [
            {
                "id": "46cc884e-fd37-4436-95b3-ac73710df3dc",
                "title": "ThreatFox",
                "description": "ThreatFox is a project operated by abuse.ch. The purpose of the project is to collect and share indicators of compromise (IOCs), helping IT-security researchers and threat analysts protecting their constituency and customers from cyber threats.",
                "can_read": true,
                "can_write": false,
                "media_types": [
                    "application/taxii+json;version=2.1"
                ]
            },
            {
                "id": "52ab6ed4-83d0-42fb-891c-708221648181",
                "title": "default",
                "description": "Default collection of ctix",
                "can_read": true,
                "can_write": true,
                "media_types": [
                    "application/taxii+json;version=2.1"
                ]
            },
    

Next, we can also get the details of a particular TAXII collection. This can be done as follows.

##### Arguments:
- collection_id: Enter the collection ID to get details of __[mandatory]__

In [68]:
# Fetch the details of a single collection, identified by its ID, and pretty-print the result dict.
collection_data = cytaxii_object.collection_data_request(collection_id="46cc884e-fd37-4436-95b3-ac73710df3dc")
print(json.dumps(collection_data, indent=4))

{
    "response": {
        "id": "46cc884e-fd37-4436-95b3-ac73710df3dc",
        "title": "ThreatFox",
        "description": "ThreatFox is a project operated by abuse.ch. The purpose of the project is to collect and share indicators of compromise (IOCs), helping IT-security researchers and threat analysts protecting their constituency and customers from cyber threats.",
        "can_read": true,
        "can_write": false,
        "media_types": [
            "application/taxii+json;version=2.1"
        ]
    },
    "status": true,
    "status_code": 200
}


Next, we can get data from a TAXII collection. This is one of the most important use cases, where we can poll data from a TAXII server.

##### Arguments:
- collection_id: Enter the collection ID to get details of __[mandatory]__
- added_after: Enter a datetime string. Only data added after this date, up to the present, will be returned. Defaults to all data
- limit: Enter the number of responses to return
- object_id: Enter the object_id. This param will limit the response to only this object in the response
- object_type: Enter the object type to return
- next: If the data is paginated, the "next" param can be used to get additional data


In [69]:
# added_after is passed to the server unchanged as a query parameter, so it is given
# here as an RFC 3339 timestamp, the format TAXII defines for this filter.
poll_response = cytaxii_object.poll_request(
    collection_id='ce1d5d4a-4ea8-4479-a0c0-bc7dbdffe6e1',
    limit=10,
    added_after="2024-01-01T00:00:00.000Z",
)
print(json.dumps(poll_response, indent=4))

{
    "response": {
        "objects": [
            {
                "id": "ipv4-addr--beea347f-d085-4328-99a5-2b4d74198ecf",
                "type": "ipv4-addr",
                "value": "87.121.113.150",
                "created": "2024-01-01T20:46:28.761Z",
                "modified": "2024-01-01T20:46:28.761Z",
                "extensions": {
                    "extension-definition--5c4546f1-7e76-4b4a-844c-64a1f722c788": {
                        "extension_type": "toplevel-property-extension"
                    }
                },
                "spec_version": "2.1",
                "object_marking_refs": [
                    "marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da"
                ],
                "x_ctix_confidence_score": 0
            },
            {
                "id": "ipv4-addr--383ced71-5496-4687-9ae5-986ca519042d",
                "type": "ipv4-addr",
                "value": "20.26.125.189",
                "created": "2024-01-01T20:46:28.