### Kafka OTX Consumer

In [1]:
import sys

!{sys.executable} -m pip install intelmq --no-cache-dir
!{sys.executable} -m pip install kafka-python --no-cache-dir
!{sys.executable} -m pip install requests --no-cache-dir
!{sys.executable} -m pip install praw --no-cache-dir
!{sys.executable} -m pip install otx-misp --no-cache-dir



In [2]:
from kafka import KafkaConsumer
import json
import time

# Kafka Consumer Setup
consumer = KafkaConsumer(
    "threat_feed",  # Topic Name
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode("utf-8"))
)

print("🚀 Kafka Consumer Started - Listening for Threat Intelligence Data...")

# List to store threat data
threat_data_list = []
seen_threats = set() #Track unique threats

try:
    for message in consumer:
        threat = message.value
        threat_id = threat.get("name", "Unknown Threat") + threat.get("created", "Unknown")
        
        # Check if this threat is already processed
        if threat_id in seen_threats:
            continue  # Skip duplicate
        
        seen_threats.add(threat_id)  # Mark threat as processed

        # Extract key information
        threat_entry = {
            "Threat Name": threat.get("name", "Unknown Threat"),
            "Adversary": threat.get("adversary", "Unknown"),
            "Created": threat.get("created", "Unknown"),
            "Modified": threat.get("modified", "Unknown"),
            "Description": threat.get("description", "No description"),
            "Tags": threat.get("tags", []),
            "Indicators": []
        }

        # Extract indicators (IP, CVE, Hashes, Domains)
        indicators = threat.get("indicators", [])
        for indicator in indicators:
            threat_entry["Indicators"].append({
                "Type": indicator.get("type", "Unknown"),
                "Value": indicator.get("indicator", "Unknown"),
                "Created": indicator.get("created", "Unknown"),
                "Expiration": indicator.get("expiration", "Unknown"),
                "Is Active": indicator.get("is_active", "Unknown")
            })

        threat_data_list.append(threat_entry)
        print(f"💀 Threat: {threat_entry['Threat Name']} ({len(indicators)} indicators)")
        #time.sleep(1)

except KeyboardInterrupt:
    print("\n❌ Kafka Consumer Stopped by User.")

finally:
    # Save results to a JSON file
    with open("OTX_Threat_Data.json", "w", encoding="utf-8") as f:
        json.dump(threat_data_list, f, indent=4)

    print("\n✅ Threat Intelligence Data Saved to 'OTX_Threat_Data.json'")


🚀 Kafka Consumer Started - Listening for Threat Intelligence Data...
💀 Threat: Unknown Threat (0 indicators)
💀 Threat: CL0P Ransomware: Latest Attacks (9 indicators)
💀 Threat: Russia-nexus APT possibly related to APT28 conducts cyber espionage on Central Asia and Kazakhstan diplomatic relations (48 indicators)
💀 Threat: Exploitation in the Wild of Aviatrix Controller RCE (CVE-2024-50603) (14 indicators)
💀 Threat: XELERA Ransomware Campaign: Fake Food Corporation of India Job Offers Targeting Tech Aspirants (6 indicators)
💀 Threat: Sandworm APT Targets Ukrainian Users with Trojanized Microsoft KMS Activation Tools in Cyber Espionage Campaigns (44 indicators)
💀 Threat: From South America to Southeast Asia: The Fragile Web of REF7707 (50 indicators)
💀 Threat: The BadPilot campaign: Multiyear global access operation (26 indicators)
💀 Threat: How Cracks and Installers Bring Malware to Your Device (40 indicators)
💀 Threat: Russian Influence Operations Target German Elections (114 indicators)

In [3]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("ThreatIntelligenceAnalysis").getOrCreate()

# Read JSON file (treating it as a JSON array)
df = spark.read.json("OTX_Threat_Data.json", multiLine=True)

# Show schema and data
df.printSchema()
#df.show(truncate=False)


root
 |-- Adversary: string (nullable = true)
 |-- Created: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Indicators: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Created: string (nullable = true)
 |    |    |-- Expiration: string (nullable = true)
 |    |    |-- Is Active: string (nullable = true)
 |    |    |-- Type: string (nullable = true)
 |    |    |-- Value: string (nullable = true)
 |-- Modified: string (nullable = true)
 |-- Tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Threat Name: string (nullable = true)



In [4]:
from pyspark.sql.functions import col, explode
from pyspark.ml.feature import CountVectorizer, StringIndexer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
try:
    # removing null values
    df = df.na.drop()

    # making sure Tags is an array (some entries may be null)
    df = df.withColumn("Tags", col("Tags").cast("array<string>"))

    # Index Threat Name as label
    indexer = StringIndexer(inputCol="Threat Name", outputCol="label")
    df_indexed = indexer.fit(df).transform(df)

    # CountVectorizer on 'Tags' array (no need for concat_ws)
    vectorizer = CountVectorizer(inputCol="Tags", outputCol="features")
    vector_model = vectorizer.fit(df_indexed)
    df_features = vector_model.transform(df_indexed)

    # selecting final dataset for ML model
    df_final = df_features.select("features", "label")

    train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)

    nb = NaiveBayes(modelType="multinomial")
    nb_model = nb.fit(train_data)

    predictions = nb_model.transform(test_data)
    predictions.select("features", "label", "prediction").show()

    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print(f"Model Accuracy: {accuracy * 100:.2f}")
except Exception as e:
    print(f"An error occurred: {e}")

+--------------------+------+----------+
|            features| label|prediction|
+--------------------+------+----------+
|        (5439,[],[])|  92.0|       1.0|
|        (5439,[],[])| 258.0|       1.0|
|        (5439,[],[])| 350.0|       1.0|
|        (5439,[],[])| 621.0|       1.0|
|        (5439,[],[])|1025.0|       1.0|
|        (5439,[],[])|1376.0|       1.0|
|(5439,[0,1,2,15,4...| 715.0|      10.0|
|(5439,[0,1,17,26,...|1191.0|    1116.0|
|(5439,[0,2,3,24,3...|1514.0|     118.0|
|(5439,[0,2,4,10,8...| 703.0|       3.0|
|(5439,[0,2,4,14,3...| 158.0|    1374.0|
|(5439,[0,2,5,14,3...| 494.0|     521.0|
|(5439,[0,2,6,456,...| 600.0|     846.0|
|(5439,[0,2,12,20,...|1734.0|    1097.0|
|(5439,[0,2,24,36,...| 497.0|     521.0|
|(5439,[0,3,6,37,1...| 578.0|      10.0|
|(5439,[0,3,45,264...| 100.0|      10.0|
|(5439,[0,5,21,152...|1071.0|      99.0|
|(5439,[0,6,29,162...|1020.0|      99.0|
|(5439,[0,6,754,46...|1303.0|     459.0|
+--------------------+------+----------+
only showing top