Skip to content

Diplab/Concurrency

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

79 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Concurrency

Outline

前言

單執行緒程式指的是,當我們啟動一個Java程式,而這個Java程式在『同時間只會做一件事』。而多執行緒(Multi-thread)程式 指的是程式『同時間做很多事』,例如Web瀏覽程式可以在下載網頁的同時,顯示動畫、播放音樂、捲動式窗瀏覽網頁內容。 本章除了討論要如何實做Thread程式,也會探討如執行緒生命週期、執行緒安全(Thread-safe),和同步化(Synchronized)等問題。

Thread介紹

首先,先來釐清什麼是program、process、thread。

Program : 一群程式碼的集合,用以解決特定的問題。以物件導向的觀念來類比,相當於Class。

Process : Program載入記憶體後所產生的可執行檔,一個Program可以同時執行多次,產生多個Process。以物件導向的觀念來類比,相當於Object。 每一個process由以下兩個項組成:

  • 一個Memory Space。相當於Object的variable,不同Process的Memory Space也不同,彼此看不到對方的Memory Space。
  • 一個以上的Thread。Thread代表從某個起始點開始(例如main),到目前為止所有函數的呼叫路徑,以及這些呼叫路徑上所用到的區域變數。

Thread : 又稱為(Lightweight Process),是process裡單一而連續的控制流程(flow of control),一個process可以同時包含多個thread,稱為multi-thread 也就是說一個程式可同時進行多個不同的子流程,每個子流程可以得到一小段程式的執行時間,每執行完一個thread就跳下一個thread, 由於轉換的速度很快,看起來就像一個程式可以同時處理多個事務。每個thread都有一個優先值(priority),priority較高的先執行。 每一個thread由以下兩項組成:

  • Stack:紀錄函數呼叫路徑,以及這些函數所用到的區域變數。
  • 目前CPU的狀態

整理Thread的重點如下:

  • 一個process可有多個thread,這些thread共用process的memory space,但每個thread有各自的stack。
  • Thread不能單獨存在或獨立執行,一定隸屬於程式,由程式來啟動thread。
  • 作業系統會根據thread的優先權以及已經用掉的CPU時間,在不同的thread作切換,讓各個thread都有機會執行。
  • main()方法就是一個thread,所以每個process一定至少包含一個thread。

Thread生命週期

下圖是基本的Thread生命週期狀態:

21-1.png

當實例化一個 Thread 並執行 start() 之後,執行緒進入 Runnable 狀態,此時執行緒尚未真正開始執行,必須等待排班器(Scheduler)的排班,被排入執行的執行緒才會執行 run() 方法中的定義。

雖然執行緒看起來像是同時執行,但事實上同一時間點上,還是只有一個執行緒在動作,只是執行緒之間切換的動作很快,所以看來像是同時執行。

有幾種狀況會讓執行緒進入 Blocked 狀態:

  • 等待輸入輸出完成
  • 呼叫 sleep() 方法
  • 嘗試取得物件鎖定
  • 呼叫 wait() 方法

當執行緒因為等待使用者的文字輸入、傾聽網路連線或以上等情況時,會阻止它的執行而進入 Blocked 狀態,執行緒排班器將不分配執行時間給這個執行緒,直到以下的幾個對應情況讓執行緒回到 Runnable 狀態:

  • 輸入輸出完成
  • 呼叫 interrupt()
  • 取得物件鎖定
  • 呼叫 notify() 或 notifyAll()

最後,如果執行的工作完成(或發生例外)而離開 run() 方法,則執行緒執行完畢,進入 Dead 狀態,您可以使用 isAlive() 方法來測試執行緒是否存活。

Thread優先權

Thread.setPriority(int)可以設定Thread的優先權,數字越大優先權越高。 您可以使用 Thread 的 setPriority() 方法來設定執行緒的優先權,設定必須在 1 到 10 之間,否則會丟出 IllegalArgumentException。 Thread定義了3個相關的static final variable

  • public static final int MAX_PRIORITY 10
  • public static final int MIN_PRIORITY 1
  • public static final int NORM_PRIORITY 5

當優先權有高有低的不同執行緒都進入生命週期中的Runnable狀態時(例如I/O輸入完畢),JVM會先讓高優先權的執行緒執行, 但是在支援時間分割的作業系統下,優先權高的執行緒完成之前,較低優先權的執行緒仍然會分配到執行的時間。

使用優先權時,有件事要注意: java分成十種優先權等級,但是各種作業系統的優先權分級未必和java相同,例如Windows就只有七種。 實際運作時,JVM會將程式重新對映到作業系統的優先權等級。 因此不同的優先權重新對映後有可能會沒有差別,如果執行緒的優先權一定要實質差距,可以使用上面提到的MAX、NORM、MIN來設定。

若是優先權相同時則依排程演算法輪流執行,如使用Round-Robin。

  • 註(Round-Robin演算法):首先,作業系統會定義一個時間單位,稱作time quantum或是time slice,長度通常在10~100毫秒。當程序(process)準備好能執行時(Ready),會排入一個佇列(queue)的尾端,這佇列稱為ready queue。 CPU排程器會不斷地取出排在ready queue前端的程序,並且設定計時器在一個時間單位之後,自動去中斷程序。

Thread的使用方法

想要讓物件能具有多執行緒(Multi-thread)功能,只要繼承 java.lang.Thread 類別或是實作 java.lang.Runnable 介面。

繼承Thread類別

在 Java 中要實現執行緒功能,可以繼承 java.lang.Thread 類別,並重新定義 run() 方法, 要使用時則new一個執行緒物件,然後使用 start() 方法啟動該執行緒。

範例1:

package DemoThread;

class Person extends Thread{
	public Person(String name) { // name會成為新thread的名稱
		super(name) ; 
	}
	
	public void run() {
		String name = Thread.currentThread().getName(); // 取得thread的名稱
		int priority = Thread.currentThread().getPriority() ; // 取得thread的優先序
		Thread.State state = currentThread().getState() ; // 取得thread的狀態
		
		System.out.println(name + "的優先序:" + priority + "; 狀態:" + state ) ;
		
		for( int i = 1 ; i <= 5 ; i++ ) {
			System.out.println(name + "跑完第" + i + "圈") ;
			if ( name.equals("Ken") && i%3 == 0 ) { // Ken thread每跑三圈休息一秒
				System.out.println(name + "休息1秒") ;
				try{
					Thread.sleep(1000); // 暫停目前的thread1000毫秒
				}
				catch (InterruptedException e) {
					e.printStackTrace();
				} // catch
			} // if
		} // for
	} // run()
} // Person

public class DemoSampleThread {

    public static void main(String[] args) {
    	Person allen = new Person("Allen") ;
    	Person ken = new Person("Ken") ;
    	
    	allen.start(); // 啟動thread,並呼叫run()方法
    	// allen.start() ; // 已經啟動不能再呼叫start()方法
    	// ken.run() ; // 直接呼叫run()而略過start()不會開啟新的thread,而是由主執行緒去執行run()內容
    	
    	ken.start();
    	System.out.println("執行緒個數:" + Thread.activeCount()) ;
    	// 要注意的是,每次的執行結果不一定相同
    }

}

當您使用 Thread.sleep() 讓執行緒暫停執行進入 Blocked 狀態,您可以使用 interrupt() 讓它離開 Blocked 狀態, 當使用 sleep() 暫時進入 Blocked 狀態而您 interrupt() 時,會丟出 java.lang.InterruptedException 例外物件。

實做Runnable介面

在Java裡一次只能繼承一個類別,所以當我們已經繼承了某類別,就不能繼承Thread類別, 這時候我們可以實做 java.lang.Runnable 介面來定義具執行緒功能的類別, 使用時一樣要先產生 Thread 類別的實例,只是要拿實作 Runnable 介面的物件實例,當成 Thread 類別建構子的參數 可參考父類別Thread的建構子參數:

  • Thread()
  • Thread(String name)
  • Thread(Runnable target)
  • Thread(Runnable target, String name)

範例2:

package DemoRunnable;

class Person implements Runnable {
	public void run() {
		String name = Thread.currentThread().getName() ;
		
		for ( int i = 1 ; i < 5 ; i++ ) {
			System.out.println(name + "跑完第" + i + "圈") ;
			if ( name.equals("Ken") && i%3 == 0 ) {
				System.out.println(name + "稍為暫停" ) ;
				Thread.currentThread().yield(); // 讓該執行續暫停一下,先讓其他執行緒執行
			} // if
		} // for
	} // run
} // Person

public class DempSampleRunnable {

	public static void main(String[] args) {
		Person allen = new Person() ; // Person類別實做Runnable介面,所以Person物件就是Runnable物件
		Person ken = new Person() ;
		Thread tAllen = new Thread( allen, "Allen" ) ; // 呼叫Thread建構式,將Runnable物件allen傳入已建立Thread物件tAllen
		Thread tKen = new Thread( ken, "Ken" ) ;
		tAllen.start(); // 開啟執行緒並執行run方法
		tKen.start(); 
		
		try { // 主執行緒必須等到tAllen、tKen執行完畢才可以繼續進行
			tAllen.join();
			System.out.println("Join tAllen.") ;
			
			tKen.join();
			System.out.println("Join tKen.") ;
		}
		catch( InterruptedException e ) {
			e.printStackTrace();
		}
		
		System.out.println("跑步訓練結束") ;

	}

}

Daemon執行緒

Daemon這個字的原意是在UNIX系統下獨立在幕後執行的程式,在Java的多執行緒設定中借用為背景執行緒。 Daemon執行緒和一般執行緒只有一個差別,那就是當JVM底下只剩Daemon執行緒在運作時,JVM會直接關閉。JVM關閉之後,這些Daemon執行緒當然也會隨之結束。 換句話說,Daemon執行緒不能單獨生存在沒有一般執行緒的環境下。除此之外,Daemon執行緒和一般執行緒完全沒有任何差別。

範例3:

package DemoDaemonThread;

public class DemoDaemonThread {

    public static void main(String[] args) {

    	System.out.println("Main Start.") ;
        Thread thread = new Thread(
        // 這是匿名類別的寫法
            new Runnable() {
                public void run() { 
                    while(true) { 
                        System.out.print("T"); 
                    } 
                }        
            }); 
        
        // 設定為Daemon執行緒
        thread.setDaemon(true); 
        thread.start(); 
        System.out.println("Main End.") ;
    }

}

這個程式在主執行緒結束之後,Daemon 執行緒也就會跟著結束,您可以使用 setDaemon() 方法來設定一個執行緒是否為 Daemon 執行緒, 如果沒有使用 setDaemon() 設定為 true,則程式會不斷的印出 'T' 字元而不終止;使用 isDaemon() 方法則可以判斷該執行緒是否為 Daemon 執行緒。

Java 預設所有從 Daemon 執行緒產生的執行緒也是 Daemon 執行緒,因為基本上由一個背景服務執行緒衍生出來的執行緒, 也應該是為了在背景服務而產生的,所以在產生它的執行緒停止的話,也應該一併跟著停止。

Thread的加入

如果有一個A執行緒正在運行,您希望插入一個B執行緒,並要求 B 執行緒先執行完畢,然後再繼續 A 執行緒的流程, 可以使用 join() 方法來完成這個需求,當執行緒使用 join() 加入至另一個執行緒時,另一個執行緒會等待這個被加入的執行緒工作完畢, 然後再繼續它的動作,join() 的意思表示將執行緒加入成為另一個執行緒的流程之一。

範例4:

package DemoThreadJoin;

public class DemoThreadJoin {

    public static void main(String[] args) {
        System.out.println("Thread A 執行");

        Thread threadB = new Thread(new Runnable() { 
            public void run() { 
                try { 
                    System.out.println("Thread B 開始.."); 
                    for(int i = 0; i < 5; i++) { 
                        Thread.sleep(1000); 
                        System.out.println("Thread B 執行.."); 
                    }
                    System.out.println("Thread B 即將結束.."); 
                } 
                catch(InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
        });

        threadB.start();

        try {
            // Thread B 加入 Thread A
            threadB.join();
        } 
        catch(InterruptedException e) { 
            e.printStackTrace(); 
        } 

        System.out.println("Thread A 執行");
    }

}

程式啟動後主執行緒就開始,在主執行緒中您新建 threadB,並在啟動 threadB 後,將之加入(join)主執行緒的流程之中, 所以 threadB 必須先執行完畢,主執行緒才會再繼續它原本的流程,如果程式中 threadB 沒有使用 join() 將之加入主執行緒的流程中, 則最後一行顯示 "Thread A 執行" 的陳述會先執行完畢(因為 threadB 使用了 sleep(),這讓主執行緒有機會取得時間來執行)。 有時候加入的執行緒有可能處理太久,您不想無止境的等待這個執行緒的工作完畢,則您可以在 join() 上指定時間, 例如 join(10000),表示加入成為流程之一的執行緒至多處理 10000 毫秒,也就是 10 秒,如果加入的執行緒還沒執行完畢就不理它了, 目前的執行緒可以繼續執行原本的工作流程。

Thread的停止

如果您想要停止一個執行緒,您最好自行實作,一個執行緒要進入 Dead 狀態,就是執行完 run() 方法,簡單的說,如果您想要停止一個執行緒的執行, 就要提供一個方式讓執行緒可以完成 run() 的流程, 而這也是您自行實作執行緒停止的基本概念。

public class SomeThread implements Runnable { 
    private boolean isContinue = true; 
    public void terminate() { 
        isContinue = false; 
    } 
    public void run() { 
        while(isContinue) { 
            // ... some statements 
        } 
    } 
}

如果執行緒因為執行 sleep() 而進入 Blocked 狀態,而您想要停止它,您可以使用 interrupt(),而程式會丟出 InterruptedException 例外,因而使得執行緒離開 run() 方法。

public class SomeThread implements Runnable {
    public void run() { 
        System.out.println("sleep....至 blocked 狀態"); 
        try { 
            Thread.sleep(9999); 
        } 
        catch(InterruptedException e) { 
            System.out.println("I am interrupted...."); 
        } 
    } 

    public static void main(String[] args) { 
        Thread thread = 
                 new Thread(new SomeThread()); 
        thread.start(); 
        thread.interrupt(); 
    } 
}

ThreadGroup

在 Java 中每個執行緒都屬於某個「執行緒群組」(ThreadGroup)管理的一員,例如若您是在 main() 主工作流程中產生一個執行緒,則產生的執行緒屬於 main 這個執行緒群組管理的一員, 您可以使用下面的指令來取得目前執行緒所屬的執行緒群組名稱:

Thread.currentThread().getThreadGroup().getName();

每一個執行緒產生時,都會被歸入某個執行緒群組,這視您的執行緒是在哪個群組中產生,如果沒有指定,則歸入產生該子執行緒的執行緒群組中,您也可以自行指定執行緒群組,執行緒一但歸入某個群組,就無法更換群組。

java.lang.ThreadGroup 類別正如其名,可以統一管理整個群組中的執行緒,您可以使用以下的方式來產生群組,並在產生執行緒的時候,一併指定其群組:

ThreadGroup threadGroup1 = new ThreadGroup("group1");
ThreadGroup threadGroup2 = new ThreadGroup("group2");
Thread thread1 = 
          new Thread(threadGroup1, "group1's member");
Thread thread2 = 
           new Thread(threadGroup2, "group2's member");

ThreadGroup 中的某些方法,可以對所有的執行緒產生作用,例如 interrupt() 方法可以 interrupt 群組中所有的執行緒,setMaxPriority() 方法可以設定群組中執行緒所能擁有的最大優先權(本來就擁有更高優先權的執行緒不受影響)。

如果您想要一次取得群組中所有的執行緒來進行某種操作,您可以使用 enumerate() 方法,例如:

Thread[] threads = new Thread[threadGroup1.activeCount()];
threadGroup1.enumerate(threads);

activeCount() 方法取得群組中作用中的執行緒數量,enumerate() 方法要傳入一個 Thread 陣列,它會將執行緒物件設定至每個陣列欄位中,之後您就可以指定陣列索引來操作這些執行緒。

使用Executors

Thread Pool 的概念如同其名,就是一個 Thread 的 Pool, 其中有固定或變動量的 Thread,當 request 進來時,若有閒置的 Thread 就執行, 若沒有的話,可能產生新的 Thread 或把 request 放入 queue 中等待被執行, 當一條 Thread 執行完工作而 queue 中仍有 request 在等待時, 此 Thread 應該要被分發新的 request 並處理。

由以上幾行,我們可以看出 Thread Pool 的工作有:

  • 管控 Thread 的產生與回收
  • 分發 Thread 處理 request
  • 處理 request 的 queue

使用Executors能為你管理Thread物件,進而簡化並行程式設計撰寫。Executors提供的是用戶端與執行任務之間的中介層,讓中介物件 執行任務,而不是讓用戶端直接執行。Executors可以讓你管理非同步任務的執行,而無需自行管理Thread的生命週期。

範例5:

package DemoExecutor ;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
 
 public class ThreadPoolDemo {
 
     public static void main(String[] args) {
         
         // 建立 2 個 thread 的 thread pool
         Executor executor = Executors.newFixedThreadPool(2);  
         
         // 執行實作了 Runnable 介面的內部類別 Work
         executor.execute(new Work(1));  
         executor.execute(new Work(2));  
         executor.execute(new Work(3));  
 
         // 直接在 function 中宣告匿名內部類別
         executor.execute(new Runnable() {
             // anonymous inner class            
             @Override
             public void run() {
                 System.out.println(Thread.currentThread().getName() + 
                     " Begins Work in anonymous inner class.");  
             }
         });
     }
     
     public static class Work implements Runnable {  
         private int id;  
       
         public Work (int id) {  
             this.id = id;  
         }  
       
         public void run() {  
             System.out.println(Thread.currentThread().getName() + 
                 " Begins Work " + id);  
             try {  
                 Thread.sleep(1000);  
             }  
             catch (InterruptedException ex) {  
                 ex.printStackTrace();  
             }  
             System.out.println(Thread.currentThread().getName() + 
                 " Ends Work " + id);  
         }  
     }  
 }

由以上的程式中我們可以發現, Thread Pool 的 Thread 生命週期、request queue、分發request 都被 Java 做掉了, 我們所要做的就只有設定 Thread 的數量和專注在工作的內容。

另外除了固定 Thread 數量的 Thread Pool 可用 Executors.newFixedThreadPool() 外, Executors 也提供了其他的 method 來產生不同的 Thread Pool,如:

  • SingleThreadExecutor
  • CachedThreadPool
  • ScheduledThreadPool
  • SingleThreadScheduledExecutor

同步化議題

同步化

如果程式只是一個單執行緒,單一流程的程式,那麼只要注意到程式邏輯的正確,程式通常就可以正確的執行您想要的功能, 但當程式是多執行緒程式,多流程同時執行時,那麼就要注意到更多的細節,例如在多執行緒共用同一物件的資料時。

如果一個物件所持有的資料可以被多執行緒同時共享存取時,您必須考慮到「資料同步」的問題,所謂資料同步指的是兩份資料的整體性、一致性, 例如物件 A 有 name 與 id 兩個屬性,而有一份 A1 資料有 name 與 id 的資料要用來更新物件A的屬性,如果 A1 的 name 與 id 設定給 A 物件完成, 則稱 A1 與 A 同步:

21-2.png

如果 A1 資料在更新了物件的 name 屬性時,突然插入了一份 A2 資料更新了 A 物件的 id 屬性,則顯然的 A1 資料與 A 就不同步,A2 資料與 A 也不同步。

21-3.png

下面是個簡單的範例,看看在多執行緒共用資料時會發生什麼問題。

範例6:

package DemoSynchronized;

class PersonalInfo {
    private String name; 
    private String id; 
    private int count; 

    public PersonalInfo() { 
       name = "nobody"; 
       id = "N/A"; 
    } 

    public void setNameAndID(String name, String id) { 
       this.name = name; 
       this.id = id; 
       if(!checkNameAndIDEqual()) {
           System.out.println(count + 
                   ") illegal name or ID.....");
       } 
       count++; 
    } 

    private boolean checkNameAndIDEqual() { 
       return (name.charAt(0) == id.charAt(0)) ? 
                             true : false; 
    } 
}

public class PersonalInfoTest {
    public static void main(String[] args) {
        final PersonalInfo person = new PersonalInfo(); 

        // 假設會有兩個執行緒可能更新person物件
        Thread thread1 = new Thread(new Runnable() { 
           public void run() { 
              while(true) 
                  person.setNameAndID("Justin Lin", "J.L"); 
           } 
        }); 

        Thread thread2 = new Thread(new Runnable() { 
            public void run() { 
               while(true) 
                   person.setNameAndID("Shang Hwang", "S.H");    
            } 
        }); 

        System.out.println("開始測試....."); 

        thread1.start(); 
        thread2.start();
    }
} 

雖然傳遞給 setNameAndID() 的引數並沒有問題,在某個時間點時,thread1 設定了 "Justin Lin", "J.L" 給 name 與 id,在進行 if 測試的前一刻, thread2 可能此時剛好呼叫 setNameAndID("Shang Hwang", "S.H"),在 name 被設定為 "Shang Hwang" 時,checkNameAndIDEqual() 開始執行,「此時 name 等於 "Shang Hwang",而 id 還是 "J.L"」,所以 checkNameAndIDEqual() 就會傳回 false,結果就顯示了錯誤訊息。

當一個執行緒正在設定物件資料時,另一個執行緒不可以同時進行設定,這樣的情況我們稱為race condition。 為了解決執行緒衝突的問題,幾乎所有並行性機制都會循序化對資源的存取。也就是說同一時間只允許一個任務通過該程式碼的程式碼區段來達成。 因為這樣的區段會產生互斥(mutual exclusion)的效應,所以這樣的機制有個共通的名稱是mutex。

為了避免資源的衝突,Java以"synchronized" 關鍵字的形式來提供內建的支援。當一個任務想要執行一段受synchronized關鍵字所保護的程式碼時, 他會先檢察看lock是否可用,接著在取得lock後執行程式碼,最後釋放lock。

要注意的是,當使用到並行性時,將變數宣告為private就變的格外重要,若是將變數宣告為public/protected,物件在外界可以繞過同步方法的控制 值直接取得並改變他,那麼就會引發衝突。

把"synchronized" 關鍵字用於方法上,讓方法的範圍(Scope)內都成為同步化區域,例如:

public synchronized void setNameAndID(String name, String id) { 
    this.name = name; 
    this.id = id; 
    if(!checkNameAndIDEqual()) {
       System.out.println(count + 
               ") illegal name or ID.....");
    } 
    count++; 
}

所有物件天生都具備一個lock(也可稱為monitor),物件的這個鎖定在平時是沒有作用的。 被標示為 "synchronized" 的方法會成為同步化區域,當執行緒執行某個物件的同步化區域時,物件的lock就有作用了, 想要執行同步化區域的執行緒,都必須先取得物件的lock,執行完同步化區域之後再將lock歸還給物件。

因為物件的lock只有一個,當有個執行緒已取走lock而正在執行同步化區域中的程式碼時, 若有其它執行緒也想執行 "synchronized" 的區域,因為其它執行緒無法取得lock,所以只好在物件的鎖定池(Lock Pool)等待, 直到lock被前一個執行緒歸還為止,此時在lock pool中的執行緒競爭被歸還的物件lock,只有取得lock的執行緒才能進入 Runnable 狀態, 等待排班器排班並執行同步化區域。 說明到這邊,可以畫出如下圖的執行緒狀態圖:

21-4.png

同步化的區域在有一個執行緒佔據時就像個禁區,不允許其它執行緒進入,由於同時間只能有一個執行緒在同步化區域,所以更新共享資料時, 就有如單執行緒程式在更新資料一樣,藉此保證物件中的資料會與給定的資料同步。

另外,"synchronized" 的設定不只可用於方法上,也可以用於限定某個程式區塊為同步化區域,即為所謂的關鍵區(Critical sections),例如:

public void setNameAndID(String name, String id) { 
    synchronized(this) {
        this.name = name; 
        this.id = id; 
        if(!checkNameAndIDEqual()) {
           System.out.println(count + 
               ") illegal name or ID.....");
        } 
        count++; 
    }
}

這個程式片段的意思就是,在執行緒執行至 "synchronized" 設定的同步化區塊時取得物件lock, 這麼一來就沒有其它執行緒可以來執行這個同步化區塊,這個方式可以應用於不想鎖定整個方法區塊, 而只是想在更新共享資料時再確保物件與資料的同步化,由於同步化區域只是方法中的某個區塊, 在執行完區塊後執行緒即釋放對物件的lock,以便讓其它執行緒有機會競爭物件的lock, 相較於將整個方法區塊都設定為同步化區域會比較有效率。

您也可以標示某個物件要求同步化,例如在多執行緒存取同一個 ArrayList 物件時,由於 ArrayList 並沒有實作資料存取時的同步化, 所以當它使用於多執行緒環境時,必須注意多個執行緒存取同一個 ArrayList 時,有可能發生兩個以上的執行緒將資料存入 ArrayList 的同一個位置, 造成資料的相互覆蓋,為了確保資料存入時的正確性,您可以在存取 ArrayList 物件時要求同步化,例如:

// arraylist參考至一個ArrayList的一個實例 
synchronized(arraylist) {
    arraylist.add(new SomeClass()); 
}

同步化確保資料的同步,但所犧性的就是在於一個執行緒取得物件鎖定而佔據同步化區塊,而其它執行緒等待它釋放鎖定時的延遲, 在執行緒少時可能看不出來,但在執行緒多的環境中必然造成一定的效能問題(例如大型網站的多人連線時)。

可以執行下面這個程式,並比較有synchronized和沒有synchronized的差異。

public class SynchronizedCounterDemo {
   
    public static class SynchronizedCounter {
        private static int num = 0;
        
        public static synchronized int addNum(int add){ return ++num; }
        public static synchronized int decNum(int dec){ return --num; }
                          
    }//synchronizedObject
    
    public static class OnlineOperation implements Runnable{
       public void run(){
          long beginTime = System.currentTimeMillis();
          for( int i = 0 ; i < 1000000 ; i ++ ){
             SynchronizedCounter.addNum(1);
          }
          long endTime = System.currentTimeMillis();
          System.out.println( ( endTime - beginTime ) + " milliseconds. ");
       }//run
    }//Operation
           
    public static void main(String[] args) {
        new Thread(new OnlineOperation() ).start();
        new Thread(new OnlineOperation() ).start();
    }//main
    
} 

不可切割性automicity與易變性volatility

不可切割性(Automic operation):就是不能被執行序排程器所中斷的操作,一旦操作開始,就一定會再context switch之前執行完畢。

  • 舉例來說,當我們在 Java 中執行宣告 int i = 12 會配置 32 bits 的記憶體空間並將 12 這個值寫到記憶體區塊中,將整數 12 寫入記憶體這個操作是一個 Atomic Operation,不會只做一半就被其他操作中斷,而影響指派(assignment)值的正確性

不可切割性是用在longs和doubles以外的基本型別所施加的基本操作。long和double例外的原因是,Java規格書不禁止這兩種64位元資料的修改過程以兩次32位元的寫入來完成。 如果在兩次寫入或分批讀取的過程中受到干擾,就可能出現完全超乎意料的數字。

例如A執行緒不斷設定某個double變數num為5、B則不斷地設定為10, 最後卻有可能因為執行緒彼此干擾寫入結果,使得num讀取出不同於5或10的數字!

所幸,double和long型態的存取,都可以使用volatile修飾,迫使它們不可分割。(要注意的是volatile在Java SE5之前的版本上無法正確運作)

ThreadLocal類別

要編寫一個多執行緒安全(Thread-safe)的程式總是困難的,為了讓執行緒共用資源,您必須小心的對共用資源進行同步, 同步帶來一定的效能延遲,而另一方面,在處理同步的時候,又要注意物件的鎖定與釋放,避免產生死結,種種因素都使得編寫多執行緒程式變得困難。

嘗試從另一個角度來思考多執行緒共用資源的問題,既然共用資源這麼困難,那麼就乾脆不要共用,何不為每個執行緒創造一個資源的複本, 將每一個執行緒存取資料的行為加以隔離,實現的方法就是給予每一個執行緒一個特定空間來保管該執行緒所獨享的資源, 在 Java 中您可以使用 java.lang.ThreadLocal 來實現這個功能,這個類別是從 JDK 1.2 之後開始提供,不過這邊要先來看看, 如何自行實現一個簡單的 ThreadLocal 類別。

ThreadLocal的目的 : 把變數存在currentThread中, 讓每個執行中的Thread都有自己的一份Copy, 而且彼此之間不會互相影響.

實作概念:

public class ThreadLocalConcept{
	private Map<Thread,Object> localMap = Collections.synchronizedMap(new HashMap<Thread,Object>());

	public void set(Object obj){
		this.localMap.put(Thread.currentThread(), obj);
	}

	public Object get(){
		this.localMap.get(Thread.currentThread());
	}
} 

基本上就是這個localMap會放著各個Thread對應的變數, 因此在不同的Thread中呼叫到的Value都會是獨立的.

透過 ThreadLocal,您不用撰寫複雜的執行緒共用互斥邏輯,其意義在於:「有時不共用是好的」。如果共用會產生危險, 那就不要共用,當然,這種方式所犧牲掉的就是空間,您必須為每一個執行緒保留它們獨立的空間, 這是一種以空間換取時間與安全性的方法。

終止任務

前一節我們提到了有幾種狀況會讓執行緒進入 Blocked 狀態:

  • 等待輸入輸出完成
  • 呼叫 sleep() 方法
  • 嘗試取得物件鎖定
  • 呼叫 wait() 方法

有時候你想要終止一個處於blocked狀態的任務,若是你無法停下來等待,你要前往程式碼某個地方可以檢查其狀態,並決定是否強制這個任務脫離blocked狀態。

中斷

Thread類別提供了interrupt()函式,讓你可以終止blocked任務,此函式會將thread設為interrupted狀態,而被設為interrupted狀態的thread,若已 處於blocked的情況,或是嘗試進行blocked性質的操作,便會執出一個InterruptedException。

public class SomeThread implements Runnable {
    public void run() { 
        System.out.println("sleep....至 blocked 狀態"); 
        try { 
            Thread.sleep(9999); 
        } 
        catch(InterruptedException e) { 
            System.out.println("I am interrupted...."); 
        } 
    } 

    public static void main(String[] args) { 
        Thread thread = 
                 new Thread(new SomeThread()); 
        thread.start(); 
        thread.interrupt(); 
    } 
}

Thread等待與喚醒

前一節我們用lock(mutex)來同步兩個任務的行為,避免一個任務干擾另一個任務的資源,接下來要學習如何讓任務互相合作。 當任務合作時,主要的問題在任務間交換訊息,因此我們會使用wait()和notify()來安全的實做。

wait()、notify() 與 notifyAll() 是 由Object 類別所提供的方法,您在定義自己的類別時會繼承下來, wait()、notify() 與 notifyAll() 都被宣告為 "final",所以無法重新定義它們,透過 wait() 方法可以要求執行緒進入物件的等待池(Wait Pool), 或是通知執行緒回到鎖定池的 Blocked 狀態。

當物件的 wait() 方法被調用,目前的執行緒會被放入物件的等待池中, 執行緒歸還物件的lock,其它的執行緒可競爭物件的lock;被放在等待池中的執行緒也是處於 Blocked 狀態,所以不參與執行緒的排班。

wait() 可以指定等待的時間,如果指定時間的話,則時間到之後執行緒會再度回到鎖定池的 Blocked 狀態,等待競爭物件鎖定的機會, 如果指定時間 0 或不指定,則執行緒會持續等待,直到被中斷(interrupt),或是被告知(notify)回到鎖定池的 Blocked 狀態。 21-5.png

當物件的 notify() 被調用,它會從目前物件的等待池中通知「一個」執行緒加入回到鎖定池的 Blocked 狀態,被通知的執行緒是隨機的, 被通知的執行緒會與其它執行緒共同競爭物件的鎖定;如果您呼叫 notifyAll(),則「所有」在等待池中的執行緒都會被通知回到鎖定池的 Blocked 狀態, 這些執行緒會與其它執行緒共同競爭物件的鎖定。

簡單的說,當執行緒呼叫到物件的 wait() 方法時,表示它要先讓出物件的lock並等待通知,或是等待一段指定的時間, 直到被通知或時間到時再與其它執行緒競爭物件的鎖定,如果取得鎖定了,就從等待點開始執行。

wait()和sleep()不同的是:

  • 物件lock在wait()期間會被釋放。
  • 除了經過所指定的時間之外,也可能因為notify()或notifyAll而離開wait()。

notify()&notifyAll()比較:

notify():

  • 目的:僅喚醒一個正在等待狀態的thread。
  • 缺點:當有多個thread等待被喚醒時,由JVM決定哪一個thread出線,開發人員無法控制,出線的依歸不一定是由priority決定,而是JVM的運算法則為準。

notifyAll():

  • 目的:喚醒所有正在等待狀態的thread。
  • 缺點:依序喚醒的順序也由JVM決定。

生產者與消費者

說明 wait()、notify()或notifyAll() 應用最常見的一個例子,就是生產者(Producer)與消費者(Consumer)的例子: 生產者會將產品交給店員,而消費者從店員處取走產品,店員一次只能持有固定數量產品,如果生產者生產了過多的產品, 店員叫生產者等一下(wait),如果店中有空位放產品了再通知(notify)生產者繼續生產,如果店中沒有產品了, 店員會告訴消費者等一下(wait),如果店中有產品了再通知(notify)消費者來取走產品。

以下舉一個最簡單的:生產者每次生產一個 int 整數交給店員,而消費者從店員處取走整數,店員一次只能持有一個整數。

範例7:

Clerk.java

package ProducerAndConsumer;

public class Clerk {
    // -1 表示目前沒有產品
    private int product = -1; 

    // 這個方法由生產者呼叫
    public synchronized void setProduct(int product) { 
        if(this.product != -1) { 
            try { 
                // 目前店員沒有空間收產品,請稍候!
                wait(); 
            } 
            catch(InterruptedException e) { 
                e.printStackTrace(); 
            } 
        } 

        this.product = product; 
        System.out.printf("生產者設定 (%d)%n", this.product); 

        // 通知等待區中的一個消費者可以繼續工作了
        notify(); 
    } 

    // 這個方法由消費者呼叫
    public synchronized int getProduct() { 
        if(this.product == -1) { 
            try { 
                // 缺貨了,請稍候!
                wait(); 
            } 
            catch(InterruptedException e) { 
                e.printStackTrace(); 
            } 
        } 

        int p = this.product; 
        System.out.printf("消費者取走 (%d)%n", this.product); 
        this.product = -1; // 取走產品,-1表示目前店員手上無產品

        // 通知等待區中的一個生產者可以繼續工作了
        notify(); 

        return p; 
    } 
} 

Producer.java

package ProducerAndConsumer;

public class Producer implements Runnable {
    private Clerk clerk; 

    public Producer(Clerk clerk) { 
        this.clerk = clerk; 
    } 

    public void run() { 
        System.out.println("生產者開始生產整數......"); 

        // 生產1到10的整數
        for(int product = 1; product <= 10; product++) { 
            try { 
                // 暫停隨機時間
                Thread.sleep((int) (Math.random() * 3000)); 
            } 
            catch(InterruptedException e) { 
                e.printStackTrace(); 
            } 
            // 將產品交給店員
            clerk.setProduct(product); 
        }       
    } 
}

Consumer.java

package ProducerAndConsumer;

public class Consumer implements Runnable {
    private Clerk clerk; 

    public Consumer(Clerk clerk) { 
        this.clerk = clerk; 
    } 

    public void run() { 
        System.out.println(
                "消費者開始消耗整數......"); 

        // 消耗10個整數
        for(int i = 1; i <= 10; i++) { 
            try { 
                // 等待隨機時間
                Thread.sleep((int) (Math.random() * 3000)); 
            } 
            catch(InterruptedException e) { 
                e.printStackTrace(); 
            } 

            // 從店員處取走整數
            clerk.getProduct(); 
        } 
    } 
 } 

ProductTest.java

package ProducerAndConsumer;

public class ProductTest {
    public static void main(String[] args) {
        Clerk clerk = new Clerk(); 

        // 生產者執行緒
        Thread producerThread = 
            new Thread(
                new Producer(clerk)); 
        // 消費者執行緒
        Thread consumerThread = 
            new Thread(
                new Consumer(clerk)); 

        producerThread.start(); 
        consumerThread.start(); 
    }
}

死結

死結(Deadlock):正在blocked狀態的thread再也無法改變他的狀態,因為他所要的資源被另一個也在blocked狀態的thread占用,最後造成餓死(starvation)。 死結在程式執行時期不會出現例外,因為死結在程式中屬於非程式執行時期的錯誤,而這種錯誤系統會以正常情況看待,所以應小心避免。

死結的四個必要條件:

  • 互斥:某個資源被使用時,他具有獨佔性,其他thread必須等待此資源被釋放,才能競爭此資源。
  • 擁有和等待:一個任務已經持有一個獨佔性資源時,仍須等待另一個任務所擁有的獨佔性資源。
  • 不可奪取:若資源已經被某個任務擁有,其他任務不可強取,必須等資源被正常釋放。
  • 循環等待:一組處理元P0、P1...Pn,其中P0在等待P1所擁有的資源,P1在等待P2所擁有的資源,...,Pn在等待P1所擁有的資源,稱為循環等待。

死節需要四個必要條件都成立才會產生,所以只需要避免其中一項發生就可以杜絕死結。

concurrent套件新增類別

在 J2SE 5.0 中新增了 java.util.concurrent 套件,當中的類別可以讓您在撰寫多執行緒相關功能時更為方便,下圖是完整的API。 21-6.png

容器類的執行緒安全

Thread-safe:如果程式撰寫者命令多個執行緒同時執行某個物件封裝的程式碼時,可以不用顧慮任何作業系統的排程方式, 也不需要額外的同步化處理或是協調執行緒的機制,就能得到正確的執行結果,那麼這個物件就是「執行緒安全」的。

容器類預設沒有考慮執行緒安全問題,您必須自行實作同步以確保共用資料在多執行緒存取下不會出錯,例如若您使用 List 物件時,您可以這樣實作:

// arraylist參考至一個ArrayList的一個實例 
synchronized(arraylist) {
    arraylist.add(new SomeClass()); 
}

事實上,您也可以使用 java.util.Collections 的 synchronizedXXX() 等方法來傳回一個同步化的容器物件,例如傳回一個同步化的 List:

List list = Collections.synchronizedList(new ArrayList());

以這種方式返回的 List 物件,在存取資料時,會進行同步化的工作。

在 J2SE 5.0 之後,新增了 java.util.concurrent 這個 package,當中包括了一些確保執行緒安全的 Collection 類, 例如 ConcurrentHashMap、CopyOnWriteArrayList、CopyOnWriteArraySet 等,這些新增的 Collection 類基本行為與先前介紹的 Map、List、Set 等物件是相同的,所不同的是增加了同步化的功能,而且依物件存取時的需求不同而有不同的同步化實作,以同時確保效率與安全性。

CountDownLatch和CyclicBarrier

CountDownLatch用來同步一個或多個任務,強制他們持續等待,直到其他任務完成一組完整的操作。 使用方法如下:

  static final int SIZE = 100;
  public static void main(String[] args) throws Exception {
	// ...
    CountDownLatch latch = new CountDownLatch(SIZE);
	// ...
  }

給定CountDownLatch object初始化的計數器值,所有於該物件上呼叫await()的任務都會暫停執行,直到計數器之值遞減至零為止。 其他的任務可能在完成它的工作時,呼叫該物件的countDown()以遞減計數器之值。CountDownLatch的設計是一次使用性的,計數器無法被重設。

CyclicBarrier所適用的情境是,你想要建立一群並行運作的任務,接著等待它們都完成了才進行下一階段(有點像是join())。 它會讓所有並行的任務在關卡前面整隊對齊,所以可以獲得往前的一致性。它和和CountDownLatch很相似,但有兩點不同:

  • CountDownLatch是處理一次性事件,CyclicBarrier可以一而在再而三的被重新使用。
  • CyclicBarrier可以給定一個關卡動作(barrier action),它是個Runnable,當計數器歸零時,便會被自動執行。

BlockingQueue

佇列(Queue)是個先前先出(First In First Out, FIFO)的資料結構。在 J2SE 5.0 中新增了 java.util.concurrent.BlockingQueue, 在多執行緒的情況下,如果 BlockingQueue 的內容為空,而有個執行緒試圖從 Queue 中取出元素,則該執行緒會被 Block, 直到 Queue 有元素時才解除 Block,反過來說,如果 BlockingQueue 滿了,而有個執行緒試圖再把資料填入 Queue 中, 則該執行緒會被 Block,直到 Queue 中有元素被取走後解除 Block。

在 java.util.concurrent 下提供幾種不同的 BlockingQueue,ArrayBlockingQueue 要指定容量大小來建構。LinkedBlockingQueue 預設沒有容量上限, 但也可以指定容量上限。PriorityBlockingQueue 嚴格來說不是 Queue,因為它是根據優先權(Priority)來移除元素。

Callable與Future

考慮這樣一個情況,使用者可能快速翻頁瀏覽文件中,而圖片檔案很大,如此在瀏覽到有圖片的頁數時,由於圖片的載入,因而造成使用者瀏覽文件時會有停頓 的現象, 所以我們希望在文件開啟之後,仍有一個背景作業持續載入圖片,如此使用者在快速瀏覽頁面時,所造成的停頓可以獲得改善。

java.util.concurrent.Callable 與 java.util.concurrent.Future 類別可以協助您完成 Future 模式,Future模式在請求發生時,會先產生一個Future物件給發出請求的客戶,而同時間, 真正的目標物件之生成,由一個 新的執行緒持續進行(即 Worker Thread),真正的目標物件生成之後,將之設定至Future之中, 而當客戶端真正需要目標物件時, 目標物件也已經準備好,可以讓客戶提取使用。

Callable 是個介面,與 Runnable 類似,有個必須實作的方法,可以啟動為另一個執行緒來執行,不過 Callable 工作完成後,可以傳回結果物件,Callable 介面的定義如下:

public interface Callable<V> {
    V call() throws Exception;
}

例如您可以使用 Callable 來完成某個費時的工作,工作結束後傳回結果物件,例如求質數: 範例10: PrimeCallable.java

package DemoCallableAndFuture;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class PrimeCallable implements Callable<int[]> {
    private int max;

    public PrimeCallable(int max) {
        this.max = max;
    }

    public int[] call() throws Exception {
        int[] prime = new int[max+1]; 

        List<Integer> list = new ArrayList<Integer>(); 

        for(int i = 2; i <= max; i++) 
            prime[i] = 1; 

        for(int i = 2; i*i <= max; i++) { // 這邊可以改進 
            if(prime[i] == 1) { 
                for(int j = 2*i; j <= max; j++) { 
                    if(j % i == 0) 
                        prime[j] = 0; 
                } 
            } 
        } 

        for(int i = 2; i < max; i++) { 
            if(prime[i] == 1) { 
                list.add(i); 
            } 
        }

        int[] p = new int[list.size()];
        for(int i = 0; i < p.length; i++) {
            p[i] = list.get(i).intValue();
        }

        return p;
    }   
}

FutureDemo.java

package DemoCallableAndFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureDemo {
    public static void main(String[] args) {
        Callable<int[]> primeCallable = new PrimeCallable(1000);
        FutureTask<int[]> primeTask = 
                new FutureTask<int[]>(primeCallable);

        Thread t = new Thread(primeTask);
        t.start();

        try {
            // 假設現在做其它事情
            Thread.sleep(1000);

            // 回來看看質數找好了嗎
            if(primeTask.isDone()) {
                int[] primes = primeTask.get();
                for(int prime : primes) {
                    System.out.print(prime + " ");
                }
                System.out.println();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }   
    }
}

java.util.concurrent.FutureTask 是個代理,真正執行找質數的是 Callable 物件,您使用另一個執行緒啟動 FutureTask,之後就可以先去做其它的事了, 等到某個時間點,回頭用 isDone() 看看任務完成了沒,如果完成了,就可以取得成果。

Semaphore

標準的lock同一時間只允許一個任物存取同一資源,而計數號誌(counting semaphore)則允許n個任務同一時間存取同一資源。

Semaphore可以控制某個資源可被同時訪問的個數,acquire()獲取一個許可,如果沒有就等待,而release()釋放一個許可。比如在Windows下可以設置共享文件的最大客戶端訪問個數。.比較特別的是 Semaphore 在初始的時候可以指定"公不公平", 如果給 true 表示要公平就會讓呼叫 acquire 的 thread 排隊, 不會有 thread 一直取不到資源的現象.

Exchanger

Exchanger是個置換兩任務中物件的關卡。當任務進入關卡時會有個物件,而當它們離開時,它們所持有的,正是之前被另一個任務所持有的物件。 當某個任務會產生成本較高的物件,而另一個任務正打算消費這些物件時,通常便會使用Exchanger。

參考文獻

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages