In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import StringType, IntegerType

# Create the Spark session used by this analysis (an already-active session is reused)
spark = (
    SparkSession.builder
    .appName("Cybersecurity Threat Detection")
    .getOrCreate()
)

# Location of the input CSV file
file_path = "/content/drive/My Drive/advanced_cybersecurity_data.csv" # Replace with the actual file path

# Read the CSV: the first row supplies column names and Spark infers column types
df = spark.read.options(header=True, inferSchema=True).csv(file_path)

# Print the inferred schema
print("Dataset Schema:")
df.printSchema()

# Preview the first five rows without truncating long cell values
print("Sample Data:")
df.show(n=5, truncate=False)

# List the distinct values present in the "Anomaly_Flag" column
print("Unique values in the 'Anomaly_Flag' column:")
df.select(col("Anomaly_Flag")).distinct().show()

# Normalize the "Anomaly_Flag" column to an integer column.
#
# The flag is matched on its string form, so the same expression works
# whether inferSchema typed the column as integer, boolean, or string
# (a boolean renders as "true"/"false" when cast to string).
# Every branch of the CASE expression yields an integer: textual
# true/false map to 1/0 and anything else is cast to integer directly.
# Mixing integer literals with the raw column in the fallback branch would
# make the result type depend on implicit coercion, which is rejected when
# the column is boolean.
anomaly_flag_text = col("Anomaly_Flag").cast(StringType())
df = df.withColumn(
    "Anomaly_Flag",
    when(anomaly_flag_text.isin("True", "true", "TRUE"), 1)
    .when(anomaly_flag_text.isin("False", "false", "FALSE"), 0)
    .otherwise(col("Anomaly_Flag").cast(IntegerType())),
)

# Classifier applied to each network request's anomaly flag
def classify_request(anomaly_flag):
    """Return the class label for a request given its anomaly flag.

    Returns "suspicious" when ``anomaly_flag`` equals 1 (1 is assumed to
    indicate suspicious activity) and "normal" for every other value,
    including ``None``.
    """
    return "suspicious" if anomaly_flag == 1 else "normal"

# Wrap the classifier as a Spark UDF that returns strings
classify_request_udf = udf(classify_request, returnType=StringType())

# Label every row by applying the UDF to its "Anomaly_Flag" value
df = df.withColumn(
    "request_class",
    classify_request_udf(df["Anomaly_Flag"]),
)

# Keep only the rows labelled as suspicious
suspicious_requests = df.where(col("request_class") == "suspicious")

# Count suspicious requests per IP address, most frequent first
high_risk_ips = (
    suspicious_requests
    .groupBy("IP_Address")
    .count()
    .sort(col("count").desc())
)

# Print the ranked IP addresses
print("High-Risk IP Addresses:")
high_risk_ips.show()

# Shut down the Spark session now that the analysis is finished
spark.stop()

Dataset Schema:
root
 |-- Timestamp: timestamp (nullable = true)
 |-- IP_Address: string (nullable = true)
 |-- Request_Type: string (nullable = true)
 |-- Status_Code: integer (nullable = true)
 |-- Anomaly_Flag: integer (nullable = true)
 |-- User_Agent: string (nullable = true)
 |-- Session_ID: integer (nullable = true)
 |-- Location: string (nullable = true)

Sample Data:
+-------------------+--------------+------------+-----------+------------+----------+----------+--------+
|Timestamp          |IP_Address    |Request_Type|Status_Code|Anomaly_Flag|User_Agent|Session_ID|Location|
+-------------------+--------------+------------+-----------+------------+----------+----------+--------+
|2023-01-01 00:00:00|202.118.116.11|GET         |403        |0           |Edge      |4835      |Brazil  |
|2023-01-01 00:01:00|38.30.40.178  |DELETE      |301        |0           |Bot       |3176      |China   |
|2023-01-01 00:02:00|209.5.148.15  |POST        |500        |0           |Opera     |4312  