In [3]:
%config Completer.use_jedi = False

In [4]:
import pandas as pd
from datetime import timedelta, datetime
from matplotlib.dates import DateFormatter
import matplotlib.pyplot as plt
import requests
import time
import dask
import time
from dask.distributed import Client, LocalCluster

In [10]:
raise ValueError

ValueError: 

In [12]:
cluster = LocalCluster("127.0.0.1:8786")

In [13]:
client = Client(cluster)
client.cluster.scale(10)  

In [14]:
client.cluster

Tab(children=(HTML(value='<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-outpu…

# Data

In [5]:
data = pd.read_csv('https://covid.ourworldindata.org/data/owid-covid-data.csv',
                   usecols= ['date', 'total_cases', 'new_cases', 'total_deaths', 'new_deaths',
                             'total_cases_per_million', 'new_cases_per_million', 'total_deaths_per_million',
                                'new_deaths_per_million', 'location'],
                   index_col='date', parse_dates=True)

In [6]:
data.head()

Unnamed: 0_level_0,location,total_cases,new_cases,total_deaths,new_deaths,total_cases_per_million,new_cases_per_million,total_deaths_per_million,new_deaths_per_million
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2020-02-24,Afghanistan,5.0,5.0,,,0.126,0.126,,
2020-02-25,Afghanistan,5.0,0.0,,,0.126,0.0,,
2020-02-26,Afghanistan,5.0,0.0,,,0.126,0.0,,
2020-02-27,Afghanistan,5.0,0.0,,,0.126,0.0,,
2020-02-28,Afghanistan,5.0,0.0,,,0.126,0.0,,


### Data in dask dataframe

In [9]:
import dask.dataframe

In [10]:
data_dask = dask.dataframe.from_pandas(data, 10)

In [10]:
raise ValueError

ValueError: 

# Data Visualisation

In [11]:
Countries = ['Poland', 'Germany', 'United States', 'Czechia', 'Brazil', 'Norway', 'Romania', 'Portugal']
Measurements = ['total_cases', 'new_cases', 'total_deaths', 'new_deaths', 'total_cases_per_million',
                'new_cases_per_million', 'total_deaths_per_million', 'new_deaths_per_million']

### Iterative

In [12]:
def plot(data, country, measurement):
    plt.figure()
    plt.plot(data[data['location']==country][measurement], 'bo-')
    plt.savefig('tmp_plots/{}_{}.png'.format(country, measurement))
    plt.close()

In [13]:
start = time.time()
for c in Countries:
    for m in Measurements:
        plot(data, c, m)
print(time.time()-start)

6.073431015014648


In [14]:
start = time.time()
for c in data['location'].unique()[:50]:
    for m in Measurements:
        plot(data, c, m)
print(time.time()-start)

37.400670289993286


### Dask

In [15]:
@dask.delayed
def plot(data, country, measurement):
    plt.figure()
    plt.plot(data[data['location']==country][measurement], 'bo-')
    plt.savefig('tmp_plots/{}_{}.png'.format(country, measurement))
    plt.close()

In [16]:
s = time.time()
tasks = []
for c in Countries:
    for m in Measurements:
        tasks.append(plot(data, c, m))

dask.compute(tasks, scheduler='distributed')
print(time.time()-s)

  (               location  total_cases  new_cases   ... s_per_million')
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good


4.2811439037323


In [17]:
s = time.time()
tasks = []
for c in data['location'].unique()[:50]:
    for m in Measurements:
        tasks.append(plot(data, c, m))

dask.compute(tasks, scheduler='distributed')
print(time.time()-s)

18.8866605758667


### Dask with dask dataframe

In [17]:
s = time.time()
tasks = []
for c in Countries:
    for m in Measurements:
        tasks.append(plot(data, c, m))

dask.compute(tasks, scheduler='distributed')
print(time.time()-s)

3.1964051723480225


In [18]:
s = time.time()
tasks = []
for c in data['location'].unique()[:50]:
    for m in Measurements:
        tasks.append(plot(data_dask, c, m))

dask.compute(tasks, scheduler='distributed')
print(time.time()-s)

5.901809215545654


### Dask with dask dataframe - processess

In [19]:
s = time.time()
tasks = []
for c in Countries:
    for m in Measurements:
        tasks.append(plot(data, c, m))

dask.compute(tasks, scheduler='processes', n_workers=10)
print(time.time()-s)

4.142251968383789


In [20]:
s = time.time()
tasks = []
for c in data['location'].unique()[:50]:
    for m in Measurements:
        tasks.append(plot(data_dask, c, m))

dask.compute(tasks, scheduler='processes', n_workers=10)
print(time.time()-s)

12.971989393234253


# Machine Learning

In [18]:
from dask_ml.xgboost import XGBRegressor

new_cases_pl = data_dask[data_dask['location'] == 'Poland']

new_cases_pl = new_cases_pl[['new_cases']]

past = 7
future = 7

new_cases_pl['shift0'] = new_cases_pl['new_cases']
for i in range(0,past+future):
    new_cases_pl['shift{}'.format(i+1)] = new_cases_pl['shift{}'.format(i)].shift(1)

new_cases_pl = new_cases_pl.dropna()

models ={}

for i in range(future):
    model = XGBRegressor()
    model.fit(new_cases_pl[['shift{}'.format(j) for j in range(1,8)]], new_cases_pl['shift{}'.format(i)])
    models[i+1] = model



In [56]:
a = []
for i in range(1, future+1):
    a.append(models[i].predict(new_cases_pl[['shift1', 'shift2', 'shift3', 'shift4', 'shift5', 'shift6', 'shift7']]))

today = datetime.now()
results = pd.Series([x.compute()[-1] for x in a], index=[today+timedelta(days=i) for i in range(1,8)])

In [61]:
results

2022-01-02 22:02:17.891583    12396.231445
2022-01-03 22:02:17.891583    14172.050781
2022-01-04 22:02:17.891583    15429.987305
2022-01-05 22:02:17.891583     9974.266602
2022-01-06 22:02:17.891583     4974.808594
2022-01-07 22:02:17.891583     6476.845215
2022-01-08 22:02:17.891583    10299.976562
dtype: float32

# Compare Covid App

### Basics Plots

In [22]:
for _ in range(10):
    print('Old version: ', requests.get("http://127.0.0.1:7000/").elapsed.total_seconds())
    print('New version: ', requests.get("http://127.0.0.1:8000/").elapsed.total_seconds())
    print()

Old version:  4.296047
New version:  3.600474

Old version:  4.610023
New version:  3.072478

Old version:  4.286443
New version:  3.031399

Old version:  4.486578
New version:  1.726479

Old version:  4.162756
New version:  1.477798

Old version:  4.457506
New version:  1.599334

Old version:  4.499907
New version:  2.868368

Old version:  4.184524
New version:  3.418261

Old version:  4.322487
New version:  3.250274



KeyboardInterrupt: 

In [23]:
Countries_1 = ['Poland', 'Germany', 'United States', 'Czechia']
Countries_2 = ['Brazil', 'Norway', 'Romania', 'Portugal']
Measurements = ['new_cases', 'total_deaths']

for c in Countries_1:
    for c2 in Countries_2:
        for m in Measurements:
            t = requests.post("http://127.0.0.1:7000/", json={"country": c,
                                                         "another_country": "Germany",
                                                         "measurement": "new_cases",
                                                         "selected_time": "month",
                                                          "selected_share": "Apple"
                                                         }).elapsed.total_seconds() +\
                requests.get("http://127.0.0.1:7000/covid_plot/").elapsed.total_seconds()
            print('Old version: ', t)
            t = requests.post("http://127.0.0.1:8000/", json={"country": c,
                                                         "another_country": "Germany",
                                                         "measurement": "new_cases",
                                                         "selected_time": "month",
                                                          "selected_share": "Apple"
                                                         }).elapsed.total_seconds() +\
                requests.get("http://127.0.0.1:8000/covid_plot/").elapsed.total_seconds()
            print('New version: ', t)
            print()

Old version:  4.746532
New version:  1.441074

Old version:  4.152755
New version:  1.402775

Old version:  4.250938
New version:  1.621842

Old version:  4.196384999999999
New version:  1.4900280000000001

Old version:  4.391731
New version:  1.45146

Old version:  4.411445
New version:  1.545118

Old version:  4.333029
New version:  1.640199

Old version:  4.213495
New version:  1.332196

Old version:  4.526042
New version:  1.35887

Old version:  4.2536760000000005
New version:  1.429252

Old version:  4.328933999999999
New version:  1.387347

Old version:  4.234112
New version:  1.434331

Old version:  4.63063
New version:  1.466935

Old version:  4.1960999999999995
New version:  1.430101

Old version:  4.217612
New version:  1.366846

Old version:  4.28828
New version:  1.382717



KeyboardInterrupt: 

### Additional analysis

In [58]:
print('Old version: ', requests.get("http://127.0.0.1:7000/advanced_analysis/").elapsed.total_seconds())
print('New version: ', requests.get("http://127.0.0.1:8000/advanced_analysis/").elapsed.total_seconds())

Old version:  3.359629
New version:  2.631957


### Predictions

In [22]:
print('Old version: ', requests.get("http://127.0.0.1:7000/predictions/").elapsed.total_seconds())
print('New version: ', requests.get("http://127.0.0.1:8000/predictions/").elapsed.total_seconds())

Old version:  1.53174
New version:  1.927925
