Skip to content

Commit

Permalink
Distributed ThreadNexus to Machine, Threads, ThreadState.
Browse files Browse the repository at this point in the history
  • Loading branch information
brixen committed Apr 21, 2020
1 parent 236ab47 commit b4d5c34
Show file tree
Hide file tree
Showing 20 changed files with 809 additions and 967 deletions.
4 changes: 2 additions & 2 deletions machine/capi/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,11 @@ extern "C" {

ThreadState* state = env->state();
ENTER_CAPI(state);
state->managed_phase(state);
state->managed_phase();

void* ret = (*func)(data);

env->state()->unmanaged_phase(state);
env->state()->unmanaged_phase();
LEAVE_CAPI(env->state());

return ret;
Expand Down
103 changes: 60 additions & 43 deletions machine/class/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,65 +176,62 @@ namespace rubinius {

SET_THREAD_UNWIND(state);

if(state->thread_unwinding_p()) {
// TODO halt
return 0;
}

RUBINIUS_THREAD_START(
const_cast<RBX_DTRACE_CHAR_P>(
state->name().c_str()), state->fiber()->fiber_id()->to_native(), 0);
if(!state->thread_unwinding_p()) {
RUBINIUS_THREAD_START(
const_cast<RBX_DTRACE_CHAR_P>(
state->name().c_str()), state->fiber()->fiber_id()->to_native(), 0);

state->fiber()->pid(state, Fixnum::from(gettid()));

if(state->configuration()->log_fiber_lifetime.value) {
logger::write("fiber: run: %s, %d, %#x",
state->name().c_str(), state->fiber()->pid()->to_native(),
(intptr_t)pthread_self());
}
state->fiber()->pid(state, Fixnum::from(gettid()));

NativeMethod::init_thread(state);
if(state->configuration()->log_fiber_lifetime.value) {
logger::write("fiber: run: %s, %d, %#x",
state->name().c_str(), state->fiber()->pid()->to_native(),
(intptr_t)pthread_self());
}

state->fiber()->suspend_and_continue(state);
NativeMethod::init_thread(state);

Object* value = state->fiber()->block()->send(state, G(sym_call),
as<Array>(state->thread()->fiber_value()), state->fiber()->block());
state->set_call_frame(nullptr);
state->fiber()->suspend_and_continue(state);

if(value) {
state->fiber()->value(state, value);
state->thread()->fiber_value(state, value);
} else {
state->fiber()->value(state, cNil);
state->thread()->fiber_value(state, cNil);
}
Object* value = state->fiber()->block()->send(state, G(sym_call),
as<Array>(state->thread()->fiber_value()), state->fiber()->block());
state->set_call_frame(nullptr);

if(state->unwind_state()->raise_reason() != cFiberCancel) {
if(state->fiber()->status() == eTransfer) {
// restart the root Fiber
state->thread()->fiber()->invoke_context(state);
state->thread()->fiber()->restart(state);
if(value) {
state->fiber()->value(state, value);
state->thread()->fiber_value(state, value);
} else {
state->fiber()->invoke_context()->fiber()->restart(state);
state->fiber()->value(state, cNil);
state->thread()->fiber_value(state, cNil);
}
}

{
std::lock_guard<std::mutex> guard(state->fiber_wait_mutex());
if(state->unwind_state()->raise_reason() != cFiberCancel) {
if(state->fiber()->status() == eTransfer) {
// restart the root Fiber
state->thread()->fiber()->invoke_context(state);
state->thread()->fiber()->restart(state);
} else {
state->fiber()->invoke_context()->fiber()->restart(state);
}
}

{
std::lock_guard<std::mutex> guard(state->fiber_wait_mutex());

state->fiber()->status(eDead);
state->set_suspended();
state->fiber()->status(eDead);
state->set_suspended();
}
}

state->unmanaged_phase(state);
state->unmanaged_phase();

NativeMethod::cleanup_thread(state);

if(state->configuration()->log_fiber_lifetime.value) {
logger::write("fiber: exit: %s %fs", state->name().c_str(), state->run_time());
}

state->machine()->thread_nexus()->remove_thread_state(state);
state->threads()->remove_thread_state(state);
state->set_thread_dead();

RUBINIUS_THREAD_STOP(
Expand Down Expand Up @@ -277,7 +274,7 @@ namespace rubinius {
std::ostringstream name;
name << "fiber." << fiber->fiber_id()->to_native();

fiber->thread_state(state->thread_nexus()->create_thread_state(state->machine(), name.str().c_str()));
fiber->thread_state(state->threads()->create_thread_state(name.str().c_str()));

fiber->thread_state()->set_kind(ThreadState::eFiber);
fiber->thread_state()->set_suspending();
Expand Down Expand Up @@ -466,15 +463,35 @@ namespace rubinius {
}

Array* Fiber::s_list(STATE) {
return state->machine()->vm_fibers(state);
Array* fibers = Array::create(state, 0);

state->threads()->each(state, [fibers](STATE, ThreadState* thread_state) {
if(thread_state->kind() == ThreadState::eFiber
&& !thread_state->fiber()->nil_p()
&& thread_state->fiber()->status() != Fiber::eDead) {
fibers->append(state, thread_state->fiber());
}
});

return fibers;
}

Fiber* Fiber::s_main(STATE) {
return state->thread()->fiber();
}

Fixnum* Fiber::s_count(STATE) {
return state->machine()->vm_fibers_count(state);
intptr_t count = 0;

state->threads()->each(state, [&](STATE, ThreadState* thread_state) {
if(thread_state->kind() == ThreadState::eFiber
&& !thread_state->fiber()->nil_p()
&& thread_state->fiber()->status() != Fiber::eDead) {
count++;
}
});

return Fixnum::from(count);
}

void Fiber::finalize(STATE, Fiber* fiber) {
Expand Down
44 changes: 22 additions & 22 deletions machine/class/native_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ namespace rubinius {

ThreadState* state = env->state();

state->managed_phase(state);
state->managed_phase();

Array* args = Array::create(state, stub->arg_count);
OnStack<1> os(state, args);
Expand Down Expand Up @@ -545,7 +545,7 @@ namespace rubinius {
break;
}

state->unmanaged_phase(state);
state->unmanaged_phase();
}


Expand Down Expand Up @@ -989,104 +989,104 @@ namespace rubinius {
env->set_current_call_frame(state->call_frame());

state->interrupt_with_signal();
state->unmanaged_phase(state);
state->unmanaged_phase();

switch(ffi_data_local->ret_info.type) {
case RBX_FFI_TYPE_CHAR: {
ffi_arg result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Fixnum::from((intptr_t)result);
break;
}
case RBX_FFI_TYPE_UCHAR: {
ffi_arg result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Fixnum::from((intptr_t)result);
break;
}
case RBX_FFI_TYPE_BOOL: {
ffi_arg result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = RBOOL(result);
break;
}
case RBX_FFI_TYPE_SHORT: {
ffi_arg result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Fixnum::from((intptr_t)result);
break;
}
case RBX_FFI_TYPE_USHORT: {
ffi_arg result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Fixnum::from((intptr_t)result);
break;
}
case RBX_FFI_TYPE_INT: {
ffi_arg result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Integer::from(state, (intptr_t)result);
break;
}
case RBX_FFI_TYPE_UINT: {
ffi_arg result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Integer::from(state, (unsigned int)result);
break;
}
case RBX_FFI_TYPE_LONG: {
long result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Integer::from(state, result);
break;
}
case RBX_FFI_TYPE_ULONG: {
unsigned long result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Integer::from(state, result);
break;
}
case RBX_FFI_TYPE_FLOAT: {
float result = 0.0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Float::create(state, (double)result);
break;
}
case RBX_FFI_TYPE_DOUBLE: {
double result = 0.0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Float::create(state, result);
break;
}
case RBX_FFI_TYPE_LONG_LONG: {
long long result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Integer::from(state, result);
break;
}
case RBX_FFI_TYPE_ULONG_LONG: {
unsigned long long result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = Integer::from(state, result);
break;
}
case RBX_FFI_TYPE_PTR: {
void* result = NULL;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
if(result == NULL) {
ret = cNil;
} else {
Expand All @@ -1098,7 +1098,7 @@ namespace rubinius {
ffi_arg result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);

state->managed_phase(state);
state->managed_phase();

Array* ary = Array::create(state, 1);
ary->set(state, 0, Integer::from(state, (intptr_t)result));
Expand All @@ -1109,7 +1109,7 @@ namespace rubinius {
case RBX_FFI_TYPE_CALLBACK: {
void* result = NULL;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
if(result == NULL) {
ret = cNil;
} else {
Expand All @@ -1128,7 +1128,7 @@ namespace rubinius {
case RBX_FFI_TYPE_STRING: {
char* result = NULL;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
if(result == NULL) {
ret = cNil;
} else {
Expand All @@ -1143,7 +1143,7 @@ namespace rubinius {
Object* p = cNil;

ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();

if(result) {
s = String::create(state, result);
Expand All @@ -1161,7 +1161,7 @@ namespace rubinius {
case RBX_FFI_TYPE_VOID: {
ffi_arg result = 0;
ffi_call(cif, FFI_FN(ffi_data_local->ep), &result, values);
state->managed_phase(state);
state->managed_phase();
ret = cNil;
break;
}
Expand Down

0 comments on commit b4d5c34

Please sign in to comment.