In [1]:
import re
import datetime
import os

from pyspark.sql import Row
from pyspark import SparkContext

In [2]:
month_map = {'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
    'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12}

APACHE_ACCESS_LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'

In [3]:
def parse_apache_time(s):

    """ Convert Apache time format into a Python datetime object
    Args:
        s (str): date and time in Apache time format
    Returns:
        datetime: datetime object (ignore timezone for now)
    """
    return datetime.datetime(int(s[7:11]),
                             month_map[s[3:6]],
                             int(s[0:2]),
                             int(s[12:14]),
                             int(s[15:17]),
                             int(s[18:20]))



In [5]:
def parseApacheLogLine(logline):

    """ Parse a line in the Apache Common Log format
    Args:
        logline (str): a line of text in the Apache Common Log format
    Returns:
        tuple: either a dictionary containing the parts of the Apache Access Log and 1,
               or the original invalid log line and 0
    """

    # A regular expression pattern to extract fields from the log line

    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        return (logline, 0)
    size_field = match.group(9)
    if size_field == '-':
        size = long(0)
    else:
        size = long(match.group(9))
    return (Row(
        host          = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date_time     = parse_apache_time(match.group(4)),
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = size
    ), 1)


In [8]:
def parseLogs():

    """ Read and parse log file """
    logFile = os.path.join('apache.access.log_small')
    parsed_logs = (sc
                   .textFile(logFile)
                   .map(parseApacheLogLine)
                   .cache())

    access_logs = (parsed_logs
                   .filter(lambda s: s[1] == 1)
                   .map(lambda s: s[0])
                   .cache())

    failed_logs = (parsed_logs
                   .filter(lambda s: s[1] == 0)
                   .map(lambda s: s[0]))
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print 'Number of invalid logline: %d' % failed_logs.count()
        for line in failed_logs.take(20):
            print 'Invalid logline: %s' % line

    print 'Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (parsed_logs.count(), access_logs.count(), failed_logs.count())
    return parsed_logs, access_logs, failed_logs

In [13]:
sc= SparkContext()

In [14]:
parsed_logs, access_logs, failed_logs = parseLogs()

Read 3432 lines, successfully parsed 3432 lines, failed to parse 0 lines


In [15]:
parsed_logs.take(1)

[(Row(client_identd=u'-', content_size=1839L, date_time=datetime.datetime(1995, 8, 1, 0, 0, 1), endpoint=u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', host=u'in24.inetnebr.com', method=u'GET', protocol=u'HTTP/1.0', response_code=200, user_id=u'-'),
  1)]

In [24]:
access_logs.take(1)

[Row(client_identd=u'-', content_size=1839L, date_time=datetime.datetime(1995, 8, 1, 0, 0, 1), endpoint=u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', host=u'in24.inetnebr.com', method=u'GET', protocol=u'HTTP/1.0', response_code=200, user_id=u'-')]

# Resolviendo con SQL

In [16]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [25]:
interactions_df = sqlContext.createDataFrame(access_logs)

In [26]:
interactions_df.registerTempTable("interactions")

In [32]:
interactions_df

DataFrame[client_identd: string, content_size: bigint, date_time: timestamp, endpoint: string, host: string, method: string, protocol: string, response_code: bigint, user_id: string]

Máximo

In [117]:
tcp_interactions = sqlContext.sql("""SELECT max(content_size) as Maximo FROM interactions""")
tcp_interactions.show()

+------+
|Maximo|
+------+
|887988|
+------+



Mínimo

In [118]:
tcp_interactions = sqlContext.sql("""SELECT min(content_size) as Minimo FROM interactions""")
tcp_interactions.show()

+------+
|Minimo|
+------+
|     0|
+------+



Media

In [119]:
tcp_interactions = sqlContext.sql("""SELECT mean(content_size) as Media FROM interactions""")
tcp_interactions.show()

+------------------+
|             Media|
+------------------+
|16051.863636363636|
+------------------+



Número de peticiones por response code

In [115]:
x=interactions_df.groupBy('response_code').count().collect()
x

[Row(response_code=304, count=219),
 Row(response_code=404, count=22),
 Row(response_code=403, count=1),
 Row(response_code=200, count=3140),
 Row(response_code=302, count=50)]

In [86]:
tcp_interactions = sqlContext.sql("""SELECT COUNT(*) AS Suma,response_code FROM interactions GROUP BY response_code """)
tcp_interactions.show()

+----+-------------+
|Suma|response_code|
+----+-------------+
| 219|          304|
|  22|          404|
|   1|          403|
|3140|          200|
|  50|          302|
+----+-------------+



20 hosts que han sido visitados mas de 10 veces

In [75]:
tcp_interactions = sqlContext.sql("""SELECT COUNT(*) as Suma,host FROM interactions group by host HAVING COUNT(host) > 20""")
tcp_interactions.show()

+----+--------------------+
|Suma|                host|
+----+--------------------+
|  55|   in24.inetnebr.com|
|  28|www-b5.proxy.aol.com|
|  78|ix-min1-02.ix.net...|
|  29|term1-24.sb.west.net|
|  31|       193.84.66.147|
|  32|haraway.ucet.ufl.edu|
|  40|hsccs_gatorbox07....|
|  21|    sakura.crl.go.jp|
|  27| rpgopher.aist.go.jp|
|  27|www-d3.proxy.aol.com|
|  30|www-c1.proxy.aol.com|
|  40|www-b2.proxy.aol.com|
|  37|  port13.wavenet.com|
|  26|piweba1y.prodigy.com|
|  41|ppp1016.po.iijnet...|
|  24|stockyard58.onram...|
|  37|        133.43.96.45|
|  21|      205.161.163.25|
|  39|www-d1.proxy.aol.com|
|  33|       pc-heh.icl.dk|
+----+--------------------+
only showing top 20 rows



10 endpoints más visitados

In [78]:
tcp_interactions = sqlContext.sql("""SELECT COUNT(*) as Suma,endpoint FROM interactions group by endpoint order by count(*) desc""")
tcp_interactions.show()

+----+--------------------+
|Suma|            endpoint|
+----+--------------------+
| 167|/images/KSC-logos...|
| 160|/images/NASA-logo...|
| 122|/images/MOSAIC-lo...|
| 120|/images/WORLD-log...|
| 118|/images/USA-logos...|
| 106|/images/ksclogo-m...|
|  85|                   /|
|  74|/history/apollo/i...|
|  69|/images/launch-lo...|
|  66|/images/ksclogosm...|
|  54|/shuttle/missions...|
|  50|/images/launchmed...|
|  49|/shuttle/countdow...|
|  45|           /ksc.html|
|  44| /shuttle/countdown/|
|  34|/shuttle/missions...|
|  32|    /icons/blank.xbm|
|  32|     /icons/menu.xbm|
|  32|/history/apollo/i...|
|  30|            /images/|
+----+--------------------+
only showing top 20 rows



10 endpoints más visitados que no tienen codigo de respuesta 200

In [149]:
tcp_interactions = sqlContext.sql("""SELECT COUNT(*) as Suma,endpoint FROM interactions where response_code !=200 group by endpoint order by count(*) desc""")
tcp_interactions.show()

+----+--------------------+
|Suma|            endpoint|
+----+--------------------+
|  25|/images/NASA-logo...|
|  24|/images/KSC-logos...|
|  17|/images/MOSAIC-lo...|
|  17|/images/WORLD-log...|
|  16|/images/USA-logos...|
|  10|/images/ksclogo-m...|
|   8|/images/construct...|
|   8|/software/winvn/w...|
|   8|/software/winvn/b...|
|   6|/software/winvn/w...|
|   6|/software/winvn/w...|
|   5|/images/ksclogosm...|
|   5|                   /|
|   5|/shuttle/countdow...|
|   5|/history/apollo/i...|
|   4|/pub/winvn/releas...|
|   4|    /icons/blank.xbm|
|   4|     /icons/menu.xbm|
|   4| /shuttle/countdown/|
|   4|/history/apollo/a...|
+----+--------------------+
only showing top 20 rows



Número de hosts distintos

In [87]:
tcp_interactions = sqlContext.sql("""SELECT COUNT(*) as Suma,host FROM interactions group by host""")
tcp_interactions.count()

311

Número de hosts unicos cada día

In [145]:
tcp_interactions = sqlContext.sql("""SELECT DISTINCT date_time FROM interactions""")
tcp_interactions.show()

+--------------------+
|           date_time|
+--------------------+
|1995-08-01 00:01:...|
|1995-08-01 00:16:...|
|1995-08-01 00:23:...|
|1995-08-01 00:23:...|
|1995-08-01 00:24:...|
|1995-08-01 00:28:...|
|1995-08-01 00:32:...|
|1995-08-01 00:36:...|
|1995-08-01 00:38:...|
|1995-08-01 00:51:...|
|1995-08-01 00:52:...|
|1995-08-01 00:53:...|
|1995-08-01 00:55:...|
|1995-08-01 00:59:...|
|1995-08-01 01:02:...|
|1995-08-01 01:31:...|
|1995-08-01 01:36:...|
|1995-08-01 01:38:...|
|1995-08-01 01:46:...|
|1995-08-01 01:53:...|
+--------------------+
only showing top 20 rows



In [138]:
x=interactions_df.groupBy('date_time').count().collect()

In [155]:
tcp_interactions = sqlContext.sql(""" SELECT day(date_time) as day FROM interactions group by day(date_time)""")
tcp_interactions.show()

+---+
|day|
+---+
|  1|
+---+



In [162]:
tcp_interactions = sqlContext.sql(""" SELECT distinct substring(date_time,9,2) as day FROM interactions""")
tcp_interactions.show()


+---+
|day|
+---+
| 01|
+---+



Media de peticiones diarias por host

In [111]:
tcp_interactions = sqlContext.sql("""SELECT mean(host),host FROM interactions group by host""")
tcp_interactions.show()

+-------------------------+--------------------+
|avg(CAST(host AS DOUBLE))|                host|
+-------------------------+--------------------+
|                     null|ix-sea6-23.ix.net...|
|                     null|grimnet23.idirect...|
|                     null|        hella.stm.it|
|                     null|       198.161.85.36|
|                     null|ix-sd6-29.ix.netc...|
|                     null|     info.telenor.no|
|                     null|dd08-029.compuser...|
|                     null|engei.engei-hs.oy...|
|                     null|  server.indo.net.id|
|                     null|in2pc2.med.niigat...|
|                     null|ix-sf10-28.ix.net...|
|                     null|ip-pdx4-15.telepo...|
|                     null|   in24.inetnebr.com|
|                     null|  pwestec.sierra.net|
|                     null|roseanne06.slip.y...|
|                     null|     204.199.188.113|
|                     null|        206.24.43.11|
|                   

40 endpoints distintos con response code 404

In [114]:
tcp_interactions = sqlContext.sql("""SELECT DISTINCT endpoint FROM interactions where response_code=404""")
tcp_interactions.show()

+--------------------+
|            endpoint|
+--------------------+
|/pub/winvn/readme...|
|/elv/DELTA/uncons...|
|/history/apollo/a...|
|/history/apollo/a...|
|     /sts-71/launch/|
|/history/apollo/a...|
|/history/apollo/a...|
|/shuttle/resource...|
|/www/software/win...|
|/pub/winvn/releas...|
|/history/apollo/a...|
|/history/apollo/a...|
|/history/history.htm|
+--------------------+



Top 25 endpoints que más codigos 404 de respuesta generan

Top 5 días que mas se generaron codigos 404

# Resolviendo con spark

In [12]:
sc.stop()