Asynchronous Operations

DuyHai DOAN edited this page Sep 6, 2016 · 7 revisions

The CRUD API, the DSL API and the RAW API all support asynchronous execution.

Asynchronous CRUD

Insert

	final CompletableFuture<Empty> future = userManager
		.crud()
		.insert(....)
		.executeAsync();
		
	final CompletableFuture<ExecutionInfo> future = userManager
		.crud()
		.insert(....)
		.executeAsyncWithStats();		
		

Select

	final CompletableFuture<User> future = userManager
		.crud()
		.findById(....)
		.getOneAsync();
		
	final CompletableFuture<Tuple2<User, ExecutionInfo>> future = userManager
		.crud()
		.findById(....)
		.getOneAsyncWithStats();		
		

Delete

	final CompletableFuture<Empty> future = userManager
		.crud()
		.deleteById(....)
		.executeAsync();
		
	final CompletableFuture<ExecutionInfo> future = userManager
		.crud()
		.deleteById(....)
		.executeAsyncWithStats();		

	final CompletableFuture<Empty> future = userManager
		.crud()
		.deleteByPartitionKeys(....)
		.executeAsync();
		
	final CompletableFuture<ExecutionInfo> future = userManager
		.crud()
		.deleteByPartitionKeys(....)
		.executeAsyncWithStats();		
		

Asynchronous DSL

Select

	final CompletableFuture<User> future = userManager
		.dsl()
		.select()
		...
		.getOneAsync();
		
	final CompletableFuture<Tuple2<User, ExecutionInfo>> future = userManager
		.dsl()
		.select()
		...
		.getOneAsyncWithStats();		

	final CompletableFuture<List<User>> futures = userManager
		.dsl()
		.select()
		...
		.getListAsync();

	final CompletableFuture<Tuple2<List<User>, ExecutionInfo>> futures = userManager
		.dsl()
		.select()
		...
		.getListAsyncWithStats();		
				

Update

	final CompletableFuture<Empty> future = userManager
		.dsl()
		.update()
		...
		.executeAsync();
		
	final CompletableFuture<ExecutionInfo> future = userManager
		.dsl()
		.update()
		...
		.executeAsyncWithStats();			
		

Delete

	final CompletableFuture<Empty> future = userManager
		.dsl()
		.delete()
		...
		.executeAsync();
		
	final CompletableFuture<ExecutionInfo> future = userManager
		.dsl()
		.delete()
		...
		.executeAsyncWithStats();			
		

Asynchronous Query

Typed Query

	final CompletableFuture<User> future = userManager
		.raw()
		.typedQueryForSelect(...)
		...
		.getOneAsync();

	final CompletableFuture<Tuple2<User, ExecutionInfo>> future = userManager
		.raw()
		.typedQueryForSelect(...)
		...
		.getOneAsyncWithStats();
				
	final CompletableFuture<List<User>> futures = userManager
		.raw()
		.typedQueryForSelect(...)
		...
		.getListAsync();

	final CompletableFuture<Tuple2<List<User>, ExecutionInfo>> futures = userManager
		.raw()
		.typedQueryForSelect(...)
		...
		.getListAsyncWithStats();				

Native Query

	final CompletableFuture<TypedMap> future = userManager
		.raw()
		.nativeQuery(...)
		...
		.getOneAsync();

	final CompletableFuture<Tuple2<TypedMap, ExecutionInfo>> future = userManager
		.raw()
		.nativeQuery(...)
		...
		.getOneAsyncWithStats();

	final CompletableFuture<List<TypedMap>> futures = userManager
		.raw()
		.nativeQuery(...)
		...
		.getListAsync();

	final CompletableFuture<Tuple2<List<TypedMap>, ExecutionInfo>> futures = userManager
		.raw()
		.nativeQuery(...)
		...
		.getListAsyncWithStats();
		
	final CompletableFuture<Empty> future = userManager
		.raw()
		.nativeQuery(...)
		...
		.executeAsync();

	final CompletableFuture<ExecutionInfo> future = userManager
		.raw()
		.nativeQuery(...)
		...
		.executeAsyncWithStats();	
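
Since every call above returns a CompletableFuture, results can be combined without blocking. The sketch below uses only the JDK: findNameAsync() and findTagsAsync() are hypothetical stand-ins for calls such as getOneAsync() and getListAsync(), not part of Achilles, and it assumes the returned type is the standard java.util.concurrent.CompletableFuture.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncCompositionSketch {

    // Hypothetical stand-in for a single-row lookup such as getOneAsync().
    static CompletableFuture<String> findNameAsync(long id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // Hypothetical stand-in for a multi-row lookup such as getListAsync().
    static CompletableFuture<List<String>> findTagsAsync(long id) {
        return CompletableFuture.supplyAsync(() -> Arrays.asList("a", "b"));
    }

    // Both lookups run concurrently; the combiner runs once both have completed.
    static CompletableFuture<String> describeAsync(long id) {
        return findNameAsync(id)
                .thenCombine(findTagsAsync(id), (name, tags) -> name + " " + tags);
    }

    public static void main(String[] args) {
        // join() is used only at the very end, to print the combined result.
        System.out.println(describeAsync(42).join());
    }
}
```

Blocking is deferred to the last step, so the two lookups overlap instead of running one after the other.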
			

The Empty enum

Empty is just a marker enum. Some operations, such as delete() or update(), do not return any meaningful result. Even so, we need a way to check that the asynchronous operation has executed correctly by calling get(). If the execution was successful, the singleton Empty is returned; otherwise an exception is raised.
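
The idea can be illustrated with a small JDK-only sketch. The Empty enum and its INSTANCE constant are redefined locally here as an assumption about the shape of the real type, and deleteAsync() is a hypothetical method that simulates success or failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class EmptyMarkerSketch {

    // Local stand-in for the marker enum: a single constant carrying no data.
    enum Empty { INSTANCE }

    // Hypothetical asynchronous delete: completes with the marker on success,
    // or exceptionally when the simulated operation fails.
    static CompletableFuture<Empty> deleteAsync(boolean simulateFailure) {
        return CompletableFuture.supplyAsync(() -> {
            if (simulateFailure) {
                throw new IllegalStateException("simulated write failure");
            }
            return Empty.INSTANCE;
        });
    }

    public static void main(String[] args) {
        // join() blocks like get() but throws only unchecked exceptions.
        Empty result = deleteAsync(false).join();
        System.out.println("delete completed: " + result);

        try {
            deleteAsync(true).join();
        } catch (CompletionException e) {
            System.out.println("delete failed: " + e.getCause().getMessage());
        }
    }
}
```

The returned value carries no information by itself; what matters is that the future completed normally rather than exceptionally.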

Exception Handling

Any exception raised during the blocking call to get() will be one of the following runtime exceptions:

  • DriverException
  • DriverInternalError
  • AchillesLightWeightTransactionException
  • AchillesBeanValidationException
  • AchillesInvalidTableException
  • AchillesStaleObjectStateException
  • AchillesException
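
A caller may want to catch these exceptions by type around the blocking call. The sketch below uses only the JDK: StorageException is a hypothetical stand-in for one of the exceptions listed above, and getOrThrow() is an illustrative helper, not an Achilles method. It assumes the failure may arrive wrapped in ExecutionException or CompletionException, as with a plain CompletableFuture; if the runtime exception is already surfaced directly, the unwrapping is a no-op.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class AsyncErrorHandlingSketch {

    // Hypothetical stand-in for one of the runtime exceptions listed above.
    static class StorageException extends RuntimeException {
        StorageException(String message) { super(message); }
    }

    // Peel off the wrappers that CompletableFuture adds around the real failure.
    static Throwable unwrap(Throwable error) {
        Throwable current = error;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Block on the future and rethrow the underlying runtime exception, if any.
    static <T> T getOrThrow(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            Throwable cause = unwrap(e);
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failing = new CompletableFuture<>();
        failing.completeExceptionally(new StorageException("simulated timeout"));
        try {
            getOrThrow(failing);
        } catch (StorageException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```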

Thread Pooling

By default, a ThreadPoolExecutor backed by a LinkedBlockingQueue of capacity 1000 is used for all asynchronous operations. The default configuration of this thread pool is given below:

    new ThreadPoolExecutor(5, 20, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(1000),
        new DefaultExecutorThreadFactory());
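
With a bounded queue, a JDK ThreadPoolExecutor only grows past its core size once the queue is full, and it rejects tasks once both the queue and the maximum number of threads are exhausted. For the configuration above this means 5 threads first, then up to 1000 queued tasks, then up to 20 threads. The sketch below shows the same behaviour scaled down to core 1, max 2 and queue capacity 2; the class and method names are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {

    // Returns {pool size while queueing, pool size after queue overflow,
    //          1 if the task beyond max threads + queue was rejected, else 0}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no thread becomes free meanwhile.
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked); // runs on the single core thread
            pool.execute(blocked); // queued
            pool.execute(blocked); // queued, the queue is now full
            int sizeWhileQueueing = pool.getPoolSize();
            pool.execute(blocked); // queue full, so a second thread is created
            int sizeAfterOverflow = pool.getPoolSize();
            int rejected = 0;
            try {
                pool.execute(blocked); // queue full and max threads reached
            } catch (RejectedExecutionException e) {
                rejected = 1;
            }
            return new int[] {sizeWhileQueueing, sizeAfterOverflow, rejected};
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "1 2 1"
    }
}
```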

The default implementation of the ThreadFactory is:

    public class DefaultExecutorThreadFactory implements ThreadFactory {

        private static final Logger logger = getLogger("achilles-default-executor");
    
        private final AtomicInteger threadNumber = new AtomicInteger(0);
        private Thread.UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                logger.error("Uncaught asynchronous exception : "+e.getMessage(), e);
            }
        };
    
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("achilles-default-executor-" + threadNumber.incrementAndGet());
            thread.setDaemon(true);
            thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
            return thread;
        }
    }

Of course, you can provide your own thread pool or configure the default thread pool with custom parameters. Please refer to Thread Pool Config.

To close the thread pool properly when the application is removed, Achilles exposes the ManagerFactory.shutDown() method. This method is annotated with javax.annotation.PreDestroy so that, in a managed container, it is invoked automatically. Otherwise, you can always call the shutDown() method manually.
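
For reference, an orderly shutdown of a thread pool with the plain JDK usually looks like the sketch below. It does not describe what shutDown() does internally; shutdownGracefully() is a hypothetical helper that could be applied, for example, to an executor you created yourself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {

    // Stop accepting new tasks, wait for running ones, then force termination.
    // Returns true if the pool terminated within the allotted time.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // interrupt tasks that are still running
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("last task before shutdown"));
        System.out.println("terminated: " + shutdownGracefully(pool, 5, TimeUnit.SECONDS));
    }
}
```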
