Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 335 additions & 0 deletions study/ch08.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
## 작업과 실행 정책 간의 보이지 않는 연결 관계

Executor 프레임웍을 이용하여 실행정책을 설정 & 변경 시,
일정한 조건을 갖춘 실행 정책이 필요한 작업이 있음을 유의해야 함

- **의존성이 있는 작업**
- **독립적인 작업** : 다른 작업이 실행하는 데서 발생하는 부수적인 요건에 관계없이 동작하는 작업 → 설정 변경해도 성능 외 문제 X
- **다른 작업에 의존성을 갖는 작업** : 실행 정책에 보이지 않는 조건 걸림, 활동성 문제 발생 가능 → 스레드 풀의 크기를 충분히 크게 잡기

- **스레드 한정 기법을 사용하는 작업**
- Executor 프레임웍이 단일 스레드로 동작해야 한다는 조건 발생 → 여러 개의 스레드를 사용하는 풀로 변경하지 않도록 유의
- 반드시 순차적으로 실행돼야 함

- **응답 시간이 민감한 작업**
- 응답 성능을 지켜야 하는 Executor에는 오래 걸리는 작업을 넣지 않도록 유의
- 화면 갱신 등 응답 민감한 작업은 가볍게 유지

- **ThreadLocal을 사용하는 작업**
- ThreadLocal은 스레드가 재사용될 때 값도 함께 남아 있음. Executor처럼 스레드를 오래 재사용하는 환경에서는, 의도치 않게 이전 작업의 데이터가 이어질 수 있음
- ThreadLocal 사용 시, 현재 실행 중인 작업 종료 후 더 이상 사용하지 않을 값만 보관해야 함 → 작업 종료 후 반드시 정리하기

> 스레드 풀은 동일하고 서로 독립적인 다수의 작업 실행 시 효과적이지만,
> 해당 작업이 가정하는 **실행 정책**을 명확히 문서화해야 함.

---

### 스레드 부족 데드락

- **스레드 부족 데드락**: 의존성 있는 작업처럼, 같은 풀 안에서 작업이 서로의 결과를 기다리기만 하며 진행이 멈추는 상황
- 작업 A가 같은 풀에 추가한 다른 작업 B의 결과를 기다림
- B도 A가 끝나길 기다림 → A, B 모두 대기 → 데드락 발생

- 풀의 크기가 충분히 크다면 데드락 없이 실행될 수 있지만,
완전히 독립적이지 않은 작업을 Executor에 등록할 때는 항상 주의 필요

- 운영 환경에서는 스레드 풀에서 필요한 자원이 제한되어 풀 크기가 예상보다 작게 설정될 수 있음

- **풀 크기, 설정 등 실행 정책을 명시해 두는 것이 중요**

---

### 오래 실행되는 작업

- 스레드 풀의 크기가 상당히 작다면, 오래 걸리는 작업 때문에 응답 속도가 저하될 수 있음
- 일정 시간 동안만 대기하는 메소드 사용
- 시간 초과 시 해당 작업이 실행되지 못했음을 기록하고, 작업을 종료 후 재등록하는 대책 등 사용

---

## 스레드 풀 크기 조절

- 스레드 풀의 크기는 설정 파일이나 `Runtime.availableProcessors` 등의 메소드 결과 값에 따라 동적으로 지정
- 너무 크거나 작으면 자원 확보 경쟁을 하거나, 작업 처리 속도가 저하됨

### 스레드 풀 크기 산정 방법
1. **컴퓨터 환경 확인**: CPU 개수, 메모리, 파일 핸들, DB 커넥션 등 가용 자원 파악 & 해야 할 작업의 동작 과정 파악
2. **작업 성격 고려**: CPU를 많이 사용하는 작업의 경우, CPU 코어 수 + 1이 적정치로 알려져 있음
3. **비율 계산**: 실제 작업하는 시간 대비 대기 시간 비율을 대략 측정해 풀 크기를 조정

> 리틀의 법칙(Little's law) : `L = λ × W`
> - **L**: 동시에 처리되는 요청의 개수
> - **λ (lambda)**: 시스템이 처리 가능한 평균 처리량
> - **W**: 평균 요청 처리 시간
>
> 예) 평균 응답 시간 55ms, 스레드 풀 크기 22인 서비스 → `22 / 0.055 = 400`
> 시스템이 1초당 처리할 수 있는 요청 개수는 **400개**

4. 풀 크기를 바꿔가며 계속 실행 → CPU의 활용도가 최적화되는 지점 찾기
5. 자원 고려: DB 연결 등 다른 자원과 풀 크기가 서로 영향을 미치므로 함께 조율해야 함

---

## ThreadPoolExecutor 설정

### 스레드 생성과 제거
- Executors 팩토리 메소드의 기본 정책이 맞지 않으면,
**ThreadPoolExecutor 생성 메소드**를 직접 사용해 세부 설정 조정 가능

- 주요 설정 값
- **풀의 코어 크기** : 기본적으로 유지할 스레드 수
- **최대 크기** : 동시에 실행 가능한 스레드 수의 상한값
- **스레드 유지 시간** : 코어 크기를 초과하거나, 스레드가 일정 시간 이상 작업 없이 대기하면 제거
- 코어 크기 및 유지 시간을 조절하면 불필요한 스레드 자원 점유를 줄이고, 다른 작업에 자원을 활용 가능

```java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...}
```

---

### 큐에 쌓인 작업 관리

스레드 풀도 처리할 수 있는 것보다 많은 작업이 들어오면 결국 한계에 도달함.
큐(BlockingQueue)에 작업을 쌓아둘 수 있지만, 처리 속도보다 유입이 빠르면 **응답 지연 발생**.

#### 큐에 적용할 수 있는 전략
1. **크기 제한 없음**: 작업을 끝없이 쌓음
- `LinkedBlockingQueue`, `newFixedThreadPool`, `newSingleThreadExecutor` 기본 설정
2. **크기 제한 있음**: 자원 사용량 제어 가능
- 단, 큐가 가득 차면 새로운 작업 처리 방안 필요
- 큐 크기와 스레드 수를 함께 튜닝해야 함
3. **직접 전달**: 큐 없이 작업을 스레드에 바로 전달
- `SynchronousQueue`, `newCachedThreadPool` 기본 설정
- 대기 스레드 없으면 새 스레드 생성, 최대 크기 도달 시 집중 대응 정책에 따라 작업 거부

#### 권장 전략
- 작업이 **서로 독립적일 때만** 스레드 수/작업 큐 크기 제한 가능
- 자원 관리 중요 & 동시 스레드 수 제한 필요 → **고정된 스레드 풀**
- 작업 간 의존성 있음 → 고정 큐는 **스레드 부족 데드락 위험** → **무제한 풀 권장**
- 작업 몰림·동적 확장 필요 → `newCachedThreadPool` + `SynchronousQueue`

---

### 집중 대응 정책

- 큐가 가득 찼거나 이미 종료된 스레드 풀에 작업을 등록할 때 **집중 대응 정책** 동작

#### 설정 방법
- `setRejectedExecutionHandler()`로 원하는 정책 지정 가능

1. **중단 정책 (기본 정책)** : `RejectedExecutionException` 던짐 → 호출자가 직접 처리
2. **제거 정책** : 새로 추가하려던 작업을 조용히 제거
3. **오래된 항목 제거 정책** : 큐의 가장 오래된 작업 제거 후 새 작업 삽입
4. **호출자 실행 정책** : 작업을 제출한 스레드에서 직접 실행 → 속도 조절 가능

#### 추가 방법
- Executor 생성 시 집중 대응 정책 지정 가능
- `execute()` 호출 시 단순 대기 정책은 없음
- 필요 시 **Semaphore 기반 BoundedExecutor** 구현 → (스레드 수 + 큐 허용 크기) 만큼 세마포어 설정

---

### 스레드 팩토리 (ThreadFactory)

- 스레드 풀은 새로운 스레드를 항상 **ThreadFactory**를 통해 생성 (`newThread()` 호출)
- 기본 ThreadFactory는 **데몬이 아니고 특별한 설정 없는 스레드**를 반환

#### ThreadFactory의 특징
- `newThread(Runnable r)` 메서드 하나만 정의되어 있음
- 사용 예시
- 스레드 이름 지정 → 로그/덤프에서 식별 용이
- `UncaughtExceptionHandler` 지정 → 실행 중 예외 로깅
- 생성/종료 시 디버깅 메시지 출력
- 생성/제거된 스레드 수 통계 관리

#### 애플리케이션 보안 관련
- `Executors.privilegedThreadFactory()` 사용 가능
- 호출한 스레드와 동일한 권한, `AccessControlContext`, `contextClassLoader` 적용
- → 여러 클라이언트가 서로 다른 보안 정책을 가질 때 혼란 방지

---

### ThreadPoolExecutor 생성 이후 설정 변경

- `ThreadPoolExecutor`는 생성 후에도 `set` 메소드로 대부분 설정 값 변경 가능
- 예: `setCorePoolSize()`, `setMaximumPoolSize()` 등
- `Executor`를 `ThreadPoolExecutor`로 형변환 후 설정 가능

```java
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();

public MyAppThread(Runnable r) {
this(r, DEFAULT_NAME);
}

public MyAppThread(Runnable runnable, String name) {
super(runnable, name + "-" + created.incrementAndGet());
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t,
Throwable e) {
log.log(Level.SEVERE,
"UNCAUGHT in thread " + t.getName(), e);
}
});
}

public void run() {
// Copy debug flag to ensure consistent value throughout.
boolean debug = debugLifecycle;
if (debug) log.log(Level.FINE, "Created " + getName());
try {
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug) log.log(Level.FINE, "Exiting " + getName());
}
}

public static int getThreadsCreated() {
return created.get();
}

public static int getThreadsAlive() {
return alive.get();
}

public static boolean getDebug() {
return debugLifecycle;
}

public static void setDebug(boolean b) {
debugLifecycle = b;
}
}```

#### 설정 변경 차단하기
- `Executors.unconfigurableExecutorService(ExecutorService e)`
- ExecutorService를 감싸서 외부에는 인터페이스만 노출 → 세부 설정 변경 차단
- 주의: `newSingleThreadExecutor()`는 내부적으로 ThreadPoolExecutor를 감싸 단일 스레드 정책 보장
- 외부 코드가 잘못 설정을 변경하지 못하도록 하려면 `unconfigurableExecutorService()` 사용

---

## ThreadPoolExecutor 상속

- `ThreadPoolExecutor`는 상속을 통해 기능 확장 가능
- 하위 클래스가 오버라이드할 수 있는 훅 메서드 제공

### 주요 훅 메서드
- **beforeExecute(Thread t, Runnable r)**
- 작업 실행 전 호출
- 로그, 실행 시작 시점 기록, 모니터링/통계 활용
- 주의: 이 메서드에서 RuntimeException 발생 시 → 작업 실행 X, `afterExecute`도 호출되지 않음

- **afterExecute(Runnable r, Throwable t)**
- 작업 실행 후 항상 호출됨 (정상 종료/예외와 무관)
- 실행 결과 기록, 예외 처리, 실행 시간 측정
- `beforeExecute`에서 저장한 값을 ThreadLocal로 전달 가능

- **terminated()**
- 모든 작업과 스레드가 종료된 뒤 한 번 호출
- 자원 반납, 로그 출력, 알람 발송, 최종 통계 수집 등에 활용

---

## 재귀 함수 병렬화

### 반복문 병렬화
- 반복문 내부에서 **복잡한 연산**이나 **블로킹 I/O** 수행 시, 각 반복 작업이 **독립적**이면 병렬화에 적합
- Executor 사용 시 순차 실행보다 빠르게 처리 가능
- `processInParallel()`은 작업 등록만 하고 바로 반환 → 빠른 처리 시작
- 모든 작업 종료까지 기다리려면 `ExecutorService.invokeAll()` 사용

### 재귀 함수 병렬화
- 재귀 함수도 각 단계 작업이 **이전 결과와 독립적**이면 병렬화 가능

```java
public <T> void sequentialRecursive(List<Node<T>> nodes,
Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}

public <T> void parallelRecursive(final Executor exec,
List<Node<T>> nodes,
final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(new Runnable() {
public void run() {
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(), results);
}
}

public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
```

- **퍼즐 프레임워크 예시**
- 탐색 병렬 실행, 목표 상태 찾으면 즉시 중단
- `CountDownLatch`로 결과 한 번만 설정, 동시 접근은 락으로 보호
- 결과 없음 → 실행 중인 스레드 수 추적 → 더 이상 작업이 없으면 null 반환

- 추가 종료 조건
- 전체 실행 시간 제한
- 이동 횟수 제한
- 클라이언트 요청에 따른 중단 신호 처리

```java
public class PuzzleSolver <P,M> extends ConcurrentPuzzleSolver<P, M> {
PuzzleSolver(Puzzle<P, M> puzzle) {
super(puzzle);
}

private final AtomicInteger taskCount = new AtomicInteger(0);

protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
return new CountingSolverTask(p, m, n);
}

class CountingSolverTask extends SolverTask {
CountingSolverTask(P pos, M move, PuzzleNode<P, M> prev) {
super(pos, move, prev);
taskCount.incrementAndGet();
}

public void run() {
try {
super.run();
} finally {
if (taskCount.decrementAndGet() == 0)
solution.setValue(null);
}
}
}
}
```





Loading