Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 0 additions & 42 deletions python/ray/tests/test_reconstruction.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import signal
import sys
import time

import numpy as np
import pytest
Expand Down Expand Up @@ -682,47 +681,6 @@ def dependent_task(x):
ray.get(x)


# Regression-style test: an object's node is killed while a task depending on
# that object is in flight; the final ray.get() must still return.
# NOTE(review): judging by the test name, the failure mode being guarded
# against is a hang during reconstruction -- confirm against the PR discussion.
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_reconstruction_hangs(ray_start_cluster):
# System config handed to the head node via _system_config below; all
# timeouts are short so the test does not wait long.
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_period_milliseconds": 100,
"max_direct_call_object_size": 100,
"task_retry_delay_ms": 100,
"object_timeout_milliseconds": 200,
"fetch_warn_timeout_milliseconds": 1000,
}
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0, _system_config=config, enable_object_reconstruction=True)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
# Second worker node: one CPU, no custom "node1" resource.
cluster.add_node(num_cpus=1, object_store_memory=10**8)
cluster.wait_for_nodes()

# Producer task: sleeps for 3 seconds, then returns a 10**5-element uint8
# array of zeros.
@ray.remote
def sleep():
# Task takes longer than the reconstruction timeout.
time.sleep(3)
return np.zeros(10**5, dtype=np.uint8)

# Consumer task: takes the object as an argument and returns nothing.
@ray.remote
def dependent_task(x):
return

# The producer is submitted with the "node1" resource requirement, which only
# node_to_kill provides at this point.
obj = sleep.options(resources={"node1": 1}).remote()
# Run three dependent tasks to completion before killing the node.
for _ in range(3):
ray.get(dependent_task.remote(obj))
# Submit one more dependent task, then remove the node non-gracefully while
# that task is outstanding.
x = dependent_task.remote(obj)
cluster.remove_node(node_to_kill, allow_graceful=False)
# Add a replacement node with the same resources as the removed one.
node_to_kill = cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
# The outstanding dependent task must still finish.
ray.get(x)


# When executed directly, run this file's tests through pytest in verbose mode
# and exit with pytest's return code.
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
44 changes: 5 additions & 39 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,9 @@ void ReferenceCounter::RemoveLocalReference(const ObjectID &object_id,
}

void ReferenceCounter::UpdateSubmittedTaskReferences(
const std::vector<ObjectID> return_ids,
const std::vector<ObjectID> &argument_ids_to_add,
const std::vector<ObjectID> &argument_ids_to_remove, std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreation(return_id, true);
}
for (const ObjectID &argument_id : argument_ids_to_add) {
RAY_LOG(DEBUG) << "Increment ref count for submitted task argument " << argument_id;
auto it = object_id_refs_.find(argument_id);
Expand All @@ -294,11 +290,8 @@ void ReferenceCounter::UpdateSubmittedTaskReferences(
}

void ReferenceCounter::UpdateResubmittedTaskReferences(
const std::vector<ObjectID> return_ids, const std::vector<ObjectID> &argument_ids) {
const std::vector<ObjectID> &argument_ids) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreation(return_id, true);
}
for (const ObjectID &argument_id : argument_ids) {
auto it = object_id_refs_.find(argument_id);
RAY_CHECK(it != object_id_refs_.end());
Expand All @@ -311,13 +304,10 @@ void ReferenceCounter::UpdateResubmittedTaskReferences(
}

void ReferenceCounter::UpdateFinishedTaskReferences(
const std::vector<ObjectID> return_ids, const std::vector<ObjectID> &argument_ids,
bool release_lineage, const rpc::Address &worker_addr,
const ReferenceTableProto &borrowed_refs, std::vector<ObjectID> *deleted) {
const std::vector<ObjectID> &argument_ids, bool release_lineage,
const rpc::Address &worker_addr, const ReferenceTableProto &borrowed_refs,
std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreation(return_id, false);
}
// Must merge the borrower refs before decrementing any ref counts. This is
// to make sure that for serialized IDs, we increment the borrower count for
// the inner ID before decrementing the submitted_task_ref_count for the
Expand Down Expand Up @@ -1043,19 +1033,6 @@ bool ReferenceCounter::RemoveObjectLocation(const ObjectID &object_id,
return true;
}

// Records whether the task that creates `object_id` is currently pending.
//
// Does nothing when there is no reference entry for `object_id`. Otherwise the
// entry's `pending_creation` flag is overwritten with the new value, and
// location subscribers are notified only if the value actually changed.
void ReferenceCounter::UpdateObjectPendingCreation(const ObjectID &object_id,
                                                   bool pending_creation) {
  auto ref_it = object_id_refs_.find(object_id);
  if (ref_it == object_id_refs_.end()) {
    // Unknown object: nothing to update and nothing to publish.
    return;
  }
  // Capture the transition before overwriting the stored flag.
  const bool changed = (ref_it->second.pending_creation != pending_creation);
  ref_it->second.pending_creation = pending_creation;
  if (changed) {
    PushToLocationSubscribers(ref_it);
  }
}

absl::optional<absl::flat_hash_set<NodeID>> ReferenceCounter::GetObjectLocations(
const ObjectID &object_id) {
absl::MutexLock lock(&mutex_);
Expand Down Expand Up @@ -1196,15 +1173,6 @@ bool ReferenceCounter::IsObjectReconstructable(const ObjectID &object_id) const
return it->second.is_reconstructable;
}

// Reports the `pending_creation` flag stored for `object_id`.
//
// Takes `mutex_` for the duration of the lookup. Returns false when there is
// no reference entry for `object_id`.
bool ReferenceCounter::IsObjectPendingCreation(const ObjectID &object_id) const {
  absl::MutexLock lock(&mutex_);
  const auto ref_it = object_id_refs_.find(object_id);
  return ref_it != object_id_refs_.end() && ref_it->second.pending_creation;
}

void ReferenceCounter::PushToLocationSubscribers(ReferenceTable::iterator it) {
const auto &object_id = it->first;
const auto &locations = it->second.locations;
Expand All @@ -1217,8 +1185,7 @@ void ReferenceCounter::PushToLocationSubscribers(ReferenceTable::iterator it) {
<< " locations, spilled url: " << spilled_url
<< ", spilled node ID: " << spilled_node_id
<< ", and object size: " << object_size
<< ", and primary node ID: " << primary_node_id << ", pending creation? "
<< it->second.pending_creation;
<< ", and primary node ID: " << primary_node_id;
rpc::PubMessage pub_message;
pub_message.set_key_id(object_id.Binary());
pub_message.set_channel_type(rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL);
Expand Down Expand Up @@ -1254,7 +1221,6 @@ void ReferenceCounter::FillObjectInformationInternal(
object_info->set_spilled_node_id(it->second.spilled_node_id.Binary());
auto primary_node_id = it->second.pinned_at_raylet_id.value_or(NodeID::Nil());
object_info->set_primary_node_id(primary_node_id.Binary());
object_info->set_pending_creation(it->second.pending_creation);
}

void ReferenceCounter::PublishObjectLocationSnapshot(const ObjectID &object_id) {
Expand Down
19 changes: 4 additions & 15 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// \param[out] deleted Any objects that are newly out of scope after this
/// function call.
void UpdateSubmittedTaskReferences(
const std::vector<ObjectID> return_ids,
const std::vector<ObjectID> &argument_ids_to_add,
const std::vector<ObjectID> &argument_ids_to_remove = std::vector<ObjectID>(),
std::vector<ObjectID> *deleted = nullptr) LOCKS_EXCLUDED(mutex_);
Expand All @@ -122,13 +121,13 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// have already incremented them when the task was first submitted.
///
/// \param[in] argument_ids The arguments of the task to add references for.
void UpdateResubmittedTaskReferences(const std::vector<ObjectID> return_ids,
const std::vector<ObjectID> &argument_ids)
void UpdateResubmittedTaskReferences(const std::vector<ObjectID> &argument_ids)
LOCKS_EXCLUDED(mutex_);

/// Update object references that were given to a submitted task. The task
/// may still be borrowing any object IDs that were contained in its
/// arguments. This should be called when the task finishes.
/// arguments. This should be called once inlined dependencies have been
/// inlined or, for plasma dependencies, when the task finishes.
///
/// \param[in] argument_ids The object IDs to remove references for.
/// \param[in] release_lineage Whether to decrement the arguments' lineage
Expand All @@ -140,8 +139,7 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// arguments. Some references in this table may still be borrowed by the
/// worker and/or a task that the worker submitted.
/// \param[out] deleted The object IDs whose reference counts reached zero.
void UpdateFinishedTaskReferences(const std::vector<ObjectID> return_ids,
const std::vector<ObjectID> &argument_ids,
void UpdateFinishedTaskReferences(const std::vector<ObjectID> &argument_ids,
bool release_lineage, const rpc::Address &worker_addr,
const ReferenceTableProto &borrowed_refs,
std::vector<ObjectID> *deleted)
Expand Down Expand Up @@ -472,10 +470,6 @@ class ReferenceCounter : public ReferenceCounterInterface,

bool IsObjectReconstructable(const ObjectID &object_id) const;

/// Whether the object is pending creation (the task that creates it is
/// scheduled/executing).
bool IsObjectPendingCreation(const ObjectID &object_id) const;

private:
struct Reference {
/// Constructor for a reference whose origin is unknown.
Expand Down Expand Up @@ -629,8 +623,6 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// This will be Nil if the object has not been spilled or if it is spilled
/// to distributed external storage.
NodeID spilled_node_id = NodeID::Nil();
/// Whether the task that creates this object is scheduled/executing.
bool pending_creation = false;
/// Callback that will be called when this ObjectID no longer has
/// references.
std::function<void(const ObjectID &)> on_delete;
Expand Down Expand Up @@ -765,9 +757,6 @@ class ReferenceCounter : public ReferenceCounterInterface,
void AddObjectLocationInternal(ReferenceTable::iterator it, const NodeID &node_id)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

void UpdateObjectPendingCreation(const ObjectID &object_id, bool pending_creation)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

/// Publish object locations to all subscribers.
///
/// \param[in] it The reference iterator for the object.
Expand Down
Loading