##  BIGDATA -  Ingestão de Dados

###  ETL RDD

### Claudio Gervasio de Lisboa  ABR/2019

In [1]:
import pyspark.sql.functions as f
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructField, StructType, StringType, DateType, IntegerType,LongType, DecimalType, DoubleType
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.functions import date_format, dayofmonth, hour, col, max as max_, split, regexp_extract

from datetime import datetime, date

In [2]:
# Iniciar spark session
_spark = SparkSession.builder \
                    .master("local") \
                    .appName("ETL") \
                    .getOrCreate()
_sqlContext = SQLContext(_spark)

In [3]:
# Files localization ON Windows
_diretorio = 'C:\\dump\\sm\\\etl_pyspark\\dados\\mock\\nasa'

# Diretório Docker teste !!!!
_diretorio = '/home/jovyan/work/etl_pyspark/dados/mock/nasa'

df_nasa_log  = _sqlContext.read.text(_diretorio)

### RDD Ingestão APACHE LOG

In [4]:
# Dataframes
df_nasa_view = df_nasa_log.select(regexp_extract('value', r'^([^\s]+\s)', 1).alias('host'),
                                           regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]',
                                                          1).alias('timestamp'),
                                           regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('url'),
                                           regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias(
                                               'status'),
                                           regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias(
                                               'bytes'))

In [5]:
# Tratamento dos dados
df_nasa_view.show(n=5,truncate=False)

+------------------+--------------------------+-----------------------------------------------+------+-----+
|host              |timestamp                 |url                                            |status|bytes|
+------------------+--------------------------+-----------------------------------------------+------+-----+
|in24.inetnebr.com |01/Aug/1995:00:00:01 -0400|/shuttle/missions/sts-68/news/sts-68-mcc-05.txt|200   |1839 |
|uplherc.upl.com   |01/Aug/1995:00:00:07 -0400|/                                              |304   |0    |
|uplherc.upl.com   |01/Aug/1995:00:00:08 -0400|/images/ksclogo-medium.gif                     |304   |0    |
|uplherc.upl.com   |01/Aug/1995:00:00:08 -0400|/images/MOSAIC-logosmall.gif                   |304   |0    |
|uplherc.upl.com   |01/Aug/1995:00:00:08 -0400|/images/USA-logosmall.gif                      |304   |0    |
+------------------+--------------------------+-----------------------------------------------+------+-----+
only showing top 5 

### RDD em CACHE para otimizar a leitura das informações em memória

In [6]:
# RDD Cache

df_nasa_view.cache()

DataFrame[host: string, timestamp: string, url: string, status: int, bytes: int]

## Data VIEW - 1 - Número de hosts únicos.

In [7]:
# Data VIEW - 1 - Número​ ​ de​ ​ hosts​ ​ únicos.
def view_d1():
    df = df_nasa_view.groupBy('host') \
        .agg(f.count('host').alias('total')) \
        .filter(col('total') == 1) \
        .count()

    print('Data VIEW - 1 - Número​ ​ de​ ​ hosts​ ​ únicos = {0}'.format(df))

In [8]:
view_d1()

Data VIEW - 1 - Número​ ​ de​ ​ hosts​ ​ únicos = 9269


###  Data VIEW - 2. O total de erros 404.

In [9]:
# Data VIEW - 2. O total de erros 404.
def view_d2():
    print('Data VIEW - 2. O total de erros 404.')
    df_nasa_view.groupBy('status') \
                .agg(f.count('status').alias('total')) \
                .filter(col('status') == '404') \
                .show(truncate=False)

In [10]:
view_d2()

Data VIEW - 2. O total de erros 404.
+------+-----+
|status|total|
+------+-----+
|404   |20901|
+------+-----+



### Data View - 3. Os 5 URLs que mais causaram erro 404.

In [11]:
# Data View - 3. Os 5 URLs que mais causaram erro 404.
def view_d3():
    print('Data View - 3. Os 5 URLs que mais causaram erro 404.')
    df_nasa_view.groupBy('url','status') \
                .agg(f.count('url').alias('total')) \
                .where( col('status') == '404') \
                .sort(col('total').desc()) \
                .show(n=5, truncate=False)

In [12]:
view_d3()

Data View - 3. Os 5 URLs que mais causaram erro 404.
+--------------------------------------------+------+-----+
|url                                         |status|total|
+--------------------------------------------+------+-----+
|/pub/winvn/readme.txt                       |404   |2004 |
|/pub/winvn/release.txt                      |404   |1732 |
|/shuttle/missions/STS-69/mission-STS-69.html|404   |682  |
|/shuttle/missions/sts-68/ksc-upclose.gif    |404   |426  |
|/history/apollo/a-001/a-001-patch-small.gif |404   |384  |
+--------------------------------------------+------+-----+
only showing top 5 rows



### Data View - 4. Quantidade de erros 404 por dia.

In [13]:
# Data View - 4. Quantidade de erros 404 por dia.
def view_d4():
    print('Data View - 4. Quantidade de erros 404 por dia.')
    df_nasa_view.withColumn('dia', f.col('timestamp').substr(1,2)) \
                .groupBy('status', col('dia')) \
                .agg(f.count('status').alias('total')) \
                .where( col('status') == '404') \
                .distinct() \
                .sort(col('dia').asc(), col('total').desc()) \
                .show(truncate=False)

In [14]:
view_d4()

Data View - 4. Quantidade de erros 404 por dia.
+------+---+-----+
|status|dia|total|
+------+---+-----+
|404   |01 |559  |
|404   |02 |291  |
|404   |03 |778  |
|404   |04 |705  |
|404   |05 |733  |
|404   |06 |1013 |
|404   |07 |1107 |
|404   |08 |693  |
|404   |09 |627  |
|404   |10 |713  |
|404   |11 |734  |
|404   |12 |667  |
|404   |13 |748  |
|404   |14 |700  |
|404   |15 |581  |
|404   |16 |516  |
|404   |17 |677  |
|404   |18 |721  |
|404   |19 |848  |
|404   |20 |740  |
+------+---+-----+
only showing top 20 rows



### Data View - 5. O total de bytes retornados.

In [16]:
# Data View - 5. O total de bytes retornados.
def view_d5():
    print('Data View - 5. O​ ​total de​ ​bytes​ retornados.')
    df_nasa_view.agg(f.sum('bytes').alias('total_bytes')) \
                .sort(col('total_bytes').desc()) \
                .show(n=5, truncate=False)

In [17]:
view_d5()

Data View - 5. O​ ​total de​ ​bytes​ retornados.
+-----------+
|total_bytes|
+-----------+
|65524314915|
+-----------+

