446 changes: 446 additions & 0 deletions link/ableton/link/Controller.hpp

Large diffs are not rendered by default.

94 changes: 94 additions & 0 deletions link/ableton/link/Gateway.hpp
@@ -0,0 +1,94 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/discovery/PeerGateway.hpp>
#include <ableton/link/MeasurementService.hpp>
#include <ableton/link/PeerState.hpp>

namespace ableton
{
namespace link
{

template <typename PeerObserver, typename Clock, typename IoContext>
class Gateway
{
public:
Gateway(util::Injected<IoContext> io,
asio::ip::address_v4 addr,
util::Injected<PeerObserver> observer,
NodeState nodeState,
GhostXForm ghostXForm,
Clock clock)
// TODO: Measurement should have an IoContext injected
: mIo(std::move(io)),
mMeasurement(addr,
nodeState.sessionId,
std::move(ghostXForm),
std::move(clock),
util::injectVal(channel(mIo->log(), "gateway@" + addr.to_string()))),
mPeerGateway(discovery::makeIpV4Gateway(util::injectRef(*mIo),
std::move(addr),
std::move(observer),
PeerState{std::move(nodeState), mMeasurement.endpoint()}))
{
}

Gateway(const Gateway& rhs) = delete;
Gateway& operator=(const Gateway& rhs) = delete;

Gateway(Gateway&& rhs)
: mIo(std::move(rhs.mIo))
, mMeasurement(std::move(rhs.mMeasurement))
, mPeerGateway(std::move(rhs.mPeerGateway))
{
}

Gateway& operator=(Gateway&& rhs)
{
mIo = std::move(rhs.mIo);
mMeasurement = std::move(rhs.mMeasurement);
mPeerGateway = std::move(rhs.mPeerGateway);
return *this;
}

void updateNodeState(std::pair<NodeState, GhostXForm> state)
{
mMeasurement.updateNodeState(state.first.sessionId, state.second);
mPeerGateway.updateState(PeerState{std::move(state.first), mMeasurement.endpoint()});
}

template <typename Handler>
void measurePeer(const PeerState& peer, Handler handler)
{
mMeasurement.measurePeer(peer, std::move(handler));
}

private:
util::Injected<IoContext> mIo;
MeasurementService<Clock, typename util::Injected<IoContext>::type::Log> mMeasurement;
discovery::
IpV4Gateway<PeerObserver, PeerState, typename util::Injected<IoContext>::type&>
mPeerGateway;
};

} // namespace link
} // namespace ableton
59 changes: 59 additions & 0 deletions link/ableton/link/GhostXForm.hpp
@@ -0,0 +1,59 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <chrono>
#include <cmath>

namespace ableton
{
namespace link
{

using std::chrono::microseconds;

struct GhostXForm
{
microseconds hostToGhost(const microseconds hostTime) const
{
return microseconds{llround(slope * hostTime.count())} + intercept;
}

microseconds ghostToHost(const microseconds ghostTime) const
{
return microseconds{llround((ghostTime - intercept).count() / slope)};
}

friend bool operator==(const GhostXForm lhs, const GhostXForm rhs)
{
return lhs.slope == rhs.slope && lhs.intercept == rhs.intercept;
}

friend bool operator!=(const GhostXForm lhs, const GhostXForm rhs)
{
return !(lhs == rhs);
}

double slope;
microseconds intercept;
};

} // namespace link
} // namespace ableton
82 changes: 82 additions & 0 deletions link/ableton/link/HostTimeFilter.hpp
@@ -0,0 +1,82 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/link/LinearRegression.hpp>
#include <chrono>
#include <vector>

namespace ableton
{
namespace link
{

template <class T>
class HostTimeFilter
{
static const std::size_t kNumPoints = 512;
using Points = std::vector<std::pair<double, double>>;
using PointIt = typename Points::iterator;

public:
HostTimeFilter()
: mIndex(0)
{
mPoints.reserve(kNumPoints);
}

~HostTimeFilter() = default;

void reset()
{
mIndex = 0;
mPoints.clear();
}

std::chrono::microseconds sampleTimeToHostTime(const double sampleTime)
{
const auto micros = static_cast<double>(mHostTimeSampler.micros().count());
const auto point = std::make_pair(sampleTime, micros);

if (mPoints.size() < kNumPoints)
{
mPoints.push_back(point);
}
else
{
mPoints[mIndex] = point;
}
mIndex = (mIndex + 1) % kNumPoints;

const auto result = linearRegression(mPoints.begin(), mPoints.end());

const auto hostTime = (result.first * sampleTime) + result.second;

return std::chrono::microseconds(llround(hostTime));
}

private:
std::size_t mIndex;
Points mPoints;
T mHostTimeSampler;
};

} // namespace link
} // namespace ableton
160 changes: 160 additions & 0 deletions link/ableton/link/Kalman.hpp
@@ -0,0 +1,160 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <array>
#include <cfloat>
#include <cmath>
#include <limits>

#if LINK_PLATFORM_WINDOWS
// Windows.h (or more specifically, minwindef.h) define the max(a, b) macro
// which conflicts with the symbol provided by std::numeric_limits.
#ifdef max
#undef max
#endif
#endif

namespace ableton
{
namespace link
{

template <std::size_t n>
struct Kalman
{
Kalman()
: mValue(0)
, mGain(0)
, mVVariance(1)
, mWVariance(1)
, mCoVariance(1)
, mVarianceLength(n)
, mCounter(mVarianceLength)
{
}

double getValue()
{
return mValue;
}

double calculateVVariance()
{
auto vVar = 0.;
auto meanOfDiffs = 0.;

for (size_t k = 0; k < (mVarianceLength); k++)
{
meanOfDiffs += (mMeasuredValues[k] - mFilterValues[k]);
}

meanOfDiffs /= (mVarianceLength);

for (size_t i = 0; i < (mVarianceLength); i++)
{
vVar += (pow(mMeasuredValues[i] - mFilterValues[i] - meanOfDiffs, 2.0));
}

vVar /= (mVarianceLength - 1);

return vVar;
}

double calculateWVariance()
{
auto wVar = 0.;
auto meanOfDiffs = 0.;

for (size_t k = 0; k < (mVarianceLength); k++)
{
meanOfDiffs += (mFilterValues[(mCounter - k - 1) % mVarianceLength]
- mFilterValues[(mCounter - k - 2) % mVarianceLength]);
}

meanOfDiffs /= (mVarianceLength);

for (size_t i = 0; i < (mVarianceLength); i++)
{
wVar += (pow(mFilterValues[(mCounter - i - 1) % mVarianceLength]
- mFilterValues[(mCounter - i - 2) % mVarianceLength] - meanOfDiffs,
2.0));
}

wVar /= (mVarianceLength - 1);

return wVar;
}

void iterate(const double value)
{
const std::size_t currentIndex = mCounter % mVarianceLength;
mMeasuredValues[currentIndex] = value;

if (mCounter < (mVarianceLength + mVarianceLength))
{
if (mCounter == mVarianceLength)
{
mValue = value;
}
else
{
mValue = (mValue + value) / 2;
}
}
else
{
// prediction equations
const double prevFilterValue = mFilterValues[(mCounter - 1) % mVarianceLength];
mFilterValues[currentIndex] = prevFilterValue;
mWVariance = calculateWVariance();
const double coVarianceEstimation = mCoVariance + mWVariance;

// update equations
mVVariance = calculateVVariance();
if ((coVarianceEstimation + mVVariance) != 0)
{
mGain = coVarianceEstimation / (coVarianceEstimation + mVVariance);
}
else
{
mGain = std::numeric_limits<double>::max();
}
mValue = prevFilterValue + mGain * (value - prevFilterValue);
mCoVariance = (1 - mGain) * coVarianceEstimation;
}
mFilterValues[currentIndex] = mValue;

++mCounter;
}

double mValue;
double mGain;
double mVVariance;
double mWVariance;
double mCoVariance;
size_t mVarianceLength;
size_t mCounter;
std::array<double, n> mFilterValues;
std::array<double, n> mMeasuredValues;
};

} // namespace link
} // namespace ableton
64 changes: 64 additions & 0 deletions link/ableton/link/LinearRegression.hpp
@@ -0,0 +1,64 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <cfloat>
#include <cmath>
#include <numeric>
#include <utility>

namespace ableton
{
namespace link
{

template <typename It>
std::pair<double, double> linearRegression(It begin, It end)
{
using namespace std;
using Point = pair<double, double>;

const double numPoints = static_cast<double>(distance(begin, end));

const double meanX = accumulate(begin, end, 0.0, [](double a, Point b) {
return a + b.first;
}) / numPoints;

const double productXX = accumulate(begin, end, 0.0,
[&meanX](double a, Point b) { return a + pow(b.first - meanX, 2.0); });

const double meanY = accumulate(begin, end, 0.0, [](double a, Point b) {
return a + b.second;
}) / numPoints;

const double productXY =
inner_product(begin, end, begin, 0.0, [](double a, double b) { return a + b; },
[&meanX, &meanY](
Point a, Point b) { return ((a.first - meanX) * (b.second - meanY)); });

const double slope = productXX == 0.0 ? 0.0 : productXY / productXX;

const double intercept = meanY - (slope * meanX);

return make_pair(slope, intercept);
}

} // namespace link
} // namespace ableton
296 changes: 296 additions & 0 deletions link/ableton/link/Measurement.hpp
@@ -0,0 +1,296 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/discovery/Payload.hpp>
#include <ableton/discovery/Socket.hpp>
#include <ableton/link/PayloadEntries.hpp>
#include <ableton/link/PeerState.hpp>
#include <ableton/link/SessionId.hpp>
#include <ableton/link/v1/Messages.hpp>
#include <ableton/platforms/asio/AsioService.hpp>
#include <ableton/util/Injected.hpp>
#include <chrono>
#include <memory>

namespace ableton
{
namespace link
{

template <typename IoService, typename Clock, typename Socket, typename Log>
struct Measurement
{
using Point = std::pair<double, double>;
using Callback = std::function<void(std::vector<Point>)>;
using Micros = std::chrono::microseconds;
using Timer = typename IoService::Timer;

static const std::size_t kNumberDataPoints = 100;
static const std::size_t kNumberMeasurements = 5;

Measurement() = default;

Measurement(const PeerState& state,
Callback callback,
asio::ip::address_v4 address,
Clock clock,
util::Injected<Log> log)
: mpIo(new IoService{})
, mpImpl(std::make_shared<Impl>(*mpIo,
std::move(state),
std::move(callback),
std::move(address),
std::move(clock),
std::move(log)))
{
mpImpl->listen();
}

Measurement(Measurement&& rhs)
: mpIo(std::move(rhs.mpIo))
, mpImpl(std::move(rhs.mpImpl))
{
}

~Measurement()
{
postImplDestruction();
}

Measurement& operator=(Measurement&& rhs)
{
postImplDestruction();
mpIo = std::move(rhs.mpIo);
mpImpl = std::move(rhs.mpImpl);
return *this;
}

void postImplDestruction()
{
// Post destruction of the impl object into the io thread if valid
if (mpIo)
{
mpIo->post(ImplDeleter{*this});
}
}

struct Impl : std::enable_shared_from_this<Impl>
{
Impl(IoService& io,
const PeerState& state,
Callback callback,
asio::ip::address_v4 address,
Clock clock,
util::Injected<Log> log)
: mpSocket(std::make_shared<Socket>(io))
, mSessionId(state.nodeState.sessionId)
, mEndpoint(state.endpoint)
, mCallback(std::move(callback))
, mClock(std::move(clock))
, mTimer(util::injectVal(io.makeTimer()))
, mMeasurementsStarted(0)
, mLog(std::move(log))
, mSuccess(false)
{
configureUnicastSocket(*mpSocket, address);

const auto ht = HostTime{mClock.micros()};
sendPing(mEndpoint, discovery::makePayload(ht));
resetTimer();
}

void resetTimer()
{
mTimer->cancel();
mTimer->expires_from_now(std::chrono::milliseconds(50));
mTimer->async_wait([this](const typename Timer::ErrorCode e) {
if (!e)
{
if (mMeasurementsStarted < kNumberMeasurements)
{
const auto ht = HostTime{mClock.micros()};
sendPing(mEndpoint, discovery::makePayload(ht));
++mMeasurementsStarted;
resetTimer();
}
else
{
fail();
}
}
});
}

void listen()
{
mpSocket->receive(util::makeAsyncSafe(this->shared_from_this()));
}

// Operator to handle incoming messages on the interface
template <typename It>
void operator()(
const asio::ip::udp::endpoint& from, const It messageBegin, const It messageEnd)
{
using namespace std;
const auto result = v1::parseMessageHeader(messageBegin, messageEnd);
const auto& header = result.first;
const auto payloadBegin = result.second;

if (header.messageType == v1::kPong)
{
debug(*mLog) << "Received Pong message from " << from;

// parse for all entries
SessionId sessionId{};
std::chrono::microseconds ghostTime{0};
std::chrono::microseconds prevGHostTime{0};
std::chrono::microseconds prevHostTime{0};

try
{
discovery::parsePayload<SessionMembership, GHostTime, PrevGHostTime, HostTime>(
payloadBegin, messageEnd,
[&sessionId](const SessionMembership& sms) { sessionId = sms.sessionId; },
[&ghostTime](GHostTime gt) { ghostTime = std::move(gt.time); },
[&prevGHostTime](PrevGHostTime gt) { prevGHostTime = std::move(gt.time); },
[&prevHostTime](HostTime ht) { prevHostTime = std::move(ht.time); });
}
catch (const std::runtime_error& err)
{
warning(*mLog) << "Failed parsing payload, caught exception: " << err.what();
listen();
return;
}

if (mSessionId == sessionId)
{
const auto hostTime = mClock.micros();

const auto payload =
discovery::makePayload(HostTime{hostTime}, PrevGHostTime{ghostTime});

sendPing(from, payload);
listen();

if (prevGHostTime != Micros{0})
{
mData.push_back(
std::make_pair(static_cast<double>((hostTime + prevHostTime).count()) * 0.5,
static_cast<double>(ghostTime.count())));
mData.push_back(std::make_pair(static_cast<double>(prevHostTime.count()),
static_cast<double>((ghostTime + prevGHostTime).count()) * 0.5));
}

if (mData.size() > kNumberDataPoints)
{
finish();
}
else
{
resetTimer();
}
}
else
{
fail();
}
}
else
{
debug(*mLog) << "Received invalid message from " << from;
listen();
}
}

template <typename Payload>
void sendPing(asio::ip::udp::endpoint to, const Payload& payload)
{
v1::MessageBuffer buffer;
const auto msgBegin = std::begin(buffer);
const auto msgEnd = v1::pingMessage(payload, msgBegin);
const auto numBytes = static_cast<size_t>(std::distance(msgBegin, msgEnd));

try
{
mpSocket->send(buffer.data(), numBytes, to);
}
catch (const std::runtime_error& err)
{
info(*mLog) << "Failed to send Ping to " << to.address().to_string() << ": "
<< err.what();
}
}

void finish()
{
mTimer->cancel();
mCallback(std::move(mData));
mData = {};
mSuccess = true;
debug(*mLog) << "Measuring " << mEndpoint << " done.";
}

void fail()
{
mCallback(std::vector<Point>{});
mData = {};
debug(*mLog) << "Measuring " << mEndpoint << " failed.";
}

std::shared_ptr<Socket> mpSocket;
SessionId mSessionId;
asio::ip::udp::endpoint mEndpoint;
std::vector<std::pair<double, double>> mData;
Callback mCallback;
Clock mClock;
util::Injected<typename IoService::Timer> mTimer;
std::size_t mMeasurementsStarted;
util::Injected<Log> mLog;
bool mSuccess;
};

struct ImplDeleter
{
ImplDeleter(Measurement& measurement)
: mpImpl(std::move(measurement.mpImpl))
{
}

void operator()()
{
// Notify callback that the measurement has failed if it did
// not succeed before destruction
if (!mpImpl->mSuccess)
{
mpImpl->fail();
}
mpImpl.reset();
}

std::shared_ptr<Impl> mpImpl;
};

std::unique_ptr<IoService> mpIo;
std::shared_ptr<Impl> mpImpl;
};

} // namespace link
} // namespace ableton
70 changes: 70 additions & 0 deletions link/ableton/link/MeasurementEndpointV4.hpp
@@ -0,0 +1,70 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/discovery/NetworkByteStreamSerializable.hpp>
#include <ableton/platforms/asio/AsioWrapper.hpp>

namespace ableton
{
namespace link
{

struct MeasurementEndpointV4
{
enum
{
key = 'mep4'
};

// Model the NetworkByteStreamSerializable concept
friend std::uint32_t sizeInByteStream(const MeasurementEndpointV4 mep)
{
return discovery::sizeInByteStream(
static_cast<std::uint32_t>(mep.ep.address().to_v4().to_ulong()))
+ discovery::sizeInByteStream(mep.ep.port());
}

template <typename It>
friend It toNetworkByteStream(const MeasurementEndpointV4 mep, It out)
{
return discovery::toNetworkByteStream(mep.ep.port(),
discovery::toNetworkByteStream(
static_cast<std::uint32_t>(mep.ep.address().to_v4().to_ulong()), std::move(out)));
}

template <typename It>
static std::pair<MeasurementEndpointV4, It> fromNetworkByteStream(It begin, It end)
{
using namespace std;
auto addrRes =
discovery::Deserialize<std::uint32_t>::fromNetworkByteStream(move(begin), end);
auto portRes = discovery::Deserialize<std::uint16_t>::fromNetworkByteStream(
move(addrRes.second), end);
return make_pair(MeasurementEndpointV4{{asio::ip::address_v4{move(addrRes.first)},
move(portRes.first)}},
move(portRes.second));
}

asio::ip::udp::endpoint ep;
};

} // namespace link
} // namespace ableton
186 changes: 186 additions & 0 deletions link/ableton/link/MeasurementService.hpp
@@ -0,0 +1,186 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/discovery/Socket.hpp>
#include <ableton/link/GhostXForm.hpp>
#include <ableton/link/Kalman.hpp>
#include <ableton/link/LinearRegression.hpp>
#include <ableton/link/Measurement.hpp>
#include <ableton/link/PeerState.hpp>
#include <ableton/link/PingResponder.hpp>
#include <ableton/link/SessionId.hpp>
#include <ableton/link/v1/Messages.hpp>
#include <ableton/platforms/asio/AsioService.hpp>
#include <ableton/util/Log.hpp>
#include <map>
#include <memory>
#include <thread>

namespace ableton
{
namespace link
{

template <typename Clock, typename Log>
class MeasurementService
{
public:
using Point = std::pair<double, double>;

using MeasurementInstance = Measurement<platforms::asio::AsioService,
Clock,
discovery::Socket<v1::kMaxMessageSize>,
Log>;

using MeasurementServicePingResponder = PingResponder<platforms::asio::AsioService&,
Clock,
discovery::Socket<v1::kMaxMessageSize>,
Log>;

static const std::size_t kNumberThreads = 1;

MeasurementService(asio::ip::address_v4 address,
SessionId sessionId,
GhostXForm ghostXForm,
Clock clock,
util::Injected<Log> log)
: mClock(std::move(clock))
, mLog(std::move(log))
, mPingResponder(std::move(address),
std::move(sessionId),
std::move(ghostXForm),
util::injectRef(mIo),
mClock,
mLog)
{
}

MeasurementService(const MeasurementService&) = delete;
MeasurementService(MeasurementService&&) = delete;

~MeasurementService()
{
// Clear the measurement map in the io service so that whatever
// cleanup code executes in response to the destruction of the
// measurement objects still have access to the io service
mIo.post([this] { mMeasurementMap.clear(); });
}

void updateNodeState(const SessionId& sessionId, const GhostXForm& xform)
{
mPingResponder.updateNodeState(sessionId, xform);
}

asio::ip::udp::endpoint endpoint() const
{
return mPingResponder.endpoint();
}

// Measure the peer and invoke the handler with a GhostXForm
template <typename Handler>
void measurePeer(const PeerState& state, const Handler handler)
{
using namespace std;

mIo.post([this, state, handler] {
const auto nodeId = state.nodeState.nodeId;
auto addr = mPingResponder.endpoint().address().to_v4();
auto callback = CompletionCallback<Handler>{*this, nodeId, handler};

try
{
mMeasurementMap[nodeId] =
MeasurementInstance{state, move(callback), move(addr), mClock, mLog};
}
catch (const runtime_error& err)
{
info(*mLog) << "Failed to measure. Reason: " << err.what();
handler(GhostXForm{});
}
});
}

static GhostXForm filter(
std::vector<Point>::const_iterator begin, std::vector<Point>::const_iterator end)
{
using namespace std;
using std::chrono::microseconds;

Kalman<5> kalman;
for (auto it = begin; it != end; ++it)
{
kalman.iterate(it->second - it->first);
}

return GhostXForm{1, microseconds(llround(kalman.getValue()))};
}

private:
template <typename Handler>
struct CompletionCallback
{
void operator()(const std::vector<Point> data)
{
using namespace std;
using std::chrono::microseconds;

// Post this to the measurement service's io service so that we
// don't delete the measurement object in its stack. Capture all
// needed data separately from this, since this object may be
// gone by the time the block gets executed.
auto nodeId = mNodeId;
auto handler = mHandler;
auto& measurementMap = mService.mMeasurementMap;
mService.mIo.post([nodeId, handler, &measurementMap, data] {
const auto it = measurementMap.find(nodeId);
if (it != measurementMap.end())
{
if (data.empty())
{
handler(GhostXForm{});
}
else
{
handler(MeasurementService::filter(begin(data), end(data)));
}
measurementMap.erase(it);
}
});
}

MeasurementService& mService;
NodeId mNodeId;
Handler mHandler;
};

// Make sure the measurement map outlives the io service so that the rest of
// the members are guaranteed to be valid when any final handlers
// are begin run.
using MeasurementMap = std::map<NodeId, MeasurementInstance>;
MeasurementMap mMeasurementMap;
Clock mClock;
util::Injected<Log> mLog;
platforms::asio::AsioService mIo;
MeasurementServicePingResponder mPingResponder;
};

} // namespace link
} // namespace ableton
82 changes: 82 additions & 0 deletions link/ableton/link/NodeId.hpp
@@ -0,0 +1,82 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/discovery/NetworkByteStreamSerializable.hpp>
#include <algorithm>
#include <array>
#include <cstdint>
#include <random>
#include <string>

namespace ableton
{
namespace link
{

using NodeIdArray = std::array<std::uint8_t, 8>;

struct NodeId : NodeIdArray
{
NodeId() = default;

NodeId(NodeIdArray rhs)
: NodeIdArray(std::move(rhs))
{
}

static NodeId random()
{
using namespace std;

random_device rd;
mt19937 gen(rd());
// uint8_t not standardized for this type - use unsigned
uniform_int_distribution<unsigned> dist(33, 126); // printable ascii chars

NodeId nodeId;
generate(
nodeId.begin(), nodeId.end(), [&] { return static_cast<uint8_t>(dist(gen)); });
return nodeId;
}

friend std::ostream& operator<<(std::ostream& stream, const NodeId& id)
{
return stream << std::string{id.cbegin(), id.cend()};
}

template <typename It>
friend It toNetworkByteStream(const NodeId& nodeId, It out)
{
return discovery::toNetworkByteStream(nodeId, std::move(out));
}

template <typename It>
static std::pair<NodeId, It> fromNetworkByteStream(It begin, It end)
{
using namespace std;
auto result =
discovery::Deserialize<NodeIdArray>::fromNetworkByteStream(move(begin), move(end));
return make_pair(NodeId(move(result.first)), move(result.second));
}
};

} // namespace link
} // namespace ableton
69 changes: 69 additions & 0 deletions link/ableton/link/NodeState.hpp
@@ -0,0 +1,69 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/discovery/Payload.hpp>
#include <ableton/link/NodeId.hpp>
#include <ableton/link/SessionId.hpp>
#include <ableton/link/Timeline.hpp>

namespace ableton
{
namespace link
{

struct NodeState
{
using Payload = decltype(discovery::makePayload(Timeline{}, SessionMembership{}));

NodeId ident() const
{
return nodeId;
}

friend bool operator==(const NodeState& lhs, const NodeState& rhs)
{
return std::tie(lhs.nodeId, lhs.sessionId, lhs.timeline)
== std::tie(rhs.nodeId, rhs.sessionId, rhs.timeline);
}

friend Payload toPayload(const NodeState& state)
{
return discovery::makePayload(state.timeline, SessionMembership{state.sessionId});
}

template <typename It>
static NodeState fromPayload(NodeId id, It begin, It end)
{
using namespace std;
auto state = NodeState{move(id), {}, {}};
discovery::parsePayload<Timeline, SessionMembership>(move(begin), move(end),
[&state](Timeline tl) { state.timeline = move(tl); },
[&state](SessionMembership sm) { state.sessionId = move(sm.sessionId); });
return state;
}

NodeId nodeId;
SessionId sessionId;
Timeline timeline;
};

} // namespace link
} // namespace ableton
146 changes: 146 additions & 0 deletions link/ableton/link/PayloadEntries.hpp
@@ -0,0 +1,146 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/discovery/NetworkByteStreamSerializable.hpp>
#include <cmath>
#include <cstdint>

namespace ableton
{
namespace link
{

struct HostTime
{
enum
{
key = '__ht'
};

HostTime() = default;

HostTime(const std::chrono::microseconds tm)
: time(tm)
{
}

// Model the NetworkByteStreamSerializable concept
friend std::uint32_t sizeInByteStream(const HostTime& sht)
{
return discovery::sizeInByteStream(std::move(sht.time));
}

template <typename It>
friend It toNetworkByteStream(const HostTime& sht, It out)
{
return discovery::toNetworkByteStream(std::move(sht.time), std::move(out));
}

template <typename It>
static std::pair<HostTime, It> fromNetworkByteStream(It begin, It end)
{
using namespace std;
auto result = discovery::Deserialize<chrono::microseconds>::fromNetworkByteStream(
move(begin), move(end));
return make_pair(HostTime{move(result.first)}, move(result.second));
}

std::chrono::microseconds time;
};

struct GHostTime : HostTime
{
enum
{
key = '__gt'
};

GHostTime() = default;

GHostTime(const std::chrono::microseconds tm)
: time(tm)
{
}

// Model the NetworkByteStreamSerializable concept
friend std::uint32_t sizeInByteStream(const GHostTime& dgt)
{
return discovery::sizeInByteStream(std::move(dgt.time));
}

template <typename It>
friend It toNetworkByteStream(const GHostTime& dgt, It out)
{
return discovery::toNetworkByteStream(std::move(dgt.time), std::move(out));
}

template <typename It>
static std::pair<GHostTime, It> fromNetworkByteStream(It begin, It end)
{
using namespace std;
auto result = discovery::Deserialize<chrono::microseconds>::fromNetworkByteStream(
move(begin), move(end));
return make_pair(GHostTime{move(result.first)}, move(result.second));
}

std::chrono::microseconds time;
};

struct PrevGHostTime
{
enum
{
key = '_pgt'
};

PrevGHostTime() = default;

PrevGHostTime(const std::chrono::microseconds tm)
: time(tm)
{
}

// Model the NetworkByteStreamSerializable concept
friend std::uint32_t sizeInByteStream(const PrevGHostTime& dgt)
{
return discovery::sizeInByteStream(std::move(dgt.time));
}

template <typename It>
friend It toNetworkByteStream(const PrevGHostTime& pdgt, It out)
{
return discovery::toNetworkByteStream(std::move(pdgt.time), std::move(out));
}

template <typename It>
static std::pair<PrevGHostTime, It> fromNetworkByteStream(It begin, It end)
{
using namespace std;
auto result = discovery::Deserialize<chrono::microseconds>::fromNetworkByteStream(
move(begin), move(end));
return make_pair(PrevGHostTime{move(result.first)}, move(result.second));
}

std::chrono::microseconds time;
};

} // namespace link
} // namespace ableton
83 changes: 83 additions & 0 deletions link/ableton/link/PeerState.hpp
@@ -0,0 +1,83 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/discovery/Payload.hpp>
#include <ableton/link/MeasurementEndpointV4.hpp>
#include <ableton/link/NodeState.hpp>

namespace ableton
{
namespace link
{

// A state type for peers. PeerState stores the normal NodeState plus
// additional information (the remote endpoint at which to find its
// ping/pong measurement server).

struct PeerState
{
using IdType = NodeId;

IdType ident() const
{
return nodeState.ident();
}

SessionId sessionId() const
{
return nodeState.sessionId;
}

Timeline timeline() const
{
return nodeState.timeline;
}

friend bool operator==(const PeerState& lhs, const PeerState& rhs)
{
return lhs.nodeState == rhs.nodeState && lhs.endpoint == rhs.endpoint;
}

friend auto toPayload(const PeerState& state)
-> decltype(std::declval<NodeState::Payload>()
+ discovery::makePayload(MeasurementEndpointV4{{}}))
{
return toPayload(state.nodeState)
+ discovery::makePayload(MeasurementEndpointV4{state.endpoint});
}

template <typename It>
static PeerState fromPayload(NodeId id, It begin, It end)
{
using namespace std;
auto peerState = PeerState{NodeState::fromPayload(move(id), begin, end), {}};

discovery::parsePayload<MeasurementEndpointV4>(move(begin), move(end),
[&peerState](MeasurementEndpointV4 me4) { peerState.endpoint = move(me4.ep); });
return peerState;
}

NodeState nodeState;
asio::ip::udp::endpoint endpoint;
};

} // namespace link
} // namespace ableton
355 changes: 355 additions & 0 deletions link/ableton/link/Peers.hpp
@@ -0,0 +1,355 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/link/PeerState.hpp>
#include <ableton/util/Injected.hpp>
#include <cassert>

namespace ableton
{
namespace link
{

// SessionMembershipCallback is invoked when any change to session
// membership occurs (when any peer joins or leaves a session)
//
// SessionTimelineCallback is invoked with a session id and a timeline
// whenever a new combination of these values is seen

template <typename IoContext,
typename SessionMembershipCallback,
typename SessionTimelineCallback>
class Peers
{
// non-movable private implementation type
struct Impl;

public:
using Peer = std::pair<PeerState, asio::ip::address>;

Peers(util::Injected<IoContext> io,
SessionMembershipCallback membership,
SessionTimelineCallback timeline)
: mpImpl(
std::make_shared<Impl>(std::move(io), std::move(membership), std::move(timeline)))
{
}

// The set of peers for a given session, ordered by (peerId, addr).
// The result will possibly contain multiple entries for the same
// peer if it is visible through multiple gateways.
std::vector<Peer> sessionPeers(const SessionId& sid) const
{
using namespace std;
vector<Peer> result;
auto& peerVec = mpImpl->mPeers;
copy_if(begin(peerVec), end(peerVec), back_inserter(result), SessionMemberPred{sid});
return result;
}

// Number of individual for a given session.
std::size_t uniqueSessionPeerCount(const SessionId& sid) const
{
using namespace std;
auto peerVec = sessionPeers(sid);
auto last = unique(begin(peerVec), end(peerVec),
[](const Peer& a, const Peer& b) { return a.first.ident() == b.first.ident(); });
return static_cast<size_t>(distance(begin(peerVec), last));
}

void setSessionTimeline(const SessionId& sid, const Timeline& tl)
{
// Set the cached timeline for all peers to a new client-specified
// timeline. When we make a timeline change, we do so
// optimistically and clients assume that all peers in a session
// have adopted the newly specified timeline. We must represent
// this in our cache or else we risk failing to notify about a
// higher-priority peer timeline that was already seen.
for (auto& peer : mpImpl->mPeers)
{
if (peer.first.sessionId() == sid)
{
peer.first.nodeState.timeline = tl;
}
}
}

// Purge all cached peers that are members of the given session
void forgetSession(const SessionId& sid)
{
using namespace std;
auto& peerVec = mpImpl->mPeers;
peerVec.erase(
remove_if(begin(peerVec), end(peerVec), SessionMemberPred{sid}), end(peerVec));
}

void resetPeers()
{
mpImpl->mPeers.clear();
}

// Observer type that monitors peer discovery on a particular
// gateway and relays the information to a Peers instance.
// Models the PeerObserver concept from the discovery module.
struct GatewayObserver
{
using GatewayObserverNodeState = PeerState;
using GatewayObserverNodeId = NodeId;

GatewayObserver(std::shared_ptr<Impl> pImpl, asio::ip::address addr)
: mpImpl(std::move(pImpl))
, mAddr(std::move(addr))
{
}
GatewayObserver(const GatewayObserver&) = delete;

GatewayObserver(GatewayObserver&& rhs)
: mpImpl(std::move(rhs.mpImpl))
, mAddr(std::move(rhs.mAddr))
{
}

~GatewayObserver()
{
// Check to handle the moved from case
if (mpImpl)
{
auto& io = *mpImpl->mIo;
io.async(Deleter{*this});
}
}

// model the PeerObserver concept from discovery
friend void sawPeer(GatewayObserver& observer, const PeerState& state)
{
auto pImpl = observer.mpImpl;
auto addr = observer.mAddr;
assert(pImpl);
pImpl->mIo->async([pImpl, addr, state] {
pImpl->sawPeerOnGateway(std::move(state), std::move(addr));
});
}

friend void peerLeft(GatewayObserver& observer, const NodeId& id)
{
auto pImpl = observer.mpImpl;
auto addr = observer.mAddr;
pImpl->mIo->async(
[pImpl, addr, id] { pImpl->peerLeftGateway(std::move(id), std::move(addr)); });
}

friend void peerTimedOut(GatewayObserver& observer, const NodeId& id)
{
auto pImpl = observer.mpImpl;
auto addr = observer.mAddr;
pImpl->mIo->async(
[pImpl, addr, id] { pImpl->peerLeftGateway(std::move(id), std::move(addr)); });
}

struct Deleter
{
Deleter(GatewayObserver& observer)
: mpImpl(std::move(observer.mpImpl))
, mAddr(std::move(observer.mAddr))
{
}

void operator()()
{
mpImpl->gatewayClosed(mAddr);
}

std::shared_ptr<Impl> mpImpl;
asio::ip::address mAddr;
};

std::shared_ptr<Impl> mpImpl;
asio::ip::address mAddr;
};

// Factory function for the gateway observer
friend GatewayObserver makeGatewayObserver(Peers& peers, asio::ip::address addr)
{
return GatewayObserver{peers.mpImpl, std::move(addr)};
}

private:
struct Impl
{
Impl(util::Injected<IoContext> io,
SessionMembershipCallback membership,
SessionTimelineCallback timeline)
: mIo(std::move(io))
, mSessionMembershipCallback(std::move(membership))
, mSessionTimelineCallback(std::move(timeline))
{
}

void sawPeerOnGateway(PeerState peerState, asio::ip::address gatewayAddr)
{
using namespace std;

const auto peerSession = peerState.sessionId();
const auto peerTimeline = peerState.timeline();
bool isNewSessionTimeline = false;
bool didSessionMembershipChange = false;
{
isNewSessionTimeline = !sessionTimelineExists(peerSession, peerTimeline);

auto peer = make_pair(move(peerState), move(gatewayAddr));
const auto idRange = equal_range(begin(mPeers), end(mPeers), peer, PeerIdComp{});

if (idRange.first == idRange.second)
{
// This peer is not currently known on any gateway
didSessionMembershipChange = true;
mPeers.insert(move(idRange.first), move(peer));
}
else
{
// We've seen this peer before... does it have a new session?
didSessionMembershipChange =
all_of(idRange.first, idRange.second, [&peerSession](const Peer& test) {
return test.first.sessionId() != peerSession;
});

// was it on this gateway?
const auto addrRange =
equal_range(idRange.first, idRange.second, peer, AddrComp{});

if (addrRange.first == addrRange.second)
{
// First time on this gateway, add it
mPeers.insert(move(addrRange.first), move(peer));
}
else
{
// We have an entry for this peer on this gateway, update it
*addrRange.first = move(peer);
}
}
} // end lock

// Invoke callbacks outside the critical section
if (isNewSessionTimeline)
{
mSessionTimelineCallback(peerSession, peerTimeline);
}

if (didSessionMembershipChange)
{
mSessionMembershipCallback();
}
}

void peerLeftGateway(const NodeId& nodeId, const asio::ip::address& gatewayAddr)
{
using namespace std;

bool didSessionMembershipChange = false;
{
auto it = find_if(begin(mPeers), end(mPeers), [&](const Peer& peer) {
return peer.first.ident() == nodeId && peer.second == gatewayAddr;
});

if (it != end(mPeers))
{
mPeers.erase(move(it));
didSessionMembershipChange = true;
}
} // end lock

if (didSessionMembershipChange)
{
mSessionMembershipCallback();
}
}

void gatewayClosed(const asio::ip::address& gatewayAddr)
{
using namespace std;

{
mPeers.erase(
remove_if(begin(mPeers), end(mPeers),
[&gatewayAddr](const Peer& peer) { return peer.second == gatewayAddr; }),
end(mPeers));
} // end lock

mSessionMembershipCallback();
}

bool sessionTimelineExists(const SessionId& session, const Timeline& tl)
{
using namespace std;
return find_if(begin(mPeers), end(mPeers), [&](const Peer& peer) {
return peer.first.sessionId() == session && peer.first.timeline() == tl;
}) != end(mPeers);
}

struct PeerIdComp
{
bool operator()(const Peer& lhs, const Peer& rhs) const
{
return lhs.first.ident() < rhs.first.ident();
}
};

struct AddrComp
{
bool operator()(const Peer& lhs, const Peer& rhs) const
{
return lhs.second < rhs.second;
}
};

util::Injected<IoContext> mIo;
SessionMembershipCallback mSessionMembershipCallback;
SessionTimelineCallback mSessionTimelineCallback;
std::vector<Peer> mPeers; // sorted by peerId, unique by (peerId, addr)
};

struct SessionMemberPred
{
bool operator()(const Peer& peer) const
{
return peer.first.sessionId() == sid;
}

const SessionId& sid;
};

std::shared_ptr<Impl> mpImpl;
};

template <typename Io,
typename SessionMembershipCallback,
typename SessionTimelineCallback>
Peers<Io, SessionMembershipCallback, SessionTimelineCallback> makePeers(
util::Injected<Io> io,
SessionMembershipCallback membershipCallback,
SessionTimelineCallback timelineCallback)
{
return {std::move(io), std::move(membershipCallback), std::move(timelineCallback)};
}

} // namespace link
} // namespace ableton
100 changes: 100 additions & 0 deletions link/ableton/link/Phase.hpp
@@ -0,0 +1,100 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/link/Beats.hpp>
#include <ableton/link/Timeline.hpp>
#include <chrono>

namespace ableton
{
namespace link
{

// Returns a value in the range [0,quantum) corresponding to beats %
// quantum except that negative beat values are handled correctly.
// If the given quantum is zero, returns zero.
inline Beats phase(const Beats beats, const Beats quantum)
{
if (quantum == Beats{INT64_C(0)})
{
return Beats{INT64_C(0)};
}
else
{
// Handle negative beat values by doing the computation relative to an
// origin that is on the nearest quantum boundary less than -(abs(x))
const auto quantumMicros = quantum.microBeats();
const auto quantumBins = (llabs(beats.microBeats()) + quantumMicros) / quantumMicros;
const std::int64_t quantumBeats{quantumBins * quantumMicros};
return (beats + Beats{quantumBeats}) % quantum;
}
}

// Return the least value greater than x that matches the phase of
// target with respect to the given quantum. If the given quantum
// quantum is 0, x is returned.
inline Beats nextPhaseMatch(const Beats x, const Beats target, const Beats quantum)
{
const auto desiredPhase = phase(target, quantum);
const auto xPhase = phase(x, quantum);
const auto phaseDiff = (desiredPhase - xPhase + quantum) % quantum;
return x + phaseDiff;
}

// Return the closest value to x that matches the phase of the target
// with respect to the given quantum. The result deviates from x by at
// most quantum/2, but may be less than x.
inline Beats closestPhaseMatch(const Beats x, const Beats target, const Beats quantum)
{
return nextPhaseMatch(x - Beats{0.5 * quantum.floating()}, target, quantum);
}

// Interprets the given timeline as encoding a quantum boundary at its
// origin. Given such a timeline, returns a phase-encoded beat value
// relative to the given quantum that corresponds to the given
// time. The phase of the resulting beat value can be calculated with
// phase(beats, quantum). The result will deviate by up to +-
// (quantum/2) beats compared to the result of tl.toBeats(time).
inline Beats toPhaseEncodedBeats(
const Timeline& tl, const std::chrono::microseconds time, const Beats quantum)
{
const auto beat = tl.toBeats(time);
return closestPhaseMatch(beat, beat - tl.beatOrigin, quantum);
}

// The inverse of toPhaseEncodedBeats. Given a phase encoded beat
// value from the given timeline and quantum, find the time value that
// it maps to.
inline std::chrono::microseconds fromPhaseEncodedBeats(
const Timeline& tl, const Beats beat, const Beats quantum)
{
const auto fromOrigin = beat - tl.beatOrigin;
const auto originOffset = fromOrigin - phase(fromOrigin, quantum);
// invert the phase calculation so that it always rounds up in the
// middle instead of down like closestPhaseMatch. Otherwise we'll
// end up rounding down twice when a value is at phase quantum/2.
const auto inversePhaseOffset = closestPhaseMatch(
quantum - phase(fromOrigin, quantum), quantum - phase(beat, quantum), quantum);
return tl.fromBeats(tl.beatOrigin + originOffset + quantum - inversePhaseOffset);
}

} // link
} // ableton
185 changes: 185 additions & 0 deletions link/ableton/link/PingResponder.hpp
@@ -0,0 +1,185 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/link/GhostXForm.hpp>
#include <ableton/link/PayloadEntries.hpp>
#include <ableton/link/SessionId.hpp>
#include <ableton/link/v1/Messages.hpp>
#include <ableton/platforms/asio/AsioWrapper.hpp>
#include <ableton/util/Injected.hpp>
#include <ableton/util/SafeAsyncHandler.hpp>
#include <chrono>
#include <memory>
#include <thread>

namespace ableton
{
namespace link
{

template <typename Io, typename Clock, typename Socket, typename Log>
class PingResponder
{
public:
PingResponder(asio::ip::address_v4 address,
SessionId sessionId,
GhostXForm ghostXForm,
util::Injected<Io> io,
Clock clock,
util::Injected<Log> log)
: mIo(std::move(io))
, mpImpl(std::make_shared<Impl>(*mIo,
std::move(address),
std::move(sessionId),
std::move(ghostXForm),
std::move(clock),
std::move(log)))
{
mpImpl->listen();
}

PingResponder(const PingResponder&) = delete;
PingResponder(PingResponder&&) = delete;

~PingResponder()
{
// post the release of the impl object into the io service so that
// it happens in the same thread as its handlers
auto pImpl = mpImpl;
mIo->post([pImpl]() mutable { pImpl.reset(); });
}

void updateNodeState(const SessionId& sessionId, const GhostXForm& xform)
{
auto pImpl = mpImpl;
mIo->post([pImpl, sessionId, xform] {
pImpl->mSessionId = std::move(sessionId);
pImpl->mGhostXForm = std::move(xform);
});
}

asio::ip::udp::endpoint endpoint() const
{
return mpImpl->mSocket.endpoint();
}

asio::ip::address address() const
{
return endpoint().address();
}

Socket socket() const
{
return mpImpl->mSocket;
}

private:
struct Impl : std::enable_shared_from_this<Impl>
{
Impl(typename util::Injected<Io>::type& io,
asio::ip::address_v4 address,
SessionId sessionId,
GhostXForm ghostXForm,
Clock clock,
util::Injected<Log> log)
: mSessionId(std::move(sessionId))
, mGhostXForm(std::move(ghostXForm))
, mClock(std::move(clock))
, mLog(std::move(log))
, mSocket(io)
{
configureUnicastSocket(mSocket, address);
}

void listen()
{
mSocket.receive(util::makeAsyncSafe(this->shared_from_this()));
}

// Operator to handle incoming messages on the interface
template <typename It>
void operator()(const asio::ip::udp::endpoint& from, const It begin, const It end)
{
using namespace discovery;

// Decode Ping Message
const auto result = link::v1::parseMessageHeader(begin, end);
const auto& header = result.first;
const auto payloadBegin = result.second;

// Check Payload size
const auto payloadSize = static_cast<std::size_t>(std::distance(payloadBegin, end));
const auto maxPayloadSize =
sizeInByteStream(makePayload(HostTime{}, PrevGHostTime{}));
if (header.messageType == v1::kPing && payloadSize <= maxPayloadSize)
{
debug(*mLog) << "Received ping message from " << from;

try
{
reply(std::move(payloadBegin), std::move(end), from);
}
catch (const std::runtime_error& err)
{
info(*mLog) << "Failed to send pong to " << from << ". Reason: " << err.what();
}
}
else
{
info(*mLog) << "Received invalid Message from " << from << ".";
}
listen();
}

template <typename It>
void reply(It begin, It end, const asio::ip::udp::endpoint& to)
{
using namespace discovery;

// Encode Pong Message
const auto id = SessionMembership{mSessionId};
const auto currentGt = GHostTime{mGhostXForm.hostToGhost(mClock.micros())};
const auto pongPayload = makePayload(id, currentGt);

v1::MessageBuffer pongBuffer;
const auto pongMsgBegin = std::begin(pongBuffer);
auto pongMsgEnd = v1::pongMessage(pongPayload, pongMsgBegin);
// Append ping payload to pong message.
pongMsgEnd = std::copy(begin, end, pongMsgEnd);

const auto numBytes =
static_cast<std::size_t>(std::distance(pongMsgBegin, pongMsgEnd));
mSocket.send(pongBuffer.data(), numBytes, to);
}

SessionId mSessionId;
GhostXForm mGhostXForm;
Clock mClock;
util::Injected<Log> mLog;
Socket mSocket;
};

util::Injected<Io> mIo;
std::shared_ptr<Impl> mpImpl;
};

} // namespace link
} // namespace ableton
65 changes: 65 additions & 0 deletions link/ableton/link/SessionId.hpp
@@ -0,0 +1,65 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/link/NodeId.hpp>

namespace ableton
{
namespace link
{

// SessionIds occupy the same value space as NodeIds and are
// identified by their founding node.
using SessionId = NodeId;

// A payload entry indicating membership in a particular session
struct SessionMembership
{
enum
{
key = 'sess'
};

// Model the NetworkByteStreamSerializable concept
friend std::uint32_t sizeInByteStream(const SessionMembership& sm)
{
return discovery::sizeInByteStream(sm.sessionId);
}

template <typename It>
friend It toNetworkByteStream(const SessionMembership& sm, It out)
{
return discovery::toNetworkByteStream(sm.sessionId, std::move(out));
}

template <typename It>
static std::pair<SessionMembership, It> fromNetworkByteStream(It begin, It end)
{
using namespace std;
auto idRes = SessionId::fromNetworkByteStream(move(begin), move(end));
return make_pair(SessionMembership{move(idRes.first)}, move(idRes.second));
}

SessionId sessionId;
};

} // namespace link
} // namespace ableton
303 changes: 303 additions & 0 deletions link/ableton/link/Sessions.hpp
@@ -0,0 +1,303 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/link/GhostXForm.hpp>
#include <ableton/link/SessionId.hpp>
#include <ableton/link/Timeline.hpp>

namespace ableton
{
namespace link
{

struct SessionMeasurement
{
GhostXForm xform;
std::chrono::microseconds timestamp;
};

struct Session
{
SessionId sessionId;
Timeline timeline;
SessionMeasurement measurement;
};

template <typename Peers,
typename MeasurePeer,
typename JoinSessionCallback,
typename IoContext,
typename Clock>
class Sessions
{
public:
using Timer = typename util::Injected<IoContext>::type::Timer;

Sessions(Session init,
util::Injected<Peers> peers,
MeasurePeer measure,
JoinSessionCallback join,
util::Injected<IoContext> io,
Clock clock)
: mPeers(std::move(peers))
, mMeasure(std::move(measure))
, mCallback(std::move(join))
, mCurrent(std::move(init))
, mIo(std::move(io))
, mTimer(mIo->makeTimer())
, mClock(std::move(clock))
{
}

void resetSession(Session session)
{
mCurrent = std::move(session);
mOtherSessions.clear();
}

void resetTimeline(Timeline timeline)
{
mCurrent.timeline = std::move(timeline);
}

// Consider the observed session/timeline pair and return a possibly
// new timeline that should be used going forward.
Timeline sawSessionTimeline(SessionId sid, Timeline timeline)
{
using namespace std;
if (sid == mCurrent.sessionId)
{
// matches our current session, update the timeline if necessary
updateTimeline(mCurrent, move(timeline));
}
else
{
auto session = Session{move(sid), move(timeline), {}};
const auto range =
equal_range(begin(mOtherSessions), end(mOtherSessions), session, SessionIdComp{});
if (range.first == range.second)
{
// brand new session, insert it into our list of known
// sessions and launch a measurement
launchSessionMeasurement(session);
mOtherSessions.insert(range.first, move(session));
}
else
{
// we've seen this session before, update its timeline if necessary
updateTimeline(*range.first, move(timeline));
}
}
return mCurrent.timeline;
}

private:
void launchSessionMeasurement(Session& session)
{
using namespace std;
auto peers = mPeers->sessionPeers(session.sessionId);
if (!peers.empty())
{
// first criteria: always prefer the founding peer
const auto it = find_if(begin(peers), end(peers),
[&session](const Peer& peer) { return session.sessionId == peer.first.ident(); });
// TODO: second criteria should be degree. We don't have that
// represented yet so just use the first peer for now
auto peer = it == end(peers) ? peers.front() : *it;
// mark that a session is in progress by clearing out the
// session's timestamp
session.measurement.timestamp = {};
mMeasure(move(peer), MeasurementResultsHandler{*this, session.sessionId});
}
}

void handleSuccessfulMeasurement(const SessionId& id, GhostXForm xform)
{
using namespace std;

debug(mIo->log()) << "Session " << id << " measurement completed with result "
<< "(" << xform.slope << ", " << xform.intercept.count() << ")";

auto measurement = SessionMeasurement{move(xform), mClock.micros()};

if (mCurrent.sessionId == id)
{
mCurrent.measurement = move(measurement);
mCallback(mCurrent);
}
else
{
const auto range = equal_range(
begin(mOtherSessions), end(mOtherSessions), Session{id, {}, {}}, SessionIdComp{});

if (range.first != range.second)
{
const auto SESSION_EPS = chrono::microseconds{500000};
// should we join this session?
const auto hostTime = mClock.micros();
const auto curGhost = mCurrent.measurement.xform.hostToGhost(hostTime);
const auto newGhost = measurement.xform.hostToGhost(hostTime);
// update the measurement for the session entry
range.first->measurement = move(measurement);
// If session times too close - fall back to session id order
const auto ghostDiff = newGhost - curGhost;
if (ghostDiff > SESSION_EPS || (std::abs(ghostDiff.count()) < SESSION_EPS.count()
&& id < mCurrent.sessionId))
{
// The new session wins, switch over to it
auto current = mCurrent;
mCurrent = move(*range.first);
mOtherSessions.erase(range.first);
// Put the old current session back into our list of known
// sessions so that we won't re-measure it
const auto it = upper_bound(
begin(mOtherSessions), end(mOtherSessions), current, SessionIdComp{});
mOtherSessions.insert(it, move(current));
// And notify that we have a new session and make sure that
// we remeasure it periodically.
mCallback(mCurrent);
scheduleRemeasurement();
}
}
}
}

void scheduleRemeasurement()
{
// set a timer to re-measure the active session after a period
mTimer.expires_from_now(std::chrono::microseconds{30000000});
mTimer.async_wait([this](const typename Timer::ErrorCode e) {
if (!e)
{
launchSessionMeasurement(mCurrent);
scheduleRemeasurement();
}
});
}

void handleFailedMeasurement(const SessionId& id)
{
using namespace std;

debug(mIo->log()) << "Session " << id << " measurement failed.";

// if we failed to measure for our current session, schedule a
// retry in the future. Otherwise, remove the session from our set
// of known sessions (if it is seen again it will be measured as
// if new).
if (mCurrent.sessionId == id)
{
scheduleRemeasurement();
}
else
{
const auto range = equal_range(
begin(mOtherSessions), end(mOtherSessions), Session{id, {}, {}}, SessionIdComp{});
if (range.first != range.second)
{
mOtherSessions.erase(range.first);
mPeers->forgetSession(id);
}
}
}

void updateTimeline(Session& session, Timeline timeline)
{
// We use beat origin magnitude to prioritize sessions.
if (timeline.beatOrigin > session.timeline.beatOrigin)
{
debug(mIo->log()) << "Adopting peer timeline (" << timeline.tempo.bpm() << ", "
<< timeline.beatOrigin.floating() << ", "
<< timeline.timeOrigin.count() << ")";

session.timeline = std::move(timeline);
}
else
{
debug(mIo->log()) << "Rejecting peer timeline with beat origin: "
<< timeline.beatOrigin.floating()
<< ". Current timeline beat origin: "
<< session.timeline.beatOrigin.floating();
}
}

struct MeasurementResultsHandler
{
void operator()(GhostXForm xform) const
{
Sessions& sessions = mSessions;
const SessionId& sessionId = mSessionId;
if (xform == GhostXForm{})
{
mSessions.mIo->async([&sessions, sessionId] {
sessions.handleFailedMeasurement(std::move(sessionId));
});
}
else
{
mSessions.mIo->async([&sessions, sessionId, xform] {
sessions.handleSuccessfulMeasurement(std::move(sessionId), std::move(xform));
});
}
}

Sessions& mSessions;
SessionId mSessionId;
};

struct SessionIdComp
{
bool operator()(const Session& lhs, const Session& rhs) const
{
return lhs.sessionId < rhs.sessionId;
}
};

using Peer = typename util::Injected<Peers>::type::Peer;
util::Injected<Peers> mPeers;
MeasurePeer mMeasure;
JoinSessionCallback mCallback;
Session mCurrent;
util::Injected<IoContext> mIo;
Timer mTimer;
Clock mClock;
std::vector<Session> mOtherSessions; // sorted/unique by session id
};

template <typename Peers,
typename MeasurePeer,
typename JoinSessionCallback,
typename IoContext,
typename Clock>
Sessions<Peers, MeasurePeer, JoinSessionCallback, IoContext, Clock> makeSessions(
Session init,
util::Injected<Peers> peers,
MeasurePeer measure,
JoinSessionCallback join,
util::Injected<IoContext> io,
Clock clock)
{
using namespace std;
return {move(init), move(peers), move(measure), move(join), move(io), move(clock)};
}

} // namespace link
} // namespace ableton
90 changes: 90 additions & 0 deletions link/ableton/link/Tempo.hpp
@@ -0,0 +1,90 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/link/Beats.hpp>
#include <chrono>

namespace ableton
{
namespace link
{

struct Tempo : std::tuple<double>
{
Tempo() = default;

// Beats per minute
explicit Tempo(const double bpm)
: std::tuple<double>(bpm)
{
}

Tempo(const std::chrono::microseconds microsPerBeat)
: std::tuple<double>(60. * 1e6 / microsPerBeat.count())
{
}

double bpm() const
{
return std::get<0>(*this);
}

std::chrono::microseconds microsPerBeat() const
{
return std::chrono::microseconds{llround(60. * 1e6 / bpm())};
}

// Given the tempo, convert a time to a beat value
Beats microsToBeats(const std::chrono::microseconds micros) const
{
return Beats{micros.count() / static_cast<double>(microsPerBeat().count())};
}

// Given the tempo, convert a beat to a time value
std::chrono::microseconds beatsToMicros(const Beats beats) const
{
return std::chrono::microseconds{llround(beats.floating() * microsPerBeat().count())};
}

// Model the NetworkByteStreamSerializable concept
friend std::uint32_t sizeInByteStream(const Tempo tempo)
{
return discovery::sizeInByteStream(tempo.microsPerBeat());
}

template <typename It>
friend It toNetworkByteStream(const Tempo tempo, It out)
{
return discovery::toNetworkByteStream(tempo.microsPerBeat(), std::move(out));
}

template <typename It>
static std::pair<Tempo, It> fromNetworkByteStream(It begin, It end)
{
auto result =
discovery::Deserialize<std::chrono::microseconds>::fromNetworkByteStream(
std::move(begin), std::move(end));
return std::make_pair(Tempo{std::move(result.first)}, std::move(result.second));
}
};

} // namespace link
} // namespace ableton
99 changes: 99 additions & 0 deletions link/ableton/link/Timeline.hpp
@@ -0,0 +1,99 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/discovery/NetworkByteStreamSerializable.hpp>
#include <ableton/link/Beats.hpp>
#include <ableton/link/Tempo.hpp>
#include <cmath>
#include <cstdint>
#include <tuple>

namespace ableton
{
namespace link
{

// A tuple of (tempo, beats, time), with integral units
// based on microseconds. This type establishes a bijection between
// beats and wall time, given a valid tempo. It also serves as a
// payload entry.

struct Timeline
{
enum
{
key = 'tmln'
};

Beats toBeats(const std::chrono::microseconds time) const
{
return beatOrigin + tempo.microsToBeats(time - timeOrigin);
}

std::chrono::microseconds fromBeats(const Beats beats) const
{
return timeOrigin + tempo.beatsToMicros(beats - beatOrigin);
}

friend bool operator==(const Timeline& lhs, const Timeline& rhs)
{
return std::tie(lhs.tempo, lhs.beatOrigin, lhs.timeOrigin)
== std::tie(rhs.tempo, rhs.beatOrigin, rhs.timeOrigin);
}

friend bool operator!=(const Timeline& lhs, const Timeline& rhs)
{
return !(lhs == rhs);
}

// Model the NetworkByteStreamSerializable concept
friend std::uint32_t sizeInByteStream(const Timeline& tl)
{
return discovery::sizeInByteStream(std::tie(tl.tempo, tl.beatOrigin, tl.timeOrigin));
}

template <typename It>
friend It toNetworkByteStream(const Timeline& tl, It out)
{
return discovery::toNetworkByteStream(
std::tie(tl.tempo, tl.beatOrigin, tl.timeOrigin), std::move(out));
}

template <typename It>
static std::pair<Timeline, It> fromNetworkByteStream(It begin, It end)
{
using namespace std;
using namespace discovery;
Timeline timeline;
auto result =
Deserialize<tuple<Tempo, Beats, chrono::microseconds>>::fromNetworkByteStream(
move(begin), move(end));
tie(timeline.tempo, timeline.beatOrigin, timeline.timeOrigin) = move(result.first);
return make_pair(move(timeline), move(result.second));
}

Tempo tempo;
Beats beatOrigin;
std::chrono::microseconds timeOrigin;
};

} // namespace link
} // namespace ableton
138 changes: 138 additions & 0 deletions link/ableton/link/v1/Messages.hpp
@@ -0,0 +1,138 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/discovery/Payload.hpp>
#include <array>

namespace ableton
{
namespace link
{
namespace v1
{

// The maximum size of a message, in bytes
const std::size_t kMaxMessageSize = 512;
// Utility typedef for an array of bytes of maximum message size
using MessageBuffer = std::array<uint8_t, v1::kMaxMessageSize>;

using MessageType = uint8_t;

const MessageType kPing = 1;
const MessageType kPong = 2;

struct MessageHeader
{
MessageType messageType;

friend std::uint32_t sizeInByteStream(const MessageHeader& header)
{
return discovery::sizeInByteStream(header.messageType);
}

template <typename It>
friend It toNetworkByteStream(const MessageHeader& header, It out)
{
return discovery::toNetworkByteStream(header.messageType, std::move(out));
}

template <typename It>
static std::pair<MessageHeader, It> fromNetworkByteStream(It begin, const It end)
{
using namespace discovery;

MessageHeader header;
std::tie(header.messageType, begin) =
Deserialize<decltype(header.messageType)>::fromNetworkByteStream(begin, end);

return std::make_pair(std::move(header), std::move(begin));
}
};

namespace detail
{

// Types that are only used in the sending/parsing of messages, not
// publicly exposed.
using ProtocolHeader = std::array<char, 8>;
const ProtocolHeader kProtocolHeader = {{'_', 'l', 'i', 'n', 'k', '_', 'v', 1}};

// Must have at least kMaxMessageSize bytes available in the output stream
template <typename Payload, typename It>
It encodeMessage(const MessageType messageType, const Payload& payload, It out)
{
using namespace std;
const MessageHeader header = {messageType};
const auto messageSize =
kProtocolHeader.size() + sizeInByteStream(header) + sizeInByteStream(payload);

if (messageSize < kMaxMessageSize)
{
return toNetworkByteStream(
payload, toNetworkByteStream(
header, copy(begin(kProtocolHeader), end(kProtocolHeader), move(out))));
}
else
{
throw range_error("Exceeded maximum message size");
}
}

} // namespace detail

template <typename Payload, typename It>
It pingMessage(const Payload& payload, It out)
{
return detail::encodeMessage(kPing, payload, std::move(out));
}

template <typename Payload, typename It>
It pongMessage(const Payload& payload, It out)
{
return detail::encodeMessage(kPong, payload, std::move(out));
}

template <typename It>
std::pair<MessageHeader, It> parseMessageHeader(It bytesBegin, const It bytesEnd)
{
using ItDiff = typename std::iterator_traits<It>::difference_type;

MessageHeader header = {};
const auto protocolHeaderSize = discovery::sizeInByteStream(detail::kProtocolHeader);
const auto minMessageSize =
static_cast<ItDiff>(protocolHeaderSize + sizeInByteStream(header));

// If there are enough bytes in the stream to make a header and if
// the first bytes in the stream are the protocol header, then
// proceed to parse the stream.
if (std::distance(bytesBegin, bytesEnd) >= minMessageSize
&& std::equal(
begin(detail::kProtocolHeader), end(detail::kProtocolHeader), bytesBegin))
{
std::tie(header, bytesBegin) =
MessageHeader::fromNetworkByteStream(bytesBegin + protocolHeaderSize, bytesEnd);
}
return std::make_pair(std::move(header), std::move(bytesBegin));
}

} // namespace v1
} // namespace link
} // namespace ableton
66 changes: 66 additions & 0 deletions link/ableton/platforms/Config.hpp
@@ -0,0 +1,66 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/link/Controller.hpp>
#include <ableton/util/Log.hpp>

#if LINK_PLATFORM_WINDOWS
#include <ableton/platforms/asio/Context.hpp>
#include <ableton/platforms/windows/Clock.hpp>
#include <ableton/platforms/windows/ScanIpIfAddrs.hpp>
#elif LINK_PLATFORM_MACOSX
#include <ableton/platforms/asio/Context.hpp>
#include <ableton/platforms/darwin/Clock.hpp>
#include <ableton/platforms/posix/ScanIpIfAddrs.hpp>
#elif LINK_PLATFORM_LINUX
#include <ableton/platforms/asio/Context.hpp>
#include <ableton/platforms/posix/ScanIpIfAddrs.hpp>
#include <ableton/platforms/stl/Clock.hpp>
#endif

namespace ableton
{
namespace link
{
namespace platform
{

#if LINK_PLATFORM_WINDOWS
using Clock = platforms::windows::Clock;
using IoContext =
platforms::asio::Context<platforms::windows::ScanIpIfAddrs, util::NullLog>;

#elif LINK_PLATFORM_MACOSX
using Clock = platforms::darwin::Clock;
using IoContext =
platforms::asio::Context<platforms::posix::ScanIpIfAddrs, util::NullLog>;

#elif LINK_PLATFORM_LINUX
using Clock = platforms::stl::Clock;
using IoContext =
platforms::asio::Context<platforms::posix::ScanIpIfAddrs, util::NullLog>;
#endif

using Controller = Controller<PeerCountCallback, TempoCallback, Clock, IoContext>;

} // platform
} // link
} // ableton
105 changes: 105 additions & 0 deletions link/ableton/platforms/asio/AsioService.hpp
@@ -0,0 +1,105 @@
/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/

#pragma once

#include <ableton/platforms/asio/AsioTimer.hpp>
#include <ableton/platforms/asio/AsioWrapper.hpp>
#include <thread>

namespace ableton
{
namespace platforms
{
namespace asio
{

class AsioService
{
public:
using Timer = AsioTimer;

AsioService()
: AsioService(DefaultHandler{})
{
}

template <typename ExceptionHandler>
explicit AsioService(ExceptionHandler exceptHandler)
: mpWork(new ::asio::io_service::work(mService))
{
mThread =
std::thread{[](::asio::io_service& service, ExceptionHandler handler) {
for (;;)
{
try
{
service.run();
break;
}
catch (const typename ExceptionHandler::Exception& exception)
{
handler(exception);
}
}
},
std::ref(mService), std::move(exceptHandler)};
}

~AsioService()
{
mpWork.reset();
mThread.join();
}

AsioTimer makeTimer()
{
return {mService};
}

template <typename Handler>
void post(Handler handler)
{
mService.post(std::move(handler));
}

::asio::io_service mService;

private:
// Default handler is hidden and defines a hidden exception type
// that will never be thrown by other code, so it effectively does
// not catch.
struct DefaultHandler
{
struct Exception
{
};

void operator()(const Exception&)
{
}
};

std::unique_ptr<::asio::io_service::work> mpWork;
std::thread mThread;
};

} // namespace asio
} // namespace platforms
} // namespace ableton