In [1]:
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
    
sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

In [2]:
import re
import os
import pandas as pd
import numpy as np

### Basic regular expression

In [3]:
m = re.finditer(r'.*?(spark).*?', "I'm searching for a spark in PySpark", re.I)
for match in m:
    print(match, match.start(), match.end())

<re.Match object; span=(0, 25), match="I'm searching for a spark"> 0 25
<re.Match object; span=(25, 36), match=' in PySpark'> 25 36


## Spark setup 
It can be setup as described below or through bashrc file.

In [4]:
os.environ["SPARK_HOME"] = "/opt/spark-2.4.5-bin-hadoop2.7"
print(os.environ.get('SPARK_HOME'))

/opt/spark-2.4.5-bin-hadoop2.7


## Download data
1) Download it if they do not exist locally.

In [5]:
path_to_data_dir = "./data/"
if not os.path.exists(path_to_data_dir):
    os.mkdir(path_to_data_dir)
    print("Created data directory")

if not os.path.exists("./data/NASA_access_log_Jul95.gz"):
    !wget ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz 
    !mv ./NASA_access_log_Jul95.gz ./data/

if not os.path.exists("./data/NASA_access_log_Aug95.gz"):
    !wget ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz 
    !mv ./NASA_access_log_Aug95.gz ./data/

if not os.path.exists("./data/clarknet_access_log_Aug28.gz"):
    !wget ftp://ita.ee.lbl.gov/traces/clarknet_access_log_Aug28.gz 
    !mv ./clarknet_access_log_Aug28.gz ./data/

if not os.path.exists("./data/clarknet_access_log_Sep4.gz"):
    !wget ftp://ita.ee.lbl.gov/traces/clarknet_access_log_Sep4.gz
    !mv ./clarknet_access_log_Sep4.gz ./data/

## Parse log files

Four gz files were downloaded and they have same format with same number of columns.

In [6]:
import glob
import os

raw_data_files = glob.glob('./data/*.gz')
print(raw_data_files)
base_df = spark.read.text(raw_data_files)
print(base_df.printSchema())
print(type(base_df))

['./data/clarknet_access_log_Sep4.gz', './data/NASA_access_log_Jul95.gz', './data/clarknet_access_log_Aug28.gz', './data/NASA_access_log_Aug95.gz']
root
 |-- value: string (nullable = true)

None
<class 'pyspark.sql.dataframe.DataFrame'>


## Data Wrangling

Converting data frame to Resilient Distributed Datasets (RDDs)

In [7]:
base_df.show(2, truncate=False)

base_df_rdd = base_df.rdd
print(type(base_df_rdd))
base_df_rdd.take(2)



+------------------------------------------------------------------------------------------------------------------+
|value                                                                                                             |
+------------------------------------------------------------------------------------------------------------------+
|204.249.225.59 - - [28/Aug/1995:00:00:34 -0400] "GET /pub/rmharris/catalogs/dawsocat/intro.html HTTP/1.0" 200 3542|
|access9.accsyst.com - - [28/Aug/1995:00:00:35 -0400] "GET /pub/robert/past99.gif HTTP/1.0" 200 4993               |
+------------------------------------------------------------------------------------------------------------------+
only showing top 2 rows

<class 'pyspark.rdd.RDD'>


[Row(value='204.249.225.59 - - [28/Aug/1995:00:00:34 -0400] "GET /pub/rmharris/catalogs/dawsocat/intro.html HTTP/1.0" 200 3542'),
 Row(value='access9.accsyst.com - - [28/Aug/1995:00:00:35 -0400] "GET /pub/robert/past99.gif HTTP/1.0" 200 4993')]

In [8]:
# !pip3 install findspark
# os.environ.get('SPARK_HOME')
import findspark
findspark.init()

### Clean data

There are multiple columns that are listed below. They have their unique patterns.  

1. Host names
2. Access timestamps
3. HTTP Request method, endpoint, and protocol
4. HTTP Status codes
5. HTTP Response content size

In [9]:
pa_host = r'(^\S+\.[\S+\.]+\S+)\s' # host name - ['204.249.225.59', 'access9.accsyst.com']
pa_time = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]' # date and time - 01/Jul/1995:00:00:01 -0400
pa_protocol = r'\"(\S+)\s(\S+)\s*(\S*)\"' # ('GET', '/history/apollo/', 'HTTP/1.0')
pa_status = r'\s(\d{3})\s' # status code
pa_content_size = r'\s(\d+)$' # Content size

In [10]:
from pyspark.sql.functions import regexp_extract

df_parse_log = base_df.select(regexp_extract('value', pa_host, 1).alias('Host'),
                         regexp_extract('value', pa_time, 1).alias('Timestamp'),
                         regexp_extract('value', pa_protocol, 1).alias('Method'),
                         regexp_extract('value', pa_protocol, 2).alias('Endpoint'),
                         regexp_extract('value', pa_protocol, 3).alias('Protocol'),
                         regexp_extract('value', pa_status, 1).cast('integer').alias('Status'),
                         regexp_extract('value', pa_content_size, 1).cast('integer').alias('Content_size'))
df_parse_log.show(2, truncate=True)
print((df_parse_log.count, len(df_parse_log.columns)))

+-------------------+--------------------+------+--------------------+--------+------+------------+
|               Host|           Timestamp|Method|            Endpoint|Protocol|Status|Content_size|
+-------------------+--------------------+------+--------------------+--------+------+------------+
|     204.249.225.59|28/Aug/1995:00:00...|   GET|/pub/rmharris/cat...|HTTP/1.0|   200|        3542|
|access9.accsyst.com|28/Aug/1995:00:00...|   GET|/pub/robert/past9...|HTTP/1.0|   200|        4993|
+-------------------+--------------------+------+--------------------+--------+------+------------+
only showing top 2 rows

(<bound method DataFrame.count of DataFrame[Host: string, Timestamp: string, Method: string, Endpoint: string, Protocol: string, Status: int, Content_size: int]>, 7)


In [11]:
df_day.show()

NameError: name 'df_day' is not defined

## Finding missing values

In [None]:
df_missing_data = df_parse_log.filter(df_parse_log['Host'].isNull()| 
                                    df_parse_log['Timestamp'].isNull() | 
                                    df_parse_log['Method'].isNull() |
                                    df_parse_log['Endpoint'].isNull() |
                                    df_parse_log['Protocol'].isNull() |
                                    df_parse_log['Status'].isNull() |
                                    df_parse_log['Content_size'].isNull())

In [None]:
df_missing_data.show(2)

What is distribution of missing values per column?

-> Build up a list of column expressions, one per column.

-> Run the aggregation. The *exprs converts the list of expressions into

-> variable function arguments.

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as spark_sum

def count_null(col_name):
    return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)

exprs = [count_null(col_name) for col_name in df_parse_log.columns]
df_parse_log.agg(*exprs).show()

In [None]:
null_status_df = base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))
null_status_df.show(1)

In [None]:
bad_status_df = null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                                      regexp_extract('value', ts_pattern, 1).alias('timestamp'),
                                      regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                                      regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
                                      regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
                                      regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                                      regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
bad_status_df.show(truncate=False)

In [None]:
df_parse_log = df_parse_log[df_parse_log['Status'].isNotNull()]
df_parse_log.show(2)

In [None]:
exprs = [count_null(col_name) for col_name in df_parse_log.columns]
df_parse_log.agg(*exprs).show(2)

## Handling nulls in HTTP content size

In [None]:
null_content_size_df = base_df.filter(~base_df['value'].rlike(r'\s\d+$'))
null_content_size_df.show(1, truncate=True)

In [None]:
null_content_size_df.take(2)

In [None]:
df_parse_log = df_parse_log.na.fill({'content_size': 0})

In [None]:
exprs = [count_null(col_name) for col_name in logs_df.columns]
logs_df.agg(*exprs).show()

## Handling Temporal Fields (Timestamp)

Caution: There is a built in function for casting. In this case, it can be easily cast using timestamp however it keeps giving an error. Therefore, I used following technique. 

In [126]:
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType
from datetime import datetime

month_str_ints = {'Jan': '1', 'Feb': '2', 'Mar':'3', 'Apr':'4' , 'May': '5', 'Jun':'6', 
                  'Jul': '7', 'Aug': '8', 'Sep':'9', 'Oct':'10', 'Nov':'11', 'Dec':'12' }

def get_month(text):
    if not text[3:6] in month_str_ints:
        return "13"
    else:
        return "%s" %str(month_str_ints[text[3:6]])
      
def get_year(text):
    return "%s" %str(text[7:11])

def get_day(text):
    return "%s" %str(text[0:2])
    
  

In [127]:
from pyspark.sql.functions import unix_timestamp
udf_parse_time = udf(get_year)
df_cleaned = df_parse_log.select('*', udf_parse_time(df_parse_log['Timestamp'])
                                .alias('Year'))
udf_parse_time = udf(get_month)
df_cleaned = df_cleaned.select('*', udf_parse_time(df_parse_log['Timestamp'])
                                .alias('Month'))
udf_parse_time = udf(get_day)
df_cleaned = df_cleaned.select('*', udf_parse_time(df_parse_log['Timestamp'])
                                .alias('Day')).drop('Timestamp')

In [128]:
df_cleaned.show()

+--------------------+------+--------------------+--------+------+------------+----+-----+---+
|                Host|Method|            Endpoint|Protocol|Status|Content_size|Year|Month|Day|
+--------------------+------+--------------------+--------+------+------------+----+-----+---+
|      204.249.225.59|   GET|/pub/rmharris/cat...|HTTP/1.0|   200|        3542|1995|    8| 28|
| access9.accsyst.com|   GET|/pub/robert/past9...|HTTP/1.0|   200|        4993|1995|    8| 28|
| access9.accsyst.com|   GET|/pub/robert/curr9...|HTTP/1.0|   200|        5836|1995|    8| 28|
|       world.std.com|   GET|/pub/atomicbk/cat...|HTTP/1.0|   200|       18338|1995|    8| 28|
|    cssu24.cs.ust.hk|   GET|/pub/job/vk/view1...|HTTP/1.0|   200|        5944|1995|    8| 28|
|     er6.rutgers.edu|   GET|/pub/rjgula/netwo...|HTTP/1.0|   200|        2017|1995|    8| 28|
|cyclom1-1-6.inter...|   GET|/pub/k2/jeep/jxj.htm|HTTP/1.0|   200|        3254|1995|    8| 28|
|d24-1.cpe.Brisban...|   GET|/pub/eurocent/hom...|

+--------------------+-----+---+
|                Host|Month|Day|
+--------------------+-----+---+
|      204.249.225.59|    8| 28|
| access9.accsyst.com|    8| 28|
| access9.accsyst.com|    8| 28|
|       world.std.com|    8| 28|
|    cssu24.cs.ust.hk|    8| 28|
|     er6.rutgers.edu|    8| 28|
|cyclom1-1-6.inter...|    8| 28|
|d24-1.cpe.Brisban...|    8| 28|
|       world.std.com|    8| 28|
|       world.std.com|    8| 28|
|       world.std.com|    8| 28|
|    cssu24.cs.ust.hk|    8| 28|
|d24-1.cpe.Brisban...|    8| 28|
|  ppp19.glas.apc.org|    8| 28|
|  ppp19.glas.apc.org|    8| 28|
|  ppp19.glas.apc.org|    8| 28|
|  ppp19.glas.apc.org|    8| 28|
|         ari.ari.net|    8| 28|
|       world.std.com|    8| 28|
|  ppp19.glas.apc.org|    8| 28|
+--------------------+-----+---+
only showing top 20 rows



In [131]:
test.show()

+-----+-------+
|Month|  count|
+-----+-------+
|    7|1891714|
|    8|2621568|
|    9|2276962|
|   13|    101|
+-----+-------+



# Explortory Data Analysis (EDA)

## HTTP Status
First, unique HTTP status is determined and then find their distribution.



In [None]:
df_status_freq = (logs_df
                     .groupBy('status')
                     .count()
                     .sort('status')
                     .cache())

In [None]:
df_pd_status_freq = (df_status_freq
                         .toPandas()
                         .sort_values(by=['count'], ascending=False))

df_pd_status_freq['percent'] = df_pd_status_freq['count']/sum(df_pd_status_freq['count'])
df_pd_status_freq['log(count)'] = np.log(df_pd_status_freq['count'])
df_pd_status_freq


In [None]:
df_pd_status_freq.head()

## Frequent host

Identify the top ten host and also compute their proportion.

In [None]:
logs_df.show(2)

In [None]:
df_host_freq = (logs_df
                     .groupBy('host')
                     .count()
                     .cache())
df_pd_host_freq = (df_host_freq
                        .toPandas()
                        .sort_values(by=['count'], ascending=False))
df_pd_host_freq["Percentage"] = df_pd_host_freq["count"] / sum(df_pd_host_freq["count"])
df_pd_host_freq["Log(count)"] = np.log(df_pd_host_freq["count"])

In [None]:
df_pd_host_freq.head(3)

In [None]:
mean    = np.mean(df_pd_host_freq['count'])
std     = np.std(df_pd_host_freq['count'])
df_pd_host_freq['zscore'] = (df_pd_host_freq['count'] - mean)/std
df_pd_host_freq.head()

## What method is most frequently used?

In [None]:
df_method = (logs_df
                     .groupBy('method')
                     .count()
                     .cache())
df_pd_method = (df_method
                        .toPandas()
                        .sort_values(by=['count'], ascending=False))
df_pd_method["Percentage"] = 100.00 * (df_pd_method["count"] / sum(df_pd_method["count"]))
df_pd_method["Log(count)"] = np.log(df_pd_method["count"])

In [None]:
df_pd_method.head()

Most popular method is GET followed by HEAD. However, 99% of times GET method is used. Superisingly, POST method is almost never used.  

In [None]:
df_protocol = (logs_df
                     .groupBy('protocol')
                     .count()
                     .cache())
df_protocol = (df_protocol
                        .toPandas()
                        .sort_values(by=['count'], ascending=False))
df_protocol["Percentage"] = 100.00 * (df_protocol["count"] / sum(df_protocol["count"]))
df_protocol["Log(count)"] = np.log(df_protocol["count"])

In [None]:
df_protocol.head()

In [54]:
host_day_df.show(2)

+-------------------+---+
|               Host|day|
+-------------------+---+
|     204.249.225.59| 28|
|access9.accsyst.com| 28|
+-------------------+---+
only showing top 2 rows



### number of host per day

In [133]:
from pyspark.sql.functions import dayofmonth
1z = df_cleaned.select(df_cleaned.Host,df_cleaned.Month, df_cleaned.Day)
df_host_month_day.select('*').count()

6790345

In [134]:
from pyspark.sql.functions import dayofmonth
df_host_month_day = df_cleaned.select(df_cleaned.Host,df_cleaned.Month, df_cleaned.Day)
df_host_month_day.select('*').distinct().count()

478545

In [135]:
df_host_group_by = (host_day_df.groupBy('Month', 'Day').count())

In [136]:
df_host_group_by.show()

+-----+---+------+
|Month|Day| count|
+-----+---+------+
|    7| 07| 87233|
|    8| 21| 55540|
|    7| 12| 92536|
|    8| 17| 58988|
|    7| 11| 80407|
|    7| 21| 64629|
|    7| 13|134203|
|    8| 06| 32420|
|    8| 28|306823|
|    9| 05|229939|
|    8| 30|354082|
|    8| 10| 61248|
|    7| 01| 64714|
|    7| 27| 61680|
|    8| 07| 57362|
|    8| 20| 32963|
|    9| 07|274660|
|    7| 20| 66593|
|    8| 25| 57321|
|    9| 10|192265|
+-----+---+------+
only showing top 20 rows



In [137]:
df_host_month_day_unique = df_host_month_day.select('*').distinct()

In [138]:
df_host_month_day_unique_gb = (df_host_month_day_unique.groupBy('Month', 'Day').count())

In [139]:
df_host_month_day_unique_gb.show()

+-----+---+-----+
|Month|Day|count|
+-----+---+-----+
|    7| 07| 6471|
|    8| 21| 4128|
|    7| 12| 5339|
|    8| 17| 4382|
|    7| 11| 4923|
|    7| 21| 4337|
|    7| 13| 6946|
|    8| 06| 2537|
|    8| 28|20979|
|    9| 05|16206|
|    8| 30|23158|
|    8| 10| 4520|
|    7| 01| 5191|
|    7| 27| 4367|
|    8| 20| 2560|
|    8| 07| 4104|
|    9| 07|19167|
|    7| 20| 4725|
|    8| 25| 4404|
|    9| 10|14414|
+-----+---+-----+
only showing top 20 rows



In [144]:
from pyspark.sql.functions import col
df_gb_host_month_day = (host_day_df
                        .groupBy('Host','Month', 'Day')
                        .count()
                        .sort(col("count").desc())
                       )

In [145]:
df_gb_host_month_day.show()

+--------------------+-----+---+-----+
|                Host|Month|Day|count|
+--------------------+-----+---+-----+
|www-e3.proxy.aol.com|    9| 06| 2632|
|www-e2.proxy.aol.com|    9| 06| 2045|
|intgate.raleigh.i...|    9| 08| 1895|
|      acme.clark.net|    9| 02| 1780|
|           clark.net|    8| 30| 1512|
|mpngate1.ny.us.ib...|    8| 30| 1462|
|    webgate1.mot.com|    8| 30| 1444|
|www-e3.proxy.aol.com|    9| 07| 1424|
|    webgate1.mot.com|    8| 31| 1407|
|piweba5y.prodigy.com|    9| 09| 1397|
|   bill.ksc.nasa.gov|    7| 11| 1394|
|   indy.gradient.com|    7| 12| 1356|
| siltb10.orl.mmc.com|    7| 21| 1354|
|mpngate1.ny.us.ib...|    9| 06| 1344|
|   bill.ksc.nasa.gov|    7| 12| 1317|
|  reboot.dt.navy.mil|    9| 07| 1304|
|piweba5y.prodigy.com|    9| 04| 1301|
|piweba3y.prodigy.com|    7| 16| 1280|
|piweba4y.prodigy.com|    7| 16| 1269|
|piweba5y.prodigy.com|    8| 31| 1258|
+--------------------+-----+---+-----+
only showing top 20 rows

