In [1]:
# configure spark variables
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
   
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

# load up other dependencies
import re
import pandas as pd

In [2]:
fpath1 = "/FileStore/tables/NASA_access_log_Jul95.gz"
fpath2 = "/FileStore/tables/NASA_access_log_Aug95.gz"

df = spark.read.text([fpath1,fpath2])
print(df)
df.show()

In [3]:
df.printSchema()

In [4]:
df.show(10, truncate=False)

In [5]:
sample_logs = [item['value'] for item in df.take(10)]
sample_logs

In [6]:
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
hosts = [re.search(host_pattern, item).group(1)
           if re.search(host_pattern, item)
           else 'no match'
           for item in sample_logs]
hosts

In [7]:
ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
timestamps = [re.search(ts_pattern, item).group(1)
              if re.search(ts_pattern, item)
              else 'no match'
              for item in sample_logs]
timestamps

In [8]:
method_endpt_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
method_endpt_protocol = [re.search(method_endpt_protocol_pattern, item).groups()
               if re.search(method_endpt_protocol_pattern, item)
               else 'no match'
              for item in sample_logs]
method_endpt_protocol

In [9]:
status_pattern = r'\s(\d{3})\s'
status = [re.search(status_pattern, item).group(1) 
          if re.search(status_pattern, item)
          else 'no match'
          for item in sample_logs]
print(status)

In [10]:
content_size_pattern = r'\s(\d+)$'
content_size = [re.search(content_size_pattern, item).group(1) 
                if re.search(content_size_pattern, item)
                else 'no match'
                for item in sample_logs]
print(content_size)

In [11]:
from pyspark.sql.functions import regexp_extract

logs_df = df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                    regexp_extract('value', ts_pattern, 1).alias('timestamp'),
                    regexp_extract('value', method_endpt_protocol_pattern, 1).alias('method'),
                    regexp_extract('value', method_endpt_protocol_pattern, 2).alias('endpoint'),
                    regexp_extract('value', method_endpt_protocol_pattern, 3).alias('protocol'),
                    regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                    regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
logs_df.show(10, truncate=True)
print((logs_df.count(), len(logs_df.columns)))

In [12]:
logs_df.createOrReplaceTempView('logs_table')

In [13]:
%sql
select * from logs_table

In [14]:
%sql
select host, count(host) as host_count from logs_table group by host order by host_count desc

host,host_count
piweba3y.prodigy.com,21988
piweba4y.prodigy.com,16437
piweba1y.prodigy.com,12825
edams.ksc.nasa.gov,11964
163.206.89.4,9697
news.ti.com,8161
www-d1.proxy.aol.com,8047
alyssa.prodigy.com,8037
,7661
siltb10.orl.mmc.com,7573


In [15]:
%sql
select count(distinct host) as number_of_unique_hosts from logs_table

number_of_unique_hosts
137933


In [16]:
%sql
select distinct endpoint, count(endpoint) as endpoint_count from logs_table group by endpoint order by endpoint_count desc

endpoint,endpoint_count
/images/NASA-logosmall.gif,208714
/images/KSC-logosmall.gif,164970
/images/MOSAIC-logosmall.gif,127908
/images/USA-logosmall.gif,127074
/images/WORLD-logosmall.gif,125925
/images/ksclogo-medium.gif,121572
/ksc.html,83909
/images/launch-logo.gif,76006
/history/apollo/images/apollo-logo1.gif,68896
/shuttle/countdown/,64736


In [17]:
%sql
select endpoint, count(endpoint) as endpoint_count from logs_table where status != 200 group by endpoint order by endpoint_count desc limit 10

endpoint,endpoint_count
/images/NASA-logosmall.gif,40082
/images/KSC-logosmall.gif,23763
/images/MOSAIC-logosmall.gif,15245
/images/USA-logosmall.gif,15142
/images/WORLD-logosmall.gif,14773
/images/ksclogo-medium.gif,13559
/images/launch-logo.gif,8806
/history/apollo/images/apollo-logo1.gif,7489
/,6296
/images/ksclogosmall.gif,5669


In [18]:
%sql
select * from logs_table where status = 404

host,timestamp,method,endpoint,protocol,status,content_size
dd15-062.compuserve.com,01/Jul/1995:00:01:12 -0400,GET,/news/sci.space.shuttle/archive/sci-space-shuttle-22-apr-1995-40.txt,HTTP/1.0,404,
netport-27.iu.net,01/Jul/1995:00:10:19 -0400,GET,/pub/winvn/readme.txt,HTTP/1.0,404,
netport-27.iu.net,01/Jul/1995:00:10:28 -0400,GET,/pub/winvn/readme.txt,HTTP/1.0,404,
blv-pm0-ip28.halcyon.com,01/Jul/1995:00:14:17 -0400,GET,/persons/astronauts/i-to-l/lousmaJR.txt,HTTP/1.0,404,
blv-pm0-ip28.halcyon.com,01/Jul/1995:00:14:32 -0400,GET,/persons/astronauts/a-to-d/beanAL.txt,HTTP/1.0,404,
cu-dialup-1005.cit.cornell.edu,01/Jul/1995:00:18:39 -0400,GET,/pub/winvn/readme.txt,HTTP/1.0,404,
cu-dialup-1005.cit.cornell.edu,01/Jul/1995:00:18:45 -0400,GET,/pub/winvn/readme.txt,HTTP/1.0,404,
cu-dialup-1005.cit.cornell.edu,01/Jul/1995:00:18:49 -0400,GET,/pub/winvn/release.txt,HTTP/1.0,404,
mimas.execpc.com,01/Jul/1995:00:18:59 -0400,GET,/shuttle/missions/technology/sts-newsref/stsref-toc.html,HTTP/1.0,404,
zoom112.telepath.com,01/Jul/1995:00:25:27 -0400,GET,/history/apollo-13/apollo-13.html,HTTP/1.0,404,


In [19]:
%sql
select distinct status, count(status) as status_count from logs_table group by status order by status_count desc

status,status_count
200.0,3100524
304.0,266773
302.0,73070
404.0,20899
403.0,225
500.0,65
501.0,41
400.0,15
,0


In [20]:
%sql
select min(content_size) as minimum_content_size, max(content_size) as maximum_content_size, avg(content_size) as average_content_size 
from logs_table

minimum_content_size,maximum_content_size,average_content_size
0,6823936,19116.072581153352


In [21]:
from pyspark.sql.functions import udf

month_map = {
  'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
  'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
}

def parse_clf_time(text):
    """ Converting to a Python datetime object
    """
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
      int(text[7:11]),
      month_map[text[3:6]],
      int(text[0:2]),
      int(text[12:14]),
      int(text[15:17]),
      int(text[18:20])
    )

In [22]:
udf_parse_time = udf(parse_clf_time)

logs_df = logs_df.select('*', udf_parse_time(logs_df['timestamp']).cast('timestamp').alias('time')).drop('timestamp')


In [23]:
logs_df.show(10, truncate=True)


In [24]:
logs_df.createOrReplaceTempView('logs_table')

In [25]:
%sql
select * from logs_table

host,method,endpoint,protocol,status,content_size,time
199.72.81.55,GET,/history/apollo/,HTTP/1.0,200,6245.0,1995-07-01T00:00:01.000+0000
unicomp6.unicomp.net,GET,/shuttle/countdown/,HTTP/1.0,200,3985.0,1995-07-01T00:00:06.000+0000
199.120.110.21,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085.0,1995-07-01T00:00:09.000+0000
burger.letters.com,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0.0,1995-07-01T00:00:11.000+0000
199.120.110.21,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179.0,1995-07-01T00:00:11.000+0000
burger.letters.com,GET,/images/NASA-logosmall.gif,HTTP/1.0,304,0.0,1995-07-01T00:00:12.000+0000
burger.letters.com,GET,/shuttle/countdown/video/livevideo.gif,HTTP/1.0,200,0.0,1995-07-01T00:00:12.000+0000
205.212.115.106,GET,/shuttle/countdown/countdown.html,HTTP/1.0,200,3985.0,1995-07-01T00:00:12.000+0000
d104.aa.net,GET,/shuttle/countdown/,HTTP/1.0,200,3985.0,1995-07-01T00:00:13.000+0000
129.94.144.152,GET,/,HTTP/1.0,200,7074.0,1995-07-01T00:00:13.000+0000


In [26]:
from pyspark.sql import functions as F

host_day_df = logs_df.select(logs_df.host, 
                             F.dayofmonth('time').alias('day'),F.month('time').alias('month'))
host_day_df.show()

In [27]:
host_day_distinct_df = (host_day_df
                          .dropDuplicates())
display(host_day_distinct_df)

host,day,month
153.64.25.131,1,7
gclab014.ins.gu.edu.au,1,7
indy47.sfc.keio.ac.jp,1,7
ppp20.camtech.com.au,1,7
ednet1.osl.or.gov,1,8
ix-stl3-16.ix.netcom.com,1,8
nettrek.wt.com.au,1,8
slmel2p17.ozemail.com.au,1,8
163.206.89.4,1,8
147.74.25.22,1,8


In [28]:
host_day_distinct_df.printSchema()

In [29]:
display(host_day_distinct_df)

host,day,month
153.64.25.131,1,7
gclab014.ins.gu.edu.au,1,7
indy47.sfc.keio.ac.jp,1,7
ppp20.camtech.com.au,1,7
ednet1.osl.or.gov,1,8
ix-stl3-16.ix.netcom.com,1,8
nettrek.wt.com.au,1,8
slmel2p17.ozemail.com.au,1,8
163.206.89.4,1,8
147.74.25.22,1,8


In [30]:
host_day_distinct_df.createOrReplaceTempView('host_day_table')

In [31]:
%sql
select * from host_day_table

host,day,month
153.64.25.131,1,7
gclab014.ins.gu.edu.au,1,7
indy47.sfc.keio.ac.jp,1,7
ppp20.camtech.com.au,1,7
ednet1.osl.or.gov,1,8
ix-stl3-16.ix.netcom.com,1,8
nettrek.wt.com.au,1,8
slmel2p17.ozemail.com.au,1,8
163.206.89.4,1,8
147.74.25.22,1,8


In [32]:
%sql
select distinct host, day, month from host_day_table where day=21 and month=7

host,day,month
merc13.calon.com,21,7
unicompt214.unicomp.net,21,7
ibm-7-enet.jp.interop.net,21,7
133.43.125.147,21,7
dd09-011.compuserve.com,21,7
maple.ics.es.osaka-u.ac.jp,21,7
signal.dra.hmg.gb,21,7
194.166.3.42,21,7
www-relay.pa-x.dec.com,21,7
koguma.ctrl.titech.ac.jp,21,7
