# Accelerating End-to-End Data Science Workflows # 

Transition Path: cuDF มอบวิธีที่ผู้ใช้สามารถปรับขนาดเวิร์กโฟลว์ pandas ของตนได้เมื่อขนาดข้อมูลเติบโตขึ้น โดยนำเสนอทางเลือกตรงกลางระหว่าง pandas แบบ Single-threaded และโซลูชันการประมวลผลแบบกระจายเช่น Dask หรือ Apache Spark



## 09 - บทนำสู่ Dask cuDF ##

**สารบัญ**

[Dask](https://dask.org/) cuDF สามารถใช้เพื่อกระจายการดำเนินการ DataFrame ไปยัง GPU หลายตัว ในโน้ตบุ๊กนี้ เราจะแนะนำแนวคิดหลักของ Dask เรียนรู้วิธีตั้งค่า Dask Cluster เพื่อใช้ประโยชน์จาก GPU หลายตัว และดูวิธีการดำเนินการ DataFrame อย่างง่ายบน Dask DataFrame แบบกระจาย โน้ตบุ๊กนี้ครอบคลุมส่วนต่างๆ ดังนี้:

1.  [บทนำสู่ Dask](#An-Introduction-to-Dask)
2.  [การตั้งค่า Dask Scheduler](#Setting-up-a-Dask-Scheduler)
    * [การรับที่อยู่ IP ภายใน (Local IP Address)](#Obtaining-the-Local-IP-Address)
    * [การเริ่มต้น `LocalCUDACluster`](#Starting-a-LocalCUDACluster)
    * [การสร้างอินสแตนซ์การเชื่อมต่อไคลเอ็นต์ (Client Connection)](#Instantiating-a-Client-Connection)
    * [Dask Dashboard](#The-Dask-Dashboard)
3.  [การอ่านข้อมูลด้วย Dask cuDF](#Reading-Data-with-Dask-cuDF)
4.  [Computational Graph](#Computational-Graph)
    * [การแสดงภาพ Computational Graph](#Visualizing-the-Computational-Graph)
    * [การขยาย Computational Graph](#Extending-the-Computational-Graph)
    * [การคำนวณด้วย Computational Graph](#Computing-with-the-Computational-Graph)
    * [การคงอยู่ของข้อมูลใน Cluster](#Persisting-Data-in-the-Cluster)
5.  [การสำรวจข้อมูลเบื้องต้นด้วย Dask cuDF](#Initial-Data-Exploration-with-Dask-cuDF)
    * [แบบฝึกหัดที่ 1 - มณฑลทางเหนือของ Sunderland ด้วย Dask](#Exercise-#1---Counties-North-of-Sunderland-with-Dask)


## บทนำสู่ Dask ##

[Dask](https://dask.org/) เป็นไลบรารี Python สำหรับการประมวลผลแบบขนาน ในการเขียนโปรแกรม Dask เราจะสร้างกราฟการคำนวณที่กำหนดโค้ดที่เรา **ต้องการ** ดำเนินการ จากนั้นส่งกราฟการคำนวณเหล่านี้ไปยัง Dask scheduler ซึ่งจะประเมินผลแบบ lazy และมีประสิทธิภาพในลักษณะแบบขนาน

นอกจากการใช้คอร์ CPU หรือเธรดหลายตัวเพื่อดำเนินการกราฟการคำนวณแบบขนานแล้ว Dask schedulers ยังสามารถกำหนดค่าให้ดำเนินการกราฟการคำนวณบน CPU หลายตัว หรือตามที่เราจะทำในเวิร์คช็อปนี้ คือ GPU หลายตัว ด้วยเหตุนี้ การเขียนโปรแกรม Dask จึงอำนวยความสะดวกในการดำเนินการกับชุดข้อมูลที่มีขนาดใหญ่กว่าหน่วยความจำของทรัพยากรการคำนวณเพียงตัวเดียว

เนื่องจากกราฟการคำนวณของ Dask สามารถประกอบด้วยโค้ด Python แบบกำหนดเองได้ จึงให้ [ระดับการควบคุมและความยืดหยุ่นที่เหนือกว่าระบบอื่นๆ มากมาย](https://docs.dask.org/en/latest/spark.html) ที่สามารถดำเนินการกับชุดข้อมูลขนาดใหญ่ อย่างไรก็ตาม เราจะเน้นในเวิร์คช็อปนี้เป็นหลักที่ Dask DataFrame ซึ่งเป็นหนึ่งในโครงสร้างข้อมูลหลายอย่างที่การดำเนินการและเมธอดใช้ประโยชน์จากการจัดตารางแบบขนานของ Dask โดยกำเนิด:

* Dask DataFrame ซึ่งคล้ายคลึงกับ Pandas DataFrame อย่างใกล้ชิด
* Dask Array ซึ่งคล้ายคลึงกับ NumPy ndarray อย่างใกล้ชิด
* Dask Bag ซึ่งเป็นเซ็ตที่อนุญาตให้มีข้อมูลซ้ำกันและสามารถเก็บข้อมูลที่มีชนิดแตกต่างกันได้

โดยเฉพาะอย่างยิ่ง เราจะใช้ Dask-cuDF dataframe ซึ่งรวมอินเทอร์เฟซของ Dask เข้ากับพลัง GPU ของ cuDF สำหรับการดำเนินการ DataFrame แบบกระจายบน GPU หลายตัว ตอนนี้เราจะหันความสนใจไปที่การใช้ประโยชน์จาก GPU NVIDIA V100 ทั้ง 4 ตัวในสภาพแวดล้อมนี้สำหรับการดำเนินการกับชุดข้อมูลประชากร UK ขนาด 18GB ที่ไม่สามารถเก็บไว้ในหน่วยความจำของ GPU ขนาด 16GB เพียงตัวเดียวได้

## การตั้งค่า Dask Scheduler ##

เราเริ่มต้นด้วยการเรียกใช้ Dask Scheduler ซึ่งจะดูแลการกระจายงานของเราไปยัง GPU ที่มีอยู่ 4 ตัว ในการดำเนินการนี้ เราจำเป็นต้องเริ่มต้นอินสแตนซ์ `LocalCUDACluster` โดยใช้ IP ของเครื่องโฮสต์ของเรา จากนั้นจึงสร้างไคลเอนต์ที่สามารถสื่อสารกับคลัสเตอร์ได้

### การรับที่อยู่ IP ภายใน (Local IP Address) ###

In [None]:
import subprocess # we will use this to obtain our local IP using the following command
cmd = "hostname --all-ip-addresses"

process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
IPADDR = str(output.decode()).split()[0]

### การเริ่มต้น `LocalCUDACluster` ###

`dask_cuda` มียูทิลิตี้สำหรับการโต้ตอบระหว่าง Dask และ CUDA (ตัวอักษร "cu" ใน cuDF)

In [None]:
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster(ip=IPADDR)

### การสร้างการเชื่อมต่อไคลเอ็นต์ ###

ไลบรารี `dask.distributed` ให้ฟังก์ชันการทำงานแบบกระจายแก่เรา รวมถึงความสามารถในการเชื่อมต่อกับ CUDA Cluster ที่เราเพิ่งสร้างขึ้น การนำเข้า `progress` จะให้แถบความคืบหน้าที่มีประโยชน์ที่เราสามารถนำไปใช้ด้านล่างได้








In [None]:
from dask.distributed import Client, progress

client = Client(cluster)

### The Dask Dashboard ###

Dask มาพร้อมกับแดชบอร์ดที่มีประโยชน์มาก ซึ่งในกรณีของเราจะรันบนพอร์ต `8787` เปิดแท็บเบราว์เซอร์ใหม่ตอนนี้แล้วคัดลอก URL ของแล็บนี้ลงไป แทนที่ `/lab/lab` ด้วย `:8787` (เพื่อให้ลงท้ายด้วย `.com:8787`) ซึ่งจะเปิดแดชบอร์ด Dask ที่ขณะนี้อยู่ในสถานะว่าง

## การอ่านข้อมูลด้วย Dask cuDF ##

ด้วย `dask_cudf` เราสามารถสร้าง DataFrame จากไฟล์หลายรูปแบบ (รวมถึงจากหลายไฟล์และจากที่เก็บข้อมูลบนคลาวด์โดยตรง เช่น S3) จาก cuDF DataFrame จาก Pandas DataFrame และแม้แต่จาก vanilla CPU Dask DataFrame ที่นี่เราจะสร้าง Dask cuDF DataFrame จากไฟล์ CSV ในเครื่อง `pop5x_1-07.csv` ซึ่งมีคุณสมบัติคล้ายกับไฟล์ `pop.csv` ที่คุณเคยใช้ แต่มีขนาดใหญ่ขึ้น 5 เท่า (18GB) ซึ่งแสดงถึงประชากรเกือบ 300 ล้านคน ซึ่งเกือบเท่ากับขนาดประชากรของประเทศสหรัฐอเมริกาทั้งหมด

In [None]:
# get the file size of `pop5x_1-07.csv` in GB
!ls -sh data/uk_pop5x.csv

เรานำเข้า dask_cudf (และส่วนประกอบ RAPIDS อื่นๆ เมื่อจำเป็น) หลังจากตั้งค่าคลัสเตอร์ เพื่อให้แน่ใจว่าพวกมันถูกสร้างขึ้นอย่างถูกต้องภายในบริบท CUDA ที่สร้างขึ้น


In [None]:
import dask_cudf

In [None]:
ddf = dask_cudf.read_csv('./data/uk_pop5x.csv', dtype=['float32', 'str', 'str', 'float32', 'float32', 'str'])

In [None]:
ddf.dtypes

## กราฟการคำนวณ (Computational Graph) ##

ดังที่กล่าวไว้ข้างต้น เมื่อเขียนโปรแกรมด้วย Dask เราจะสร้างกราฟการคำนวณที่เรา **อยากจะให้ดำเนินการในที่สุด** เราสามารถสังเกตพฤติกรรมนี้ได้แล้ว: ในการเรียกใช้ `dask_cudf.read_csv` เราได้ระบุว่า **อยากจะให้ดำเนินการในที่สุด** เพื่ออ่านเนื้อหาทั้งหมดของ `pop5x_1-07.csv` อย่างไรก็ตาม Dask จะไม่ขอให้ตัวจัดตาราง (scheduler) ดำเนินการนี้จนกว่าเราจะระบุอย่างชัดเจนว่าเราต้องการให้ทำเช่นนั้น

สังเกตการใช้หน่วยความจำสำหรับ GPU ทั้ง 4 ตัวโดยการรันเซลล์ต่อไปนี้ และสังเกตว่าการใช้หน่วยความจำ GPU ไม่ได้ใหญ่พอที่จะบ่งชี้ว่าไฟล์ขนาด 18GB ทั้งหมดถูกอ่านเข้าสู่หน่วยความจำแล้ว:

In [None]:
!nvidia-smi

### การแสดงภาพกราฟการคำนวณ ###

กราฟการคำนวณที่ยังไม่ได้ดำเนินการจะมีเมธอด `.visualize` ซึ่งเมื่อใช้ในสภาพแวดล้อม Jupyter เช่นนี้ จะแสดงกราฟการคำนวณ รวมถึงวิธีที่ Dask ตั้งใจจะกระจายงานออกไป ดังนั้น เราสามารถแสดงภาพว่าการดำเนินการ `read_csv` จะถูกกระจายโดย Dask อย่างไรโดยการรันเซลล์ต่อไปนี้:

In [None]:
ddf.visualize(format='svg') # This visualization is very large, and using `format='svg'` will make it easier to view.

อย่างที่คุณเห็น เมื่อเราสั่งให้ Dask ดำเนินการนี้จริง ๆ มันจะขนานงานออกไปทั่วทั้ง 4 GPUs ในลักษณะของ 69 พาร์ติชันแบบขนาน เราสามารถดูจำนวนพาร์ติชันที่แน่นอนได้ด้วยคุณสมบัติ `npartitions`:

In [None]:
ddf.npartitions

### การขยายกราฟการคำนวณ ###

แนวคิดของการสร้างกราฟการคำนวณด้วยการดำเนินการตามอำเภอใจก่อนที่จะรันนั้นเป็นส่วนหลักของ Dask มาเพิ่มการดำเนินการบางอย่างลงในกราฟการคำนวณที่มีอยู่และแสดงภาพอีกครั้ง

หลังจากรันเซลล์ถัดไปแล้ว แม้ว่าจะต้องเลื่อนดูนานหน่อยเพื่อให้เข้าใจอย่างชัดเจน (ความท้าทายของการวิเคราะห์ข้อมูลแบบกระจาย!) คุณจะเห็นว่ากราฟที่สร้างไว้แล้วสำหรับ `read_csv` ตอนนี้ยังคงดำเนินต่อไป มันเลือกคอลัมน์ `age` ข้ามพาร์ติชันทั้งหมด (แสดงภาพเป็น `getitem`) และในที่สุดก็ทำการลด `.mean()` (แสดงภาพเป็น `series-sum-chunk`, `series-sum-agg`, `count-chunk`, `sum-agg` และ `true-div`)


In [None]:
mean_age = ddf['age'].sum()
mean_age.visualize(format='svg')

### การคำนวณด้วย Computational Graph ###

มีหลายวิธีในการระบุให้ Dask ทราบว่าเราต้องการดำเนินการคำนวณที่อธิบายไว้ใน Computational Graph ที่เราได้สร้างขึ้นมา วิธีแรกที่เราจะแสดงคือเมธอด `.compute` ซึ่งจะส่งคืนผลลัพธ์ของการคำนวณเป็นวัตถุในหน่วยความจำของ GPU ตัวเดียว ซึ่งจะไม่มีการกระจายอยู่ทั่ว GPU อีกต่อไป

**หมายเหตุ**: ค่านี้จริงๆ แล้วเป็น [*future*](https://docs.python.org/3/library/concurrent.futures.html) ซึ่งสามารถนำไปใช้ในโค้ดได้ทันที แม้กระทั่งก่อนที่จะประเมินผลเสร็จสิ้น แม้ว่าสิ่งนี้จะมีประโยชน์อย่างมากในหลายสถานการณ์ แต่ในการอบรมเชิงปฏิบัติการนี้เราไม่จำเป็นต้องทำอะไรที่ซับซ้อนกับ futures ที่เราสร้างขึ้นนอกเหนือจากการรอให้มันประเมินผลเสร็จเพื่อให้เราสามารถแสดงค่าของมันได้

ด้านล่างนี้ เราจะส่ง Computational Graph ที่เราสร้างขึ้นไปยัง Dask Scheduler เพื่อดำเนินการแบบขนานบน GPU ทั้ง 4 ตัวของเรา หากคุณเปิด Dask Dashboard ในแท็บอื่นจากก่อนหน้านี้ คุณสามารถดูได้ในขณะที่การดำเนินการเสร็จสิ้น เนื่องจากกราฟของเราเกี่ยวข้องกับการอ่านชุดข้อมูลขนาด 18GB ทั้งหมด (ตามที่เราประกาศเมื่อเพิ่ม `read_csv` ไปยัง Call Graph) คุณสามารถคาดหวังว่าการดำเนินการจะใช้เวลาเล็กน้อย หากคุณดูแดชบอร์ดอย่างใกล้ชิด คุณจะเห็นว่า Dask เริ่มการคำนวณต่อเนื่องสำหรับ `mean` แม้ว่าข้อมูลจะยังคงถูกอ่านเข้าสู่หน่วยความจำอยู่




In [None]:
mean_age.compute()

### การคงข้อมูลในคลัสเตอร์ (Persisting Data in the Cluster) ###

ดังที่คุณเห็น การดำเนินการก่อนหน้านี้ที่อ่านไฟล์ CSV ขนาด 18GB ทั้งหมดเข้าสู่หน่วยความจำของ GPU ไม่ได้เก็บรักษาข้อมูลไว้ในหน่วยความจำหลังจากเสร็จสิ้นกราฟการคำนวณ:


In [None]:
!nvidia-smi

เวิร์กโฟลว์ Dask ทั่วไป ซึ่งเราจะนำมาใช้คือการคงข้อมูลที่เราต้องการทำงานด้วยไปยังคลัสเตอร์ จากนั้นจึงดำเนินการอย่างรวดเร็วกับข้อมูลที่คงอยู่แล้วนี้ เราทำเช่นนี้ด้วยเมธอด `.persist` จาก [เอกสาร Dask](https://distributed.dask.org/en/latest/manage-computation.html#client-persist):

> เมธอด `.persist` จะส่งกราฟงานที่อยู่เบื้องหลัง Dask collection ไปยังตัวกำหนดเวลา (scheduler) โดยจะได้รับ Futures สำหรับงานระดับบนสุดทั้งหมด (เช่น หนึ่ง Future สำหรับแต่ละ Pandas [*หรือ cuDF*] DataFrame ใน Dask[*-cudf*] DataFrame) จากนั้นจะส่งคืนสำเนาของ collection ที่ชี้ไปยัง futures เหล่านี้ แทนที่จะชี้ไปยังกราฟเดิม collection ใหม่นี้มีความหมายเทียบเท่ากัน แต่ตอนนี้ชี้ไปที่ข้อมูลที่กำลังทำงานอยู่แทนที่จะเป็นกราฟแบบ lazy

ด้านล่างนี้ เราจะคง `ddf` ไว้ในคลัสเตอร์เพื่อให้ข้อมูลอยู่ในหน่วยความจำ GPU เพื่อให้เราสามารถดำเนินการได้อย่างรวดเร็ว


In [None]:
ddf = ddf.persist()

อย่างที่คุณเห็นจากการรัน `nvidia-smi` (หลังจากปล่อยให้ `persist` ทำงานเสร็จสิ้น) GPU แต่ละตัวตอนนี้มีส่วนหนึ่งของ DataFrame แบบกระจายอยู่ในหน่วยความจำแล้ว:

In [None]:
!nvidia-smi

การรัน `ddf.visualize` ตอนนี้แสดงให้เห็นว่าเราไม่มีการดำเนินการใดๆ ในกราฟงานของเราอีกต่อไป มีเพียงพาร์ติชันข้อมูลที่พร้อมให้เราดำเนินการ:

In [None]:
ddf.visualize(format='svg')

การคำนวณการดำเนินการบนข้อมูลนี้จะเร็วขึ้นมาก:

In [None]:
ddf['age'].mean().compute()

## การสำรวจข้อมูลเบื้องต้นด้วย Dask cuDF ##

ความงดงามของ Dask คือการทำงานกับข้อมูลของคุณ แม้ว่าจะมีการกระจายและมีขนาดใหญ่มาก แต่ก็เหมือนกับการทำงานกับชุดข้อมูลขนาดเล็กในหน่วยความจำ


In [None]:
ddf.head() # As a convenience, no need to `.compute` the `head()` method

In [None]:
ddf.count().compute()

In [None]:
ddf.dtypes

### แบบฝึกหัดที่ 1 - มณฑลทางเหนือของ Sunderland ด้วย Dask ###

ในที่นี้ เราขอให้คุณย้อนกลับไปดูแบบฝึกหัดก่อนหน้านี้ แต่เป็นกับชุดข้อมูลแบบกระจาย หวังว่าคุณจะเห็นได้ชัดเจนว่าโค้ดมีความคล้ายคลึงกันเพียงใดสำหรับ DataFrame แบบ Single-GPU และ DataFrame แบบกระจายด้วย Dask

ระบุละติจูดของผู้อยู่อาศัยที่อยู่เหนือสุดของมณฑล Sunderland (บุคคลที่มีค่า `lat` สูงสุด) จากนั้นพิจารณาว่ามณฑลใดมีผู้อยู่อาศัยที่อยู่เหนือผู้อยู่อาศัยนี้บ้าง ใช้เมธอด `unique` ของ `Series` ของ cuDF เพื่อกำจัดค่าที่ซ้ำกันในผลลัพธ์

**คำแนะนำ**:
* แก้ไขเฉพาะส่วน `<FIXME>` เท่านั้น และรันเซลล์ด้านล่างเพื่อระบุมณฑลทางเหนือของ Sunderland


In [None]:
sunderland_residents = ddf.loc[<<<<FIXME>>>>]
northmost_sunderland_lat = sunderland_residents['lat'].max()
counties_with_pop_north_of = ddf.loc[ddf['lat'] > northmost_sunderland_lat]['county'].unique()
results=counties_with_pop_north_of.compute()
results.head()

Click ... for solution. 

In [None]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

เยี่ยมมาก!