# Ejercicio 4b
# =============

Lectura de datos

In [52]:
import re
import datetime
import os

from pyspark.sql import Row
from pyspark import SparkContext

month_map = {'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
    'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12}

APACHE_ACCESS_LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'





In [2]:
def parse_apache_time(s):

    """ Convert Apache time format into a Python datetime objectArgs:
        s (str): date and time in Apache time format
    Returns:
        datetime: datetime object (ignore timezone for now)
    """
    return datetime.datetime(int(s[7:11]),
                             month_map[s[3:6]],
                             int(s[0:2]),
                             int(s[12:14]),
                             int(s[15:17]),
                             int(s[18:20]))

In [3]:
def parseApacheLogLine(logline):

    """ Parse a line in the Apache Common Log format
    Args:
        logline (str): a line of text in the Apache Common Log format
    Returns:
        tuple: either a dictionary containing the parts of the Apache Access Log and 1,
               or the original invalid log line and 0
    """

    # A regular expression pattern to extract fields from the log line

    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        return (logline, 0)
    size_field = match.group(9)
    if size_field == '-':
        size = long(0)
    else:
        size = long(match.group(9))
    return (Row(
        host          = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date_time     = parse_apache_time(match.group(4)),
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = size
    ), 1)


In [4]:
def parseLogs():

    """ Read and parse log file """
    logFile = os.path.join( 'apache.access.log_small-')
    parsed_logs = (sc
                   .textFile(logFile)
                   .map(parseApacheLogLine)
                   .cache())

    access_logs = (parsed_logs
                   .filter(lambda s: s[1] == 1)
                   .map(lambda s: s[0])
                   .cache())

    failed_logs = (parsed_logs
                   .filter(lambda s: s[1] == 0)
                   .map(lambda s: s[0]))
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print 'Number of invalid logline: %d' % failed_logs.count()
        for line in failed_logs.take(20):
            print 'Invalid logline: %s' % line

    print 'Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (parsed_logs.count(), access_logs.count(), failed_logs.count())
    return parsed_logs, access_logs, failed_logs



In [5]:
sc= SparkContext()

parsed_logs, access_logs, failed_logs = parseLogs()

Read 3432 lines, successfully parsed 3432 lines, failed to parse 0 lines


In [6]:
#cogemos una muestra de 5 filas

parsed_logs.take(5)

[(Row(client_identd=u'-', content_size=1839L, date_time=datetime.datetime(1995, 8, 1, 0, 0, 1), endpoint=u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', host=u'in24.inetnebr.com', method=u'GET', protocol=u'HTTP/1.0', response_code=200, user_id=u'-'),
  1),
 (Row(client_identd=u'-', content_size=0L, date_time=datetime.datetime(1995, 8, 1, 0, 0, 7), endpoint=u'/', host=u'uplherc.upl.com', method=u'GET', protocol=u'HTTP/1.0', response_code=304, user_id=u'-'),
  1),
 (Row(client_identd=u'-', content_size=0L, date_time=datetime.datetime(1995, 8, 1, 0, 0, 8), endpoint=u'/images/ksclogo-medium.gif', host=u'uplherc.upl.com', method=u'GET', protocol=u'HTTP/1.0', response_code=304, user_id=u'-'),
  1),
 (Row(client_identd=u'-', content_size=0L, date_time=datetime.datetime(1995, 8, 1, 0, 0, 8), endpoint=u'/images/MOSAIC-logosmall.gif', host=u'uplherc.upl.com', method=u'GET', protocol=u'HTTP/1.0', response_code=304, user_id=u'-'),
  1),
 (Row(client_identd=u'-', content_size=0L, date_time=date

# 1. Minimo, máximo y media del tamaño de las peticiones

In [7]:
#definir un contexto spark
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [8]:
#Generar dataframe RDD

top_access_logs=sqlContext.createDataFrame(access_logs)
top_access_logs.take(5)

[Row(client_identd=u'-', content_size=1839, date_time=datetime.datetime(1995, 8, 1, 0, 0, 1), endpoint=u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', host=u'in24.inetnebr.com', method=u'GET', protocol=u'HTTP/1.0', response_code=200, user_id=u'-'),
 Row(client_identd=u'-', content_size=0, date_time=datetime.datetime(1995, 8, 1, 0, 0, 7), endpoint=u'/', host=u'uplherc.upl.com', method=u'GET', protocol=u'HTTP/1.0', response_code=304, user_id=u'-'),
 Row(client_identd=u'-', content_size=0, date_time=datetime.datetime(1995, 8, 1, 0, 0, 8), endpoint=u'/images/ksclogo-medium.gif', host=u'uplherc.upl.com', method=u'GET', protocol=u'HTTP/1.0', response_code=304, user_id=u'-'),
 Row(client_identd=u'-', content_size=0, date_time=datetime.datetime(1995, 8, 1, 0, 0, 8), endpoint=u'/images/MOSAIC-logosmall.gif', host=u'uplherc.upl.com', method=u'GET', protocol=u'HTTP/1.0', response_code=304, user_id=u'-'),
 Row(client_identd=u'-', content_size=0, date_time=datetime.datetime(1995, 8, 1, 0, 0, 8)

In [9]:
interactions_df = sqlContext.createDataFrame(access_logs)

In [10]:
interactions_df.registerTempTable("interactions")

In [11]:
maximo = sqlContext.sql("""SELECT max(content_size) FROM interactions""")


maximo.show()

+-----------------+
|max(content_size)|
+-----------------+
|           887988|
+-----------------+



In [12]:
minimo = sqlContext.sql("""SELECT min (content_size) FROM interactions""")

minimo.show()

+-----------------+
|min(content_size)|
+-----------------+
|                0|
+-----------------+



In [13]:
media = sqlContext.sql("""SELECT mean(content_size) FROM interactions""")
media.show()

+------------------+
| avg(content_size)|
+------------------+
|16051.863636363636|
+------------------+



 Otra forma de calcularlo:

In [14]:

#parsed_logs.take(1)
access_logs.take(1)

[Row(client_identd=u'-', content_size=1839L, date_time=datetime.datetime(1995, 8, 1, 0, 0, 1), endpoint=u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', host=u'in24.inetnebr.com', method=u'GET', protocol=u'HTTP/1.0', response_code=200, user_id=u'-')]

In [15]:
#máximo
#parsed_logs.map(lambda x: x[0].content_size).max() #hay x[0] y x[1]
access_logs.map(lambda x: x.content_size).max() #sol hay una componente

887988L

In [16]:
#minimo
access_logs.map(lambda x: x.content_size).min()

0L

In [17]:
#media
access_logs.map(lambda x: x.content_size).mean()

16051.863636363621

Otra forma

In [18]:
content_sizes= access_logs.map(lambda log: log.content_size).cache()

In [19]:
print ("numero obs: %i, Min: %i, Max: %s, Mean: %s)"%(
       content_sizes.reduce(lambda a, b: a + b)/ content_sizes.count(),
       content_sizes.min(),
       content_sizes.max(),
       content_sizes.mean()))
#print (content_sizes.mean())

numero obs: 16051, Min: 0, Max: 887988, Mean: 16051.8636364)


## 2. Numero de peticiones de cada codigo de respuesta:

In [20]:
#response code en una de las componentes de la lista access_logs
responseCode=access_logs.map(lambda x: x.response_code)
print(responseCode.take(10))
responseCodeToCount=access_logs.map(lambda x: [x.response_code,1]).reduceByKey(lambda a,b: a+b)
responseCodeToCount.collect()
#


[200, 304, 304, 304, 304, 200, 304, 200, 200, 200]


[(200, 3140), (304, 219), (403, 1), (404, 22), (302, 50)]

# 3. Mostrar 20 host que han sido visitados más de 10 veces

In [21]:
access_logs.take(1)
#host es un componente de la lista

[Row(client_identd=u'-', content_size=1839L, date_time=datetime.datetime(1995, 8, 1, 0, 0, 1), endpoint=u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', host=u'in24.inetnebr.com', method=u'GET', protocol=u'HTTP/1.0', response_code=200, user_id=u'-')]

In [22]:
hostCode=access_logs.map(lambda x: x.host)
print(hostCode.take(5))

[u'in24.inetnebr.com', u'uplherc.upl.com', u'uplherc.upl.com', u'uplherc.upl.com', u'uplherc.upl.com']


In [23]:
hostToCount=access_logs.map(lambda x: [x.host,1]).reduceByKey(lambda a,b: a+b).filter(lambda x: x[1]>10)
hostToCount.take(20)

[(u'www-d1.proxy.aol.com', 39),
 (u'www-c1.proxy.aol.com', 30),
 (u'cmr2w1.cc.nda.ac.jp', 11),
 (u'lutzp.tigernet.net', 12),
 (u'ix-dfw12-08.ix.netcom.com', 17),
 (u'ncg-69.axionet.com', 12),
 (u'128.135.36.35', 12),
 (u'dialip129.gov.bc.ca', 21),
 (u'pm9.j51.com', 28),
 (u'mallard2.duc.auburn.edu', 16),
 (u'dd15-053.compuserve.com', 11),
 (u'dd10-046.compuserve.com', 13),
 (u'pm6a3.sover.net', 11),
 (u'www-b2.proxy.aol.com', 40),
 (u'itws.info.eng.niigata-u.ac.jp', 14),
 (u'ix-aus4-15.ix.netcom.com', 13),
 (u'pppa006.compuserve.com', 17),
 (u'ts6-11.westwood.ts.ucla.edu', 15),
 (u'kenko2.hyogo-ths.hyogo.kobe.jp', 20),
 (u'www-d3.proxy.aol.com', 27)]

# 4. Mostrar los 10 endpoints más visitados

In [24]:
#miro la primera fila para saber como vienen expresados los endpoints
access_logs.take(1)

[Row(client_identd=u'-', content_size=1839L, date_time=datetime.datetime(1995, 8, 1, 0, 0, 1), endpoint=u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', host=u'in24.inetnebr.com', method=u'GET', protocol=u'HTTP/1.0', response_code=200, user_id=u'-')]

In [25]:
endpointToCount=access_logs.map(lambda x: [x.endpoint,1]).reduceByKey(lambda a,b: a+b)
endpointToCount.collect()

[(u'/shuttle/missions/sts-67/images/images.html', 3),
 (u'/history/apollo/apollo-10/apollo-10-info.html', 1),
 (u'/shuttle/missions/sts-70/images/DSC-95EC-0001.gif', 3),
 (u'/shuttle/technology/sts-newsref/sts-gnnc.html', 1),
 (u'/shuttle/missions/sts-67/mission-sts-67.html', 5),
 (u'/elv/DELTA/euves.jpg', 2),
 (u'/shuttle/missions/sts-63/sts-63-info.html', 1),
 (u'/shuttle/technology/sts-newsref/sts-lc39.html', 1),
 (u'/persons/nasa-cm/hec-sm.gif', 1),
 (u'/elv/TITAN/mars1s.jpg', 2),
 (u'/shuttle/missions/sts-71/images/images.html', 11),
 (u'/shuttle/missions/sts-71/movies/crew-arrival-t38.mpg', 3),
 (u'/software/winvn/faq/WINVNFAQ-II-1.html', 1),
 (u'/shuttle/missions/sts-70/movies/sts-70-roll-back-pad.mpg', 1),
 (u'/shuttle/technology/sts-newsref/stsover-chron.html', 1),
 (u'/history/apollo/apollo-16/apollo-16-info.html', 1),
 (u'/history/apollo/a-004/a-004-patch-small.gif', 2),
 (u'/history/apollo/apollo-13/apollo-13-info.html', 7),
 (u'/shuttle/missions/sts-71/images/KSC-95EC-0913

In [26]:
#ordenar de mayor a menor
endpointCountOrder=endpointToCount.sortBy(lambda b: b[1], ascending=False)
endpointCountOrder.take(10)

[(u'/images/KSC-logosmall.gif', 167),
 (u'/images/NASA-logosmall.gif', 160),
 (u'/images/MOSAIC-logosmall.gif', 122),
 (u'/images/WORLD-logosmall.gif', 120),
 (u'/images/USA-logosmall.gif', 118),
 (u'/images/ksclogo-medium.gif', 106),
 (u'/', 85),
 (u'/history/apollo/images/apollo-logo1.gif', 74),
 (u'/images/launch-logo.gif', 69),
 (u'/images/ksclogosmall.gif', 66)]

# 5. Los 10 endpoint más visitados que no tienen código respuesta =200

In [27]:
#responseCode=access_logs.map(lambda (x,y): (x.response_code, y.endpoint)).take(2)
responseEndCode=access_logs.map(lambda x:[x.response_code, x.endpoint])
responseEndCode.take(10)
#la primera componente es la de response_code y la segunda la de endpoint

[[200, u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt'],
 [304, u'/'],
 [304, u'/images/ksclogo-medium.gif'],
 [304, u'/images/MOSAIC-logosmall.gif'],
 [304, u'/images/USA-logosmall.gif'],
 [200, u'/images/launch-logo.gif'],
 [304, u'/images/WORLD-logosmall.gif'],
 [200, u'/history/skylab/skylab.html'],
 [200, u'/images/launchmedium.gif'],
 [200, u'/history/skylab/skylab-small.gif']]

In [28]:
#filtro quitando los elementos tales que su primera componente sea distinta de 200
responseEndCode=access_logs.map(lambda x:[x.response_code, x.endpoint]).filter(lambda x: x[0]!=200)
responseEndCode.take(10)

[[304, u'/'],
 [304, u'/images/ksclogo-medium.gif'],
 [304, u'/images/MOSAIC-logosmall.gif'],
 [304, u'/images/USA-logosmall.gif'],
 [304, u'/images/WORLD-logosmall.gif'],
 [304, u'/images/NASA-logosmall.gif'],
 [304, u'/images/NASA-logosmall.gif'],
 [304, u'/images/MOSAIC-logosmall.gif'],
 [304, u'/images/USA-logosmall.gif'],
 [304, u'/images/WORLD-logosmall.gif']]

In [29]:
#una vez que ya he filtrado por los que son distintos de 200, me quedo solo con la componente endpoint
# y aplico lo mismo que en el apartado anterior
EndCount=responseEndCode.map(lambda x:(x[1],1)).reduceByKey(lambda a,b: a+b).sortBy(lambda b: b[1], ascending=False)
EndCount.take(10)

[(u'/images/NASA-logosmall.gif', 25),
 (u'/images/KSC-logosmall.gif', 24),
 (u'/images/WORLD-logosmall.gif', 17),
 (u'/images/MOSAIC-logosmall.gif', 17),
 (u'/images/USA-logosmall.gif', 16),
 (u'/images/ksclogo-medium.gif', 10),
 (u'/software/winvn/winvn.html', 8),
 (u'/images/construct.gif', 8),
 (u'/software/winvn/bluemarb.gif', 8),
 (u'/software/winvn/wvsmall.gif', 6)]

# 6. Calcular el número de hosts distintos

In [55]:
NdiferenthostCode=access_logs.map(lambda x: (x.host,1)).reduceByKey(lambda a,b: a+b)

numero=NdiferenthostCode.count()
numero

311

# 7. Contar el número de hosts únicos cada día

In [31]:
#una linea del fichero de lectura
#in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839
#voy a ver como escribe la fecha cuando he leído el fichero
access_logs.take(1)

[Row(client_identd=u'-', content_size=1839L, date_time=datetime.datetime(1995, 8, 1, 0, 0, 1), endpoint=u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', host=u'in24.inetnebr.com', method=u'GET', protocol=u'HTTP/1.0', response_code=200, user_id=u'-')]

In [40]:
#Se puede observar como lee la fecha 1 de agosto de 1995:: datetime(año, mes, día, horas, minutos, segundos)

#Primero, voy a ver el número de dias que hay para tener una idea:

NdaydifCode=access_logs.map(lambda x:(x.date_time.day,1)).reduceByKey(lambda a,b: a+b)
NdaydifCode.collect()

# 3432 son el numero de filas que hay de observaciones en nuestra muestra

[(1, 3432)]

==>Todos son el dia 1, luego habra el mismo numero de hosts únicos que en el apartado anterior, 311 para el día 1

# 8. Calcular la media de peticiones diarias por host

In [54]:
#Como sólo hay un día, basta con calcular el numero de peticiones por host:

NdiferenthostCode=access_logs.map(lambda x: (x.host,1)).reduceByKey(lambda a,b: a+b).collect()
NdiferenthostCode

[(u'dd12-062.compuserve.com', 2),
 (u'ednet1.osl.or.gov', 2),
 (u'202.70.0.128', 1),
 (u'www-d1.proxy.aol.com', 39),
 (u'www-c1.proxy.aol.com', 30),
 (u'cmr2w1.cc.nda.ac.jp', 11),
 (u'ncg-85.axionet.com', 5),
 (u'stockyard21.onramp.net', 6),
 (u'pppa012.compuserve.com', 9),
 (u'sanantonio-1-5.i-link.net', 8),
 (u'engei.engei-hs.oyama.tochigi.jp', 10),
 (u'wtaskr1.ccinet.ab.ca', 9),
 (u'sprite131.azstarnet.com', 5),
 (u'198.161.85.36', 1),
 (u'lutzp.tigernet.net', 12),
 (u'ix-dgr-il1-15.ix.netcom.com', 7),
 (u'ppp20.pacificrim.net', 3),
 (u'ix-dfw12-08.ix.netcom.com', 17),
 (u'ncg-69.axionet.com', 12),
 (u'eris139.mayo.edu', 1),
 (u'koriel.sun.com', 1),
 (u'scctn02.sp.ac.sg', 3),
 (u'128.135.36.35', 12),
 (u'pm054-29.dialip.mich.net', 4),
 (u'dialip129.gov.bc.ca', 21),
 (u'pm9.j51.com', 28),
 (u'dynamic-ara3.csuchico.edu', 1),
 (u'csclass.utdallas.edu', 1),
 (u'cc-sst-mg1-dip1-12.massey.ac.nz', 5),
 (u'dialip-24.athenet.net', 1),
 (u'delorme.richard.i4e000.ic.gc.ca', 4),
 (u'31.dn258-la

# 9. Mostrar una lista de 40 endpoints que generan codigo de respuesta=404

In [56]:
access_logs.filter(lambda x: x.response_code==404).map(lambda x: (x.endpoint)).take(40)

[u'/shuttle/resources/orbiters/discovery.gif',
 u'/pub/winvn/release.txt',
 u'/www/software/winvn/winvn.html',
 u'/history/history.htm',
 u'/elv/DELTA/uncons.htm',
 u'/sts-71/launch/',
 u'/history/apollo/apollo-13.html',
 u'/history/apollo/a-001/a-001-patch-small.gif',
 u'/history/apollo/a-001/movies/',
 u'/history/apollo/a-001/a-001-patch-small.gif',
 u'/history/apollo/a-001/movies/',
 u'/history/apollo/a-001/a-001-patch-small.gif',
 u'/history/apollo/a-001/images/',
 u'/history/apollo/a-001/a-001-patch-small.gif',
 u'/history/apollo/a-004/a-004-patch-small.gif',
 u'/history/apollo/a-004/movies/',
 u'/history/apollo/a-004/a-004-patch-small.gif',
 u'/pub/winvn/release.txt',
 u'/pub/winvn/readme.txt',
 u'/pub/winvn/release.txt',
 u'/pub/winvn/readme.txt',
 u'/pub/winvn/release.txt']

In [58]:
#Veo que hay algún endpoint igual y tienen que ser distintos, por lo que falta añadir distinct()

access_logs.filter(lambda x: x.response_code==404).map(lambda x: (x.endpoint)).distinct().take(40)

[u'/history/apollo/a-001/movies/',
 u'/www/software/winvn/winvn.html',
 u'/shuttle/resources/orbiters/discovery.gif',
 u'/pub/winvn/readme.txt',
 u'/history/apollo/apollo-13.html',
 u'/history/apollo/a-001/a-001-patch-small.gif',
 u'/sts-71/launch/',
 u'/elv/DELTA/uncons.htm',
 u'/history/apollo/a-001/images/',
 u'/history/apollo/a-004/a-004-patch-small.gif',
 u'/history/apollo/a-004/movies/',
 u'/history/history.htm',
 u'/pub/winvn/release.txt']

# 10. Mostrar el top 25 de los endpoints distintos que generan codigo respuesta=404

In [171]:
access_logs.filter(lambda x: x.response_code==404).map(lambda x: (x.endpoint,1)).reduceByKey(lambda a,b: a+b).sortBy(lambda b: b[1], ascending=False).collect()

[(u'/history/apollo/a-001/a-001-patch-small.gif', 4),
 (u'/pub/winvn/release.txt', 4),
 (u'/history/apollo/a-001/movies/', 2),
 (u'/pub/winvn/readme.txt', 2),
 (u'/history/apollo/a-004/a-004-patch-small.gif', 2),
 (u'/www/software/winvn/winvn.html', 1),
 (u'/shuttle/resources/orbiters/discovery.gif', 1),
 (u'/history/apollo/apollo-13.html', 1),
 (u'/sts-71/launch/', 1),
 (u'/elv/DELTA/uncons.htm', 1),
 (u'/history/apollo/a-001/images/', 1),
 (u'/history/apollo/a-004/movies/', 1),
 (u'/history/history.htm', 1)]

# 11. El top 5 de días que generaron código respuesta =404

In [51]:
access_logs.filter(lambda x: x.response_code==404).map(lambda x:x.date_time.day).take(5)
# el unico dia que hay es el 1 de agosto

[1, 1, 1, 1, 1]