Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
Expand All @@ -44,10 +43,11 @@
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.MultiplexingMailboxService;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.query.service.QueryDispatcher;
import org.apache.pinot.query.type.TypeFactory;
Expand All @@ -67,7 +67,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final String _reducerHostname;
private final int _reducerPort;

private final MailboxService<Mailbox.MailboxContent> _mailboxService;
private final MailboxService<TransferableBlock> _mailboxService;
private final QueryEnvironment _queryEnvironment;
private final QueryDispatcher _queryDispatcher;

Expand All @@ -91,7 +91,7 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, BrokerRoutingMa
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
new WorkerManager(_reducerHostname, _reducerPort, routingManager));
_queryDispatcher = new QueryDispatcher();
_mailboxService = new GrpcMailboxService(_reducerHostname, _reducerPort, config);
_mailboxService = MultiplexingMailboxService.newInstance(_reducerHostname, _reducerPort, config);

// TODO: move this to a startUp() function.
_mailboxService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import io.grpc.ManagedChannel;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.spi.env.PinotConfiguration;


Expand All @@ -42,16 +42,16 @@
* to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailbox.</li>
* </ul>
*/
public class GrpcMailboxService implements MailboxService<MailboxContent> {
public class GrpcMailboxService implements MailboxService<TransferableBlock> {
// channel manager
private final ChannelManager _channelManager;
private final String _hostname;
private final int _mailboxPort;

// maintaining a list of registered mailboxes.
private final ConcurrentHashMap<String, ReceivingMailbox<MailboxContent>> _receivingMailboxMap =
private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, SendingMailbox<MailboxContent>> _sendingMailboxMap =
private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
new ConcurrentHashMap<>();

public GrpcMailboxService(String hostname, int mailboxPort, PinotConfiguration extraConfig) {
Expand Down Expand Up @@ -84,16 +84,16 @@ public int getMailboxPort() {
* Register a mailbox, mailbox needs to be registered before use.
* @param mailboxId the id of the mailbox.
*/
public SendingMailbox<MailboxContent> getSendingMailbox(String mailboxId) {
return _sendingMailboxMap.computeIfAbsent(mailboxId, (mId) -> new GrpcSendingMailbox(mId, this));
public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
return _sendingMailboxMap.computeIfAbsent(mailboxId.toString(), (mId) -> new GrpcSendingMailbox(mId, this));
}

/**
* Register a mailbox, mailbox needs to be registered before use.
* @param mailboxId the id of the mailbox.
*/
public ReceivingMailbox<MailboxContent> getReceivingMailbox(String mailboxId) {
return _receivingMailboxMap.computeIfAbsent(mailboxId, (mId) -> new GrpcReceivingMailbox(mId, this));
public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
return _receivingMailboxMap.computeIfAbsent(mailboxId.toString(), (mId) -> new GrpcReceivingMailbox(mId, this));
}

public ManagedChannel getChannel(String mailboxId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,25 @@
*/
package org.apache.pinot.query.mailbox;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.BaseDataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.query.mailbox.channel.MailboxContentStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;


/**
* GRPC implementation of the {@link ReceivingMailbox}.
*/
public class GrpcReceivingMailbox implements ReceivingMailbox<MailboxContent> {
public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
private static final long DEFAULT_MAILBOX_INIT_TIMEOUT = 100L;
private final GrpcMailboxService _mailboxService;
private final String _mailboxId;
Expand All @@ -50,15 +58,24 @@ public void init(MailboxContentStreamObserver streamObserver) {
}
}

/**
* Polls the underlying channel and converts the received data into a TransferableBlock. This may return null in the
* following cases:
*
* <p>
* 1. If the mailbox hasn't initialized yet. This means we haven't received any data yet.
* 2. If the received block from the sender is a data-block with 0 rows.
* </p>
*/
@Override
public MailboxContent receive()
public TransferableBlock receive()
throws Exception {
MailboxContent mailboxContent = null;
if (waitForInitialize()) {
mailboxContent = _contentStreamObserver.poll();
_totalMsgReceived.incrementAndGet();
if (!waitForInitialize()) {
return null;
}
return mailboxContent;
MailboxContent mailboxContent = _contentStreamObserver.poll();
_totalMsgReceived.incrementAndGet();
return fromMailboxContent(mailboxContent);
}

@Override
Expand All @@ -85,4 +102,27 @@ private boolean waitForInitialize()
public String getMailboxId() {
return _mailboxId;
}

/**
* Converts the data sent by a {@link GrpcSendingMailbox} to a {@link TransferableBlock}.
*
* @param mailboxContent data sent by a GrpcSendingMailbox.
* @return null if the received MailboxContent is a data-block with 0 rows.
* @throws IOException if the MailboxContent cannot be converted to a TransferableBlock.
*/
@Nullable
private TransferableBlock fromMailboxContent(MailboxContent mailboxContent)
throws IOException {
ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
if (byteBuffer.hasRemaining()) {
BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
if (dataBlock instanceof MetadataBlock && !dataBlock.getExceptions().isEmpty()) {
return TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
}
if (dataBlock.getNumberOfRows() > 0) {
return new TransferableBlock(dataBlock);
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,25 @@
*/
package org.apache.pinot.query.mailbox;

import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.datablock.BaseDataBlock;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;


/**
* GRPC implementation of the {@link SendingMailbox}.
*/
public class GrpcSendingMailbox implements SendingMailbox<MailboxContent> {
public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
private final GrpcMailboxService _mailboxService;
private final String _mailboxId;
private final AtomicBoolean _initialized = new AtomicBoolean(false);
Expand Down Expand Up @@ -58,12 +65,13 @@ public void init()
}

@Override
public void send(MailboxContent data)
public void send(TransferableBlock block)
throws UnsupportedOperationException {
if (!_initialized.get()) {
// initialization is special
init();
}
MailboxContent data = toMailboxContent(block.getDataBlock());
_statusStreamObserver.send(data);
_totalMsgSent.incrementAndGet();
}
Expand All @@ -77,4 +85,17 @@ public void complete() {
public String getMailboxId() {
return _mailboxId;
}

private MailboxContent toMailboxContent(BaseDataBlock dataBlock) {
try {
Mailbox.MailboxContent.Builder builder = Mailbox.MailboxContent.newBuilder().setMailboxId(_mailboxId)
.setPayload(ByteString.copyFrom(dataBlock.toBytes()));
if (dataBlock instanceof MetadataBlock) {
builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true");
}
return builder.build();
} catch (IOException e) {
throw new RuntimeException("Error converting to mailbox content", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.mailbox;

import com.google.common.base.Preconditions;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;


public class InMemoryMailboxService implements MailboxService<TransferableBlock> {
// channel manager
private final String _hostname;
private final int _mailboxPort;
static final int DEFAULT_CHANNEL_CAPACITY = 5;
// TODO: This should come from a config and should be consistent with the timeout for GrpcMailboxService
static final int DEFAULT_CHANNEL_TIMEOUT_SECONDS = 1;

private final ConcurrentHashMap<String, InMemoryMailboxState> _mailboxStateMap = new ConcurrentHashMap<>();

public InMemoryMailboxService(String hostname, int mailboxPort) {
_hostname = hostname;
_mailboxPort = mailboxPort;
}

@Override
public void start() {
}

@Override
public void shutdown() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when do the maps get cleaned up? (esp. regarding a cancelled query or one that times out)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a known leak. @walterddr is planning to fix it with query cancellation afaik

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah we had a discussion and i created a tracker but never get to it.

but upon thinking although the affect is not much (just a simple object reference), it would be a big problem when stuff error out, see my comment in the new issue: #9626

}

@Override
public String getHostname() {
return _hostname;
}

@Override
public int getMailboxPort() {
return _mailboxPort;
}

public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
String mId = mailboxId.toString();
return _mailboxStateMap.computeIfAbsent(mId, this::newMailboxState)._sendingMailbox;
}

public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
String mId = mailboxId.toString();
return _mailboxStateMap.computeIfAbsent(mId, this::newMailboxState)._receivingMailbox;
}

InMemoryMailboxState newMailboxState(String mailboxId) {
BlockingQueue<TransferableBlock> queue = createDefaultChannel();
return new InMemoryMailboxState(new InMemorySendingMailbox(mailboxId, queue),
new InMemoryReceivingMailbox(mailboxId, queue), queue);
}

private ArrayBlockingQueue<TransferableBlock> createDefaultChannel() {
return new ArrayBlockingQueue<>(DEFAULT_CHANNEL_CAPACITY);
}

static class InMemoryMailboxState {
ReceivingMailbox<TransferableBlock> _receivingMailbox;
SendingMailbox<TransferableBlock> _sendingMailbox;
BlockingQueue<TransferableBlock> _queue;

InMemoryMailboxState(SendingMailbox<TransferableBlock> sendingMailbox,
ReceivingMailbox<TransferableBlock> receivingMailbox, BlockingQueue<TransferableBlock> queue) {
_receivingMailbox = receivingMailbox;
_sendingMailbox = sendingMailbox;
_queue = queue;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.mailbox;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;


public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
private final String _mailboxId;
private final BlockingQueue<TransferableBlock> _queue;
private volatile boolean _closed;

public InMemoryReceivingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
_mailboxId = mailboxId;
_queue = queue;
_closed = false;
}

@Override
public String getMailboxId() {
return _mailboxId;
}

@Override
public TransferableBlock receive()
throws Exception {
TransferableBlock block = _queue.poll(
InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
// If the poll timed out, we return a null since MailboxReceiveOperator can continue to check other mailboxes
if (block == null) {
return null;
}
if (block.isEndOfStreamBlock()) {
_closed = true;
}
return block;
}

@Override
public boolean isInitialized() {
return true;
}

@Override
public boolean isClosed() {
return _closed && _queue.size() == 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we check that the _queue.size() == 0? if I cancel the query or it times out and I close the mailbox I shouldn't be forced to process the queue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in GRPC isClosed() <=> isCompleted(); i think the wording might be misleading in this case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally there shouldn't be any case where _closed is true but _queue.size() is not 0. Only case where it may happen is when the sender sends something immediately after sending the EOS block. We can also throw an exception here in that case.

To better understand this, this is how I believe some of the scenarios will be handled:

  1. If the sender has died, it would have sent an end of stream block and then terminated on its end. The receiver here would then get the end of stream block and send it upstream to MailboxReceiveOperator. The queue should be empty after _closed is marked true.
  2. If for some reason the sender was not able to send the EOS block, MailboxReceiveOperator will time out after the query timeout is reached.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upon checking the usage of the interface method isClosed(). it is actually in the non-threadsafe mailboxreceiveoperator.

}
}
Loading