In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
import re

from pyspark.sql import Row

# This is the regular expression specific to Apache log analysis, which can be changed to different log formats as needed
# Example of an Apache log line:
#                              127.0.0.1 - - [21/Jul/2014:9:55:27 -0800] "GET /home.html HTTP/1.1" 200 2048
#                              1:IP  2:client 3:user 4:date time           5:method 6:req 7:proto   8:respcode 9:size
APACHE_ACCESS_LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'

# The function below is modeled specifically to the Apache Access Logs model, which can be modified as needed for different log formats
# Return a dictionary containing the Apache access log parts.
def parse_apache_log_line(logline):
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        raise Error("Invalid logline: %s" % logline)
    return Row(
        ip_address    = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date = (match.group(4)[:-6]).split(":", 1)[0],
        time = (match.group(4)[:-6]).split(":", 1)[1],
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = int(match.group(9))
    )

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import sys


#input file
logFile = './apache.access.log'

# .cache() - Keeps the RDD in memory, which will be reused
access_logs = (sc.textFile(logFile)
               .map(parse_apache_log_line)
               .cache())
access_logs.count()

NameError: name 'sc' is not defined

In [None]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("ApacheLogAnalysis").getOrCreate()

# .cache() - Keeps the RDD in memory, which will be reused
access_logs = (sc.textFile(logFile)
               .map(parse_apache_log_line)
               .cache())

# Create a DataFrame from the RDD
schema_access_logs = spark.createDataFrame(access_logs)

# Create a table on which SQL-type queries can be triggered for analysis
schema_access_logs.createOrReplaceTempView("logs")


In [4]:


# Traffic size per address (page or endpoint)
topEndpointsMaxSize = (spark
    .sql("SELECT endpoint, content_size/1024 FROM logs ORDER BY content_size DESC LIMIT 10")
    .rdd.map(lambda row: (row[0], row[1]))
    .collect())


NameError: name 'spark' is not defined

In [5]:
display(topEndpointsMaxSize)

NameError: name 'topEndpointsMaxSize' is not defined

In [6]:
# Display the number of times a response code has been triggered
responseCodeToCount = (spark
                       .sql("SELECT response_code, COUNT(*) AS theCount FROM logs GROUP BY response_code")
                       .rdd.map(lambda row: (row[0], row[1]))
                       .collect())


NameError: name 'spark' is not defined

In [7]:
display(responseCodeToCount)

NameError: name 'responseCodeToCount' is not defined

In [8]:
# the number of occurrences of each IP address that has browsed the site (only addresses with a total greater than 10 will be displayed)
frequentIpAddressesHits = (spark
                           .sql("SELECT ip_address, COUNT(*) AS total FROM logs GROUP BY ip_address HAVING total > 10")
                           .rdd.map(lambda row: (row[0], row[1]))
                           .collect())


NameError: name 'spark' is not defined

In [9]:
frequentIpAddressesHits

NameError: name 'frequentIpAddressesHits' is not defined

In [10]:
frequentIpAddressesHits = spark.createDataFrame(frequentIpAddressesHits)


NameError: name 'spark' is not defined

In [11]:
display(frequentIpAddressesHits)


NameError: name 'frequentIpAddressesHits' is not defined

In [12]:
# traffic size by date
trafficWithTime = (spark
                   .sql("SELECT date, content_size/1024 FROM logs")
                   .rdd.map(lambda row: (row[0], row[1])))


NameError: name 'spark' is not defined

In [13]:
trafficWithTime = spark.createDataFrame(trafficWithTime)
trafficWithTime.count()


NameError: name 'spark' is not defined

In [14]:
display(trafficWithTime)

NameError: name 'trafficWithTime' is not defined