# Práctica pyspark

Álvaro Rodríguez López

### Crear RDD

In [1]:
from pyspark import SparkContext

sc = SparkContext()


In [2]:
data_file = "./data/apache.access.log_small"
raw_data = sc.textFile(data_file)

### Comprobar carga

In [3]:
#comprobar número de registros
raw_data.count()

3432

In [4]:
# comprobar cinco primeros registros
raw_data.take(5)

['in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839',
 'uplherc.upl.com - - [01/Aug/1995:00:00:07 -0400] "GET / HTTP/1.0" 304 0',
 'uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/ksclogo-medium.gif HTTP/1.0" 304 0',
 'uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/MOSAIC-logosmall.gif HTTP/1.0" 304 0',
 'uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/USA-logosmall.gif HTTP/1.0" 304 0']

In [39]:
# comprobar que todos los regristros coinciden con un formato log apache
import re
# utilizamos el modulo regular expresions para parsear las distintas partes de los logs de apache.
# definimos la cadena de expresiones
apache_log_formato = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
# función search del module regular expression para comprobar si las distintas líneas coinciden con el formato
def parse_log_apache(linea):
    match = re.search(apache_log_formato, linea)
    if match is None:
        return 0
    else:
        return 1
total_logs = raw_data.count() # número total de registros
fallos = raw_data.map(lambda linea: parse_log_apache(linea)).filter(lambda linea: linea == 0).count() # filtramos los registros que no coinciden
print('De un total de {} logs, {} fallaron en la conversión'.format(total_logs,fallos))
# todos los registros coinciden con la expresión apache_log_formato

De un total de 3432 logs, 0 fallaron en la conversión


### Parseando 

In [13]:
def map_log_apache(linea):
    match = re.search(apache_log_formato,linea)
    return(match.groups()) 

parsed_data = raw_data.map(map_log_apache)

# parseamos cada registro conforme a la expresión apache_log_formato


In [14]:
parsed_data.count() # Comprobamos que el número de registros es el mismo

3432

In [29]:
# definimos función para trabajar con las fechas
from datetime import datetime
def fechas(linea):
    fecha = linea[3]
    return datetime.strptime(fecha[:11], "%d/%b/%Y")

In [None]:
# definimos función para trabajar con números size
def convertir_size(size): # convertir los datos en números
    size = re.sub('[^0-9]',"",size) 
    if size =="":
        return 0
    else:
        return int(size)


### Ejercicios

- Mínimo, Máximo, Media del tamaño de las peticiones (size)

In [38]:

# convertir el registro nueve y extraer estadísticas
resultado = parsed_data.map(lambda linea: convertir_size(linea[8])).stats() 
resultado

(count: 3432, mean: 16051.863636363634, stdev: 53247.8157482, max: 887988.0, min: 0.0)

In [17]:
# alternativamente
media = parsed_data.map(lambda linea: convertir_size(linea[8])).mean()
minimo = parsed_data.map(lambda linea: convertir_size(linea[8])).min()
maximo = parsed_data.map(lambda linea: convertir_size(linea[8])).max()
print('Los tamaños de los logs tienen una media de {}, un máximo de {} y un mínimo de {}'
      .format(round(media,3),minimo,maximo))


Los tamaños de los logs tienen una media de 16051.864, un máximo de 0 y un mínimo de 887988


- Núm. peticiones de cada código de respuesta (respon

In [18]:
# calculamos el número de códigos de respuesta distintos
num_codigos = parsed_data.map(lambda linea: (linea[7], 1)).distinct().count()

#  transformaciones map, reducebykey, acción takeOrdered
contar_codigos = (parsed_data.map(lambda linea: (linea[7], 1)) # valores de response_code
          .reduceByKey(lambda a, b: a + b) #une los valores para cada clave usando una función de reducción asociativa. 
          .takeOrdered(num_codigos, lambda x: -x[1])) # tomo una muestra odenadada de cuantos valores distintos existen
contar_codigos # ordenados de maás a menos repetido

[('200', 3140), ('304', 219), ('302', 50), ('404', 22), ('403', 1)]

- Mostrar 20 hosts que han sido visitados más de 10 veces.

In [19]:
# transformaciónes map, reducebykey y filter, acción take
resultado = (parsed_data.map(lambda linea: (linea[0],1)) # valores de endpoints
            .reduceByKey(lambda a, b: a + b) #funcion reduccion para clave valor
            .filter(lambda linea: linea[1] > 10)
            .take(20))
resultado
# se muestran 20 hosts visitados más de 10 veces

[('prakinf2.prakinf.tu-ilmenau.de', 14),
 ('centauri.tksc.nasda.go.jp', 13),
 ('www-a1.proxy.aol.com', 23),
 ('133.68.18.180', 17),
 ('hsccs_gatorbox07.unm.edu', 40),
 ('pme609.onramp.awinc.com', 22),
 ('lutzp.tigernet.net', 12),
 ('maui56.maui.net', 23),
 ('193.84.66.147', 31),
 ('dws.urz.uni-magdeburg.de', 13),
 ('168.78.14.166', 24),
 ('ix-sf10-28.ix.netcom.com', 12),
 ('adam.tower.com.au', 44),
 ('www-d1.proxy.aol.com', 39),
 ('pc-heh.icl.dk', 33),
 ('www-c3.proxy.aol.com', 17),
 ('143.158.26.50', 29),
 ('dd15-053.compuserve.com', 11),
 ('ppp33.asahi-net.or.jp', 11),
 ('ts6-11.westwood.ts.ucla.edu', 15)]

- Mostrar los 10 endpoints más visitados

In [20]:
# transformaciones map, reducebykey, acción takeOrdered
resultado = (parsed_data.map(lambda linea: (linea[5],1)) # valores de endpoints
            .reduceByKey(lambda a, b: a + b) #funcion reduccion para clave valor
            .takeOrdered(10, lambda x: -x[1])) # accion, tomar 10 valores ordenados por número de visitas
resultado


[('/images/KSC-logosmall.gif', 167),
 ('/images/NASA-logosmall.gif', 160),
 ('/images/MOSAIC-logosmall.gif', 122),
 ('/images/WORLD-logosmall.gif', 120),
 ('/images/USA-logosmall.gif', 118),
 ('/images/ksclogo-medium.gif', 106),
 ('/', 85),
 ('/history/apollo/images/apollo-logo1.gif', 74),
 ('/images/launch-logo.gif', 69),
 ('/images/ksclogosmall.gif', 66)]

- Mostrar los 10 endpoints más visitados que no tienen código de respuesta =200

In [21]:
# transformaciones filter, map, reducebykey, acción takeOrdered
resultado = (parsed_data.filter(lambda linea: linea[7] != '200') # valores de respuesta distintos de 200
          .map(lambda linea: (linea[5], 1)) # clave-valor de endpoints
          .reduceByKey(lambda a, b: a+b)  #funcion reduccion para clave valor
          .takeOrdered(10, lambda x: -x[1])) # tomamos los diez endpoints más visitados.
resultado

[('/images/NASA-logosmall.gif', 25),
 ('/images/KSC-logosmall.gif', 24),
 ('/images/MOSAIC-logosmall.gif', 17),
 ('/images/WORLD-logosmall.gif', 17),
 ('/images/USA-logosmall.gif', 16),
 ('/images/ksclogo-medium.gif', 10),
 ('/images/construct.gif', 8),
 ('/software/winvn/winvn.html', 8),
 ('/software/winvn/bluemarb.gif', 8),
 ('/software/winvn/wvsmall.gif', 6)]

- Calcular el número de hosts distintos

In [22]:
# número de host distintos
# transformaciones map, distinct, acción count
parsed_data.map(lambda linea: linea[0]).distinct().count()

311

- Contar el núm. de hosts únicos cada día

In [30]:
# número de host distintos por día
# transformaciones map, groupbykey,mapvalues,map, acción collect
resultado = (parsed_data.map(lambda linea:(fechas(linea),linea[0]))
          .groupByKey()
          .mapValues(set) # Pasa cada línea en el par de clave-valor RDD a través de una función map sin cambiar las claves
          .map(lambda x: (x[0], len(x[1])))) # para cada fecha, número de hosts distintos

resultado.collect()
# solo hay un día, los 311 hosts se dan el mismo día.

[(datetime.datetime(1995, 8, 1, 0, 0), 311)]

- Calcular la media de peteciones diarias por host

In [42]:
# calculamos nuevamente el número de hosts distintos por día
host_distintos_dia = (parsed_data.map(lambda linea:  (fechas(linea),linea[0]))
          .groupByKey()
          .mapValues(set) # nuevamente con la función set
          .map(lambda x: (x[0], len(x[1])))) 
# calculamos las peticiones totales por día
peticiones_totales_dia = (parsed_data.map(lambda linea:(fechas(linea),linea[0]))
          .groupByKey()
          .mapValues(len)) # aplicamos directamente la función len

# unimos ambos resultados y para cada fecha, calculamos el número total de peticiones para ese día entre el número de hosts diarios.
peticiones_diarias_media = (peticiones_totales_dia.join(host_distintos_dia)
                         .map(lambda a: (a[0], round((a[1][0])/(a[1][1]),3)))
                         .collect())

peticiones_diarias_media

# solo hay un día, su media es 11.03 peticiones por host

[(datetime.datetime(1995, 8, 1, 0, 0), 11.035)]

- Mostrar una de lista de 40 endpoints distintos que generan código de respuesta = 404

In [32]:
# transformacion filter, map, reducebykey y distinct, acción collect
resultado = (parsed_data.filter(lambda linea: linea[7] == '404')
          .map(lambda linea: (linea[5], 1))
          .reduceByKey(lambda a, b: a+b) # hacemos igualmente la agregación, para ver el número de veces que se repite cada uno.
          .take(40))
resultado

# solo hay 22 endopoints que generan código de respuesta 404, 13 valores únicos.
# se muestran todos los distintos endpoints con respuesta 404, junto con el número de veces que se han dado

[('/history/apollo/a-001/images/', 1),
 ('/history/apollo/a-001/a-001-patch-small.gif', 4),
 ('/history/apollo/a-004/a-004-patch-small.gif', 2),
 ('/sts-71/launch/', 1),
 ('/history/history.htm', 1),
 ('/history/apollo/apollo-13.html', 1),
 ('/history/apollo/a-004/movies/', 1),
 ('/shuttle/resources/orbiters/discovery.gif', 1),
 ('/www/software/winvn/winvn.html', 1),
 ('/history/apollo/a-001/movies/', 2),
 ('/pub/winvn/release.txt', 4),
 ('/elv/DELTA/uncons.htm', 1),
 ('/pub/winvn/readme.txt', 2)]

- Mostrar el top 25 de endpoints que más códigos de respuesta 404 generan

In [37]:
# ordenamos endpoints por el número de veces que han registrado códigos de respuesta 404 
resultado = (parsed_data.filter(lambda linea: linea[7] == '404')
          .map(lambda linea: (linea[5], 1))
          .reduceByKey(lambda a, b: a+b)
          .takeOrdered(25, lambda x: -x[1])) # Seleccionamos los 25 mayores
resultado
# Solo hay 13 endpoints en nuestro data set que han dado codigo de respuesta 404

[('/history/apollo/a-001/a-001-patch-small.gif', 4),
 ('/pub/winvn/release.txt', 4),
 ('/history/apollo/a-004/a-004-patch-small.gif', 2),
 ('/history/apollo/a-001/movies/', 2),
 ('/pub/winvn/readme.txt', 2),
 ('/history/apollo/a-001/images/', 1),
 ('/sts-71/launch/', 1),
 ('/history/history.htm', 1),
 ('/history/apollo/apollo-13.html', 1),
 ('/history/apollo/a-004/movies/', 1),
 ('/shuttle/resources/orbiters/discovery.gif', 1),
 ('/www/software/winvn/winvn.html', 1),
 ('/elv/DELTA/uncons.htm', 1)]

- El top 5 de días que se generaron código de respuestas 404

In [36]:
resultado = (parsed_data.filter(lambda linea: linea[7] == '404')
          .map(lambda linea:(fechas(linea), 1))
          .reduceByKey(lambda a, b: a+b)
          .takeOrdered(5, lambda x: -x[1]))
resultado
# solo hay un único día en el dataset

[(datetime.datetime(1995, 8, 1, 0, 0), 22)]