Skip to content

Commit

Permalink
Eager transport (#3598)
Browse files Browse the repository at this point in the history
* TransportStreamType

* no constexpr specifier for Clamp<T>

* refine signature of Send

* GetTransportInstructionParallelConfs

* Send/Receive multi blobs by one instruction

* refine: static inline -> inline static

* refine comment according to google style guide

* implement Send/Receive by Grpc

* reimplement Send/Receive

* oneflow.eager_assign_121

* BoxingInterNodeOneToOne

* 2node_test_assign

* rename OF_BARRIAER

* add eager_2node_test.py

* InitLazyGlobalSession if eager execution not enabled

* remove Global<LbiDiffWatcherInfo>

* add TODO() comments for OF_SESSION_BARRIER under directory core/comm_network

* import atexit in python/framework/unittest.py

* fix minor bug in test_assign.py

* fix macro name error: PLATFORM_POSIX -> OF_PLATFORM_POSIX
  • Loading branch information
lixinqi committed Oct 10, 2020
1 parent 0ddafb9 commit 5181fee
Show file tree
Hide file tree
Showing 45 changed files with 1,337 additions and 191 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -21,6 +21,7 @@ wheelhouse*
/oneflow/python/__export_symbols__.py
/oneflow/python/compatibility.py
/oneflow/python/framework/sysconfig_gen.py
/oneflow/python/test/ops/localhost_script_*.sh
.clangd
compile_commands.json
/oneflow/python/test/ops/localhost_script_*.sh
8 changes: 8 additions & 0 deletions oneflow/api/python/init.cpp
Expand Up @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <atomic>
#include <pybind11/pybind11.h>
#include "oneflow/core/job/job_build_and_infer_ctx_mgr.h"
#include "oneflow/cfg/pybind_module_registry.h"
Expand All @@ -22,8 +23,15 @@ namespace py = pybind11;

namespace oneflow {

// Returns the next token id from a counter shared by all callers in this process.
// The first call yields 1 and every later call yields the previous value plus one.
// The increment and the read of the new value are a single atomic operation, so
// two concurrent callers can never be handed the same id.
uint64_t NewTokenId() {
  static std::atomic<uint64_t> token_id(0);
  // Pre-increment on std::atomic returns the freshly incremented value atomically;
  // incrementing and then re-loading the counter would allow another thread to
  // bump it in between and make both callers return the same number.
  return ++token_id;
}

// Defines the `oneflow_api` Python extension module.
// Registers two callables directly on the module object `m`
// ("EagerExecutionEnabled" and "NewTokenId"), then calls ImportAll(m) on the
// cfg pybind registry and on the OneFlow module registry.
PYBIND11_MODULE(oneflow_api, m) {
// Wrapped in a lambda that forwards to the C++ function of the same name.
m.def("EagerExecutionEnabled", []() { return EagerExecutionEnabled(); });
// Exposes the token-id generator defined earlier in this file.
m.def("NewTokenId", &NewTokenId);
::oneflow::cfg::Pybind11ModuleRegistry().ImportAll(m);
::oneflow::OneflowModuleRegistry().ImportAll(m);
}
Expand Down
86 changes: 43 additions & 43 deletions oneflow/core/common/data_type_converter.h
Expand Up @@ -28,23 +28,23 @@ namespace oneflow {

// Type trait: `value` is true when IsFloating<T>::value or IsFloat16<T>::value is true.
// NOTE(review): `value` is declared twice below (a `constexpr` form and a `const`
// form); this looks like the removed and added lines of a diff shown together —
// confirm which single declaration the real header keeps.
template<typename T>
struct IsFloatingOrHalf {
static constexpr bool value = IsFloating<T>::value || IsFloat16<T>::value;
static const bool value = IsFloating<T>::value || IsFloat16<T>::value;
};

// Type trait: `value` is true when std::is_arithmetic<T>::value or
// IsFloat16<T>::value is true.
// NOTE(review): `value` is declared twice below (a `constexpr` form and a `const`
// form); this looks like the removed and added lines of a diff shown together —
// confirm which single declaration the real header keeps.
template<typename T>
struct IsArithmeticOrHalf {
static constexpr bool value = std::is_arithmetic<T>::value || IsFloat16<T>::value;
static const bool value = std::is_arithmetic<T>::value || IsFloat16<T>::value;
};

template<typename From, typename To>
struct NeedsClamp {
static constexpr bool from_fp = IsFloatingOrHalf<From>::value;
static constexpr bool to_fp = IsFloatingOrHalf<To>::value;
static constexpr bool from_fp16 = IsFloat16<From>::value;
static constexpr bool to_fp16 = IsFloat16<To>::value;
static constexpr bool from_unsigned = std::is_unsigned<From>::value;
static constexpr bool to_unsigned = std::is_unsigned<To>::value;
static constexpr bool value =
static const bool from_fp = IsFloatingOrHalf<From>::value;
static const bool to_fp = IsFloatingOrHalf<To>::value;
static const bool from_fp16 = IsFloat16<From>::value;
static const bool to_fp16 = IsFloat16<To>::value;
static const bool from_unsigned = std::is_unsigned<From>::value;
static const bool to_unsigned = std::is_unsigned<To>::value;
static const bool value =
// to smaller type of same kind (fp, int)
(from_fp == to_fp && sizeof(To) < sizeof(From)) ||
// fp32 has range in excess of (u)int64
Expand All @@ -59,7 +59,7 @@ struct NeedsClamp {

// Partial specialization of NeedsClamp for a `bool` source type: `value` is
// always false, whatever the destination type `To` is.
// NOTE(review): `value` is declared twice below (a `constexpr` form and a `const`
// form); this looks like the removed and added lines of a diff shown together —
// confirm which single declaration the real header keeps.
template<typename To>
struct NeedsClamp<bool, To> {
static constexpr bool value = false;
static const bool value = false;
};

template<typename T, typename U, typename Enabled = void>
Expand Down Expand Up @@ -248,44 +248,44 @@ struct Converter : ConverterBase<Out, In> {
// Converts between two FP types.
// All four entry points (Convert, ConvertNorm, ConvertSat, ConvertSatNorm) simply
// return `value`, relying on the implicit In -> Out conversion; no scaling or
// clamping is applied in this specialization.
// NOTE(review): each member appears twice below (a `constexpr Out` form and a
// `const Out` form); this looks like the removed and added lines of a diff shown
// together — confirm which single declaration the real header keeps.
template<typename Out, typename In>
struct ConverterBase<Out, In, true, true> {
OF_DEVICE_FUNC static constexpr Out Convert(In value) { return value; }
OF_DEVICE_FUNC static constexpr Out ConvertNorm(In value) { return value; }
OF_DEVICE_FUNC static constexpr Out ConvertSat(In value) { return value; }
OF_DEVICE_FUNC static constexpr Out ConvertSatNorm(In value) { return value; }
OF_DEVICE_FUNC static const Out Convert(In value) { return value; }
OF_DEVICE_FUNC static const Out ConvertNorm(In value) { return value; }
OF_DEVICE_FUNC static const Out ConvertSat(In value) { return value; }
OF_DEVICE_FUNC static const Out ConvertSatNorm(In value) { return value; }
};

// Converts integral to FP type.
// Convert and ConvertSat return `value` unchanged (implicit In -> Out conversion).
// ConvertNorm and ConvertSatNorm multiply `value` by Out(1) / GetMaxVal<In>(),
// i.e. they scale by the reciprocal of the input type's maximum value.
// NOTE(review): each signature appears twice below (a `constexpr Out` form and a
// `const Out` form); this looks like the removed and added lines of a diff shown
// together — confirm which single declaration the real header keeps.
template<typename Out, typename In>
struct ConverterBase<Out, In, true, false> {
OF_DEVICE_FUNC static constexpr Out Convert(In value) { return value; }
OF_DEVICE_FUNC static constexpr Out ConvertSat(In value) { return value; }
OF_DEVICE_FUNC static constexpr Out ConvertNorm(In value) {
OF_DEVICE_FUNC static const Out Convert(In value) { return value; }
OF_DEVICE_FUNC static const Out ConvertSat(In value) { return value; }
OF_DEVICE_FUNC static const Out ConvertNorm(In value) {
return value * (Out(1) / (GetMaxVal<In>()));
}
OF_DEVICE_FUNC static constexpr Out ConvertSatNorm(In value) {
OF_DEVICE_FUNC static const Out ConvertSatNorm(In value) {
return value * (Out(1) / (GetMaxVal<In>()));
}
};

// Converts integral to float16
template<typename In>
struct ConverterBase<float16, In, true, false> {
OF_DEVICE_FUNC static constexpr float16 Convert(In value) {
OF_DEVICE_FUNC static const float16 Convert(In value) {
auto out = ConverterBase<float, In, true, false>::Convert(value);
return static_cast<float16>(out);
}

OF_DEVICE_FUNC static constexpr float16 ConvertSat(In value) {
OF_DEVICE_FUNC static const float16 ConvertSat(In value) {
auto out = ConverterBase<float, In, true, false>::ConvertSat(value);
return static_cast<float16>(out);
}

OF_DEVICE_FUNC static constexpr float16 ConvertNorm(In value) {
OF_DEVICE_FUNC static const float16 ConvertNorm(In value) {
auto out = ConverterBase<float, In, true, false>::ConvertNorm(value);
return static_cast<float16>(out);
}

OF_DEVICE_FUNC static constexpr float16 ConvertSatNorm(In value) {
OF_DEVICE_FUNC static const float16 ConvertSatNorm(In value) {
auto out = ConverterBase<float, In, true, false>::ConvertSatNorm(value);
return static_cast<float16>(out);
}
Expand All @@ -294,31 +294,31 @@ struct ConverterBase<float16, In, true, false> {
// Converts FP to integral type
template<typename Out, typename In>
struct ConverterBase<Out, In, false, true> {
OF_DEVICE_FUNC static constexpr Out Convert(In value) {
OF_DEVICE_FUNC static const Out Convert(In value) {
#ifdef __CUDA_ARCH__
return Clamp<Out>(cuda_round_helper(value, Out()));
#else
return Clamp<Out>(std::round(value));
#endif
}

OF_DEVICE_FUNC static constexpr Out ConvertSat(In value) {
OF_DEVICE_FUNC static const Out ConvertSat(In value) {
#ifdef __CUDA_ARCH__
return Clamp<Out>(cuda_round_helper(value, Out()));
#else
return Clamp<Out>(std::round(value));
#endif
}

OF_DEVICE_FUNC static constexpr Out ConvertNorm(In value) {
OF_DEVICE_FUNC static const Out ConvertNorm(In value) {
#ifdef __CUDA_ARCH__
return Clamp<Out>(cuda_round_helper(value * GetMaxVal<Out>(), Out()));
#else
return std::round(value * GetMaxVal<Out>());
#endif
}

OF_DEVICE_FUNC static constexpr Out ConvertSatNorm(In value) {
OF_DEVICE_FUNC static const Out ConvertSatNorm(In value) {
#ifdef __CUDA_ARCH__
return std::is_signed<Out>::value
? Clamp<Out>(cuda_round_helper(value * GetMaxVal<Out>(), Out()))
Expand All @@ -333,23 +333,23 @@ struct ConverterBase<Out, In, false, true> {
// Primary template for integer-to-integer conversion. The two bool parameters
// default to std::is_signed<Out>::value and std::is_signed<In>::value.
//  - Convert:        returns `value` via the implicit In -> Out conversion.
//  - ConvertNorm:    scales `value` by GetMaxVal<Out>() / GetMaxVal<In>() in float
//                    arithmetic, then converts with Converter<Out, float>::Convert.
//  - ConvertSat:     returns Clamp<Out>(value).
//  - ConvertSatNorm: forwards to ConvertNorm.
// NOTE(review): each signature appears twice below (a `constexpr Out` form and a
// `const Out` form); this looks like the removed and added lines of a diff shown
// together — confirm which single declaration the real header keeps.
template<typename Out, typename In, bool IsOutSigned = std::is_signed<Out>::value,
bool IsInSigned = std::is_signed<In>::value>
struct ConvertIntInt {
OF_DEVICE_FUNC static constexpr Out Convert(In value) { return value; }
OF_DEVICE_FUNC static constexpr Out ConvertNorm(In value) {
OF_DEVICE_FUNC static const Out Convert(In value) { return value; }
OF_DEVICE_FUNC static const Out ConvertNorm(In value) {
return Converter<Out, float>::Convert(value * (1.0f * GetMaxVal<Out>() / GetMaxVal<In>()));
}
OF_DEVICE_FUNC static constexpr Out ConvertSat(In value) { return Clamp<Out>(value); }
OF_DEVICE_FUNC static constexpr Out ConvertSatNorm(In value) { return ConvertNorm(value); }
OF_DEVICE_FUNC static const Out ConvertSat(In value) { return Clamp<Out>(value); }
OF_DEVICE_FUNC static const Out ConvertSatNorm(In value) { return ConvertNorm(value); }
};

// Converts signed to unsigned integer
template<typename Out, typename In>
struct ConvertIntInt<Out, In, false, true> {
OF_DEVICE_FUNC static constexpr Out Convert(In value) { return value; }
OF_DEVICE_FUNC static constexpr Out ConvertNorm(In value) {
OF_DEVICE_FUNC static const Out Convert(In value) { return value; }
OF_DEVICE_FUNC static const Out ConvertNorm(In value) {
return Converter<Out, float>::Convert(value * (1.0f * GetMaxVal<Out>() / GetMaxVal<In>()));
}
OF_DEVICE_FUNC static constexpr Out ConvertSat(In value) { return Clamp<Out>(value); }
OF_DEVICE_FUNC static constexpr Out ConvertSatNorm(In value) {
OF_DEVICE_FUNC static const Out ConvertSat(In value) { return Clamp<Out>(value); }
OF_DEVICE_FUNC static const Out ConvertSatNorm(In value) {
#ifdef __CUDA_ARCH__
return cuda_round_helper(__saturatef(value * (1.0f / GetMaxVal<In>())) * GetMaxVal<Out>());
#else
Expand All @@ -368,10 +368,10 @@ struct ConvertIntInt<Out, In, false, true> {
// Pass-through conversion: specialization used when source and destination types
// are the same. Every entry point returns its argument unchanged.
// NOTE(review): each member appears twice below (a `constexpr T` form and a
// `const T` form); this looks like the removed and added lines of a diff shown
// together — confirm which single declaration the real header keeps.
template<typename T>
struct Converter<T, T> {
static OF_DEVICE_FUNC constexpr T Convert(T value) { return value; }
static OF_DEVICE_FUNC constexpr T ConvertSat(T value) { return value; }
static OF_DEVICE_FUNC constexpr T ConvertNorm(T value) { return value; }
static OF_DEVICE_FUNC constexpr T ConvertSatNorm(T value) { return value; }
static OF_DEVICE_FUNC const T Convert(T value) { return value; }
static OF_DEVICE_FUNC const T ConvertSat(T value) { return value; }
static OF_DEVICE_FUNC const T ConvertNorm(T value) { return value; }
static OF_DEVICE_FUNC const T ConvertSatNorm(T value) { return value; }
};

template<typename raw_out, typename raw_in>
Expand All @@ -381,25 +381,25 @@ struct ConvertIntInt<Out, In, false, true> {
} // namespace

// Public free-function entry points. Each one forwards to the member of the same
// name on converter_t<Out, In>.
// NOTE(review): every function below shows two signature lines (a `constexpr Out`
// form and a `const Out` form) ahead of a single body; this looks like the removed
// and added lines of a diff shown together — confirm which single signature the
// real header keeps.

// Forwards to converter_t<Out, In>::Convert.
template<typename Out, typename In>
OF_DEVICE_FUNC constexpr Out Convert(In value) {
OF_DEVICE_FUNC const Out Convert(In value) {
return converter_t<Out, In>::Convert(value);
}

// Forwards to converter_t<Out, In>::ConvertNorm.
template<typename Out, typename In>
OF_DEVICE_FUNC constexpr Out ConvertNorm(In value) {
OF_DEVICE_FUNC const Out ConvertNorm(In value) {
return converter_t<Out, In>::ConvertNorm(value);
}

// Forwards to converter_t<Out, In>::ConvertSat.
template<typename Out, typename In>
OF_DEVICE_FUNC constexpr Out ConvertSat(In value) {
OF_DEVICE_FUNC const Out ConvertSat(In value) {
return converter_t<Out, In>::ConvertSat(value);
}

// Forwards to converter_t<Out, In>::ConvertSatNorm.
template<typename Out, typename In>
OF_DEVICE_FUNC constexpr Out ConvertSatNorm(In value) {
OF_DEVICE_FUNC const Out ConvertSatNorm(In value) {
return converter_t<Out, In>::ConvertSatNorm(value);
}

} // namespace oneflow

#endif // ONEFLOW_CORE_COMMON_DATA_TYPE_CONVERTER_H_
#endif // ONEFLOW_CORE_COMMON_DATA_TYPE_CONVERTER_H_
2 changes: 1 addition & 1 deletion oneflow/core/common/data_type_converter_test_static.h
Expand Up @@ -59,4 +59,4 @@ static_assert(!NeedsClamp<float, double>::value, "Clamping not required");
} // namespace
} // namespace oneflow

#endif // ONEFLOW_CORE_COMMON_DATA_TYPE_CONVERTER_TEST_STATIC_H_
#endif // ONEFLOW_CORE_COMMON_DATA_TYPE_CONVERTER_TEST_STATIC_H_
6 changes: 6 additions & 0 deletions oneflow/core/common/error.cpp
Expand Up @@ -211,6 +211,12 @@ Error Error::LossBlobNotFoundError(const std::string& error_summary) {
return error;
}

// Factory for the "rw mutexed object not found" error: allocates a fresh
// ErrorProto, selects its rw_mutexed_object_not_found_error field, and returns
// it converted to an Error.
Error Error::RwMutexedObjectNotFoundError() {
  std::shared_ptr<ErrorProto> proto = std::make_shared<ErrorProto>();
  // Calling the mutable_ accessor is what marks this error kind on the proto;
  // the returned sub-message is intentionally left untouched.
  proto->mutable_rw_mutexed_object_not_found_error();
  return proto;
}

Error Error::GradientFunctionNotFound() {
auto error = std::make_shared<ErrorProto>();
error->mutable_gradient_function_not_found_error();
Expand Down
2 changes: 2 additions & 0 deletions oneflow/core/common/error.h
Expand Up @@ -60,6 +60,8 @@ class Error final {
const std::vector<std::string>& error_msgs);
static Error LossBlobNotFoundError(const std::string& error_summary);

static Error RwMutexedObjectNotFoundError();

// gradient
static Error GradientFunctionNotFound();

Expand Down
3 changes: 3 additions & 0 deletions oneflow/core/common/error.proto
Expand Up @@ -105,6 +105,8 @@ message MemoryZoneOutOfMemoryError {

message LossBlobNotFoundError { }

message RwMutexedObjectNotFoundError { }

message UnkownError { }

message ErrorStackFrame {
Expand Down Expand Up @@ -147,6 +149,7 @@ message ErrorProto {
PlacementError placement_error= 470;
BlobSplitAxisInferError blob_split_axis_infer_error = 480;
UnknownJobBuildAndInferError unknown_job_build_and_infer_error = 500;
RwMutexedObjectNotFoundError rw_mutexed_object_not_found_error = 600;
UnkownError unknown_error = 900;
}
}
27 changes: 2 additions & 25 deletions oneflow/core/control/ctrl_test.cpp
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
#include "oneflow/core/job/env.pb.h"
#include "oneflow/core/control/ctrl_client.h"
#include "oneflow/core/control/ctrl_server.h"
#include "oneflow/core/control/ctrl_util.h"
#include "oneflow/core/job/resource_desc.h"
#include "oneflow/core/job/global_for.h"

Expand All @@ -32,30 +33,6 @@ namespace oneflow {

namespace {

// Builds an IPv4 `sockaddr_in` for the given address string and port.
//   addr: textual IPv4 address handed to inet_pton(AF_INET, ...); the PCHECK
//         requires inet_pton to return 1 (successful parse).
//   port: port number in host byte order; stored in network byte order via htons.
// Returns the populated structure by value.
sockaddr_in GetSockAddr(const std::string& addr, uint16_t port) {
  // Value-initialize so that sin_zero and any padding are zero rather than
  // indeterminate stack bytes when the struct is later handed to bind().
  sockaddr_in sa{};
  sa.sin_family = AF_INET;
  sa.sin_port = htons(port);
  PCHECK(inet_pton(AF_INET, addr.c_str(), &(sa.sin_addr)) == 1);
  return sa;
}

// Probes for a TCP port that can currently be bound on 0.0.0.0.
// Ports are tried in ascending order starting at 10000, up to (but not
// including) GetMaxVal<uint16_t>(). Returns the first port for which bind()
// succeeds, or -1 when the probe socket cannot be created or no port in the
// range could be bound. The probe socket is always closed before returning.
int FindAvailablePort() {
  const int sock = socket(AF_INET, SOCK_STREAM, 0);
  // Without a valid descriptor every bind() below would fail; bail out early.
  if (sock < 0) { return -1; }

  int found_port = -1;
  for (uint16_t port = 10000; port < GetMaxVal<uint16_t>(); ++port) {
    sockaddr_in sa = GetSockAddr("0.0.0.0", port);
    if (bind(sock, reinterpret_cast<sockaddr*>(&sa), sizeof(sa)) == 0) {
      shutdown(sock, SHUT_RDWR);
      found_port = port;
      break;
    }
  }

  // Single exit path: release the descriptor whether or not a port was found
  // (the not-found path previously returned without closing it).
  close(sock);
  return found_port;
}

EnvProto GetEnvProto(int port) {
EnvProto ret;
auto* machine0 = ret.add_machine();
Expand All @@ -77,7 +54,7 @@ Resource GetResource() {
} // namespace

TEST(CtrlServer, new_delete) {
int port = FindAvailablePort();
int port = CtrlUtil().FindAvailablePort();
if (port == -1) { return; }
EnvProto env_proto = GetEnvProto(port);
Global<EnvDesc>::New(env_proto);
Expand Down

0 comments on commit 5181fee

Please sign in to comment.