**Verificando se os arquivos estão nas pastas indicadas**

In [2]:
%fs ls /FileStore/tables

path,name,size
dbfs:/FileStore/tables/Mall_Customers.csv,Mall_Customers.csv,4286
dbfs:/FileStore/tables/d1995_07_01_24d0c.json,d1995_07_01_24d0c.json,7451741
dbfs:/FileStore/tables/d1995_07_02_c3f44.json,d1995_07_02_c3f44.json,9572086
dbfs:/FileStore/tables/d1995_07_03_f99af.json,d1995_07_03_f99af.json,14125303
dbfs:/FileStore/tables/d1995_07_04_c5a7f.json,d1995_07_04_c5a7f.json,12030333
dbfs:/FileStore/tables/d1995_07_05_0f261.json,d1995_07_05_0f261.json,14662194
dbfs:/FileStore/tables/d1995_07_06_20619.json,d1995_07_06_20619.json,15557682
dbfs:/FileStore/tables/d1995_07_07_2dd8d.json,d1995_07_07_2dd8d.json,15279295
dbfs:/FileStore/tables/d1995_07_08_83302.json,d1995_07_08_83302.json,7033852
dbfs:/FileStore/tables/d1995_07_09_f75d3.json,d1995_07_09_f75d3.json,5589076


** Criando o diretório de saída**

In [4]:
#define o caminho para a saida
output_path = "/tmp/pydata/Streaming/output/"
dbutils.fs.rm(output_path,True)
dbutils.fs.mkdirs(output_path)

#define o caminho para o checkpoint, necessário para utilizar algumas funçoes e garantir tolerância a falhas
checkpoint_path = "/tmp/pydata/Streaming/checkpoint/"
dbutils.fs.rm(checkpoint_path,True)
dbutils.fs.mkdirs(checkpoint_path)

**Definindo o esquema para o dataset**

In [6]:
from pyspark.sql.types import StructType, IntegerType, StringType, LongType, StructField, TimestampType
schema_entrada =  StructType([
    StructField("bytes", LongType()),
    StructField("host", StringType()),
    StructField("http_reply", IntegerType()),
    StructField("request", StringType()),
    StructField("timestamp", StringType())
])

Criando o dataframe estático

In [8]:
logsDirectoryStatic = "/FileStore/tables/d1995_07_01_24d0c.json" #define o local onde os arquivos estão armazenados
static = spark.read.json(logsDirectoryStatic, schema_entrada)

Verificando se o dataframe é estático ou streaming

In [10]:
static.isStreaming

In [11]:
#print do schema
static.printSchema()

**Conhecendo o banco de dados**

In [13]:
#apresenta as primeiras 5 linhas do dataframe
static.show(5)

In [14]:
#contando a quantidade de linhas existente no dataframe
(static.count())

In [15]:
#retorna o valor médio de bytes gerados pelas consultas
from pyspark.sql.functions import avg
static.select(avg("bytes")).show()

In [16]:
#seleciona a quantidade de valores diferentes existentes na coluna host
from pyspark.sql.functions import asc, col, desc

grupo_host_distintos = static.select("host").distinct().sort(col("host").asc())
grupo_host_distintos.show()

In [17]:
#cria a tabela para utilizar a consulta via SQL
static.createOrReplaceTempView("grupo_1")

In [18]:
%sql
SELECT DISTINCT host
  FROM grupo_1
ORDER BY host

host
128.102.86.254
128.138.177.51
128.147.44.103
128.148.15.20
128.158.21.103
128.158.54.114
128.159.122.119
128.159.132.53
128.159.154.142
128.163.80.98


**Criando o modelo dinâmico (Streaming)**

In [20]:
#importando bibliotecas
from pyspark.sql.functions import input_file_name, current_timestamp

In [21]:
#define o caminho para a entrada dos arquivos de log
logsDirectoryStreaming= "/FileStore/tables/*.json" #define o local onde os arquivos estão armazenados

In [22]:
#Definindo o modelo de Streaming
streamingDF = (
  spark
  .readStream 
  .schema(schema_entrada) #esquema definido para a coleta dos dados presentes em JSON
  .option("maxFilesPerTrigger", 1)  #mantém a leitura de apenas um arquivo por batch, para manter mais lenta a coleta
  .json(logsDirectoryStreaming) #define o local a ser pesquisado para obter os dados
  .withColumn("INPUT_FILE_NAME", input_file_name()) #cria a coluna para armazenar o nome do arquivo o qual o dado foi lido
  .withColumn("PROCESSED_TIME", current_timestamp()) #adiciona o tempo em que o dado foi processado
  .withWatermark("PROCESSED_TIME", "1 minute") #adiciona a janela de tempo para a leitura (marca d'água)
)

In [23]:
# define a saída (sink)
query = (
  streamingDF
  .writeStream
  .format("parquet") #define o formato do arquivo a ser utilizado (parquet)
  .option("path", output_path) #define o local onde os arquivos devem ser adicionados
  .option("checkpointLocation", checkpoint_path) # define o checkpoint para garantir a tolerãncia a falhas
  .outputMode("append") # define o modo de saída para os dados
  .queryName("logs") #define o nome para a consulta
  .trigger(processingTime='5 seconds') #define o "tempo de processamento" para cada dado recebido
  .start() #inicia o processamento em streaming
)

**Visualizando as Saídas em Tempo Real**

In [25]:
#cria a tabela para realizar as consultas sobre os dados que estãos sendo lidos
streamingDF.createOrReplaceTempView("logs_table_in")

In [26]:
%sql select COUNT(*) from logs_table_in where http_reply = 200 

count(1)
1701451


**Realizando a leitura dos dados na pasta de saída**

In [28]:
#define o esquema para a leitura dos dados que estão na pasta de saída
#esquema modificado para que os dados possam ser adicionados ao formato de saída
schemaSaida = (
  StructType()
  .add("timestamp", TimestampType()) #event time 
  .add("bytes", LongType())
  .add("host", StringType())
  .add("http_reply", IntegerType())
  .add("request", StringType())
  .add("INPUT_FILE_NAME", StringType()) #Nome do arquivo em que o dado do sensor foi criado 
  .add("PROCESSED_TIME", TimestampType()) #timestamp para o processamento dos dados
)

In [29]:
spark.conf.set("spark.sql.shuffle.partitions", "1") #define valor para os shuffles

#define a configuração para a leitura
saidasLogs = (
  spark.readStream
  .schema(schemaSaida) #lê os dados através do esquema definido para a transformação
  .format("parquet") #define o formato dos aquivos de leitura
  .option("maxFilesPerTrigger", 1) #Mantém a leitura dos dados como 1 arquivo para a demonstração ser mais lenta
  .load(output_path) # indica de onde o arquivo será lido 
  .withWatermark("PROCESSED_TIME", "1 minute") #define a janela de tempo para a leitura dos dados
)

In [30]:
# define a tabela temporária para que seja possível realizar as consultas sobre os dados utilizando o SQL 
saidasLogs.createOrReplaceTempView("logs_table_out")

In [31]:
%sql select COUNT(*) from logs_table_out

count(1)
1126329


Valor médio para cada host

In [33]:
%sql
SELECT host, AVG(bytes) as media_bytes FROM logs_table_out GROUP BY host

host,media_bytes
204.138.186.16,27859.46153846154
gillside.bfsec.bt.co.uk,11574.5
162.127.2.238,27071.666666666668
137.98.196.2,6662.473684210527
165.113.8.64,46885.75
msp7-9.nas.mr.net,14373.8
ad03-010.compuserve.com,188355.0
startide.odyssey.com.au,9911.47619047619
161.142.162.11,2497.5
b17.ppp.mo.net,5610.25


Linhas com o campo "request" diferentes.

In [35]:
%sql 
SELECT DISTINCT request FROM logs_table_out GROUP BY request 

request
GET /shuttle/technology/sts-newsref/overview.txt HTTP/1.0
GET /shuttle/missions/sts-71/images/KSC-95EC-0916.jpg
GET /htbin/wais.pl?LIDAR HTTP/1.0
"GET /cgi-bin/imagemap/countdown?107,111 HTTP/1.0"
"GET /cgi-bin/imagemap/fr?43,43?19,457 HTTP/1.0"
"GET /cgi-bin/imagemap/countdown?397,152 HTTP/1.0"
GET /history/gemini/gemini-viii/gemini-viii-patch-small.gif HTTP/1.0
"GET /cgi-bin/imagemap/countdown?226,199 HTTP/1.0"
GET /htbin/wais.pl?Jack+Swigert HTTP/1.0
GET /statistics/1993/Dec/Dec93_request.gif HTTP/1.0
