In [1]:
# Load the libraries and start Spark.
# findspark.init() runs before the pyspark imports so that the local Spark
# installation can be found and imported.
import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession

import re
import pandas as pd
import glob

# getOrCreate() reuses an already-running session/context, so re-running this
# cell does not fail with "Cannot run multiple SparkContexts at once" the way
# a bare SparkContext() does.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# Legacy entry point kept for backward compatibility; `spark` supersedes it.
sqlcontext = SQLContext(sc)


In [24]:
# Input files: every gzip-compressed log in the working directory.
# glob() returns names in arbitrary (filesystem) order; sort them so the file
# list -- and everything derived from it -- is reproducible across machines.
data = sorted(glob.glob('*.gz'))

In [25]:
# Show the list of input files that were found.
list(data)

['NASA_access_log_Aug95 (1).gz', 'NASA_access_log_Jul95.gz']

In [26]:
# Read every file as plain text: each log line becomes one row of a single
# string column named `value` (the .gz files are decompressed while reading).
nasa_df = spark.read.text(paths=data)

In [27]:
nasa_df.printSchema() # Inspect the schema of our DataFrame (a single string column, `value`)

root
 |-- value: string (nullable = true)



In [28]:
nasa_df.__class__  # check the Python type of our DataFrame

pyspark.sql.dataframe.DataFrame

In [29]:
# Preview the first 10 raw log lines without truncating them.
nasa_df.show(n=10, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
|burger.letters.com - - 

In [30]:
# Data wrangling starts here.
# First, report how many records (rows) and columns the DataFrame has.
n_rows = nasa_df.count()
n_cols = len(nasa_df.columns)
print('El DataFrame tiene {} registros, en {} columna/s'.format(n_rows, n_cols))

El DataFrame tiene 3461613 registros, en 1 columna/s


In [31]:
# Regular expressions for the fields of each access-log line, which looks like:
#   host - - [dd/Mon/yyyy:HH:MM:SS -zzzz] "METHOD resource PROTOCOL" status size

# Host: the first whitespace-delimited token of the line (hostname or IP).
# A plain \S+ is used because a bracket such as [\S+\.] is a character class,
# not a group, and the old pattern silently returned '' for hosts without a
# dot followed by at least two characters.
regex_host = r'^(\S+)\s'
# Timestamp between square brackets; the UTC offset may be '+' or '-'.
regex_time = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})]'
# HTTP request between double quotes: method, resource, optional protocol.
regex_http = r'\"(\S+)\s(\S+)\s*(\S*)\"'
# Status: the first three-digit token surrounded by whitespace.
regex_status = r'\s(\d{3})\s'
# Content size: trailing integer; a '-' size leaves this pattern unmatched.
regex_content = r'\s(\d+)$'

# Apply the regular expressions with regexp_extract(), one output column each.
# NOTE: regexp_extract() returns an empty string (not null) when the pattern
# does not match; only the integer casts below can turn a failed match into null.
from pyspark.sql.functions import regexp_extract

parsed_columns = [
    regexp_extract('value', regex_host, 1).alias('host'),
    regexp_extract('value', regex_time, 1).alias('timestamp'),
    regexp_extract('value', regex_http, 1).alias('method'),
    regexp_extract('value', regex_http, 2).alias('resources'),
    regexp_extract('value', regex_http, 3).alias('protocol'),
    regexp_extract('value', regex_status, 1).cast('integer').alias('status'),
    regexp_extract('value', regex_content, 1).cast('integer').alias('size'),
]
nasa_log = nasa_df.select(*parsed_columns)
nasa_log.show(20, truncate=False)

print((nasa_log.count(), len(nasa_log.columns)))

+-------------------------+--------------------------+------+-------------------------------------------------+--------+------+-----+
|host                     |timestamp                 |method|resources                                        |protocol|status|size |
+-------------------------+--------------------------+------+-------------------------------------------------+--------+------+-----+
|199.72.81.55             |01/Jul/1995:00:00:01 -0400|GET   |/history/apollo/                                 |HTTP/1.0|200   |6245 |
|unicomp6.unicomp.net     |01/Jul/1995:00:00:06 -0400|GET   |/shuttle/countdown/                              |HTTP/1.0|200   |3985 |
|199.120.110.21           |01/Jul/1995:00:00:09 -0400|GET   |/shuttle/missions/sts-73/mission-sts-73.html     |HTTP/1.0|200   |4085 |
|burger.letters.com       |01/Jul/1995:00:00:11 -0400|GET   |/shuttle/countdown/liftoff.html                  |HTTP/1.0|304   |0    |
|199.120.110.21           |01/Jul/1995:00:00:11 -0400|GET   |/

In [32]:
# Look for nulls: keep every row in which at least one column is null.
# NOTE: regexp_extract() yields '' rather than null on a failed match, so a
# bad host/timestamp/method/resource/protocol parse is not caught here; in
# practice the nulls come from the integer casts of `status` and `size`.
checked_columns = ['host', 'timestamp', 'method', 'resources',
                   'status', 'size', 'protocol']
any_null = nasa_log[checked_columns[0]].isNull()
for column_name in checked_columns[1:]:
    any_null = any_null | nasa_log[column_name].isNull()
null_df = nasa_log.filter(any_null)
print('Valores nulos: ', null_df.count())

Valores nulos:  33905


In [33]:
# Locate the nulls: count, for every column, how many rows are null in it.
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as sm


def null(col_name):
    """Return an aggregate expression counting the nulls of `col_name`.

    The isNull() flag is cast to 0/1 and summed; the result is aliased back to
    the original column name so the aggregated row mirrors the input columns.
    """
    null_flag = col(col_name).isNull().cast('integer')
    return sm(null_flag).alias(col_name)


# One aggregate expression per column; unpacking the list with * hands them to
# agg() as separate arguments, so all counts come back in a single row.
expr = list(map(null, nasa_log.columns))
nasa_log.agg(*expr).show()

+----+---------+------+---------+--------+------+-----+
|host|timestamp|method|resources|protocol|status| size|
+----+---------+------+---------+--------+------+-----+
|   0|        0|     0|        0|       0|     1|33905|
+----+---------+------+---------+--------+------+-----+



In [34]:
# One value is missing in `status` and 33905 in `size`. To see what the row
# without a status looks like -- and whether the regex is to blame -- select
# the raw lines on which the status pattern does not match.
status_null = nasa_df.where(~nasa_df['value'].rlike(r'\s(\d{3})\s'))
status_null.count()

1

In [35]:
# Display the raw line(s) that have no parseable status.
status_null.show(n=20, truncate=True)

+--------+
|   value|
+--------+
|alyssa.p|
+--------+



In [36]:
# Drop the row whose status could not be parsed, then recount nulls per column.
has_status = nasa_log['status'].isNotNull()
nasa_log = nasa_log.filter(has_status)
expr = list(map(null, nasa_log.columns))
nasa_log.agg(*expr).show()

+----+---------+------+---------+--------+------+-----+
|host|timestamp|method|resources|protocol|status| size|
+----+---------+------+---------+--------+------+-----+
|   0|        0|     0|        0|       0|     0|33904|
+----+---------+------+---------+--------+------+-----+



In [37]:
# Inspect the missing content sizes the same way: raw lines that do not end
# in whitespace followed by digits.
null_content = nasa_df.where(~nasa_df['value'].rlike(r'\s\d+$'))
null_content.count()

33905

In [38]:
# The count matches the `size` nulls found earlier; look at a sample of them.
null_content.head(10)

[Row(value='dd15-062.compuserve.com - - [01/Jul/1995:00:01:12 -0400] "GET /news/sci.space.shuttle/archive/sci-space-shuttle-22-apr-1995-40.txt HTTP/1.0" 404 -'),
 Row(value='dynip42.efn.org - - [01/Jul/1995:00:02:14 -0400] "GET /software HTTP/1.0" 302 -'),
 Row(value='ix-or10-06.ix.netcom.com - - [01/Jul/1995:00:02:40 -0400] "GET /software/winvn HTTP/1.0" 302 -'),
 Row(value='ix-or10-06.ix.netcom.com - - [01/Jul/1995:00:03:24 -0400] "GET /software HTTP/1.0" 302 -'),
 Row(value='link097.txdirect.net - - [01/Jul/1995:00:05:06 -0400] "GET /shuttle HTTP/1.0" 302 -'),
 Row(value='ix-war-mi1-20.ix.netcom.com - - [01/Jul/1995:00:05:13 -0400] "GET /shuttle/missions/sts-78/news HTTP/1.0" 302 -'),
 Row(value='ix-war-mi1-20.ix.netcom.com - - [01/Jul/1995:00:05:58 -0400] "GET /shuttle/missions/sts-72/news HTTP/1.0" 302 -'),
 Row(value='netport-27.iu.net - - [01/Jul/1995:00:10:19 -0400] "GET /pub/winvn/readme.txt HTTP/1.0" 404 -'),
 Row(value='netport-27.iu.net - - [01/Jul/1995:00:10:28 -0400] "GET

In [39]:
# These lines end in '-' instead of a byte count: no response body was sent
# (e.g. 302 redirects and 404 responses). Because there are so many of them,
# replace the missing size with 0 instead of dropping the rows.
nasa_log = nasa_log.fillna(0, subset=['size'])
expr = list(map(null, nasa_log.columns))
nasa_log.agg(*expr).show()

+----+---------+------+---------+--------+------+----+
|host|timestamp|method|resources|protocol|status|size|
+----+---------+------+---------+--------+------+----+
|   0|        0|     0|        0|       0|     0|   0|
+----+---------+------+---------+--------+------+----+



In [40]:
# Cambiamos el formato de fecha para que sea más amigable; usaremos una UDF

from pyspark.sql.functions import udf
# Month abbreviations as they appear in the log timestamps (three letters,
# matching the 3-character slice taken in parse_time).
month = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7,
    'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}

def parse_time(text):
    """Convert a log timestamp into a 'YYYY-MM-DD HH:MM:SS' string.

    Parameters
    ----------
    text : str or None
        Timestamp in the form ``'dd/Mon/yyyy:HH:MM:SS zone'``, e.g.
        ``'01/Jul/1995:00:00:01 -0400'``.

    Returns
    -------
    str or None
        ``'1995-07-01 00:00:01'`` for the example above, or ``None`` when
        *text* is ``None`` or empty, so Spark stores a null instead of the UDF
        failing the job. The UTC offset is ignored: the wall-clock time is
        kept exactly as written in the log.

    Raises
    ------
    KeyError
        If the month abbreviation is not a key of ``month``.
    ValueError
        If a numeric field is not an integer.
    """
    if not text:
        return None
    return '{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'.format(
        int(text[7:11]),    # year
        month[text[3:6]],   # month abbreviation -> number
        int(text[0:2]),     # day
        int(text[12:14]),   # hour
        int(text[15:17]),   # minute
        int(text[18:20]))   # second

# Wrap parse_time as a Spark UDF (string result); the string is then cast to a
# real timestamp column named `time`, which replaces the raw `timestamp` column.
udf_time = udf(parse_time, returnType='string')

parsed_time = udf_time(nasa_log['timestamp']).cast('timestamp')
nasa_log = nasa_log.withColumn('time', parsed_time).drop('timestamp')
nasa_log.show(10)

+--------------------+------+--------------------+--------+------+----+-------------------+
|                host|method|           resources|protocol|status|size|               time|
+--------------------+------+--------------------+--------+------+----+-------------------+
|        199.72.81.55|   GET|    /history/apollo/|HTTP/1.0|   200|6245|1995-07-01 00:00:01|
|unicomp6.unicomp.net|   GET| /shuttle/countdown/|HTTP/1.0|   200|3985|1995-07-01 00:00:06|
|      199.120.110.21|   GET|/shuttle/missions...|HTTP/1.0|   200|4085|1995-07-01 00:00:09|
|  burger.letters.com|   GET|/shuttle/countdow...|HTTP/1.0|   304|   0|1995-07-01 00:00:11|
|      199.120.110.21|   GET|/shuttle/missions...|HTTP/1.0|   200|4179|1995-07-01 00:00:11|
|  burger.letters.com|   GET|/images/NASA-logo...|HTTP/1.0|   304|   0|1995-07-01 00:00:12|
|  burger.letters.com|   GET|/shuttle/countdow...|HTTP/1.0|   200|   0|1995-07-01 00:00:12|
|     205.212.115.106|   GET|/shuttle/countdow...|HTTP/1.0|   200|3985|1995-07-0

In [41]:
# Check the final schema after cleaning (parsed `time` column, filled `size`).
nasa_log.printSchema()

root
 |-- host: string (nullable = true)
 |-- method: string (nullable = true)
 |-- resources: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- size: integer (nullable = false)
 |-- time: timestamp (nullable = true)



In [None]:
# Persist the cleaned log as Parquet (columnar and typed) for later analysis.
# The path is relative to the working directory -- where the *.gz inputs are
# read from -- rather than '/nasaParquet' at the filesystem root, which on a
# local filesystem is usually not writable and ties the notebook to one machine.
path_parquet = 'nasaParquet'
nasa_log.write.mode('overwrite').parquet(path_parquet)