Skip to content

Commit

Permalink
Updates for nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
lums658 committed Aug 15, 2022
1 parent 306c383 commit 366c782
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 210 deletions.
22 changes: 11 additions & 11 deletions experimental/tiledb/common/dag/ports/CMakeLists.txt
Expand Up @@ -86,8 +86,8 @@ gather_sources(${PSEUDO_NODES_SOURCES})
#
# Object library for other units to depend upon
#
add_library(pseudo_nodes OBJECT ${PSEUDO_NODES_SOURCES})
target_link_libraries(pseudo_nodes PUBLIC baseline $<TARGET_OBJECTS:baseline>)
#add_library(pseudo_nodes OBJECT ${PSEUDO_NODES_SOURCES})
#target_link_libraries(pseudo_nodes PUBLIC baseline $<TARGET_OBJECTS:baseline>)

#
# Test-compile of object library ensures link-completeness
Expand All @@ -97,24 +97,24 @@ target_link_libraries(compile_pseudo_nodes PRIVATE pseudo_nodes)
target_sources(compile_pseudo_nodes PRIVATE test/compile_pseudo_nodes_main.cc)

if (TILEDB_TESTS)
add_executable(unit_fsm EXCLUDE_FROM_ALL)
target_link_libraries(unit_fsm PUBLIC fsm)
add_executable(unit_pseudo_nodes EXCLUDE_FROM_ALL)
# target_link_libraries(unit_pseudo_nodes PUBLIC pseudo_nodes)
find_package(Catch_EP REQUIRED)
target_link_libraries(unit_concurrency PUBLIC Catch2::Catch2)
# target_link_libraries(unit_concurrency PUBLIC $<TARGET_OBJECTS:thread_pool>)
target_link_libraries(unit_pseudo_nodes PUBLIC Catch2::Catch2)
# target_link_libraries(unit_pseudo_nodes PUBLIC $<TARGET_OBJECTS:thread_pool>)

# Sources for code elsewhere required for tests
target_sources(unit_concurrency PUBLIC ${DEPENDENT_SOURCES})
target_sources(unit_pseudo_nodes PUBLIC ${DEPENDENT_SOURCES})

# Sources for tests
target_sources(unit_concurrency PUBLIC
target_sources(unit_pseudo_nodes PUBLIC
test/main.cc
test/unit_concurrency.cc
test/unit_pseudo_nodes.cc
)

add_test(
NAME "unit_concurrency"
COMMAND $<TARGET_FILE:unit_concurrency> --durations=yes
NAME "unit_pseudo_nodes"
COMMAND $<TARGET_FILE:unit_pseudo_nodes> --durations=yes
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
)
endif()
Expand Down
108 changes: 54 additions & 54 deletions experimental/tiledb/common/dag/ports/ports.h
Expand Up @@ -174,25 +174,22 @@ class Source : public Port<Mover_T, Block> {
return false;
}
this->item_ = value;

using mover_type = Mover_T<Block>;
using source_type = Source<Mover_T, Block>;
using sink_type = Sink<Mover_T, Block>;

/**
* Extract an item into the `Source`. Used only for testing.
*/
std::optional<Block> extract() {
if (!this->is_attached()) {
throw std::logic_error("Sink not attached in extract");
return {};
}
std::optional<Block> ret{};
std::swap(ret, this->item_);

return ret;
return true;
}
/**
* Extract an item into the `Source`. Used only for testing.
*/
std::optional<Block> extract() {
if (!this->is_attached()) {
throw std::logic_error("Sink not attached in extract");
return {};
}
};
std::optional<Block> ret{};
std::swap(ret, this->item_);

return ret;
}
};

/**
* A data flow sink, used by both edges and nodes.
Expand All @@ -201,43 +198,46 @@ class Source : public Port<Mover_T, Block> {
* determined by the states (and policies) of the `Mover`. Their
* functionality is determined by the states (and policies) of the `Mover`.
*/
template <template <class> class Mover_T, class Block>
class Sink : public Port<Mover_T, Block> {
using Port<Mover_T, Block>::Port;
friend Port<Mover_T, Block>;
using port_type = Port<Mover_T, Block>;

friend class Source<Mover_T, Block>;
using mover_type = typename port_type::mover_type;
using source_type = typename port_type::source_type;
using sink_type = typename port_type::sink_type;

public:
Sink() = default;

Sink(const Sink& rhs) = delete;
Sink(Sink&& rhs) = delete;
Sink& operator=(const Sink& rhs) = delete;
Sink& operator=(Sink&& rhs) = delete;

public:
/**
* Create functions (and friends) to manage attaching and detaching of
* `Sink` and `Source` ports.
*/
void attach(source_type& predecessor) {
if (this->is_attached() || predecessor.is_attached()) {
throw std::runtime_error(
"Sink attempting to attach to already attached ports");
} else {
this->item_mover_ = std::make_shared<mover_type>();
predecessor.item_mover_ = this->item_mover_;
this->item_mover_->register_port_items(predecessor.item_, this->item_);
this->set_attached();
predecessor.set_attached();
}
template <template <class> class Mover_T, class Block>
class Sink : public Port<Mover_T, Block> {
using Port<Mover_T, Block>::Port;
friend Port<Mover_T, Block>;
using port_type = Port<Mover_T, Block>;

friend class Source<Mover_T, Block>;
// using mover_type = typename port_type::mover_type;
// using source_type = typename port_type::source_type;
// using sink_type = typename port_type::sink_type;

using mover_type = Mover_T<Block>;
using source_type = Source<Mover_T, Block>;
using sink_type = Sink<Mover_T, Block>;

public:
Sink() = default;

Sink(const Sink& rhs) = delete;
Sink(Sink&& rhs) = delete;
Sink& operator=(const Sink& rhs) = delete;
Sink& operator=(Sink&& rhs) = delete;

public:
/**
* Create functions (and friends) to manage attaching and detaching of
* `Sink` and `Source` ports.
*/
void attach(source_type& predecessor) {
if (this->is_attached() || predecessor.is_attached()) {
throw std::runtime_error(
"Sink attempting to attach to already attached ports");
} else {
this->item_mover_ = std::make_shared<mover_type>();
predecessor.item_mover_ = this->item_mover_;
this->item_mover_->register_port_items(predecessor.item_, this->item_);
this->set_attached();
predecessor.set_attached();
}
};
}

void attach(source_type& predecessor, std::shared_ptr<mover_type> mover) {
if (this->is_attached() || predecessor.is_attached()) {
Expand Down
13 changes: 3 additions & 10 deletions experimental/tiledb/common/dag/ports/test/pseudo_nodes.h
Expand Up @@ -44,8 +44,6 @@ namespace tiledb::common {

class GraphNode {};

class GraphNode {};

/**
* Prototype producer function object class. This class generates a sequence of
* integers from 0 to N (half-open interval). It will invoke an out-of-data
Expand Down Expand Up @@ -127,13 +125,9 @@ class ProducerNode : public Source<Mover_T, Block> {
* Trivial default constructor, for testing.
*/
ProducerNode() = default;
ProducerNode(const ProducerNode&) = default;
// ProducerNode(const ProducerNode&) = default;
ProducerNode(const ProducerNode&) {
}
ProducerNode(ProducerNode&&) = default;

// ProducerNode(const ProducerNode&) = default;
// ProducerNode(ProducerNode&&) = default;

/**
* Submit an item to be transferred to correspondent_ Sink. Blocking. The
Expand Down Expand Up @@ -199,7 +193,7 @@ class consumer {
OutputIterator iter_;

public:
explicit explicit consumer(OutputIterator iter)
explicit consumer(OutputIterator iter)
: iter_(iter) {
}
void operator()(Block& item) {
Expand Down Expand Up @@ -250,11 +244,10 @@ class ConsumerNode : public Sink<Mover_T, Block> {
* Trivial default constructor, for testing.
*/
ConsumerNode() = default;
ConsumerNode(const ConsumerNode&) = default;
// ConsumerNode(const ConsumerNode&) = default;
ConsumerNode(const ConsumerNode&) {
}
ConsumerNode(ConsumerNode&&) = default;
ConsumerNode(ConsumerNode&&) = default;

/**
* Retrieve `item_` from the Sink. Blocking. The behavior of `retrieve` and
Expand Down

0 comments on commit 366c782

Please sign in to comment.