# 🛡️ Log Analytics for Security

This notebook processes and analyzes raw log files (Apache/Nginx-style) using PySpark. It detects patterns, flags suspicious behavior, and stores the results in Delta format for further analysis on Microsoft Fabric.


### 🧪 Step 1: Start Spark Session

We begin by initializing a Spark session on Microsoft Fabric to process the log data using PySpark.


In [11]:
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder \
    .appName("Security Log Analysis") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")  # 👈 Hide warnings like SparkUI port binding, etc.

### 📂 Step 2: Read Raw Log File from `data/access.log`

We load the Apache-style access log from the `data/` folder using Spark’s `read.text()` method.


In [12]:
log_file_path = "../data/access.log"  # Adjust the path for Fabric if needed

raw_logs_df = spark.read.text(log_file_path)
raw_logs_df.show(5, truncate=False)

+------------------------------------------------------------------------------------+
|value                                                                               |
+------------------------------------------------------------------------------------+
|192.168.1.41 - - [10/May/2025:15:30:48 +0000] "POST /api/data HTTP/1.1" 403 2724    |
|192.168.1.9 - - [10/May/2025:21:56:46 +0000] "PUT /contact HTTP/1.1" 200 766        |
|192.168.1.13 - - [10/May/2025:13:35:18 +0000] "POST /contact HTTP/1.1" 503 3161     |
|192.168.1.20 - - [10/May/2025:18:27:22 +0000] "DELETE /index.html HTTP/1.1" 302 1411|
|192.168.1.24 - - [10/May/2025:19:00:41 +0000] "GET /dashboard HTTP/1.1" 503 2090    |
+------------------------------------------------------------------------------------+
only showing top 5 rows



### 🔍 Step 3: Parse Log Lines into Structured Format

We use `regexp_extract` with regular expressions to pull these fields out of each log line:  
- IP address  
- Timestamp  
- HTTP method  
- Endpoint  
- Protocol  
- Status code  
- Response size  

In [13]:
from pyspark.sql.functions import regexp_extract, col

# The request line ("METHOD endpoint PROTOCOL") uses one pattern; groups 1-3 hold its parts.
request_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'

parsed_df = raw_logs_df.select(
    regexp_extract('value', r'^(\S+)', 1).alias('ip'),            # first token on the line
    regexp_extract('value', r'\[(.*?)\]', 1).alias('timestamp'),  # text inside [ ... ]
    regexp_extract('value', request_pattern, 1).alias('method'),
    regexp_extract('value', request_pattern, 2).alias('endpoint'),
    regexp_extract('value', request_pattern, 3).alias('protocol'),
    # Three digits right after the closing quote of the request line
    regexp_extract('value', r'\"\s(\d{3})\s', 1).cast("integer").alias('status_code'),
    # Trailing run of digits at the end of the line
    regexp_extract('value', r'\s(\d+)$', 1).cast("integer").alias('response_size')
)

parsed_df.show(5, truncate=False)

+------------+--------------------------+------+-----------+--------+-----------+-------------+
|ip          |timestamp                 |method|endpoint   |protocol|status_code|response_size|
+------------+--------------------------+------+-----------+--------+-----------+-------------+
|192.168.1.41|10/May/2025:15:30:48 +0000|POST  |/api/data  |HTTP/1.1|403        |2724         |
|192.168.1.9 |10/May/2025:21:56:46 +0000|PUT   |/contact   |HTTP/1.1|200        |766          |
|192.168.1.13|10/May/2025:13:35:18 +0000|POST  |/contact   |HTTP/1.1|503        |3161         |
|192.168.1.20|10/May/2025:18:27:22 +0000|DELETE|/index.html|HTTP/1.1|302        |1411         |
|192.168.1.24|10/May/2025:19:00:41 +0000|GET   |/dashboard |HTTP/1.1|503        |2090         |
+------------+--------------------------+------+-----------+--------+-----------+-------------+
only showing top 5 rows
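The patterns can be sanity-checked on a single line without starting Spark. The sketch below applies the same five regular expressions with Python's `re` module; the helper names (`parse_line`, `_group`, `_to_int`) are hypothetical and not part of the notebook. Spark uses Java regular expressions, so this assumes the patterns behave identically in both engines, which should hold for the simple constructs used here.

```python
import re

# Same patterns as the regexp_extract calls above, compiled with Python's re module.
IP = re.compile(r'^(\S+)')
TIMESTAMP = re.compile(r'\[(.*?)\]')
REQUEST = re.compile(r'\"(\S+)\s(\S+)\s*(\S*)\"')
STATUS = re.compile(r'\"\s(\d{3})\s')
SIZE = re.compile(r'\s(\d+)$')


def _group(pattern, line, idx=1):
    """Return capture group idx, or '' when the pattern does not match
    (the convention regexp_extract follows)."""
    match = pattern.search(line)
    return match.group(idx) if match else ''


def _to_int(text):
    """Convert a digit string to int; return None for an empty string."""
    return int(text) if text else None


def parse_line(line):
    """Hypothetical helper: parse one access-log line into a dict."""
    return {
        'ip': _group(IP, line),
        'timestamp': _group(TIMESTAMP, line),
        'method': _group(REQUEST, line, 1),
        'endpoint': _group(REQUEST, line, 2),
        'protocol': _group(REQUEST, line, 3),
        'status_code': _to_int(_group(STATUS, line)),
        'response_size': _to_int(_group(SIZE, line)),
    }


# First line of the raw log shown in Step 2.
sample = '192.168.1.41 - - [10/May/2025:15:30:48 +0000] "POST /api/data HTTP/1.1" 403 2724'
record = parse_line(sample)
print(record)
```

Running a few representative lines through a helper like this is a cheap way to catch a pattern that silently returns an empty string before the job runs on the full file.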



### 🧼 Step 4: Clean and Filter

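We drop records whose status code failed to parse. When a pattern does not match, `regexp_extract` returns an empty string, which becomes null after the cast to integer (when ANSI mode is off). We then inspect the resulting schema and sample rows.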


In [14]:
cleaned_df = parsed_df.filter(col("status_code").isNotNull())
cleaned_df.printSchema()
cleaned_df.show(5)

root
 |-- ip: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- status_code: integer (nullable = true)
 |-- response_size: integer (nullable = true)

+------------+--------------------+------+-----------+--------+-----------+-------------+
|          ip|           timestamp|method|   endpoint|protocol|status_code|response_size|
+------------+--------------------+------+-----------+--------+-----------+-------------+
|192.168.1.41|10/May/2025:15:30...|  POST|  /api/data|HTTP/1.1|        403|         2724|
| 192.168.1.9|10/May/2025:21:56...|   PUT|   /contact|HTTP/1.1|        200|          766|
|192.168.1.13|10/May/2025:13:35...|  POST|   /contact|HTTP/1.1|        503|         3161|
|192.168.1.20|10/May/2025:18:27...|DELETE|/index.html|HTTP/1.1|        302|         1411|
|192.168.1.24|10/May/2025:19:00...|   GET| /dashboard|HTTP/1.1|        503|      
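To see which lines the null filter removes, the same rule can be imitated in plain Python. This is a sketch only: `extract_status` is a hypothetical helper, and the malformed line is invented for illustration.

```python
import re

# Status-code pattern from Step 3.
STATUS = re.compile(r'\"\s(\d{3})\s')


def extract_status(line):
    """Return the status code as an int, or None when the pattern does not match."""
    match = STATUS.search(line)
    return int(match.group(1)) if match else None


lines = [
    # Second line of the raw log shown in Step 2.
    '192.168.1.9 - - [10/May/2025:21:56:46 +0000] "PUT /contact HTTP/1.1" 200 766',
    # Hypothetical malformed input: no quoted request, no status code.
    'this line is not an access-log entry',
]

# Equivalent of filter(col("status_code").isNotNull()): keep parsable lines only.
kept = [line for line in lines if extract_status(line) is not None]
print(len(kept), "of", len(lines), "lines kept")
```

Comparing the kept count with the total gives a quick measure of how much of the log the patterns fail to parse.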

### 🧠 Step 5: Apply Basic Anomaly Detection

We flag suspicious behavior by grouping logs by IP address and status code, then keeping every pair with 2 or more requests whose HTTP status code is ≥ 400.


In [15]:
from pyspark.sql.functions import count

anomalies_df = cleaned_df.groupBy("ip", "status_code").agg(count("*").alias("request_count")) \
    .filter((col("status_code") >= 400) & (col("request_count") >= 2))

anomalies_df.show()


+------------+-----------+-------------+
|          ip|status_code|request_count|
+------------+-----------+-------------+
|192.168.1.40|        404|            2|
|192.168.1.23|        404|            2|
|192.168.1.35|        403|            2|
|192.168.1.11|        404|            2|
| 192.168.1.7|        401|            2|
|192.168.1.14|        500|            2|
+------------+-----------+-------------+
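The cell above counts requests per `(ip, status_code)` pair, so an IP that returns one 404, one 403, and one 500 is not flagged. The plain-Python sketch below contrasts that rule with a per-IP variation that counts every error response regardless of its code. The rows, thresholds, and function names are hypothetical and serve only to illustrate the difference.

```python
from collections import Counter

# Hypothetical (ip, status_code) pairs standing in for rows of cleaned_df.
rows = [
    ("10.0.0.1", 404), ("10.0.0.1", 404), ("10.0.0.1", 500),
    ("10.0.0.2", 200), ("10.0.0.2", 403),
    ("10.0.0.3", 401), ("10.0.0.3", 401),
]

MIN_STATUS = 400  # treat 4xx and 5xx responses as errors
MIN_COUNT = 2     # same threshold as the filter in the cell above


def flag_pairs(rows):
    """Mirror the cell above: count error responses per (ip, status_code) pair."""
    counts = Counter((ip, status) for ip, status in rows if status >= MIN_STATUS)
    return {pair: n for pair, n in counts.items() if n >= MIN_COUNT}


def flag_ips(rows):
    """Variation: count all error responses per IP, whatever the status code."""
    counts = Counter(ip for ip, status in rows if status >= MIN_STATUS)
    return {ip: n for ip, n in counts.items() if n >= MIN_COUNT}


print(flag_pairs(rows))
print(flag_ips(rows))
```

In PySpark the per-IP variation would correspond to something like `cleaned_df.filter(col("status_code") >= 400).groupBy("ip").count()` followed by a threshold filter on the `count` column. Which grouping is more useful depends on whether repeated identical failures or overall error volume is the signal of interest.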



### 💾 Step 6: Save Processed Logs to Delta Lake

We store the cleaned log data as a Delta table in Microsoft Fabric’s default `Tables/` location for downstream analytics.
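On Fabric the `delta` data source is provided by the runtime. On a local Spark install it has to be added to the session, otherwise the write fails with `DATA_SOURCE_NOT_FOUND`. One possible local setup is sketched below. It assumes the `delta-spark` pip package is installed in a version that matches the local Spark version, and `build_delta_session` is a hypothetical helper name, not something the notebook defines.

```python
def build_delta_session(app_name="Security Log Analysis"):
    """Hypothetical helper: create a local Spark session with Delta Lake enabled.

    Imports are done inside the function so that defining it does not
    require pyspark or delta-spark to be installed.
    """
    from pyspark.sql import SparkSession
    from delta import configure_spark_with_delta_pip  # shipped with delta-spark

    builder = (
        SparkSession.builder
        .appName(app_name)
        # Register Delta's SQL extension and catalog implementation.
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
    )
    # Adds the Delta Lake JARs matching the installed delta-spark package.
    return configure_spark_with_delta_pip(builder).getOrCreate()
```

Because `getOrCreate()` reuses a running session, a helper like this would have to replace the session created in Step 1 (or be called after `spark.stop()`) for the Delta settings to take effect.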


In [16]:
# 🪣 Try saving to Delta (only works on Fabric or local Delta setup)
try:
    cleaned_df.write \
        .format("delta") \
        .mode("overwrite") \
        .save("Tables/security_logs_cleaned")

    print("✅ Data written to Delta successfully.")

except Exception as e:
    print("⚠️ Could not write to Delta format. This step requires Microsoft Fabric or Delta Lake setup.\n")
    print("Error details:\n", e)

⚠️ Could not write to Delta format. This step requires Microsoft Fabric or Delta Lake setup.

Error details:
 An error occurred while calling o158.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:873)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:260)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.r