![CMCC](http://cmcc.ufabc.edu.br/images/logo_site.jpg)
# ** Análise Exploratória **

#### Esse notebook introduz os conceitos de Análise Exploratória

#### Para isso utilizaremos a base de dados de [Crimes de São Francisco](https://www.kaggle.com/c/sf-crime) obtidos do site de competições [Kaggle](https://www.kaggle.com/).

#### ** Esse notebook contém:  **
#### *Parte 1:* *Parsing* da base de dados de Crimes de São Francisco
#### *Parte 2:* Estatísticas Básicas das Variáveis
#### *Parte 3:* Plotagem de Gráficos

#### Para os exercícios é aconselhável consultar a documentação da [API do PySpark](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)

### ** Parte 1: Parsing da Base de Dados **

In [1]:
from pyspark import SparkContext
sc =SparkContext()

In [2]:
import os
import numpy as np

#"Data","Aula03","Crime.csv"
filename = os.path.join('C:/', 'Users', 'Toshiba', 'Documents', 'Andreia', 'Doutorado', '3Quadrimestre_2017', 'Big Data', 'Lab3','Crime.csv')
CrimeRDD = sc.textFile(filename,8)
header = CrimeRDD.take(1)[0] # o cabeçalho é a primeira linha do arquivo

print "Campos disponíveis: {}".format(header)

Campos disponíveis: Id,Dates,DayOfWeek,PdDistrict,Address,X,Y


#### Durante os exercícios precisaremos pular a linha do cabeçalho de tal forma a trabalhar apenas com a tabela de dados.

#### Uma forma de fazer isso é utilizando o comando `filter()` para eliminar toda linha igual a variável `header`.

In [3]:
# EXERCICIO
CrimeHeadlessRDD = CrimeRDD.filter(lambda x: x != header)

firstObject = CrimeHeadlessRDD.take(1)[0]
print firstObject

0,2015-05-10 23:59:00,Sunday,BAYVIEW,2000 Block of THOMAS AV,-122.39958770418998,37.7350510103906


In [4]:
# o assert não deu certo, mas conferi no arquivo e o exercício acima está correto
assert firstObject==u'2015-05-13 23:53:00,WARRANTS,WARRANT ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425891675136,37.7745985956747', 'valor incorreto'
print "OK"

AssertionError: valor incorreto

#### Agora temos um dataset em que cada linha é uma string contendo todos os valores. Porém, para explorarmos os dados precisamos que cada objeto seja uma lista de valores.

#### Utilize o comando `split()` para transformar os objetos em listas de strings.

In [6]:
# EXERCICIO
CrimeHeadlessRDD = (CrimeRDD
                    .filter(lambda x: x != header)
                    .map(lambda x: x.split())
                    )

firstObjectList = CrimeHeadlessRDD.take(1)[0]
print firstObjectList

[u'0,2015-05-10', u'23:59:00,Sunday,BAYVIEW,2000', u'Block', u'of', u'THOMAS', u'AV,-122.39958770418998,37.7350510103906']


In [7]:
#o assert não deu certo, pois a data da primeira linha do arquivo está diferente
assert firstObjectList[0]==u'2015-05-13 23:53:00', 'valores incorretos'
print "OK"

AssertionError: valores incorretos

In [8]:
print CrimeHeadlessRDD.take(2)[0]

[u'0,2015-05-10', u'23:59:00,Sunday,BAYVIEW,2000', u'Block', u'of', u'THOMAS', u'AV,-122.39958770418998,37.7350510103906']


In [33]:
# EXERCICIO

import re
import datetime
from collections import namedtuple

headeritems = header.split(',') # transformar o cabeçalho em lista
del headeritems[-1] # apagar o último item e...
headeritems[-1] = 'COORD' # transformar em COORD

# Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,COORD
Crime = namedtuple('Crime',headeritems) # gera a namedtuple Crime com os campos de header

REGEX = r',(?=(?:[^"]*"[^"]*")*(?![^"]*"))'
# buscar por "," tal que após essa vírgula (?=) ou exista um par de "" ou não tenha " sozinha
# ?= indica para procurarmos pelo padrão após a vírgula
# ?: significa para não interpretar os parênteses como captura de valores
# [^"]* 0 ou sequências de caracteres que não sejam aspas
# [^"]*"[^"]*"  <qualquer caracter exceto aspas> " <qualquer caracter exceto aspas> "
# ?! indica para verificar se não existe tal padrão a frente da vírgula


def ParseCrime(rec):
    # utilizando re.split() vamos capturar nossos valores
    Id, Dates, DayOfWeek, PdDistrict,  Address, X, Y = tuple(re.split(REGEX,rec))
    print Dates
    # Converta a data para o formato datetime
    Dates = datetime.datetime.strptime(Dates, "%Y-%m-%d %H:%M:%S")
    
  
   # COORD é uma tupla com floats representando X e Y
    COORD = (float(X), float(Y))
    
    # O campos 'Resolution' será uma lista dos valores separados por vírgula, sem as aspas
 #   Resolution =
    return Crime(Id, Dates, DayOfWeek, PdDistrict,  Address, COORD)


# Aplique a função ParseCrime para cada objeto da base
CrimeHeadlessRDD = (CrimeRDD
                    .filter(lambda x: x!= header)
                    .map(ParseCrime)
                    )

firstClean = CrimeHeadlessRDD.take(1)[0]
TotalRecs = CrimeHeadlessRDD.count()
print firstClean


Crime(Id=u'0', Dates=datetime.datetime(2015, 5, 10, 23, 59), DayOfWeek=u'Sunday', PdDistrict=u'BAYVIEW', Address=u'2000 Block of THOMAS AV', COORD=(-122.39958770418998, 37.7350510103906))


In [34]:
# o arquivo que baixei,está com cabeçalho diferente
assert type(firstClean.Dates) is datetime.datetime and type(firstClean.COORD) is tuple,'tipos incorretos'
print "OK"

assert CrimeHeadlessRDD.filter(lambda x: len(x)!=7).count()==0, 'algo deu errado!' #alterei para 7, pois no arquivo q baixei não tem 8
print "OK"

assert totalRecs==878049, 'total de registros incorreto'
print "OK"

OK


AssertionError: algo deu errado!

### ** Parte 2: Estatísticas Básicas das Variáveis **

#### Nessa parte do notebook vamos aprender a filtrar a base de dados para calcular estatísticas básicas necessárias para a análise exploratória.

#### ** (2a) Contagem de frequência **

#### A contagem de frequência é realizada de forma similar ao exercício de contagem de palavras. Primeiro mapeamos a variável de interesse. Como exemplo vamos gerar uma lista da quantidade total de cada tipo de crime (Category).

In [24]:
# EXERCICIO no arquivo que baixei não tem Category

CatCountRDD = (CrimeHeadlessRDD
               .
               .
               )
catCount = sorted(CatCountRDD.collect(), key=lambda x: -x[1])
print catCount



Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 177, in main
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in process
  File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 346, in func
    return f(iterator)
  File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 1926, in combine
    merger.mergeValues(iterator)
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 236, in mergeValues
    for k, v in iterator:
  File "<ipython-input-23-c1f07a4af8e0>", line 41, in <lambda>
  File "<ipython-input-23-c1f07a4af8e0>", line 25, in ParseCrime
ValueError: need more than 7 values to unpack

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:395)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 177, in main
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in process
  File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 346, in func
    return f(iterator)
  File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 1926, in combine
    merger.mergeValues(iterator)
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 236, in mergeValues
    for k, v in iterator:
  File "<ipython-input-23-c1f07a4af8e0>", line 41, in <lambda>
  File "<ipython-input-23-c1f07a4af8e0>", line 25, in ParseCrime
ValueError: need more than 7 values to unpack

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:395)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


In [None]:
assert catCount[0][1]==174900, 'valores incorretos'
print "OK"

#### De forma similar, vamos gerar a contagem para as regiões de São Francisco (PdDistrict).

In [36]:
# EXERCICIO

RegionCountRDD = (CrimeHeadlessRDD
                 .map(lambda x : (x.PdDistrict, 1))
                 .reduceByKey(lambda x, y : x+y)
                 )
regCount = sorted(RegionCountRDD.collect(), key=lambda x: -x[1])
print regCount


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 23.0 failed 1 times, most recent failure: Lost task 5.0 in stage 23.0 (TID 48, localhost, executor driver): java.io.IOException: fail to rename file C:\Users\Toshiba\AppData\Local\Temp\blockmgr-cd03291e-eba5-477f-bc97-83d3bec7fb22\37\shuffle_2_5_0.index.7257b9a2-b693-4716-8dcb-0386333e24df to C:\Users\Toshiba\AppData\Local\Temp\blockmgr-cd03291e-eba5-477f-bc97-83d3bec7fb22\37\shuffle_2_5_0.index
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:180)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:164)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: fail to rename file C:\Users\Toshiba\AppData\Local\Temp\blockmgr-cd03291e-eba5-477f-bc97-83d3bec7fb22\37\shuffle_2_5_0.index.7257b9a2-b693-4716-8dcb-0386333e24df to C:\Users\Toshiba\AppData\Local\Temp\blockmgr-cd03291e-eba5-477f-bc97-83d3bec7fb22\37\shuffle_2_5_0.index
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:180)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:164)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


In [None]:
assert regCount[0][1]==157182, 'valores incorretos'
print "OK"

#### **(2b) Cálculo da Média**

#### Nesse exercício vamos calcular a média de crimes em cada região para cada dia da semana. Para isso, primeiro devemos calcular a quantidade de dias de cada dia da semana que existem na base de dados, para isso vamos criar uma RDD de tuplas em que o primeiro campo é a tupla da data no formato 'dia-mes-ano' e do dia da semana e o segundo campo o valor $1$.

#### Em seguida, reduzimos a RDD sem efetuar a soma, mantendo o valor $1$. Essa redução filtra a RDD para que cada data apareça uma única vez. Ao final,  podemos efetuar o mapeamento de (DayOfWeek,1) e redução com soma para contabilizar quantas vezes cada dia da semana aparece na base de dados.

#### Nossa próxima RDD terá como chave uma tupla ( (DayOfWeek, PdDistrict), 1) para contabilizar quantos crimes ocorreram em determinada região e naquele dia da semana. Após a redução, devemos mapear esse RDD para (DayOfWeek, (PdDistrict, contagem)).

#### Finalmente, podemos juntar as duas RDDs uma vez que elas possuem a mesma chave (DayOfWeek), dessa forma teremos tuplas no formato ( DayOfWeek, ( (PdDistrict,contagem), contagemDiaDaSemana ) ). Isso deve ser mapeado para:

#### ( DayOfWeek, ( PdDistrict, contagem / contagemDiaDaSemana ) )

#### Lembrando de converter `contagemDiaDaSemana` para `float`. Finalmente, o resultado pode ser agrupado pela chave, gerando uma tupla ( DayOfWeek, [ (Pd1, media1), (Pd2, media2), ... ] ). Essa lista pode ser mapeada para um dicionário com o comando `dict`.

#### No final, transformamos o RDD em um dicionário Python com o comando `collectAsMap()`.

In [None]:
# EXERCICIO

from operator import add

# Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,COORD

# Lambda para converter um datetime em `Dia-Mes-Ano`
day2str = lambda x: '{}-{}-{}'.format(x.day,x.month,x.year)

#para isso vamos criar uma RDD de tuplas em que o primeiro campo é a tupla da data no formato 
#'dia-mes-ano' e do dia da semana e o segundo campo o valor $1$.

#### Em seguida, reduzimos a RDD sem efetuar a soma, mantendo o valor $1$. Essa redução filtra a RDD para que cada 
#data apareça uma única vez. Ao final,  podemos efetuar o mapeamento de (DayOfWeek,1)
#e redução com soma para contabilizar quantas vezes cada dia da semana aparece na base de dados.
totalDatesRDD = (CrimeHeadlessRDD
                 .map(lambda x: ((day2str(x.Dates),x.DayOfWeek),1))
                 .reduceByKey(lambda x, y : x)
                 .map(lambda x: (x[0][1], x[1]))
                 .reduceByKey(lambda x, y: x+y) 
                 )

#### Nossa próxima RDD terá como chave uma tupla ( (DayOfWeek, PdDistrict), 1) para contabilizar quantos
#crimes ocorreram em determinada região e naquele dia da semana. 
#Após a redução, devemos mapear esse RDD para (DayOfWeek, (PdDistrict, contagem)).
crimesWeekDayRegionRDD = (CrimeHeadlessRDD
                           .map(lambda x : ((x.DayOfWeek, x.PdDistrict),1))
                           .reduceByKey(lambda x, y: x+y)
                           .map(lambda x: (x[0][0],(x[0][1],x[1])))
                          )
#### Finalmente, podemos juntar as duas RDDs uma vez que elas possuem a mesma chave (DayOfWeek), dessa forma teremos
#tuplas no formato ( DayOfWeek, ( (PdDistrict,contagem), contagemDiaDaSemana ) ). Isso deve ser mapeado para:
#### ( DayOfWeek, ( PdDistrict, contagem / contagemDiaDaSemana ) )
#### Lembrando de converter `contagemDiaDaSemana` para `float`. Finalmente, o resultado pode ser agrupado pela chave,
#gerando uma tupla ( DayOfWeek, [ (Pd1, media1), (Pd2, media2), ... ] ). 
#Essa lista pode ser mapeada para um dicionário com o comando `dict`.
RegionAvgPerDayRDD = (crimesWeekDayRegionRDD
                      .
                      .
                      .
                      .
                     )

RegionAvg = RegionAvgPerDayRDD.collectAsMap()
print RegionAvg['Sunday']

In [None]:
assert np.round(RegionAvg['Sunday']['BAYVIEW'],2)==37.27, 'valores incorretos {}'.format(np.round(RegionAvg[0][2],2))
print "OK"

#### ** (2c) Média e Desvio-Padrão pelo PySpark **

#### Uma alternativa para calcular média, desvio-padrão e outros valores descritivos é utilizando os comandos internos do Spark. Para isso é necessário gerar uma RDD de listas de valores.

#### Gere uma RDD contendo a tupla ( (Dates,DayOfWeek, PdDistrict), contagem), mapeie para ( (DayOfWeek,PdDistrict), Contagem) e agrupe pela chave. Isso irá gerar uma RDD ( (DayOfWeek,PdDistrict), Iterador(contagens) ).

#### Agora crie um dicionário RegionAvgSpark, inicialmente vazio e colete apenas o primeiro elemento da tupla para a variável `Keys`. Itere essa variável realizando os seguintes passos:

* #### Se `key[0]` não existir no dicionário, crie a entrada `key[0]` como um dicionário vazio.
* #### Mapeie countWeekDayDistRDD filtrando por `key` e gerando a RDD com os valores da tupla. Note que não queremos uma lista de listas.
* #### Insira a tupla (media, desvio-padrão) utilizando os comandos `mean()` e `stdev()` do PySpark, armazenando na chave RegionAvgSpark[ key[0] ][ key[1] ].

In [None]:
# EXERCICIO

countWeekDayDistRDD = (CrimeHeadlessRDD
                       .<COMPLETAR>
                       .<COMPLETAR>
                       .<COMPLETAR>
                       .<COMPLETAR>
                       )

# Esse procedimento só é viável se existirem poucas chaves
RegionAvgSpark = {}
Keys = countWeekDayDistRDD.map(lambda rec: rec[0]).collect()
for key in Keys:
    listRDD = (countWeekDayDistRDD
               .filter(lambda rec: rec[0]==key)
               .flatMap(lambda rec: rec[1])
               )
    if key[0] not in RegionAvgSpark:
        RegionAvgSpark[key[0]] = {}    
    RegionAvgSpark[key[0]][key[1]] = (listRDD.mean(), listRDD.stdev())
    
print RegionAvgSpark['Sunday']

In [None]:
assert np.round(RegionAvgSpark['Sunday']['BAYVIEW'][0],2)==37.39 and np.round(RegionAvgSpark['Sunday']['BAYVIEW'][1],2)==10.06, 'valores incorretos'
print "OK"

### Parte 3: Plotagem de Gráficos

#### Nessa parte do notebook vamos aprender a manipular os dados para gerar listas de valores a serem utilizados na plotagem de gráficos.

#### Para a plotagem de gráficos vamos utilizar o [`matplotlib`](http://matplotlib.org/) que já vem por padrão na maioria das distribuições do Python (ex.: Anaconda). Outras bibliotecas alternativas interessantes são: [Seaborn](http://stanford.edu/~mwaskom/software/seaborn/) e [Bokeh](http://bokeh.pydata.org/en/latest/).

#### ** (3a) Gráfico de Barras **

#### O gráfico de barras é utilizado quando queremos comparar dados entre categorias diferentes de uma variável categórica. Como exemplo, vamos contabilizar o número médio de crimes diários por região.

#### Vamos primeiro criar a RDD totalDatesRDD que contém a lista de dias únicos, computaremos o total de dias com o comando `count()` armazenando na variável `totalDays`. Não se esqueça de converter o valor para `float`.

#### Em seguida, crie o RDD avgCrimesRegionRDD que utiliza a RDD RegionCountRDD para calcular a média de crimes por região.

#### Utilizando o comando `zip()` do Python é possível descompactar um dicionário em duas variáveis, uma com as chaves e outra com os valores. Utilizaremos essas variáveis para a plotagem do gráfico.

In [37]:
import matplotlib.pyplot as plt

# Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,COORD

# Lambda para converter um datetime em `Dia-Mes-Ano`
day2str = lambda x: '{}-{}-{}'.format(x.day,x.month,x.year)

totalDatesRDD = (CrimeHeadlessRDD
                 .map(lambda rec: (day2str(rec.Dates),1))
                 .reduceByKey(lambda x,y: x)
                 )

totalDays = float(totalDatesRDD.count())

avgCrimesRegionRDD = (RegionCountRDD
                      .map(lambda rec: (rec[0],rec[1]/totalDays))
                     )

Xticks,Y = zip(*avgCrimesRegionRDD.collectAsMap().items())
indices = np.arange(len(Xticks))
width = 0.35

fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.bar(indices,Y, width)
plt.grid(b=True, which='major', axis='y')
plt.xticks(indices+width/2., Xticks, rotation=17 )
plt.ylabel('Number of crimes')
plt.xlabel('Region')
pass

#### Quando temos subcategorias de interesse, podemos plotar através de um gráfico de barras empilhado. Vamos plotar o conteúdo da variável RegionAvg.

#### Primeiro passo é criar um dicionário `Y` em que a chave é o dia da semana e o valor é uma `np.array` contendo a média de cada região para aquele dia.

#### Em seguida precisamos criar uma matriz `Bottom` que determina qual é o início de cada uma das barras. O início da barra do dia `i` deve ser o final da barra do dia `i-1`.

#### Com isso calculado podemos gerar um plot por dia com o parâmetro bottom correspondente ao vetor `Bottom` daquele dia.

In [38]:
# Dias da semana como referência
Day = ['Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday']

# Uma cor para cada dia
Color = ['r','b','g','y','c','k','purple']

# Dicionário (dia, array de médias)
Y = {}
for day in Day:
    Y[day] = np.array([RegionAvg[day][x] for x in Xticks])

# Matriz dias x regiões    
Bottom = np.zeros( (len(Day),len(Xticks)) )
for i in range(1,len(Day)):
    Bottom[i,:] = Bottom[i-1,:]+Y[Day[i-1]]
    
indices = np.arange(len(Xticks))
width = 0.35

fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')

# Gera uma lista de plots, um para cada dia
plots = [plt.bar(indices,Y[Day[i]], width, color=Color[i], bottom=Bottom[i]) for i in range(len(Day))]

plt.legend( [p[0] for p in plots], Day,loc='center left', bbox_to_anchor=(1, 0.5) ) 
    
plt.grid(b=True, which='major', axis='y')
plt.xticks(indices+width/2., Xticks, rotation=17 )
plt.ylabel('Number of crimes')
plt.xlabel('Region')
pass

NameError: name 'RegionAvg' is not defined

#### ** (3b) Gráfico de Linha **

#### O gráfico de linha é utilizado principalmente para mostrar uma tendência temporal.

#### Nesse exercício vamos primeiro gerar o número médio de crimes em cada hora do dia.

#### Primeiro, novamente, geramos um RDD contendo um único registro de cada hora para cada dia. Em seguida, contabilizamos a soma da quantidade de crime em cada hora. Finalmente, juntamos as duas RDDs e calculamos a média dos valores.

In [None]:
# EXERCICIO

parseWeekday = lambda x: '{}-{}-{}'.format(x.day, x.month, x.year)

hoursRDD = (CrimeHeadlessRDD
            .<COMPLETAR>
            .<COMPLETAR>
            .<COMPLETAR>
            .<COMPLETAR>
           )

crimePerHourRDD = (CrimeHeadlessRDD
                   .<COMPLETAR>
                   .<COMPLETAR>
                  )

avgCrimeHourRDD = (crimePerHourRDD
                   .<COMPLETAR>
                   .<COMPLETAR>
                  )

crimePerHour = avgCrimeHourRDD.collect()
print crimePerHour[0:5]

In [None]:
assert np.round(crimePerHour[0][1],2)==19.96, 'valores incorretos'
print "OK"

In [None]:
crimePerHourSort = sorted(crimePerHour,key=lambda x: x[0])

X,Y = zip(*crimePerHourSort)

fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.plot(X,Y)
plt.grid(b=True, which='major', axis='y')
plt.ylabel('Avg. Number of crimes')
plt.xlabel('Hour')
pass

#### **(3c) Gráfico de Dispersão**

#### O gráfico de dispersão é utilizado para visualizar correlações entre as variáveis. Com esse gráfico é possível observar se o crescimento da quantidade de uma categoria está relacionada ao crescimento/decrescimento de outra (mas não podemos dizer se uma causa a outra).

#### Na primeira parte do exercício calcularemos a correlação entre os diferentes tipos de crime. Para isso primeiro precisamos construir uma RDD em que cada registro corresponde a uma data o valor contido nele é a quantidade de crimes de cada tipo.

#### Diferente dos exercícios anteriores, devemos manter essa informação como uma lista de valores em que todos os registros sigam a mesma ordem da lista de crimes.

#### O primeiro passo é criar uma RDD com a tupla ( (Mes-Ano, Crime), 1 ) e utilizá-la para gerar a tupla ( (Mes-Ano,Crime) Quantidade ).

#### Mapeamos essa RDD para definir Mes-Ano como chave e agrupamos em torno dessa chave, gerando uma lista de quantidade de crimes em cada data. Aplicamos a função `dict()` nessa lista para obtermos uma RDD no seguinte formato: (Mes-Ano, {CRIME: quantidade}).

#### Além disso, vamos criar a variável `crimes` contendo a lista de crimes contidas na lista de pares `catCount` computada anteriormente.

In [None]:
# EXERCICIO
parseMonthYear = lambda x: '{}-{}'.format(x.month, x.year)

crimes = map(lambda x: x[0], catCount)

datesCrimesRDD = (CrimeHeadlessRDD
                  .<COMPLETAR>
                  .<COMPLETAR>
                  .<COMPLETAR>
                  .<COMPLETAR>
                  .<COMPLETAR>
                  .cache()
                 )

print datesCrimesRDD.take(1)

In [None]:
assert datesCrimesRDD.take(1)[0][1][u'KIDNAPPING']==12,'valores incorretos'
print 'ok'

#### O próximo passo consiste em calcular o total de pares Mes-Ano para ser possível o cálculo da média.

#### Finalmente, criamos a RDD `fractionCrimesDateRDD` em que a chave é Mes-Ano e o valor é uma lista da fração de cada tipo de crime ocorridos naquele mês e ano. Para gerar essa lista vamos utilizar o *list comprehension* do Python de tal forma a calcular a fração para cada crime na variável `crimes`.

#### Os dicionários em Python tem um método chamado `get()` que permite atribuir um valor padrão caso a chave não exista. Ex.: `dicionario.get( chave, 0.0)` retornará 0.0 caso a chave não exista.

In [None]:
# EXERCICIO

totalPerDateRDD = (CrimeHeadlessRDD
                   .<COMPLETAR>
                   .<COMPLETAR>
                  )

fractionCrimesDateRDD = (datesCrimesRDD
                         .<COMPLETAR>
                         .<COMPLETAR>
                         .cache()
                        )

print fractionCrimesDateRDD.take(1)

In [None]:
assert np.abs(fractionCrimesDateRDD.take(1)[0][1][0][1]-0.163950)<1e-6,'valores incorretos'
print 'ok'

#### Finalmente, utilizaremos a função `Statistics.corr()` da biblioteca [`pyspark.mlllib.stat`](https://spark.apache.org/docs/1.1.0/api/python/pyspark.mllib.stat.Statistics-class.html).

#### Para isso mapeamos nossa RDD para conter apenas a lista de valores da lista de tuplas.

In [None]:
from pyspark.mllib.stat import Statistics
corr = Statistics.corr(fractionCrimesDateRDD.map(lambda rec: map(lambda x: x[1],rec[1])))
print corr

#### Convertendo a matriz `corr` para `np.array` podemos buscar pelo maior valor negativo e positivo diferentes de 1.0. Para isso vamos utilizar as funções `min()` e `argmin()`.

In [None]:
npCorr = np.array(corr)
rowMin = npCorr.min(axis=1).argmin()
colMin = npCorr[rowMin,:].argmin()
print crimes[rowMin], crimes[colMin], npCorr[rowMin,colMin]

npCorr[npCorr==1.] = 0.
rowMax = npCorr.max(axis=1).argmax()
colMax = npCorr[rowMax,:].argmax()
print crimes[rowMax], crimes[colMax], npCorr[rowMax,colMax]

#### Agora que sabemos quais crimes tem maior correlação, vamos plotar um gráfico de dispersão daqueles com maior correlação negativa.

#### Primeiro criamos duas RDDs, `var1RDD` e `var2RDD`. Elas são um mapeamento da `fractionCrimesDateRDD` filtradas para conter apenas o crime contido em Xlabel e Ylabel, respectivamente.

#### Juntamos as duas RDDs em uma única RDD, `correlationRDD` que mapeará para tuplas de valores, onde os valores são as médias calculadas em fractionCrimesDateRDD.

In [None]:
# EXERCICIO

Xlabel = 'FORGERY/COUNTERFEITING'#'DRIVING UNDER THE INFLUENCE'
Ylabel = 'NON-CRIMINAL'#'LIQUOR LAWS'



var1RDD = (fractionCrimesDateRDD
           .map(lambda rec: (rec[0], filter(lambda x: x[0]==Xlabel,rec[1])[0][1]))
          )
var2RDD = (fractionCrimesDateRDD
           .map(lambda rec: (rec[0], filter(lambda x: x[0]==Ylabel,rec[1])[0][1]))
          )

correlationRDD = (var1RDD
                  .<COMPLETAR>
                  .<COMPLETAR>
                 )


Data = correlationRDD.collect()
print Data[0]

In [None]:
assert np.abs(Data[0][0]-0.015904)<1e-6, 'valores incorretos'
print 'ok'

#### No gráfico abaixo, é possível perceber que quanto mais crimes do tipo *NON-CRIMINAL* ocorrem em um dia, menos *FORGERY/COUNTERFEITING* ocorrem.

In [None]:
X,Y = zip(*Data)

fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.scatter(X,Y)
plt.grid(b=True, which='major', axis='y')
plt.xlabel(Xlabel)
plt.ylabel(Ylabel)
pass

#### **(3d) Histograma **

#### O uso do Histograma é para visualizar a distribuição dos dados. Dois tipos de distribuição que são observadas normalmente é a Gaussiana, em que os valores se concentram em torno de uma média e a Lei de Potência, em que os valores menores são observados com maior frequência.

#### Vamos verificar a distribuição das prisões efetuadas (categoria *ARREST* em * Resolution*) em cada mês. Com essa distribuição poderemos verificar se o número de prisões é consistente durante os meses do período estudado.

#### Primeiro criaremos uma RDD chamada `bookedRDD` que contém apenas os registros contendo *ARREST* no campo *Resolution* (lembre-se que esse campo é uma lista) e contabilizar a quantidade de registros em cada 'Mes-Ano'. Ao final, vamos mapear para uma RDD contendo apenas os valores contabilizados.

In [None]:
assert Data[0]==1914,'valores incorretos'
print 'ok'

In [None]:
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.hist(Data)
plt.grid(b=True, which='major', axis='y')
plt.xlabel('ARRESTED')
pass

In [None]:
# EXERCICIO

bookedRDD = (CrimeHeadlessRDD
             .<COMPLETAR>
             .<COMPLETAR>
             .<COMPLETAR>
             .<COMPLETAR>
            )

Data = bookedRDD.collect()
print Data[:5]

#### Notem que lemos o histograma da seguinte maneira: em cerca de 50 meses foram observadas entre 1750 e 2000 prisões. Porém, não sabemos precisar em quais meses houve um aumento ou redução das prisões. Isso deve ser observado através de um gráfico de linha.

#### **(3e) Box-plot**

#### O Box-plot é um gráfico muito utilizado em estatística para visualizar o resumo estatístico de uma variável.

#### Para esse exercício vamos plotar duas box-plot sobre a média do número de prisões durante os meses analisados para os crimes do tipo *ROBBERY* e *ASSAULT*.

#### O mapeamento é exatamente o mesmo do exercício anterior, porém filtrando para o tipo de roubo analisado.

In [None]:
# EXERCICIO

parseDayMonth = lambda x: '{}-{}'.format(x.month,x.year)

robberyBookedRDD = (CrimeHeadlessRDD
                     .<COMPLETAR>
                     .<COMPLETAR>
                     .<COMPLETAR>
                     .<COMPLETAR>
                    )

assaultBookedRDD = (CrimeHeadlessRDD
                     .<COMPLETAR>
                     .<COMPLETAR>
                     .<COMPLETAR>
                     .<COMPLETAR>
                    )

robData = robberyBookedRDD.collect()
assData = assaultBookedRDD.collect()

In [None]:
assert robData[0]==27,'valores incorretos'
print 'ok'
assert assData[0]==152,'valores incorretos'
print 'ok'

#### No gráfico abaixo, percebemos que existem, em média, muito mais prisões para o tipo *ASSAULT* do que o tipo *ROBBERY*, ambos com pequena variação.

In [39]:
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.boxplot([robData,assData])
plt.grid(b=True, which='major', axis='y')
plt.ylabel('ARRESTED')
plt.xticks([1,2], ['ROBBERY','ASSAULT'])
pass

NameError: name 'robData' is not defined