# Respostas e código para o desafio Semantix
## Candidado Gabriel da Costa Cupello



### Qual o objetivo do comando cache em Spark?

R: O uso do comando cache permite salver na memória resultados parciais do processamento para serem utilizados nas próximas etapas.

### O mesmo código implementado em Spark é normalmente mais rápido que a implementação equivalente em MapReduce. Por quê?

R: Porque o Spark utiliza processamento em memória, enquanto o MapReduce utiliza o disco. Como os processos de leitura e escrita (IOs) no disco são mais demorados em relação ao acesso dos dados em memória, o código escrito em Spark torna-se mais rápido.

### Qual é a função do SparkContext?

R: O SparkContext representa a conexão com o Cluster Spark, permitindo que uma aplicação acesse o Cluster. Ele ainda é utilizado conmo intermediar podendo acessar RDDs, cancelar um job, alterar configurações, entre outras.

### Explique com suas palavras o que é Resilient Distributed Datasets (RDD).

R: É o elemento fundamental do Spark, sendo a maneira como os dados são processados. São um conjunto de dados imutáveis e armazenados em diferentes nós em um cluster. Suportam dois tipos de operação: transformação e ação. Na transformação realizam uma determinada operação com os dados como por exemplo, filtro e retornam um novo RDD com o resultado. Na ação realizam uma operação como, por exemplo, count e retornam o resultado. 

### GroupByKey é menos eficiente que reduceByKey em grandes dataset. Por quê?

R: Porque o reduceByKey realiza primeiro a redução em cada partição onde há dados para depois realizar a comuniação entre as outras partições via rede, enquanto que o groupByKey realiza a comunição via rede para realizar a redução.

### Explique o que o código Scala abaixo faz.
val textFile = sc.textFile("hdfs://...")

val counts = textFile.flatMap(line => line.split(" "))

    .map(word => (word, 1))
    
    .reduceByKey(_ + _)
    
counts.saveAsTextFile("hdfs://...")

R: Realiza a leitura de um arquivo de texto, em seguida quebra as palavras através dos espaços em branco. A partir disso, realiza cria uma tupla para cada palavra com o número 1, para a seguir realizar a redução, ou seja, realizando a contagem de cada palavra igual no texto do arquivo. Dessa forma, o algoritmo realiza a contagem de cada palavra no arquivo e salva o resultado em um arquivo de texto no HDFS.

## Código e respostas do desafio de programação

In [6]:
# importar bibliotecas
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract

In [7]:
# iniciar sessao
spark = SparkSession.builder.appName("Desafio").getOrCreate()

In [8]:
# leitura dos arquivos
df = spark.read.text("files/*")
df.show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
+-----------------------

In [9]:
# verificar tamanho de linhas e colunas
print((df.count(), len(df.columns)))

(3461613, 1)


In [10]:
# definir expressões regulares para separar os campos
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
status_pattern = r'\s(\d{3})\s'
content_size_pattern = r'\s(\d+)$'

In [11]:
# aplicando as expressões regulares e separando os valores nos respectivos campos
logs_df = df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                         regexp_extract('value', ts_pattern, 1).alias('data'),
                         regexp_extract('value', method_uri_protocol_pattern, 1).alias('metodo'),
                         regexp_extract('value', method_uri_protocol_pattern, 2).alias('URL'),
                         regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocolo'),
                         regexp_extract('value', status_pattern, 1).cast('integer').alias('retorno'),
                         regexp_extract('value', content_size_pattern, 1).cast('integer').alias('bytes'))
logs_df.show(10, truncate=True)
print((logs_df.count(), len(logs_df.columns)))

+--------------------+--------------------+------+--------------------+---------+-------+-----+
|                host|                data|metodo|                 URL|protocolo|retorno|bytes|
+--------------------+--------------------+------+--------------------+---------+-------+-----+
|        199.72.81.55|01/Jul/1995:00:00...|   GET|    /history/apollo/| HTTP/1.0|    200| 6245|
|unicomp6.unicomp.net|01/Jul/1995:00:00...|   GET| /shuttle/countdown/| HTTP/1.0|    200| 3985|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...| HTTP/1.0|    200| 4085|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...| HTTP/1.0|    304|    0|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...| HTTP/1.0|    200| 4179|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/images/NASA-logo...| HTTP/1.0|    304|    0|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...| HTTP/1.0|    200|    0|
|     205.212.115.106|01/Jul/1995:00:00.

In [12]:
# criando view para poder utilizar a funcionalidade de banco de dados
logs_df.createOrReplaceTempView("nasa")

### Questão 1 - Número de hosts únicos

In [13]:
sql_results = spark.sql("SELECT count(distinct(host)) from nasa")
sql_results.show(truncate=False)

+--------------------+
|count(DISTINCT host)|
+--------------------+
|137933              |
+--------------------+



### Questão 2 - O total de erros 404

In [14]:
sql_results = spark.sql("SELECT count(1) from nasa where retorno = '404'")
sql_results.show()

+--------+
|count(1)|
+--------+
|   20899|
+--------+



### Questão 3 - Os 5 URLs que mais causaram erro 404

In [15]:
sql_results = spark.sql("SELECT URL, count(1) from nasa where retorno = '404' group by URL order by 2 desc")
sql_results.show(5, truncate=False)

+--------------------------------------------+--------+
|URL                                         |count(1)|
+--------------------------------------------+--------+
|/pub/winvn/readme.txt                       |2004    |
|/pub/winvn/release.txt                      |1732    |
|/shuttle/missions/STS-69/mission-STS-69.html|683     |
|/shuttle/missions/sts-68/ksc-upclose.gif    |428     |
|/history/apollo/a-001/a-001-patch-small.gif |384     |
+--------------------------------------------+--------+
only showing top 5 rows



### Questão 4 - Quantidade de erros 404 por dia

In [16]:
sql_results = spark.sql("SELECT SUBSTR(data,1,11), count(1) from nasa where retorno = '404' group by SUBSTR(data,1,11) order by 1")
sql_results.show(1000, truncate=False)

+----------------------+--------+
|substring(data, 1, 11)|count(1)|
+----------------------+--------+
|01/Aug/1995           |243     |
|01/Jul/1995           |316     |
|02/Jul/1995           |291     |
|03/Aug/1995           |304     |
|03/Jul/1995           |474     |
|04/Aug/1995           |346     |
|04/Jul/1995           |359     |
|05/Aug/1995           |236     |
|05/Jul/1995           |497     |
|06/Aug/1995           |373     |
|06/Jul/1995           |640     |
|07/Aug/1995           |537     |
|07/Jul/1995           |570     |
|08/Aug/1995           |391     |
|08/Jul/1995           |300     |
|09/Aug/1995           |279     |
|09/Jul/1995           |348     |
|10/Aug/1995           |315     |
|10/Jul/1995           |398     |
|11/Aug/1995           |263     |
|11/Jul/1995           |471     |
|12/Aug/1995           |196     |
|12/Jul/1995           |471     |
|13/Aug/1995           |216     |
|13/Jul/1995           |532     |
|14/Aug/1995           |287     |
|14/Jul/1995  

### Questão 5 - O total de bytes retornandos

In [17]:
sql_results = spark.sql("SELECT sum(bytes)/1024/1024/1024 from nasa")
sql_results.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------+
|(((CAST(sum(CAST(bytes AS BIGINT)) AS DOUBLE) / CAST(1024 AS DOUBLE)) / CAST(1024 AS DOUBLE)) / CAST(1024 AS DOUBLE))|
+---------------------------------------------------------------------------------------------------------------------+
|61.0242736665532                                                                                                     |
+---------------------------------------------------------------------------------------------------------------------+

