Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-3348: [Plasma] Fix bug in which plasma store dies when object created by remo… #2650

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,14 +422,12 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,

// Increment the count of the number of instances of this object that this
// client is using. A call to PlasmaClient::Release is required to decrement
// this
// count. Cache the reference to the object.
// this count. Cache the reference to the object.
IncrementObjectCount(object_id, &object, false);
// We increment the count a second time (and the corresponding decrement will
// happen in a PlasmaClient::Release call in plasma_seal) so even if the
// buffer
// returned by PlasmaClient::Dreate goes out of scope, the object does not get
// released before the call to PlasmaClient::Seal happens.
// buffer returned by PlasmaClient::Create goes out of scope, the object does
// not get released before the call to PlasmaClient::Seal happens.
IncrementObjectCount(object_id, &object, false);
return Status::OK();
}
Expand Down Expand Up @@ -645,24 +643,24 @@ Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) {
}

Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
// If the client is already disconnected, ignore release requests.
if (store_conn_ < 0) {
return Status::OK();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is unrelated to the bug.

// If an object is in the deletion cache, handle it directly without waiting.
auto iter = deletion_cache_.find(object_id);
if (iter != deletion_cache_.end()) {
RETURN_NOT_OK(PerformRelease(object_id));
return Status::OK();
}
// If the client is already disconnected, ignore release requests.
if (store_conn_ < 0) {
return Status::OK();
}
// Add the new object to the release history.
release_history_.push_front(object_id);
// If there are too many bytes in use by the client or if there are too many
// pending release calls, and there are at least some pending release calls in
// the release_history list, then release some objects.

// TODO(wap) Evicition policy only works on host memory, and thus objects
// on the GPU cannot be released currently.
// TODO(wap): Eviction policy only works on host memory, and thus objects on
// the GPU cannot be released currently.
while ((in_use_object_bytes_ > std::min(kL3CacheSizeBytes, store_capacity_ / 100) ||
release_history_.size() > config_.release_delay) &&
release_history_.size() > 0) {
Expand Down
64 changes: 48 additions & 16 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,50 @@ void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
object->device_num = entry->device_num;
}

void PlasmaStore::RemoveGetRequest(GetRequest* get_request) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is just factored out from PlasmaStore::ReturnFromGet. We shouldn't be using raw pointers.

// Remove the get request from each of the relevant object_get_requests hash
// tables if it is present there. It should only be present there if the get
// request timed out or if it was issued by a client that has disconnected.
for (ObjectID& object_id : get_request->object_ids) {
auto object_request_iter = object_get_requests_.find(object_id);
if (object_request_iter != object_get_requests_.end()) {
auto& get_requests = object_request_iter->second;
// Erase get_req from the vector.
auto it = std::find(get_requests.begin(), get_requests.end(), get_request);
if (it != get_requests.end()) {
get_requests.erase(it);
// If the vector is empty, remove the object ID from the map.
if (get_requests.empty()) {
object_get_requests_.erase(object_request_iter);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this line. I think this may have been a minor memory leak in the past, that is perhaps some keys never got removed from object_get_requests_.

}
}
}
}
// Remove the get request.
if (get_request->timer != -1) {
ARROW_CHECK(loop_->RemoveTimer(get_request->timer) == AE_OK);
}
delete get_request;
}

void PlasmaStore::RemoveGetRequestsForClient(Client* client) {
std::unordered_set<GetRequest*> get_requests_to_remove;
for (auto const& pair : object_get_requests_) {
for (GetRequest* get_request : pair.second) {
if (get_request->client == client) {
get_requests_to_remove.insert(get_request);
}
}
}

// It shouldn't be possible for a given client to be in the middle of multiple get
// requests.
ARROW_CHECK(get_requests_to_remove.size() <= 1);
for (GetRequest* get_request : get_requests_to_remove) {
RemoveGetRequest(get_request);
}
}

void PlasmaStore::ReturnFromGet(GetRequest* get_req) {
// Figure out how many file descriptors we need to send.
std::unordered_set<int> fds_to_send;
Expand Down Expand Up @@ -305,22 +349,7 @@ void PlasmaStore::ReturnFromGet(GetRequest* get_req) {
// Remove the get request from each of the relevant object_get_requests hash
// tables if it is present there. It should only be present there if the get
// request timed out.
for (ObjectID& object_id : get_req->object_ids) {
auto object_request_iter = object_get_requests_.find(object_id);
if (object_request_iter != object_get_requests_.end()) {
auto& get_requests = object_request_iter->second;
// Erase get_req from the vector.
auto it = std::find(get_requests.begin(), get_requests.end(), get_req);
if (it != get_requests.end()) {
get_requests.erase(it);
}
}
}
// Remove the get request.
if (get_req->timer != -1) {
ARROW_CHECK(loop_->RemoveTimer(get_req->timer) == AE_OK);
}
delete get_req;
RemoveGetRequest(get_req);
}

void PlasmaStore::UpdateObjectGetRequests(const ObjectID& object_id) {
Expand Down Expand Up @@ -584,6 +613,9 @@ void PlasmaStore::DisconnectClient(int client_fd) {
}
}

/// Remove all of the client's GetRequests.
RemoveGetRequestsForClient(client);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key fix. Essentially, when a client disconnects, we need to clean up its get requests.


for (const auto& entry : sealed_objects) {
RemoveFromClientObjectIds(entry.first, entry.second, client);
}
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ class PlasmaStore {
void AddToClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
Client* client);

/// Remove a GetRequest and clean up the relevant data structures.
///
/// @param get_request The GetRequest to remove.
void RemoveGetRequest(GetRequest* get_request);

/// Remove all of the GetRequests for a given client.
///
/// @param client The client whose GetRequests should be removed.
void RemoveGetRequestsForClient(Client* client);

void ReturnFromGet(GetRequest* get_req);

void UpdateObjectGetRequests(const ObjectID& object_id);
Expand Down
36 changes: 33 additions & 3 deletions python/pyarrow/tests/test_plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import print_function

import math
import multiprocessing
import os
import pytest
import random
Expand Down Expand Up @@ -117,10 +118,10 @@ def setup_method(self, test_method):
plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
use_valgrind=USE_VALGRIND,
use_one_memory_mapped_file=use_one_memory_mapped_file)
plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
# Connect to Plasma.
self.plasma_client = plasma.connect(plasma_store_name, "", 64)
self.plasma_client2 = plasma.connect(plasma_store_name, "", 0)
self.plasma_client = plasma.connect(self.plasma_store_name, "", 64)
self.plasma_client2 = plasma.connect(self.plasma_store_name, "", 0)

def teardown_method(self, test_method):
try:
Expand Down Expand Up @@ -747,6 +748,35 @@ def test_use_one_memory_mapped_file(self):
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1,
0)

def test_client_death_during_get(self):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test fails prior to this PR.

import pyarrow.plasma as plasma

object_id = random_object_id()

def client_blocked_in_get(plasma_store_name):
client = plasma.connect(self.plasma_store_name, "", 0)
# Try to get an object ID that doesn't exist. This should block.
client.get([object_id])

p = multiprocessing.Process(target=client_blocked_in_get,
args=(self.plasma_store_name, ))
p.start()
# Make sure the process is running.
time.sleep(0.2)
assert p.is_alive()

# Kill the client process.
p.terminate()
# Wait a little for the store to process the disconnect event.
time.sleep(0.1)

# Create the object.
self.plasma_client.put(1, object_id=object_id)

# Check that the store is still alive. This will raise an exception if
# the client is dead.
self.plasma_client.contains(random_object_id())


@pytest.mark.plasma
def test_object_id_size():
Expand Down