In [1]:
#Realizar upload dos arquivos e configuração da plataforma databricks antes de iniciar o processo

In [2]:
spark

In [3]:
sqlContext

In [4]:
#Importação pacotes necessários

from pyspark.context import SparkContext
from pyspark.sql.functions import regexp_extract
import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as spark_sum


In [5]:
#Adicionando os arquivos e carregando para o banco df_0

file1 = "/FileStore/tables/NASA_access_log_Aug95.gz"
file2 = "/FileStore/tables/NASA_access_log_Jul95.gz"
file =[file1,file2]
df_0 = spark.read.text(file)

In [6]:
# Olhando o banco de dados df_0

df_0.printSchema()

In [7]:
#Observação: cada linha do arquivo foi carregada como uma coluna única de nome value
#Verificando o tipo de df_0

type(df_0)

In [8]:
#Olhando as 20 primeiras linhas do dataframe

df_0.show(20, truncate=False)


In [9]:
# Verificando o tamanho de df_0
(df_0.count(), len(df_0.columns))

In [10]:
#Configurando as expressões regulares para extração de dados:

re_host = r'(^\S+\.[\S+\.]+\S+)\s' #Host
re_timestamp = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]' # Timestamp
re_url = r'\"(\S+)\s(\S+)\s*(\S*)\"' #url -Requisição 
re_codigo = r'\s(\d{3})\s' # codigo http
re_bytes = r'\s(\d+)$' # bytes

In [11]:
#Usando as expressões regulares para se obter um novo dataframe

df_1 = df_0.select(regexp_extract('value', re_host, 1).alias('host'),
                         regexp_extract('value', re_timestamp, 1).alias('timestamp'),
                         regexp_extract('value', re_url, 2).alias('url'),
                         regexp_extract('value', re_codigo, 1).cast('integer').alias('codigo'),
                         regexp_extract('value', re_bytes, 1).cast('integer').alias('bytes'))

#Visualizando o dataframe

df_1.show(20, truncate=True)
print((df_1.count(), len(df_1.columns)))

In [12]:
#Verificando o número de linhas com null no dataframe original

df_0.filter(df_0['value'].isNull()).count()


In [13]:
# Verificando o número de linhas com alguma coluna nula

df_1_null = df_1.filter(df_1['host'].isNull()| 
                             df_1['timestamp'].isNull()| 
                             df_1['url'].isNull()|
                             df_1['codigo'].isNull()|
                             df_1['bytes'].isNull())
                            
df_1_null.count()

In [14]:
#Verificando em quais colunas estão os nulos

def count_null(colunas):
    return spark_sum(col(colunas).isNull().cast('integer')).alias(colunas)

tabela = [count_null(colunas) for colunas in df_1.columns]

df_1.agg(*tabela).show()

In [15]:
# Encontrando a linha onde o codigo é nulo

df_0.filter(~df_0['value'].rlike(r'\s(\d{3})\s')).show()


In [16]:
# Essa linha será excluída pois está incompleta, usando um editor de texto como o notepad++ é possível observar que esta é a última linha do arquivo de julho
# Como só existe uma única linha com código nulo vamos excluir essa linha

df_2 = df_1[df_1['codigo'].isNotNull()]

In [17]:
#Verificando o tamanho de df_2
df_2.count()

In [18]:
#Os valores nulo se encontram agora na última coluna, como se trata de bytes vou substituir esses nulos por 0
df_3 = df_2.na.fill({'bytes': 0})

In [19]:
#Verificando que não existem mais colunas nulas

tabela = [count_null(colunas) for colunas in df_3.columns]
df_3.agg(*tabela).show()


In [20]:
df_3.cache()

In [21]:
# salvando o arquivo

df_4 = df_3.select("*").toPandas()
df_4.to_csv("/dbfs/FileStore/dados.csv", sep=',',header = True)