# "Process data with Ray"
> "Benchmarking sequential vs. Ray-parallel download and processing of Brazilian stock prices from Yahoo Finance"

- toc: false
- branch: master
- badges: true
- comments: true
- categories: [fastpages, jupyter]
- hide: false
- search_exclude: true
- metadata_key1: metadata_value1
- metadata_key2: metadata_value2

In [None]:
!pip install -U ray

Collecting ray
  Downloading ray-1.8.0-cp37-cp37m-manylinux2014_x86_64.whl (54.7 MB)
[K     |████████████████████████████████| 54.7 MB 30 kB/s 
Collecting redis>=3.5.0
  Downloading redis-4.0.2-py3-none-any.whl (119 kB)
[K     |████████████████████████████████| 119 kB 54.1 MB/s 
Collecting deprecated
  Downloading Deprecated-1.2.13-py2.py3-none-any.whl (9.6 kB)
Installing collected packages: deprecated, redis, ray
Successfully installed deprecated-1.2.13 ray-1.8.0 redis-4.0.2


In [None]:
pip install pandas-datareader==0.10.0

Collecting pandas-datareader==0.10.0
  Downloading pandas_datareader-0.10.0-py3-none-any.whl (109 kB)
[?25l[K     |███                             | 10 kB 20.0 MB/s eta 0:00:01[K     |██████                          | 20 kB 23.8 MB/s eta 0:00:01[K     |█████████                       | 30 kB 27.4 MB/s eta 0:00:01[K     |████████████                    | 40 kB 28.5 MB/s eta 0:00:01[K     |███████████████                 | 51 kB 30.7 MB/s eta 0:00:01[K     |██████████████████              | 61 kB 33.1 MB/s eta 0:00:01[K     |█████████████████████           | 71 kB 34.9 MB/s eta 0:00:01[K     |████████████████████████        | 81 kB 36.4 MB/s eta 0:00:01[K     |███████████████████████████     | 92 kB 38.3 MB/s eta 0:00:01[K     |██████████████████████████████  | 102 kB 37.8 MB/s eta 0:00:01[K     |████████████████████████████████| 109 kB 37.8 MB/s 
Installing collected packages: pandas-datareader
  Attempting uninstall: pandas-datareader
    Found existing installat

In [None]:
import pandas as pd
import numpy as np
import pandas_datareader as web
import itertools

In [None]:
import ray

# Number of CPUs Ray may schedule tasks on in this session.
NUM_CPUS = 6

# Start Ray. ignore_reinit_error=True makes this cell safe to re-run: without it,
# calling ray.init() a second time in the same kernel raises an error.
ray.init(num_cpus=NUM_CPUS, ignore_reinit_error=True)


{'metrics_export_port': 62438,
 'node_id': '01d331c749e42aab051c37dc324fa73540f4da1c2c936884e451fc76',
 'node_ip_address': '172.28.0.2',
 'object_store_address': '/tmp/ray/session_2021-11-22_19-09-20_711468_62/sockets/plasma_store',
 'raylet_ip_address': '172.28.0.2',
 'raylet_socket_name': '/tmp/ray/session_2021-11-22_19-09-20_711468_62/sockets/raylet',
 'redis_address': '172.28.0.2:6379',
 'session_dir': '/tmp/ray/session_2021-11-22_19-09-20_711468_62',
 'webui_url': None}

In [None]:
def get_data(ticker, dt_init, dt_end):
    """Download price history for ``ticker`` from Yahoo Finance.

    The ``.SA`` suffix is appended to the ticker before querying Yahoo
    (presumably São Paulo / B3 listings, given the tickers used here).

    Args:
        ticker: bare ticker symbol, e.g. ``'PETR3'``.
        dt_init: start date passed through as ``start``.
        dt_end: end date passed through as ``end``.

    Returns:
        Whatever ``pandas_datareader.get_data_yahoo`` returns (a DataFrame).
    """
    yahoo_symbol = ticker + '.SA'
    return web.get_data_yahoo(yahoo_symbol, start=dt_init, end=dt_end)


@ray.remote
def get_data_remote(ticker, dt_init, dt_end):
    """Ray task version of ``get_data``.

    Delegates to ``get_data`` instead of duplicating its body, so the serial and
    parallel benchmarks always perform exactly the same download.
    """
    return get_data(ticker, dt_init, dt_end)


def processa_data(x):
    """Return the percentage change of the ``'Adj Close'`` column of ``x``.

    The first element is NaN because it has no previous value to compare to.
    """
    adj_close = x['Adj Close']
    return adj_close.pct_change()

@ray.remote
def processa_data_remote(x):
    """Ray task version of ``processa_data``.

    Delegates to ``processa_data`` instead of duplicating its body.
    """
    return processa_data(x)


def ingest(ticker, dt_init, dt_end):
    """Download ``ticker`` and add a ``retorno`` column of Adj Close returns.

    Rows containing any NaN are dropped. This includes the first row, whose
    ``pct_change`` value is NaN.
    """
    prices = get_data(ticker, dt_init, dt_end)
    return prices.assign(retorno=processa_data(prices)).dropna()

@ray.remote
def ingest_remote(ticker, dt_init, dt_end):
    """Ray task version of ``ingest``.

    Delegates to ``ingest`` instead of duplicating its body, so the serial and
    parallel pipelines cannot drift apart.
    """
    return ingest(ticker, dt_init, dt_end)




In [None]:

# Benchmark inputs: bare tickers (get_data appends '.SA') and the date window.
tickers = [
    'ELET3', 'ELET6', 'PETR3', 'MGLU3', 'ABEV3', 'ALPA4', 'BRFS3',
    'GOLL4', 'HYPE3', 'RENT3', 'MRFG3', 'RADL3', 'SUZB3', 'VALE3',
]

data_init, data_end = '2010-01-01', '2021-09-30'



In [None]:
%%time
resultado = [get_data(ticker, data_init, data_end) for ticker in tickers]

CPU times: user 1.51 s, sys: 147 ms, total: 1.65 s
Wall time: 11.8 s


In [None]:
%%time 
resultado_ray = [get_data_remote.remote(ticker, data_init, data_end) for ticker in tickers]
resultado_ray = ray.get(resultado_ray)

CPU times: user 191 ms, sys: 40.1 ms, total: 231 ms
Wall time: 5.44 s


In [None]:
# Preview only the first ticker's frame; displaying the whole list dumps every DataFrame.
resultado_ray[0].head()

In [None]:
%%time
resultado_processamento = [processa_data(x) for x in resultado]

CPU times: user 16.8 ms, sys: 0 ns, total: 16.8 ms
Wall time: 21.4 ms


In [None]:
# Preview only the first ticker's returns instead of dumping the whole list.
resultado_processamento[0].head()

In [None]:
%%time
resultado_processamento_remote = [processa_data_remote.remote(x) for x in resultado_ray]
resultado_processamento_remote = ray.get(resultado_processamento_remote)

CPU times: user 40.6 ms, sys: 18.4 ms, total: 58.9 ms
Wall time: 102 ms


In [None]:
%%time
dados = [ingest(ticker, data_init, data_end) for ticker in tickers]

CPU times: user 1.89 s, sys: 178 ms, total: 2.07 s
Wall time: 32.2 s


In [None]:
# Preview only the first ticker's ingested frame instead of dumping the whole list.
dados[0].head()

In [None]:
%%time 
dados_ray = [ingest_remote.remote(ticker, data_init, data_end) for ticker in tickers]
dados_ray = ray.get(dados_ray)

CPU times: user 187 ms, sys: 40.9 ms, total: 228 ms
Wall time: 7.35 s


In [None]:
# Preview only the first ticker's ingested frame instead of dumping the whole list.
dados_ray[0].head()

In [None]:
# Every 3-column subset of the ingested frame's columns, as tuples.
combinacoes = [*itertools.combinations(dados[0].columns, 3)]

In [None]:
def statistics(df, combinacoes, freq='Q'):
    """Summary statistics of selected columns sampled at period boundaries.

    Args:
        df: DataFrame with a DatetimeIndex (required by ``resample``).
        combinacoes: list of column labels to summarise. The callers here
            pass one ``itertools.combinations`` tuple converted to a list.
        freq: resample frequency. It defaults to ``'Q'`` (quarterly) for
            backward compatibility; pandas >= 2.2 prefers the spelling ``'QE'``.

    Returns:
        ``DataFrame.describe()`` of the forward-filled, resampled columns.
    """
    # Resampler.pad is a deprecated alias of ffill that is no longer available in
    # current pandas releases; ffill is the supported, identical operation.
    return df[combinacoes].resample(freq).ffill().describe()

@ray.remote
def statistics_remote(df, combinacoes, freq='Q'):
    """Ray task version of ``statistics``.

    Computes ``describe()`` of ``df[combinacoes]`` after resampling to ``freq``
    (default ``'Q'``, quarterly) with forward fill.
    """
    # Resampler.pad is a deprecated alias of ffill that is no longer available in
    # current pandas releases; ffill is the supported, identical operation.
    return df[combinacoes].resample(freq).ffill().describe()


In [None]:
%%time
stats = [statistics(df, list(combinacao)) for combinacao in combinacoes for df in dados]

CPU times: user 5.76 s, sys: 249 ms, total: 6.01 s
Wall time: 6.06 s


In [None]:
%%time
stats_ray = ray.get([statistics_remote.remote(df, list(combinacao)) for combinacao in combinacoes for df in dados])

[2m[36m(statistics_remote pid=2113)[0m 
CPU times: user 1.88 s, sys: 481 ms, total: 2.36 s
Wall time: 8.36 s


In [None]:
# Show every column triple that was benchmarked.
list(combinacoes)

[('High', 'Low', 'Open'),
 ('High', 'Low', 'Close'),
 ('High', 'Low', 'Volume'),
 ('High', 'Low', 'Adj Close'),
 ('High', 'Low', 'retorno'),
 ('High', 'Open', 'Close'),
 ('High', 'Open', 'Volume'),
 ('High', 'Open', 'Adj Close'),
 ('High', 'Open', 'retorno'),
 ('High', 'Close', 'Volume'),
 ('High', 'Close', 'Adj Close'),
 ('High', 'Close', 'retorno'),
 ('High', 'Volume', 'Adj Close'),
 ('High', 'Volume', 'retorno'),
 ('High', 'Adj Close', 'retorno'),
 ('Low', 'Open', 'Close'),
 ('Low', 'Open', 'Volume'),
 ('Low', 'Open', 'Adj Close'),
 ('Low', 'Open', 'retorno'),
 ('Low', 'Close', 'Volume'),
 ('Low', 'Close', 'Adj Close'),
 ('Low', 'Close', 'retorno'),
 ('Low', 'Volume', 'Adj Close'),
 ('Low', 'Volume', 'retorno'),
 ('Low', 'Adj Close', 'retorno'),
 ('Open', 'Close', 'Volume'),
 ('Open', 'Close', 'Adj Close'),
 ('Open', 'Close', 'retorno'),
 ('Open', 'Volume', 'Adj Close'),
 ('Open', 'Volume', 'retorno'),
 ('Open', 'Adj Close', 'retorno'),
 ('Close', 'Volume', 'Adj Close'),
 ('Close', 