In [1]:
import glob
import os
import re  # regular expressions

import pandas as pd  # pandas
import pyspark

from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_extract
from pyspark.sql.functions import sum as spark_sum

# Cluster master plus the resource limits used by this notebook.
conf = (
    pyspark.SparkConf()
    .setMaster('spark://spark:7077')
    .set('spark.executor.memory', '512m')
    .set('spark.executor.cores', '1')
    .set('spark.cores.max', '1')
)

# Build (or reuse) the Spark SQL session with the configuration above.
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()



In [2]:
# Locate the gzip-compressed NASA access logs (*.gz).
# glob returns matches in arbitrary order, so sort them: the list that is
# displayed here and handed to Spark is then the same on every run.
raw_data_files = sorted(glob.glob('/data/*.gz'))
raw_data_files


['/data/NASA_access_log_Aug95.gz', '/data/NASA_access_log_Jul95.gz']

In [3]:
# Read all the .gz log files found above as plain text into one Spark DataFrame
# (one row per log line).
base_dataframe = spark.read.text(raw_data_files)

# The schema is a single string column named 'value' holding the raw line.
base_dataframe.printSchema()

# Confirm that the object we got back is a Spark DataFrame.
type(base_dataframe)

root
 |-- value: string (nullable = true)



pyspark.sql.dataframe.DataFrame

In [4]:
# Total number of raw log lines. A notebook only displays the LAST expression
# of a cell, so the count is printed explicitly; otherwise the full scan runs
# and its result is thrown away.
print(base_dataframe.count())

# Preview some raw log lines without truncating them.
base_dataframe.show(10, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
|burger.letters.com - - 

In [5]:
# Data wrangling - data cleaning, parsing to extract structured attributes with meaningful information
# from each of the log message.

# The common log format according to https://www.w3.org/Daemon/User/Config/Logging.html#common-logfile-format is the following
# remotehost rfc931 authuser [date] "request" status bytes

# remotehost = Remote hostname (or IP number if DNS hostname is not available, or if DNSLookup is Off).
# rfc931 = The remote logname of the user.
# authUser = The username as which the user has authenticated himself.
# [date] = Date and time of the request.
# request = The request line exactly as it came from the client.
# status = The HTTP status code returned to the client.
# bytes = The content-length of the document transferred.

# parse the semi-structured log data into individual columns
# regexp_extract() function to do the parsing. 
# This function matches a column against a regular expression 
# with one or more capture groups and allows you to extract one 
# of the matched groups. We’ll use one regular expression for each field we wish to extract.

# Shape of the raw data as (rows, columns): about 3.46 million log messages.
row_total = base_dataframe.count()
column_total = len(base_dataframe.columns)
print((row_total, column_total))

# A small sample of raw lines used below to develop the regular expressions.
sample_logs = [row['value'] for row in base_dataframe.take(20)]
sample_logs

# Helper for trying a pattern on the sample lines. It runs the search once and
# never raises on a line that does not match (e.g. a content size of "-").
def _extract(pattern, text, all_groups=False):
    """Search `text` for `pattern`.

    Returns capture group 1 (or the tuple of all groups when `all_groups`
    is true), or the string 'no match' when the pattern does not match.
    """
    found = re.search(pattern, text)
    if found is None:
        return 'no match'
    return found.groups() if all_groups else found.group(1)


# regular expression for extracting the host information from the logs
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
# use this to understand the regular expressions = https://regexr.com/

hosts = [_extract(host_pattern, item) for item in sample_logs]
#print(hosts)

# regular expression for extracting the timestamp or [date] feature from the log
ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'

timestamps = [_extract(ts_pattern, item) for item in sample_logs]

#print(timestamps)

# regular expression for extracting the HTTP Method, URL and Protocol from the logs
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
method_uri_protocol = [_extract(method_uri_protocol_pattern, item, all_groups=True)
                       for item in sample_logs]

#print(method_uri_protocol)

# regular expression for extracting HTTP status codes for each log request
status_pattern = r'\s(\d{3})\s'
status = [_extract(status_pattern, item) for item in sample_logs]

#print(status)


# regular expression for getting the response content size
content_size_pattern = r'\s(\d+)$'
content_size = [_extract(content_size_pattern, item) for item in sample_logs]

print(content_size)



(3461613, 1)
['6245', '3985', '4085', '0', '4179', '0', '0', '3985', '3985', '7074', '40310', '786', '1204', '40310', '786', '1204', '0', '1713', '3977', '34029']


In [6]:
# Putting it all together

# One regexp_extract per output column, all applied to the raw 'value' column.
parsed_columns = [
    regexp_extract('value', host_pattern, 1).alias('host'),
    regexp_extract('value', ts_pattern, 1).alias('timestamp'),
    regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
    regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
    regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
    regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
    regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'),
]
logs_dataframe = base_dataframe.select(*parsed_columns)
logs_dataframe.show(10, truncate=True)

# (rows, columns) of the parsed frame
print((logs_dataframe.count(), len(logs_dataframe.columns)))

+--------------------+--------------------+------+--------------------+--------+------+------------+
|                host|           timestamp|method|            endpoint|protocol|status|content_size|
+--------------------+--------------------+------+--------------------+--------+------+------------+
|        199.72.81.55|01/Jul/1995:00:00...|   GET|    /history/apollo/|HTTP/1.0|   200|        6245|
|unicomp6.unicomp.net|01/Jul/1995:00:00...|   GET| /shuttle/countdown/|HTTP/1.0|   200|        3985|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...|HTTP/1.0|   200|        4085|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...|HTTP/1.0|   304|           0|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...|HTTP/1.0|   200|        4179|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/images/NASA-logo...|HTTP/1.0|   304|           0|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...|HTTP/1.0|   200|    

In [7]:
# Number of null rows in the original dataframe. It is printed because only the
# last expression of a cell is displayed; without print the result is lost.
print(base_dataframe.filter(base_dataframe['value'].isNull()).count())

# Rows of the parsed frame with at least one null field.
# NOTE: regexp_extract yields an empty string (not null) when a pattern does
# not match, so in practice only the integer-cast columns ('status',
# 'content_size') can be null here.
null_rows_dataframe = logs_dataframe.filter(logs_dataframe['host'].isNull() |
                             logs_dataframe['timestamp'].isNull() |
                             logs_dataframe['method'].isNull() |
                             logs_dataframe['endpoint'].isNull() |
                             logs_dataframe['status'].isNull() |
                             logs_dataframe['content_size'].isNull() |
                             logs_dataframe['protocol'].isNull())
null_rows_dataframe.count()

# roughly 34k rows contain at least one null value

33905

In [8]:
# Find out which columns contain the null values.
def count_null(col_name):
    """Build an aggregate expression counting the nulls of column `col_name`.

    The resulting column is aliased to `col_name`.
    """
    is_missing = col(col_name).isNull()
    return spark_sum(is_missing.cast('integer')).alias(col_name)

# One aggregate expression per column of the parsed frame.
exprs = list(map(count_null, logs_dataframe.columns))

# Unpack the expressions as positional arguments and run them in one pass.
logs_dataframe.agg(*exprs).show()


+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     1|       33905|
+----+---------+------+--------+--------+------+------------+



In [9]:
# Handling nulls in HTTP status

# Raw lines in which the status regex finds nothing. status_pattern is reused
# (instead of a second copy of the regex) so this check cannot drift away from
# the expression that fills the 'status' column.
null_status_dataframe = base_dataframe.filter(~base_dataframe['value'].rlike(status_pattern))
null_status_dataframe.count()

1

In [10]:
# Let’s look at what this bad record looks like!
# truncate=False prints the whole raw line rather than a shortened one.
null_status_dataframe.show(truncate=False)

+--------+
|value   |
+--------+
|alyssa.p|
+--------+



In [11]:
# Looks like a record with a lot of missing information! Let’s pass this through our log data parsing pipeline.

# Same seven extractions that build logs_dataframe, applied to the bad record only.
bad_record_columns = [
    regexp_extract('value', host_pattern, 1).alias('host'),
    regexp_extract('value', ts_pattern, 1).alias('timestamp'),
    regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
    regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
    regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
    regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
    regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'),
]
bad_status_dataframe = null_status_dataframe.select(*bad_record_columns)
bad_status_dataframe.show(truncate=False)

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|    |         |      |        |        |null  |null        |
+----+---------+------+--------+--------+------+------------+



In [12]:
# The record is incomplete and carries no useful information,
# so the simplest option is to drop it.

# Keep only the rows whose status could be parsed, then recount the nulls.
logs_dataframe = logs_dataframe.filter(logs_dataframe['status'].isNotNull())
exprs = list(map(count_null, logs_dataframe.columns))
logs_dataframe.agg(*exprs).show()


+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     0|       33904|
+----+---------+------+--------+--------+------+------------+



In [13]:
# Handling nulls in HTTP content size

# Raw lines that do not end in a numeric byte count. content_size_pattern is
# reused (instead of a hand-copied variant of the regex) so the check always
# matches what the parser extracts into 'content_size'.
null_content_size_dataframe = base_dataframe.filter(~base_dataframe['value'].rlike(content_size_pattern))
null_content_size_dataframe.count()


33905

In [14]:
# Inspect a few of the raw lines that have no numeric content size.
null_content_size_dataframe.take(10)

[Row(value='dd15-062.compuserve.com - - [01/Jul/1995:00:01:12 -0400] "GET /news/sci.space.shuttle/archive/sci-space-shuttle-22-apr-1995-40.txt HTTP/1.0" 404 -'),
 Row(value='dynip42.efn.org - - [01/Jul/1995:00:02:14 -0400] "GET /software HTTP/1.0" 302 -'),
 Row(value='ix-or10-06.ix.netcom.com - - [01/Jul/1995:00:02:40 -0400] "GET /software/winvn HTTP/1.0" 302 -'),
 Row(value='ix-or10-06.ix.netcom.com - - [01/Jul/1995:00:03:24 -0400] "GET /software HTTP/1.0" 302 -'),
 Row(value='link097.txdirect.net - - [01/Jul/1995:00:05:06 -0400] "GET /shuttle HTTP/1.0" 302 -'),
 Row(value='ix-war-mi1-20.ix.netcom.com - - [01/Jul/1995:00:05:13 -0400] "GET /shuttle/missions/sts-78/news HTTP/1.0" 302 -'),
 Row(value='ix-war-mi1-20.ix.netcom.com - - [01/Jul/1995:00:05:58 -0400] "GET /shuttle/missions/sts-72/news HTTP/1.0" 302 -'),
 Row(value='netport-27.iu.net - - [01/Jul/1995:00:10:19 -0400] "GET /pub/winvn/readme.txt HTTP/1.0" 404 -'),
 Row(value='netport-27.iu.net - - [01/Jul/1995:00:10:28 -0400] "GET

In [15]:
# The raw records above are responses for which the server logged "-" instead
# of a byte count (the sample shows 302 and 404 responses), so the integer
# cast left content_size null.

# We don’t want to discard those rows from the analysis, so impute them with 0.

# Fix the rows with null content_size, then verify that no nulls remain.
logs_dataframe = logs_dataframe.fillna({'content_size': 0})
exprs = list(map(count_null, logs_dataframe.columns))
logs_dataframe.agg(*exprs).show()


+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     0|           0|
+----+---------+------+--------+--------+------+------------+



In [16]:
# data cleaning is done

# Handling Temporal Fields (Timestamp)
# Now that we have a clean, parsed DataFrame, we have to parse the 
# timestamp field into an actual timestamp. The Common Log Format time 
# is somewhat non-standard. A User-Defined Function (UDF) is the most 
# straightforward way to parse it.

from pyspark.sql.functions import udf

# Three-letter English month abbreviations used by the Common Log Format.
month_map = {
  'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
  'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
}

def parse_clf_time(text):
    """ Convert a Common Log Format time into a 'YYYY-MM-DD hh:mm:ss' string
    Args:
        text (str): date and time in Apache time format dd/mmm/yyyy:hh:mm:ss (+/-)zzzz
            (without the surrounding square brackets)
    Returns:
        a string suitable for passing to CAST('timestamp'), or None when
        `text` is None, empty or not in the expected format
    """
    # NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving
    try:
        # Fixed-width slices: dd/mmm/yyyy:hh:mm:ss
        return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
          int(text[7:11]),
          month_map[text[3:6]],
          int(text[0:2]),
          int(text[12:14]),
          int(text[15:17]),
          int(text[18:20])
        )
    except (TypeError, ValueError, KeyError):
        # Unparsed lines reach this function as '' (regexp_extract yields an
        # empty string on no match). Returning None lets the value become a
        # null instead of raising inside the UDF.
        return None




In [17]:
# Wrap the Python parser as a Spark UDF.
udf_parse_time = udf(parse_clf_time)

# Add the parsed value as a real timestamp column named 'time' and drop the
# original text column.
parsed_time = udf_parse_time(logs_dataframe['timestamp']).cast('timestamp').alias('time')
logs_dataframe = logs_dataframe.select('*', parsed_time).drop('timestamp')

logs_dataframe.show(10, truncate=True)

+--------------------+------+--------------------+--------+------+------------+-------------------+
|                host|method|            endpoint|protocol|status|content_size|               time|
+--------------------+------+--------------------+--------+------+------------+-------------------+
|        199.72.81.55|   GET|    /history/apollo/|HTTP/1.0|   200|        6245|1995-07-01 00:00:01|
|unicomp6.unicomp.net|   GET| /shuttle/countdown/|HTTP/1.0|   200|        3985|1995-07-01 00:00:06|
|      199.120.110.21|   GET|/shuttle/missions...|HTTP/1.0|   200|        4085|1995-07-01 00:00:09|
|  burger.letters.com|   GET|/shuttle/countdow...|HTTP/1.0|   304|           0|1995-07-01 00:00:11|
|      199.120.110.21|   GET|/shuttle/missions...|HTTP/1.0|   200|        4179|1995-07-01 00:00:11|
|  burger.letters.com|   GET|/images/NASA-logo...|HTTP/1.0|   304|           0|1995-07-01 00:00:12|
|  burger.letters.com|   GET|/shuttle/countdow...|HTTP/1.0|   200|           0|1995-07-01 00:00:12|


In [18]:
# Check the final column types of the cleaned frame.
logs_dataframe.printSchema()

# Mark the cleaned frame for caching; the analysis cells below all read from it.
logs_dataframe.cache()

root
 |-- host: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- content_size: integer (nullable = false)
 |-- time: timestamp (nullable = true)



DataFrame[host: string, method: string, endpoint: string, protocol: string, status: int, content_size: int, time: timestamp]

In [19]:
# schema is fine, times are according to the standard
# we can do now the analysis part for the project
# Exploratory Data Analysis (EDA)

In [20]:
# Content Size Statistics
# we’d like to know what are the average, minimum, and maximum content sizes.
# The .describe() function returns the count, mean, stddev, min, and max of a given column.

#content_size_summary_dataframe = logs_dataframe.describe(['content_size'])
#content_size_summary_dataframe.toPandas()

#content_size_summary_dataframe.printSchema()

In [None]:
# Spark SQL functions.

from pyspark.sql import functions as SparkSqlFunctions

# The analysis cells write their JSON results into 'analysis/'. pandas'
# to_json does not create missing directories, so make sure it exists.
os.makedirs('analysis', exist_ok=True)

# Min, max, mean, standard deviation and count of the response sizes.
content_size_column = logs_dataframe['content_size']
content_size_summary_dataframe = logs_dataframe.agg(
    SparkSqlFunctions.min(content_size_column).alias('min_content_size'),
    SparkSqlFunctions.max(content_size_column).alias('max_content_size'),
    SparkSqlFunctions.mean(content_size_column).alias('mean_content_size'),
    SparkSqlFunctions.stddev(content_size_column).alias('std_content_size'),
    SparkSqlFunctions.count(content_size_column).alias('count_content_size'))

# The summary is a single row, so it is safe to collect it into pandas.
content_size_summary_dataframe = content_size_summary_dataframe.toPandas()

content_size_summary_dataframe.to_json('analysis/content_size_analysis.json')


In [None]:
# HTTP Status Code Analysis
# Which status codes appear in the data, and how often?

# Requests per status code; cached because it is reused several times below.
status_freq_dataframe = logs_dataframe.groupBy('status').count().sort('status').cache()
print('Total distinct HTTP Status Codes:', status_freq_dataframe.count())

# Most frequent codes first, exported as JSON.
status_freq_pd_dataframe = status_freq_dataframe.toPandas().sort_values(by=['count'], ascending=False)
status_freq_pd_dataframe.to_json('analysis/status_codes_frequency_analysis.json')

# Same table with an additional natural-log column of the counts.
log_of_count = SparkSqlFunctions.log(status_freq_dataframe['count'])
status_log_freq_dataframe = status_freq_dataframe.withColumn('log(count)', log_of_count)
status_log_freq_dataframe.show()

status_log_freq_dataframe = status_log_freq_dataframe.toPandas()

status_log_freq_dataframe.to_json('analysis/status_logs_frequency_analysis.json')


In [None]:
# Analyzing Frequent Hosts
# Count the requests made by each host and keep only the ten hosts
# with the highest counts.

requests_per_host = logs_dataframe.groupBy('host').count()
host_sum_dataframe = requests_per_host.orderBy(col('count').desc()).limit(10)

host_sum_dataframe.show(truncate=False)

# Ten rows only, so collecting into pandas is cheap.
host_sum_dataframe = host_sum_dataframe.toPandas()

host_sum_dataframe.to_json('analysis/frequenct_host_analysis.json')



In [None]:
# Display the Top 20 Frequent EndPoints

# Requests per endpoint, highest count first, limited to twenty rows.
requests_per_endpoint = logs_dataframe.groupBy('endpoint').count()
paths_dataframe = requests_per_endpoint.orderBy(col('count').desc()).limit(20)

paths_pd_dataframe = paths_dataframe.toPandas()

paths_pd_dataframe.to_json('analysis/20_frequent_endpoints_analysis.json')
paths_pd_dataframe


In [None]:
# Top Ten Error Endpoints
# Which ten endpoints were most often requested with a return code other
# than 200 (HTTP Status OK)? Keep the non-200 requests, count them per
# endpoint and show the ten largest counts.

# Every request whose status is not 200.
error_endpoints_dataframe = logs_dataframe.filter(logs_dataframe['status'] != 200)

# Non-200 requests per endpoint, highest count first, top ten only.
non_200_per_endpoint = error_endpoints_dataframe.groupBy('endpoint').count()
error_endpoints_freq_dataframe = non_200_per_endpoint.orderBy(col('count').desc()).limit(10)
error_endpoints_freq_dataframe.show(truncate=False)

error_endpoints_freq_dataframe = error_endpoints_freq_dataframe.toPandas()

error_endpoints_freq_dataframe.to_json('analysis/10_error_endpoints_analysis.json')


In [None]:
# Total number of Unique Hosts
# How many distinct hosts visited the NASA website in these two months?

distinct_hosts = logs_dataframe.select('host').distinct()
unique_host_count = distinct_hosts.count()
unique_host_count



In [None]:
# Number of Unique Daily Hosts

host_per_day_dataframe = logs_dataframe.select(logs_dataframe.host, 
                             SparkSqlFunctions.dayofmonth('time').alias('day'))

host_per_day_dataframe.show(5, truncate=False)

In [None]:
# host_per_day_distinct_dataframe : This DataFrame has the same columns as 
# host_per_day_dataframe, but with duplicate (day, host) rows removed.

host_per_day_distinct_dataframe = (host_per_day_dataframe
                          .dropDuplicates())
host_per_day_distinct_dataframe.show(5, truncate=False)

In [None]:
pd.options.display.max_rows = 10

daily_hosts_dataframe = (host_per_day_distinct_dataframe
                     .groupBy('day')
                     .count()
                     .sort("day"))

daily_hosts_dataframe = daily_hosts_dataframe.toPandas()

daily_hosts_dataframe

daily_hosts_dataframe.to_json('analysis/daily_unique_hosts.json')

In [None]:
# Average Number of Daily Requests per Host
daily_hosts_dataframe = (host_per_day_distinct_dataframe
                     .groupBy('day')
                     .count()
                     .select(col("day"), 
                                      col("count").alias("total_hosts")))

total_daily_reqests_dataframe = (logs_dataframe
                              .select(SparkSqlFunctions.dayofmonth("time")
                                          .alias("day"))
                              .groupBy("day")
                              .count()
                              .select(col("day"), 
                                      col("count").alias("total_reqs")))

avg_daily_reqests_per_host_dataframe = total_daily_reqests_dataframe.join(daily_hosts_dataframe, 'day')
avg_daily_reqests_per_host_dataframe = (avg_daily_reqests_per_host_dataframe
                                    .withColumn('avg_reqs', col('total_reqs') / col('total_hosts'))
                                    .sort("day"))
avg_daily_reqests_per_host_dataframe = avg_daily_reqests_per_host_dataframe.toPandas()
avg_daily_reqests_per_host_dataframe

avg_daily_reqests_per_host_dataframe.to_json('analysis/avg_daily_reqests_per_host.json')


In [None]:
# Counting 404 Response Codes
# Cached because the 404 cells below all start from this frame.
is_not_found = logs_dataframe["status"] == 404
not_found_dataframe = logs_dataframe.filter(is_not_found).cache()
total_404 = not_found_dataframe.count()
print('Total 404 responses: {}'.format(total_404))

In [None]:
# Listing the Top Twenty 404 Response Code Endpoints
# 404 responses per endpoint, highest count first, limited to twenty rows.
not_found_per_endpoint = not_found_dataframe.groupBy("endpoint").count()
endpoints_404_count_dataframe = not_found_per_endpoint.orderBy(col("count").desc()).limit(20)

endpoints_404_count_dataframe.show(truncate=False)

endpoints_404_count_dataframe = endpoints_404_count_dataframe.toPandas()

endpoints_404_count_dataframe.to_json('analysis/endpoints_404_count.json')


In [None]:
# Listing the Top Twenty 404 Response Code Hosts

# 404 responses per requesting host, highest count first, limited to twenty rows.
not_found_per_host = not_found_dataframe.groupBy("host").count()
hosts_404_count_dataframe = not_found_per_host.orderBy(col("count").desc()).limit(20)

hosts_404_count_dataframe.show(truncate=False)

hosts_404_count_dataframe = hosts_404_count_dataframe.toPandas()

hosts_404_count_dataframe.to_json('analysis/hosts_404_count.json')

In [None]:
# Visualizing 404 Errors per Day
# NOTE: the logs cover two months (July and August 1995), so the 404s are
# grouped by full calendar date ('yyyy-MM-dd'). dayofmonth() alone would add
# the July and August counts for the same day number together.
# The string form sorts chronologically and serialises cleanly to JSON.
errors_by_date_sorted_dataframe = (not_found_dataframe
                                .groupBy(SparkSqlFunctions.date_format('time', 'yyyy-MM-dd').alias('day'))
                                .count()
                                .sort("day"))

errors_by_date_sorted_pd_dataframe = errors_by_date_sorted_dataframe.toPandas()
errors_by_date_sorted_pd_dataframe.to_json('analysis/404_errors_by_date_sorted.json')

In [None]:
# Top 5 Days for 404 Errors

# Re-sort the per-day 404 counts with the largest count first and show five rows.
errors_by_date_sorted_dataframe.orderBy(col("count").desc()).show(5)


In [None]:
# Visualizing Hourly 404 Errors
# Number of 404 responses per hour of the day (0-23), summed over all days.
# Despite the variable name, this is a count, not an average.
hour_of_day = SparkSqlFunctions.hour('time').alias('hour')
hourly_avg_errors_sorted_dataframe = not_found_dataframe.groupBy(hour_of_day).count().sort('hour')
hourly_avg_errors_sorted_pd_dataframe = hourly_avg_errors_sorted_dataframe.toPandas()

hourly_avg_errors_sorted_pd_dataframe