In [1]:
# criar o contexto Spark 
sc = spark.sparkContext

In [2]:
# Criação dos RDDs abrindo os arquivos texto
jul95_rdd = sc.textFile('/FileStore/tables/access_log_Jul95')
ago95_rdd = sc.textFile('/FileStore/tables/access_log_Aug95')

In [3]:
# Junção dos 2 RDDs criados
all_dataset_rdd = jul95_rdd + ago95_rdd


In [4]:
# Import dos pacotes necessários
import re
from datetime import datetime

In [5]:
# Contagem de linhas do novo RDD criado
all_dataset_rdd.count()

In [6]:
# Pegar 15 linhas do RDD
all_dataset_rdd.take(15)

In [7]:
# Função que valida as informações de cada linha do RDD criado
# https://pt.infobyip.com/regularexpressioncalculator.php

def valida_estrutura(linha_rdd):
  
    #################################################################
    # Valida host validando o grupo 1 da expressão regular
	#################################################################

    host = re.search(r'(.*) - - ', linha_rdd)
    host = host.group(1) if host else ''

	#################################################################
    # Valida se a data está no formato válido
	#################################################################
    
    data = re.search(r'\[([0-9A-Za-z\/]*):.*\]', linha_rdd)
    data = data.group(1) if data else ''

 	#################################################################
    # Valida se o status é inteiro e numérico  
    #################################################################

    status = re.search(r'\" ([0-9]+) ', linha_rdd)
    status = int(status.group(1)) if status else 0
    
	#################################################################
    # Valida se o tamanho transferido  é inteiro e numérico
	#################################################################
    
    bytes_transf = re.search(r' ([0-9]+)$', linha_rdd)
    bytes_transf = int(bytes_transf.group(1)) if bytes_transf else 0

   	#################################################################
    # Retorna  os valores tratados
	#################################################################

    return (host, data, status, bytes_transf)
  


In [8]:
# Executar a função lambda do Python, criada para linha do RDD
final_rdd = all_dataset_rdd.map(lambda linha_rdd: valida_estrutura(linha_rdd))

# Carrega rdd  no cache, para melhor performance
final_rdd.cache()

# Consulta 15 registros
final_rdd.take(15)


In [9]:
# Conta quantidade de linhas do RDD tratado
final_rdd.count()


In [10]:
# Configurando a sessao spark
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Criando o dataframe "final_df" à partir do RDD final criado (final_rdd)
# https://stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark

#final_df = sqlContext.createDataFrame(final_rdd)

final_df = final_rdd.toDF().selectExpr('_1 as host', '_2 as timestamp', '_3 as return_code', '_4 as qtd_bytes')

final_df.createOrReplaceTempView("tab_final_df")

In [11]:
final_df.show()

In [12]:
%sql 
select count(distinct(host)) as host_unicos 
from   tab_final_df

host_unicos
137979


In [13]:
%sql 
select count(*) as qtd_erro_404 
from   tab_final_df 
where  return_code = '404'

qtd_erro_404
20901


In [14]:
%sql 
select host as url, count(*) as qtd 
from   tab_final_df 
where  return_code = '404' 
group  by host 
order by count(*) desc
limit 5


url,qtd
hoohoo.ncsa.uiuc.edu,251
piweba3y.prodigy.com,157
jbiagioni.npt.nuwc.navy.mil,132
piweba1y.prodigy.com,114
www-d4.proxy.aol.com,91


In [15]:
%sql
select substr(timestamp,1,2) as dia_req, count(*) as qtd_dia 
from   tab_final_df
where  return_code = '404' 
group  by substr(timestamp,1,2)
order  by dia_req


dia_req,qtd_dia
1,559
2,291
3,778
4,705
5,733
6,1013
7,1107
8,693
9,627
10,713


In [16]:
%sql
select sum(qtd_bytes) as total_bytes_retornado
from   tab_final_df


total_bytes_retornado
65524314915
