# Análise de dados usando o Spark

### Esse estudo será desenvolvido a partir de duas bases da NASA que contêm dados sobre requisições HTTP para seus servidores com o objetivo de checar as seguintes informações:

1. Qual é o número de hosts únicos?
2. Quantas vezes o erro 404 ocorreu?
3. Quais são as 5 principais URLs responsáveis por isso?
4. Quantidade de erros 404 por dia.
5. O total de bytes retornados.


### Importando as bibliotecas do Spark 

In [1]:
#Importando o Findspark que permite importar o PySpark como uma biblioteca do Python
import findspark
findspark.init()

#Importando o PySpark
import pyspark

#Importando todas as funções PySpark SQL
from pyspark.sql.functions import *

#Importando o recurso "SparkSession", fundamental para trabalharmos com Dataframes no Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

### Importando as bases de dados da NASA

In [2]:
#O Spark possui um método para importarmos arquivos CSV e convertê-los em Dataframes
nasa1 = spark.read.csv("access_log_Jul95.csv")
nasa2 = spark.read.csv("access_log_Aug95.csv")

### Juntando os dois Dataframes

In [91]:
#O método "union" gera um novo Dataframe através da união de dois Dataframes.
#Essa união vai levar em consideração a posição das colunas e não os seus nomes.
#Então, caso o número de colunas seja diferente entre os dois Dataframes, 
#o Dataframe com menor número de colunas terá novas colunas preenchidas com valor nulo 

df = nasa1.union(nasa2)

In [102]:
#Usado para testes
df = nasa1

In [103]:
df.show(5)

+--------------------+
|                 _c0|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
+--------------------+
only showing top 5 rows



In [104]:
df.collect()[0:10]

[Row(_c0='199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'),
 Row(_c0='unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985'),
 Row(_c0='199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085'),
 Row(_c0='burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0'),
 Row(_c0='199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179'),
 Row(_c0='burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0'),
 Row(_c0='burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0'),
 Row(_c0='205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985'),
 Row(_c0='d104.aa.net - - [01/Jul/1995:00:00:13 -0

In [105]:
df1 = df#.withColumnRenamed("_c0","nova")

In [106]:
df1.show(5)

+--------------------+
|                 _c0|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
+--------------------+
only showing top 5 rows



In [107]:
df2 = df1.withColumn('_c0', regexp_replace('_c0', '(.*) - - \[(.*)\] "(.*)" (\d*) (.*)', '$1,$2,$3,$4,$5'))

In [108]:
df2.collect()[5]

Row(_c0='burger.letters.com,01/Jul/1995:00:00:12 -0400,GET /images/NASA-logosmall.gif HTTP/1.0,304,0')

In [109]:
df3 = df2.withColumn('_c0', split('_c0', ','))

In [110]:
df3.collect()[5]

Row(_c0=['burger.letters.com', '01/Jul/1995:00:00:12 -0400', 'GET /images/NASA-logosmall.gif HTTP/1.0', '304', '0'])

In [111]:
columns_name = ["host","timestamp","request_type","http_status","returned_bytes"]

In [112]:
df4 = df3.select(*(col('_c0').getItem(i).alias(columns_name[i]) for i in range(5)))

In [113]:
df4.collect()[5]

Row(host='burger.letters.com', timestamp='01/Jul/1995:00:00:12 -0400', request_type='GET /images/NASA-logosmall.gif HTTP/1.0', http_status='304', returned_bytes='0')

In [114]:
df4.show()

+--------------------+--------------------+--------------------+-----------+--------------+
|                host|           timestamp|        request_type|http_status|returned_bytes|
+--------------------+--------------------+--------------------+-----------+--------------+
|        199.72.81.55|01/Jul/1995:00:00...|GET /history/apol...|        200|          6245|
|unicomp6.unicomp.net|01/Jul/1995:00:00...|GET /shuttle/coun...|        200|          3985|
|      199.120.110.21|01/Jul/1995:00:00...|GET /shuttle/miss...|        200|          4085|
|  burger.letters.com|01/Jul/1995:00:00...|GET /shuttle/coun...|        304|             0|
|      199.120.110.21|01/Jul/1995:00:00...|GET /shuttle/miss...|        200|          4179|
|  burger.letters.com|01/Jul/1995:00:00...|GET /images/NASA-...|        304|             0|
|  burger.letters.com|01/Jul/1995:00:00...|GET /shuttle/coun...|        200|             0|
|     205.212.115.106|01/Jul/1995:00:00...|GET /shuttle/coun...|        200|    

In [115]:
df4.agg(countDistinct(df4.host).alias('distinct_hosts')).collect()

[Row(distinct_hosts=119852)]

In [116]:
df4.agg(count(df4.http_status).alias('404_quantity')).collect()

[Row(404_quantity=1853780)]

In [117]:
df4.groupBy("http_status").count().orderBy("http_status").show()

+-----------+-------+
|http_status|  count|
+-----------+-------+
|       null|  37935|
|        200|1701488|
|        302|   8726|
|        304| 132627|
|        400|      5|
|        403|     54|
|        404|  10804|
|        500|     62|
|        501|     14|
+-----------+-------+



In [118]:
df4.createOrReplaceTempView("nasa_db")

In [119]:
spark.sql("SELECT COUNT('http_status') AS 404_error_quantity, host from nasa_db where http_status='404' GROUP BY host ORDER BY COUNT('http_status') DESC LIMIT 5").collect()

[Row(404_error_quantity=251, host='hoohoo.ncsa.uiuc.edu'),
 Row(404_error_quantity=131, host='jbiagioni.npt.nuwc.navy.mil'),
 Row(404_error_quantity=110, host='piweba3y.prodigy.com'),
 Row(404_error_quantity=92, host='piweba1y.prodigy.com'),
 Row(404_error_quantity=64, host='phaelon.ksc.nasa.gov')]

In [120]:
spark.sql("SELECT COUNT('http_status') AS 404_error_quantity_by_day, timestamp from nasa_db where http_status='404' GROUP BY timestamp ORDER BY COUNT('http_status') DESC").collect()

[Row(404_error_quantity_by_day=5, timestamp='12/Jul/1995:10:21:30 -0400'),
 Row(404_error_quantity_by_day=5, timestamp='12/Jul/1995:10:35:12 -0400'),
 Row(404_error_quantity_by_day=5, timestamp='12/Jul/1995:10:20:43 -0400'),
 Row(404_error_quantity_by_day=5, timestamp='12/Jul/1995:10:35:11 -0400'),
 Row(404_error_quantity_by_day=5, timestamp='12/Jul/1995:10:35:09 -0400'),
 Row(404_error_quantity_by_day=5, timestamp='12/Jul/1995:10:24:50 -0400'),
 Row(404_error_quantity_by_day=5, timestamp='11/Jul/1995:14:08:06 -0400'),
 Row(404_error_quantity_by_day=4, timestamp='20/Jul/1995:07:21:17 -0400'),
 Row(404_error_quantity_by_day=4, timestamp='12/Jul/1995:10:35:03 -0400'),
 Row(404_error_quantity_by_day=4, timestamp='06/Jul/1995:10:31:11 -0400'),
 Row(404_error_quantity_by_day=4, timestamp='12/Jul/1995:10:21:32 -0400'),
 Row(404_error_quantity_by_day=4, timestamp='12/Jul/1995:10:35:01 -0400'),
 Row(404_error_quantity_by_day=3, timestamp='12/Jul/1995:10:23:35 -0400'),
 Row(404_error_quantity_b

In [121]:
df4.printSchema


<bound method DataFrame.printSchema of DataFrame[host: string, timestamp: string, request_type: string, http_status: string, returned_bytes: string]>

### Próximos passos: 
#### - Converter schema do Dataframe para otimizar resultados
### - Pesquisar sobre como melhorar a performance de processamento

### Tudo começa com o primeiro passo. A caminhada continua...