Skip to content

Chapter 09, Advanced thread management

SeokJoon.Yun edited this page Nov 6, 2015 · 3 revisions

##Chapter 09, Advanced thread management

  • 앞부분 손건님 부탁드립니다.

9.2 Interrupting threads

  • long-running thread를 정지시키는데는 singnal로 처리하는 것이 좋습니다.

    • thread pool에 의해 동작중인 thread가 있는 상태에서 thread pool을 해제해야 할 경우
    • thread 동작을 사용자가 명시적으로 취소할 경우
    • 기타 다른 상황들에서 이런 작업이 필요한 경우가 있습니다.
  • 어떤 상황이든 그 처리 방법은 같습니다.

    • thread의 처리가 원래대로 종료시점까지 실행되기 전에 signal을 보내서, 더 이상 실행되지 않고 빠져나오게 해야 합니다.
  • 각각의 경우별로 구현을 해도 되지만, 우리 그러지 맙시다. 예 ?

    • thread 관련 구현은 의도대로 잘 되지 않을 경우가 많아서 제대로 돌아가는지 확인을 해야 합니다.
    • 하지만, 일반적인 메커니즘으로 구현하면, 한번만 검증을 제대로 마친 뒤 계속해서 믿고 쓸 수 있습니다.
  • C++11에서 interrupting thread와 관련된 기능을 제공해주지는 않습니다.

    • 하지만 어렵지 않게 구현할 수 있습니다.
    • interrupting thread를 보기전에 먼저 thread의 시작 및 interrupt에 대한 interface 부터 살펴보기로 하겠습니다.

9.2.1 Launching and interrupting another thread

  • 우선 external interface부터 생각해 보겠습니다.
    • thread가 interrupt 가능하게 하기 위해서는 뭐가 필요할까요 ?
    • 당연히 std::thread 에 interrupt() 함수를 추가해야 합니다.
class interruptible_thread
{
public:
    template<typename FunctionType>
    interrputible_thread(FunctionType f);
    void join();
    void detach();
    bool joinable() const;
    void interrput();
};
  • 기본적으로 std::thread를 이용하고, interruption handling에 일부만 구현하면 됩니다.
  • thread 입장에서 뭐가 필요할까요 ?
    • 나(thread)는 여기서 interrupt 처리를 해도 된다. 는 바로 그 interruption point가 중요합니다.
    • 다른 추가 정보 없이 (parameter 없이) 동작하는 함수 interruption_point() 을 추가해 보도록 하겠습니다.
    • thread의 상태를 저장하는 local 변수를 선언하여서 thread가 시작될 때 set 한 다음에 interruption_point() 함수에서 그 값을 제어하도록 구현 하면 됩니다.
  • thread_local flag가 필요하기 때문에 일반적인 std::thread를 그대로 사용할 수 없습니다.
    • interruptible_thread를 제어하기 위해서는 이 변수가 필수적입니다.
    • std::thread가 시작되기전에 이 기능이 수행되도록 constuctor로 감싸야 합니다.
class interrupt_flag
{
public:
    void set();
    bool is_set() const;
};

thread_local interrupt_flag this_thread_interrupt_flag; // #1

class interruptible_thread
{
    std::thread internal_thread;
    interrupt_flag* flag;
public:
    template<typename FunctionType>
    interruptible_thread(FunctionType f)
    {
        std::promise<interrupt_flag*> p;               // #2
        internal_thread = std::thread([f, &p] {        // #3
                p.set_value(&this_thread_interrupt_flag);
                f();                                   // #4
            });
        flag = p.get_future().get();                   // #5
    }
    void interrupt()
    {
        if (flag)
        {
            flag->set();                               //#6
        }
    }
};
  • 실제로 작업을 수행할 f 함수와 local promise 객체 p (#2) 를 감싼 lambda 식이 있습니다. (#3)
    • lambda식 안에서 promise의 값을 thread_local로 선언된 this_thread_interrupt_flag (#1)의 주소로 설정합니다.
    • 그렇게 set 한 다음에 새로운 thread를 할당하여 공급된 함수의 copy(사본)을 수행합니다. (#4)
  • thread를 수행한 다음에 future와 연관된 promise가 준비되어서 그 결과를 flag라는 멤버변수에 줄때까지 기다립니다. (#5)
  • lambda가 new thread를 수행중이고, 지역변수 p가 dangling reference라도 별 문제는 없습니다.
    • 왜냐하면 interruptible_thread의 생성자는 p가 더이상 실행하기 전의 new thread가 더이상 참조되지 않을때까지 기다립니다.
    • 이 구현은 thread의 join , detach 여부에 대해서 고려하지 않습니다.
    • thread가 종료되거나 detach 되었을 때 flag가 clear 되었는지만 확인하여 dangling pointer를 방지하면 됩니다.
  • interrupt() 함수는 간단합니다.
    • interrupt flag에 대한 pointer를 가지고 있는 경우, interrupt 시킬 thread에 대해서 flag를 set하면 됩니다. (#6)

9.2.2 Detecting that a thread has been interrupted

  • 이제 interruption flag를 설정할 수 있게 되었습니다.
    • 다음으로는 thread가 실제로 중단되었는지 여부를 확인해 보도록 하겠습니다.
    • 가장 간단한 방법의 interruption_point() 함수를 사용하도록 하겠습니다.
      • thread 내의 interrupt되어도 안전한 곳에서 이 함수를 호출합니다.
      • flag가 설정된 경우라면 thread_interrupted 예외를 throw 하면 됩니다.
void interruption_point()
{
    if (this_thread_interrupt_flag.is_set())
        throw thread_interrupted();
}

사용은 이렇게 하면 됩니다.

void F()
{
    while(!done)
    {
        interruption_point();
        process_next_item();
    }
}
  • 이것은 동작하는 code이긴 하지만, 이상적인 방법은 아닙니다.
    • thread를 interrupt하기 위한 최고의 장소중 하나는 thread가 interrupt_point()를 호출하기 위해 실행되고 있지 않는 것을 기다리면서 block된 상태이다.
    • interrupt 방식에 따라 뭔가를 기다리는 수단이 필요합니다. (?)

####9.2.3 Interrupting a condition variable wait

interruption_point()를 명시적으로 호출하여 원하는 곳에서 interruption을 감지할 수 있지만, 조건변수가 notify되기를 기다리는 것과 같은 blocking wait에서는 아무 도움이 되지 않습니다. wait할 수 있는 여러 가지 방법에 대해서 overload 가능하고, wait 작업을 중간 할 수 있는 새로운 interruptible_wait() 함수가 필요합니다.
조건변수의 wait를 중단하게 하려면 어떻게 해야 할까요 ?
가장 간단한 방법은 조건변수에 interrupt flag를 set 한 뒤 notify하고, wait 한 후 interrupt point를 넣는것입니다.
하지만 이렇게 하면 필요한 thread를 깨우기 위해서 조건변수를 기다리는 wait 상태의 모든 thread에 notify해야 합니다. 결국 wait 중인 thread들은 모두 wake-up을 해야합니다. 비록 가짜 wake-up이지만, 그게 진짜 wake-up과 다를바가 없습니다.
interrupt_flagset()이 호출될때 notify 될 수 있도록 조건변수의 포인터를 저장 할 수 있어야 합니다. 조건 변수에 대한 interruptible_wait()의 구현은 다음과 같습니다.

#####Listing 9.10 std::condition_variable 을 사용한 interruptible_wait (broken version)

void interruptible_wait(std::condition_variable& cv, std::uniduq_lock<std::mux>& lk)
{
    interruption_point();
    this_thread_interrupt_flag.set_condition_variable(cv);  // #1
    cv.wait(lk);                                            // #2
    this_thread_interrupt_falg.clear_condition_variable();  // #3
    interruption_point();
}
  • interrupt flag 와 condition variable의 관계를 설정하고 해제하는 좋고 간단한 code 입니다.
    1. interrupt를 check.
    2. thread의 interrupt_flag와 condition_variable을 연결. #1
    3. condition_variable을 wait #2
    4. condition_variable의 연결 해제 #3
    5. interrupt를 다시 한번 check.
  • condition_variable을 wait하는 동안 interrupt가 발생한 경우라면,
    • 해당 thread는 condition_variable을 통해서, wait 상태를 깨울 것입니다.
    • 그래서, interrupt의 확인이 가능합니다.
  • 불행하게도, 이 code는 못씁니다. 2가지 문제가 있습니다.
    1. std::condition_variable::wait()에서 예외발생시 interrupt_flag와의 관계를 해제하지 않고 빠져나오게 됩니다.
    • destructor와 연결되는 구조를 활용하면 쉽게 고칠 수 있습니다.
    1. race condition(경쟁 조건)이 있습니다.
    • interrupt_point()wait() 사이에서 interrupt가 발생한 경우,
      • condition_variable는 interrupt_flag와의 관계에 대해서는 신경쓰지 않습니다.
        • 왜냐하면, thread는 wait 상태가 아니기 때문에 condition_variable을 notify 할 수 없습니다.
    • interrupt_point() wait() 사이에서의 interrupt에 대한 notify 방안을 생각해야 합니다.
      • std::condition_variable의 내부 구현을 살펴보지 않고도 이 문제를 해결하는 방법은 딱 한가지밖에 없습니다.
        • set_condition_variable() 호출을 보호하는 용도의 mutex를 사용하는 것입니다.
    • 그런데, 이러면 또다른 문제가 발생합니다.
      • thread #2를 lock 시킬 다른 thread #1의 수명도 모른체 mutex의 참조를 통과시켜야 합니다.
        • thread #1 : interrupt()를 호출하는 thread
        • thread #2 : interrupt 될 thread
      • thread #2가 이미 mutex를 lock 한지 그 여부도 모른체 위 작업을 해야 합니다.
        • deadlock의 가능성이과 이미 해제된 이후에 mutex에 접근할 가능성이 있습니다.
      • 조건변수의 wait를 interrupt 할수 없는 것은 너무 제한적이다.
        • interruptible_wait()를 안쓰고도 잘 할수는 있습니다.
        • 그래서 다른 대안이 있습니까 ?
          1. wait_for()를 사용하여 (1ms 같은) 작은 timeout value를 사용하는 것입니다. (wait()대신에)
          • thread가 interruption을 보기전에 wait 하는 시간의 상한선을 둡니다.
          • 이렇게하면, 대기중인 thread는 오히려 더 많이 timeout으로 부터 spurious wake(가짜 wake)가 될 수 있지만, 쉽게 도움이 될수는 없습니다.
          • 이러한 구현을 interrupt_flag의 구현과 함께 보시겠습니다.

#####Listing 9.11 Using a timeout in interruptible_wait for std::condition_variable

class interrupt_flag
{
    std::atomic<bool> flag;
    std::condition_variable* thread_cond;
    std::mutex set_clear_mutex;
public:
    interrupt_flag()
        : thread_cond(0)
    {}
    
    void set()
    {
        flag.store(true,std::memory_order_relaxed);
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        if(thread_cond)
        {
            thread_cond->notify_all();
        }
    }
    
    bool is_set() const
    {
        return flag.load(std::memory_order_relaxed);
    }
    
    void set_condition_variable(std::condition_variable& cv)
    {
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        thread_cond=&cv;
    }
    
    void clear_condition_variable()
    {
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        thread_cond=0;
    }
    
    struct clear_cv_on_destruct
    {
        ~clear_cv_on_destruct()
        {
            this_thread_interrupt_flag.clear_condition_variable();
        }
    };
};

void interruptible_wait(std::condition_variable& cv, std::unique_lock<std::mutex>& lk)
{
    interruption_point();
    this_thread_interrupt_flag.set_condition_variable(cv);
    interrupt_flag::clear_cv_on_destruct guard;
    interruption_point();
    cv.wait_for(lk,std::chrono::milliseconds(1));
    interruption_point();
}

만약 기다리고 있는 predicate이 있다면, 1ms timeout은 predicate loop 속으로 완벽하게 숨길 수 있습니다.

template<typename Predicate>
void interruptible_wait(std::condition_variable& cv,
                        std::unique_lock<std::mutex>& lk,
                        Predicate pred)
{
    interruption_point();
    this_thread_interrupt_flag.set_condition_variable(cv);
    interrupt_flag::clear_cv_on_destruct guard;
    while(!this_thread_interrupt_flag.is_set() && !pred())
    {
        cv.wait_for(lk,std::chrono::milliseconds(1));
    }
    interruption_point();
}

결과적으로 predicate가 더 자주 check하겠지만, 일반적인 wait() 호출보다는 더 쉽게 사용될 것입니다. timeout을 이용한 여러가지 방법들은 구현이 쉽습니다. : 지정된 시간이나 1ms 라든지... 그래서 std::condition_variable의 wait가 std::condition_variable_any에 대해서 신경을 쓰고 있습니까 ?
같거나 더 잘할수 있나요 ?

####9.2.4 Interrupting a wait on std::condition_variable_any

std::condition_variablestd::unique_lock<std::mutex>만 사용가능하지만 std::condition_variable_any는 모든 종류의 lock type을 사용 할 수 있습니다. 그래서 더 쉽게 사용이 가능합니다. set_clear_mutexinterrupt_flag 사이에서의 lock/unlock과 wait call을 지원하는 lock type을 직접 만들어서 사용이 가능하게 되었습니다.

#####Listing 9.12 interruptible_wait for std::condition_variable_any

class interrupt_flag
{
    std::atomic<bool> flag;
    std::condition_variable* thread_cond;
    std::condition_variable_any* thread_cond_any;
    std::mutex set_clear_mutex;
public:
    interrupt_flag()
        : thread_cond(0)
        , thread_cond_any(0)
    {}
    
    void set()
    {
        flag.store(true,std::memory_order_relaxed);
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        if(thread_cond)
        {
            thread_cond->notify_all();
        }
        else if(thread_cond_any)
        {
            thread_cond_any->notify_all();
        }
    }
    
    template<typename Lockable>
    void wait(std::condition_variable_any& cv,Lockable& lk)
    {
        struct custom_lock
        {
            interrupt_flag* self;
            Lockable& lk;
            custom_lock(interrupt_flag* self_,
                        std::condition_variable_any& cond,
                        Lockable& lk_)
                : self(self_)
                , lk(lk_)
            {
                self->set_clear_mutex.lock(); //#1
                self->thread_cond_any=&cond; //#2
            }
            
            void unlock() //#3
            {
                lk.unlock();
                self->set_clear_mutex.unlock();
            }
            
            void lock()
            {
                std::lock(self->set_clear_mutex,lk); //#4
            }
            
            ~custom_lock()
            {
                self->thread_cond_any=0; //#5
                self->set_clear_mutex.unlock();
            }
        };
        
        custom_lock cl(this,cv,lk);
        interruption_point();
        cv.wait(cl);
        interruption_point();
    }
    // rest as before
};

template<typename Lockable>
void interruptible_wait(std::condition_variable_any& cv,
                        Lockable& lk)
{
    this_thread_interrupt_flag.wait(cv,lk);
}

constructor에서 custom lock type은 set_clear_mutex내에서 lock을 획득합니다. (#1)
그런 다음 생성자에 전달될 std::condition_variable_any을 참조할 thread_cond_any의 pointer를 설정합니다. (#2)
lock가능한 참조는 다음에 사용되기 위해 일단 저장해 놓습니다. (이미 lock 상태입니다.)
이제 경쟁조건에 걸리지 않게 interrupt를 check할 수 있습니다.
interrupt flag가 set 상태란 것은, set_clear_mutex에 lock을 획득하기 전에 set한 것입니다.
condition variablewait()안에서 unlock()을 호출햇을 때, lock가능한 object와 set_clear_mutex의 내부를 unlock합니다. (#3)
set_clear_mutex에 lock을 획득하고 / wait() 내부에서 thread_cond_any pointer를 check하는 것을 방해는 thread를 허용 할 수는 있습니다. (이전에는 아니었던)
이건 정확이 std::condition_variable 이후 (관리할수 없던) 무언가입니다.
wait()함수가 wait를 완료하면 (notify 혹은 spurious wake), lock()함수를 호출하여 다시 set_clear_mutex내부에서 lock을 획득하고, lock가능한 object를 lock합니다. (#4)
또한 set_clear_mutex를 unlock하는 하면서 custom_lock destructor 안에서 thread_cond_any pointer를 clear 하기전에 wait() 호출전에 일어난 interrupt를 다시 확인 할수 있습니다. (#5)

####9.2.5 Interrupting other blocking calls

condition variable의 wait를 interrupt하는 것을 잡을수 있습니다만, 그럼 다른 blocking wait에 대해서는 어떻해야 할까요 ? : mutex, future, 기타 등등...
mutex나 future를 사용하지 않고 정말 짧은 wait를 interrupt할수 있는 방법은 없습니다. 그래서 일반적으로 std::condition_variable를 사용하려면 timeout 옵션을 사용해야 합니다.
interruptible_wait() 함수내에서 loop를 사용하여 구현이 가능합니다.
아래 예제는 std::future<>를 사용하는 interruptible_wait() 함수를 overload 했습니다.

template<typename T>
void interruptible_wait(std::future<T>& uf)
{
    while(!this_thread_interrupt_flag.is_set())
    {
        if(uf.wait_for(lk,std::chrono::milliseconds(1) == std::future_status::ready)
            break;
    }
    interruption_point();
}

interrupt flag가 set 될때까지 혹은 future가 ready 될때까지 wait 하지만, future를 1ms 단위로 blocking wait 하지 않습니다. 즉 interrupt 요청에 대해서 평균 0.5 ms의 시간이 걸리는 거라 봐야죠.
wait_for는 보통 1개의 clock tick을 wait하는데, 만약 clock tick이 매 15ms 마다 발생한다면 15ms를 기다리는게 아니라 1ms를 wait 하게 됩니다. 물론 상황에 따라선 안될 수도 있겠죠. 필요하다면 언제든지 timeout을 줄일 수 있습니다.
timeout을 줄이면 thread가 flag를 check하기 위하여 더 자주 wake 할 것이며, 그로인해 task switching overhead가 늘어납니다. OK. interruption_point()interruptible_wait()를 이용하여 interrupt를 detect하는 것을 살펴보았습니다. 이제 어떻게 처리해야 할까요 ?

####9.2.6 Handling interruptions

thread를 interrupt하는 것은 그냥 thread_interrupted exception이기 때문에 다른 exception들 같이 catch block에서 처리하면 됩니다.

try
{
    do_something();
}
catch(thread_interrupted&)
{
    handle_interruption();
}

즉 interrupt를 처리한 뒤에, 계속해서 작업을 할 수 있습니다. 그런뒤 다른 thread가 다시 interrupt()를 call하면 다음 interrupt point에서 interrupt 됩니다. thread가 일련의 독립적인 작업을 수행하는 것을 원할 수도 잇습니다.; interrupt는 수행중이던 task를 포기하고, 다음 task로 이동할 수도 있습니다.
왜냐하면 thread_interrupt는 exception이기 때문에, 예외 안전 관련 예방조치들을 해야 합니다. resource leak이라던지 data가 일관성 있는 상태로 남아있게 해야 합니다. 종종 interrupt가 thread를 중단하는게 좋을 수도 있으므로, exception이 외부로 전달되도록 할 수도 있습니다. 하지만 그렇게 thread function 밖으로 exception을 전달했다가 std::thread constructor까지 가게 되면, std::terminate()가 호출되고, 프로그램은 종료됩니다. interruptible_thread가 pass하는 모든 function들을 다 기억해서 thread_interrupted의 catch handler를 넣는게 힘들기 때문에, interrupt_flag를 초기화하는 wrapper 안에 catch block을 넣어도 됩니다. 이렇게하면 개별 thread별로 terminate되기 때문에 interrupt exception을 unhandle 상태로 pass하는 것을 안전하게 해줍니다. interruptible_thread constructor 초기화를 다음과 같이 해줍니다.

internal_thread=std::thread([f,&p]{
    p.set_value(&this_thread_interrupt_flag);
    try
    {
        f();
    }
    catch(thread_interrupted const&)
    {}
});

이제 interrupt가 사용되는 구체적인 예제를 살펴보겠습니다.

####9.2.7 Interrupting background tasks on application exit

desktop에서의 검색 application을 생각해 봅시다. 사용자 편의성 뿐 아니라 filesystem의 상태는 monitoring해야 하고, 변경 사항을 확인하여 index를 update 해야 합니다. 일반적으로 이런작업들은 GUI에 영향을 미치지 않게 해야하므로 background thread로 수행합니다. 그리고 application의 전체 수명 동안 background thread가 실행되야 합니다. 즉, application의 초기화에서 시작되어서 끝날때까지 남아있어야 합니다. 이 경우에는 통상적으로 PC가 꺼질 경우에만 종료됩니다. 왜냐면 계속해서 최신 index를 유지해야하기 때문입니다. 때에 따라서는 application이 종료될때, background thread를 절차에 따라 종료해야하는데, 그런 방법중 하나로 interrupt를 이용할 수 있습니다.
이런 시스템의 thread 관리를 구현한 sample을 살펴보겠습니다.

#####Listing 9.13 Monitoring the filesystem in the background

std::mutex config_mutex;
std::vector<interruptible_thread> background_threads;
void background_thread(int disk_id)
{
    while(true)
    {
        interruption_point(); //#1
        fs_change fsc=get_fs_changes(disk_id); //#2
        if(fsc.has_changes())
        {
            update_index(fsc); //#3
        }
    }
}

void start_background_processing()
{
    background_threads.push_back(interruptible_thread(background_thread,disk_1));
    background_threads.push_back(interruptible_thread(background_thread,disk_2));
}

int main()
{
    start_background_processing(); //#4
    process_gui_until_exit(); //#5
    std::unique_lock<std::mutex> lk(config_mutex);
    
    for(unsigned i=0;i<background_threads.size();++i)
    {
        background_threads[i].interrupt(); //#6
    }
    
    for(unsigned i=0;i<background_threads.size();++i)
    {
        background_threads[i].join(); //#7
    }
}

프로그램 시작시 background thread가 시작됩니다. (#4) 그런 다음 main thread가 GUI를 처리합니다. (#5) user가 application을 종료시키면, backgound thread는 interrupt 됩니다. (#6) 그런 다음 main thread는 모든 background thread가 종료될때까지 wait 합니다. (#7) background thread는 loop안에서 disk의 변경을 check 하고, (#2) index를 update 합니다. (#3) 내번 loop 안에서 interruption_point()를 호출하는 것으로 interrupt를 check합니다. (#1)
왜 각각의 thread가 wait하기 전에 모든 thread를 interrupt 할까요 ? 왜 각각에 대하여 interrupt 하고 다음 실행 시점에 대한 wait를 하지 않을까요 ? 바로 concurrency때문입니다. thread가 interrupt 되었을 때 바로 중단되지 않습니다. 왜냐면 다음번 interruption_point()에서 interrupt를 받아서 처리하며, 그런 다음 destructor를 호출 및 exception 처리 code를 수행한 후에 종료되어야 합니다. 각각 thread에 대해서 join함으로써, 수행중인 작업이 있을지라도 (여기서는 다른 thread에 interrupt거는 작업) interrupt된 thread가 wait 상태가 됩니다. 할 수 있는 작업이 더이상 없을 때에만 (즉, 모든 thread가 interrupt된 경우에만) wait 상태가 됩니다. 이건 또한 모든 thread가 interrupt되어 parallel로 빨리 완료되도록 해 줍니다.
interruption mechanism은 쉽게 interrupt 호출을 추가하거나, code의 특정 블록에서 interrupt가 되지 않도록 확장이 가능합니다. 그건 독자들의 몫입니다.

###9.3 Summary

이번 chapter에서 우리는 다양한 "고급" thread 관리 테크닉들을 살펴봤습니다. (thread pools, interrupting thread)

  • 어떻게 local work queue를 사용하는지, thread pool을 이용하여 synchronization overhead를 줄이고 처리량을 향상시키면서 work를 steal하는 방법 과
  • queue로 부터 다른 task를 실행할 때 다른 subtask가 끝나길 wait하는 동안 deadlock의 가능성을 없에면서 수행하는 방법에 대해서 알아보았습니다.
  • 여러 가지 방법으로 다른 thread를 interrupt 하는 방법에 대해서도 알아보았습니다.
    • interruption point 및 기타 함수들을 사용하여 interrupt 될 수 있도록 blocking wait를 구현하는 것을 살펴보았습니다.