# Preparação dos dados

Limpeza, conversões e agregações utilizada para preparar o conjuto de dados para análises posteriores.

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

from datetime import datetime

try:
    sc = SparkContext('local[*]')
    sc.setLogLevel("OFF")
    
    spark = SparkSession(sc)
except ValueError:
    # Prevent the notebook to crash if this block is executed more then once
    pass

import re
import glob
import functools
import shutil

In [2]:
def save_dataset(df, name):
    try:
        shutil.rmtree("./data/ds/%s" % name)
    except FileNotFoundError:
        pass
    df.write.parquet("./data/ds/%s" % name)

## Arquivo OUI

Processa o arquivo *IEEE Organizationally unique identifier (OUI)* que identifica os fabricantes de placas de redes baseado no prefixo do endereço MAC deste equipamentos.

Exporta o conjunto de dados para o arquivo: `data/ds/oui.parquet`

In [3]:
# From: https://www.gsmarena.com/makers.php3
# .main-makers table
# ((<tr>)?<td><a (.+?)>|</a></td>(</tr>)?)
# (<br><span>|</span>)
makers = spark.read.csv("./data/makers.csv", header=True, inferSchema=True)

makers.orderBy(desc("devices")).toPandas().head(10)

Unnamed: 0,maker,devices
0,Samsung,1133
1,LG,602
2,Nokia,459
3,Motorola,454
4,alcatel,376
5,Micromax,276
6,Huawei,269
7,BLU,262
8,HTC,255
9,ZTE,240


In [4]:
oui_parser = re.compile(r"(?P<prefix>([0-9A-F]{2}-?){3})\s{1,}\(hex\)\t{2}(?P<organization>.*)")

def tupled(txt):
    m = oui_parser.search(txt)
    p, o = (m.group("prefix").replace("-", ":"), m.group("organization"))
    return (p, o)
    
oui = (
    sc.textFile("./data/oui.txt")
    .filter(lambda x: oui_parser.search(x))
    .map(tupled)
    .toDF().toDF("prefix", "organization")
)

(
    oui
    .groupBy("organization")
    .count()
    .orderBy(desc("count"))
).toPandas().head(20)

Unnamed: 0,organization,count
0,"Cisco Systems, Inc",802
1,"Apple, Inc.",631
2,"Samsung Electronics Co.,Ltd",474
3,"HUAWEI TECHNOLOGIES CO.,LTD",426
4,"ARRIS Group, Inc.",276
5,Intel Corporate,226
6,Texas Instruments,173
7,zte corporation,147
8,Hewlett Packard,140
9,"Hon Hai Precision Ind. Co.,Ltd.",129


In [5]:
oui.count()

24627

In [6]:
import sys

stop_words = ["", "inc", "co", "ltd", "coltd", "llc", "oo", "oy", "cisco", "shenzhen", "electromechanics"]

msx = makers.withColumn("soundex", soundex("maker")).drop("devices")

osx = (    
    oui
    .withColumn("bits", split("organization", " "))
    .select(col("prefix"), col("organization"), explode("bits").alias("bit"))
    .withColumn("safeBit", lower(regexp_replace("bit", "[^A-Za-z0-9]", "")))
    .filter(~col("safeBit").isin(*stop_words))
    .withColumn("soundex", soundex("safeBit"))
    .drop("bit")
)

makers_oui = (
    osx
    .join(msx, msx.soundex == osx.soundex)
    .withColumn("distance", levenshtein("safeBit", lower(col("maker"))))
    .filter(col("distance") == 0)
    .drop("soundex", "safeBit", "distance")
    .distinct()
    .orderBy("organization")
)

save_dataset(makers_oui, "oui.parquet")

makers_oui.groupBy("maker").count().orderBy(desc("count")).sample(True, 0.123).toPandas().head()

Unnamed: 0,maker,count
0,Philips,19
1,HP,9
2,BLU,7
3,Casio,4
4,Sonim,1


## Pesquisa visual


Converte os dados da pesquisa visual para os tipos corretos.

Exporta o conjunto de dados para o arquivo: `data/ds/survey.parquet`

In [7]:
survey = (
    spark.read.csv("./data/survey.csv", header=True)
    .withColumn("date", to_date("rideDate"))
    .withColumn("timestamp", to_timestamp(col("collectedAt")))
    .withColumnRenamed("occupation", "occupationString")
    .withColumn("occupation", col("occupationString").cast("int"))
    .drop("rideDate", "rideId", "collectedAt", "occupationString")
)

save_dataset(survey, "survey.parquet")

survey_report = (
    survey
    .groupBy("date")
    .agg(
        date_format(min("timestamp"), "HH:mm:ss").alias("start"), 
        date_format(max("timestamp"), "HH:mm:ss").alias("end"), 
        count("occupation").alias("count")
    )
    .withColumn("day", date_format("date", "EEEE"))
    .select("date", "day", "start", "end", "count")
    .orderBy("date")
)

survey_report.toPandas()

Unnamed: 0,date,day,start,end,count
0,2017-11-29,Wednesday,07:56:27,09:45:36,38
1,2017-12-06,Wednesday,19:03:06,21:23:48,46
2,2017-12-07,Thursday,07:56:47,09:36:31,43
3,2017-12-08,Friday,19:24:08,22:21:25,46
4,2017-12-11,Monday,08:26:26,10:18:28,36
5,2017-12-12,Tuesday,19:13:35,21:38:29,36
6,2017-12-13,Wednesday,12:57:52,14:56:19,33


In [11]:
noop_filter = re.compile(r"[ -~]+")

adjust = udf(lambda x: x if x < 20 else 20, LongType()) 

survey_info = survey_report.select("date", "start", "end").withColumnRenamed("date", "dateS").cache()

def fixTimestamp(df):
    datePart = Window.partitionBy("date")
    nextRow = datePart.orderBy("timestamp").rowsBetween(Window.currentRow, 1)
    acumRow = datePart.orderBy(asc("timestamp")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

    return (
        df  
        # 1) Calculate the amount of seconds elapsed since the application started
        .withColumn("next", last("timestamp").over(nextRow))
        .withColumn("secs", adjust(col("next").cast("long") - col("timestamp").cast("long")))
        .withColumn("secsElapsed", sum("secs").over(acumRow))
        .withColumn("maxSecs", max("secsElapsed").over(datePart))
        .withColumn("secsElapsedDesc", col("maxSecs") - col("secsElapsed"))
        .drop("next", "secs", "maxSecs")
        # 2) Get the last known hour of the survey by each subset
        .join(survey_info, col("date") == col("dateS"))
        # 3) Recalculate the timestamp based of the last know position and the elapsed seconds 
        .withColumn("endSecs", to_timestamp(concat_ws(" ", "date", "end"), "yyyy-MM-dd HH:mm:ss").cast("long"))
        .withColumn("timestampA", (col("endSecs") - col("secsElapsedDesc")).cast("timestamp"))
        .drop("dateS", "start", "endSecs", "secsElapsed", "secsElapsedDesc", "timestamp")
        .withColumnRenamed("timestampA", "timestamp")
    )

def parseAll(pattern, cols, filter_regex=noop_filter):
    def parseFile(path):
        return (
            sc
            .textFile(path)
            .filter(lambda x: filter_regex.search(x))
            .map(lambda x: x.split('\t'))
            .filter(lambda x: len(x) == len(cols))
            .toDF().toDF(*cols)
            .withColumn("date", lit(path[-14:-4]))
        )

    paths = glob.glob(pattern)
    logs  = map(lambda x: parseFile(x), paths)
    df    = (
        functools.reduce(lambda x, y: x.union(y), logs)
        .withColumnRenamed("timestamp", "timestampStr")
        .withColumn("timestamp", to_timestamp("timestampStr", "yyyy-MM-dd HH:mm:ss,SSS"))
        .drop("level", "timestampStr")
        .orderBy("timestamp")
        .filter(col("timestamp").isNotNull())
    )
    return fixTimestamp(df)


## Dados do WiFi

Filtra, concatena e ajusta a hora dos arquivos de log obtidos pelo WiFi.

Exporta o conjunto de dados para o arquivo: `data/ds/wifi-log.parquet`

In [12]:
mac_address_filter = re.compile(r"([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})")
columns = ['timestamp', 'level', 'mac', 'ssid', 'rssi']

wifi_df = parseAll("./data/wifi-apc.*.log", columns, mac_address_filter)

save_dataset(wifi_df, "wifi-log.parquet")

# Summary
(
    wifi_df
    .groupBy("date")
    .agg(

        date_format(min("timestamp"), "HH:mm:ss").alias("start"), 
        date_format(max("timestamp"), "HH:mm:ss").alias("end"), 
        count("mac").alias("count")
    )
    .withColumn("day", date_format("date", "EEEE"))
    .select("date", "day", "start", "end", "count")
    .orderBy("date")
).toPandas()

# 2 	2017-12-07 	Thursday 	07:56:47 	09:36:31 	43
# 3 	2017-12-08 	Friday 	19:24:08 	22:21:25 	46

Unnamed: 0,date,day,start,end,count
0,2017-11-29,Wednesday,05:47:30,09:45:36,144815
1,2017-12-06,Wednesday,18:51:49,21:23:48,158350
2,2017-12-07,Thursday,07:52:29,09:36:31,86472
3,2017-12-08,Friday,19:59:24,22:21:25,194200
4,2017-12-11,Monday,08:18:34,10:18:28,95707
5,2017-12-12,Tuesday,19:03:27,21:38:29,139417
6,2017-12-13,Wednesday,12:47:44,14:56:19,88207


## Dados do GPS

Filtra e concatena e ajusta a hora dos arquivos de log obtidos pelo GPS.

Exporta o conjunto de dados para o arquivo: `data/ds/gps-log.parquet`

In [10]:
cols = ['timestamp','level','lat','long','altitude','speed','satellites','mode','datetime']

gps_df = parseAll("./data/gps.*.log", cols)

save_dataset(gps_df, "gps-log.parquet")

# Summary
(
    gps_df
    .groupBy("date")
    .agg(

        date_format(min("timestamp"), "HH:mm:ss").alias("start"), 
        date_format(max("timestamp"), "HH:mm:ss").alias("end"), 
        count("lat").alias("count")
    )
    .withColumn("day", date_format("date", "EEEE"))
    .select("date", "day", "start", "end", "count")
    .orderBy("date")
).toPandas()

Unnamed: 0,date,day,start,end,count
0,2017-11-29,Wednesday,06:23:21,09:47:36,23279
1,2017-12-06,Wednesday,18:53:28,21:25:48,10026
2,2017-12-07,Thursday,07:54:22,09:38:31,6872
3,2017-12-08,Friday,19:59:23,22:23:25,12787
4,2017-12-11,Monday,08:20:23,10:20:28,7939
5,2017-12-12,Tuesday,17:08:08,21:40:29,18691
6,2017-12-13,Wednesday,14:55:35,14:58:19,167
