[IOTDB-2880]Add procedure framework #5477

Merged
merged 4 commits into from Apr 21, 2022

Contributor

@cmlmakahts cmlmakahts commented Apr 11, 2022

Procedure Module Functional Design

1. Overview

The main functional classes are the following:

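  • ProcedureExecutor (executor):
    Handles client requests and provides task submission, query, and abort.

    Submits Procedures to the scheduler and executes them.

    Persists Procedures through the store.

    It mainly contains the following data structures:

    Environment: the environment a Procedure executes in, used to communicate with other processes

    WorkerThreadPool: the worker thread pool; workers asynchronously execute the tasks held by the scheduler

    RootProcedureStack: a HashMap plus Stack structure that maintains the parent-child relationships between tasks and keeps the rollback order consistent with the execution order

    CompletedProcedureRetainer: stores completed tasks and supports querying and periodic cleanup

  • ProcedureScheduler (scheduler):

    A queue, used mainly for task scheduling, and combined with …

  • ProcedureStore (store):

    Persists Procedure objects to enable failure recovery.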

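The overall flow is shown in the figure:

  1. The client submits a task to the executor
  2. The executor pushes the task to the scheduler
  3. The procId is returned to the client
  4. A workerThread takes the task from the queue (asynchronously)
  5. The workerThread executes the task (asynchronously)
  6. The workerThread updates the task state and persists it (asynchronously)

[Figure: procedure_cooperate]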
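The six steps above can be sketched in a few lines of Java. This is only an illustration under simplifying assumptions: MiniExecutor, its in-memory queue, and its states map are hypothetical stand-ins for the executor, scheduler, and store, and are not the classes added by this PR.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical miniature executor; not the IoTDB ProcedureExecutor API. */
class MiniExecutor {
  private final AtomicLong nextId = new AtomicLong(1);
  // Stands in for the scheduler: a plain blocking queue of procIds.
  private final BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
  private final Map<Long, Runnable> tasks = new ConcurrentHashMap<>();
  // Stands in for the store: the last recorded state per procId.
  private final Map<Long, String> states = new ConcurrentHashMap<>();
  private final Thread worker = new Thread(this::workLoop, "mini-worker");
  private volatile boolean running = true;

  void start() {
    worker.setDaemon(true);
    worker.start();
  }

  /** Steps 1-3: register the task, push it to the queue, return the id at once. */
  long submit(Runnable task) {
    long procId = nextId.getAndIncrement();
    tasks.put(procId, task);
    states.put(procId, "RUNNABLE");
    queue.add(procId);
    return procId;
  }

  /** Steps 4-6: poll the queue, execute the task, then record its final state. */
  private void workLoop() {
    while (running) {
      try {
        Long procId = queue.poll(100, TimeUnit.MILLISECONDS);
        if (procId == null) {
          continue;
        }
        try {
          tasks.remove(procId).run();
          states.put(procId, "SUCCESS");
        } catch (RuntimeException e) {
          states.put(procId, "FAILED");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /** The client polls this with the procId it received from submit(). */
  String query(long procId) {
    return states.get(procId);
  }

  void stop() {
    running = false;
    worker.interrupt();
  }
}
```

The caller gets the procId back before the task has run, which is why the design pairs submission with client-side polling.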

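2. Detailed Design

[Figure: overall structure 1]

2.1 Class and Interface Design

2.1.1 Procedure

[Figure: class_procedure]

Procedure is an abstract class that describes a concrete task.

The abstract methods that must be implemented are execute, rollback, and abort:

Procedure<Env> execute(Env)   // runs the task; implements the task's business logic
void rollback(Env)            // runs when the task is rolled back; implements the rollback logic after a failure
boolean abort(Env)            // implements the business logic to run when an abort command is received

Other methods are also provided, which users can override as needed:

void acquireLock(Env)                // lock mechanism: acquire the lock
void releaseLock(Env)                // lock mechanism: release the lock
boolean holdLock(Env)                // lock mechanism: whether the lock is held
void beforeRecover(Env)              // hook that runs before the task is recovered from disk
void afterRecover(Env)               // hook that runs after the task is recovered from disk
void completionCleanup(Env)          // hook that runs after the task completes
boolean isYieldAfterExecution(Env)   // whether the task may yield execution
boolean setTimeoutFailure(Env)       // hook that runs after a timeout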
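To make the contract concrete, here is a minimal sketch of an abstract class with the three mandatory methods and a few optional hooks, plus one subclass. SketchProcedure and AppendProcedure are hypothetical names for illustration, and the environment is reduced to a List<String>; the real Procedure class carries more state than this.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, reduced shape of the abstract class described above. */
abstract class SketchProcedure<Env> {
  // Mandatory methods: every task must define these three.
  protected abstract SketchProcedure<Env> execute(Env env);

  protected abstract void rollback(Env env);

  protected abstract boolean abort(Env env);

  // Optional hooks: no-op defaults that a task overrides only when needed.
  protected void acquireLock(Env env) {}

  protected void releaseLock(Env env) {}

  protected void completionCleanup(Env env) {}
}

/** Example task: appends a marker to a shared list and removes it on rollback. */
class AppendProcedure extends SketchProcedure<List<String>> {
  private final String marker;

  AppendProcedure(String marker) {
    this.marker = marker;
  }

  @Override
  protected SketchProcedure<List<String>> execute(List<String> env) {
    env.add(marker);
    return null; // no follow-up child procedure
  }

  @Override
  protected void rollback(List<String> env) {
    env.remove(marker); // undo exactly what execute() did
  }

  @Override
  protected boolean abort(List<String> env) {
    return true; // this task can always be aborted
  }
}
```

Keeping rollback the exact inverse of execute is what lets the executor undo a partially completed chain of tasks.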

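2.1.2 ProcedureExecutor

[Figure: class_ProcedureExecutor]

ProcedureExecutor is the core of the task scheduling framework and implements the actual scheduling. Its internal structure is described below.

1. Map<Long,ReentrantLock> idLockMap

Maintains one lock object per id, to prevent the same task from being submitted more than once.

2. Map<Long,Procedure> procedures

An index of the task objects.

3. WorkerThread, WorkerMonitorExecutor

Together these implement a lightweight thread pool; the core and maximum thread counts must be specified at initialization.

WorkerThread executes tasks asynchronously, as follows: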

public void run() {
  long lastUpdated = System.currentTimeMillis();
  try {
    while (isRunning() && keepAlive(lastUpdated)) {
      // Take one Procedure from the queue
      Procedure<Env> procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
      if (procedure == null) {
        continue;
      }
      ...
      ReentrantLock idLock =
          idLockMap.computeIfAbsent(procedure.getProcId(), id -> new ReentrantLock());
      // Acquire the per-id lock
      idLock.lock();
      try {
        // Execute the task
        executeProcedure(procedure);
      } finally {
        // Release the lock in the same scope that acquired it
        idLock.unlock();
      }
      ...
    }
  } catch (Throwable throwable) {
    LOG.warn("Worker terminated {}", this.activeProcedure, throwable);
  } finally {
    LOG.debug("Worker terminated.");
  }
  // Runs after a temporary thread's keepAlive expires; core threads never reach this point
  workerThreads.remove(this);
}

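WorkerMonitorExecutor periodically polls the WorkerThreads to check whether any are stuck. If the number of stuck threads exceeds a threshold, it creates temporary threads with a keepAlive time to execute the other tasks in the queue.

4. Map<Long,CompletedProcedureRetainer> completed, CompletedProcedureCleaner

When a task finishes, it is moved from procedures to completed. CompletedProcedureCleaner periodically checks completed and removes expired tasks.

5. Map<Long,RootProcedureStack> rollbackStack, used to implement the rollback mechanism

A Procedure submitted by a client is called a RootProcedure, and a Procedure created during execution is called a SubProcedure. A RootProcedureStack tracks all the sub-Procedures under one RootProcedure.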
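The retain-then-expire behaviour of item 4 can be illustrated with a small sketch. CompletedRetainerSketch, its TTL parameter, and the explicit nowMillis arguments are assumptions made so the example is deterministic; the PR's CompletedProcedureRetainer and CompletedProcedureCleaner may be structured differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical retainer: remembers when each procedure finished. */
class CompletedRetainerSketch {
  // procId -> completion time in milliseconds
  private final Map<Long, Long> completedAtMillis = new ConcurrentHashMap<>();
  private final long ttlMillis;

  CompletedRetainerSketch(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  /** Called when a procedure moves from "procedures" to "completed". */
  void markCompleted(long procId, long nowMillis) {
    completedAtMillis.put(procId, nowMillis);
  }

  /** Clients can still query a finished procedure until it expires. */
  boolean contains(long procId) {
    return completedAtMillis.containsKey(procId);
  }

  /** One cleaner pass: evicts entries at least ttlMillis old and returns how many were removed. */
  int cleanOnce(long nowMillis) {
    int before = completedAtMillis.size();
    completedAtMillis.values().removeIf(finishedAt -> nowMillis - finishedAt >= ttlMillis);
    return before - completedAtMillis.size();
  }
}
```

In a running system a pass like cleanOnce would typically be driven by a periodic timer, for example a ScheduledExecutorService, with System.currentTimeMillis() as the clock.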

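2.1.3 ProcedureScheduler

[Figure: procedure_class_scheduler]

ProcedureScheduler schedules Procedures. The current implementation is a simple blocking queue. Later, tasks could be classified (for example by business type, into DataRegion tasks and SchemaRegion tasks) to implement scheduling with a priority policy.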
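A blocking-queue scheduler of this kind can be sketched as follows. SimpleSchedulerSketch is a hypothetical class; addBack and poll mirror method names that appear elsewhere in this description, while addFront is an assumption added to show how a deque leaves room for simple prioritisation.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** Hypothetical blocking-queue scheduler; T stands in for the Procedure type. */
class SimpleSchedulerSketch<T> {
  private final LinkedBlockingDeque<T> queue = new LinkedBlockingDeque<>();

  /** Normal submission: the procedure waits its turn at the tail. */
  void addBack(T proc) {
    queue.addLast(proc);
  }

  /** Urgent submission: the procedure is the next one a worker will take. */
  void addFront(T proc) {
    queue.addFirst(proc);
  }

  /** Waits up to the timeout for a procedure; returns null if none arrived. */
  T poll(long timeout, TimeUnit unit) {
    try {
      return queue.poll(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return null;
    }
  }

  int size() {
    return queue.size();
  }
}
```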

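2.1.4 ProcedureStore

[Figure: procedure_class_store]

ProcedureStore persists Procedure content to disk so that it can be recovered after a restart. The current implementation is the simplest possible: each procId maps to one id.proc.wal file, and when the state needs updating, the new content is written through a buffer to a temporary file, which then replaces the original file as a whole.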
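The write-to-temporary-file-then-replace pattern might look like the sketch below, using only java.nio.file. WalFileSketch, the ".tmp" suffix, and the raw byte[] payload are assumptions for illustration; the PR's ProcedureWAL serializes Procedure objects through a ByteBuffer instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical one-file-per-procedure store. */
class WalFileSketch {
  private final Path dir;

  WalFileSketch(Path dir) {
    this.dir = dir;
  }

  /** Convenience factory for experiments: stores files in a fresh temporary directory. */
  static WalFileSketch inTempDir() {
    try {
      return new WalFileSketch(Files.createTempDirectory("proc-wal"));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  Path walPath(long procId) {
    return dir.resolve(procId + ".proc.wal");
  }

  /** Writes the new content to a temporary file, then swaps it over the real file. */
  void update(long procId, byte[] serialized) {
    Path target = walPath(procId);
    Path tmp = dir.resolve(procId + ".proc.wal.tmp"); // assumed suffix
    try {
      Files.write(tmp, serialized);
      // Move the temporary file onto the target; do not delete the temporary file first.
      Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  byte[] load(long procId) {
    try {
      return Files.readAllBytes(walPath(procId));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because a reader only ever sees the old file or the fully written new one, a crash during the write leaves at worst a stray temporary file. Where the file system supports it, StandardCopyOption.ATOMIC_MOVE can be used to make the swap atomic.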

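2.1.5 StateMachineProcedure

[Figure: procedure_class_statemachine]

StateMachineProcedure supports user-defined task states. In addition to the 7 fixed Procedure states, users can define their own states, such as STEP1 and STEP2, and then implement the abstract methods executeFromState() and rollbackState() to move between those states.

Its important fields and methods are:

subProcList and addChildProcedure(): used to add child tasks

int cycles and previousState: when execution stays in the same state ("running in place"), cycles is incremented; this can be used to break out and avoid getting stuck.

executeFromState(): abstract method that implements the custom state transitions, usually as a switch-case structure

rollbackState(): abstract method that implements rollback for a custom state

isRollbackSupported(): defines whether rollback is supported. If it is, a failure leads to the ROLLEDBACK state; otherwise the task is retried until it reaches a terminal state (SUCCESS/ROLLEDBACK)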
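A two-step state machine in this style could be sketched as below. TwoStepSketch, its Step and Flow enums, and the run(maxCycles) driver are hypothetical and only illustrate the switch-based executeFromState() and the cycles counter; they are not the signatures of StateMachineProcedure.

```java
/** Hypothetical two-step state machine; names are illustrative only. */
class TwoStepSketch {
  enum Step { STEP1, STEP2, DONE }

  enum Flow { HAS_MORE_STATE, NO_MORE_STATE }

  private Step current = Step.STEP1;
  private Step previous = null;
  private int cycles = 0; // consecutive executions that stayed in the same state
  final StringBuilder log = new StringBuilder();

  /** Custom transitions, written as a switch as the text suggests. */
  Flow executeFromState(Step state) {
    switch (state) {
      case STEP1:
        log.append("1");
        current = Step.STEP2;
        return Flow.HAS_MORE_STATE;
      case STEP2:
        log.append("2");
        current = Step.DONE;
        return Flow.NO_MORE_STATE;
      default:
        throw new IllegalStateException("unexpected state " + state);
    }
  }

  /** Undo hook for a single custom state. */
  void rollbackState(Step state) {
    log.append("r").append(state.ordinal() + 1);
  }

  /** Driver loop: counts executions without progress and gives up after maxCycles. */
  boolean run(int maxCycles) {
    while (current != Step.DONE) {
      cycles = (current == previous) ? cycles + 1 : 0;
      if (cycles >= maxCycles) {
        return false; // stuck in one state: break out instead of spinning
      }
      previous = current;
      executeFromState(current);
    }
    return true;
  }
}
```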

2.1.6 Complete Class Diagram

[Figure: class_procedure]

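2.2 Procedure Execution Flow

2.2.1 Procedure Lifecycle and State Transitions

A Procedure has the following 7 states; the transitions between them are shown in the figure below.

INITIALIZING  = 1,
RUNNABLE  = 2,
WAITING = 3,
WAITING_TIMEOUT = 4,
ROLLEDBACK = 5,
SUCCESS = 6,
FAILED = 7

[Figure: Procedure_State]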
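For reference, the seven states and their numeric codes map naturally onto a Java enum. ProcStateSketch, fromCode, and isFinished are hypothetical helpers; the only facts taken from this description are the names, the codes, and that ROLLEDBACK and SUCCESS are the finished states.

```java
/** Hypothetical enum mirroring the seven states and their numeric codes. */
enum ProcStateSketch {
  INITIALIZING(1),
  RUNNABLE(2),
  WAITING(3),
  WAITING_TIMEOUT(4),
  ROLLEDBACK(5),
  SUCCESS(6),
  FAILED(7);

  private final int code;

  ProcStateSketch(int code) {
    this.code = code;
  }

  int code() {
    return code;
  }

  /** ROLLEDBACK and SUCCESS are the states in which a procedure is finished. */
  boolean isFinished() {
    return this == SUCCESS || this == ROLLEDBACK;
  }

  /** Looks a state up by its numeric code, e.g. when reading a persisted record. */
  static ProcStateSketch fromCode(int code) {
    for (ProcStateSketch s : values()) {
      if (s.code == code) {
        return s;
      }
    }
    throw new IllegalArgumentException("unknown state code " + code);
  }
}
```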

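  1. The client submits a Procedure; the server deserializes it into a Procedure object proc, whose initial state is INITIALIZING.

  2. The server-side ProcedureExecutor calls submitProcedure(proc); proc is added to the ProcedureScheduler and its state becomes RUNNABLE.

  3. A WorkerThread takes a proc in the RUNNABLE state from the scheduler and runs the user-defined business logic in proc.doExecute(). The user sets the next state according to the outcome of doExecute():

     SUCCESS: the task succeeded; the store is updated, cleanup runs, and the task ends.

     FAILED: the task failed; the rollback logic in proc.doRollback() runs, and once the rollback has finished the state can be set to ROLLEDBACK.

     ROLLEDBACK: the task has been rolled back; the store is updated, cleanup runs, and the task ends.

     WAITING_TIMEOUT: the task is added to the timeout queue and waits for the periodic thread to reschedule it. The setTimeoutFailure method needs to be implemented; the default action is to abort the task.

  4. The return value of proc.doExecute() can be used to set subtasks (subProc). After doExecute() finishes, these subtasks are submitted automatically and the parent task's state is set to WAITING. Only when all subtasks have finished is the parent set back to RUNNABLE and executed again.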
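The parent/child hand-off in the last step can be sketched with a simple counter. ParentChildSketch and its method names are hypothetical; the counter plays the role of the childLatch mentioned in the recovery pseudocode later in this description.

```java
/** Hypothetical parent task that waits for its children before running again. */
class ParentChildSketch {
  enum State { RUNNABLE, WAITING }

  private State state = State.RUNNABLE;
  private int childrenLatch = 0; // children that have not finished yet

  /** Called after doExecute() returned n children: park the parent until they finish. */
  synchronized void submitChildren(int n) {
    if (n <= 0) {
      return; // no children, so the parent stays RUNNABLE
    }
    childrenLatch = n;
    state = State.WAITING;
  }

  /** Called when one child finishes; returns true once the parent is RUNNABLE again. */
  synchronized boolean childFinished() {
    if (childrenLatch == 0) {
      throw new IllegalStateException("no children outstanding");
    }
    childrenLatch--;
    if (childrenLatch == 0) {
      state = State.RUNNABLE; // the last child is done, so the parent can be rescheduled
      return true;
    }
    return false;
  }

  synchronized State state() {
    return state;
  }
}
```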

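2.2.2 ProcedureExecutor: submitProcedure() Execution

After a task is submitted, ProcedureExecutor adds one entry to procedures and one to rollbackStack, then appends the procedure to the tail of the scheduler queue.

2.2.3 ProcedureExecutor: executeProcedure() Execution

After a WorkerThread obtains a proc from the scheduler, it calls ProcedureExecutor:executeProcedure(proc) to run it:

if proc has finished
    return
end if
get proc's RootRollbackStack, stack
do
    if rootProcStack.acquire() reports that the stack state is FAILED
        // start rolling back
        if all subtasks have been executed
            roll back every task in the stack in order
        else
            roll back this task only
        end if
        rollback finished, exit the loop
    else
        acquire the task lock
            execute the task (if the task fails during execution, the stack is set to FAILED)
        release the task lock
        stack.release()
        check the task state
    end if
while the stack state is FAILED // if FAILED, loop again to perform the rollback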

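2.2.4 ProcedureExecutor: executeRootStackRollback() Execution

In 2.2.3, the failure of any proc in the rootRollbackStack sets the stack state to FAILED. The figure below is the class diagram of RootProcedureStack.

[Figure: procedure_rootProc]

It contains:

State state: marks the state of the stack. When any subtask in the stack fails, the stack state is set to FAILED, and rollback begins for all SubProcs under that RootProc.

ArrayList subprocStack: implements the stack. During execution, entries are added starting from the head; during rollback, they are read starting from the tail.

int running: a lightweight CountDownLatch, controlled through acquire and release, which ensures that state moves to the next state only after every subproc in the stack has finished executing.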
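The interplay of state and running can be sketched as follows. RootStackSketch and tryStartRollback are hypothetical; the RUNNING, FAILED, and ROLLINGBACK names follow the State enum quoted in the review thread below, but the method bodies are an assumption about how such a gate could work.

```java
/** Hypothetical gate that delays rollback until no procedure of the root is executing. */
class RootStackSketch {
  enum State { RUNNING, FAILED, ROLLINGBACK }

  private State state = State.RUNNING;
  private int running = 0; // procedures of this root that are currently executing

  /** A worker acquires before executing; the request is refused once the stack has failed. */
  synchronized boolean acquire() {
    if (state != State.RUNNING) {
      return false;
    }
    running++;
    return true;
  }

  /** A worker releases after executing, whether the procedure succeeded or failed. */
  synchronized void release() {
    running--;
  }

  /** Any failing procedure marks the whole stack as FAILED. */
  synchronized void markFailed() {
    if (state == State.RUNNING) {
      state = State.FAILED;
    }
  }

  /** Rollback may start only when the stack has failed and nothing is still executing. */
  synchronized boolean tryStartRollback() {
    if (state == State.FAILED && running == 0) {
      state = State.ROLLINGBACK;
      return true;
    }
    return false;
  }
}
```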

When the stack state is FAILED and running is 0, that is, when all subtasks have finished executing, the whole rollbackStack is rolled back. The logic is as follows:

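get the exception raised by proc, then traverse the rollbackStack starting from the root
set rootProc to FAILED and update the store
loop over subprocStack (starting from the tail)
    if proc succeeded
        remove it from subprocStack
        update the store
        continue;
    end if
    for tasks in any other state (RUNNABLE/FAILED)
        acquire the task lock
            proc.doRollback()
            clear the task's recorded rollback position
            set the state to ROLLEDBACK
            if proc has a parent task
                delete itself
            else
                delete its own subtasks
            end if
        release the task lock
    if proc is not the root task
        run proc.completionCleanup()
    end if
end loop
all subtasks have been rolled back; clean up the root task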
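The tail-first walk in the pseudocode above can be sketched like this. RollbackWalkSketch and its Proc class are hypothetical and leave out locking, the store, and cleanup; the sketch only shows the ordering and the rule that successful procedures are dropped without being rolled back.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical tail-first rollback over a list used as a stack. */
class RollbackWalkSketch {
  enum State { RUNNABLE, SUCCESS, FAILED, ROLLEDBACK }

  static final class Proc {
    final String name;
    State state;

    Proc(String name, State state) {
      this.name = name;
      this.state = state;
    }
  }

  /** Walks the stack from the tail and returns the names rolled back, in rollback order. */
  static List<String> rollbackAll(List<Proc> subprocStack) {
    List<String> rolledBack = new ArrayList<>();
    for (int i = subprocStack.size() - 1; i >= 0; i--) {
      Proc proc = subprocStack.remove(i); // always removes the current tail
      if (proc.state == State.SUCCESS) {
        continue; // already finished: only dropped from the stack
      }
      proc.state = State.ROLLEDBACK; // stands in for proc.doRollback()
      rolledBack.add(proc.name);
    }
    return rolledBack;
  }
}
```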

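2.2.5 ProcedureExecutor: RootProcedureCleanup() Process

When rootProc reaches a finished state (ROLLEDBACK/SUCCESS), cleanup is performed:

first run proc.completionCleanup() to handle the proc's custom cleanup logic
move proc from procedures to completed, where it remains available for state queries
clear the rollbackStack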

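2.3 ProcedureExecutor Restart Recovery

When ProcedureExecutor is initialized, it performs recovery by loading the Procedure files on disk. The recovery process is as follows:

read the files on disk to obtain a ProcedureList // sorted by procId in descending order, so that subtasks come before their parent tasks
loop Proc in ProcedureList
    if Proc has finished
        add the Proc to completed
    else
        if proc is a rootProc
            create an entry in rollbackStack
        end if
    end if
    count the Procedures in each state
end loop
create one list per state: runnableList, failedList, waitingList, waitingTimeoutList
loop proc in ProcedureList
    if proc has finished and proc is a rootProc
        continue;
    end if
    if proc has a parent task
        parent task childLatch++
    end if
    load the rootProc's rollbackStack
    put proc into the list that matches its state
end loop

// 1. Recover the waiting tasks
loop proc in waitingList
    if proc has subtasks // subtasks exist; the parent waits for them to finish, then retries
        set proc state to RUNNABLE
        runnableList.add(proc)
    else
        proc.afterRecover()
    end if
end loop
restore the task locks

// 2. Recover the timed-out procs
loop proc in waitingTimeoutList
    timeoutExecutor.add(proc) // wait for periodic scheduling
end loop

// 3. Recover the failed procs
loop proc in failedList
    proc.afterRecover()
    scheduler.addBack(proc) // append directly to the tail of the queue and wait for rollback
end loop

// 4. Recover the runnableList
loop proc in runnableList
    proc.afterRecover()
    scheduler.addBack(proc)
end loop
once everything has been added to the scheduler, wake the scheduler up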
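The first half of the recovery pass (sort by procId descending, skip finished procedures, bucket the rest by state) can be sketched as below. RecoverySketch is hypothetical and deliberately simplified: it skips every finished procedure rather than only finished roots, and it omits the rollbackStack, child latches, and lock restoration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified version of the bucketing step in recovery. */
class RecoverySketch {
  enum State { RUNNABLE, WAITING, WAITING_TIMEOUT, FAILED, SUCCESS, ROLLEDBACK }

  static final class Proc {
    final long procId;
    final State state;

    Proc(long procId, State state) {
      this.procId = procId;
      this.state = state;
    }

    boolean isFinished() {
      return state == State.SUCCESS || state == State.ROLLEDBACK;
    }
  }

  /** Sorts by procId descending, skips finished procedures, and buckets the rest by state. */
  static Map<State, List<Long>> bucket(List<Proc> loaded) {
    List<Proc> sorted = new ArrayList<>(loaded);
    // Children receive larger ids than their parents, so descending order puts them first.
    sorted.sort(Comparator.comparingLong((Proc p) -> p.procId).reversed());
    Map<State, List<Long>> buckets = new EnumMap<>(State.class);
    for (Proc p : sorted) {
      if (p.isFinished()) {
        continue; // finished procedures are only kept for queries, not rescheduled
      }
      buckets.computeIfAbsent(p.state, s -> new ArrayList<>()).add(p.procId);
    }
    return buckets;
  }
}
```

Each bucket would then be handed to the matching recovery step: waiting, timed-out, failed, and runnable procedures in turn.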

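4 Deployment

The framework is mainly used for node management workflows, and its lifecycle is the same as ConfigNode's. It is deployed on ConfigNode, which makes it easy to obtain DataNode status.

Adapting to the ConfigNode consensus layer:

  1. Only the Executor on the Leader node can accept client submissions.

  2. Procedure state updates go through the consensus layer.
    Procedure redundancy design

  3. When the Leader changes, a callback runs that restores the runtime state of the Executor and Scheduler from the WAL on disk; requests are blocked in the meantime. The recovery runs asynchronously and does not block ConfigNode functionality.

5 Summary of Pros and Cons

Pros:

  1. Provides a unified state management interface for operations that consist of multiple steps. State scheduling and rollback keep the operation ordered and its final state consistent.

  2. The asynchronous task model, combined with client-side polling and retries, can cope with failure scenarios (Leader change or connection timeout).

Cons:

1. The feature is fairly "heavy" and increases the running cost of ConfigNode nodes.
2. The Scheduler and WAL are currently simple implementations; complex scenarios need further design.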

final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
workerThreads.add(worker);
worker.start();
System.out.println("-------------size:" + workerThreads.size());
Member

delete print

Contributor Author

fixed

"ThreadGroup {} contains running threads; {}: See STDOUT",
this.threadGroup,
e.getMessage());
this.threadGroup.list();
Member

why execute list ?

@Override
public long getDelay(TimeUnit unit) {
long delay = procedure.getTimeoutTimestamp() - System.currentTimeMillis();
System.out.println("---------delay------(" + delay);
Member

delete print log

Contributor Author

fixed

@wangchao316
Member

I support deploying the procedure framework and ConfigNode in the same process.

Member

@wangchao316 wangchao316 left a comment

LGTM

FAILED = 7
}

struct SubmitProcedureReq{
Member

It's a big feature, so I will split the comments into several parts. Part one: 1. mvn spotless:apply? It seems that the format is not unified.

Contributor Author

fixed

byteBuffer.flip();
channel.write(byteBuffer);
}
Files.deleteIfExists(walTmpPath);
Member

why delete tmp path before move?

Contributor Author

Fixed; the intent was to delete the real file path.

Contributor Author

I added some unit tests for procedureStore to verify the simple recovery scenario after a restart.

FileChannel channel = fis.getChannel()) {
while (channel.read(byteBuffer) > 0) {
byteBuffer.flip();
procedure = Procedure.newInstance(byteBuffer);
Member

  1. Should you check the byteBuffer's size?
  2. If a procedure's size is less than bytebuffer's size, after first time deserialization, we clear the bytebuffer, can we deserialize the following procedure?

Contributor Author

Fixed, add a loop for bytebuffer remaining check.

Member

The new loop is not enough. If the ByteBuffer could contain two or more procedures, the loop could not deserialize them correctly. Also, please add a test for serializing and deserializing the ProcedureWAL.

Contributor Author

Now one procedure maps to one ProcedureWAL, which contains a buffer and a filename. It won't contain two or more procs in one bytebuffer. In order to control memory overhead, a constraint configuration will be introduced to control concurrent procedures. Is this design acceptable?

Member

A procedure is a file? If we delete a batch of timeseries, will every timeseries have a file? Would that cause too many files? What do you think? @wangchao316

Member

A procedure is a file. Deleting timeseries does not use the procedure framework. The framework is used for operations such as deleting a storage group, adding a node, or removing a node.
Why is a procedure a file?
Because procedure records need to be deleted frequently, storing them in separate files makes them easy to delete.

Member

ok

try {
clazz = Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Invalid procedure class", e);
Member

two blank char, haha

Contributor Author

Fixed

/** Called by the ProcedureExecutor to assign the ID to the newly created procedure. */
protected void setProcId(long procId) {
this.procId = procId;
this.submittedTime = System.currentTimeMillis();
Member

Maybe setting submittedTime and setting the state should be in a different method? setProcId does more than its name expresses.

Contributor Author

Introduce a new method setProcRunnable()

try {
for (int i = 0; isAlive(); i++) {
sendStopSignal();
join(250);
Member

Create a constant in the class?


private ProcedureDelayContainer<Env> takeQuietly() {
try {
return queue.poll(20, TimeUnit.SECONDS);
Member

please create a constant variable for 20

Contributor Author

fixed

private enum State {
RUNNING, // The Procedure is running or ready to run
FAILED, // The Procedure failed, waiting for the rollback executing
ROLLINGBACK, // The Procedure failed and the execution was rolledback
Member

rolling or rolled?

Contributor Author

It's an intermediate state for a rollback stack. It means that at least one child proc has failed or been rolled back and the whole stack is rolling back.

this.timeoutExecutor =
new TimeoutExecutorThread<>(this, threadGroup, "ProcedureTimeoutExecutor");
this.workerMonitorExecutor =
new TimeoutExecutorThread<>(this, threadGroup, "WorkerThreadMonitor");
Member

WorkerThreadMonitor -> ProcedureWorkerThreadMonitor is better

Contributor Author

Fixed

@cmlmakahts cmlmakahts requested a review from mychaow April 19, 2022 06:31
*
* @param procedureList procedure list
*/
public void load(List<Procedure> procedureList) {
Member

procedureList is not used?

Contributor Author

Fixed

@cmlmakahts cmlmakahts requested a review from mychaow April 20, 2022 13:31
Member

@mychaow mychaow left a comment

LGTM

@wangchao316 wangchao316 merged commit d74157c into apache:master Apr 21, 2022
xinzhongtianxia added a commit to xinzhongtianxia/iotdb that referenced this pull request Apr 24, 2022
* remotes/upstream/master:
  Serialize measurement schema of insert node to wal entry (apache#5638)
  filter non schemaRegionDir (apache#5640)
  [IOTDB-2976] Add English and Chinese docs for count devices and count storage groups (apache#5635)
  change jenkins timeout from 2 hours to 3 hours
  [IOTDB-2740] Equal size bucket sampling UDFs: EQUAL_SIZE_BUCKET_RANDOM_SAMPLE, EQUAL_SIZE_BUCKET_AGG_SAMPLE, EQUAL_SIZE_BUCKET_M4_SAMPLE (apache#5518)
  Fix the issue that EndTime in FragmentInstanceContext is not set (apache#5636)
  fix concurrent bug of CachedMNodeContainer.putIfAbsent (apache#5632)
  [IOTDB-2880] Fix NPE occured in ci test (apache#5634)
  Fix CI (apache#5639)
  Add ColumnMerger to merge multipul input columns of same sensor into one column (apache#5630)
  Add block cancel when GetBlockTask throws exception (apache#5628)
  fix the bug when matching multi-wildcard in pattern tree (apache#5631)
  [IOTDB-2835]Fix empty page in selfcheck method of TsFileSequenceReader (apache#5552)
  Add FragmentInstanceStateMachine for FragmentInstance State change (apache#5615)
  [IOTDB-2880] Fix import check style (apache#5629)
  [IOTDB-2971] Fix sink handle memory leak (apache#5626)
  [rocksdb] updated the interface support (apache#5625)
  [IOTDB-2970] Code style: Avoid wildcard imports (apache#5622)
  [IOTDB-2880]Add procedure framework (apache#5477)
  [rocksdb] add rocksdb properties (apache#5588)

# Conflicts:
#	server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java