From 0480cac16d7899b8d2393d53107c3cdfc6e121f9 Mon Sep 17 00:00:00 2001 From: Aliaksei Syrel Date: Wed, 27 Mar 2024 11:22:34 +0100 Subject: [PATCH] [feenkcom/gtoolkit#3703] add AsyncForkPollFuture --- src/Futures/AsyncForkPollFuture.class.st | 80 ++++++++++++++++++++++++ src/Futures/BlockClosure.extension.st | 5 ++ 2 files changed, 85 insertions(+) create mode 100644 src/Futures/AsyncForkPollFuture.class.st diff --git a/src/Futures/AsyncForkPollFuture.class.st b/src/Futures/AsyncForkPollFuture.class.st new file mode 100644 index 0000000..b00a52e --- /dev/null +++ b/src/Futures/AsyncForkPollFuture.class.st @@ -0,0 +1,80 @@ +" +An alternative to {{gtClass:AsyncPollFuture}} but executes a {{gtClass:BlockClosure}} in its own forked process. +" +Class { + #name : #AsyncForkPollFuture, + #superclass : #Object, + #traits : 'TAsyncFuture', + #classTraits : 'TAsyncFuture classTrait', + #instVars : [ + 'pollBlock', + 'executedBlock', + 'computationProcess', + 'computationValue', + 'waker' + ], + #category : #'Futures-Base - Futures' +} + +{ #category : #'instance creation' } +AsyncForkPollFuture class >> pollBlock: aBlock [ + ^ self new pollBlock: aBlock +] + +{ #category : #accessing } +AsyncForkPollFuture >> initialize [ + super initialize. + + computationValue := AsyncMutex for: AsyncOptionalValue none. + computationProcess := AsyncMutex for: AsyncOptionalValue none. + waker := AsyncFutureExecutionAtomicWaker new +] + +{ #category : #'api - future' } +AsyncForkPollFuture >> poll: anAsynchronousContext [ + computationValue lock: [ :aValueOption | + aValueOption + ifSome: [ :aValue | + computationValue value: AsyncOptionalValue none. + ^ AsyncPoll ready: aValue ] ]. + + computationProcess lock: [ :aProcessOption | + aProcessOption + ifSome: [ :aProcess | + waker register: anAsynchronousContext waker. + ^ AsyncPoll pending ] ]. + + self + assert: [ pollBlock notNil ] + description: [ 'Block already polled' ]. + + executedBlock := pollBlock. + pollBlock := nil. + computationProcess value: ([ + computationValue value: (AsyncOptionalValue some: executedBlock value). + computationProcess value: (AsyncOptionalValue none). + waker wakeUp + ] newProcess). + + waker register: anAsynchronousContext waker. + computationProcess value resume. + ^ AsyncPoll pending +] + +{ #category : #initialization } +AsyncForkPollFuture >> pollBlock: aBlock [ + pollBlock := aBlock. + executedBlock := nil +] + +{ #category : #'api - future copy' } +AsyncForkPollFuture >> postSimilarCopy [ + "Is called by similarCopy. self is a shallow copy, implementors should set the fields as necessary to complete the similar copy" + + computationValue := AsyncMutex for: AsyncOptionalValue none. + computationProcess := AsyncMutex for: AsyncOptionalValue none. + waker := AsyncFutureExecutionAtomicWaker new. + + executedBlock ifNotNil: [ pollBlock := executedBlock ]. + executedBlock := nil +] diff --git a/src/Futures/BlockClosure.extension.st b/src/Futures/BlockClosure.extension.st index 1065f7c..20b557d 100644 --- a/src/Futures/BlockClosure.extension.st +++ b/src/Futures/BlockClosure.extension.st @@ -1,5 +1,10 @@ Extension { #name : #BlockClosure } +{ #category : #'*Futures' } +BlockClosure >> asAsyncForkedFuture [ + ^ AsyncForkPollFuture pollBlock: self +] + { #category : #'*Futures' } BlockClosure >> asAsyncFuture [ ^ AsyncPollFuture pollBlock: self