<a href="https://colab.research.google.com/github/cmlins/fast_track/blob/main/Engenharia%20de%20dados/desafio_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Imports e instalações de bibliotecas

In [1]:
# Instala bibliotecas necessárias

!pip install --upgrade plotly
!pip install pyspark

Requirement already up-to-date: plotly in /usr/local/lib/python3.7/dist-packages (4.14.3)


In [2]:
# import das bibliotecas usadas

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import length, regexp_extract, isnan, when, count, col, isnull, asc, desc, mean, sum, lit, format_string, concat, udf
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, LongType, FloatType
import re

import plotly.express as px

In [3]:
# Cria uma sessão spark
spark = SparkSession.builder.master('local[*]').getOrCreate()

# Configura para que a visualização das saídas seja similar ao Pandas
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

sc = spark.sparkContext

### Download

Fazer o download do [arquivo de log](https://github.com/elastic/examples/raw/master/Common%20Data%20Formats/apache_logs/apache_logs) do servidor Apache

In [4]:
!wget https://github.com/elastic/examples/raw/master/Common%20Data%20Formats/apache_logs/apache_logs

--2021-05-19 19:34:38--  https://github.com/elastic/examples/raw/master/Common%20Data%20Formats/apache_logs/apache_logs
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/elastic/examples/master/Common%20Data%20Formats/apache_logs/apache_logs [following]
--2021-05-19 19:34:38--  https://raw.githubusercontent.com/elastic/examples/master/Common%20Data%20Formats/apache_logs/apache_logs
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2370789 (2.3M) [text/plain]
Saving to: ‘apache_logs.9’


2021-05-19 19:34:38 (28.9 MB/s) - ‘apache_logs.9’ saved [2370789/2370789]



### Import

 Envia o log para o [sistema de arquivos distribuído](https://docs.databricks.com/data/databricks-file-system.html)
 

In [5]:
data = []
with open ("/content/apache_logs", 'r') as f_read:
  for line in f_read:
    data.append(line)

In [6]:
R = Row('log')

logs = spark.createDataFrame([R(x) for x in data])

logs.show(3, False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|log                                                                                                                                                                                                                                                                                                                                      |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|83.

In [7]:
# logs.toPandas().to_csv('logs.csv')

### Transform

Realiza o parser do arquivo de log disponível no DBFS e cria uma tabela chamada apache

In [8]:
# with open("/content/apache_logs") as f:
#   log = f.read()

#   regex_ip = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
#   ips = re.findall(regex_ip, log)

#   regex_time = r'\[(?P<date>.*?)(?= ) (?P<timezone>.*?)\]'
#   times = re.findall(regex_time, log)

#   regex_method = r'?P<request_method>.*?'
#   method = re.findall(regex_method, log)


#   print(method)

In [9]:
split_col = pyspark.sql.functions.split(logs['log'], ' ')
apache = logs.withColumn('ip_address', split_col.getItem(0))
# apache = apache.withColumn('date', split_col.getItem(1))
# apache = apache.withColumn('user_ID_of_the_client', split_col.getItem(2))
apache = apache.withColumn('timestamp', split_col.getItem(3))
# apache = apache.withColumn('Timestamp_of_the_log_entry_2', split_col.getItem(4))
apache = apache.withColumn('method', split_col.getItem(5))
apache = apache.withColumn('requested', split_col.getItem(6))
apache = apache.withColumn('HTTP_protocol', split_col.getItem(7))
apache = apache.withColumn('status_code', split_col.getItem(8))
apache = apache.withColumn('size', split_col.getItem(9))
apache = apache.withColumn('referer', split_col.getItem(10))

In [10]:
apache.limit(3)

log,ip_address,timestamp,method,requested,HTTP_protocol,status_code,size,referer
83.149.9.216 - - ...,83.149.9.216,[17/May/2015:10:0...,"""GET",/presentations/lo...,"HTTP/1.1""",200,203023,"""http://semicompl..."
83.149.9.216 - - ...,83.149.9.216,[17/May/2015:10:0...,"""GET",/presentations/lo...,"HTTP/1.1""",200,171717,"""http://semicompl..."
83.149.9.216 - - ...,83.149.9.216,[17/May/2015:10:0...,"""GET",/presentations/lo...,"HTTP/1.1""",200,26185,"""http://semicompl..."


In [11]:
apache = apache.select(['ip_address','timestamp', 'requested', 'referer']).where(apache.method == '"GET').where(apache.referer != '"-"')
# apache = apache.where(length(apache.col10) > 3)
# apache.toPandas().to_csv('mycsv.csv')
apache = apache.groupBy('referer').count().sort(desc('count'))

In [12]:
apache.show(5, False)

+-----------------------------------------------------------------+-----+
|referer                                                          |count|
+-----------------------------------------------------------------+-----+
|"http://semicomplete.com/presentations/logstash-puppetconf-2012/"|689  |
|"http://www.semicomplete.com/projects/xdotool/"                  |656  |
|"http://semicomplete.com/presentations/logstash-scale11x/"       |406  |
|"http://www.semicomplete.com/articles/dynamic-dns-with-dhcp/"    |335  |
|"http://www.semicomplete.com/"                                   |228  |
+-----------------------------------------------------------------+-----+
only showing top 5 rows



In [13]:
expression = r'.*\.html"$'
apache_ref = apache.filter(apache["referer"].rlike(expression)).sort(desc('count')).limit(10)

In [14]:
apache_ref.show(10, False)

+---------------------------------------------------------------------------------------+-----+
|referer                                                                                |count|
+---------------------------------------------------------------------------------------+-----+
|"http://www.semicomplete.com/blog/geekery/ssl-latency.html"                            |144  |
|"http://www.semicomplete.com/blog/geekery/debugging-java-performance.html"             |65   |
|"http://www.semicomplete.com/blog/geekery/xvfb-firefox.html"                           |49   |
|"http://semicomplete.com/blog/geekery/xvfb-firefox.html"                               |46   |
|"http://www.semicomplete.com/files/xdotool/docs/html/xdo_8h.html"                      |26   |
|"http://www.semicomplete.com/blog/geekery/disabling-battery-in-ubuntu-vms.html"        |21   |
|"http://www.semicomplete.com/blog/geekery/headless-wrapper-for-ephemeral-xservers.html"|20   |
|"http://www.semicomplete.com/blog/geeke

### View

In [15]:
fig = px.bar(apache_ref.toPandas(), x='referer', y='count',
             labels={'count': 'Número de acessos', 'referer': ''})

fig.update_layout(height=600, width=1200)
                  
fig.show()

### Referências

1. https://docs.databricks.com/data/databricks-file-system.html
2. https://movile.blog/introducao-a-spark-usando-o-google-colab/
3. https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns/51680292
4. https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/#:~:text=PySpark%20UDF%20is%20a%20User,the%20udf()%20is%20StringType.
