Skip to content

Commit

Permalink
[Concurrency] TaskLocals lookup "skip" optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
ktoso committed Feb 13, 2021
1 parent 1044723 commit b811b12
Show file tree
Hide file tree
Showing 10 changed files with 309 additions and 133 deletions.
6 changes: 1 addition & 5 deletions include/swift/ABI/MetadataValues.h
Expand Up @@ -1938,8 +1938,7 @@ class JobFlags : public FlagSet<size_t> {

Task_IsChildTask = 24,
Task_IsFuture = 25,
Task_IsTaskGroup = 26,
Task_HasLocalValues = 27
Task_IsTaskGroup = 26
};

explicit JobFlags(size_t bits) : FlagSet(bits) {}
Expand Down Expand Up @@ -1969,9 +1968,6 @@ class JobFlags : public FlagSet<size_t> {
FLAGSET_DEFINE_FLAG_ACCESSORS(Task_IsTaskGroup,
task_isTaskGroup,
task_setIsTaskGroup)
FLAGSET_DEFINE_FLAG_ACCESSORS(Task_HasLocalValues,
task_hasLocalValues,
task_setHasLocalValues)
};

/// Kinds of task status record.
Expand Down
112 changes: 71 additions & 41 deletions include/swift/ABI/Task.h
Expand Up @@ -139,13 +139,13 @@ class ActiveTaskStatus {
///
/// +--------------------------+
/// | childFragment? |
/// | taskLocalValuesFragment |
/// | taskLocalValuesFragment? |
/// | groupFragment? |
/// | futureFragment? |*
/// +--------------------------+
///
/// The future fragment is dynamic in size, based on the future result type
/// it can hold, and thus must be the *last* fragment.
/// * The future fragment is dynamic in size, based on the future result type
/// it can hold, and thus must be the *last* fragment.
class AsyncTask : public HeapObject, public Job {
public:
/// The context for resuming the job. When a task is scheduled
Expand Down Expand Up @@ -217,7 +217,7 @@ class AsyncTask : public HeapObject, public Job {
return reinterpret_cast<ChildFragment*>(this + 1);
}

// ==== Task Locals Values-- -------------------------------------------------
// ==== Task Locals Values ---------------------------------------------------

class TaskLocalValuesFragment {
public:
Expand All @@ -230,8 +230,14 @@ class AsyncTask : public HeapObject, public Job {
IsTerminal = 0b00,
/// The storage pointer points at the next TaskLocalChainItem in this task.
IsNext = 0b01,
/// The storage pointer points at a parent AsyncTask,
/// in which we should continue the lookup.
/// The storage pointer points at a parent AsyncTask, in which we should
/// continue the lookup.
///
/// Note that this may not necessarily be the same as the task's parent
/// task -- we may point to a super-parent if we know / that the parent
/// does not "contribute" any task local values. This is to speed up
/// lookups by skipping empty parent tasks during get(), and explained
/// in depth in `createParentLink`.
IsParent = 0b11
};

Expand Down Expand Up @@ -272,23 +278,42 @@ class AsyncTask : public HeapObject, public Job {
/// the TaskLocalItem linked list into the appropriate parent.
static TaskLocalItem* createParentLink(AsyncTask *task, AsyncTask *parent) {
assert(parent);
assert(parent->hasTaskLocalValues());
assert(task->hasTaskLocalValues());
size_t amountToAllocate = TaskLocalItem::itemSize(/*valueType*/nullptr);
// assert(amountToAllocate % MaximumAlignment == 0); // TODO: do we need this?
void *allocation = malloc(amountToAllocate); // TODO: use task-local allocator
fprintf(stderr, "MALLOC parent link item: %d\n", allocation);

TaskLocalItem *item =
new(allocation) TaskLocalItem(nullptr, nullptr);

auto next = parent->localValuesFragment()->head;
auto nextLinkType = next ? NextLinkType::IsParent : NextLinkType::IsTerminal;
item->next = reinterpret_cast<uintptr_t>(next) |
static_cast<uintptr_t>(nextLinkType);

fprintf(stderr, "error: %s [%s:%d] created parent item: task=%d -> parentTask=%d :: item=%d -> item->getNext()=%d\n", __FUNCTION__, __FILE_NAME__, __LINE__,
task, parent, item, item->getNext());
auto parentHead = parent->localValuesFragment()->head;
if (parentHead) {
if (parentHead->isEmpty()) {
switch (parentHead->getNextLinkType()) {
case NextLinkType::IsParent:
// it has no values, and just points to its parent,
// therefore skip also skip pointing to that parent and point
// to whichever parent it was pointing to as well, it may be its
// immediate parent, or some super-parent.
item->next = reinterpret_cast<uintptr_t>(parentHead->getNext());
static_cast<uintptr_t>(NextLinkType::IsParent);

This comment has been minimized.

Copy link
@yln

yln Feb 15, 2021

Contributor

Is there a missing bitwise or to include NextLinkType::IsParent bits?
(one more occurrence below)

This comment has been minimized.

Copy link
@ktoso

ktoso Feb 15, 2021

Author Member

Oh no 😱 Yes that's missing a |, thanks a ton for review/spotting this. I'll fix right away.

break;
case NextLinkType::IsNext:
assert(false && "empty taskValue head in parent task, yet parent's 'head' is `IsNext`, "
"this should not happen, as it implies the parent must have stored some value.");
break;
case NextLinkType::IsTerminal:
item->next = reinterpret_cast<uintptr_t>(parentHead->getNext());
static_cast<uintptr_t>(NextLinkType::IsTerminal);
break;
}
} else {
item->next = reinterpret_cast<uintptr_t>(parentHead) |
static_cast<uintptr_t>(NextLinkType::IsParent);
}
} else {
item->next = reinterpret_cast<uintptr_t>(parentHead) |
static_cast<uintptr_t>(NextLinkType::IsTerminal);
}

return item;
}
Expand All @@ -297,7 +322,6 @@ class AsyncTask : public HeapObject, public Job {
const Metadata *keyType,
const Metadata *valueType) {
assert(task);
assert(task->hasTaskLocalValues());
size_t amountToAllocate = TaskLocalItem::itemSize(valueType);
// assert(amountToAllocate % MaximumAlignment == 0); // TODO: do we need this?
void *allocation = malloc(amountToAllocate); // TODO: use task-local allocator
Expand Down Expand Up @@ -327,9 +351,14 @@ class AsyncTask : public HeapObject, public Job {
return static_cast<NextLinkType>(next & statusMask);
}

/// Item does not contain any actual value, and is only used to point at
/// a specific parent item.
bool isEmpty() {
return !valueType;
}

/// Retrieve a pointer to the storage of the value.
OpaqueValue *getStoragePtr() {
// assert(valueType && "valueType must be set before accessing storage pointer.");
return reinterpret_cast<OpaqueValue *>(
reinterpret_cast<char *>(this) + storageOffset(valueType));
}
Expand All @@ -356,7 +385,8 @@ class AsyncTask : public HeapObject, public Job {
};

private:
/// Single-linked list of task local values.
/// A stack (single-linked list) of task local values.
///
/// Once task local values within this task are traversed, the list continues
/// to the "next parent that contributes task local values," or if no such
/// parent exists it terminates with null.
Expand All @@ -366,11 +396,28 @@ class AsyncTask : public HeapObject, public Job {
/// parent that has values. If this task does not have any values, the head
/// pointer MAY immediately point at this task's parent task which has values.
///
/// NOTE: Check the higher bits to know if this is a self or parent value.
/// ### Concurrency
/// Access to the head is only performed from the task itself, when it
/// creates child tasks, the child during creation will inspect its parent's
/// task local value stack head, and point to it. This is done on the calling
/// task, and thus needs not to be synchronized. Subsequent traversal is
/// performed by child tasks concurrently, however they use their own
/// pointers/stack and can never mutate the parent's stack.
///
/// The stack is only pushed/popped by the owning task, at the beginning and
/// end a `body` block of `withLocal(_:boundTo:body:)` respectively.
///
/// Correctness of the stack strongly relies on the guarantee that tasks
/// never outline a scope in which they are created. Thanks to this, if
/// tasks are created inside the `body` of `withLocal(_:,boundTo:body:)`
/// all tasks created inside the `withLocal` body must complete before it
/// returns, as such, any child tasks potentially accessing the value stack
/// are guaranteed to be completed by the time we pop values off the stack
/// (after the body has completed).
TaskLocalItem *head = nullptr;

public:
TaskLocalValuesFragment() {}
TaskLocalValuesFragment() {}

void destroy();

Expand All @@ -386,13 +433,7 @@ class AsyncTask : public HeapObject, public Job {
OpaqueValue* get(const Metadata *keyType);
};

bool hasTaskLocalValues() const {
return Flags.task_hasLocalValues();
}

TaskLocalValuesFragment *localValuesFragment() {
assert(hasTaskLocalValues());

auto offset = reinterpret_cast<char*>(this);
offset += sizeof(AsyncTask);

Expand All @@ -404,14 +445,7 @@ class AsyncTask : public HeapObject, public Job {
}

OpaqueValue* localValueGet(const Metadata *keyType) {
if (hasTaskLocalValues()) {
return localValuesFragment()->get(keyType);
} else {
// We are guaranteed to have a task-local fragment even if this task has
// no bindings, but its parent tasks do. Thus, if no fragment, we can
// immediately return null.
return nullptr;
}
return localValuesFragment()->get(keyType);
}

// ==== TaskGroup ------------------------------------------------------------
Expand Down Expand Up @@ -724,9 +758,7 @@ class AsyncTask : public HeapObject, public Job {
offset += sizeof(ChildFragment);
}

if (hasTaskLocalValues()) {
offset += sizeof(TaskLocalValuesFragment);
}
offset += sizeof(TaskLocalValuesFragment);

return reinterpret_cast<GroupFragment *>(offset);
}
Expand Down Expand Up @@ -861,9 +893,7 @@ class AsyncTask : public HeapObject, public Job {
offset += sizeof(ChildFragment);
}

if (hasTaskLocalValues()) {
offset += sizeof(TaskLocalValuesFragment);
}
offset += sizeof(TaskLocalValuesFragment);

if (isTaskGroup()) {
offset += sizeof(GroupFragment);
Expand Down
18 changes: 3 additions & 15 deletions include/swift/Runtime/Concurrency.h
Expand Up @@ -257,31 +257,19 @@ void swift_task_localValuePush(AsyncTask* task,
/* +1 */ OpaqueValue *value,
const Metadata *valueType);

/// Remove task `count` local bindings from the task local binding stack.
/// Crashes if `count` is greater if the number of task locals stored in the task.
/// Remove task a local binding from the task local values stack.
///
/// This must be only invoked by the task itself to avoid concurrent writes.
///
/// Its Swift signature is
///
/// \code
/// public func _taskLocalValuePop(
/// _ task: Builtin.NativeObject,
/// count: Int
/// _ task: Builtin.NativeObject
/// )
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_localValuePop(AsyncTask* task, int count);

/// Checks if task (or any of its parent tasks) has task local values.
///
/// \code
/// func swift_task_hasTaskLocalValues<Key>(
/// _ task: Builtin.NativeObject,
/// ) -> Bool
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_hasTaskLocalValues(AsyncTask* task);
void swift_task_localValuePop(AsyncTask* task);

/// This should have the same representation as an enum like this:
/// enum NearestTaskDeadline {
Expand Down
53 changes: 14 additions & 39 deletions stdlib/public/Concurrency/Task.cpp
Expand Up @@ -135,11 +135,7 @@ SWIFT_CC(swift)
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
auto task = static_cast<AsyncTask*>(obj);

fprintf(stderr, "destroy task (%d): %d\n",
task->hasTaskLocalValues(), task);


// For a group, destroy the queues and results.
// For a group, destroy the queues and results.
if (task->isTaskGroup()) {
task->groupFragment()->destroy();
}
Expand All @@ -150,9 +146,7 @@ static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
}

// release any objects potentially held as task local values.
if (task->hasTaskLocalValues()) {
task->localValuesFragment()->destroy();
}
task->localValuesFragment()->destroy();

// The task execution itself should always hold a reference to it, so
// if we get here, we know the task has finished running, which means
Expand Down Expand Up @@ -242,15 +236,9 @@ AsyncTaskAndContext swift::swift_task_create_future_f(
headerSize += sizeof(AsyncTask::ChildFragment);
}

bool needsTaskLocalsFragment =
flags.task_hasLocalValues() || (parent && parent->hasTaskLocalValues());
fprintf(stderr, "error: %s [%s:%d] prepare task taskHasTaskLocals=%d parentHasLocals=%d needsLocals=%d\n", __FUNCTION__, __FILE_NAME__, __LINE__,
flags.task_hasLocalValues(), (parent && parent->hasTaskLocalValues()), needsTaskLocalsFragment);
if (needsTaskLocalsFragment) {
headerSize += sizeof(AsyncTask::TaskLocalValuesFragment);
fprintf(stderr, "error: %s [%s:%d] adding values fragment size=%d\n", __FUNCTION__, __FILE_NAME__, __LINE__,
sizeof(AsyncTask::TaskLocalValuesFragment));
}
headerSize += sizeof(AsyncTask::TaskLocalValuesFragment);
fprintf(stderr, "error: %s [%s:%d] adding values fragment size=%d\n", __FUNCTION__, __FILE_NAME__, __LINE__,
sizeof(AsyncTask::TaskLocalValuesFragment));

if (flags.task_isTaskGroup()) {
headerSize += sizeof(AsyncTask::GroupFragment);
Expand Down Expand Up @@ -287,12 +275,9 @@ AsyncTaskAndContext swift::swift_task_create_future_f(
new (childFragment) AsyncTask::ChildFragment(parent);
}

if (needsTaskLocalsFragment) {
assert(task->hasTaskLocalValues());
auto taskLocalsFragment = task->localValuesFragment();
new (taskLocalsFragment) AsyncTask::TaskLocalValuesFragment();
taskLocalsFragment->initializeLinkParent(task, parent);
}
auto taskLocalsFragment = task->localValuesFragment();
new (taskLocalsFragment) AsyncTask::TaskLocalValuesFragment();
taskLocalsFragment->initializeLinkParent(task, parent);

// Initialize the task group fragment if applicable.
if (flags.task_isTaskGroup()) {
Expand Down Expand Up @@ -494,7 +479,6 @@ void swift::swift_task_runAndBlockThread(const void *function,

// Set up a task that runs the runAndBlock async function above.
auto flags = JobFlags(JobKind::Task, JobPriority::Default);
flags.task_setHasLocalValues(true);
auto pair = swift_task_create_f(flags,
/*parent*/ nullptr,
&runAndBlock_start,
Expand All @@ -515,27 +499,22 @@ size_t swift::swift_task_getJobFlags(AsyncTask *task) {
return task->Flags.getOpaqueValue();
}

void swift::swift_task_localValuePush(AsyncTask *task,
void swift::swift_task_local_value_push(AsyncTask *task,
const Metadata *keyType,
/* +1 */ OpaqueValue *value, const Metadata *valueType) {
fprintf(stderr, "error: %s [%s:%d] PUSH keyType=%d value=%d *value=%d\n",
__FUNCTION__, __FILE_NAME__, __LINE__,
keyType, value, *reinterpret_cast<int*>(value));
assert(task->hasTaskLocalValues());
task->localValuesFragment()->pushValue(task, keyType, value, valueType);
}

void swift::swift_task_localValuePop(AsyncTask *task, int count) {
assert(task->hasTaskLocalValues());
auto fragment = task->localValuesFragment();
for (int i = 0; i < count; i++) {
fprintf(stderr, "error: %s [%s:%d] POP task=%d %d / %d\n", __FUNCTION__, __FILE_NAME__, __LINE__,
task, i, count);
fragment->popValue(task);
}
void swift::swift_task_local_value_pop(AsyncTask *task) {
fprintf(stderr, "error: %s [%s:%d] POP task=%d\n", __FUNCTION__, __FILE_NAME__, __LINE__,
task);
task->localValuesFragment()->popValue(task);
}

OpaqueValue* swift::swift_task_localValueGet(AsyncTask *task,
OpaqueValue* swift::swift_task_local_value_get(AsyncTask *task,
const Metadata *keyType) {
auto value = task->localValueGet(keyType);
fprintf(stderr, "error: %s [%s:%d] lookup keyType=%d value=%d\n",
Expand All @@ -544,10 +523,6 @@ OpaqueValue* swift::swift_task_localValueGet(AsyncTask *task,
return value;
}

bool swift::swift_task_hasTaskLocalValues(AsyncTask *task) {
return task->hasTaskLocalValues();
}

namespace {

/// Structure that gets filled in when a task is suspended by `withUnsafeContinuation`.
Expand Down

0 comments on commit b811b12

Please sign in to comment.