# Sample Apache Spark Notebook

Here's an example of cleaning and analyzing an Apache access log, but this time within an interactive Notebook environment!

Notebook environments are popular for prototyping data engineering solutions.

If you installed the pyspark package or are working within an already-established environment for Spark, things will probably "just work." But if not, using the findspark package will tie the notebook to your existing Spark installation:

In [8]:
!pip install findspark



In [9]:
import findspark
findspark.init()
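If you're not sure whether you need findspark at all, a quick check like the one below tells you whether `pyspark` is already importable. This is a plain-Python sketch, not part of the original notebook, and `pyspark_importable` is a hypothetical helper name. As far as I know, `findspark.init()` with no arguments starts by looking at the `SPARK_HOME` environment variable. You can also pass your Spark directory explicitly, as in `findspark.init("/path/to/spark")`, where that path is just a placeholder.

```python
import importlib.util
import os


def pyspark_importable() -> bool:
    """Return True if the pyspark package can already be imported."""
    return importlib.util.find_spec("pyspark") is not None


if pyspark_importable():
    print("pyspark is importable; findspark should not be needed.")
else:
    # findspark.init() with no arguments starts by checking SPARK_HOME
    print("pyspark not found on sys.path; SPARK_HOME =", os.environ.get("SPARK_HOME"))
```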

One nice thing about notebooks is that you can leave little comments and explanations like this, in markdown format.

We'll start by importing the stuff we need, and creating a SparkSession.

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col, count, desc
from pyspark.sql.types import IntegerType

# Initialize Spark session
spark = SparkSession.builder.appName("ApacheLogAnalysis").getOrCreate()

Next, we'll load our sample access log, reading the raw text into a DataFrame.

In [11]:
# Define log file path (Update this path to your log file location)
log_file = "./ml-100k/access_log.txt"

# Read log file as text
logs_df = spark.read.text(log_file)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/C:/SparkCourse/ml-100k/access_log.txt. SQLSTATE: 42K03
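That AnalysisException is what a missing input file looks like. The relative path `./ml-100k/access_log.txt` gets resolved against the notebook's current working directory (`C:/SparkCourse` here), and there's no such file there. A small guard can surface the absolute path and working directory before Spark ever sees them. The sketch below uses `pathlib`, and `resolve_log_path` is a hypothetical helper:

```python
from pathlib import Path


def resolve_log_path(candidate: str) -> Path:
    """Resolve a (possibly relative) local path and fail early if the file is missing."""
    path = Path(candidate).expanduser().resolve()
    if not path.is_file():
        raise FileNotFoundError(
            f"Log file not found: {path} (current working directory: {Path.cwd()})"
        )
    return path


# Usage in the notebook, with its own placeholder path:
# log_file = str(resolve_log_path("./ml-100k/access_log.txt"))
```

This only checks the local filesystem. If your logs live on HDFS or S3, you'd skip it and let Spark report the problem.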

We'll now parse the log into the fields we're interested in. Note that we already know there's some bad data in here: some rows have an empty status code.

In [None]:
# Regular expression for Apache's combined log format:
# IP, "- -" (ident and user, assumed empty), [timestamp], "request",
# status, bytes, "referrer", "user agent"
log_pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\S{3}) (\d+) "(.*?)" "(.*?)"'

# Extract fields using regex; regexp_extract returns "" when a line doesn't match
parsed_logs_df = logs_df.select(
    regexp_extract('value', log_pattern, 1).alias("ip_address"),
    regexp_extract('value', log_pattern, 2).alias("timestamp"),
    regexp_extract('value', log_pattern, 3).alias("request"),
    regexp_extract('value', log_pattern, 4).alias("status"),
    regexp_extract('value', log_pattern, 5).cast("integer").alias("bytes"),
    regexp_extract('value', log_pattern, 6).alias("referrer"),
    regexp_extract('value', log_pattern, 7).alias("user_agent")
)
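`regexp_extract` uses Java regex rules and returns an empty string when a line doesn't match, so it's easy to try the pattern on a single line in plain Python first. For a simple pattern like this one, Python's `re` module behaves the same way. The sketch below uses a made-up line in Apache's combined log format, adapted from the example in Apache's documentation. In that format, the quoted field right after the byte count is the referrer and the user agent comes after it, so this version of the pattern has a seventh group to capture both. `parse_line` and `FIELDS` are hypothetical names.

```python
import re

# Combined log format: the referrer and then the user agent follow the byte count
COMBINED_LOG = re.compile(r'(\S+) - - \[(.*?)\] "(.*?)" (\S{3}) (\d+) "(.*?)" "(.*?)"')
FIELDS = ("ip_address", "timestamp", "request", "status",
          "bytes", "referrer", "user_agent")


def parse_line(line: str) -> dict:
    """Parse one log line into named string fields."""
    match = COMBINED_LOG.search(line)
    if match is None:
        # Mirrors regexp_extract, which yields "" for every group on a failed match
        return dict.fromkeys(FIELDS, "")
    return dict(zip(FIELDS, match.groups()))


sample = ('127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" '
          '200 2326 "http://www.example.com/start.html" "Mozilla/4.08 [en] (Win98; I ;Nav)"')
print(parse_line(sample)["referrer"])        # http://www.example.com/start.html
print(parse_line(sample)["user_agent"])      # Mozilla/4.08 [en] (Win98; I ;Nav)
print(repr(parse_line("garbage")["status"]))  # ''
```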

We'll use a filter to just remove those bogus rows with no status, and cast the remaining status codes to integers.

In [None]:
# Filter out rows with empty status fields
cleaned_df = parsed_logs_df.filter(col("status").isNotNull() & (col("status") != ""))
cleaned_df = cleaned_df.withColumn("status", col("status").cast(IntegerType()))
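There's one subtlety in that filter. `regexp_extract` returns an empty string rather than null when a line doesn't match, so the `!= ""` comparison is what actually removes the bogus rows. `isNotNull()` on its own would keep them. Here's the same cleaning step as a plain-Python sketch over a couple of hypothetical rows, using a hypothetical `clean_rows` helper:

```python
def clean_rows(rows):
    """Drop rows whose status is missing or empty, then convert status to int."""
    cleaned = []
    for row in rows:
        status = row.get("status")
        # An unmatched line yields "" rather than None, so test for both
        if status is None or status == "":
            continue
        cleaned.append({**row, "status": int(status)})
    return cleaned


rows = [{"ip_address": "10.0.0.1", "status": "200"},
        {"ip_address": "", "status": ""}]
print(clean_rows(rows))  # [{'ip_address': '10.0.0.1', 'status': 200}]
```

Spark's default (non-ANSI) cast turns a non-numeric string into null, but Python's `int()` raises `ValueError` instead. That makes this sketch stricter on truly malformed codes.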

A nice thing about notebooks is you can break up your processing into these separate blocks, and inspect the output just for whatever it is you're doing. Then you can go back and iterate on that piece of code as needed, rather than re-running everything.

Let's further process our data to split the method and endpoint out of the request field, and then preview the resulting DataFrame we have thus far:

In [None]:
# Split request field to get HTTP method and endpoint
parsed_logs_df = cleaned_df.withColumn("method", regexp_extract("request", r'(\S+)', 1)) \
                               .withColumn("endpoint", regexp_extract("request", r' (\S+) ', 1))

# Show parsed log data
parsed_logs_df.show(10, truncate=False)
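The two patterns in that cell are worth a closer look. `(\S+)` grabs the first run of non-space characters, which is the method. ` (\S+) ` grabs the first token with a space on both sides, which is the endpoint. A plain-Python sketch of the same extraction, with a hypothetical `split_request` helper, shows an edge case. A request line with no protocol version, such as `GET /`, has no space after the path, so its endpoint comes back empty.

```python
import re

# Same patterns as the notebook's regexp_extract calls
METHOD_PATTERN = re.compile(r'(\S+)')
ENDPOINT_PATTERN = re.compile(r' (\S+) ')


def split_request(request: str) -> tuple:
    """Return (method, endpoint); a field is "" when its pattern finds no match."""
    method = METHOD_PATTERN.search(request)
    endpoint = ENDPOINT_PATTERN.search(request)
    return (method.group(1) if method else "",
            endpoint.group(1) if endpoint else "")


print(split_request("GET /index.html HTTP/1.1"))  # ('GET', '/index.html')
print(split_request("GET /"))                     # ('GET', '')
```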

Now let's do some analysis, beginning with the top 10 IP addresses. Unfortunately, as usual, there's a hacker trying to DoS me:

In [None]:
# 1. Count requests per IP address
ip_count_df = parsed_logs_df.groupBy("ip_address").agg(count("*").alias("request_count")).orderBy(desc("request_count"))
ip_count_df.show(10)
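`groupBy` followed by `count` and `orderBy(desc(...))` is the classic "count and rank" pattern. On a small in-memory sample, you can sanity-check the same logic with `collections.Counter`. Here's a sketch with made-up IP addresses and a hypothetical `top_n` helper:

```python
from collections import Counter


def top_n(values, n=10):
    """Return the n most common values as (value, count) pairs, most frequent first."""
    return Counter(values).most_common(n)


sample_ips = ["10.0.0.1", "10.0.0.2", "10.0.0.1", "10.0.0.3", "10.0.0.1", "10.0.0.2"]
print(top_n(sample_ips, 2))  # [('10.0.0.1', 3), ('10.0.0.2', 2)]
```

The two approaches differ in how they break ties. Among IPs with equal counts, Spark's `orderBy` doesn't promise any particular order, while `most_common` keeps the order in which values were first seen.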

Let's look at the top endpoints. Right away we can see that our friend is going after my WordPress site, probing xmlrpc.php vulnerabilities and trying to brute-force their way in:

In [None]:
# 2. Most requested endpoints
endpoint_count_df = parsed_logs_df.groupBy("endpoint").agg(count("*").alias("endpoint_count")).orderBy(desc("endpoint_count"))
endpoint_count_df.show(10)
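To see who is behind the xmlrpc.php traffic specifically, you can filter on the endpoint before grouping by IP. In the DataFrame API, that would look something like `parsed_logs_df.filter(col("endpoint") == "/xmlrpc.php").groupBy("ip_address").count()`, assuming the endpoint appears in the log exactly as `/xmlrpc.php`. The sketch below does the same thing in plain Python over a few hypothetical parsed rows, with a hypothetical `hits_by_ip` helper:

```python
from collections import Counter


def hits_by_ip(rows, endpoint):
    """Count requests per IP address for one endpoint, most frequent first."""
    return Counter(
        row["ip_address"] for row in rows if row["endpoint"] == endpoint
    ).most_common()


rows = [
    {"ip_address": "10.0.0.9", "endpoint": "/xmlrpc.php"},
    {"ip_address": "10.0.0.9", "endpoint": "/xmlrpc.php"},
    {"ip_address": "10.0.0.4", "endpoint": "/index.html"},
    {"ip_address": "10.0.0.5", "endpoint": "/xmlrpc.php"},
]
print(hits_by_ip(rows, "/xmlrpc.php"))  # [('10.0.0.9', 2), ('10.0.0.5', 1)]
```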

Let's also take a look at the top status codes. Looks like they've succeeded in making my site unstable:

In [None]:
# 3. HTTP status code distribution
status_count_df = parsed_logs_df.groupBy("status").agg(count("*").alias("status_count")).orderBy(desc("status_count"))
status_count_df.show(10)
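Raw counts per code are a bit hard to read at a glance. HTTP status codes fall into classes by their first digit, and 5xx codes are server-side errors, so the share of 5xx responses is a more direct measure of instability. Here's a plain-Python sketch with hypothetical helper names. In Spark, you could build the same class column from `floor(col("status") / 100)`.

```python
from collections import Counter


def status_classes(codes):
    """Count integer HTTP status codes by class, e.g. 404 -> '4xx'."""
    return Counter(f"{code // 100}xx" for code in codes)


def server_error_share(codes):
    """Fraction of responses with a 5xx (server error) status; 0.0 if there are none."""
    codes = list(codes)
    if not codes:
        return 0.0
    return sum(1 for code in codes if 500 <= code <= 599) / len(codes)


sample_codes = [200, 200, 404, 500, 503]
print(dict(status_classes(sample_codes)))  # {'2xx': 2, '4xx': 1, '5xx': 2}
print(server_error_share(sample_codes))    # 0.4
```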

Finally, we'll shut things down:

In [None]:
# Stop Spark session
spark.stop()