In [None]:
import time
import random
import hashlib

In [None]:
import numpy as np

In [None]:
import holoviews as hv
hv.extension('bokeh')

In [None]:
%opts Scatter Curve [width=600 height=400 show_grid=True]

# Python + processamento paralelo

### Processamento padrão
(single core)

In [None]:
# Gerar cpfs falsos
cpfs = np.random.randint(1, 999_999_999, size=(5_000_000, ))
cpfs = list(map(str, cpfs))

In [None]:
def mysha(cpf: str):
    """
    Essa função calcula o hash de um CPF
    Ela adiciona um sal, fixo como 'aaaaa'
    Também adiciona uma pimenta, a string
    de um número aleatório entre 1000 e 9999
    
    Parameters
    ----------
    cpf: str
        cpf, ja como string
    
    Returns
    -------
    str
        string hexdigest do hash + salt + pepper
    """
    salt = 'aaaaa'
    pepper = str(random.randint(1000, 9999))
    cpf = salt + cpf + pepper
    h = hashlib.sha256(cpf.encode('utf8')).hexdigest()
    return h

In [None]:
print(f'Num de cpfs: {len(cpfs):,.0f}')

###### Benchmarking
O tempo de casa uma das células abaixo, no meu computador é de, aprox, 12 segundos  
O python esta rodando cada um dos 5mm de cpfs um após o outro. Num núcleo somente (confira no htop ou gerenciador de tarefas)

In [None]:
%%time
for i in cpfs:
    _ = mysha(i)

In [None]:
%%time
_ = [mysha(i) for i in cpfs]

In [None]:
%%time
_ = list(map(mysha, cpfs))

In [None]:
def clockit(cpfs: list, n: int):
    """
    Retorna o tempo necessário para calcular o hash de todos os cpfs numa lista
    
    Parameters
    ----------
    cpfs: list
        lista de cpfs (que são strings)

    n: int
        número de cpfs que será calculado
        `n` deve ser menor que o tamanho da lista
    
    Returns
    -------
    float
        o número de segundos necessário para o cálculo
    """
    start = time.time()
    _ = [mysha(i) for i in cpfs[:n]]
    end = time.time()
    return end - start

aux = []
n = 1
while 1:
    aux.append((n, clockit(cpfs, n)))
    n *= 2
    if n > 500_000:
        break

In [None]:
hv.Curve(aux, kdims='n', vdims='tempo', label='Complexidade linear')

###### Porque so utilizamos 1 núcleo?

A implementação padrão do python (CPython) transforma o código em python para byte code.  
O byte code é interpretado pelo CPython e transformado em assembly para ser executado pelo processador.  

O processo de interpretação do byte code pelo CPython **não** é [thread-safe](https://en.wikipedia.org/wiki/Thread_safety).  
Para contornar isso, foi criado o Global Interpreter Locker (vulgo, GIL) (mais info [aqui](https://wiki.python.org/moin/GlobalInterpreterLock), [aqui](https://docs.python.org/3/c-api/init.html#thread-state-and-the-global-interpreter-lock), [aqui](https://realpython.com/python-gil/))

Mas, uma das consquencias do GIL é que o python não consegue ser executado em vários núcleos de forma nativa.  
Então foram criadas alternativas para aplicações que são limitadas pelo processamento (cpu bound).

### Multiprocessing - uma biblioteca que você ja tem

Nota: neste notebook, de vez em quando, é repetida a importação das libs.  
Vc não precisa reimportar novamente, mas como o notebook é muito longo, esse código repetido facilita vc rodar so um pedaço caso precise reiniciar a kernel (em caso de estouro de memória, por exemplo).  
No jupyter notebook padrão, o atalho para reiniciar a kernel é `0+0` (zero), para interromper uma célula que esta rodando: `i+i`.

In [None]:
import multiprocessing as mp

In [None]:
# esta célula cria um pool com o número de cpus que o python enxerga no seu computador
# caso queira um número diferente, basta colocar o número que quiser no lugar do `mp.cpu_count()`
pool = mp.Pool(mp.cpu_count())

In [None]:
%%time
results = pool.map(mysha, cpfs)

Multiprocessing é uma lib que ja vem instalada por padrão com o python (built-in).  
Ela tem objetos convenientes que facilitam o processamento paralelo simples  
São criados vários interpretadores, um para cada processo. Eles não compartilham memória. Muitas vezes a comunicação entre eles é um gargalo.

Nota: toda vez que vier uma barra como a abaixo, é uma indicação para reiniciar a kernel para liberar memória  
Você não precisa fazer isso, entretanto. Mas pode ser util para não usar o swap ou arquivo de paginação  
Se não o fizer, lembre de não instanciar dois clientes Dask  

---  

### Entra o Dask

Dask é uma biblioteca construída junto a outros projetos python (numpy, pandas, sklearn...) para melhorar a escalabilidade de todo o ecossistema

###### Porque não o spark?!
O spark é uma excelente ferramenta. Foi construído em cima do hadoop, existe há muitos anos no mercado e tem muitos profissionais que sabem utilizar  
Funciona muito bem. Qualquer um pode utilizar o spark sem problemas  

Mas ó spark, feito há mais tempo, foi construido para funcionar muito bem em volta do ecossistema do hadoop. Não foi feito pensando no ecossistema, ainda incipiente, do python.  
Ele é foi construído num nível mais alto. Aplicando otimizações em computações padrões. Basicamente um framework para mapple-shuffle-reduce distribuídos.  
É dificil implementar outras coisas que dependam de mais flexbilidade.  

O dask é mais "low-level".  A implementação do agendador de tarefas é a base em cima da qual é construído.  
Ele da a flexibilidade de construir o que quiser em cima dele. Permitiria, por exemplo, que fosse construído o proprio Spark em cima do dask.  

Outra diferença importante é que o dask é feito em python e para python. Ele também é desenvolvido em conjunto com outras libs comuns na área de dados (numpy, pandas, sklearn, etc).  
Isso facilita, por exemplo, que seja utilizado por qualquer pessoa com intimidade com o Pandas, sem ter que reaprender sobre o spark RDD, pyspark ou qualquer outra coisa diferente.  
Assim a gente foca em analisar dados e ao invés de ficar aprendendo a utilizar ferramentas ou linguagens novas  

Uma comparação mais detalhada sobre Dask e Spark [aqui](https://docs.dask.org/en/latest/spark.html)

###### O que tem dentro:

O dask é composto de duas partes:  
- um scheduler de tarefas
- uma coleção de estruturas de dados distribuidas, como o dask dataframe

<img src="dask-scheduler-overview.svg" />

###### O scheduler

<img src="grid_search_schedule.gif" />

No grafo de tarefas, cada nó é uma tarefa que pode ser executada por um worker diferente  
Os vertices são a comunicação entre os workers  
Os nós vermelhos são os que estão sendo processados no momento, enquanto os azuis são os já concluídos  
O dask avalia os calculos de forma "lazy"  

### Talk is cheap, show me the code
(Linus)

In [None]:
from dask.distributed import Client, progress

# aqui a gente instancia um cliente para o dask.
# Este estara rodando localmente somente, mas pode ser conectador a outros computadores
client = Client(threads_per_worker=4, n_workers=1)
client

###### Um exemplo simples do dask funcionando em paralelo

In [None]:
import time
import random

def inc(x):
    time.sleep(0.5)
    return x + 1

def dec(x):
    time.sleep(0.5)
    return x - 1

def add(x, y):
    time.sleep(0.5)
    return x + y

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

In [None]:
import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
# A lembrar que `z` será um objeto do tipo `Delayed`, não mais uma função
z

In [None]:
z.visualize(rankdir='LR')

Nota: o gráfico acima é gerado pela lib graphviz. Ela tem dependencias de sistema.  

In [None]:
%%time
z.compute()

---  

###### As estruturas de dados

Uma das maiores vantagens do dask é poder trabalhar em escala com a mesma API que ja estamos acostumados

In [None]:
import pandas as pd

In [None]:
titanic = pd.read_csv('titanic_train.csv')

In [None]:
titanic.Sex.replace({'female': 0, 'male': 1}, inplace=True)
titanic.drop(columns=['Name', 'Ticket', 'Cabin', 'Embarked'], inplace=True)

In [None]:
print(f'Quantidade de linhas: {titanic.shape[0]:,.0f}')

In [None]:
titanic.head()

In [None]:
%%time
titanic.groupby('Sex').Survived.mean()

In [None]:
%%time
titanic[['Age', 'Fare']].describe()

Este é um exemplo bem "feliz". Um hello world na área de dados...

<img src="titanic-kaggle.png" />

###### construindo um dataset maior

In [None]:
import pandas as pd

Nota: a célula abaixo criar um arquivo e salva no seu computador. Você so precisa executar ela uma vez.  
Depois pode comentar a célula inteira

Nota 2: a célular abaixo cria um dataset 250k vezes maior que o titanic normal.  
Foi dimensionada para uma máquina de 100GB de memória ram.  
Caso esteja executando num computador menor, reduza o fator de 250_000, em 10x ou 20x (como achar conveniente)  

In [None]:
### Execute essa célula na primeira vez somente
titanic_raiz = pd.concat([titanic] * 250_000)
titanic_raiz.to_parquet('titanic.parquet', index=False, compression='snappy')

In [None]:
titanic_raiz = pd.read_parquet('titanic.parquet')

In [None]:
print(f'Quantidade de linhas: {titanic_raiz.shape[0]:,.0f}')

Now we're talking!

<img src="titanic-raiz.png" />

In [None]:
%%time
titanic_raiz.groupby('Sex').Survived.mean()

In [None]:
%%time
titanic_raiz[['Age', 'Fare']].describe()

Nota: caso você tenha feito a conta, notou que o tempo das operações não aumentou linearmente.  
De fato, o pandas utiliza o numpy q tem algumas estruturas de dados e algorítimos que são mais eficientes do que O(n).  
Mas, ainda assim, vc fica limitado pela quantidade de memória e, quando o dataset for grande o suficiente, pelo processamento num único núcleo.  

---

###### Reparticionando o dataset

Nota: assim como acima, as próximas 8 células criam um arquivo (na verdade vários) no seu computador na primeira vez que forem executadas.  
Depois você pode comenta-las  

In [None]:
from dask.distributed import Client, progress

In [None]:
client = Client(
    n_workers=1,
    threads_per_worker=1,
)
client

In [None]:
import dask.dataframe as dd

In [None]:
df = dd.read_parquet('titanic.parquet', columns=['Pclass', 'Sex', 'Age', 'Fare', 'Survived'])

In [None]:
df

In [None]:
df = df.persist()
progress(df)

In [None]:
df = df.repartition(npartitions=16)
progress(df)

In [None]:
df.to_parquet('./titanic')

---

###### Agora vai!

In [None]:
from dask.distributed import Client, progress

In [None]:
client = Client(
    n_workers=1,
    threads_per_worker=16,
)
client

In [None]:
import dask.dataframe as dd

In [None]:
# quando ha vários arquivos, o dask ja lê eles particionados
df = dd.read_parquet('titanic/*.parquet')

In [None]:
df

In [None]:
%%time
x = df.groupby('Sex').Survived.mean()

In [None]:
x = x.persist()
progress(x)

Com execução 'lazy', o arquivo so é lido qnd é necessário.  
Não só isso, como o arquivo salvo é colunar ([.parquet](https://parquet.apache.org/)), so são lidas as colunas que forem necessárias.  

In [None]:
# o método .compute() retorna um objeto único (neste caso um pd.Serie), não mais so um Delayed
x.compute()

In [None]:
%%time
x = df[['Age', 'Fare']].describe()
x = x.persist()
progress(x)

In [None]:
x.compute()

In [None]:
df.head()

In [None]:
import pandas as pd

In [None]:
x = df.groupby(by=[
    df['Age'].map_partitions(pd.cut, 10),
    'Pclass',
    'Sex'
]).Survived.mean().persist()

In [None]:
x.compute()

---  

### Chega de titanic...

<img src="titanic-big-data.png" />

In [None]:
import holoviews as hv
hv.extension('bokeh')

In [None]:
from dask.distributed import Client, progress

In [None]:
client = Client(
    n_workers=1,
    threads_per_worker=16,
)
client

In [None]:
import dask.dataframe as dd

In [None]:
taxis = dd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2018-*.csv',
)

Nota: o dask utiliza s3fs (que utiliza o boto) para ler arquivos diretamente do S3.  

In [None]:
taxis = taxis.persist()

In [None]:
# tamanho do dataset
taxis.shape[0].compute()

In [None]:
taxis.head()

In [None]:
# quantos passageiros
taxis.passenger_count.sum().compute()

In [None]:
# quantidade de corridas por número de passageiros
x = taxis.groupby('passenger_count').size().compute()
x

In [None]:
df2 = taxis[(taxis.tip_amount > 0) & (taxis.fare_amount > 0)]

In [None]:
df2['tip_pct'] = df2.tip_amount / df2.fare_amount

In [None]:
df2.tpep_pickup_datetime = df2.tpep_pickup_datetime.astype('M8[us]')

In [None]:
hora = df2.groupby(df2.tpep_pickup_datetime.dt.hour)\
    .tip_pct.mean().persist()

In [None]:
hora = hora.compute()

In [None]:
hora

In [None]:
hv.Curve(hora.values, kdims='Hora', vdims='Gorjeta (%)').opts(width=500, height=300, show_grid=True)