In [1]:
#importando as bibliotecas
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
import csv


#localização dos arquivos no Databricks
data_a = "/FileStore/tables/NASA_Jul95.csv"
data_b = "/FileStore/tables/NASA_Aug95.csv"

In [2]:
#Iniciando a sessão Spark usando SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark - Nasa") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
#Lendo CSV data_a data_b
df_data_a = spark.read.format("csv")\
            .option("delimiter", " ")\
            .option("multilines", "true")\
            .option("quotes", '""')\
            .option("header", "false")\
            .load(data_a)\
            .toDF("host", "a", "b", "dias", "time_zone", "request", "http", "bytes")

df_data_b = spark.read.format("csv")\
            .option("delimiter", " ")\
            .option("multilines", "true")\
            .option("quotes", '""')\
            .option("header", "false")\
            .load(data_b)\
            .toDF("host", "a", "b", "dias", "time_zone", "request", "http", "bytes")

In [4]:
#mostrando os dez primeiros resultados de NASA_Jul95
df_data_a.show(10,truncate = False)

In [5]:
#Vamos tirar algumas colunas desnecessárias em nosso data Frame utilizando spark.sql
df_data_a.createOrReplaceTempView("NASA_Jul95")
df_data_b.createOrReplaceTempView("NASA_Aug95")

df1 = spark.sql("Select host, dias, request, http, bytes from NASA_Jul95")
df2 = spark.sql("Select host, dias, request, http, bytes from NASA_Aug95")

#Vamos fazer um union dos dois datasets para respondermos as perguntas abaixo 
access_log = df1.union(df2)

In [6]:
#Vamos utlizar a função select com filtro em host e distinct

access_log.select("host").distinct().count()

In [7]:
access_log.select('host').distinct().count()




In [8]:
#Existem algumas maneiras de fazermos esse filtro pelo código de erro 404, primeiro vamos usar código SQL
access_log.createOrReplaceTempView("Erro404")

spark.sql("Select * from Erro404 where http='404'").count()

In [9]:
#Segundo vamos usar filter
access_log.filter(access_log['http'].contains('404')).count()

In [10]:
#Primeiro exemplo vamos usar SQL para mostrar as 5 URLs com mais erros
spark.sql("Select host, count(host) as qtde from Erro404 where http='404' group by host order by 2 desc limit 5").show(10, truncate=False)

In [11]:
#Segundo vamos utilizar funções
access_log.select('host').filter(access_log['http'].contains('404')).groupBy('host').count().orderBy('count',ascending=False).limit(5).show(truncate=False)

In [12]:
#Quantidade de erros por dia

def get_dia(timestamp):
    data = timestamp.split('/')
    return data[0][1:] + "/" + data[1] + "/" + data[2][:4]

erros = access_log.filter(access_log['http'].contains('404'))

udf_custom = F.udf(get_dia)
dias = erros.withColumn("Erros_dias", udf_custom(erros["dias"]))

dias.select("Erros_dias").groupBy("Erros_dias").count().orderBy("count", ascending=False)

dias.show()

In [13]:
spark.sql("Select substring(dias, 2,11) as dias from Erro404 where http='404'").show(10, truncate=False)


In [14]:
access_log.select(F.sum('bytes')).show()

In [15]:
access_log.createOrReplaceTempView("TotalBytes")

spark.sql("Select sum(bytes) as total_bytes from TotalBytes").show(truncate=False)