In [1]:
import os
import re
import json
from envs.env import text_path, json_path, ntfy_nofi
from envs.whitelist import *
from datetime import datetime

In [2]:
directory_path = text_path
output_path = json_path

In [3]:
def extract_using_pattern(pattern, content):
    return re.findall(pattern, content)


def extract_references(content):
    reference_name_patterns = [r"CVE-\d{4}-\d{4,5}", r"T\d{3,5}"]
    references = []
    for pattern in reference_name_patterns:
        matches = extract_using_pattern(pattern, content)
        for match in matches:
            references.append(match.upper())
    return references


# def extract_filenames(content):
#     filename_pattern = r"\b\w+\.[a-zA-Z]{2,4}\b"
#     potential_filenames = extract_using_pattern(filename_pattern, content)

#     cleaned_filenames = []
#     for filename in potential_filenames:
#         name, ext = filename.rsplit(".", 1)
#         english_part = re.search("[a-zA-Z]+$", name)
#         if english_part:
#             name = english_part.group()
#         cleaned_filenames.append(f"{name}.{ext}")
#     return cleaned_filenames


def extract_md5(content):
    md5_pattern = r"\b[0-9a-fA-F]{32}\b"
    return extract_using_pattern(md5_pattern, content)


def extract_sha1(content):
    sha1_pattern = r"\b[0-9a-fA-F]{40}\b"
    return extract_using_pattern(sha1_pattern, content)


def extract_sha256(content):
    sha256_pattern = r"\b[0-9a-fA-F]{64}\b"
    return extract_using_pattern(sha256_pattern, content)


# def extract_registry_entries(content):
#     registry_pattern = r'HK[A-Z_]+\\[^"\n]+'
#     return extract_using_pattern(registry_pattern, content)


def extract_URL(content):
    url_pattern = (
        # r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|"
        # r"(?:%[0-9a-fA-F][0-9a-fA-F]))+"
        r"\b[\w\[\]\.]*\[\.\][\w\[\]\.]*\b"
    )
    URL = extract_using_pattern(url_pattern, content)
    # Remove "[" and "]" from the extracted URL
    cleaned_URL = [url.replace("[", "").replace("]", "") for url in URL]
    return cleaned_URL


def categorize_content(content, rule_id, filename, ti_no):
    # Initialize the types list
    types = []

    # Extract the necessary data
    references = extract_references(content)
    md5_hashes = extract_md5(content)
    sha1_hashes = extract_sha1(content)
    sha256_hashes = extract_sha256(content)
    urls = extract_URL(content)

    # Check for the presence of each type and update the types list
    if md5_hashes:
        types.append("HASH")
    if sha1_hashes:
        types.append("HASH")
    if sha256_hashes:
        types.append("HASH")
    if urls:
        types.append("URL")

    return {
        "ti_no": ti_no,
        "rule_id": rule_id,
        "ti_nm": filename[:-4],
        "ti_desc": "",
        "ti_ref": extract_references(content),
        # "samples": extract_filenames(content),
        "ti_smp": "",
        "MD5": extract_md5(content),
        "SHA1": extract_sha1(content),
        "SHA256": extract_sha256(content),
        # "Registry_Entries": extract_registry_entries(content),
        "URL": extract_URL(content),
        # "weight": 0.0,
        "ti_area": "etas",
        "ti_type": list(set(types)),
        "ti_applied": "Y",
        "site_tp": "rio",
        # "lastStatus": "added",
        "del_yn": "N",
        "reg_dt": "",
        # "reg_dt": datetime.now().strftime("%Y-%m-%d"),
        "reg_user_id": "admin",
    }

In [4]:
# Read text files
def get_text_files_from_directory(directory_path):
    return [
        os.path.join(directory_path, file)
        for file in os.listdir(directory_path)
        if file.endswith(".txt")
    ]


# Save json files
def write_to_json(output_path, content):
    with open(output_path, "w") as outfile:
        # json.dump(content, outfile)
        json.dump(content, outfile, ensure_ascii=False)

In [5]:
def filter_unique_content(category, content, unique_sets, whitelist=None):
    filtered_content = list(set([x for x in content if x not in unique_sets[category]]))
    if whitelist:
        filtered_content = [
            x
            for x in filtered_content
            if not any(whitelisted in x for whitelisted in whitelist)
        ]
    unique_sets[category].update(filtered_content)
    return filtered_content


def combine_into_signatures(categorized_content):
    keys_to_combine = ["MD5", "SHA1", "SHA256", "URL"]
    return [
        item for key in keys_to_combine for item in categorized_content.get(key, [])
    ]


def process_text_file(file_path, unique_sets, rule_id, ti_no):
    with open(file_path, "r", encoding="utf-8", errors="ignore") as file:
        content = file.read()
        filename = os.path.basename(file_path)
        categorized_content = categorize_content(content, rule_id, filename, ti_no)

        categorized_content["ti_ref"] = filter_unique_content(
            "ti_ref", categorized_content["ti_ref"], unique_sets
        )
        # categorized_content["samples"] = filter_unique_content(
        #     "samples", categorized_content["samples"], unique_sets, whitelist_file_names
        # )
        # categorized_content["Registry_Entries"] = filter_unique_content(
        #     "Registry_Entries", categorized_content["Registry_Entries"], unique_sets
        # )
        categorized_content["MD5"] = filter_unique_content(
            "MD5", categorized_content["MD5"], unique_sets
        )
        categorized_content["SHA1"] = filter_unique_content(
            "SHA1", categorized_content["SHA1"], unique_sets
        )
        categorized_content["SHA256"] = filter_unique_content(
            "SHA256", categorized_content["SHA256"], unique_sets
        )
        categorized_content["URL"] = filter_unique_content(
            "URL", categorized_content["URL"], unique_sets, whitelist_urls
        )

        # merge into signatures category
        categorized_content["ti_sgn"] = combine_into_signatures(categorized_content)

        return categorized_content

In [6]:
def process_all_text_files(directory_path, output_path):
    text_files = get_text_files_from_directory(directory_path)

    unique_sets = {
        "ti_ref": set(),
        # "samples": set(),
        # "Registry_Entries": set(),
        "MD5": set(),
        "SHA1": set(),
        "SHA256": set(),
        "URL": set(),
    }

    rule_id = 10000000
    ti_no = 1000

    for file_path in text_files:
        categorized_content = process_text_file(file_path, unique_sets, rule_id, ti_no)

        # Toggle remove fields
        fields_to_remove = ["MD5", "SHA1", "SHA256", "URL"]
        for field in fields_to_remove:
            categorized_content.pop(field, None)

        # don't create files if signatures is empoy
        if not categorized_content.get("ti_sgn"):
            print("No signatures found for:", file_path)
            continue
        rule_id += 1
        ti_no += 1
        filename_without_extension = os.path.splitext(os.path.basename(file_path))[0]
        output_json_path = os.path.join(
            output_path, filename_without_extension + ".json"
        )
        write_to_json(output_json_path, categorized_content)

In [7]:
# Excute
process_all_text_files(directory_path, output_path)

No signatures found for: /Users/dong-ju/Documents/My_code/ttp_autogpt/python_ground/text/final/New tools uncovered from hacking group APT15.txt
No signatures found for: /Users/dong-ju/Documents/My_code/ttp_autogpt/python_ground/text/final/wp-drilling-deep-a-look-at-cyberattacks-on-the-oil-and-gas-industry.txt
No signatures found for: /Users/dong-ju/Documents/My_code/ttp_autogpt/python_ground/text/final/chessmaster-cyber-espionage-campaign.txt
No signatures found for: /Users/dong-ju/Documents/My_code/ttp_autogpt/python_ground/text/final/Norman_HangOver report_Executive Summary_042513.txt
No signatures found for: /Users/dong-ju/Documents/My_code/ttp_autogpt/python_ground/text/final/Cybereason-Labs-Analysis-Webmail-Sever-APT.txt
No signatures found for: /Users/dong-ju/Documents/My_code/ttp_autogpt/python_ground/text/final/TA17-164A.txt
No signatures found for: /Users/dong-ju/Documents/My_code/ttp_autogpt/python_ground/text/final/The_destruction_of_APT3.txt
No signatures found for: /Users/