diff --git a/internal-enrichment/urlscan-enrichment/.dockerignore b/internal-enrichment/urlscan-enrichment/.dockerignore new file mode 100644 index 0000000000..c7b649d46a --- /dev/null +++ b/internal-enrichment/urlscan-enrichment/.dockerignore @@ -0,0 +1,4 @@ +src/config.yml +src/__pycache__ +src/logs +src/*.gql diff --git a/internal-enrichment/urlscan-enrichment/.gitignore b/internal-enrichment/urlscan-enrichment/.gitignore new file mode 100644 index 0000000000..9317bd7436 --- /dev/null +++ b/internal-enrichment/urlscan-enrichment/.gitignore @@ -0,0 +1,4 @@ +config.yml +__pycache__ +logs +*.gql \ No newline at end of file diff --git a/internal-enrichment/urlscan-enrichment/Dockerfile b/internal-enrichment/urlscan-enrichment/Dockerfile index 7f116e5d92..4140f542bb 100644 --- a/internal-enrichment/urlscan-enrichment/Dockerfile +++ b/internal-enrichment/urlscan-enrichment/Dockerfile @@ -1,14 +1,15 @@ FROM python:3.11-alpine ENV CONNECTOR_TYPE=INTERNAL_ENRICHMENT -# Install Python modules -RUN apk --no-cache add git build-base libmagic libffi-dev libxml2-dev libxslt-dev -COPY requirements.txt /tmp/requirements.txt -RUN pip3 install --no-cache-dir -r /tmp/requirements.txt +# Copy the worker +COPY src /opt/opencti-connector-urlscan-enrichment -# Copy the connector -COPY src /opt/connector -WORKDIR /opt/connector +# Install Python modules +# hadolint ignore=DL3003 +RUN apk --no-cache add git build-base libmagic libffi-dev && \ + cd /opt/opencti-connector-urlscan-enrichment && \ + pip3 install --no-cache-dir -r requirements.txt && \ + apk del git build-base # Expose and entrypoint COPY entrypoint.sh / diff --git a/internal-enrichment/urlscan-enrichment/README.md b/internal-enrichment/urlscan-enrichment/README.md index 8b2c5f8dee..38bc0cecec 100644 --- a/internal-enrichment/urlscan-enrichment/README.md +++ b/internal-enrichment/urlscan-enrichment/README.md @@ -1 +1,103 @@ -# URLScan Enrichment connector \ No newline at end of file +# OpenCTI URLScan Enrichment Connector + +## Introduction + +URLScan (https://urlscan.io/) is an online service that allows you to scan URLs to analyze and detect potential security threats. It provides a platform where users can submit links to be scanned to obtain information about the page's content, loaded external resources, potential threats, and other relevant security details. + +## Requirements + +- pycti + +## Configuration variables + +There are a number of configuration options, which are set either in `docker-compose.yml` (for Docker) or in `config.yml` (for manual deployment). + +## OpenCTI environment variables + +Below are the parameters you'll need to set for OpenCTI: + +| Parameter | config.yml | Docker environment variable | Mandatory | Description | +|---------------|------------|-----------------------------|-----------|------------------------------------------------------| +| OpenCTI URL | url | `OPENCTI_URL` | Yes | The URL of the OpenCTI platform. | +| OpenCTI Token | token | `OPENCTI_TOKEN` | Yes | The default admin token set in the OpenCTI platform. | + +### Base connector environment variables + +Below are the parameters you'll need to set for running the connector properly: + +| Parameter | config.yml | Docker environment variable | Default | Mandatory | Description | +|-------------------|-------------------|---------------------------------|-----------|-----------|--------------------------------------------------------------------------------------------------| +| Connector ID | id | `CONNECTOR_ID` | / | Yes | A unique `UUIDv4` identifier for this connector instance. | +| Connector Name | name | `CONNECTOR_NAME` | `URLScan` | Yes | Name of the connector. | +| Connector Scope | scope | `CONNECTOR_SCOPE` | / | Yes | Scope of the connector. Availables: `url or hostname or domain-name`, `ipv4-addr`, `ipv6-addr` | +| Run and Terminate | run_and_terminate | `CONNECTOR_RUN_AND_TERMINATE` | `False` | No | Launch the connector once if set to True. Takes 2 available values: `True` or `False` | +| Log Level | log_level | `CONNECTOR_LOG_LEVEL` | / | Yes | Determines the verbosity of the logs. Options are `debug`, `info`, `warn`, or `error`. | + +### URLScan Enrichment connector environment variables + +Below are the parameters you'll need to set for URLScan Enrichment connector: + +| Parameter | config.yml | Docker environment variable | Default | Mandatory | Description | +|--------------------------------------|-------------------------|---------------------------------------------------|-----------|------------|-------------------------------------------------------------------------------------------------------------------------| +| URLScan Enr. Api Key | api_key | `URLSCAN_ENRICHMENT_API_KEY` | / | Yes | URLScan API Key | +| URLScan Enr. Api Base Url | api_base_url | `URLSCAN_ENRICHMENT_API_BASE_URL` | / | Yes | URLScan Base Url | +| URLScan Enr. Import Screenshot | import_screenshot | `URLSCAN_ENRICHMENT_IMPORT_SCREENSHOT` | `true` | Yes | Allows or not the import of the screenshot of the scan submitted in URLScan to OpenCTI. | +| URLScan Enr. Visibility | visibility | `URLSCAN_ENRICHMENT_VISIBILITY` | `public` | Yes | URLScan offers several levels of visibility for submitted scans: `public`, `unlisted`, `private` | +| URLScan Enr. Search filtered by date | search_filtered_by_date | `URLSCAN_ENRICHMENT_SEARCH_FILTERED_BY_DATE` | `>now-1y` | Yes | Allows you to filter by date available: `>now-1h`, `>now-1d`, `>now-1y`, `[2022 TO 2023]`, `[2022/01/01 TO 2023/12/01]` | +| URLScan Enr. Max TLP | max_tlp | `URLSCAN_ENRICHMENT_MAX_TLP` | / | Yes | Do not send any data to URLScan if the TLP of the observable is greater than MAX_TLP | + + +## Deployment + +### Docker Deployment + +Before building the Docker container, you need to set the version of pycti in `requirements.txt` equal to whatever version of OpenCTI you're running. Example, `pycti==6.1.3`. If you don't, it will take the latest version, but sometimes the OpenCTI SDK fails to initialize. + +Build a Docker Image using the provided `Dockerfile`. + +Example: + +```shell +# Replace the IMAGE NAME with the appropriate value +docker build . -t [IMAGE NAME]:latest +``` + +Make sure to replace the environment variables in `docker-compose.yml` with the appropriate configurations for your +environment. Then, start the docker container with the provided docker-compose.yml + +```shell +docker compose up -d +# -d for detached +``` + +### Manual Deployment + +Create a file `config.yml` based on the provided `config.yml.sample`. + +Replace the configuration variables (especially the "**ChangeMe**" variables) with the appropriate configurations for +you environment. + +Install the required python dependencies (preferably in a virtual environment): + +```shell +pip3 install -r requirements.txt +``` + +Then, start the connector from crowdstrike-endpoint-security/src: + +```shell +python3 main.py +``` + +## Usage + +After installation, the connector should require minimal interaction to use, and some configurations should be specified in your `docker-compose.yml` or `config.yml`. + +## Warnings + +- If you have the variable auto set to true, then it is important to choose the correct scope by selecting only one type of scope-submission (url or hostname or domain-name) to avoid looping ingestions. + - This is an example of looping ingestion: you have set a scope submission of URL and Domain name. When you will search for URL, it will retrieve lots of entities, including some domain names. These domain names will then be searched too. However, they can bring you some URLs too, creating this infinite loop. + +- If you enrich IPv4 and IPv6 observables, only a link to URLScan search in external reference (OpenCTI) will be generated, but you can play with the search period with the environment variable search_filtered_by_date to refine the search. + +- While the analysis is still in progress, the Result API endpoint will respond with an HTTP status code of 404. The connector's polling logic is to wait 10 seconds and retry 12 times, for a maximum wait time of 2 minutes, until the analysis is complete or the maximum wait time is reached. diff --git a/internal-enrichment/urlscan-enrichment/docker-compose.yml b/internal-enrichment/urlscan-enrichment/docker-compose.yml index 65afb53ed9..ce8af8107c 100644 --- a/internal-enrichment/urlscan-enrichment/docker-compose.yml +++ b/internal-enrichment/urlscan-enrichment/docker-compose.yml @@ -3,15 +3,20 @@ services: connector-urlscan-enrichment: image: opencti/connector-urlscan-enrichment:6.1.5 environment: - - CONNECTOR_NAME=connector-urlscanio - - CONNECTOR_SCOPE=Url,Domain-Name,Hostname - - OPENCTI_URL=http://opencti:8080 - - OPENCTI_TOKEN= ChangeMe - - CONNECTOR_ID= ChangeMe - - CONNECTOR_CONFIDENCE_LEVEL=100 # From 0 (Unknown) to 100 (Fully trusted). - - CONNECTOR_LOG_LEVEL=info - - CONNECTOR_AUTO=true - - URLSCAN_API_KEY=ChangeMe - - CONNECTOR_WANT_RESULTS=true - - CONNECTOR_DOMAIN_ENRICHMENT_COUNT=5 # Maximum Number of domain enrichment results added to notes. - restart: always \ No newline at end of file + # OpenCTI's generic execution parameters: + - OPENCTI_URL=http://localhost + - OPENCTI_TOKEN=ChangeMe + # Connector's generic execution parameters: + - CONNECTOR_ID=ChangeMe + - CONNECTOR_NAME=Urlscan + - CONNECTOR_SCOPE=url,ipv4-addr,ipv6-addr + - CONNECTOR_AUTO=false + - CONNECTOR_LOG_LEVEL=error + # Connector's custom execution parameters: + - URLSCAN_ENRICHMENT_API_KEY=ChangeMe + - URLSCAN_ENRICHMENT_API_BASE_URL=https://urlscan.io/api/v1/ + - URLSCAN_ENRICHMENT_IMPORT_SCREENSHOT=true + - URLSCAN_ENRICHMENT_VISIBILITY=public # Available values : public, unlisted, private + - URLSCAN_ENRICHMENT_SEARCH_FILTERED_BY_DATE=>now-1y # Available : ">now-1h", ">now-1d", ">now-1y", "[2022 TO 2023]", "[2022/01/01 TO 2023/12/01]" + - URLSCAN_ENRICHMENT_MAX_TLP=TLP:AMBER # Required, Available values: TLP:CLEAR, TLP:WHITE, TLP:GREEN, TLP:AMBER, TLP:AMBER+STRICT, TLP:RED + restart: always diff --git a/internal-enrichment/urlscan-enrichment/entrypoint.sh b/internal-enrichment/urlscan-enrichment/entrypoint.sh index 77f91c125a..9b5e4dbcfc 100644 --- a/internal-enrichment/urlscan-enrichment/entrypoint.sh +++ b/internal-enrichment/urlscan-enrichment/entrypoint.sh @@ -1,4 +1,7 @@ #!/bin/sh -# Start the connector (WORKDIR is /opt/connector as set in the Dockerfile) -python3 main.py +# Correct working directory +cd /opt/opencti-connector-urlscan-enrichment + +# Start the connector +python3 main.py \ No newline at end of file diff --git a/internal-enrichment/urlscan-enrichment/src/config.yml.sample b/internal-enrichment/urlscan-enrichment/src/config.yml.sample new file mode 100644 index 0000000000..187c1de315 --- /dev/null +++ b/internal-enrichment/urlscan-enrichment/src/config.yml.sample @@ -0,0 +1,19 @@ +opencti: + url: "http://localhost:8080" + token: "ChangeMe" + +connector: + id: "ChangeMe" + type: "INTERNAL_ENRICHMENT" + name: "UrlScan" + scope: "url,ipv4-addr,ipv6-addr" # Availables => scope-submission: url or hostname or domain-name / scope-search: ipv4-addr,ipv6-addr + auto: false # Enable/disable auto-enrichment of observables + log_level: "error" + +urlscan_enrichment: + api_key: "ChangeMe" + api_base_url: "https://urlscan.io/api/v1/" # Required + import_screenshot: false + visibility: "public" # Available values : public, unlisted, private + search_filtered_by_date: ">now-2d" # Available : ">now-1d", ">now-1y", "[2022 TO 2023]", "[2022/01/01 TO 2023/12/01" + max_tlp: "TLP:AMBER" # Required, Available values: TLP:CLEAR, TLP:WHITE, TLP:GREEN, TLP:AMBER, TLP:AMBER+STRICT, TLP:RED \ No newline at end of file diff --git a/internal-enrichment/urlscan-enrichment/src/lib/UrlScanEnrichment.py b/internal-enrichment/urlscan-enrichment/src/lib/UrlScanEnrichment.py deleted file mode 100644 index 94c371a9f1..0000000000 --- a/internal-enrichment/urlscan-enrichment/src/lib/UrlScanEnrichment.py +++ /dev/null @@ -1,208 +0,0 @@ -import json -import os -import time - -import requests -from pycti import OpenCTIConnectorHelper -from stix2 import URL, Note - - -class URLScanSubmissionsConnector: - """Specific external-import connector - - This class encapsulates the main actions, expected to be run by the - any external-import connector. Note that the attributes defined below - will be complemented per each connector type. - - Attributes: - helper (OpenCTIConnectorHelper): The helper to use. - update_existing_data (str): Whether to update existing data or not in OpenCTI. - """ - - def __init__(self): - self.helper = OpenCTIConnectorHelper({}) - self.api_key = os.environ.get("URLSCAN_API_KEY", None).lower() - self.want_results = os.environ.get("CONNECTOR_WANT_RESULTS", "false").lower() - self.domain_note_count = int( - os.environ.get("CONNECTOR_DOMAIN_ENRICHMENT_COUNT", 5) - ) - - update_existing_data = os.environ.get("CONNECTOR_UPDATE_EXISTING_DATA", "false") - if update_existing_data.lower() in ["true", "false"]: - self.update_existing_data = update_existing_data.lower() - else: - msg = f"Error when grabbing CONNECTOR_UPDATE_EXISTING_DATA environment variable: '{self.interval}'. It SHOULD be either `true` or `false`. `false` is assumed. " - self.helper.log_warning(msg) - self.update_existing_data = "false" - - def convert_dict_to_markdown_table(self, table: str, data: dict) -> str: - """Convert a dictionary to a markdown table""" - unsupported_values = ["", "None", None, [], {}] - - for key, value in data.items(): - if value not in unsupported_values: - table += f"\\\n | **{key}** | {value} |" - - return table - - def urlscan_fetch_results(self, uuid, counter): - """Fetch the results of the URLScan API call for the UUID""" - - self.helper.log_info("URLScan fetch result call") - headers = { - "Content-Type": "application/json", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/119.0", - } - response = requests.get( - "https://urlscan.io/api/v1/result/" + uuid + "/", headers=headers - ) - if response.status_code == 200: - response_data = response.json() - verdict = response_data["verdicts"] - page = response_data["page"] - self.helper.log_info("URLScan fetch result call successful") - return {"verdict": verdict, "page": page} - - elif response.status_code == 404 and counter < 5: - time.sleep(20) - return self.urlscan_fetch_results(uuid, counter + 1) - else: - self.helper.log_error(f"URLScan fetch result call failed{response}") - return None - - def urlscan_domain_enrichment(self, observable): - """ - Fetch the results of the URLScan API call for a domain - """ - headers = { - "Content-Type": "application/json", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/119.0", - } - response = requests.get( - "https://urlscan.io/api/v1/search/?q=domain:" + observable["value"], - headers=headers, - ) - - stix_objects = [] - content = "### URL SCAN RESULTS\n\n" - - if response.status_code == 200: - self.helper.log_info("URLScan API call successful") - url_scan_domain_data = response.json() - - if len(url_scan_domain_data["results"]) > 0: - if len(url_scan_domain_data["results"]) > self.domain_note_count: - index = self.domain_note_count + 1 - else: - index = len(url_scan_domain_data["results"]) - - for result in range(0, index): - table = "" - table += " \\\n| Field | Value |" - table += " \\\n| --- | ---|" - result_dict = url_scan_domain_data["results"][result] - result_page = result_dict.get("page") - result_page["result"] = result_dict.get("result") - table = self.convert_dict_to_markdown_table(table, result_page) - # result_url=result_dict.get("result") - content += table + "\\\n" - - stix_objects.append( - Note( - type="note", - abstract="URLScan Domain search results", - content=content, - authors=["sudesh"], - object_refs=[self.entity_id], - ) - ) - - # Create the Note object and send it to OpenCTI - bundle = self.helper.stix2_create_bundle(stix_objects) - bundles_sent = self.helper.send_stix2_bundle(bundle) - self.helper.log_info( - f"Sent {len(bundles_sent)} stix bundle(s) for worker import" - ) - - else: - self.helper.log_error(f"URLScan API call failed{response}") - return None - - def urlscan_submission(self, observable): - headers = {"API-Key": self.api_key, "Content-Type": "application/json"} - data = {"url": observable["value"], "visibility": "public"} - response = requests.post( - "https://urlscan.io/api/v1/scan/", headers=headers, data=json.dumps(data) - ) - - if response.status_code == 200: - self.helper.log_info("URLScan API call successful") - url_scan_data = response.json() - if url_scan_data["message"] == "Submission successful": - external_reference = self.helper.api.external_reference.create( - source_name="urlscan.io", - external_id=url_scan_data["uuid"], - url=url_scan_data["result"], - ) - - if self.want_results == "true": - results = self.urlscan_fetch_results(url_scan_data["uuid"], 0) - description = f"**ASN:** {results['page'].get('asn')} \\\n **Country:** {results['page'].get('country')} \\\n **Title:** {results['page'].get('title')} \\\n **Apex_domain:** {results['page'].get('apexDomain')} \\\n **Tls_ValidFrom:** {results['page'].get('tlsValidFrom')} \\\n **TlsIssuer:** {results['page'].get('tlsIssuer')} \\\n **Server:** {results['page'].get('server')} \\\n **Ip:** {results['page'].get('ip')} \\\n **Verdict:** {results['verdict'].get('overall')} \n" - - # adding score verdict and description to the URL object - score = results["verdict"].get("overall").get("score") - if score > 0: - Url_Object = URL( - value=observable["value"], - custom_properties={ - "x_opencti_description": description, - "x_opencti_score": score, - }, - allow_custom=True, - ) - else: - Url_Object = URL( - value=observable["value"], - custom_properties={"x_opencti_description": description}, - allow_custom=True, - ) - - bundle = self.helper.stix2_create_bundle([Url_Object]) - bundles_sent = self.helper.send_stix2_bundle(bundle) - self.helper.log_info( - f"Sent {len(bundles_sent)} stix bundle(s) for worker import" - ) - - # adding labels to the URL object - for brand in results["verdict"].get("overall").get("brands"): - self.helper.api.stix_cyber_observable.add_label( - id=observable["id"], label_name=brand - ) - - self.helper.api.stix_cyber_observable.add_external_reference( - id=observable["id"], external_reference_id=external_reference["id"] - ) - - else: - self.helper.log_error(f"URLScan API call failed{response}") - - return None - - def process_message(self, data): - self.helper.log_info("process data: " + str(data)) - self.entity_id = data["entity_id"] - observable = self.helper.api.stix_cyber_observable.read(id=self.entity_id) - - # URL enrichment - if observable["entity_type"] == "Url": - return self.urlscan_submission(observable) - # Domain enrichment - elif observable["entity_type"] == "Domain-Name": - return self.urlscan_domain_enrichment(observable) - # Hostname enrichment - elif observable["entity_type"] == "Hostname": - return self.urlscan_domain_enrichment(observable) - - # Start the main loop - def start(self): - self.helper.listen(self.process_message) diff --git a/internal-enrichment/urlscan-enrichment/src/main.py b/internal-enrichment/urlscan-enrichment/src/main.py index f0d933d1cc..759532786d 100644 --- a/internal-enrichment/urlscan-enrichment/src/main.py +++ b/internal-enrichment/urlscan-enrichment/src/main.py @@ -1,56 +1,367 @@ -from lib.UrlScanEnrichment import URLScanSubmissionsConnector +from typing import Dict +from pycti import OpenCTIConnectorHelper +from urlscan_enrichment_services.client import UrlscanClient +from urlscan_enrichment_services.config_variables import UrlscanConfig +from urlscan_enrichment_services.constants import UrlscanConstants +from urlscan_enrichment_services.converter_to_stix2 import UrlscanConverter +from urlscan_enrichment_services.utils import UrlscanUtils + + +class UrlscanConnector: + """ + Urlscan connector class + """ -class CustomConnector(URLScanSubmissionsConnector): def __init__(self): - """Initialization of the connector - - Note that additional attributes for the connector can be set after the super() call. - - Standarised way to grab attributes from environment variables is as follows: - - >>> ... - >>> super().__init__() - >>> self.my_attribute = os.environ.get("MY_ATTRIBUTE", "INFO") - - This will make use of the `os.environ.get` method to grab the environment variable and set a default value (in the example "INFO") if it is not set. - Additional tunning can be made to the connector by adding additional environment variables. - - Raising ValueErrors or similar might be useful for tracking down issues with the connector initialization. - """ - super().__init__() - - def _process_message(self, data): - """Processing the enrichment request - - API enrichment can be performed using the `self.helper.api` helper. Examples below: - >>> self.helper.api.stix_cyber_observable.update_field( - ... id=id, - ... input={ - ... "key": "x_opencti_score", - ... "value": 100, - ... }, - ... ) - >>> external_reference = self.helper.api.external_reference.create( - ... source_name="Example source", - ... url=f"https://www.example.com/1.1.1.1", - ... description="This IP address is from within our whitelist.", - ... ) - >>> self.helper.api.stix_cyber_observable.add_external_reference( - ... id=id, external_reference_id=external_reference["id"] - ... ) - >>> self.helper.api.stix_cyber_observable.add_label( - ... id=id, label_name="dns" - ... ) - - Args: - data (dict): The data to process. The `entity_id` attribute contains the objeccct to enrich. The data passed in the data parameter is a dictionary with the following structure as shown in https://docs.opencti.io/latest/development/connectors/#additional-implementations: + self.config = UrlscanConfig() + self.helper = OpenCTIConnectorHelper(self.config.load, True) + self.client = UrlscanClient(self.helper) + self.converter = UrlscanConverter(self.helper) + self.constants = UrlscanConstants() + self.utils = UrlscanUtils + + # Define variables + self.identity = None + self.tlp = None + + def extract_and_check_markings(self, opencti_entity: dict) -> bool: + """ + Extract TLP, and we check if the variable "max_tlp" is less than + or equal to the markings access of the entity from OpenCTI. + If this is true, we can send the data to connector for enrichment. + + :param opencti_entity: Parameter that contains all information about the entity, + including "objectMarking", the marking that the observable uses. + :return: A boolean + """ + + for marking_definition in opencti_entity["objectMarking"]: + if marking_definition["definition_type"] == "TLP": + self.tlp = marking_definition["definition"] + + is_valid_max_tlp = OpenCTIConnectorHelper.check_max_tlp( + self.tlp, self.config.max_tlp + ) + + return is_valid_max_tlp + + def _generate_labels(self, data: dict) -> list: + """ + This method allows you to generate specific labels as well as their associated colors + + :param data: + :return: List + """ + self.all_labels = [] + + if "servers" in data["lists"]: + # Green flag + for server in data["lists"]["servers"]: + self._create_custom_label(f"urlscan: {server}", "#61ff7c") + + if "verdicts" in data: + if "overall" in data["verdicts"]: + overall = data["verdicts"]["overall"] + # Red flag + if overall["malicious"] is True: + self._create_custom_label("urlscan: malicious", "#ff1f53") + + # Orange flag + if overall["categories"]: + for categorie in overall["categories"]: + self._create_custom_label(f"urlscan: {categorie}", "#ff801f") + + # Blue flag + if overall["brands"]: + for brand in overall["brands"]: + self._create_custom_label(f"urlscan: {brand}", "#5596e3") + # White flag + if "verticals" in overall["brands"]: + for vertical in overall["brands"]["verticals"]: + self._create_custom_label(f"urlscan: {vertical}", "#ffffff") + + return self.all_labels + + def _create_custom_label(self, name_label: str, color_label: str): + """ + This method allows you to create a custom label, using the OpenCTI API. + + :param name_label: A parameter giving the name of the label. + :param color_label: A parameter giving the color of the label. + """ + + new_custom_label = self.helper.api.label.read_or_create_unchecked( + value=name_label, color=color_label + ) + if new_custom_label is None: + self.helper.connector_logger.error( + "[ERROR] The label could not be created. " + "If your connector does not have the permission to create labels, " + "please create it manually before launching", + {"name_label": name_label}, + ) + else: + self.helper.connector_logger.info( + "[INFO] The label has been created.", + {"name_label": name_label}, + ) + self.all_labels.append(new_custom_label["value"]) + + def _generate_stix_bundle( + self, data: dict, stix_entity: dict, is_submission: bool + ) -> str: + """ + This method create a bundle in Stix2 format. + + :param data: A parameter that contains all the data about the IPv4 that was searched for in URLScan. + :param stix_entity: A parameter that contains all the entity information in OpenCTI. + :param is_submission: This parameter allows us to identify whether we have a URL or other scopes. + :return: str bundle + """ + self.identity = self.converter.generate_urlscan_stix_identity() + self.stix_objects.append(self.identity) + + self.helper.connector_logger.info( + "[CONNECTOR] The entity has been identified by URLScan " + "and generation of the Stix bundle is in progress.", + {"entity_type": stix_entity["type"], "entity_value": stix_entity["value"]}, + ) + + if is_submission is True: + prepared_file_png = ( + self.utils.prepare_file_png(data) + if self.config.import_screenshot + else None + ) + labels = self._generate_labels(data) + else: + prepared_file_png = None + labels = None + + external_reference = self.converter.generate_stix_external_reference( + data, stix_entity, is_submission + ) + stix_observable = self.converter.upsert_stix_observable( + stix_entity, external_reference, labels, prepared_file_png + ) + self.stix_objects.append(stix_observable) + + if is_submission is True: + data_ip_stats = data["stats"]["ipStats"] + extracted_info_ip = [ { - "entity_id": "" // StixID of the object wanting to be enriched - }""" - raise NotImplementedError("This method has not been implemented yet.") + "domains": item["domains"], + "ip": item.get("ip", None) or None, + "asn": item["asn"].get("asn", None) or None, + } + for item in data_ip_stats + ] + + merged_data = {} + for index, item in enumerate(extracted_info_ip): + domain = tuple(item["domains"]) + if domain in merged_data: + merged_entry = merged_data[domain] + merged_entry["asn"].append(item["asn"]) + merged_entry["ip"].append(item["ip"]) + else: + merged_data[domain] = { + "domains": item["domains"], + "asn": [item["asn"]], + "ip": [item["ip"]], + } + + reorganized_data = [ + { + "domains": merged_data[domain]["domains"], + "asns": merged_data[domain]["asn"], + "ips": merged_data[domain]["ip"], + } + for domain in merged_data + ] + + if len(reorganized_data) > 0: + for data_stat in reorganized_data: + + # Generate obs_ipv4 or obs_ipv6 + stix_obs_ip = self.converter.generate_stix_ip(data_stat) + for obs_ip in stix_obs_ip: + self.stix_objects.append(obs_ip) + + # Generate obs_asn + stix_obs_asn = self.converter.generate_stix_asn_with_relationship( + data_stat, stix_obs_ip + ) + self.stix_objects.extend(stix_obs_asn) + + if data_stat["domains"][0] in stix_entity["value"]: + + stix_indicator = ( + self.converter.upsert_stix_indicator_with_relationship( + data, + stix_entity, + external_reference, + labels, + prepared_file_png, + ) + ) + self.stix_objects.extend(stix_indicator) + + for index, ip in enumerate(data_stat["ips"]): + if ip is None: + continue + + # Generate Relationship : Indicator -> "based-on" -> obs_ip + indicator_to_ip = self.converter.generate_stix_relationship( + stix_indicator[0].id, "based-on", stix_obs_ip[index].id + ) + self.stix_objects.append(indicator_to_ip) + + # Generate Relationship : Observable -> "related-to" -> obs_ip + observable_to_ip = ( + self.converter.generate_stix_relationship( + stix_entity["id"], + "related-to", + stix_obs_ip[index].id, + ) + ) + self.stix_objects.append(observable_to_ip) + + else: + + # Generate obs_hostname + stix_obs_hostname = ( + self.converter.generate_stix_hostname_with_relationship( + data_stat, + stix_entity, + stix_obs_ip, + external_reference, + labels, + prepared_file_png, + ) + ) + self.stix_objects.extend(stix_obs_hostname) + + filtered_list = [x for x in self.stix_objects if x is not None] + stix_no_relationship = [x for x in filtered_list if x["type"] != "relationship"] + stix_relationship = [x for x in filtered_list if x["type"] == "relationship"] + reordered_data = stix_no_relationship + stix_relationship + + self.helper.connector_logger.info( + "[CONNECTOR] For this entity, the number of Stix bundle(s) that will be enriched.", + { + "Entity": stix_entity["value"], + "Stix_bundle_length": len(reordered_data), + }, + ) + + stix2_bundle = self.helper.stix2_create_bundle(reordered_data) + return stix2_bundle + + def _process_message(self, data: Dict) -> str: + + # OpenCTI entity information retrieval + stix_entity = data["stix_entity"] + opencti_entity = data["enrichment_entity"] + self.stix_objects = data["stix_objects"] + + # Security to limit playbook triggers to something other than the scope initial + scopes = self.helper.connect_scope.lower().replace(" ", "").split(",") + entity_type = stix_entity["type"].lower() + + if entity_type in scopes: + + is_valid_max_tlp = self.extract_and_check_markings(opencti_entity) + if not is_valid_max_tlp: + raise ValueError( + "[ERROR] Do not send any data, TLP of the observable is greater than MAX TLP, " + "the connector does not has access to this observable, please check the group of the connector user" + ) + + if opencti_entity["entity_type"] == "StixFile": + if "SHA-256" in stix_entity["hashes"]: + opencti_entity_value = stix_entity["hashes"]["SHA-256"] + else: + return "[CONNECTOR] Only the SHA-256 hash is correctly interpreted by URLScan" + + else: + # Extract Value from opencti entity data for (Url, IPv4-Addr, IPv6-Addr, Domain-Name, Hostname) + opencti_entity_value = stix_entity["value"] + + try: + stix_entity_type = stix_entity["type"] + if ( + stix_entity_type == "url" + or stix_entity_type == "domain-name" + or stix_entity_type == "hostname" + ): + + # Check Urlscan User Quota API Response + self.client.check_urlscan_user_quota(self.config.visibility) + + # Post Urlscan Submission API Response + # https://urlscan.io/docs/ + json_submission = self.client.urlscan_submission( + opencti_entity_value + ) + + uuid = json_submission["uuid"] + self.helper.connector_logger.info( + "[INFO-SUBMISSION] The urlscan submission request generated the uuid:", + {"uuid": uuid, "entity_value": str(opencti_entity_value)}, + ) + + json_result = self.client.urlscan_result(uuid) + + # Generate a stix bundle + stix_bundle = self._generate_stix_bundle( + json_result, stix_entity, True + ) + + # Send stix2 bundle + bundles_sent = self.helper.send_stix2_bundle(stix_bundle) + return ( + "[CONNECTOR] Sent " + + f"{len(bundles_sent)}" + + " stix bundle(s) for worker import" + ) + + elif entity_type in self.constants.ENTITY_TYPE_MAP_SEARCH_API: + + json_search = {} + # Generate a stix bundle + stix_bundle = self._generate_stix_bundle( + json_search, stix_entity, False + ) + # Send stix2 bundle + bundles_sent = self.helper.send_stix2_bundle(stix_bundle) + return ( + "[CONNECTOR] Sent " + + f"{len(bundles_sent)}" + + " stix bundle(s) for worker import" + ) + else: + raise ValueError( + "[ERROR] This entity type is currently not managed, " + "available_type: Url, Hostname, Domain-Name, IPv4-Addr, IPv6-Addr" + ) + + except Exception as e: + raise ValueError(str(e)) + else: + + return self.helper.connector_logger.info( + "[INFO] The trigger does not concern the initial scope found in the config connector, " + "maybe choose a more specific filter in the playbook", + {"entity_id": data["entity_id"]}, + ) + + # Start the main loop + def start(self): + self.helper.listen(message_callback=self._process_message) if __name__ == "__main__": - connector = CustomConnector() - connector.start() + urlscanInstance = UrlscanConnector() + urlscanInstance.start() diff --git a/internal-enrichment/urlscan-enrichment/requirements.txt b/internal-enrichment/urlscan-enrichment/src/requirements.txt similarity index 100% rename from internal-enrichment/urlscan-enrichment/requirements.txt rename to internal-enrichment/urlscan-enrichment/src/requirements.txt diff --git a/internal-enrichment/urlscan-enrichment/src/test-requirements.txt b/internal-enrichment/urlscan-enrichment/src/test-requirements.txt new file mode 100644 index 0000000000..392b0c9984 --- /dev/null +++ b/internal-enrichment/urlscan-enrichment/src/test-requirements.txt @@ -0,0 +1,3 @@ +-r ./requirements.txt +pytest +pytest-mock \ No newline at end of file diff --git a/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/__init__.py b/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/client.py b/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/client.py new file mode 100644 index 0000000000..bd806eeb09 --- /dev/null +++ b/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/client.py @@ -0,0 +1,163 @@ +import json +import time + +import requests + +from .config_variables import UrlscanConfig +from .constants import UrlscanConstants + + +class UrlscanClient: + """ + Working with URLScan API + """ + + def __init__(self, helper): + self.config = UrlscanConfig() + self.constants = UrlscanConstants + self.helper = helper + # Define headers in session and update when needed + headers = {"API-Key": self.config.api_key, "Content-Type": "application/json"} + self.session = requests.Session() + self.session.headers.update(headers) + + def urlscan_submission(self, entity_value: str) -> list | None: + """ + This method allows you to submit a scan from OpenCTI to URLScan. + Valid scope: url, hostname, domain-name + + :param entity_value: This parameter contains the value of the entity submitted. + :return: List | None + """ + try: + data = {"url": entity_value, "visibility": self.config.visibility} + response = self.session.post( + (self.config.api_base_url + self.constants.SCAN), data=json.dumps(data) + ) + response.raise_for_status() + return response.json() + + except requests.exceptions.RequestException as err: + error_text = json.loads(err.response.text) + error_msg = "[API-SUBMISSION] Error while fetching data: " + raise ValueError( + error_msg, + { + "reason": str(err.response.reason), + "status_code": int(err.response.status_code), + "message": error_text["message"], + "description": ( + error_text["description"] + if "description" in error_text + else None + ), + "entity_value": entity_value, + }, + ) + + def check_urlscan_user_quota(self, visibility: str) -> None: + """ + This method allows you to check the user quota available for URLScan, + depending on the visibility in the configuration. + + :param visibility: This parameter contains the visibility in the user configuration (environment variable). + :return: None + """ + try: + + response = self.session.get(self.constants.USER_QUOTA) + json_response = response.json() + rate_limits = json_response["limits"][visibility] + self.helper.connector_logger.info( + "[API-RATE-LIMITS] The URLScan rate limits :", + {"visibility": visibility, "rate_limits": rate_limits}, + ) + + if rate_limits["day"]["limit"] == rate_limits["day"]["used"]: + raise ValueError( + "You have reached your limit of API calls per day (URLScan)," + " please try again in a day." + ) + elif rate_limits["hour"]["limit"] == rate_limits["hour"]["used"]: + raise ValueError( + "You have reached your limit of API calls per hour (URLScan)," + " please try again in a hour." + ) + elif rate_limits["minute"]["limit"] == rate_limits["minute"]["used"]: + raise ValueError( + "You have reached your limit of API calls per minute (URLScan), " + "please try again in a minute." + ) + else: + pass + + except requests.exceptions.RequestException as err: + error_msg = "[API-RATE-LIMITS] Error while fetching user quota: " + return self.helper.connector_logger.error(error_msg, {"error": {str(err)}}) + + def urlscan_result(self, uuid: str) -> dict: + """ + This method recovers all the data of the entity scanned by URLScan, its data may take a moment to be processed + by URLScan, if there is a 404 return and a message "Scan is not finished yet" then we make several attempts. + + :param uuid: This parameter contains the uuid of the submitted request. + :return: dict + """ + try: + max_retries = 12 + retry_delay = 10 # in second + + response = self.session.get( + self.config.api_base_url + self.constants.RESULT + uuid + ) + + if response.status_code != 200: + json_response = response.json() + + if ( + response.status_code == 404 + and json_response["message"] == "Scan is not finished yet" + ): + + for i in range(max_retries): + # error 404 -> https://urlscan.io/docs/api/ between 10s - 30s + time.sleep(retry_delay) + new_attempt = self.session.get( + self.config.api_base_url + self.constants.RESULT + uuid + ) + if new_attempt.status_code == 200: + json_new_attempt = new_attempt.json() + if ( + json_new_attempt["data"]["requests"][0]["response"][ + "dataLength" + ] + == 0 + ): + raise ValueError( + "[API-RESULT] The request has been submitted to URLScan, " + "but the URL does not return any data." + ) + return json_new_attempt + + error_msg = "[API-RESULT] Error while fetching result: " + raise ValueError( + error_msg, + { + "status_code": json_response["status"], + "error": json_response["message"], + "uuid": {uuid}, + }, + ) + else: + result = response.json() + if result["data"]["requests"][0]["response"]["dataLength"] == 0: + raise ValueError( + "[API-RESULT] The request has been submitted to URLScan, but the URL does not return any data." + ) + return result + + except requests.exceptions.RequestException as err: + error_msg = "[API-RESULT] Error while fetching result: " + return self.helper.connector_logger.error( + error_msg, {"uuid": uuid, "error": {str(err)}} + ) diff --git a/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/config_variables.py b/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/config_variables.py new file mode 100644 index 0000000000..4681c0a03a --- /dev/null +++ b/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/config_variables.py @@ -0,0 +1,81 @@ +import os + +import yaml +from pycti import get_config_variable + + +class UrlscanConfig: + def __init__(self): + """ + Initialize the Urlscan connector with necessary configurations + """ + + # Load configuration file and connection helper + self.load = self._load_config() + self._initialize_configurations() + + @staticmethod + def _load_config() -> dict: + """ + Load the configuration from the YAML file + :return: Configuration dictionary + """ + current_dir = os.path.dirname(os.path.abspath(__file__)) + parent_dir = os.path.dirname(current_dir) + config_file_path = os.path.join(parent_dir, "config.yml") + config = ( + yaml.load(open(config_file_path), Loader=yaml.FullLoader) + if os.path.isfile(config_file_path) + else {} + ) + + return config + + def _initialize_configurations(self) -> None: + """ + Connector configuration variables + :return: None + """ + + self.connector_scope = get_config_variable( + "CONNECTOR_SCOPE", ["connector", "scope"], self.load + ) + + self.connector_name = get_config_variable( + "CONNECTOR_NAME", ["connector", "name"], self.load + ) + + self.api_key = get_config_variable( + "URLSCAN_ENRICHMENT_API_KEY", ["urlscan_enrichment", "api_key"], self.load + ) + + self.api_base_url = get_config_variable( + "URLSCAN_ENRICHMENT_API_BASE_URL", + ["urlscan_enrichment", "api_base_url"], + self.load, + ) + + self.import_screenshot = get_config_variable( + "URLSCAN_ENRICHMENT_IMPORT_SCREENSHOT", + ["urlscan_enrichment", "import_screenshot"], + self.load, + default="true", + ) + + self.visibility = get_config_variable( + "URLSCAN_ENRICHMENT_VISIBILITY", + ["urlscan_enrichment", "visibility"], + self.load, + default="public", + ) + + self.search_filtered_by_date = get_config_variable( + "URLSCAN_ENRICHMENT_SEARCH_FILTERED_BY_DATE", + ["urlscan_enrichment", "search_filtered_by_date"], + self.load, + default=">now-1y", + ) + + self.max_tlp = get_config_variable( + "URLSCAN_ENRICHMENT_MAX_TLP", ["urlscan_enrichment", "max_tlp"], self.load + ) diff --git a/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/constants.py b/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/constants.py new file mode 100644 index 0000000000..736a9b64cd --- /dev/null +++ b/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/constants.py @@ -0,0 +1,10 @@ +class UrlscanConstants: + USER_QUOTA = "https://urlscan.io/user/quotas/" + SEARCH = "search/?q=" + SCAN = "scan/" + RESULT = "result/" + + ENTITY_TYPE_MAP_SEARCH_API = { + "ipv4-addr": "ip:", + "ipv6-addr": "ip:", + } diff --git a/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/converter_to_stix2.py b/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/converter_to_stix2.py new file mode 100644 index 0000000000..fcb71c3e30 --- /dev/null +++ b/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/converter_to_stix2.py @@ -0,0 +1,399 @@ +from datetime import datetime + +import stix2 +from pycti import CustomObservableHostname, Identity, Indicator, StixCoreRelationship + +from .constants import UrlscanConstants +from .utils import UrlscanUtils + + +class UrlscanConverter: + """ + Convert data from Urlscan to STIX 2 object + """ + + def __init__(self, helper): + self.helper = helper + self.config = self.helper.config["urlscan_enrichment"] + self.identity = self.generate_urlscan_stix_identity() + self.constants = UrlscanConstants + self.utils = UrlscanUtils + + def generate_urlscan_stix_identity(self) -> dict: + """ + This method create the "Identity (organization)" of UrlScan in Stix2 format. + + :return: dict + """ + + # Generate "URLScan" Identity + return stix2.Identity( + id=Identity.generate_id(self.helper.connect_name, "organization"), + name=self.helper.connect_name, + description=f"Connector Enrichment {self.helper.connect_name}", + identity_class="organization", + ) + + def generate_stix_relationship( + self, + source_ref: str, + stix_core_relationship_type: str, + target_ref: str, + start_time: str | None = None, + stop_time: str | None = None, + ) -> dict: + """ + This method allows you to create a relationship in Stix2 format. + + :param source_ref: This parameter is the "from" of the relationship. + :param stix_core_relationship_type: This parameter defines the type of relationship between the two entities. + :param target_ref: This parameter is the "to" of the relationship. + :param start_time: This parameter is the start of the relationship. Value not required, None by default. + :param stop_time: This parameter is the stop of the relationship. Value not required, None by default. + :return: dict + """ + + return stix2.Relationship( + id=StixCoreRelationship.generate_id( + stix_core_relationship_type, source_ref, target_ref + ), + relationship_type=stix_core_relationship_type, + source_ref=source_ref, + start_time=start_time, + stop_time=stop_time, + target_ref=target_ref, + created_by_ref=self.identity["id"], + ) + + def generate_stix_external_reference( + self, data: dict, stix_entity: dict, is_entity_submission: bool + ) -> list: + """ + This method allows you to create an external reference in Stix2 format. + The is_entity_submission parameter is false, so we create an external reference + that contains the URLScan link of the search. + + :param data: This parameter contains all the information about the observable enriched by URLScan. + :param stix_entity: This parameter contains all the information about the observable enriched by OpenCTI. + :param is_entity_submission: This parameter allows us to know if we are in submissive or search mode. (bool) + :return: list -> ExternalReference (Stix2 format) + """ + + if is_entity_submission is True: + description = ( + f"This {stix_entity['type']} has been identified by URLScan, " + "this link allows you to see the result of this analysis." + ) + urlscan_uuid = data["task"]["uuid"] + urlscan_url = data["task"]["reportURL"] + else: + entity_type = stix_entity["type"] + entity_value = stix_entity["value"] + description = ( + f"This {entity_type} has been identified by URLScan, " + f"this link allows you to see all results related to it." + ) + urlscan_uuid = None + if entity_type in self.constants.ENTITY_TYPE_MAP_SEARCH_API: + search_entity_type = self.constants.ENTITY_TYPE_MAP_SEARCH_API.get( + entity_type + ) + + urlscan_url = ( + "https://urlscan.io/search#" + + search_entity_type + + entity_value + + " AND date:" + + self.config["search_filtered_by_date"] + ) + else: + return [] + + # Generate ExternalReference + external_reference = stix2.ExternalReference( + source_name=self.helper.connect_name, + url=urlscan_url, + external_id=urlscan_uuid, + description=description, + ) + return [external_reference] + + def upsert_stix_observable( + self, + stix_entity: dict, + external_reference: list, + labels: list = None, + prepared_file_png: dict = None, + ): + """ + This method allows you to upsert the information collected by URLScan (Submission / Search) + to the enriched observable. + + :param stix_entity: This parameter contains all the information about the observable enriched by OpenCTI. + :param external_reference: This parameter contains the list of all external references. + :param labels: This parameter contains the list of all labels. + :param prepared_file_png: This parameter contains the screen prepare file. + :return: dict + """ + + data_submission = { + "type": stix_entity["type"], + "id": stix_entity["id"], + "value": stix_entity["value"], + "custom_properties": { + "x_opencti_external_references": external_reference, + "x_opencti_labels": labels, + "x_opencti_files": ( + [prepared_file_png] if prepared_file_png is not None else [] + ), + }, + } + data_search = { + "type": stix_entity["type"], + "id": stix_entity["id"], + "value": stix_entity["value"], + "custom_properties": { + "x_opencti_external_references": external_reference, + }, + } + + if stix_entity["type"] == "url": + self.helper.connector_logger.info( + "[CONNECTOR] Entity, has been identified by URLScan and generation of the Stix bundle is in progress.", + {"Entity": stix_entity["value"]}, + ) + stix_observable = stix2.URL(**data_submission) + elif stix_entity["type"] == "domain-name": + stix_observable = stix2.DomainName(**data_submission) + elif stix_entity["type"] == "hostname": + stix_observable = CustomObservableHostname(**data_submission) + elif stix_entity["type"] == "ipv4-addr": + stix_observable = stix2.IPv4Address(**data_search) + elif stix_entity["type"] == "ipv6-addr": + stix_observable = stix2.IPv6Address(**data_search) + else: + return None + + return stix_observable + + def upsert_stix_indicator_with_relationship( + self, + data: dict, + stix_entity: dict, + external_reference: list, + labels: list = None, + prepared_file_png: dict = None, + ) -> list: + """ + This method creates and adds a bundle to "self.stix_objects" the IPv4 associated "Indicator" + in Stix2 format. + + - Relationship : Indicator -> "based-on" -> Observable + :param data: This parameter contains all the information about the observable enriched by URLScan. + :param stix_entity: This parameter contains all the information about the observable enriched by OpenCTI. + :param external_reference: This parameter contains the list of all external references. + :param labels: This parameter contains the list of all labels. + :param prepared_file_png: This parameter contains the screen prepare file. + :return: list + """ + + stix_indicator_with_relationship = [] + now = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") + x_opencti_type = stix_entity.get("x_opencti_type", None) + + common_data = { + "labels": labels, + "created_by_ref": self.identity["id"], + "external_references": external_reference, + "valid_from": now, + "pattern_type": "stix", + "custom_properties": { + "x_opencti_main_observable_type": x_opencti_type, + "x_opencti_files": ( + [prepared_file_png] if prepared_file_png is not None else [] + ), + }, + } + + if stix_entity["type"] == "url": + data_name_url = data["task"]["url"] + specific_data = { + "id": Indicator.generate_id(data_name_url), + "name": data_name_url, + "pattern": f"[url:value = '{data_name_url}']", + } + + elif stix_entity["type"] == "domain-name": + data_name_domain = data["task"]["apexDomain"] + specific_data = { + "id": Indicator.generate_id(data_name_domain), + "name": data_name_domain, + "pattern": f"[domain-name:value = '{data_name_domain}']", + } + + elif stix_entity["type"] == "hostname": + data_name_hostname = data["task"]["domain"] + specific_data = { + "id": Indicator.generate_id(data_name_hostname), + "name": data_name_hostname, + "pattern": f"[hostname:value = '{data_name_hostname}']", + } + else: + return [] + + merged_data = { + **common_data, + **specific_data, + } + stix_indicator = stix2.Indicator(**merged_data) + + stix_indicator_with_relationship.append(stix_indicator) + + # Generate Relationship : Indicator -> "based-on" -> Observable + indicator_to_observable = self.generate_stix_relationship( + stix_indicator.id, "based-on", stix_entity["id"] + ) + stix_indicator_with_relationship.append(indicator_to_observable) + + return stix_indicator_with_relationship + + def generate_stix_ip(self, data_stat: dict) -> list: + """ + This method allows you to check and generate an IPV4 or IPV6 type observable. + + :param data_stat: This parameter contains the organized data associated with the enriched observable. + :return: list + """ + all_ips = [] + for ip in data_stat["ips"]: + + if ip is None: + continue + + is_ipv6 = self.utils.is_ipv6(ip) + is_ipv4 = self.utils.is_ipv4(ip) + + if is_ipv6 is True: + stix_ip_addr = stix2.IPv6Address( + type="ipv6-addr", + value=ip, + custom_properties={ + "x_opencti_created_by_ref": self.identity["id"], + }, + ) + all_ips.append(stix_ip_addr) + + if is_ipv4 is True: + stix_ip_addr = stix2.IPv4Address( + type="ipv4-addr", + value=ip, + custom_properties={ + "x_opencti_created_by_ref": self.identity["id"], + }, + ) + all_ips.append(stix_ip_addr) + + return all_ips + + def generate_stix_asn_with_relationship(self, data_stat, stix_obs_ip): + """ + This method allows you to generate a asn type observable with these relationships. + + - Relationship : Ip_addr -> "belongs-to" -> ASN + :param data_stat: This parameter contains the organized data associated with the enriched observable. + :param stix_obs_ip: This parameter contains the list of IPs in Stix format. + :return: list + """ + stix_asn_with_relationship = [] + + for index, asn in enumerate(data_stat["asns"]): + if asn is None: + continue + + # Generate Asn + entity_asn = "AS" + str(asn) + asn_number = int(asn) + stix_asn = stix2.AutonomousSystem( + type="autonomous-system", + number=asn_number, + name=entity_asn, + custom_properties={ + "created_by_ref": self.identity["id"], + }, + ) + stix_asn_with_relationship.append(stix_asn) + + # Generate Relationship : Ip_addr -> "belongs-to" -> ASN + ip_to_asn = self.generate_stix_relationship( + stix_obs_ip[index].id, "belongs-to", stix_asn.id + ) + stix_asn_with_relationship.append(ip_to_asn) + + return stix_asn_with_relationship + + def generate_stix_hostname_with_relationship( + self, + data_stat, + stix_entity, + stix_obs_ip, + external_reference, + labels, + prepared_file_png, + ): + """ + This method allows you to generate a hostname type observable with these relationships. + + - Relationship : entity -> "related-to" -> hostname + - Relationship : hostname -> "belongs-to" -> Ip_addr + :param data_stat: This parameter contains the organized data associated with the enriched observable. + :param stix_entity: This parameter contains all the information about the observable enriched by OpenCTI. + :param stix_obs_ip: This parameter contains the list of IPs in Stix format. + :param external_reference: This parameter contains the list of all external references. + :param labels: This parameter contains the list of all labels. + :param prepared_file_png: This parameter contains the screen prepare file. + :return: list + """ + stix_hostnames_with_relationship = [] + + for index, domain in enumerate(data_stat["domains"]): + if domain in stix_entity["value"]: + # Generate Hostname + stix_hostname = CustomObservableHostname( + type="hostname", + value=domain, + custom_properties={ + "x_opencti_external_references": external_reference, + "x_opencti_labels": labels, + "x_opencti_files": ( + [prepared_file_png] if prepared_file_png is not None else [] + ), + "created_by_ref": self.identity["id"], + }, + ) + stix_hostnames_with_relationship.append(stix_hostname) + else: + # Generate Hostname + stix_hostname = CustomObservableHostname( + type="hostname", + value=domain, + custom_properties={ + "created_by_ref": self.identity["id"], + }, + ) + stix_hostnames_with_relationship.append(stix_hostname) + + if stix_entity["id"] != stix_hostname.id: + # Generate Relationship : entity -> "related-to" -> hostname + entity_to_hostname = self.generate_stix_relationship( + stix_entity["id"], "related-to", stix_hostname.id + ) + stix_hostnames_with_relationship.append(entity_to_hostname) + + for obs_ip in stix_obs_ip: + # Generate Relationship : hostname -> "belongs-to" -> Ip_addr + hostname_to_ip = self.generate_stix_relationship( + stix_hostname.id, "resolves-to", obs_ip["id"] + ) + stix_hostnames_with_relationship.append(hostname_to_ip) + + return stix_hostnames_with_relationship diff --git a/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/utils.py b/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/utils.py new file mode 100644 index 0000000000..f23904f094 --- /dev/null +++ b/internal-enrichment/urlscan-enrichment/src/urlscan_enrichment_services/utils.py @@ -0,0 +1,74 @@ +import base64 +import ipaddress +import os +import tempfile + +import requests + + +class UrlscanUtils: + + @staticmethod + def prepare_file_png(data: dict) -> dict | None: + """ + This method allows you to import the "screenshot" file from URLScan + to prepare it in the correct format for ingestion in OpenCTI. + + :param data: This parameter contains all the information about the observable enriched by URLScan. + :return: dict | None + """ + + data_screenshot = data["task"]["screenshotURL"] + data_uuid = data["task"]["uuid"] + data_title = data["page"]["title"].replace(" ", "-") + + response = requests.get(data_screenshot) + if response.status_code == 200: + + with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_file: + temp_file.write(response.content) + temp_file_path = temp_file.name + + with open(temp_file_path, "rb") as temp_file: + data_temp = temp_file.read() + + prepared_file = { + "name": data_title + "_" + data_uuid + ".png", + "mime_type": "image/png", + "data": base64.b64encode(data_temp), + "no_trigger_import": True, + } + + os.remove(temp_file_path) + else: + prepared_file = None + + return prepared_file + + @staticmethod + def is_ipv6(ip: str) -> bool: + """ + Determine whether the provided IP string is IPv6. + + :param ip: this parameter contains the ip to check. + :return: Boolean + """ + try: + ipaddress.IPv6Address(ip) + return True + except ipaddress.AddressValueError: + return False + + @staticmethod + def is_ipv4(ip: str) -> bool: + """ + Determine whether the provided IP string is IPv4. + + :param ip: this parameter contains the ip to check. + :return: Boolean + """ + try: + ipaddress.IPv4Address(ip) + return True + except ipaddress.AddressValueError: + return False