Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build*/
old/tmp/
*.csv
8 changes: 0 additions & 8 deletions .idea/.gitignore

This file was deleted.

6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
cmake_minimum_required(VERSION 3.28)
project(parbfs CXX)
set(CMAKE_CXX_STANDARD 23)

add_executable(parbfs main.cpp bfs.cpp)
target_link_libraries(parbfs fmt)
27 changes: 27 additions & 0 deletions CMakePresets.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"version": 3,
"cmakeMinimumRequired": {
"major": 3,
"minor": 28,
"patch": 0
},
"configurePresets": [
{
"name": "default",
"generator": "Ninja Multi-Config",
"binaryDir": "${sourceDir}/build",
"cacheVariables": {
"CMAKE_EXPORT_COMPILE_COMMANDS": "ON",
"CMAKE_CXX_FLAGS": "-Wall -Wextra -Wattributes"
}
},
{
"name": "san",
"inherits": "default",
"binaryDir": "${sourceDir}/build-san",
"cacheVariables": {
"CMAKE_CXX_FLAGS": "-fsanitize=address"
}
}
]
}
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/urO9t9_z)
# Лабораторная работа № 1: определение достижимости параллелизма и реализация параллельных алгоритмов.

Шаги выполнения:
Expand Down
158 changes: 158 additions & 0 deletions bfs.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#include "digraph.hpp"
#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

static void reset_depths(const digraph& g [[maybe_unused]],
std::span<int> depths,
int start_vert) {
assert(std::ssize(depths) == g.num_verts());
std::fill_n(depths.begin(), start_vert, -1);
depths[start_vert] = 0;
std::fill(depths.begin() + start_vert + 1, depths.end(), -1);
}

void bfs(const digraph& g, std::span<int> depths) {
reset_depths(g, depths, 0);
std::queue<int> q;
q.push(0);
do {
int v = q.front();
q.pop();
for (int n: g.adj[v]) {
if (depths[n] == -1) {
depths[n] = depths[v] + 1;
q.push(n);
}
}
} while (!q.empty());
}

struct parbfs {
const digraph& g;
std::span<int> depths;

struct block {
constexpr static int max_size = 256;
int verts[max_size];
};

int load_depth(int vert) {
return __atomic_load_n(&depths[vert], __ATOMIC_SEQ_CST);
}

bool weak_cas_depth(int vert, int* expected, int desired) {
return __atomic_compare_exchange_n(
&depths[vert], expected, desired,
true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}

struct {
std::mutex mutex;
std::condition_variable more;
std::queue<std::unique_ptr<block>> queue;
int idle = 0;
bool done = false;
} q;

std::vector<std::jthread> workers;

explicit parbfs(int n_threads, const digraph& g, std::span<int> depths):
g(g),
depths(depths),
workers(n_threads)
{
auto initial = std::make_unique<block>();
initial->verts[0] = 0;
initial->verts[1] = -1;
q.queue.push(std::move(initial));

for (int i = 0; i < n_threads; ++i) {
workers[i] = std::jthread(&parbfs::worker, this);
}
}

std::unique_ptr<block> pop_block() {
std::unique_lock lk(q.mutex);
if (++q.idle == std::ssize(workers) && q.queue.empty()) {
q.done = true;
q.more.notify_all();
return nullptr;
}
q.more.wait(lk, [&] { return !q.queue.empty() || q.done; });
if (q.done) {
assert(q.queue.empty());
return nullptr;
}
auto result = std::move(q.queue.front());
q.queue.pop();
--q.idle;
return result;
}

void push_block(std::unique_ptr<block> block) {
std::unique_lock lk(q.mutex);
q.queue.push(std::move(block));
q.more.notify_one();
}

void worker() {
int out_size = 0;
std::unique_ptr<block> out = nullptr;

auto push_out = [&] {
if (out) {
if (out_size != block::max_size) {
out->verts[out_size] = -1;
}
push_block(std::move(out));
out_size = 0;
}
};

auto push_vert = [&](int vert) {
if (!out) {
assert(out_size == 0);
out = std::make_unique<block>();
}
assert(out_size < block::max_size);
out->verts[out_size++] = vert;
if (out_size == block::max_size) {
push_out();
}
};

auto process_vert = [&](int src) {
const int src_depth = load_depth(src);
const int new_depth = src_depth + 1;
for (int dst: g.adj[src]) {
int dst_depth = load_depth(dst);
if (dst_depth != -1 && dst_depth <= new_depth) {
continue;
}
do {
if (weak_cas_depth(dst, &dst_depth, new_depth)) {
push_vert(dst);
break;
}
assert(dst_depth != -1);
} while (dst_depth > new_depth);
}
};

while (auto in = pop_block()) {
for (int src: in->verts) {
if (src == -1) { break; }
process_vert(src);
}
push_out();
}
}
};

void parallel_bfs(int n_threads, const digraph& g, std::span<int> depths) {
reset_depths(g, depths, 0);
parbfs parbfs(n_threads, g, depths);
}
31 changes: 31 additions & 0 deletions digraph.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once
#include "svo.hpp"
#include <cassert>
#include <span>
#include <vector>

struct digraph {
std::vector<svo_vector<int>> adj;
int num_edges = 0;

explicit digraph(int verts): adj(verts) {}

int num_verts() const { return std::ssize(adj); }

bool maybe_add_edge(int from, int to) {
assert(from >= 0 && from < num_verts());
assert(to >= 0 && to < num_verts());
if (from != to) {
auto& v = adj[from];
if (std::find(v.begin(), v.end(), to) == v.end()) {
v.emplace_back(to);
++num_edges;
return true;
}
}
return false;
}
};

void bfs(const digraph&, std::span<int> depths);
void parallel_bfs(int n_threads, const digraph&, std::span<int> depths);
Loading