<h1>Desafio Semantix</h1>

<b><i><font color=red>Questões teóricas</font></i></b>

<b>Qual o objetivo do comando cache em Spark?</b>
<p>Salva o estado atual de uma operação complexa realizada em um RDD, evitando assim ter que realizá-la novamente em utilizações posteriores.</p>

<b>O mesmo código implementado em Spark é normalmente mais rápido que a implementação equivalente em MapReduce. Por quê? </b>
<p>Porque o Spark faz todo o processamento em memória ao contrário do MapReduce.</p>

<b>Qual é a função do SparkContext?</b>
<p>SparkContext é a conexão entre Spark e o cluster, sendo que sua função é calcular a melhor estratégia de distribuição do processo Spark no cluster. Função esta extremamente essencial quando se utiliza objetos como RDD, acumuladores e broadcast.</p>

<b>Explique com suas palavras o que é Resilient Distributed Datasets (RDD).</b>
<p>É uma coleção de objetos distruíbos, imutável e tolerante a falhas.</p>

<b>Explique o que o código Scala abaixo faz</b>
<p><i>val textFile = sc.textFile("hdfs://...")</i></p>
<p><i>val counts = textFile.flatMap(line => line.split(" "))</i></p>
<p><i>.map(word => (word, 1))</i></p>
<p><i>.reduceByKey(_ + _)</i></p>
<p><i>counts.saveAsTextFile("hdfs://...")</i></p>
<p></p>
<p>Abre o arquivo texto do HDFS e transforma-o em um objeto RDD, sendo que em seguida pega o conteúdo de cada linha dentro do RDD e gera uma coleção de itens, separando cada palavra através do caractere espaço. Depois gera um mapeamento de chave e valor, onde chave é cada uma das palavras e valor é igual 1. Na seqüência agrupa  as chaves de mesmo valor, somando a quantidade de números 1 por chave. Por fim, escreve o resultado obtido em um arquivo no HDFS. Finalizando, esse código implementa um contador de palavras.</p>

<b><i><font color=red>Questões práticas</font></i></b>

In [1]:
import re
from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.functions import date_format, sum

In [2]:
pattern1 = '^(.+) (.+) (.+) \[([\w:/]+\s[+\-]\d{4})\] "(.+) (.+) (.+)" (\d{3}) (.+)'

def parse_line(row):
    
    match = re.search(pattern1, row)
    if match is None:
        return None
    
    return Row(host = match.group(1), undef1 = match.group(2), undef2 = match.group(3),  
                      timestamp = datetime.strptime(match.group(4), '%d/%b/%Y:%H:%M:%S %z'),
                      method = match.group(5), url = match.group(6), protocol = match.group(7),
                      code = int(match.group(8)), bytes  = match.group(9))

In [3]:
file1 = sc.textFile('NASA_access_log_Jul95').map(parse_line)
file2 = sc.textFile('NASA_access_log_Aug95').map(parse_line)

In [4]:
psdf = file1.union(file2) \
                .filter(lambda row: row is not None) \
                .toDF() \
                .cache()

<b>1. Número de hosts únicos.</b>

In [5]:
psdf.select('host').distinct().count()

137855

<b>2. O total de erros 404.</b>

In [6]:
psdf.filter('code = 404').count()

20727

<b>3. As 5 URLs que mais causaram erro 404.</b>

In [7]:
psdf.select('host', 'url') \
      .filter('code = 404') \
      .groupBy('host', 'url') \
      .count() \
      .orderBy('count', ascending=False) \
      .limit(5) \
      .show()

+--------------------+--------------------+-----+
|                host|                 url|count|
+--------------------+--------------------+-----+
|ts8-1.westwood.ts...|/images/Nasa-logo...|   37|
| nexus.mlckew.edu.au|/images/nasa-logo...|   34|
|       203.13.168.24|/images/nasa-logo...|   25|
|       203.13.168.17|/images/nasa-logo...|   25|
|        crl5.crl.com|/images/nasa-logo...|   22|
+--------------------+--------------------+-----+



<b>4. Quantidade de erros 404 por dia.</b>

In [8]:
psdf.select(date_format('timestamp','yyyy/MM/dd').alias('date')) \
      .filter('code = 404') \
      .groupBy("date") \
      .count() \
      .orderBy("date") \
      .show()

+----------+-----+
|      date|count|
+----------+-----+
|1995/07/01|  302|
|1995/07/02|  276|
|1995/07/03|  464|
|1995/07/04|  367|
|1995/07/05|  472|
|1995/07/06|  651|
|1995/07/07|  577|
|1995/07/08|  317|
|1995/07/09|  342|
|1995/07/10|  376|
|1995/07/11|  490|
|1995/07/12|  461|
|1995/07/13|  531|
|1995/07/14|  412|
|1995/07/15|  251|
|1995/07/16|  237|
|1995/07/17|  419|
|1995/07/18|  465|
|1995/07/19|  644|
|1995/07/20|  418|
+----------+-----+
only showing top 20 rows



<b>5. O total de bytes retornados.</b>

In [9]:
psdf.select(sum('bytes').cast('long').alias('sum')).show(truncate=False)

+-----------+
|sum        |
+-----------+
|65408685976|
+-----------+

