In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=46f8f76922c1ab76433a16b4176a43577bfcbd8ec8cf401dbbc57a76c465a231
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
# creating a SparkSession
spark = SparkSession.builder \
        .appName("TelecomLogAnalyzer") \
        .getOrCreate()

# Part - 1: Without using Data Frames

In [None]:
# loading the text file as an RDD
text_rdd = spark.sparkContext.textFile('access.log')

# displaying the first 5 lines
for line in text_rdd.take(5):
    print(line)


109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -
109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] POST /administrator/index.php HTTP/1.1 200 4494 http://almhuette-raith.at/administrator/ Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -
46.72.177.4 - - [12/Dec/2015:18:31:08 +0100] GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -
46.72.177.4 - - [12/Dec/2015:18:31:08 +0100] POST /administrator/index.php HTTP/1.1 200 4494 http://almhuette-raith.at/administrator/ Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -


In [None]:
# removing the empty lines
rdd = text_rdd.filter(lambda line: line != '')

# checking the count
rdd.count()

2338006

In [None]:
def clean_log(log_line):
  """
  Function to clean a given log line

  Args:: log_line: A single log line as a string
  Returns:: (tuple): (ip_address, remote_user_identity, remote_user,
            timestamp, http_method, requested_url,
            http_version, http_status_code, bytes_sent,
            url, user_agent)
  """

  log_line_split1 = log_line.split(' ')

  # identify ip_address
  ip_address = log_line_split1[0]

  # identify remote_user_identity
  remote_user_identity = log_line_split1[1]

  # identify remote_user
  remote_user = log_line_split1[2]

  # identify timestamp
  timestamp = ' '.join([log_line_split1[3], log_line_split1[4]])[1: 27]

  # identify http_method
  if log_line_split1[5][-4:].upper() == 'POST':
    http_method = 'POST'
  elif log_line_split1[5][-3:].upper() == 'GET':
    http_method = 'GET'
  else:
    http_method = log_line_split1[5][-8:].upper()

  # identify requested_url
  requested_url = log_line_split1[6]

  log_line_split2 = log_line.split(' HTTP/')[1]

  # identify http_version
  http_version = log_line_split2[0: 3]

  # identify http_status_code
  http_status_code = log_line_split2.split(' ')[1]

  # identify bytes_sent
  bytes_sent = log_line_split2.split(' ')[2]

  # identify url
  url = log_line_split2.split(' ')[3]

  # identify user_agent
  user_agent = log_line_split1[11]
  for i in range(12, len(log_line_split1)-1):
    user_agent += log_line_split1[i]

  fields = [ip_address, remote_user_identity, remote_user,
            timestamp, http_method, requested_url,
            http_version, http_status_code, bytes_sent,
            url, user_agent]

  return(fields)

In [None]:
# cleaning and parsing the rdd
cleaned_rdd = rdd.map(lambda line: clean_log(line))

# showing the contents of the cleaned RDD
for line in cleaned_rdd.take(5):
    print(line)

['109.169.248.247', '-', '-', '12/Dec/2015:18:25:11 +0100', 'GET', '/administrator/', '1.1', '200', '4263', '-', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0']
['109.169.248.247', '-', '-', '12/Dec/2015:18:25:11 +0100', 'POST', '/administrator/index.php', '1.1', '200', '4494', 'http://almhuette-raith.at/administrator/', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0']
['46.72.177.4', '-', '-', '12/Dec/2015:18:31:08 +0100', 'GET', '/administrator/', '1.1', '200', '4263', '-', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0']
['46.72.177.4', '-', '-', '12/Dec/2015:18:31:08 +0100', 'POST', '/administrator/index.php', '1.1', '200', '4494', 'http://almhuette-raith.at/administrator/', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0']
['83.167.113.100', '-', '-', '12/Dec/2015:18:31:25 +0100', 'GET', '/administrator/', '1.1', '200', '4263', '-', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0']


## 3.	Find out how many 404 HTTP codes are in access logs

In [None]:
# filtering out the 404 HTTP codes
url404 = cleaned_rdd.filter(lambda line: line[7]=='404')

# displaying a few
for line in url404.take(5):
    print(line)

['191.182.199.16', '-', '-', '12/Dec/2015:19:02:36 +0100', 'GET', '/templates/_system/css/general.css', '1.1', '404', '239', 'http://almhuette-raith.at/', 'Mozilla/5.0(WindowsNT6.1;WOW64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/36.0.1985.143Safari/537.36']
['188.45.108.168', '-', '-', '12/Dec/2015:19:44:06 +0100', 'GET', '/templates/_system/css/general.css', '1.1', '404', '239', 'http://www.almhuette-raith.at/', 'Mozilla/5.0(Linux;Android4.4.2;de-at;SAMSUNGGT-I9301IBuild/KOT49H)AppleWebKit/537.36(KHTML,likeGecko)Version/1.5Chrome/28.0.1500.94MobileSafari/537.36']
['188.45.108.168', '-', '-', '12/Dec/2015:19:44:15 +0100', 'GET', '/favicon.ico', '1.1', '404', '217', '-', 'Mozilla/5.0(Linux;Android4.4.2;de-at;SAMSUNGGT-I9301IBuild/KOT49H)AppleWebKit/537.36(KHTML,likeGecko)Version/1.5Chrome/28.0.1500.94MobileSafari/537.36']
['157.55.39.3', '-', '-', '13/Dec/2015:01:01:19 +0100', 'GET', '/icons/text.gif', '1.1', '404', '220', '-', 'Mozilla/5.0(compatible;bingbot/2.0;+http://www.bing.com/bi

In [None]:
# number of 404 HTTP codes
url404.count()

227101

## 4.	Find out which URLs are broken

In [None]:
# filtering out the broken (non-OK) URLS
broke_urls = cleaned_rdd.filter(lambda line: line[7]!='200')

# displaying a few
for line in broke_urls.take(5):
    print(line)

['191.182.199.16', '-', '-', '12/Dec/2015:19:02:36 +0100', 'GET', '/templates/_system/css/general.css', '1.1', '404', '239', 'http://almhuette-raith.at/', 'Mozilla/5.0(WindowsNT6.1;WOW64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/36.0.1985.143Safari/537.36']
['188.45.108.168', '-', '-', '12/Dec/2015:19:44:06 +0100', 'GET', '/templates/_system/css/general.css', '1.1', '404', '239', 'http://www.almhuette-raith.at/', 'Mozilla/5.0(Linux;Android4.4.2;de-at;SAMSUNGGT-I9301IBuild/KOT49H)AppleWebKit/537.36(KHTML,likeGecko)Version/1.5Chrome/28.0.1500.94MobileSafari/537.36']
['188.45.108.168', '-', '-', '12/Dec/2015:19:44:15 +0100', 'GET', '/favicon.ico', '1.1', '404', '217', '-', 'Mozilla/5.0(Linux;Android4.4.2;de-at;SAMSUNGGT-I9301IBuild/KOT49H)AppleWebKit/537.36(KHTML,likeGecko)Version/1.5Chrome/28.0.1500.94MobileSafari/537.36']
['157.55.39.3', '-', '-', '13/Dec/2015:01:01:19 +0100', 'GET', '/icons/text.gif', '1.1', '404', '220', '-', 'Mozilla/5.0(compatible;bingbot/2.0;+http://www.bing.com/bi

In [None]:
# number of broken URLS
broke_urls.count()

1180174

## 5.	Verify there are no null columns in the original dataset

In [None]:
# transposing the RDD to work with columns instead of rows
rdd_transposed = cleaned_rdd.zipWithIndex().flatMap(lambda x: [(i, x[1]) for i in enumerate(x[0])])

# checking for null values in each column
null_columns = rdd_transposed.filter(lambda x: (x[0][1] == "") or (x[0][1] == "-")).map(lambda x: x[0][0]).distinct().collect()

if null_columns:
    print("Null columns found:", null_columns)
else:
    print("No null columns found")

Null columns found: [1, 2, 8, 9, 10]


## 6.	Replace null values with constants such as 0

In [None]:
def fill_na(row):
  """
  Function to replace null/empty values with appropriate constants
  Replaces null values in remote_user_identity(1), remote_user(2),
  and bytes_sent(8) with 0 and those in URL(9) and user_agent(10) with "Unknown"

  Args:: row: A single row of log fields
  Returns:: (tuple): corrected (ip_address, remote_user_identity, remote_user,
            timestamp, http_method, requested_url,
            http_version, http_status_code, bytes_sent,
            url, user_agent)
  """
  new_row = []
  for i in range(len(row)):
    if row[i] in ("", "-"):
      if i in (1, 2, 8):
        new_row.append(0)
      elif i in (9, 10):
        new_row.append("Unknown")
    else:
      new_row.append(row[i])
  return(tuple(new_row))

filled_rdd = cleaned_rdd.map(lambda row: fill_na(row))

In [None]:
### Checking the fill_na step

# transposing the RDD to work with columns instead of rows
rdd_transposed = filled_rdd.zipWithIndex().flatMap(lambda x: [(i, x[1]) for i in enumerate(x[0])])

# checking for null values in each column
null_columns = rdd_transposed.filter(lambda x: (x[0][1] == "") or (x[0][1] == "-")).map(lambda x: x[0][0]).distinct().collect()

if null_columns:
    print("Null columns found:", null_columns)
else:
    print("No null columns found")

No null columns found


## 7.	Parse timestamp to readable date

In [None]:
from datetime import datetime

def parse_datetime(row):
  """
  Function to convert the string timestamp field to date-readable format

  Args:: row: A single row of log fields
  Returns:: (tuple): corrected (ip_address, remote_user_identity, remote_user,
            timestamp, http_method, requested_url,
            http_version, http_status_code, bytes_sent,
            url, user_agent)
  """
  new_row = []
  for i in range(len(row)):
    if i == 3:
      new_row.append(datetime.strptime(row[i], "%d/%b/%Y:%H:%M:%S %z").strftime("%Y-%m-%d %H:%M:%S"))
    else:
      new_row.append(row[i])
  return(tuple(new_row))

# datetime conversion
parsed_rdd = filled_rdd.map(lambda row: parse_datetime(row))

In [None]:
#### Before datetime conversion
for line in filled_rdd.take(5):
    print(line)

('109.169.248.247', 0, 0, '12/Dec/2015:18:25:11 +0100', 'GET', '/administrator/', '1.1', '200', '4263', 'Unknown', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0')
('109.169.248.247', 0, 0, '12/Dec/2015:18:25:11 +0100', 'POST', '/administrator/index.php', '1.1', '200', '4494', 'http://almhuette-raith.at/administrator/', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0')
('46.72.177.4', 0, 0, '12/Dec/2015:18:31:08 +0100', 'GET', '/administrator/', '1.1', '200', '4263', 'Unknown', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0')
('46.72.177.4', 0, 0, '12/Dec/2015:18:31:08 +0100', 'POST', '/administrator/index.php', '1.1', '200', '4494', 'http://almhuette-raith.at/administrator/', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0')
('83.167.113.100', 0, 0, '12/Dec/2015:18:31:25 +0100', 'GET', '/administrator/', '1.1', '200', '4263', 'Unknown', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0')


In [None]:
#### After datetime conversion
for line in parsed_rdd.take(5):
    print(line)

('109.169.248.247', 0, 0, '2015-12-12 18:25:11', 'GET', '/administrator/', '1.1', '200', '4263', 'Unknown', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0')
('109.169.248.247', 0, 0, '2015-12-12 18:25:11', 'POST', '/administrator/index.php', '1.1', '200', '4494', 'http://almhuette-raith.at/administrator/', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0')
('46.72.177.4', 0, 0, '2015-12-12 18:31:08', 'GET', '/administrator/', '1.1', '200', '4263', 'Unknown', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0')
('46.72.177.4', 0, 0, '2015-12-12 18:31:08', 'POST', '/administrator/index.php', '1.1', '200', '4494', 'http://almhuette-raith.at/administrator/', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0')
('83.167.113.100', 0, 0, '2015-12-12 18:31:25', 'GET', '/administrator/', '1.1', '200', '4263', 'Unknown', 'Mozilla/5.0(WindowsNT6.0;rv:34.0)Gecko/20100101Firefox/34.0')


## 8.	Describe which HTTP status values appear in data and how many

The HTTP status values that appear in the data are as follows and their frequencies are given in the cell output below.


* **200**: OK; Standard response for successful HTTP requests.
* **206**: Partial Content; The server is delivering only part of the resource (byte serving) due to a range header sent by the client.
* **301**: Moved Permanently; This and all future requests should be directed to the given URI.
* **303**: See Other; The response to the request can be found under another URI using the GET method. When received in response to a POST (or PUT/DELETE), the client should presume that the server has received the data and should issue a new GET request to the given URI.
* **304**: Not Modified; Indicates that the resource has not been modified since the version specified by the request headers If-Modified-Since or If-None-Match.
* **400**: Bad Request; The server cannot or will not process the request due to an apparent client error.
* **401**: Unauthorized; Similar to 403 Forbidden, but specifically for use when authentication is required and has failed or has not yet been provided.
* **403**: Forbidden; The request contained valid data and was understood by the server, but the server is refusing action.
* **404**: Not Found; The requested resource could not be found but may be available in the future.
* **405**: Method Not Allowed; A request method is not supported for the requested resource.
* **406**: Not Acceptable; The requested resource is capable of generating only content not acceptable according to the Accept headers sent in the request.
* **412**: Precondition Failed; The server does not meet one of the preconditions that the requester put on the request header fields.
* **500**: Internal Server Error; A generic error message, given when an unexpected condition was encountered and no more specific message is suitable.
* **501**: Not Implemented; The server either does not recognize the request method, or it lacks the ability to fulfil the request.


*Reference:* https://en.wikipedia.org/wiki/List_of_HTTP_status_codes



In [None]:
# extracting the HTTP status values
http_status_rdd = parsed_rdd.map(lambda x: x[7])

# getting the frequency of each HTTP status
http_status_freq = http_status_rdd.countByValue()

# printing the frequency of each HTTP status
for http_status, freq in http_status_freq.items():
    print(f"{http_status}: {freq}")

200: 1157832
404: 227101
500: 3252
405: 83
301: 619
304: 6330
501: 143
206: 939929
403: 2222
303: 247
412: 19
400: 23
401: 153
406: 53


In [None]:
# extracting the HTTP status values
http_status_rdd = parsed_rdd.map(lambda x: x[7])

# counting the frequency of each unique status in the RDD
frequency_rdd = http_status_rdd.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)

# saving the frequency to a CSV file
frequency_rdd.map(lambda x: ",".join(map(str, x))) \
             .coalesce(1) \
             .saveAsTextFile('part1_q8_stats.csv')

# printing the frequencies
for line in frequency_rdd.take(frequency_rdd.count()):
    print(line)

('501', 143)
('206', 939929)
('303', 247)
('400', 23)
('200', 1157832)
('401', 153)
('301', 619)
('500', 3252)
('403', 2222)
('412', 19)
('406', 53)
('405', 83)
('304', 6330)
('404', 227101)


## 9.	How many unique hosts are there in the entire log and their average request

In [None]:
# extracting hosts (IP Addresses) from the RDD
hosts_rdd = parsed_rdd.map(lambda row: row[0])

# counting the number of unique hosts
unique_hosts_count = hosts_rdd.distinct().count()

# counting the total number of requests
total_requests_count = hosts_rdd.count()

# calculating the average requests per host
average_requests_per_host = total_requests_count / unique_hosts_count

# printing the results
print("Number of unique hosts:", unique_hosts_count)
print("Total number of requests:", total_requests_count)
print("Average requests per host:", average_requests_per_host)

Number of unique hosts: 40836
Total number of requests: 2338006
Average requests per host: 57.25355078851993


# Part - 2: Using Data Frames

## 1. Load data into Spark Data Frame




In [None]:
# reading the log file into a dataframe
log_df = spark.read.text('access.log')

# displaying a sample of the data
log_df.show(10, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                        |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                             |
|109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -           

In [None]:
# printing the schema of the data
log_df.printSchema()

root
 |-- value: string (nullable = true)



In [None]:
# parsing information from each log lines
# and storing them as separate columns


df = log_df.withColumn("ip_address", split(log_df["value"], " ").getItem(0)) \
           .withColumn("remote_user_identity", split(log_df["value"], " ").getItem(1)) \
           .withColumn("remote_user", split(log_df["value"], " ").getItem(2)) \
           .withColumn("timestamp", concat_ws(" ",
                                              split(log_df["value"], " ").getItem(3),
                                              split(log_df["value"], " ").getItem(4))) \
           .withColumn("timestamp", substring(col('timestamp'), 2, 26)) \
           .withColumn("http_method_", upper(substring(split(log_df["value"], " ").getItem(5), -8, 8))) \
           .withColumn("http_method", when(substring(col('http_method_'), -4, 4)=='POST', lit('POST'))
                                     .when(substring(col('http_method_'), -3, 3)=='GET', lit('GET'))
                                     .otherwise(col('http_method_'))) \
           .withColumn('requested_url', split(log_df["value"], " ").getItem(6)) \
           .withColumn('info', split(log_df["value"], " HTTP/").getItem(1)) \
           .withColumn('http_version', substring(col('info'), 1, 3)) \
           .withColumn("http_status_code", split(col('info'), ' ').getItem(1)) \
           .withColumn('bytes_sent', split(col('info'), ' ').getItem(2)) \
           .withColumn("URL", split(split(log_df["value"], " HTTP/").getItem(1), " ").getItem(3)) \
           .withColumn('user_agent', concat_ws(" ",
                                               split(log_df["value"], " ").getItem(11),
                                               split(log_df["value"], " ").getItem(12),
                                               split(log_df["value"], " ").getItem(13),
                                               split(log_df["value"], " ").getItem(14),
                                               split(log_df["value"], " ").getItem(15),
                                               split(log_df["value"], " ").getItem(16),
                                               split(log_df["value"], " ").getItem(17))) \
           .drop('http_method_', 'info')

df.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+--------------------+-----------+--------------------------+-----------+------------------------+------------+----------------+----------+----------------------------------------+-----------------------------------------------------------------+
|value                                                                                                                                                                                                        |ip_address     |remote_user_identity|remote_user|timestamp                 |http_method|requested_url           |http_version|http_status_code|bytes_sent|URL                                     |user_agent                                                       |
+-------------------------------------------------------------

In [None]:
df.groupBy('http_status_code').count().show()

+----------------+-------+
|http_status_code|  count|
+----------------+-------+
|             200|1157832|
|            NULL|      1|
|             206| 939929|
|             501|    143|
|             404| 227101|
|             403|   2222|
|             412|     19|
|             303|    247|
|             500|   3252|
|             304|   6330|
|             405|     83|
|             400|     23|
|             301|    619|
|             406|     53|
|             401|    153|
+----------------+-------+



In [None]:
df.count()

2338007

In [None]:
# removing null rows
df = df.filter(~((col('value').isNull())|(col('value')=='')))
df.count()

2338006

In [None]:
df.groupBy("http_status_code").count().show()

+----------------+-------+
|http_status_code|  count|
+----------------+-------+
|             200|1157832|
|             206| 939929|
|             501|    143|
|             404| 227101|
|             403|   2222|
|             412|     19|
|             303|    247|
|             500|   3252|
|             304|   6330|
|             405|     83|
|             400|     23|
|             301|    619|
|             406|     53|
|             401|    153|
+----------------+-------+



## 2. Find out how many 404 HTTP codes are in access logs

In [None]:
url404 = df.filter(col('http_status_code') == 404)
url404.count()

227101

In [None]:
url404.select('http_status_code', 'URL').show(truncate=False)

+----------------+---------------------------------------------------------------------------------------+
|http_status_code|URL                                                                                    |
+----------------+---------------------------------------------------------------------------------------+
|404             |http://almhuette-raith.at/                                                             |
|404             |http://www.almhuette-raith.at/                                                         |
|404             |-                                                                                      |
|404             |-                                                                                      |
|404             |http://www.almhuette-raith.at/                                                         |
|404             |-                                                                                      |
|404             |http://www.almhuett

## 3.	Find out which URLs are broke

In [None]:
broke_urls = df.filter(col("http_status_code") != 200)
broke_urls.count()

1180174

In [None]:
broke_urls.select('http_status_code', 'URL').show(truncate=False)

+----------------+---------------------------------------------------------------------------------------+
|http_status_code|URL                                                                                    |
+----------------+---------------------------------------------------------------------------------------+
|404             |http://almhuette-raith.at/                                                             |
|404             |http://www.almhuette-raith.at/                                                         |
|404             |-                                                                                      |
|404             |-                                                                                      |
|500             |-                                                                                      |
|500             |-                                                                                      |
|404             |http://www.almhuett

## 4. Verify there are no null columns in the original dataset

In [None]:
for c in df.columns:
  print(f"""{c}: {df.where((col(c).isNull()) |
                           (col(c) == '-') |
                           (col(c) == '')).count()}""")

value: 0
ip_address: 0
remote_user_identity: 2338006
remote_user: 2337872
timestamp: 0
http_method: 0
requested_url: 0
http_version: 0
http_status_code: 0
bytes_sent: 6341
URL: 677884
user_agent: 0


## 5. Replace null values with constants such as 0

In [None]:
df = df.withColumn('remote_user_identity', when((col('remote_user_identity').isNull()) |
                                                (col('remote_user_identity') == '-') |
                                                (col('remote_user_identity') == ''), lit(0))
                                          .otherwise(col('remote_user_identity'))) \
       .withColumn('remote_user', when((col('remote_user').isNull()) |
                                       (col('remote_user') == '-') |
                                       (col('remote_user') == ''), lit(0))
                                 .otherwise(col('remote_user'))) \
       .withColumn('bytes_sent', when((col('bytes_sent').isNull()) |
                                      (col('bytes_sent') == '-') |
                                      (col('bytes_sent') == ''), lit(0))
                                .otherwise(col('bytes_sent'))) \
       .withColumn('URL', when((col('URL').isNull()) |
                               (col('URL') == '-') |
                               (col('URL') == ''), lit('Unknown'))
                         .otherwise(col('URL')))

In [None]:
for c in df.columns:
  print(f"""{c}: {df.where((col(c).isNull()) |
                           (col(c) == '-') |
                           (col(c) == '')).count()}""")

value: 0
ip_address: 0
remote_user_identity: 0
remote_user: 0
timestamp: 0
http_method: 0
requested_url: 0
http_version: 0
http_status_code: 0
bytes_sent: 0
URL: 0
user_agent: 0


## 6. Parse timestamp to readable date

In [None]:
df = df.withColumn("date", to_timestamp("timestamp", "dd/MMM/yyyy:HH:mm:ss Z"))
df.select('timestamp', 'date').show(truncate=False)

+--------------------------+-------------------+
|timestamp                 |date               |
+--------------------------+-------------------+
|12/Dec/2015:18:25:11 +0100|2015-12-12 17:25:11|
|12/Dec/2015:18:25:11 +0100|2015-12-12 17:25:11|
|12/Dec/2015:18:31:08 +0100|2015-12-12 17:31:08|
|12/Dec/2015:18:31:08 +0100|2015-12-12 17:31:08|
|12/Dec/2015:18:31:25 +0100|2015-12-12 17:31:25|
|12/Dec/2015:18:31:25 +0100|2015-12-12 17:31:25|
|12/Dec/2015:18:32:10 +0100|2015-12-12 17:32:10|
|12/Dec/2015:18:32:11 +0100|2015-12-12 17:32:11|
|12/Dec/2015:18:32:56 +0100|2015-12-12 17:32:56|
|12/Dec/2015:18:32:56 +0100|2015-12-12 17:32:56|
|12/Dec/2015:18:33:51 +0100|2015-12-12 17:33:51|
|12/Dec/2015:18:33:52 +0100|2015-12-12 17:33:52|
|12/Dec/2015:18:36:16 +0100|2015-12-12 17:36:16|
|12/Dec/2015:18:36:16 +0100|2015-12-12 17:36:16|
|12/Dec/2015:18:38:42 +0100|2015-12-12 17:38:42|
|12/Dec/2015:18:38:42 +0100|2015-12-12 17:38:42|
|12/Dec/2015:18:38:55 +0100|2015-12-12 17:38:55|
|12/Dec/2015:18:38:5

In [None]:
df.select('timestamp', 'date').printSchema()

root
 |-- timestamp: string (nullable = false)
 |-- date: timestamp (nullable = true)



## 7. Describe which HTTP status values appear in data and how many

In [None]:
http_statuses = df.groupBy('http_status_code').count().orderBy(desc('count'))

http_statuses.show(truncate=False)

+----------------+-------+
|http_status_code|count  |
+----------------+-------+
|200             |1157832|
|206             |939929 |
|404             |227101 |
|304             |6330   |
|500             |3252   |
|403             |2222   |
|301             |619    |
|303             |247    |
|401             |153    |
|501             |143    |
|405             |83     |
|406             |53     |
|400             |23     |
|412             |19     |
+----------------+-------+



In [None]:
http_statuses.coalesce(1) \
             .write.csv('part2_q8_stats.csv', header=True)

## 8. How many unique hosts are there in the entire log and their average request

In [None]:
hosts = df.groupBy(['ip_address']).count()
n_unique_hosts = hosts.count()
print(n_unique_hosts)
hosts.show()

40836
+--------------+-----+
|    ip_address|count|
+--------------+-----+
|   46.72.177.4|    8|
| 194.48.218.78|    2|
| 31.181.253.16|    2|
|  37.112.46.76|    2|
| 95.107.90.225|    2|
|  5.138.58.118|    2|
|95.188.228.228|    2|
|  66.7.119.112|    1|
| 145.255.2.176|    4|
| 176.59.208.95|    2|
| 62.133.162.65|    4|
| 95.29.129.235|    2|
|  66.249.64.64|   41|
| 207.46.13.165|    6|
| 180.76.15.162|   75|
|  37.139.52.40|   16|
| 89.144.209.67|   26|
|23.106.216.107|    3|
|  195.20.125.6|   18|
| 92.113.63.101|    6|
+--------------+-----+
only showing top 20 rows



In [None]:
total_requests_count = df.count()
average_requests_per_host = total_requests_count/n_unique_hosts

# printing the results
print("Number of unique hosts:", n_unique_hosts)
print("Total number of requests:", total_requests_count)
print("Average requests per host:", average_requests_per_host)

Number of unique hosts: 40836
Total number of requests: 2338006
Average requests per host: 57.25355078851993


In [None]:
# stopping the session
spark.stop()