diff --git a/build/Makefile.in b/build/Makefile.in index ddc2d9c86c..56b86eaaed 100644 --- a/build/Makefile.in +++ b/build/Makefile.in @@ -204,6 +204,7 @@ OBJECTS = src/core/callsite@obj@ \ src/profiler/log@obj@ \ src/profiler/profile@obj@ \ src/profiler/heapsnapshot@obj@ \ + src/profiler/telemeh@obj@ \ src/instrument/crossthreadwrite@obj@ \ src/instrument/line_coverage@obj@ \ src/moar@obj@ \ @@ -350,6 +351,7 @@ HEADERS = src/moar.h \ src/profiler/log.h \ src/profiler/profile.h \ src/profiler/heapsnapshot.h \ + src/profiler/telemeh.h \ src/platform/mmap.h \ src/platform/time.h \ src/platform/threads.h \ diff --git a/src/6model/reprs/ConcBlockingQueue.c b/src/6model/reprs/ConcBlockingQueue.c index 98ea2a1478..f3ffd7bc95 100644 --- a/src/6model/reprs/ConcBlockingQueue.c +++ b/src/6model/reprs/ConcBlockingQueue.c @@ -107,6 +107,8 @@ static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *d if (MVM_load(&cbq->elems) > 0) { MVMConcBlockingQueueNode *peeked; + unsigned int interval_id; + interval_id = startInterval(tc, "ConcBlockingQueue.at_pos"); MVMROOT(tc, root, { MVM_gc_mark_thread_blocked(tc); uv_mutex_lock(&cbq->locks->head_lock); @@ -117,6 +119,7 @@ static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *d peeked = cbq->head->next; value->o = peeked ? 
peeked->value : tc->instance->VMNull; uv_mutex_unlock(&cbq->locks->head_lock); + stopInterval(tc, interval_id, "ConcBlockingQueue.at_pos"); } else { value->o = tc->instance->VMNull; @@ -133,6 +136,7 @@ static void push(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *dat MVMConcBlockingQueueNode *add; AO_t orig_elems; MVMObject *to_add = value.o; + unsigned int interval_id; if (kind != MVM_reg_obj) MVM_exception_throw_adhoc(tc, @@ -143,6 +147,7 @@ static void push(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *dat add = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode)); + interval_id = startInterval(tc, "ConcBlockingQueue.push"); MVMROOT(tc, root, { MVMROOT(tc, to_add, { MVM_gc_mark_thread_blocked(tc); @@ -169,15 +174,18 @@ static void push(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *dat uv_cond_signal(&cbq->locks->head_cond); uv_mutex_unlock(&cbq->locks->head_lock); } + stopInterval(tc, interval_id, "ConcBlockingQueue.push"); } static void shift(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister *value, MVMuint16 kind) { MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; MVMConcBlockingQueueNode *taken; + unsigned int interval_id; if (kind != MVM_reg_obj) MVM_exception_throw_adhoc(tc, "Can only shift objects from a ConcBlockingQueue"); + interval_id = startInterval(tc, "ConcBlockingQueue.shift"); MVMROOT(tc, root, { MVM_gc_mark_thread_blocked(tc); uv_mutex_lock(&cbq->locks->head_lock); @@ -206,6 +214,7 @@ static void shift(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *da uv_cond_signal(&cbq->locks->head_cond); uv_mutex_unlock(&cbq->locks->head_lock); + stopInterval(tc, interval_id, "ConcBlockingQueue.shift"); } /* Set the size of the STable. 
*/ @@ -267,7 +276,9 @@ MVMObject * MVM_concblockingqueue_poll(MVMThreadContext *tc, MVMConcBlockingQueu MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)queue; MVMConcBlockingQueueNode *taken; MVMObject *result = tc->instance->VMNull; + unsigned int interval_id; + interval_id = startInterval(tc, "ConcBlockingQueue.poll"); MVMROOT(tc, cbq, { MVM_gc_mark_thread_blocked(tc); uv_mutex_lock(&cbq->body.locks->head_lock); @@ -288,5 +299,6 @@ MVMObject * MVM_concblockingqueue_poll(MVMThreadContext *tc, MVMConcBlockingQueu uv_mutex_unlock(&cbq->body.locks->head_lock); + stopInterval(tc, interval_id, "ConcBlockingQueue.poll"); return result; } diff --git a/src/6model/reprs/ConditionVariable.c b/src/6model/reprs/ConditionVariable.c index 833ad742fd..99f6c077f3 100644 --- a/src/6model/reprs/ConditionVariable.c +++ b/src/6model/reprs/ConditionVariable.c @@ -124,10 +124,14 @@ MVMObject * MVM_conditionvariable_from_lock(MVMThreadContext *tc, MVMReentrantMu void MVM_conditionvariable_wait(MVMThreadContext *tc, MVMConditionVariable *cv) { MVMReentrantMutex *rm = (MVMReentrantMutex *)cv->body.mutex; AO_t orig_rec_level; + unsigned int interval_id; if (MVM_load(&rm->body.holder_id) != tc->thread_id) MVM_exception_throw_adhoc(tc, "Can only wait on a condition variable when holding mutex"); + + interval_id = startInterval(tc, "ConditionVariable.wait"); + annotateInterval(cv->body.condvar, interval_id, "this condition variable"); orig_rec_level = MVM_load(&rm->body.lock_count); MVM_store(&rm->body.holder_id, 0); MVM_store(&rm->body.lock_count, 0); @@ -142,14 +146,17 @@ void MVM_conditionvariable_wait(MVMThreadContext *tc, MVMConditionVariable *cv) MVM_store(&rm->body.holder_id, tc->thread_id); MVM_store(&rm->body.lock_count, orig_rec_level); + stopInterval(tc, interval_id, "ConditionVariable.wait"); } /* Signals one thread waiting on the condition. 
*/ void MVM_conditionvariable_signal_one(MVMThreadContext *tc, MVMConditionVariable *cv) { + takeTimeStamp(tc, "ConditionVariable.signal_one"); uv_cond_signal(cv->body.condvar); } /* Signals all threads waiting on the condition. */ void MVM_conditionvariable_signal_all(MVMThreadContext *tc, MVMConditionVariable *cv) { + takeTimeStamp(tc, "ConditionVariable.signal_all"); uv_cond_broadcast(cv->body.condvar); } diff --git a/src/6model/reprs/ReentrantMutex.c b/src/6model/reprs/ReentrantMutex.c index 5757aaab64..68a13167d1 100644 --- a/src/6model/reprs/ReentrantMutex.c +++ b/src/6model/reprs/ReentrantMutex.c @@ -117,12 +117,15 @@ static const MVMREPROps ReentrantMutex_this_repr = { /* Locks the mutex. */ void MVM_reentrantmutex_lock(MVMThreadContext *tc, MVMReentrantMutex *rm) { + unsigned int interval_id; if (MVM_load(&rm->body.holder_id) == tc->thread_id) { /* We already hold the lock; bump the count. */ MVM_incr(&rm->body.lock_count); } else { /* Not holding the lock; obtain it. */ + /*interval_id = startInterval(tc, "ReentrantMutex obtains lock");*/ + /*annotateInterval(rm->body.mutex, interval_id, "lock in question");*/ MVMROOT(tc, rm, { MVM_gc_mark_thread_blocked(tc); uv_mutex_lock(rm->body.mutex); @@ -131,6 +134,7 @@ void MVM_reentrantmutex_lock(MVMThreadContext *tc, MVMReentrantMutex *rm) { MVM_store(&rm->body.holder_id, tc->thread_id); MVM_store(&rm->body.lock_count, 1); tc->num_locks++; + /*stopInterval(tc, interval_id, "ReentrantMutex obtained lock");*/ } } @@ -143,6 +147,7 @@ void MVM_reentrantmutex_unlock(MVMThreadContext *tc, MVMReentrantMutex *rm) { MVM_store(&rm->body.holder_id, 0); uv_mutex_unlock(rm->body.mutex); tc->num_locks--; + /*takeTimeStamp(rm->body.mutex, "this ReentrantMutex unlocked");*/ } } else { diff --git a/src/6model/reprs/Semaphore.c b/src/6model/reprs/Semaphore.c index 75b8cc389b..0cbd625d23 100644 --- a/src/6model/reprs/Semaphore.c +++ b/src/6model/reprs/Semaphore.c @@ -115,18 +115,24 @@ static const MVMREPROps Semaphore_this_repr = { 
}; MVMint64 MVM_semaphore_tryacquire(MVMThreadContext *tc, MVMSemaphore *sem) { - int r = uv_sem_trywait(sem->body.sem); + int r; + takeTimeStamp(tc, "Semaphore.tryAcquire"); + r = uv_sem_trywait(sem->body.sem); return !r; } void MVM_semaphore_acquire(MVMThreadContext *tc, MVMSemaphore *sem) { + unsigned int interval_id; + interval_id = startInterval(tc, "Semaphore.acquire"); MVMROOT(tc, sem, { MVM_gc_mark_thread_blocked(tc); uv_sem_wait(sem->body.sem); MVM_gc_mark_thread_unblocked(tc); }); + stopInterval(tc, interval_id, "Semaphore.acquire"); } void MVM_semaphore_release(MVMThreadContext *tc, MVMSemaphore *sem) { + takeTimeStamp(tc, "Semaphore.release"); uv_sem_post(sem->body.sem); } diff --git a/src/core/nativecall_dyncall.c b/src/core/nativecall_dyncall.c index 7aa42d39fb..3f93a2578a 100644 --- a/src/core/nativecall_dyncall.c +++ b/src/core/nativecall_dyncall.c @@ -211,6 +211,7 @@ static char callback_handler(DCCallback *cb, DCArgs *cb_args, DCValue *cb_result MVMint32 num_roots, i; MVMRegister res; MVMRegister *args; + unsigned int interval_id; /* Unblock GC if needed, so this thread can do work. */ MVMThreadContext *tc = data->tc; @@ -218,6 +219,8 @@ static char callback_handler(DCCallback *cb, DCArgs *cb_args, DCValue *cb_result if (was_blocked) MVM_gc_mark_thread_unblocked(tc); + interval_id = startInterval(tc, "nativecall callback handler"); + /* Build a callsite and arguments buffer. 
*/ args = MVM_malloc(data->num_types * sizeof(MVMRegister)); num_roots = 0; @@ -303,6 +306,7 @@ static char callback_handler(DCCallback *cb, DCArgs *cb_args, DCValue *cb_result args[i - 1].i64 = dcbArgULongLong(cb_args); break; default: + stopInterval(tc, interval_id, "nativecall callback handler failed"); MVM_exception_throw_adhoc(tc, "Internal error: unhandled dyncall callback argument type"); } @@ -412,6 +416,7 @@ static char callback_handler(DCCallback *cb, DCArgs *cb_args, DCValue *cb_result cb_result->l = MVM_nativecall_unmarshal_ulonglong(data->tc, res.o); break; default: + stopInterval(tc, interval_id, "nativecall callback handler failed"); MVM_exception_throw_adhoc(data->tc, "Internal error: unhandled dyncall callback return type"); } @@ -424,6 +429,8 @@ static char callback_handler(DCCallback *cb, DCArgs *cb_args, DCValue *cb_result if (was_blocked) MVM_gc_mark_thread_blocked(tc); + stopInterval(tc, interval_id, "nativecall callback handler"); + /* Indicate what we're producing as a result. */ return get_signature_char(data->typeinfos[0]); } @@ -475,11 +482,15 @@ MVMObject * MVM_nativecall_invoke(MVMThreadContext *tc, MVMObject *res_type, void *entry_point = body->entry_point; void *ptr = NULL; + unsigned int interval_id; + /* Create and set up call VM. */ DCCallVM *vm = dcNewCallVM(8192); dcMode(vm, body->convention); dcReset(vm); + interval_id = startInterval(tc, "nativecall invoke"); + /* Process arguments. 
*/ for (i = 0; i < num_args; i++) { MVMObject *value = MVM_repr_at_pos_o(tc, args, i); @@ -580,6 +591,7 @@ MVMObject * MVM_nativecall_invoke(MVMThreadContext *tc, MVMObject *res_type, handle_arg("integer", cont_i, DCulonglong, i64, dcArgLongLong, MVM_nativecall_unmarshal_ulonglong); break; default: + stopInterval(tc, interval_id, "nativecall invoke failed"); MVM_exception_throw_adhoc(tc, "Internal error: unhandled dyncall argument type"); } } @@ -720,6 +732,7 @@ MVMObject * MVM_nativecall_invoke(MVMThreadContext *tc, MVMObject *res_type, break; } default: + stopInterval(tc, interval_id, "nativecall invoke failed"); MVM_exception_throw_adhoc(tc, "Internal error: unhandled dyncall return type"); } } @@ -772,6 +785,7 @@ MVMObject * MVM_nativecall_invoke(MVMThreadContext *tc, MVMObject *res_type, (MVMint64)*(DCpointer *)free_rws[num_rws]); break; default: + stopInterval(tc, interval_id, "nativecall invoke failed"); MVM_exception_throw_adhoc(tc, "Internal error: unhandled dyncall argument type"); } num_rws++; @@ -796,5 +810,6 @@ MVMObject * MVM_nativecall_invoke(MVMThreadContext *tc, MVMObject *res_type, /* Finally, free call VM. */ dcFree(vm); + stopInterval(tc, interval_id, "nativecall invoke"); return result; } diff --git a/src/core/nativecall_libffi.c b/src/core/nativecall_libffi.c index 5236883647..021fe9fb3d 100644 --- a/src/core/nativecall_libffi.c +++ b/src/core/nativecall_libffi.c @@ -210,6 +210,7 @@ static void callback_handler(ffi_cif *cif, void *cb_result, void **cb_args, void MVMRegister *args; MVMNativeCallback *data = (MVMNativeCallback *)cb_data; void **values = MVM_malloc(sizeof(void *) * (data->cs->arg_count ? data->cs->arg_count : 1)); + unsigned int interval_id; /* Unblock GC if needed, so this thread can do work. 
*/ MVMThreadContext *tc = data->tc; @@ -217,6 +218,8 @@ static void callback_handler(ffi_cif *cif, void *cb_result, void **cb_args, void if (was_blocked) MVM_gc_mark_thread_unblocked(tc); + interval_id = startInterval(tc, "nativecall callback handler"); + /* Build a callsite and arguments buffer. */ args = MVM_malloc(data->num_types * sizeof(MVMRegister)); num_roots = 0; @@ -296,6 +299,7 @@ static void callback_handler(ffi_cif *cif, void *cb_result, void **cb_args, void args[i - 1].i64 = *(unsigned long long *)cb_args[i - 1]; break; default: + stopInterval(tc, interval_id, "nativecall callback handler failed"); MVM_exception_throw_adhoc(tc, "Internal error: unhandled libffi callback argument type"); } @@ -405,6 +409,7 @@ static void callback_handler(ffi_cif *cif, void *cb_result, void **cb_args, void *(unsigned long long *)cb_result = MVM_nativecall_unmarshal_ulonglong(data->tc, res.o); break; default: + stopInterval(tc, interval_id, "nativecall callback handler failed"); MVM_exception_throw_adhoc(data->tc, "Internal error: unhandled libffi callback return type"); } @@ -417,6 +422,8 @@ static void callback_handler(ffi_cif *cif, void *cb_result, void **cb_args, void /* Re-block GC if needed, so other threads will be able to collect. */ if (was_blocked) MVM_gc_mark_thread_blocked(tc); + + stopInterval(tc, interval_id, "nativecall callback handler"); } #define handle_arg(what, cont_X, dc_type, reg_slot, unmarshal_fun) do { \ @@ -476,9 +483,13 @@ MVMObject * MVM_nativecall_invoke(MVMThreadContext *tc, MVMObject *res_type, void *entry_point = body->entry_point; void **values = MVM_malloc(sizeof(void *) * (num_args ? num_args : 1)); + unsigned int interval_id; + ffi_cif cif; ffi_status status = ffi_prep_cif(&cif, body->convention, (unsigned int)num_args, body->ffi_ret_type, body->ffi_arg_types); + interval_id = startInterval(tc, "nativecall invoke"); + /* Process arguments. 
*/ for (i = 0; i < num_args; i++) { MVMObject *value = MVM_repr_at_pos_o(tc, args, i); @@ -582,6 +593,7 @@ MVMObject * MVM_nativecall_invoke(MVMThreadContext *tc, MVMObject *res_type, handle_arg("integer", cont_i, unsigned long long, i64, MVM_nativecall_unmarshal_ulonglong); break; default: + stopInterval(tc, interval_id, "nativecall invoke failed"); MVM_exception_throw_adhoc(tc, "Internal error: unhandled libffi argument type"); } } @@ -686,6 +698,7 @@ MVMObject * MVM_nativecall_invoke(MVMThreadContext *tc, MVMObject *res_type, handle_ret(tc, unsigned long long, ffi_arg, MVM_nativecall_make_int); break; default: + stopInterval(tc, interval_id, "nativecall invoke failed"); MVM_exception_throw_adhoc(tc, "Internal error: unhandled libffi return type"); } } @@ -737,6 +750,7 @@ MVMObject * MVM_nativecall_invoke(MVMThreadContext *tc, MVMObject *res_type, (MVMint64)*(void **)*(void **)values[i]); break; default: + stopInterval(tc, interval_id, "nativecall invoke failed"); MVM_exception_throw_adhoc(tc, "Internal error: unhandled libffi argument type"); } } @@ -754,5 +768,7 @@ MVMObject * MVM_nativecall_invoke(MVMThreadContext *tc, MVMObject *res_type, if (values) MVM_free(values); + stopInterval(tc, interval_id, "nativecall invoke"); + return result; } diff --git a/src/core/threads.c b/src/core/threads.c index 7cbb905c42..79562a31f6 100644 --- a/src/core/threads.c +++ b/src/core/threads.c @@ -12,6 +12,9 @@ typedef struct { MVMObject * MVM_thread_new(MVMThreadContext *tc, MVMObject *invokee, MVMint64 app_lifetime) { MVMThread *thread; MVMThreadContext *child_tc; + unsigned int interval_id; + + interval_id = startInterval(tc, "spawning a new thread off of me"); /* Create the Thread object and stash code to run and lifetime. */ MVMROOT(tc, invokee, { @@ -31,6 +34,8 @@ MVMObject * MVM_thread_new(MVMThreadContext *tc, MVMObject *invokee, MVMint64 ap /* Add one, since MVM_incr returns original. 
*/ thread->body.tc = child_tc; + stopInterval(child_tc, interval_id, "i'm the newly spawned thread."); + /* Also make a copy of the thread ID in the thread object itself, so it * is available once the thread dies and its ThreadContext is gone. */ thread->body.thread_id = child_tc->thread_id; @@ -196,6 +201,7 @@ MVMint64 MVM_thread_native_id(MVMThreadContext *tc, MVMObject *thread_obj) { /* Yields control to another thread. */ void MVM_thread_yield(MVMThreadContext *tc) { + takeTimeStamp(tc, "thread yielding"); MVM_platform_thread_yield(); } diff --git a/src/gc/orchestrate.c b/src/gc/orchestrate.c index 61a5fc6a85..8c4ffc1fe5 100644 --- a/src/gc/orchestrate.c +++ b/src/gc/orchestrate.c @@ -331,6 +331,8 @@ static void run_gc(MVMThreadContext *tc, MVMuint8 what_to_do) { MVMuint8 gen; MVMuint32 i, n; + unsigned int interval_id; + #if MVM_GC_DEBUG if (tc->in_spesh) MVM_panic(1, "Must not GC when in the specializer/JIT\n"); @@ -339,6 +341,12 @@ static void run_gc(MVMThreadContext *tc, MVMuint8 what_to_do) { /* Decide nursery or full collection. */ gen = tc->instance->gc_full_collect ? MVMGCGenerations_Both : MVMGCGenerations_Nursery; + if (tc->instance->gc_full_collect) { + interval_id = startInterval(tc, "start full collection"); + } else { + interval_id = startInterval(tc, "start minor collection"); + } + /* Do GC work for ourselves and any work threads. */ for (i = 0, n = tc->gc_work_count ; i < n; i++) { MVMThreadContext *other = tc->gc_work[i].tc; @@ -351,6 +359,8 @@ static void run_gc(MVMThreadContext *tc, MVMuint8 what_to_do) { /* Wait for everybody to agree we're done. 
*/ finish_gc(tc, gen, what_to_do == MVMGCWhatToDo_All); + + stopInterval(tc, interval_id, "finished run_gc"); } /* This is called when the allocator finds it has run out of memory and wants @@ -360,6 +370,8 @@ static void run_gc(MVMThreadContext *tc, MVMuint8 what_to_do) { void MVM_gc_enter_from_allocator(MVMThreadContext *tc) { GCDEBUG_LOG(tc, MVM_GC_DEBUG_ORCHESTRATE, "Thread %d run %d : Entered from allocate\n"); + takeTimeStamp(tc, "gc_enter_from_allocator"); + /* Try to start the GC run. */ if (MVM_trycas(&tc->instance->gc_start, 0, 1)) { MVMThread *last_starter = NULL; @@ -383,6 +395,8 @@ void MVM_gc_enter_from_allocator(MVMThreadContext *tc) { /* Decide if it will be a full collection. */ tc->instance->gc_full_collect = is_full_collection(tc); + takeTimeStamp(tc, "won the gc starting race"); + /* If profiling, record that GC is starting. */ if (tc->instance->profiling) MVM_profiler_log_gc_start(tc, tc->instance->gc_full_collect); @@ -459,6 +473,8 @@ void MVM_gc_enter_from_allocator(MVMThreadContext *tc) { if (tc->instance->profiling) MVM_profiler_log_gc_end(tc); + takeTimeStamp(tc, "gc finished"); + GCDEBUG_LOG(tc, MVM_GC_DEBUG_ORCHESTRATE, "Thread %d run %d : GC complete (cooridnator)\n"); } else { @@ -477,6 +493,8 @@ void MVM_gc_enter_from_interrupt(MVMThreadContext *tc) { GCDEBUG_LOG(tc, MVM_GC_DEBUG_ORCHESTRATE, "Thread %d run %d : Entered from interrupt\n"); + takeTimeStamp(tc, "gc_enter_from_interrupt"); + /* If profiling, record that GC is starting. */ if (tc->instance->profiling) MVM_profiler_log_gc_start(tc, is_full_collection(tc)); diff --git a/src/io/eventloop.c b/src/io/eventloop.c index c4d6d67502..957c72dc0f 100644 --- a/src/io/eventloop.c +++ b/src/io/eventloop.c @@ -72,12 +72,16 @@ static uv_loop_t *get_or_vivify_loop(MVMThreadContext *tc) { if (!instance->event_loop_thread) { /* Grab starting mutex and ensure we didn't lose the race. 
*/ + takeTimeStamp(tc, "hoping to start an event loop thread"); MVM_gc_mark_thread_blocked(tc); uv_mutex_lock(&instance->mutex_event_loop_start); MVM_gc_mark_thread_unblocked(tc); if (!instance->event_loop_thread) { MVMObject *thread, *loop_runner; int r; + unsigned int interval_id; + + interval_id = startInterval(tc, "creating the event loop thread"); /* Create various bits of state the async event loop thread needs. */ instance->event_loop_todo_queue = MVM_repr_alloc_init(tc, @@ -112,6 +116,8 @@ static uv_loop_t *get_or_vivify_loop(MVMThreadContext *tc) { /* Make the started event loop thread visible to others. */ instance->event_loop_thread = ((MVMThread *)thread)->body.tc; }); + + stopInterval(tc, interval_id, "created the event loop thread"); } uv_mutex_unlock(&instance->mutex_event_loop_start); } diff --git a/src/io/syncfile.c b/src/io/syncfile.c index 2aca386eb2..2d18946b8b 100644 --- a/src/io/syncfile.c +++ b/src/io/syncfile.c @@ -122,6 +122,9 @@ static MVMint32 read_to_buffer(MVMThreadContext *tc, MVMIOFileData *data, MVMint uv_buf_t read_buf = uv_buf_init(buf, bytes); uv_fs_t req; MVMint32 read; + unsigned int interval_id; + + interval_id = startInterval(tc, "syncfile.read_to_buffer"); MVM_gc_mark_thread_blocked(tc); if ((read = uv_fs_read(tc->loop, &req, data->fd, &read_buf, 1, -1, NULL)) < 0) { MVM_free(buf); @@ -131,6 +134,8 @@ static MVMint32 read_to_buffer(MVMThreadContext *tc, MVMIOFileData *data, MVMint } MVM_string_decodestream_add_bytes(tc, data->ds, buf, read); MVM_gc_mark_thread_unblocked(tc); + annotateInterval(read, interval_id, "read this many bytes"); + stopInterval(tc, interval_id, "syncfile.read_to_buffer"); return read; } diff --git a/src/io/syncsocket.c b/src/io/syncsocket.c index eb6e407766..71e0f4d1e3 100644 --- a/src/io/syncsocket.c +++ b/src/io/syncsocket.c @@ -84,6 +84,9 @@ static void on_connect(uv_connect_t* req, int status) { } static void socket_connect(MVMThreadContext *tc, MVMOSHandle *h, MVMString *host, MVMint64 port) { 
MVMIOSyncSocketData *data = (MVMIOSyncSocketData *)h->body.data; + unsigned int interval_id; + + interval_id = startInterval(tc, "syncsocket connect"); if (!data->ss.handle) { struct sockaddr *dest = MVM_io_resolve_host_name(tc, host, port); uv_tcp_t *socket = MVM_malloc(sizeof(uv_tcp_t)); @@ -102,11 +105,14 @@ static void socket_connect(MVMThreadContext *tc, MVMOSHandle *h, MVMString *host MVM_free(connect); MVM_free(dest); + stopInterval(tc, interval_id, "syncsocket connect"); + data->ss.handle = (uv_stream_t *)socket; /* So can be cleaned up in close */ if (status < 0) MVM_exception_throw_adhoc(tc, "Failed to connect: %s", uv_strerror(status)); } else { + stopInterval(tc, interval_id, "syncsocket didn't connect"); MVM_exception_throw_adhoc(tc, "Socket is already bound or connected"); } } @@ -189,7 +195,9 @@ static const MVMIOOps op_table = { static MVMObject * socket_accept(MVMThreadContext *tc, MVMOSHandle *h) { MVMIOSyncSocketData *data = (MVMIOSyncSocketData *)h->body.data; + unsigned int interval_id; + interval_id = startInterval(tc, "syncsocket accept"); while (!data->accept_server) { if (tc->loop != data->ss.handle->loop) { MVM_exception_throw_adhoc(tc, "Tried to accept() on a socket from outside its originating thread"); @@ -202,6 +210,7 @@ static MVMObject * socket_accept(MVMThreadContext *tc, MVMOSHandle *h) { /* Check the accept worked out. 
*/ if (data->accept_status < 0) { + stopInterval(tc, interval_id, "syncsocket accept failed"); MVM_exception_throw_adhoc(tc, "Failed to listen: unknown error"); } else { @@ -218,11 +227,13 @@ static MVMObject * socket_accept(MVMThreadContext *tc, MVMOSHandle *h) { MVM_string_decode_stream_sep_default(tc, &(data->ss.sep_spec)); result->body.ops = &op_table; result->body.data = data; + stopInterval(tc, interval_id, "syncsocket accept succeeded"); return (MVMObject *)result; } else { uv_close((uv_handle_t*)client, NULL); MVM_free(client); + stopInterval(tc, interval_id, "syncsocket accept failed"); MVM_exception_throw_adhoc(tc, "Failed to accept: %s", uv_strerror(r)); } } diff --git a/src/io/syncstream.c b/src/io/syncstream.c index 36c1207f09..fbbc81c30c 100644 --- a/src/io/syncstream.c +++ b/src/io/syncstream.c @@ -45,14 +45,17 @@ void MVM_io_syncstream_set_separator(MVMThreadContext *tc, MVMOSHandle *h, MVMSt /* Read a bunch of bytes into the current decode stream. Returns true if we * read some data, and false if we hit EOF. */ static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)handle->data; size_t size = suggested_size > 0 ? suggested_size : 4; buf->base = MVM_malloc(size); + annotateInterval(size, data->interval_id, "alloced this much space"); buf->len = size; } static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)handle->data; if (nread > 0) { MVM_string_decodestream_add_bytes(data->cur_tc, data->ds, buf->base, nread); + annotateInterval(nread, data->interval_id, "read this many bytes"); } else if (nread == UV_EOF) { data->eof = 1; @@ -66,6 +69,9 @@ static MVMint32 read_to_buffer(MVMThreadContext *tc, MVMIOSyncStreamData *data, /* Don't try and read again if we already saw EOF. 
*/ if (!data->eof) { int r; + unsigned int interval_id; + + interval_id = startInterval(tc, "syncstream.read_to_buffer"); data->handle->data = data; data->cur_tc = tc; if ((r = uv_read_start(data->handle, on_alloc, on_read)) < 0) @@ -78,6 +84,7 @@ static MVMint32 read_to_buffer(MVMThreadContext *tc, MVMIOSyncStreamData *data, MVM_gc_mark_thread_blocked(tc); uv_run(tc->loop, UV_RUN_DEFAULT); MVM_gc_mark_thread_unblocked(tc); + stopInterval(tc, interval_id, "syncstream.read_to_buffer"); return 1; } else { @@ -188,6 +195,9 @@ MVMint64 MVM_io_syncstream_write_str(MVMThreadContext *tc, MVMOSHandle *h, MVMSt uv_buf_t write_buf; int r; + unsigned int interval_id; + + interval_id = startInterval(tc, "syncstream.write_str"); output = MVM_string_encode(tc, str, 0, -1, &output_size, data->encoding, NULL, data->translate_newlines ? MVM_TRANSLATE_NEWLINE_OUTPUT : 0); if (newline) { @@ -201,6 +211,7 @@ MVMint64 MVM_io_syncstream_write_str(MVMThreadContext *tc, MVMOSHandle *h, MVMSt uv_unref((uv_handle_t *)data->handle); MVM_free(req); MVM_free(output); + stopInterval(tc, interval_id, "syncstream.write_str failed"); MVM_exception_throw_adhoc(tc, "Failed to write string to stream: %s", uv_strerror(r)); } else { @@ -210,6 +221,8 @@ MVMint64 MVM_io_syncstream_write_str(MVMThreadContext *tc, MVMOSHandle *h, MVMSt MVM_free(output); } + annotateInterval(output_size, interval_id, "written this many bytes"); + stopInterval(tc, interval_id, "syncstream.write_str"); data->total_bytes_written += output_size; return output_size; } @@ -220,10 +233,14 @@ MVMint64 MVM_io_syncstream_write_bytes(MVMThreadContext *tc, MVMOSHandle *h, cha uv_write_t *req = MVM_malloc(sizeof(uv_write_t)); uv_buf_t write_buf = uv_buf_init(buf, bytes); int r; + unsigned int interval_id; + + interval_id = startInterval(tc, "syncstream.write_bytes"); uv_ref((uv_handle_t *)data->handle); if ((r = uv_write(req, data->handle, &write_buf, 1, write_cb)) < 0) { uv_unref((uv_handle_t *)data->handle); MVM_free(req); + 
stopInterval(tc, interval_id, "syncstream.write_bytes failed"); MVM_exception_throw_adhoc(tc, "Failed to write bytes to stream: %s", uv_strerror(r)); } else { @@ -231,6 +248,8 @@ MVMint64 MVM_io_syncstream_write_bytes(MVMThreadContext *tc, MVMOSHandle *h, cha uv_run(tc->loop, UV_RUN_DEFAULT); MVM_gc_mark_thread_unblocked(tc); } + annotateInterval(bytes, interval_id, "written this many bytes"); + stopInterval(tc, interval_id, "syncstream.write_bytes"); data->total_bytes_written += bytes; return bytes; } diff --git a/src/io/syncstream.h b/src/io/syncstream.h index 85a65043ef..b71a87fab2 100644 --- a/src/io/syncstream.h +++ b/src/io/syncstream.h @@ -26,6 +26,8 @@ struct MVMIOSyncStreamData { /* Current separator specification for line-by-line reading. */ MVMDecodeStreamSeparators sep_spec; + + unsigned int interval_id; }; void MVM_io_syncstream_set_encoding(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 encoding); diff --git a/src/main.c b/src/main.c index 1cd8d0b7d9..cb776b167a 100644 --- a/src/main.c +++ b/src/main.c @@ -68,6 +68,7 @@ The following environment variables are respected:\n\ MVM_JIT_BYTECODE_DIR Specifies a directory for JIT bytecode dumps\n\ MVM_CROSS_THREAD_WRITE_LOG Log unprotected cross-thread object writes to stderr\n\ MVM_COVERAGE_LOG Append line-by-line coverage messages to this file\n\ + MVM_TELEMETRY_LOG Write high-resolution timing for several internal events\n\ "; static int cmp_flag(const void *key, const void *value) @@ -123,6 +124,8 @@ int wmain(int argc, wchar_t *wargv[]) int lib_path_i = 0; int flag; + unsigned int interval_id; + for (; (flag = parse_flag(argv[argi])) != NOT_A_FLAG; ++argi) { switch (flag) { case FLAG_CRASH: @@ -186,6 +189,19 @@ int wmain(int argc, wchar_t *wargv[]) } } + if (getenv("MVM_TELEMETRY_LOG")) { + char path[256]; + snprintf(path, 255, "%s.%d", getenv("MVM_TELEMETRY_LOG"), +#ifdef _WIN32 + _getpid() +#else + getpid() +#endif + ); + initTelemetry(fopen(path, "w")); + interval_id = startInterval(0, "moarvm 
startup"); + } + lib_path[lib_path_i] = NULL; if (argi >= argc) { @@ -212,6 +228,11 @@ int wmain(int argc, wchar_t *wargv[]) if (dump) MVM_vm_dump_file(instance, input_file); else MVM_vm_run_file(instance, input_file); + if (getenv("MVM_TELEMETRY_LOG")) { + stopInterval(0, interval_id, "moarvm teardown"); + finishTelemetry(); + } + if (full_cleanup) { MVM_vm_destroy_instance(instance); return EXIT_SUCCESS; diff --git a/src/profiler/telemeh.c b/src/profiler/telemeh.c new file mode 100644 index 0000000000..d77e8ead7d --- /dev/null +++ b/src/profiler/telemeh.c @@ -0,0 +1,265 @@ +#include <stdio.h> +#include <stdint.h> +#include <time.h> +#include <unistd.h> +#include <pthread.h> + +#ifdef _WIN32 +#include <intrin.h> +#else +#include <x86intrin.h> +#endif + +double ticksPerSecond; + +// use RDTSCP instruction to get the required pipeline flush implicitly +#define READ_TSC(tscValue) \ { \ unsigned int _tsc_aux; \ tscValue = __rdtscp(&_tsc_aux); \ } + +enum RecordType { + Calibration, + Epoch, + TimeStamp, + IntervalStart, + IntervalEnd, + IntervalAnnotation +}; + +struct CalibrationRecord { + double ticksPerSecond; +}; + +struct EpochRecord { + unsigned long long time; +}; + +struct TimeStampRecord { + unsigned long long time; + const char *description; +}; + +struct IntervalRecord { + unsigned long long time; + unsigned int intervalID; + const char *description; +}; + +struct IntervalAnnotation { + unsigned int intervalID; + const char *description; +}; + +struct TelemetryRecord { + enum RecordType recordType; + + intptr_t threadID; + + union { + struct CalibrationRecord calibration; + struct EpochRecord epoch; + struct TimeStampRecord timeStamp; + struct IntervalRecord interval; + struct IntervalAnnotation annotation; + }; +}; + +#define RECORD_BUFFER_SIZE 10000 + +// this is a ring buffer of telemetry events
static struct TelemetryRecord recordBuffer[RECORD_BUFFER_SIZE]; +static unsigned int recordBufferIndex = 0; +static unsigned int lastSerializedIndex = 0; +static unsigned long long beginningEpoch = 0; +static unsigned int telemetry_active = 
0; + +struct TelemetryRecord *newRecord() +{ + unsigned int newBufferIndex, recordIndex; + struct TelemetryRecord *record; + + do { + recordIndex = recordBufferIndex; + newBufferIndex = (recordBufferIndex + 1) % RECORD_BUFFER_SIZE; + } while(!__atomic_compare_exchange_n(&recordBufferIndex, &recordIndex, newBufferIndex, 0, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)); + + record = &recordBuffer[recordIndex]; + return record; +} + +static unsigned int intervalIDCounter = 0; + +void takeTimeStamp(intptr_t threadID, const char *description) +{ + struct TelemetryRecord *record; + + if (!telemetry_active) { return; } + + record = newRecord(); + + READ_TSC(record->timeStamp.time); + record->recordType = TimeStamp; + record->threadID = threadID; + record->timeStamp.description = description; +} + +unsigned int startInterval(intptr_t threadID, const char *description) +{ + struct TelemetryRecord *record; + + unsigned int intervalID; + + if (!telemetry_active) { return 0; } + + record = newRecord(); + intervalID = __atomic_fetch_add(&intervalIDCounter, 1, __ATOMIC_SEQ_CST); + READ_TSC(record->interval.time); + + record->recordType = IntervalStart; + record->threadID = threadID; + record->interval.intervalID = intervalID; + record->interval.description = description; + + return intervalID; +} + +void stopInterval(intptr_t threadID, int intervalID, const char *description) +{ + struct TelemetryRecord *record; + + if (!telemetry_active) { return; } + + record = newRecord(); + READ_TSC(record->interval.time); + + record->recordType = IntervalEnd; + record->threadID = threadID; + record->interval.intervalID = intervalID; + record->interval.description = description; +} + +void annotateInterval(intptr_t subject, int intervalID, const char *description) { + struct TelemetryRecord *record; + + if (!telemetry_active) { return; } + + record = newRecord(); + record->recordType = IntervalAnnotation; + record->threadID = subject; + record->annotation.intervalID = intervalID; + 
record->annotation.description = description; +} + +void calibrateTSC(FILE *outfile) +{ + unsigned long long startTsc, endTsc; + struct timespec startTime, endTime; + + clock_gettime(CLOCK_MONOTONIC, &startTime); + //startTsc = __rdtsc(); + READ_TSC(startTsc) + + sleep(1); + + clock_gettime(CLOCK_MONOTONIC, &endTime); + //endTsc = __rdtsc(); + READ_TSC(endTsc) + + { + unsigned long long ticks = endTsc - startTsc; + + unsigned long long wallClockTime = (endTime.tv_sec - startTime.tv_sec) * 1000000000 + (endTime.tv_nsec - startTime.tv_nsec); + + ticksPerSecond = (double)ticks / (double)wallClockTime; + ticksPerSecond *= 1000000000.0; + } +} + +static pthread_t backgroundSerializationThread; +static volatile int continueBackgroundSerialization = 1; + +void serializeTelemetryBufferRange(FILE *outfile, unsigned int serializationStart, unsigned int serializationEnd) +{ + for(unsigned int i = serializationStart; i < serializationEnd; i++) { + struct TelemetryRecord *record = &recordBuffer[i]; + + fprintf(outfile, "% 10x ", record->threadID); + + switch(record->recordType) { + case Calibration: + fprintf(outfile, "Calibration: %f ticks per second\n", record->calibration.ticksPerSecond); + break; + case Epoch: + fprintf(outfile, "Epoch counter: %ld\n", record->epoch.time); + break; + case TimeStamp: + fprintf(outfile, "%15ld -|- Time stamp: \"%s\"\n", record->timeStamp.time - beginningEpoch, record->timeStamp.description); + break; + case IntervalStart: + fprintf(outfile, "%15ld (- Interval start: \"%s\" (%d)\n", record->interval.time - beginningEpoch, record->interval.description, record->interval.intervalID); + break; + case IntervalEnd: + fprintf(outfile, "%15ld -) Interval stop: \"%s\" (%d)\n", record->interval.time - beginningEpoch, record->interval.description, record->interval.intervalID); + break; + case IntervalAnnotation: + fprintf(outfile, "%15s ??? 
Annotation: \"%s\" (%d)\n", " ", record->annotation.description, record->annotation.intervalID); + break; + } + } +} + +void serializeTelemetryBuffer(FILE *outfile) +{ + unsigned int serializationEnd = recordBufferIndex; + unsigned int serializationStart = lastSerializedIndex; + + if(serializationEnd < serializationStart) { + serializeTelemetryBufferRange(outfile, serializationStart, RECORD_BUFFER_SIZE); + serializeTelemetryBufferRange(outfile, 0, serializationEnd); + } else { + serializeTelemetryBufferRange(outfile, serializationStart, serializationEnd); + } + + lastSerializedIndex = serializationEnd; +} + +void *backgroundSerialization(void *outfile) +{ + while(continueBackgroundSerialization) { + sleep(1); + serializeTelemetryBuffer((FILE *)outfile); + } + + fclose((FILE *)outfile); + + return NULL; +} + +void initTelemetry(FILE *outfile) +{ + struct TelemetryRecord *calibrationRecord; + struct TelemetryRecord *epochRecord; + + telemetry_active = 1; + + calibrateTSC(outfile); + + calibrationRecord = newRecord(); + calibrationRecord->calibration.ticksPerSecond = ticksPerSecond; + calibrationRecord->recordType = Calibration; + + epochRecord = newRecord(); + READ_TSC(epochRecord->epoch.time) + epochRecord->recordType = Epoch; + + beginningEpoch = epochRecord->epoch.time; + + pthread_create(&backgroundSerializationThread, NULL, backgroundSerialization, (void *)outfile); +} + +void finishTelemetry() +{ + continueBackgroundSerialization = 0; + pthread_join(backgroundSerializationThread, NULL); +} diff --git a/src/profiler/telemeh.h b/src/profiler/telemeh.h new file mode 100644 index 0000000000..83b9a72ab4 --- /dev/null +++ b/src/profiler/telemeh.h @@ -0,0 +1,10 @@ +#include "stdint.h" + +void takeTimeStamp(intptr_t threadID, const char *description); + +unsigned int startInterval(intptr_t threadID, const char *description); +void stopInterval(intptr_t threadID, int intervalID, const char *description); +void annotateInterval(intptr_t subject, int intervalID, const 
char *description); + +void initTelemetry(FILE *outfile); +void finishTelemetry();