# Ejercicio LogNASA. Realizado por Daniel Tomé Gordo

Se carga tanto el contexto Spark como la base de datos con la que se trabaja

In [1]:
from pyspark import SparkContext
sc = SparkContext()

In [2]:
archivo = "./apache.access.log_small"
data = sc.textFile(archivo)

## Análisis descriptivo de los datos

Para describir someramente los datos basta con realizar un _take_ que muestra los registros que se indiquen, en este caso 1. Como se ve parece que cada registro está dividido en 5 partes.

- La primera parte podría ser considerada como el _host_ de donde se saca la información ('_in24.inetnebr.com_')
- La segunda parte (_01/Aug/1995:00:00:01 -0400_] podría ser considerada como la fecha y la hora en la que se realiza el acceso, además de algo así como la zona horaria.
- La tercera parte son los endpoints a los que se accede ('_/shuttle/missions/sts-68/news/sts-68-mcc-05.txt_')
- La cuarta parte es el código de respuesta que genera el acceso (_200_)
- La quinta parte son la peticiones a cada código de respuesta (_1839_)

Por otro lado, decir que existen 3432 registros dentro de esta base de datos

In [3]:
data.take(1)

['in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839']

In [4]:
data.count()

3432

## Parseo de los datos

Para ver cuántos grupos distintos existen en la estructura mostrada antes, se usa la siguiente expresión regular para parsear los datos. Luego se creará una función que comprobará si el parseado ha sido el correcto.

In [5]:
import re
estructura_nasa = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'

In [6]:
def parseado(registro):
    salida = re.search(estructura_nasa, registro)
    if salida is None:
        return (registro, 0)
    else:
        return (registro, 1)
fallos = data.map(lambda registro: parseado(registro)).filter(lambda registro: registro[1] == 0).count() 
fallos

0

Efectivamente, no hay ningún registro que no siga esa estructura, por lo que todos los registros estarán bien parseados.

Aquí se guardan los datos parseados (de forma provisional) que serán los usados posteriormente para el análisis.

In [7]:
parsed_data = data.map(lambda registro: parseado(registro)).filter(lambda registro: registro[1] == 1).map(lambda registro : registro[0])

Con esta función se guardan ya los datos bien parseados, divididos por grupos (como luego se observa con el _take_)

In [8]:
def parseado_grupos(registro):
    salida = re.search(estructura_nasa, registro)
    return salida.groups()
parsed_data = parsed_data.map(lambda registro: parseado_grupos(registro))

In [9]:
parsed_data.take(1)

[('in24.inetnebr.com',
  '-',
  '-',
  '01/Aug/1995:00:00:01 -0400',
  'GET',
  '/shuttle/missions/sts-68/news/sts-68-mcc-05.txt',
  'HTTP/1.0',
  '200',
  '1839')]

## Resolución del ejercicio

### 1.  Estadísticas (mínimo, máximo y media)

Para hallar los estadísticos que se piden, primero se debe convertir a formato número las peticiones del código. Tras esto, se aplica con una función _map_ esa conversión al elemento de cada registro que es el número de peticiones. Para hallarlos, se calcula cada uno de ellos y luego se sacan con el _print_.

In [11]:
def numero(x):
    x = re.sub('[^0-9]',"",x) 
    if x !="":
        return int(x)
    else:
        return 0
    
media = parsed_data.map(lambda registro: numero(registro[8])).mean()
minimo = parsed_data.map(lambda registro: numero(registro[8])).min()
maximo = parsed_data.map(lambda registro: numero(registro[8])).max()
print ('Media =', media,'; Mínimo =', minimo,'; Máximo =', maximo)

Media = 16051.863636363621 ; Mínimo = 0 ; Máximo = 887988


### 2. Número de peticiones de cada código de respuesta

En un primer paso se cuentan los distintos códigos de respuesta que aparecen en la base de datos.

In [29]:
n_codigos = parsed_data.map(lambda linea: (linea[7], 1)).distinct().count()

El proceso para contar las peticiones por código es el que sigue:

- La primera _lambda_ convierte cada registro en un registro tipo (nº código, 1). registro[7] es el código de respuesta que se saca de la base de datos ya parseada
- La segunda _lambda_ reduce por código sumando todos los registros obtenidos en el anterior
- Con el _take_ se sacan los códigos hallados antes

In [30]:
peticiones_codigo = (parsed_data.map(lambda registro: (registro[7], 1)).reduceByKey(lambda a, b: a + b))

In [31]:
peticiones_codigo.take(n_codigos)

[('304', 219), ('404', 22), ('302', 50), ('200', 3140), ('403', 1)]

### 3. Halla 20 hosts que hayan sido vistos más de 10 veces

En este caso el proceso inicial es muy similar al del anterior caso, se sacan los hosts en formato (_clave, valor_) y se suman. El cambio viene en el siguiente paso, en este caso se filtra que los valores sean superiores a 10.

Para la salida, se pueden usar varios métodos. Por ejemplo un _take_ que sacaría 20 cualesquiera o, con un _takeOrdered_ que saldrían los 20 hosts que más veces se han visitado

In [16]:
hosts20 = parsed_data.map(lambda registro: (registro[0],1)).reduceByKey(lambda a, b: a + b).filter(lambda x: x[1] > 10)
hosts20.take(20)

[('prakinf2.prakinf.tu-ilmenau.de', 14),
 ('centauri.tksc.nasda.go.jp', 13),
 ('cs1-08.leh.ptd.net', 23),
 ('www-a1.proxy.aol.com', 23),
 ('133.68.18.180', 17),
 ('ip55.van2.pacifier.com', 43),
 ('hsccs_gatorbox07.unm.edu', 40),
 ('stockyard17.onramp.net', 24),
 ('pme609.onramp.awinc.com', 22),
 ('lutzp.tigernet.net', 12),
 ('maui56.maui.net', 23),
 ('193.84.66.147', 31),
 ('143.158.26.50', 29),
 ('adam.tower.com.au', 44),
 ('www-d1.proxy.aol.com', 39),
 ('pc-heh.icl.dk', 33),
 ('dd15-053.compuserve.com', 11),
 ('ts6-11.westwood.ts.ucla.edu', 15),
 ('ad14-027.compuserve.com', 16),
 ('168.78.14.166', 24)]

### 4. ¿Cuáles son los 10 endpoints más visitados?

En este apartado el funcionamiento es el mismo que en los anteriores, se consigue cada _endpoint_ en un formato (_clave, valor_) y luego se reduce sumando las veces que se repite cada _endpoint_. Para ver cúales son los más visitados, se usa la función _takeOrdered_ que saca los 10 más visitados y los ordena de forma decreciente por valor con el -x[1]. Si se usase x[0] en el lambda se ordenaría por la clave (en este caso no tendría ningún sentido ordenar, además tiene formato _string_)

In [17]:
top10endpoints = parsed_data.map(lambda registro: (registro[5],1)).reduceByKey(lambda a, b: a + b)
top10endpoints.takeOrdered(10, lambda x: -x[1])

[('/images/KSC-logosmall.gif', 167),
 ('/images/NASA-logosmall.gif', 160),
 ('/images/MOSAIC-logosmall.gif', 122),
 ('/images/WORLD-logosmall.gif', 120),
 ('/images/USA-logosmall.gif', 118),
 ('/images/ksclogo-medium.gif', 106),
 ('/', 85),
 ('/history/apollo/images/apollo-logo1.gif', 74),
 ('/images/launch-logo.gif', 69),
 ('/images/ksclogosmall.gif', 66)]

### 5. ¿Cuáles son 10 endpoints más visitados sin código 200?

El proceso es exactamente el mismo que el del apartado anterior, el único cambio es que en este caso se filtra previamente los códigos que no sean 200.

In [18]:
top10endpoints_no200 = (parsed_data.filter(lambda registro: registro[7] != '200')
                        .map(lambda registro: (registro[5], 1))
                        .reduceByKey(lambda a, b: a+b))
top10endpoints_no200.takeOrdered(10, lambda x: -x[1])

[('/images/NASA-logosmall.gif', 25),
 ('/images/KSC-logosmall.gif', 24),
 ('/images/MOSAIC-logosmall.gif', 17),
 ('/images/WORLD-logosmall.gif', 17),
 ('/images/USA-logosmall.gif', 16),
 ('/images/ksclogo-medium.gif', 10),
 ('/software/winvn/bluemarb.gif', 8),
 ('/software/winvn/winvn.html', 8),
 ('/images/construct.gif', 8),
 ('/software/winvn/wvsmall.gif', 6)]

### 6. ¿Cuántos hosts únicos existen?

Se cogen los registros, que están en el primer lugar de los datos parseados, y se cuentan los distintos

In [19]:
parsed_data.map(lambda registro: registro[0]).distinct().count()

311

### 7. ¿Cuántos hosts diarios se visitan por día?

Se importa en primer lugar _datetime_ para poder convertir las fechas. Se crea también una funcion de conversión de la fecha que coge el cuarto grupo de los datos parseados y los convierte en una fecha del tipo _día/mes/año_ (el %b se usa porque en el original el mes viene en 3 letras, Aug). 

El proceso para sacar los hosts, es el mismo que en todos los apartados. Se usa en _collect_ porque, en un principio, no se deberían saber cuántas fechas distintas hay en el dataset. En este caso, al ser una única fecha se podría usar un _take_(1), por ejemplo.

In [20]:
from datetime import datetime
def conv_fecha(registro):
    fecha = registro[3]
    return datetime.strptime(fecha[:11], "%d/%b/%Y")

host_diarios = parsed_data.map(lambda registro: (conv_fecha(registro), 1)).reduceByKey(lambda a, b: a + b)
host_diarios.collect()

[(datetime.datetime(1995, 8, 1, 0, 0), 3432)]

### 8. ¿Cuántos hosts únicos hay por día?

En este caso, es inútil hacerlo pues como se ha visto en los dos apartados anteriores sólo hay un día y existen 311 registros únicos. De esta manera, se sabe ya de antemano el resultado que debe salir de este apartado, 311 el día 1 de agosto de 1995. Aún así, se realizará el código para futuras bases de datos que contengan más de una fecha.

El proceso del código es bastante complejo. En primer lugar se crea una función _lambda_ que agrupa la fecha (con la función creada en el ejercicio anterior) y el nombre del host, para poder luego trabajar con él. Se agrupa por la clave y luego se usa un _mapValues_ para poder operar por la parte valor (la del host) ordenándolos (con el set). La segunda parte al tener un formato _clave, valor_ es un diccionario. Con el último map, se consigue un único registro cuya clave es la fecha y el valor es la longitud del diccionario, es decir los hosts únicos.

In [21]:
unicos_dia = (parsed_data.map(lambda registro: (conv_fecha(registro), registro[0]))
              .groupByKey()
              .mapValues(set)
              .map(lambda x: (x[0], len(x[1]))))
unicos_dia.collect()

[(datetime.datetime(1995, 8, 1, 0, 0), 311)]

### 9. ¿Cuál es la media de las peticiones diarias por host?

En este caso se realiza algo similar al proceso del anterior apartado, pero en vez de colocar cada host como el segundo elemento del registro se coloca la cantidad de hosts totales. Posteriormente se unen ambos RDD, *unicos_dia* y *media_peticiones* y se hace un _map_ para colocar en primer lugar la fecha, y en segundo se hace una división (promedio) entre las peticiones totales (3432) y las hosts únicos (311) accediendo al primer y segundo lugar de la segunda parte de la unión.

In [24]:
fecha_total = (parsed_data.map(lambda registro: (conv_fecha(registro), registro[0])).groupByKey().mapValues(len))
media_peticiones = fecha_total.join(unicos_dia).map(lambda a: (a[0], (a[1][0])/(a[1][1])))
media_peticiones.collect()

[(datetime.datetime(1995, 8, 1, 0, 0), 11.035369774919614)]

### 10. Lista de 40 endpoints que generen error 404

En este caso se filtra en primer lugar todos los registros que tienen en la posición del código el valor 404, se saca cada endpoint en una lista

In [25]:
top40_404 = parsed_data.filter(lambda registro: registro[7] == '404').map(lambda line: line[5])         
top40_404.take(40)

['/shuttle/resources/orbiters/discovery.gif',
 '/pub/winvn/release.txt',
 '/www/software/winvn/winvn.html',
 '/history/history.htm',
 '/elv/DELTA/uncons.htm',
 '/sts-71/launch/',
 '/history/apollo/apollo-13.html',
 '/history/apollo/a-001/a-001-patch-small.gif',
 '/history/apollo/a-001/movies/',
 '/history/apollo/a-001/a-001-patch-small.gif',
 '/history/apollo/a-001/movies/',
 '/history/apollo/a-001/a-001-patch-small.gif',
 '/history/apollo/a-001/images/',
 '/history/apollo/a-001/a-001-patch-small.gif',
 '/history/apollo/a-004/a-004-patch-small.gif',
 '/history/apollo/a-004/movies/',
 '/history/apollo/a-004/a-004-patch-small.gif',
 '/pub/winvn/release.txt',
 '/pub/winvn/readme.txt',
 '/pub/winvn/release.txt',
 '/pub/winvn/readme.txt',
 '/pub/winvn/release.txt']

### 11. Top 25 de los que más errores 404 generan

Los primeros pasos de este apartado son iguales al del anterior pero añadiendo a cada _endpoint_ un 1, para luego contearlo y reducirlo por la clave. Finalmente, con el _takeOrdered_ se saca el top-25 de los errores (aunque en este caso no hay tantos errores)

In [25]:
top25_404 = (parsed_data.filter(lambda registro: registro[7] == '404')
             .map(lambda registro: (registro[5], 1))
             .reduceByKey(lambda a, b: a+b))
top25_404.takeOrdered(25, lambda x: -x[1])

[('/pub/winvn/release.txt', 4),
 ('/history/apollo/a-001/a-001-patch-small.gif', 4),
 ('/history/apollo/a-004/a-004-patch-small.gif', 2),
 ('/history/apollo/a-001/movies/', 2),
 ('/pub/winvn/readme.txt', 2),
 ('/shuttle/resources/orbiters/discovery.gif', 1),
 ('/www/software/winvn/winvn.html', 1),
 ('/history/history.htm', 1),
 ('/history/apollo/apollo-13.html', 1),
 ('/elv/DELTA/uncons.htm', 1),
 ('/sts-71/launch/', 1),
 ('/history/apollo/a-001/images/', 1),
 ('/history/apollo/a-004/movies/', 1)]

### 12. Top 5 de días que se generan un código 404

En este caso, también es inútil hacer un top 5 de días cuando sólo existe un día en el dataset. A pesar de ello, se hará un código por si existiese más de un día. El funcionamiento es similar al de los apartados 7 y 8. En este caso se filtra los errores 404 se crea una lista con cada fecha (única en este caso) y luego se suman por cada clave. Finalmente, se sacan los 5 que más tienen en orden descendente.

In [26]:
top5dias_404 = (parsed_data.filter(lambda registro: registro[7] == '404')
                .map(lambda registro: (conv_fecha(registro), 1))
                .reduceByKey(lambda a, b: a+b))
top5dias_404.takeOrdered(5, lambda x: -x[1])

[(datetime.datetime(1995, 8, 1, 0, 0), 22)]