In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
spark = SparkSession.builder\
                    .appName('Processo')\
                    .getOrCreate()

# Preparando dados no HDFS

In [3]:
!ls

NASA_access_log_Aug95  NASA_access_log_Jul95  Semantix.ipynb


In [4]:
!hdfs dfs -ls /

Found 11 items
drwxrwxrwx   - yarn   hadoop          0 2018-06-18 15:18 /app-logs
drwxr-xr-x   - hdfs   hdfs            0 2018-06-18 16:13 /apps
drwxr-xr-x   - yarn   hadoop          0 2018-06-18 14:52 /ats
drwxr-xr-x   - hdfs   hdfs            0 2018-06-18 14:52 /hdp
drwx------   - livy   hdfs            0 2018-06-18 15:11 /livy2-recovery
drwxr-xr-x   - mapred hdfs            0 2018-06-18 14:52 /mapred
drwxrwxrwx   - mapred hadoop          0 2018-06-18 14:52 /mr-history
drwxr-xr-x   - hdfs   hdfs            0 2018-06-18 15:59 /ranger
drwxrwxrwx   - spark  hadoop          0 2019-08-17 20:45 /spark2-history
drwxrwxrwx   - hdfs   hdfs            0 2018-06-18 16:06 /tmp
drwxr-xr-x   - hdfs   hdfs            0 2018-06-18 16:08 /user


In [5]:
!hdfs dfs -mkdir /semantix
!hdfs dfs -ls /

Found 12 items
drwxrwxrwx   - yarn   hadoop          0 2018-06-18 15:18 /app-logs
drwxr-xr-x   - hdfs   hdfs            0 2018-06-18 16:13 /apps
drwxr-xr-x   - yarn   hadoop          0 2018-06-18 14:52 /ats
drwxr-xr-x   - hdfs   hdfs            0 2018-06-18 14:52 /hdp
drwx------   - livy   hdfs            0 2018-06-18 15:11 /livy2-recovery
drwxr-xr-x   - mapred hdfs            0 2018-06-18 14:52 /mapred
drwxrwxrwx   - mapred hadoop          0 2018-06-18 14:52 /mr-history
drwxr-xr-x   - hdfs   hdfs            0 2018-06-18 15:59 /ranger
drwxr-xr-x   - root   hdfs            0 2019-08-17 20:45 /semantix
drwxrwxrwx   - spark  hadoop          0 2019-08-17 20:45 /spark2-history
drwxrwxrwx   - hdfs   hdfs            0 2018-06-18 16:06 /tmp
drwxr-xr-x   - hdfs   hdfs            0 2018-06-18 16:08 /user


In [6]:
!ls -lha

total 356M
drwxr-xr-x 3 root root 4.0K Aug 17 20:43 .
dr-xr-x--- 1 root root 4.0K Aug 17 19:06 ..
drwxr-xr-x 2 root root 4.0K Aug 17 19:07 .ipynb_checkpoints
-rw-r--r-- 1 root root 161M Aug 17 19:20 NASA_access_log_Aug95
-rw-r--r-- 1 root root 196M Aug 17 19:08 NASA_access_log_Jul95
-rw-r--r-- 1 root root  21K Aug 17 20:43 Semantix.ipynb


In [7]:
!hdfs dfs -put NASA* /semantix
!hdfs dfs -ls /semantix

Found 2 items
-rw-r--r--   1 root hdfs  167813770 2019-08-17 20:45 /semantix/NASA_access_log_Aug95
-rw-r--r--   1 root hdfs  205242368 2019-08-17 20:45 /semantix/NASA_access_log_Jul95


# Carregando dados no spark

In [8]:
df1 = spark.read.text("/semantix/NASA_access_log_Aug95")
df1.show(10)

+--------------------+
|               value|
+--------------------+
|in24.inetnebr.com...|
|uplherc.upl.com -...|
|uplherc.upl.com -...|
|uplherc.upl.com -...|
|uplherc.upl.com -...|
|ix-esc-ca2-07.ix....|
|uplherc.upl.com -...|
|slppp6.intermind....|
|piweba4y.prodigy....|
|slppp6.intermind....|
+--------------------+
only showing top 10 rows



In [9]:
df2 = spark.read.text("/semantix/NASA_access_log_Jul95")
df2.show(10)

+--------------------+
|               value|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
|burger.letters.co...|
|burger.letters.co...|
|205.212.115.106 -...|
|d104.aa.net - - [...|
|129.94.144.152 - ...|
+--------------------+
only showing top 10 rows



## Verificando carregamento

In [10]:
df1.count()

1569898

In [11]:
df2.count()

1891715

## Juntando os arquivos

In [12]:
df = df1.union(df2)
df.count()

3461613

# Tratamento dos dados e organização do dataframe

In [14]:
df_table = df.select( 
            F.regexp_extract('value', r'^([^\s]+\s)', 1).alias('HOST'),
            F.regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('TIMESTAMP'),
            F.regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('REQUISITION'),
            F.regexp_extract('value', r'^.*"\s+([^\s]+)', 1).alias('STATUS'),
            F.regexp_extract('value', r'^.*\s+(\d+)$', 1).alias('SIZE')
            )

df_table.show()

+--------------------+--------------------+--------------------+------+-----+
|                HOST|           TIMESTAMP|         REQUISITION|STATUS| SIZE|
+--------------------+--------------------+--------------------+------+-----+
|  in24.inetnebr.com |01/Aug/1995:00:00...|/shuttle/missions...|   200| 1839|
|    uplherc.upl.com |01/Aug/1995:00:00...|                   /|   304|    0|
|    uplherc.upl.com |01/Aug/1995:00:00...|/images/ksclogo-m...|   304|    0|
|    uplherc.upl.com |01/Aug/1995:00:00...|/images/MOSAIC-lo...|   304|    0|
|    uplherc.upl.com |01/Aug/1995:00:00...|/images/USA-logos...|   304|    0|
|ix-esc-ca2-07.ix....|01/Aug/1995:00:00...|/images/launch-lo...|   200| 1713|
|    uplherc.upl.com |01/Aug/1995:00:00...|/images/WORLD-log...|   304|    0|
|slppp6.intermind....|01/Aug/1995:00:00...|/history/skylab/s...|   200| 1687|
|piweba4y.prodigy....|01/Aug/1995:00:00...|/images/launchmed...|   200|11853|
|slppp6.intermind....|01/Aug/1995:00:00...|/history/skylab/s...|

In [15]:
df_table.count()

3461613

## Tratamento de data e tipagem

In [16]:
df_table = df_table\
.where(F.trim(df_table.TIMESTAMP) != "")\
.select(df_table.HOST, 
        F.when(df_table.TIMESTAMP.like("%Aug%"),\
                F.from_unixtime(F.unix_timestamp(\
                                   F.regexp_replace(df_table.TIMESTAMP, "Aug", "08"), 'dd/MM/yyyy'))
                                                 )\
         .otherwise(F.from_unixtime(F.unix_timestamp(
                                        F.regexp_replace(df_table.TIMESTAMP, "Jul", "07"), 'dd/MM/yyyy')))\
         .cast("date")\
         .alias("DATE"),
            df_table.REQUISITION, df_table.STATUS.cast('integer'), df_table.SIZE.cast('integer')
                        )

df_table.show()

+--------------------+----------+--------------------+------+-----+
|                HOST|      DATE|         REQUISITION|STATUS| SIZE|
+--------------------+----------+--------------------+------+-----+
|  in24.inetnebr.com |1995-08-01|/shuttle/missions...|   200| 1839|
|    uplherc.upl.com |1995-08-01|                   /|   304|    0|
|    uplherc.upl.com |1995-08-01|/images/ksclogo-m...|   304|    0|
|    uplherc.upl.com |1995-08-01|/images/MOSAIC-lo...|   304|    0|
|    uplherc.upl.com |1995-08-01|/images/USA-logos...|   304|    0|
|ix-esc-ca2-07.ix....|1995-08-01|/images/launch-lo...|   200| 1713|
|    uplherc.upl.com |1995-08-01|/images/WORLD-log...|   304|    0|
|slppp6.intermind....|1995-08-01|/history/skylab/s...|   200| 1687|
|piweba4y.prodigy....|1995-08-01|/images/launchmed...|   200|11853|
|slppp6.intermind....|1995-08-01|/history/skylab/s...|   200| 9202|
|slppp6.intermind....|1995-08-01|/images/ksclogosm...|   200| 3635|
|ix-esc-ca2-07.ix....|1995-08-01|/history/apollo

## Questões

### 1. Número de hosts únicos.

In [18]:
df_table.select(df_table.HOST).distinct().count()

137978

### 2. O total de erros 404.

In [19]:
df_table.where(df_table.STATUS == 404).count()

20901

### 3. Os 5 URLs que mais causaram erro 404.

In [20]:
df_table.select(df_table.HOST,df_table.STATUS)\
          .where(df_table.STATUS == 404)\
          .groupBy(df_table.HOST)\
          .count()\
          .withColumnRenamed("count", "COUNT_404")\
          .orderBy("COUNT_404", ascending = False)\
          .select("HOST", "COUNT_404")\
          .show(5)

+--------------------+---------+
|                HOST|COUNT_404|
+--------------------+---------+
|hoohoo.ncsa.uiuc....|      251|
|piweba3y.prodigy....|      157|
|jbiagioni.npt.nuw...|      132|
|piweba1y.prodigy....|      114|
|www-d4.proxy.aol....|       91|
+--------------------+---------+
only showing top 5 rows



### 4. Quantidade de erros 404 por dia.

In [21]:
df_table.select(df_table.DATE, df_table.STATUS)\
          .where(df_table.STATUS == 404)\
          .groupBy(df_table.DATE)\
          .count()\
          .withColumnRenamed("count", "COUNT_404")\
          .orderBy("DATE")\
          .show(30)

+----------+---------+
|      DATE|COUNT_404|
+----------+---------+
|1995-07-01|      316|
|1995-07-02|      291|
|1995-07-03|      474|
|1995-07-04|      359|
|1995-07-05|      497|
|1995-07-06|      640|
|1995-07-07|      570|
|1995-07-08|      302|
|1995-07-09|      348|
|1995-07-10|      398|
|1995-07-11|      471|
|1995-07-12|      471|
|1995-07-13|      532|
|1995-07-14|      413|
|1995-07-15|      254|
|1995-07-16|      257|
|1995-07-17|      406|
|1995-07-18|      465|
|1995-07-19|      639|
|1995-07-20|      428|
|1995-07-21|      334|
|1995-07-22|      192|
|1995-07-23|      233|
|1995-07-24|      328|
|1995-07-25|      461|
|1995-07-26|      336|
|1995-07-27|      336|
|1995-07-28|       94|
|1995-08-01|      243|
|1995-08-03|      304|
+----------+---------+
only showing top 30 rows



### 5. O total de bytes retornados.

In [22]:
df_table.select(F.sum(df_table.SIZE).alias("TOTAL")).show()

+-----------+
|      TOTAL|
+-----------+
|65524314915|
+-----------+



In [23]:
gb=1073741824

In [24]:
size_table = df_table.select(F.sum(df_table.SIZE).alias("TOTAL"))

In [25]:
size_table.select("TOTAL").withColumn('TOTAL_GB', (size_table.TOTAL/gb).cast('integer')).show()

+-----------+--------+
|      TOTAL|TOTAL_GB|
+-----------+--------+
|65524314915|      61|
+-----------+--------+

