# Processing NASA Logs  
  
This challenge was solved using Spark SQL to clean and wrangle the data to answer the proposed questions.  
  
The challenge consists of answering the following questions:  
  
- Number of unique hosts  
- Total number of 404 errors  
- The 5 URLs that caused the most 404 errors  
- Number of daily 404 errors  
- Total bytes returned  
  
Official dataset source:  
https://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html  
  
Data:  
ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz  
ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz  

About the dataset:  
The dataset holds all the HTTP requests to the server at NASA'S Kennedy Space Center for July and August of 1995.  

## Data

In [1]:
# Creating a Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Processing NASA Logs').getOrCreate()

In [2]:
# Reading files into a dataframe
df = spark.read.csv('NASA_access_log_*.gz')

In [3]:
# Splitting each row into columns
from pyspark.sql.functions import split

split_col = split(df['_c0'], ' ')
df = df.withColumn('host', split_col.getItem(0)) \
       .withColumn('rfc931', split_col.getItem(1)) \
       .withColumn('username', split_col.getItem(2)) \
       .withColumn('date:time', split_col.getItem(3)) \
       .withColumn('timezone', split_col.getItem(4)) \
       .withColumn('method', split_col.getItem(5)) \
       .withColumn('resource', split_col.getItem(6)) \
       .withColumn('protocol', split_col.getItem(7)) \
       .withColumn('statuscode', split_col.getItem(8)) \
       .withColumn('bytes', split_col.getItem(9))

In [4]:
# Merging appropriate columns
from pyspark.sql.functions import col, concat, lit

df = df.withColumn('date_time_timezone', concat(col('date:time'), lit(' '), col('timezone')))
df = df.withColumn('request', concat(col('method'), lit(' '), col('resource'), lit(' '), col('protocol')))

In [5]:
# Dropping redundant columns
df = df.drop('_c0', 'date:time', 'timezone', 'method', 'resource', 'protocol')

In [6]:
# Reordering the dataframe
df = df.select('host', 'rfc931', 'username', 'date_time_timezone', 'request', 'statuscode', 'bytes')

In [7]:
# Cleaning request and date_time_timezone columns
from pyspark.sql.functions import expr, substring

df = df.withColumn('request', expr('substring(request, 2, length(request)-2)'))
df = df.withColumn('date_time_timezone', substring(col('date_time_timezone'), 2, 26))

In [8]:
# Casting date_time_timezone as TimeStamp and bytes as Int
from pyspark.sql.functions import to_timestamp

df = df.withColumn('date_time_timezone', to_timestamp(col('date_time_timezone'), 'dd/MMM/yyyy:HH:mm:ss Z'))
df = df.withColumn('bytes', col('bytes').cast('int'))

In [9]:
# Creating view with the dataframe
df.createOrReplaceTempView('accessLog')

In [10]:
# Visualizing the dataframe
df.show() 
df.printSchema()

+--------------------+------+--------+-------------------+--------------------+----------+-----+
|                host|rfc931|username| date_time_timezone|             request|statuscode|bytes|
+--------------------+------+--------+-------------------+--------------------+----------+-----+
|        199.72.81.55|     -|       -|1995-07-01 01:00:01|GET /history/apol...|       200| 6245|
|unicomp6.unicomp.net|     -|       -|1995-07-01 01:00:06|GET /shuttle/coun...|       200| 3985|
|      199.120.110.21|     -|       -|1995-07-01 01:00:09|GET /shuttle/miss...|       200| 4085|
|  burger.letters.com|     -|       -|1995-07-01 01:00:11|GET /shuttle/coun...|       304|    0|
|      199.120.110.21|     -|       -|1995-07-01 01:00:11|GET /shuttle/miss...|       200| 4179|
|  burger.letters.com|     -|       -|1995-07-01 01:00:12|GET /images/NASA-...|       304|    0|
|  burger.letters.com|     -|       -|1995-07-01 01:00:12|GET /shuttle/coun...|       200|    0|
|     205.212.115.106|     -| 

## Number of unique hosts

In [11]:
# Creating a dataframe with the number of unique hosts
unHosts = spark.sql('SELECT COUNT(DISTINCT host) \
                     AS Unique_Hosts \
                     FROM accessLog')

In [12]:
# Visualizing the dataframe
unHosts.show()

+------------+
|Unique_Hosts|
+------------+
|      137979|
+------------+



## Total number of 404 errors

In [13]:
# Creating a dataframe with the total of requests that returned a 404 error
totalErrors = spark.sql('SELECT COUNT(statuscode) \
                         AS 404_Errors \
                         FROM accessLog \
                         WHERE statuscode == 404')

In [14]:
# Visualizing the dataframe
totalErrors.show()

+----------+
|404_Errors|
+----------+
|     20638|
+----------+



## The 5 URLs that caused the most 404 errors

In [15]:
# Creating a dataframe with the five URLs that returned the most 404 errors
totalErrors = spark.sql('SELECT host, COUNT(statuscode) \
                         AS 404_Errors \
                         FROM accessLog \
                         WHERE statuscode == 404 \
                         GROUP BY host \
                         ORDER BY COUNT(statuscode) DESC \
                         LIMIT 5')

In [16]:
# Visualizing the dataframe
totalErrors.show()

+--------------------+----------+
|                host|404_Errors|
+--------------------+----------+
|hoohoo.ncsa.uiuc.edu|       251|
|piweba3y.prodigy.com|       157|
|jbiagioni.npt.nuw...|       132|
|piweba1y.prodigy.com|       114|
|www-d4.proxy.aol.com|        91|
+--------------------+----------+



## Number of daily 404 errors

In [17]:
# Creating a dataframe with the number of 404 errors for each day
dailyErrors = spark.sql('SELECT date_time_timezone, COUNT(statuscode) \
                         AS 404_Errors \
                         FROM accessLog \
                         WHERE statuscode == 404 \
                         GROUP BY date_time_timezone \
                         ORDER BY COUNT(statuscode) DESC')

In [18]:
# Visualizing the dataframe
dailyErrors.show()

+-------------------+----------+
| date_time_timezone|404_Errors|
+-------------------+----------+
|1995-08-11 13:05:59|         7|
|1995-08-28 12:56:35|         7|
|1995-08-11 13:05:58|         5|
|1995-08-17 17:55:00|         5|
|1995-07-12 11:20:43|         5|
|1995-08-28 18:14:32|         5|
|1995-07-12 11:24:50|         5|
|1995-07-12 11:35:12|         5|
|1995-07-12 11:35:09|         5|
|1995-07-12 11:21:30|         5|
|1995-08-28 18:14:42|         5|
|1995-07-12 11:35:11|         5|
|1995-07-11 15:08:06|         5|
|1995-07-12 11:35:01|         4|
|1995-07-20 08:21:17|         4|
|1995-08-13 15:28:59|         4|
|1995-08-28 02:05:47|         4|
|1995-08-28 18:14:15|         4|
|1995-08-28 18:14:28|         4|
|1995-07-06 11:31:11|         4|
+-------------------+----------+
only showing top 20 rows



## Total bytes returned

In [19]:
# Creating a dataframe with the total amount of bytes from all requests
totalBytes = spark.sql('SELECT SUM(bytes) \
                        AS Bytes \
                        FROM accessLog')

In [20]:
# Visualizing the dataframe
totalBytes.show()

+-----------+
|      Bytes|
+-----------+
|65136540452|
+-----------+



## End

In [21]:
# Stopping Spark
spark.stop()