In [1]:
import re
import pandas as pd
import os
import urllib

In [2]:
# Download data from ftp
urllib.request.urlretrieve("ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz", "/tmp/NASA_access_log_Jul95.gz")
urllib.request.urlretrieve("ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz","/tmp/NASA_access_log_Aug95.gz")

In [3]:
# Move data from local to DBFS
dbutils.fs.mv("file:/tmp/NASA_access_log_Jul95.gz", "dbfs:/NASA_access_log_Jul95.gz")
dbutils.fs.mv("file:/tmp/NASA_access_log_Aug95.gz", "dbfs:/NASA_access_log_Aug95.gz")

In [4]:
raw_data_files = ["NASA_access_log_Jul95.gz","NASA_access_log_Aug95.gz"]
base_df = spark.read.text(raw_data_files)
base_df.printSchema()
type(base_df)
base_df.show(10, truncate=False)

In [5]:
# Change Spark data to RDD data
base_df_rdd = base_df.rdd
type(base_df_rdd)
base_df_rdd.take(10)

In [6]:
print((base_df.count(), len(base_df.columns)))

In [7]:
# Check how data looks like
sample_logs = [item['value'] for item in base_df.take(15)]
sample_logs

In [8]:
# Extract host
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
hosts = [re.search(host_pattern, item).group(1)
        if re.search(host_pattern, item)
        else 'not match'
        for item in sample_logs]
hosts

In [9]:
# Extract Timestamp
timestamp_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})\]'
timestamps = [re.search(timestamp_pattern, item).group(1)
        if re.search(timestamp_pattern, item)
        else 'not match'
        for item in sample_logs]
timestamps

In [10]:
# Extract HTTP Request Method, URIs and Protocol
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s(\S*)\"'
method_uri_protocol = [re.search(method_uri_protocol_pattern, item).groups()
                      if re.search(method_uri_protocol_pattern, item)
                      else "not match"
                      for item in sample_logs]
method_uri_protocol

In [11]:
# Extract HTTP Status Code
status_pattern = r'\s(\d{3})\s'
status = [re.search(status_pattern, item).group(1) for item in sample_logs]
print(status)

In [12]:
# Extract HTTP Response Content Size
content_size_pattern = r'\s(\d+)$'
content_size = [re.search(content_size_pattern, item).group(1) for item in sample_logs]
print(content_size)

In [13]:
from pyspark.sql.functions import regexp_extract

logs_df = base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                        regexp_extract('value', timestamp_pattern, 1).alias('timestamps'),
                        regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                        regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
                        regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
                        regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                        regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
logs_df.show(10, truncate = True)
print((logs_df.count(), len(logs_df.columns)))

In [14]:
(base_df
    .filter(base_df['value']
                .isNull())
    .count())

In [15]:
# Even though there is no empty row, let's still check how many rows contain some missing values in any column
bad_rows_df = logs_df.filter(logs_df['host'].isNull()|
               logs_df['timestamps'].isNull()|
               logs_df['method'].isNull()|
               logs_df['endpoint'].isNull() |
               logs_df['status'].isNull() |
               logs_df['content_size'].isNull()|
               logs_df['protocol'].isNull())
print("There are {} rows contain some missing values in any of column".format(bad_rows_df.count()))

In [16]:
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as spark_sum

def count_null(col_name):
    return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)

# Build up a list of column expressions, one per column.
exprs = [count_null(col_name) for col_name in logs_df.columns]

# Run the aggregation. The *exprs converts the list of expressions into
# variable function arguments.
logs_df.agg(*exprs).show()

In [17]:
# Find rows in base_df with null value in "status" column
null_status_df = base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))
null_status_df.count()

In [18]:
null_status_df.show(truncate = False)

In [19]:
# Process other columns for null status rows
bad_status_df = null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                                      regexp_extract('value', timestamp_pattern, 1).alias('timestamp'),
                                      regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                                      regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
                                      regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
                                      regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                                      regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
bad_status_df.show(truncate=False)

In [20]:
# Seems like the row with null status is not informative, we can simply filter it out
logs_df = logs_df[logs_df['status'].isNotNull()] 
logs_df.count()

In [21]:
# Check null value in each column again
exprs = [count_null(col_name) for col_name in logs_df.columns]
logs_df.agg(*exprs).show()

In [22]:
# Find rows in base_df with null value in "content_size" column
null_content_size_df = base_df.filter(~base_df['value'].rlike(r'\s(\d+)$'))
print(null_content_size_df.count())
null_content_size_df.take(10)

In [23]:
# Seems like all null content_size mean no content_size, we can fillna with 0, which have the same meaning
logs_df = logs_df.na.fill({'content_size': 0})

In [24]:
# Check null value again
exprs = [count_null(col_name) for col_name in logs_df.columns]
logs_df.agg(*exprs).show()

In [25]:
from pyspark.sql.functions import udf

month_map = {
  'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
  'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
}

# Write an user-defined python function
def parse_clf_time(text):
    """ Convert Common Log time format into a Python datetime object
    Args:
        text (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
    Returns:
        a string suitable for passing to CAST('timestamp')
    """
    # NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
      int(text[7:11]),
      month_map[text[3:6]],
      int(text[0:2]),
      int(text[12:14]),
      int(text[15:17]),
      int(text[18:20])
    )

In [26]:
# Check how timestamps column looks like
sample_ts = [item['timestamps'] for item in logs_df.select('timestamps').take(5)]
sample_ts

In [27]:
# Check how result looks like after applying user-defined python function
[parse_clf_time(item) for item in sample_ts]

In [28]:
# Change user-defined python function into sql function  by udf() in pyspark.sql module

udf_parse_time = udf(parse_clf_time)

logs_df = logs_df.select('*', udf_parse_time(logs_df['timestamps']).cast('timestamp').alias('time')).drop('timestamps')
logs_df.show(10, truncate=True)

In [29]:
logs_df.printSchema()

In [30]:
logs_df.limit(5).toPandas()

Unnamed: 0,host,method,endpoint,protocol,status,content_size,time
0,199.72.81.55,GET,/history/apollo/,HTTP/1.0,200,6245,1995-07-01 00:00:01
1,unicomp6.unicomp.net,GET,/shuttle/countdown/,HTTP/1.0,200,3985,1995-07-01 00:00:06
2,199.120.110.21,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085,1995-07-01 00:00:09
3,burger.letters.com,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0,1995-07-01 00:00:11
4,199.120.110.21,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179,1995-07-01 00:00:11


In [31]:
# Ready to use dataframe for analysis, we need to cache it first
logs_df.cache()

In [32]:
content_size_summary_df = logs_df.describe(['content_size'])
content_size_summary_df.toPandas()

Unnamed: 0,summary,content_size
0,count,3461612.0
1,mean,18928.844398216785
2,stddev,73031.47260949228
3,min,0.0
4,max,6823936.0


In [33]:
from pyspark.sql import functions as F
(logs_df.agg(F.min(logs_df['content_size']).alias('min_content_size'),
            F.max(logs_df['content_size']).alias('max_content_size'),
            F.mean(logs_df['content_size']).alias('mean_content_size'),
            F.stddev(logs_df['content_size']).alias('std_content_size'),
             F.count(logs_df['content_size']).alias('count_content_size')).toPandas())

Unnamed: 0,min_content_size,max_content_size,mean_content_size,std_content_size,count_content_size
0,0,6823936,18928.844398,73031.472609,3461612


In [34]:
status_freq_df = (logs_df
                 .groupBy('status')
                 .count()
                 .sort('status')
                 .cache())

In [35]:
print('Total distinct HTTP Status Codes:', status_freq_df.count())

In [36]:
status_freq_pd_df = (status_freq_df
                    .toPandas()
                    .sort_values(by=['count'],ascending=False))
status_freq_pd_df

Unnamed: 0,status,count
0,200,3100524
2,304,266773
1,302,73070
5,404,20899
4,403,225
6,500,65
7,501,41
3,400,15


In [37]:
!pip install -U seaborn

In [38]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
%matplotlib inline

sns.catplot(x='status',
           y='count',
           data = status_freq_pd_df,
           kind = 'bar',
           order = status_freq_pd_df['status'])

In [39]:
log_freq_df = status_freq_df.withColumn('log(count)', F.log(status_freq_df['count']))
log_freq_df.show()

In [40]:
log_freq_pd_df = (log_freq_df
                    .toPandas()
                    .sort_values(by=['log(count)'],
                                 ascending=False))
sns.catplot(x='status', y='log(count)', data=log_freq_pd_df, 
            kind='bar', order=status_freq_pd_df['status'])

In [41]:
host_sum_df =(logs_df
               .groupBy('host')
               .count()
               .sort('count', ascending=False).limit(10))

host_sum_df.show(truncate=False)

In [42]:
host_sum_pd_df = host_sum_df.toPandas()
host_sum_pd_df.iloc[8]['host']

In [43]:
paths_df = (logs_df
            .groupBy('endpoint')
            .count()
            .sort('count', ascending=False).limit(20))
paths_pd_df = paths_df.toPandas()
paths_pd_df

Unnamed: 0,endpoint,count
0,/images/NASA-logosmall.gif,208353
1,/images/KSC-logosmall.gif,164807
2,/images/MOSAIC-logosmall.gif,127648
3,/images/USA-logosmall.gif,126812
4,/images/WORLD-logosmall.gif,125668
5,/images/ksclogo-medium.gif,121278
6,/ksc.html,83679
7,/images/launch-logo.gif,75957
8,/history/apollo/images/apollo-logo1.gif,68856
9,/shuttle/countdown/,64689


In [44]:
not200_df = (logs_df.filter(logs_df['status']!=200))
error_endpoints_freq_df = (not200_df
                          .groupBy('endpoint')
                          .count()
                          .sort('count', ascending = False)
                          .limit(10))

In [45]:
error_endpoints_freq_df.show(truncate=False)

In [46]:
unique_host_count = (logs_df
                     .select('host')
                     .distinct()
                     .count())
unique_host_count

In [47]:
host_day_distinct_df = logs_df.select(logs_df.host,
                            F.dayofmonth('time').alias('day')).drop_duplicates()
host_day_distinct_df.show(5, truncate = False)

In [48]:
daily_host_df = (host_day_distinct_df
                 .groupBy(['day'])
                 .count()
                .sort("day"))
daily_host_pd_df = daily_host_df.toPandas()

In [49]:
c = sns.catplot(x='day', y='count', 
                data=daily_host_pd_df, 
                kind='point', height=5, 
                aspect=1.5)

In [50]:
daily_hosts_df = (host_day_distinct_df
                     .groupBy('day')
                     .count()
                     .select(col("day"), col("count").alias("total_hosts")))
daily_hosts_df.show(10, truncate = False)

In [51]:
total_daily_requests_df = (logs_df.select(F.dayofmonth("time").alias("day"))
                          .groupBy("day")
                          .count()
                          .select(col("day"), col("count").alias("total_reqs")))
      
avg_daily_reqests_per_host_df = total_daily_requests_df.join(daily_hosts_df, 'day')

avg_daily_reqests_per_host_df = (avg_daily_reqests_per_host_df.withColumn('avg_reqs', col("total_reqs")/col("total_hosts")).sort("day"))

avg_daily_reqests_per_host_pd_df = avg_daily_reqests_per_host_df.toPandas()

avg_daily_reqests_per_host_pd_df

Unnamed: 0,day,total_reqs,total_hosts,avg_reqs
0,1,98710,7609,12.972795
1,2,60265,4858,12.405311
2,3,130972,10238,12.792733
3,4,130009,9411,13.814579
4,5,126468,9640,13.119087
5,6,133380,10133,13.162933
6,7,144595,10048,14.390426
7,8,99024,7112,13.92351
8,9,95730,6699,14.290193
9,10,134108,8532,15.718237


In [52]:
c = sns.catplot(x = "day", y = "avg_reqs", data = avg_daily_reqests_per_host_pd_df,
               kind = "point", height = 5, aspect = 1.5)

In [53]:
# How many 404 records are in the log?
df_404 = logs_df.filter(logs_df['status']==404).cache()
print("There are {} 404 responses.".format(df_404.count()))

In [54]:
endpoints_404_count_df = (df_404.groupBy('endpoint')
                          .count()
                          .sort('count',ascending = False)
                          .limit(20))
endpoints_404_count_df.show(10, truncate = False)

In [55]:
host_404_count_df = (df_404.groupBy('host')
                          .count()
                          .sort('count',ascending = False)
                          .limit(20))
host_404_count_df.show(10, truncate = False)

In [56]:
daily_404_count_df = (df_404.groupBy(F.dayofmonth('time').alias('day'))
                          .count()
                          .sort('day',ascending = True))
daily_404_count_pd_df = daily_404_count_df.toPandas()
daily_404_count_pd_df

Unnamed: 0,day,count
0,1,559
1,2,291
2,3,778
3,4,705
4,5,733
5,6,1013
6,7,1107
7,8,691
8,9,627
9,10,713


In [57]:
c = sns.catplot(x='day',y='count',data=daily_404_count_pd_df,kind='point',height=5,aspect=1.5)

In [58]:
daily_404_count_df.sort("count",ascending=False).show(3)

In [59]:
hourly_404_count_df = (df_404.groupBy(F.hour('time').alias('hour'))
                          .count()
                          .sort('hour',ascending = True))
hourly_404_count_pd_df = hourly_404_count_df.toPandas()
hourly_404_count_pd_df

Unnamed: 0,hour,count
0,0,774
1,1,648
2,2,868
3,3,603
4,4,351
5,5,307
6,6,269
7,7,458
8,8,705
9,9,840


In [60]:
c = sns.catplot(x="hour", y="count", data=hourly_404_count_pd_df,
               kind="bar",height=5,aspect=1.5)