diff --git a/src/engine-driver/transport.cr b/src/engine-driver/transport.cr index 9c49c822..9a3b199a 100644 --- a/src/engine-driver/transport.cr +++ b/src/engine-driver/transport.cr @@ -1,3 +1,5 @@ +require "tokenizer" + abstract class EngineDriver::Transport abstract def send(message) : Int32 abstract def send(message, task : EngineDriver::Task, &block : Bytes -> Nil) : Int32 @@ -6,6 +8,8 @@ abstract class EngineDriver::Transport abstract def start_tls(verify_mode : OpenSSL::SSL::VerifyMode, context : OpenSSL::SSL::Context) : Nil abstract def connect(connect_timeout : Int32) + property tokenizer : ::Tokenizer? + # Only SSH implements exec def exec(message) : SSH2::Channel raise ::IO::EOFError.new("exec is only available to SSH transports") @@ -48,6 +52,19 @@ abstract class EngineDriver::Transport end protected def process(data) : Nil + if tokenize = @tokenizer + messages = tokenize.extract(data) + if messages.size == 1 + process_message(messages[0]) + else + messages.each { |message| spawn { process_message(message) } } + end + else + process_message(data) + end + end + + private def process_message(data) # Check if the task provided a response processing block if task = @queue.current if processing = task.processing