In [1]:
#Case Study: Log Parsing Domain: Telecom
#A telecom software provider is building an application to monitor different telecom components in the production environment. For monitoring purposes, the application parses the log files, looks for potential warnings or exceptions in the logs, and reports them.
#The Dataset contains the log files from different components used in the overall telecom application.

#Tasks:
#The volume of data is quite large. As part of the R&D team, you are building a solution
#on Spark to load and parse the multiple log files and then arrange the errors and warnings by timestamp.

In [2]:
#Load file as a text file in spark

# Path of the access log to load.
data = '/FileStore/tables/access_1_clean-acc4c.log'

# Read the file through the SparkContext as an RDD of text.
RDD = sc.textFile(data)

In [3]:
# Count the elements of the RDD loaded from the log file.
RDD.count()

In [4]:
#Find out how many 404 HTTP codes are in access logs.

import sys
import os

# DBFS URIs always use '/' as the separator, so the components are joined
# explicitly instead of through os.path.join, whose separator depends on the
# operating system the driver runs on.
log_file_path = 'dbfs:/' + '/'.join(
    ['databricks-datasets', 'cs100', 'lab2', 'data-001', 'apache.access.log.PROJECT']
)


In [5]:
# Read the access log as plain text into a DataFrame; later cells reach each
# raw log line through its 'value' column.
base_df = sqlContext.read.text(log_file_path)
# Let's look at the schema
base_df.printSchema()

In [6]:
# Count the rows read from the log file.
base_df.count()

In [7]:
#(2b) Parsing the log file

from pyspark.sql.functions import split, regexp_extract
# Extract the individual Common Log Format fields from each raw line.
# regexp_extract yields an empty string (not null) when a pattern does not
# match, so only the two integer-cast columns can end up null.
split_df = base_df.select(
    # Host: the first whitespace-delimited token. Only the token itself is
    # captured, so the value carries no trailing separator character.
    regexp_extract('value', r'^(\S+)', 1).alias('host'),
    # Timestamp: the bracketed date. Both '+hhmm' and '-hhmm' zone offsets
    # are accepted so that logs written east of UTC parse as well.
    regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})]', 1).alias('timestamp'),
    # Path: the second token of the quoted request ("METHOD path HTTP/x.y").
    regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
    # Status: the token that follows the closing quote of the request.
    regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
    # Content size: the digits that end the line.
    regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
split_df.show(truncate=False)

#remotehost rfc931 authuser [date] “request” status bytes
#remotehost	Remote hostname (or IP number if DNS hostname is not available).
#rfc931	The remote logname of the user. We don’t really care about this field.
#authuser	The username of the remote user, as authenticated by the HTTP server.
#[date]	The date and time of the request.
#“request”	The request, exactly as it came from the browser or client.
#status	The HTTP status code the server sent back to the client.
#bytes	The number of bytes (Content-Length) transferred to the client.

In [8]:
# Data Cleaning

#base_df.filter(base_df['value'].isNull()).count()

# Build one predicate that is true when any parsed column is null, then keep
# only the rows that satisfy it.
any_column_is_null = split_df['host'].isNull()
for _column_name in ('timestamp', 'path', 'status', 'content_size'):
    any_column_is_null = any_column_is_null | split_df[_column_name].isNull()

bad_rows_df = split_df.filter(any_column_is_null)
bad_rows_df.count()

In [9]:
from pyspark.sql.functions import col, sum

def count_null(col_name):
  """Build an aggregate expression counting the nulls in one column.

  Args:
      col_name (str): name of the column to inspect.
  Returns:
      A column expression, aliased to ``col_name``, that sums a 1 for every
      null value and a 0 for every non-null value.
  """
  # `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
  null_as_one = col(col_name).isNull().cast('integer')
  return sum(null_as_one).alias(col_name)


In [10]:
# One null-counting expression per column of split_df.
exprs = [count_null(col_name) for col_name in split_df.columns]

# Run the aggregation; *exprs unpacks the list into separate arguments.
split_df.agg(*exprs).show()


In [11]:
# Raw lines that do not end in one or more digits.
ends_with_digits = base_df['value'].rlike(r'\d+$')
bad_content_size_df = base_df.where(~ends_with_digits)
bad_content_size_df.count()


In [12]:
from pyspark.sql.functions import lit, concat
# Show the offending lines in full with a '*' appended to each value
# (presumably so the exact end of every line is visible in the output).
bad_content_size_df.select(concat(bad_content_size_df['value'], lit('*'))).show(truncate=False)

In [13]:
#Fix the rows with null content_size

# Substitute 0 for every null content_size.
cleaned_df = split_df.fillna({'content_size': 0})

# Re-count the nulls per column to confirm that none are left.
exprs = [count_null(col_name) for col_name in cleaned_df.columns]

cleaned_df.agg(*exprs).show()

In [14]:
#Parsing the timestamp.

# Three-letter English month abbreviation -> month number.
month_map = {
  'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
  'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
}

def parse_clf_time(s):
    """ Convert a Common Log Format time into a 'yyyy-mm-dd hh:mm:ss' string
    Args:
        s (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
    Returns:
        a string suitable for passing to CAST('timestamp'), or None when s is
        None, empty, or not in the expected format
    """
    # An extraction that found no timestamp hands us an empty string (or None).
    # Returning None instead of raising keeps one bad line from failing the
    # whole job; the value simply becomes null downstream.
    if not s:
        return None
    # NOTE: We're ignoring time zone here. In a production application, you'd want to handle that.
    try:
        return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
          int(s[7:11]),        # year
          month_map[s[3:6]],   # month abbreviation -> number
          int(s[0:2]),         # day
          int(s[12:14]),       # hour
          int(s[15:17]),       # minute
          int(s[18:20])        # second
        )
    except (KeyError, ValueError):
        # Unknown month abbreviation or a non-numeric / missing field.
        return None

# `udf` is not imported anywhere earlier in this notebook; import it here so
# the cell also runs where the runtime does not pre-load it.
from pyspark.sql.functions import udf

# Wrap the parser as a Spark UDF (no return type given, so it yields strings).
u_parse_time = udf(parse_clf_time)

# Swap the raw 'timestamp' string column for a real timestamp column named
# 'time'. The column is taken from cleaned_df, the DataFrame being selected from.
logs_df = (cleaned_df
           .select('*', u_parse_time(cleaned_df['timestamp']).cast('timestamp').alias('time'))
           .drop('timestamp'))
total_log_entries = logs_df.count()

logs_df.printSchema()

display(logs_df)


host,path,status,content_size,time
in24.inetnebr.com,/shuttle/missions/sts-68/news/sts-68-mcc-05.txt,200,1839,1995-08-01T00:00:01.000+0000
uplherc.upl.com,/,304,0,1995-08-01T00:00:07.000+0000
uplherc.upl.com,/images/ksclogo-medium.gif,304,0,1995-08-01T00:00:08.000+0000
uplherc.upl.com,/images/MOSAIC-logosmall.gif,304,0,1995-08-01T00:00:08.000+0000
uplherc.upl.com,/images/USA-logosmall.gif,304,0,1995-08-01T00:00:08.000+0000
ix-esc-ca2-07.ix.netcom.com,/images/launch-logo.gif,200,1713,1995-08-01T00:00:09.000+0000
uplherc.upl.com,/images/WORLD-logosmall.gif,304,0,1995-08-01T00:00:10.000+0000
slppp6.intermind.net,/history/skylab/skylab.html,200,1687,1995-08-01T00:00:10.000+0000
piweba4y.prodigy.com,/images/launchmedium.gif,200,11853,1995-08-01T00:00:10.000+0000
slppp6.intermind.net,/history/skylab/skylab-small.gif,200,9202,1995-08-01T00:00:11.000+0000


In [15]:
# Cache logs_df: the cells that follow query it repeatedly.
logs_df.cache()

In [16]:
#Example: HTTP Status Analysis

# Number of log entries per HTTP status code, ordered by status and cached
# because it is reused below.
status_to_count_df = logs_df.groupBy('status').count().sort('status').cache()

status_to_count_length = status_to_count_df.count()
print('Found %d response codes' % status_to_count_length)
status_to_count_df.show()

# Sanity checks against the expected result for this data set.
expected_status_counts = [
    (200, 940847),
    (302, 16244),
    (304, 79824),
    (403, 58),
    (404, 6185),
    (500, 2),
    (501, 17),
]
assert status_to_count_length == 7
assert status_to_count_df.take(100) == expected_status_counts

In [17]:
# Render the per-status counts with the notebook's display() helper.
display(status_to_count_df)

status,count
200,940847
302,16244
304,79824
403,58
404,6185
500,2
501,17


In [18]:
from pyspark.sql import functions as sqlFunctions
# Add a 'log(count)' column holding the logarithm of each status count.
count_column = status_to_count_df['count']
log_status_to_count_df = status_to_count_df.withColumn(
    'log(count)',
    sqlFunctions.log(count_column),
)

display(log_status_to_count_df)

status,count,log(count)
200,940847,13.75453581236166
302,16244,9.69547888880619
304,79824,11.287579490100818
403,58,4.060443010546419
404,6185,8.729882284826589
500,2,0.6931471805599453
501,17,2.833213344056216


In [19]:
from spark_notebook_helpers import prepareSubplot, np, plt, cm
# Collect the (status, log(count)) rows to the driver for plotting.
# NOTE(review): this rebinds `data`, which an earlier cell used for the
# access-log path string.
data = log_status_to_count_df.drop('count').collect()
# Unzip the rows into parallel tuples: x = status codes, y = log(count) values.
x, y = zip(*data)
# One bar position per status code.
index = np.arange(len(x))
bar_width = 0.7
colorMap = 'Accent'
cmap = cm.get_cmap(colorMap)

# prepareSubplot comes from spark_notebook_helpers; its two arguments look
# like x and y tick positions — TODO confirm against the helper.
fig, ax = prepareSubplot(np.arange(0, 6, 1), np.arange(0, 14, 2))
plt.bar(index, y, width=bar_width, color=cmap(0))
# Label each bar with its status code. The half-bar-width offset assumes bars
# are drawn from their left edge — verify against the installed matplotlib.
plt.xticks(index + bar_width/2.0, x)
display(fig)

In [20]:
# Hosts that have accessed the server more than 10 times.
host_sum_df = logs_df.groupBy('host').count()

# Keep only the host names whose request count exceeds 10.
accessed_more_than_10 = host_sum_df['count'] > 10
host_more_than_10_df = host_sum_df.where(accessed_more_than_10).select('host')

print('Any 20 hosts that have accessed more then 10 times:\n')
host_more_than_10_df.show(truncate=False)


In [21]:
# Count the distinct host values that appear in the logs.
distinct_hosts_df = logs_df.select('host').distinct()
unique_host_count = distinct_hosts_df.count()
print ('Unique hosts: {0}'.format(unique_host_count))
