Skip to content

Commit

Permalink
Merge 39aaf47 into bb67939
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijunfu committed May 9, 2018
2 parents bb67939 + 39aaf47 commit e129082
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
17 changes: 15 additions & 2 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,17 @@ void PlasmaStore::push_notification(ObjectInfoT* object_info) {
}
}

void PlasmaStore::push_notification(ObjectInfoT* object_info, int client_fd) {
auto it = pending_notifications_.find(client_fd);
if (it != pending_notifications_.end()) {
auto notification = create_object_info_buffer(object_info);
it->second.object_notifications.emplace_back(std::move(notification));
send_notifications(it->first);
// The notification gets freed in send_notifications when the notification
// is sent over the socket.
}
}

// Subscribe to notifications about sealed objects.
void PlasmaStore::subscribe_to_updates(Client* client) {
ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd;
Expand All @@ -646,9 +657,11 @@ void PlasmaStore::subscribe_to_updates(Client* client) {
// TODO(pcm): Is the following neccessary?
pending_notifications_[fd];

// Push notifications to the new subscriber about existing objects.
// Push notifications to the new subscriber about existing sealed objects.
for (const auto& entry : store_info_.objects) {
push_notification(&entry.second->info);
if (entry.second->state == PLASMA_SEALED) {
push_notification(&entry.second->info, fd);
}
}
send_notifications(fd);
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class PlasmaStore {
private:
void push_notification(ObjectInfoT* object_notification);

void push_notification(ObjectInfoT* object_notification, int client_fd);

void add_client_to_object_clients(ObjectTableEntry* entry, Client* client);

void return_from_get(GetRequest* get_req);
Expand Down

0 comments on commit e129082

Please sign in to comment.