In [1]:
# Ame Digital - Teste Engenheiro de Dados
#
# Autor: Dennis Cardoso
#
# E-mail: dennis.cardoso@outlook.com
#
# Data: 22 de Dezembro de 2019

In [2]:
#Importar biblioteca
import re
from datetime import datetime
from pyspark.sql import Window
from pyspark.sql.types import *
import pyspark.sql.functions as functions
from pyspark import SparkContext, SparkConf, SQLContext

In [3]:
# Iniciar spark context
conf = SparkConf().setMaster('local[*]')
sc = SparkContext().getOrCreate(conf)
sqlc =  SQLContext(sc)

In [4]:
#Função que retorna String a partir de uma data (ordinal)
def str_date(input_date):
        try:
            return datetime.fromordinal(input_date).strftime('%d-%m-%Y')
        except Exception as e:
            return '01/01/1900'

In [5]:
# Função para fazer parse dos dados de entrada
def parseLog(data):
        ''' Read and parse log data '''
        RE_MASK = '(.*) - - \[(.*):(.*):(.*):(.*)\] "(.*)" ([0-9]*) ([0-9]*|-)'

        try:
            re_result = re.compile(RE_MASK).match(data)
            host = re_result.group(1)
            ord_day = datetime.strptime(re_result.group(2), '%d/%b/%Y').toordinal()
            req = re_result.group(6)
            reply_code = int(re_result.group(7))
            
            try:
                reply_bytes = int(re_result.group(8))
            except ValueError as e:
                reply_bytes = 0
            return host, ord_day, req, reply_code, reply_bytes
        
        except Exception as e:
            return '', -1, '', -1, -1

In [6]:
# Declaração do Schema de dados a ser utilizado
schema = StructType([StructField('host',StringType(), True),StructField('timestamp',IntegerType(), True),StructField('request',StringType(), True),StructField('http_code',IntegerType(), True), StructField('total_bytes',IntegerType(), True)])

In [7]:
# Gerar RDD com dados dos arquivos
rows = sc.textFile('files')

# Gerar parse dos dados de Log
nasa_parse = rows.map(parseLog)

# remover linhas com valores inválidos
nasa_rdd = nasa_parse.filter(lambda x: x[1] > -1)

In [8]:
# Criação do Dataframe
nasa_df = sqlc.createDataFrame(nasa_rdd, schema)

In [9]:
# Informativo - Schema do dataframe
nasa_df.printSchema()

root
 |-- host: string (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- request: string (nullable = true)
 |-- http_code: integer (nullable = true)
 |-- total_bytes: integer (nullable = true)



In [10]:
#### 1 - Número de HOSTs únicos (Utilizando Dataframe e SparkSQL)
host_number = nasa_df.select('host').distinct().count()

In [11]:
#### 2 - Total De error 404 dentro do Periodo (Utilizando Dataframe e SparkSQL)
total_404_errors = nasa_df.filter("http_code = 404")
total_404_errors_number = total_404_errors.count()

In [12]:
#### 3 - Quais dias do período especificado tiveram o maior número de erros 404 (Utilizando rdd e reduceByKey).
data_404_rdd = nasa_rdd.filter(lambda y: y[3] == 404).map(lambda x: (x[1], 1)) 
data_404_count = data_404_rdd.reduceByKey(lambda a, b: a+b).sortBy(keyfunc=lambda l: l[1], ascending=False)
data_404_list = data_404_count.collect()

In [13]:
#### 3 - Quais dias do período especificado tiveram o maior número de erros 404 (Utilizando Dataframe e SparkSQL).
data_404_list = total_404_errors.groupby('timestamp').agg(functions.count('timestamp').alias('count_error')).orderBy('count_error', ascending=False).collect()

In [14]:
#### 4 - O total de bytes retornados no período, com uma visão acumulada (Utilizando Dataframe).
total_bytes_acc_group = nasa_df.groupby('timestamp').agg(functions.count('total_bytes').alias('sum_bytes'))
time = (Window.orderBy('timestamp').rowsBetween(Window.unboundedPreceding, 0))
df_cumsum = total_bytes_acc_group.withColumn('cum_sum', functions.sum('sum_bytes').over(time))
total_bytes_list = df_cumsum.select(['timestamp','cum_sum']).collect()

In [15]:
# Output para o resultado de Total de Error 404 por dia 
data_404_final = '\n'
for date_count in data_404_list:
    data_404_final += 'Dia %s: %d ocorrências de Erro 404 \n' % (str_date(int(date_count[0])), date_count[1])

In [16]:
# Output para o resultado do Total de Bytes acumulado
total_bytes_acc = '\n'
for acc_data in total_bytes_list:
   total_bytes_acc += 'Dia %s: %d Bytes \n' % (str_date(int(acc_data[0])), acc_data[1])

In [17]:
#gerar mensagem com resultados
print('-> 1. Numero de hosts unicos: %s ' % host_number)
print('-> 2. Numero total de erros 404: %s ' % total_404_errors_number)
print('-> 3. Quais dias do período especificado tiveram o maior número de erros 404 (Lista completa): %s' % data_404_final)
print('-> 4. O total de bytes retornados no período, com uma visão acumulada. %s \n' % total_bytes_acc)

-> 1. Numero de hosts unicos: 137978 
-> 2. Numero total de erros 404: 20901 
-> 3. Quais dias do período especificado tiveram o maior número de erros 404 (Lista completa): 
Dia 06-07-1995: 640 ocorrências de Erro 404 
Dia 19-07-1995: 639 ocorrências de Erro 404 
Dia 30-08-1995: 571 ocorrências de Erro 404 
Dia 07-07-1995: 570 ocorrências de Erro 404 
Dia 07-08-1995: 537 ocorrências de Erro 404 
Dia 13-07-1995: 532 ocorrências de Erro 404 
Dia 31-08-1995: 526 ocorrências de Erro 404 
Dia 05-07-1995: 497 ocorrências de Erro 404 
Dia 03-07-1995: 474 ocorrências de Erro 404 
Dia 11-07-1995: 471 ocorrências de Erro 404 
Dia 12-07-1995: 471 ocorrências de Erro 404 
Dia 18-07-1995: 465 ocorrências de Erro 404 
Dia 25-07-1995: 461 ocorrências de Erro 404 
Dia 20-07-1995: 428 ocorrências de Erro 404 
Dia 24-08-1995: 420 ocorrências de Erro 404 
Dia 29-08-1995: 420 ocorrências de Erro 404 
Dia 25-08-1995: 415 ocorrências de Erro 404 
Dia 14-07-1995: 413 ocorrências de Erro 404 
Dia 28-08-1995: 