In [1]:
# Distributed computing setup: start a local Dask cluster with 4 workers.
# It is closed with `client.close()` before the next section starts a new one.
from dask.distributed import Client
client = Client(n_workers=4)

# Delayed

## The `delayed` decorator

In [2]:
from time import sleep

# Plain (eager) Python functions; the sleep simulates an expensive computation.
def inc(x):
    """Slow toy task: sleep 1 s, then return ``x + 1``."""
    sleep(1)
    return x + 1

def add(x, y):
    """Slow toy task: sleep 1 s, then return ``x + y``."""
    sleep(1)
    return x + y

In [3]:
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other
# (each call sleeps for 1 s and nothing runs concurrently).

x = inc(1)
y = inc(2)
z = add(x, y)

Wall time: 3.03 s


In [4]:
# Using dask.delayed

In [5]:
from dask import delayed
from time import sleep

# Redefine `inc` and `add` with @delayed: calling them now only records a task in a
# graph (see the next cell); the work runs when `.compute()` is called.
@delayed
def inc(x):
    """Delayed toy task: sleep 1 s, then return ``x + 1``."""
    sleep(1)
    return x + 1

@delayed
def add(x, y):
    """Delayed toy task: sleep 1 s, then return ``x + y``."""
    sleep(1)
    return x + y

In [6]:
%%time
# This runs immediately, all it does is build a graph
x = inc(1)
y = inc(2)  # independent of x, so it can be computed in parallel with x
z = add(x, y)

Wall time: 2.05 ms


![image.png](attachment:4902fe31-e27b-470c-9dd2-4911f58b24dc.png)![image.png](attachment:6e24087b-e8ab-4693-89f7-37237bdb5db4.png)

In [7]:
%%time
# This actually runs the computation: inc(1) and inc(2) run in parallel (~1 s),
# then add (~1 s), hence ~2 s instead of ~3 s.
# NOTE(review): a distributed `Client` was started at the top of the notebook, so this
# most likely runs on that cluster's workers rather than a local thread pool.
z.compute()

Wall time: 2.33 s


5

In [8]:
%%time
data = [1, 2, 3, 4, 5, 6, 7, 8]
results = []
for x in data:
    y = inc(x)
    results.append(y)

total = sum(results)
total.compute()

Wall time: 1.07 s


44

In [15]:
%%time
# Sequential code
# for loop

@delayed
def inc(x):
    sleep(0.1)
    return x + 1

@delayed
def double(x):
    sleep(0.1)
    return 2 * x

def is_even(x):
    return not x % 2

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


results = []
for x in data:
    if is_even(x):
        y = double(x)
    else:
        y = inc(x)
    results.append(y)

total = sum(results)
print(total.compute())

90
Wall time: 176 ms


## Using `delayed` with pandas

In [25]:
import os
import pandas as pd
from glob import glob

# Compressed flight-data CSV files (e.g. datatest/1989.csv.bz2) used by the next cells.
filenames = sorted(glob(os.path.join('datatest', '*.bz2')))
# Fail fast here rather than with a confusing error in the aggregation cells below.
if not filenames:
    raise FileNotFoundError("no *.bz2 files found in 'datatest/'")

In [27]:
%%time

sums = []
counts = []
for fn in filenames:
    # Read in file
    df = pd.read_csv(fn)

    # Groupby origin airport
    by_origin = df.groupby('Origin')

    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()

    # Number of flights by origin
    count = by_origin.DepDelay.count()

    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

Wall time: 36.5 s


In [32]:
%%time
from dask import compute
# This is just one possible solution, there are
# several ways to do this using `delayed`
delayed_readcsv = delayed(pd.read_csv)
sums = []
counts = []
for fn in filenames:
    # Read in file
    df = delayed_readcsv(fn)

    # Groupby origin airport
    by_origin = df.groupby('Origin')

    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()

    # Number of flights by origin
    count = by_origin.DepDelay.count()

    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Compute the intermediates
sums, counts = compute(sums, counts)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

Wall time: 21.2 s


In [30]:
# Shut down this cluster; the next section starts a fresh one.
client.close()

# Bag
- Parallel lists for semi-structured data
- Tương tự list nhưng không có cấu trúc cố định và không có index (similar to a list, but without a fixed structure or an index)

In [33]:
from dask.distributed import Client

# Fresh 4-worker local cluster for the Bag examples.
client = Client(n_workers=4)

In [42]:
# each element is an integer
import dask.bag as db
# Split the 10 integers into 2 partitions; `take(3)` returns the first 3 elements as a tuple.
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
b.take(3)

(1, 2, 3)

In [43]:
# Bags support list-style functional operations, as in PySpark: map, filter, groupby, etc.
def is_even(n):
    """Return True when ``n`` is divisible by two."""
    remainder = n % 2
    return remainder == 0

# The integers 1..10; keep the even ones, then square them.
b = db.from_sequence(list(range(1, 11)))
c = b.filter(is_even)
d = c.map(lambda v: v ** 2)
d.compute()

[4, 16, 36, 64, 100]

In [44]:
# Group elements by parity (key 0 = even, key 1 = odd); the result is a list of (key, members) pairs.
b.groupby(lambda x: x % 2).compute()

[(0, [2, 4, 6, 8, 10]), (1, [1, 3, 5, 7, 9])]

In [38]:
# Each element is one line of text from the file. Here the file is a CSV, so the
# first element is the header row. The bz2 compression is handled automatically.
import os
b = db.read_text(os.path.join('datatest', '1989.csv.bz2'))
b.take(1) # first line only

('Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay\n',)

In [41]:
b.take(2) # first two lines: the header and the first data row

('Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay\n',
 '1989,1,23,1,1419,1230,1742,1552,UA,183,NA,323,322,NA,110,109,SFO,HNL,2398,NA,NA,0,NA,0,NA,NA,NA,NA,NA\n')

In [45]:
# Shut down the Bag cluster; the Array section starts a new one.
client.close()

# array

In [49]:
from dask.distributed import Client
import numpy as np
# New local cluster for the Array examples. The message printed below shows that the
# default dashboard port was still taken, so another port was chosen.
client = Client(n_workers=4)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 55680 instead


In [52]:
import dask.array as da

In [59]:
%%time
x = np.random.normal(10, 0.1, size=(20000, 20000))


Wall time: 8.12 s


In [60]:
# Plain NumPy column means of the in-memory array.
# NOTE(review): `[::]` is a no-op full slice; possibly `[::100]` was meant, as in the Dask cell below.
%time y = x.mean(axis=0)[::]

Wall time: 215 ms


In [61]:
%%time
x = da.from_array(x, chunks=(1000, 1000))
y = x.mean(axis=0)
y.compute()

Wall time: 13.5 s


array([ 9.99959131,  9.99943521, 10.00064929, ...,  9.99989365,
        9.99851456, 10.00007696])

In [53]:
%%time
# Generate the random data directly as a Dask array, so the full 20000 x 20000
# array never has to be created up front as one NumPy array.
x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))  # chunked: the array is split into many small 1000 x 1000 blocks
# Column means, keeping only every 100th one.
y = x.mean(axis=0)[::100]
y.compute()

Wall time: 2.18 s


array([ 9.99944254, 10.0008407 ,  9.99881246,  9.99952143,  9.99914806,
        9.99942084,  9.99949801,  9.99927512, 10.00008651, 10.0000341 ,
       10.000254  ,  9.99948419,  9.99978536, 10.0001092 , 10.00073403,
        9.9998261 , 10.00098631, 10.00126004,  9.99953592, 10.00013244,
        9.99886833,  9.99969668, 10.00088781,  9.99989644, 10.00031895,
        9.99950283, 10.00030859,  9.9999652 , 10.00056739, 10.00007537,
        9.99994686,  9.99918894,  9.99953371,  9.99921571, 10.00042088,
        9.99953278, 10.00033138, 10.00023003, 10.00089552, 10.00031118,
       10.0004568 , 10.00001258,  9.99928031,  9.99991783, 10.00212057,
        9.99914343,  9.99983201,  9.99974565,  9.99923624,  9.99991234,
       10.00043334, 10.00046908,  9.99989144, 10.00025374, 10.00033674,
        9.99922566,  9.99968955, 10.00131521, 10.00020715, 10.0013141 ,
       10.00116011,  9.99874664,  9.99991329, 10.00056322,  9.9995604 ,
        9.99983153,  9.99830021,  9.99945079, 10.00019627,  9.99

## Saving results that are too large to fit in memory

In [None]:
# Save a Dask array to HDF5 chunk by chunk.
# Make sure the array has the intended dtype/format before saving, for best performance.
# `y` is the Dask array built in the `da.random.normal` cell above; any Dask array works here.
import os

os.makedirs('data', exist_ok=True)  # h5py cannot create the file if the directory is missing
da.to_hdf5('data/myfile.hdf5', '/output', y)

In [None]:
# Build a result lazily from many HDF5 files, then write it to Zarr.
import os
from contextlib import ExitStack
from glob import glob

import dask.array as da
import h5py

filenames = sorted(glob(os.path.join("data", "weather-big", "*.hdf5")))
if not filenames:
    # da.stack would otherwise fail with a less obvious error on an empty list.
    raise FileNotFoundError("no .hdf5 files found in data/weather-big")

# The Dask graph reads lazily from the open HDF5 datasets, so every file must stay
# open until `to_zarr` has finished. ExitStack then closes all of them, even on error.
# After this cell, `dsets`/`arrays`/`x`/`result` refer to closed files and can no
# longer be computed.
with ExitStack() as stack:
    dsets = [stack.enter_context(h5py.File(filename, mode="r"))["/t2m"] for filename in filenames]

    arrays = [da.from_array(dset, chunks=(500, 500)) for dset in dsets]

    # Stack the per-file arrays along a new leading axis.
    x = da.stack(arrays, axis=0)

    # Keep every other element along the last two axes.
    result = x[:, ::2, ::2]

    da.to_zarr(result, os.path.join("data", "myfile.zarr"), overwrite=True)

In [62]:
# Shut down the Array cluster; the DataFrame section starts a new one.
client.close()

# Dask DataFrame

In [1]:
import os

In [2]:
from dask.distributed import Client

# New local cluster for the DataFrame examples.
client = Client(n_workers=4)

In [3]:
import dask.dataframe as dd

In [4]:
# Uncompressed 1989 flight-data CSV used by the DataFrame examples.
filename = os.path.join('datatest',  '1989.csv')

In [None]:
# Some operations only work on certain column dtypes. If the dtype inferred from a
# sample of the file is wrong, the operation fails when the computation runs.
# Cast columns explicitly (via `dtype=`) when their types are known in advance.

In [5]:
# Explicit dtypes for columns whose type could be inferred wrongly from a sample.
dtype = dict(
    ActualElapsedTime='float64',
    ArrDelay='float64',
    ArrTime='float64',
    DepTime='float64',
    Distance='float64',
    DepDelay='float64',
    Cancelled='bool',
)

# Columns 0, 1, 2 (Year, Month, DayofMonth) are combined into a single 'Date' column.
# NOTE(review): combining columns via a dict in `parse_dates` is deprecated in recent pandas versions.
df = dd.read_csv(
    filename,
    parse_dates={'Date': [0, 1, 2]},
    dtype=dtype,
)
df.head()

Unnamed: 0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,1989-01-23,1,1419.0,1230,1742.0,1552,UA,183,,323.0,...,,,False,,0,,,,,
1,1989-01-24,2,1255.0,1230,1612.0,1552,UA,183,,317.0,...,,,False,,0,,,,,
2,1989-01-25,3,1230.0,1230,1533.0,1552,UA,183,,303.0,...,,,False,,0,,,,,
3,1989-01-26,4,1230.0,1230,1523.0,1552,UA,183,,293.0,...,,,False,,0,,,,,
4,1989-01-27,5,1232.0,1230,1513.0,1552,UA,183,,281.0,...,,,False,,0,,,,,


In [6]:
# len: counting rows triggers a computation over all partitions
len(df)

5041200

In [11]:
# dtypes: displaying the lazy Dask DataFrame shows its columns, their dtypes and
# the number of partitions, not the data itself
df

Unnamed: 0_level_0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
npartitions=8,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1
,datetime64[ns],int64,int64,int64,int64,int64,object,int64,object,int64,float64,float64,int64,int64,object,object,int64,float64,float64,bool,float64,int64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [20]:
# math operator
df = dd.read_csv(filename,
                 parse_dates={'Date': [0, 1, 2]},
                 dtype=dtype) # ép lại kiểu DepDelay từ int ---> float thì chạy ko bị lỗi, lấy dtype goi y trong phan báo lỗi
%time df['DepDelay'].mean().compute()

Wall time: 3.15 s


8.202822005482144

In [7]:
# filter: keep only flights that were not cancelled
# NOTE(review): the mean equals the unfiltered one above, which suggests cancelled
# flights have no (NaN) DepDelay values. Confirm against the data.
df_filter = df[~df['Cancelled']]
%time df_filter['DepDelay'].mean().compute()

Wall time: 3.34 s


8.202822005482144

In [8]:
# groupby: number of non-cancelled flights per origin airport
%time df[~df.Cancelled].groupby('Origin').Origin.count().compute()

Wall time: 3.43 s


Origin
ABE     4850
ABQ    30736
ACY      643
AGS     3053
ALB    13424
       ...  
GST       86
ROP       39
TVC      200
EGE       11
SUN       13
Name: Origin, Length: 237, dtype: int64

In [9]:
# Mean departure delay per origin airport (all flights; missing delays are skipped)
%time df.groupby("Origin").DepDelay.mean().compute()

Wall time: 3.22 s


Origin
ABE     5.825979
ABQ     6.686003
ACY     5.138414
AGS     5.604651
ALB     6.310861
         ...    
GST     9.697674
ROP    20.794872
TVC     6.970000
EGE    32.363636
SUN    19.000000
Name: DepDelay, Length: 237, dtype: float64

In [13]:
%%time
# Non-cancelled flights with ActualElapsedTime > 300 (presumably minutes), one row
# per FlightNum, restricted to rows with a known TailNum; then count rows per carrier.
# NOTE(review): the result is empty because TailNum appears to be missing ("NA") in the
# 1989 data (see the sample rows above), so `dropna` on it removes every row.
df_filter = df[(~df['Cancelled']) & (df["ActualElapsedTime"] > 300)]
df_dropdup = df_filter.drop_duplicates(subset = ["FlightNum"])
df_dropna = df_dropdup.dropna(how='any', subset=['TailNum'])
value_count_sr = df_dropna['UniqueCarrier'].value_counts()
value_count_sr.compute()

Wall time: 3.21 s


Series([], Name: UniqueCarrier, dtype: int64)