# Lista 3 - Sorts Eficientes - EDAII
#### Alunos: Ícaro Pires (15/0129815) e Vinicius Bernardo (15/0151331)

## Introdução

<p>
Escolhemos trabalhar com dados reais, para que o número de elementos fosse relativamente alto e seus valores significativos. Os dados usadaos estão disponibilizados pelo governo federal em sua plataforma de dados abertos. No caso em questão, foram escolhidos os dados referente as ocorrências policiais no Brasil desde o ano de 2004 até 2017. (Com exceção de 2016, porque os dados estão indisponíveis no website).
</p>

## Solução

<p>
Foi escolhido a técnica de ordenação Bucket de maneira que o argumento de separação dos baldes escolhido foram a quantidade de ocorrências de cada registro, sendo um registro por tipo de crime, localidade e data. Como o número de ocorrências se repete em muitos registros, o algoritimo de ordenação Merge Sort foi o escolhido por sua eficiência e por ser estável, e em sua versão iterativa, pela linguagem (python) não dar muito suporte a recursão. Resumindo, o merge sort será aplicado em cada balde individualmente.
</p>

<p>
Além disso, para extrair o máximo de poder computacional e aproveitar da vantagem da separação em baldes, foi utilizado também a ferramenta para Data Science e processamento paralelo e distribuído: Apache Spark, ela é a responsável por paralelizar o Merge Sort em cada um dos baldes. O Spark por si só irá gerenciar a separação de jobs e sua subdivisão em tasks. A quantidade de tarefas executando em paralelo são 4, porque é a quantidade de núcleos que o computador usado nos testes possui.
</p>

O resumo da execução dos jobs e das tasks podem ser conferidos nas imagens (prints) disponíveis no repositório, para a execução utilizando-se apenas um núcleo e a execução usando todos os núcleos (4).

## Desenvolvimento da solução

### Leitura do Dataset

In [1]:
all_police_reports_df = spark.read.csv('ocorrencias/ocorrencias_2017_2004.csv', sep=';', encoding='latin1', header=True)

### Separando coluna da quantidade de ocorrências e vinculando índices

In [2]:
police_reports_amount = all_police_reports_df.select('PC-Qtde Ocorrências').toPandas().astype(float).values
indexes = range(len(police_reports_amount))
police_reports_amount = list(zip(police_reports_amount, indexes))

### Separando Dados em 4 baldes

In [3]:
# A quantidade de baldes foi definida após algumas experimentações com diferentes valores
buckets = [[], [], [], []]
for element in police_reports_amount:
    if element[0] < 2:
        buckets[0].append(element)
    elif element[0] >= 2 and element[0] < 4:
        buckets[1].append(element)
    elif element[0] >= 4 and element[0] < 10:
        buckets[2].append(element)
    else:
        buckets[3].append(element)
        
print(f'Quantidade de elementos nos baldes:\n'
      f'\tprimeiro: {len(buckets[0])},\n'
      f'\tsegundo: {len(buckets[1])},\n'
      f'\tterceiro: {len(buckets[2])},\n'
      f'\tquarto: {len(buckets[3])},\n')

Quantidade de elementos nos baldes:
	primeiro: 115893,
	segundo: 71547,
	terceiro: 54563,
	quarto: 66624,



### Implementação do algoritmo de ordenação (Merge Sort)

In [4]:
def mergeSort(x):
    x = list(x)[0]
    def merge(a, b):
        x = []
        while len(a) > 0 and len(b) > 0:
            if a[0][0] < b[0][0]:
                x.append(a.pop(0))
            else:
                x.append(b.pop(0))
        while len(a) > 0:
            x.append(a.pop(0))
        while len(b) > 0:
            x.append(b.pop(0))
        return x
        
    queue = [[i] for i in x]
    while len( queue ) > 1:
        a, b = queue.pop(0), queue.pop(0)
        queue.append(merge(a, b))
    yield queue[0]

### Aplicando o MergeSort paralelamente em cada um dos baldes

In [5]:
buckets_results = sc.parallelize(buckets, len(buckets)).mapPartitions(mergeSort).collect()

### Exibição dos resultados de cada balde individualmente

In [6]:
# Não são exibidos todos os dados porque são mais de 300 mil
n = 3
print(f'Elementos nos baldes:\n'
      f'\tPrimeiro balde:\n'
      f'\tPrimeiros {n} elementos: {buckets_results[0][:n]}\n'
      f'\tÚltimos {n} elementos: {buckets_results[0][-n:]}\n\n'
      
      f'\tSegundo balde:\n'
      f'\tPrimeiros {n} elementos: {buckets_results[1][:n]}\n'
      f'\tÚltimos {n} elementos: {buckets_results[1][-n:]}\n\n'
      
      f'\tTerceiro balde:\n'
      f'\tPrimeiros {n} elementos: {buckets_results[2][:n]}\n'
      f'\tÚltimos {n} elementos: {buckets_results[2][-n:]}\n\n'
      
      f'\tQuarto balde:\n'
      f'\tPrimeiros {n} elementos: {buckets_results[3][:n]}\n'
      f'\tÚltimos {n} elementos: {buckets_results[3][-n:]}')

Elementos nos baldes:
	Primeiro balde:
	Primeiros 3 elementos: [(array([1.]), 276518), (array([1.]), 276517), (array([1.]), 276516)]
	Últimos 3 elementos: [(array([1.974]), 17121), (array([1.99]), 1604), (array([1.996]), 1598)]

	Segundo balde:
	Primeiros 3 elementos: [(array([2.]), 64199), (array([2.]), 64198), (array([2.]), 64190)]
	Últimos 3 elementos: [(array([3.983]), 4627), (array([3.988]), 88616), (array([3.99]), 66122)]

	Terceiro balde:
	Primeiros 3 elementos: [(array([4.]), 234831), (array([4.]), 234826), (array([4.]), 234822)]
	Últimos 3 elementos: [(array([9.]), 235331), (array([9.]), 235322), (array([9.]), 234848)]

	Quarto balde:
	Primeiros 3 elementos: [(array([10.]), 5027), (array([10.]), 5024), (array([10.]), 5023)]
	Últimos 3 elementos: [(array([996.]), 55138), (array([998.]), 101632), (array([998.]), 89038)]


### Unindo conteúdo dos baldes em um resultado final

In [7]:
import itertools

# Flattening list
final_ordered_list_with_index = list(itertools.chain(*buckets_results))

print(
    f'Primeiros 3 elementos da lista:\n{final_ordered_list_with_index[:3]}\n\n'
    f'Útimos 3 elementos da lista:\n{final_ordered_list_with_index[-3:]}'
)

Primeiros 3 elementos da lista:
[(array([1.]), 276518), (array([1.]), 276517), (array([1.]), 276516)]

Útimos 3 elementos da lista:
[(array([996.]), 55138), (array([998.]), 101632), (array([998.]), 89038)]


## Conclusão

Podemos verificar que a estratégia funcionou. Como resultado final podemos obter, nos testes realizados, um ganho de 6 segundos (de 26s para 20s) da versão executada em núcleo um para a versão executada em todos. Claro que nesse tempo também há o overhead da geração gerenciamento das tarefas, além da união dos resultados no final. Porém a tendência é o ganho de desempenho se tornaria cada vez maior, a medida que a massa de dados fosse aumentada. Como saída do processo, temos um array de tuplas ordenado, a primeira posição indicando a quantidade de ocorrências e a segunda o índice do registro no data frame original.

### Execuções no Spark

[Execução em um núcleo](https://raw.githubusercontent.com/ViniciusBernardo/lista3-ViniciusFerreira-IcaroPires/master/execucao_1_nucleo.jpg)

[Execução em 4 núcleos](https://raw.githubusercontent.com/ViniciusBernardo/lista3-ViniciusFerreira-IcaroPires/master/execucao_4_nucleos.jpg)

## Links externos

[Plataforma de Dados Abertos](http://dados.gov.br/)

[Dados de Ocorrências Policiais](http://dados.gov.br/dataset/sistema-nacional-de-estatisticas-de-seguranca-publica)
