Skip to content

Commit

Permalink
Add v5: initial (crap) threads support
Browse files Browse the repository at this point in the history
  • Loading branch information
Theldus committed Jan 10, 2024
1 parent b95ba11 commit 5bea85b
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 9 deletions.
102 changes: 93 additions & 9 deletions 1b.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <pthread.h>
#include <inttypes.h>
#include <sys/stat.h>
#include <sys/mman.h>

/* Current version: v4. */
/* Current version: v5. */

/***********************************************************************
* Change log:
Expand Down Expand Up @@ -37,8 +38,16 @@
* for the input file, I'm removing all the collision handling and
* strings comparison.
* All this changes led us to ~1.32x faster than v3.
*
* v5-fun-version:
* (Intentionally) very crap threads support.
* Intentionally bad because the hash table is shared between all
* threads and protected by a mutex... which adds a huge bottleneck.
* Idiot? Yes, but I would like to see how bad it would be.
* Result: ~6.17x slower than v4.
*/

#define NUM_THREADS 4
#define HT_SIZE (10000 * 5)
#define unlikely(c) __builtin_expect((c), 0)
#define likely(c) __builtin_expect((c), 1)
Expand All @@ -56,6 +65,16 @@ struct station
int count;
};

static struct thread_data {
int tidx;
size_t size;
const char *base_buffer;
const char *end_buffer;
} tdata[NUM_THREADS];

static pthread_t threads[NUM_THREADS];
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

static struct station stations[HT_SIZE] = {0};
static size_t hashtable_entries = 0;

Expand Down Expand Up @@ -174,6 +193,7 @@ hashtable_add_station(const char *st_name, size_t st_size, int value)
size_t index;

/* Try to find first. */
pthread_mutex_lock(&mutex);
st = hashtable_find_station(st_name, st_size, &index);
if (likely(st != NULL))
goto found;
Expand All @@ -193,6 +213,7 @@ hashtable_add_station(const char *st_name, size_t st_size, int value)
stations[index].name = malloc(st_size + 1);
memcpy(stations[index].name, st_name, st_size);
stations[index].name[st_size] = '\0';
pthread_mutex_unlock(&mutex);
return;

found:
Expand All @@ -204,6 +225,7 @@ hashtable_add_station(const char *st_name, size_t st_size, int value)

stations[index].avg += value;
stations[index].count++;
pthread_mutex_unlock(&mutex);
}

/**
Expand Down Expand Up @@ -303,21 +325,83 @@ static void list_stations(void)
printf("}\n");
}

/**
* @brief Worker thread, works basically the same way as the
* single-threaded version.
*
* @param p Thread data, including the base buffer and size.
*/
static void*
do_thread_read(void *p)
{
struct thread_data *td = p;
const char *prev;
const char *next;
size_t rsize;

prev = td->base_buffer;
rsize = td->size;

while ((next = memchr(prev, '\n', rsize)) != NULL)
{
add_station(prev, next - prev);
rsize -= (next - prev + 1);
prev = (next + 1);
}

return (p);
}

/*
* Read each line and add the station.
* @brief Prepares the data range to be processed to the worker
* threads, start them and wait.
*/
static void do_read(void)
{
char *prev = txt_buff;
char *next = NULL;
size_t size = txt_size;
int i;
char *buf;
size_t rem_size = txt_size;

/* Decide each portion that each thread will work. */
tdata[0].base_buffer = txt_buff;
tdata[0].size = (txt_size / NUM_THREADS);

while ((next = memchr(prev, '\n', size)) != NULL)
/* Remaining threads. */
for (i = 1; i < NUM_THREADS; i++)
{
add_station(prev, next - prev);
size -= (next - prev + 1);
prev = (next + 1);
/* Adjust size of previous thread. */
buf = memchr(
tdata[i-1].base_buffer + tdata[i-1].size,
'\n',
rem_size - tdata[i-1].size);

if (!buf)
err(1, "Unable to find next line!\n");

tdata[i-1].size = (buf - tdata[i-1].base_buffer + 1);
tdata[i-1].end_buffer = (tdata[i-1].base_buffer + tdata[i-1].size);
rem_size -= tdata[i-1].size;

/* Adjust base buffer of current thread and size. */
tdata[i].base_buffer = tdata[i-1].end_buffer;
tdata[i].size = (txt_size / NUM_THREADS);
}

/* Adjust last thread. */
tdata[NUM_THREADS-1].base_buffer = tdata[NUM_THREADS-2].end_buffer;
tdata[NUM_THREADS-1].size = rem_size;
tdata[NUM_THREADS-1].end_buffer = (tdata[NUM_THREADS-1].base_buffer +
tdata[NUM_THREADS-1].size);

/* Create worker threads. */
for (int i = 0; i < NUM_THREADS; i++) {
tdata[i].tidx = i;
pthread_create(&threads[i], NULL, do_thread_read, &tdata[i]);
}

/* Join threads. */
for (int i = 0; i < NUM_THREADS; i++)
pthread_join(threads[i], NULL);
}

int main(int argc, char **argv)
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
CC ?= cc
CFLAGS += -O3 -march=native -mtune=native -g
LDLIBS += -pthread

.PHONY: all clean

Expand Down

0 comments on commit 5bea85b

Please sign in to comment.