Skip to content

Commit

Permalink
Merge pull request #1 from RGerzaguet/New_ZMQ_Host
Browse files Browse the repository at this point in the history
New zmq host
  • Loading branch information
RGerzaguet committed Sep 14, 2020
2 parents cfb2d58 + cd00be8 commit 4872dd5
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 72 deletions.
71 changes: 30 additions & 41 deletions src/Host.jl
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ export print;

struct SocketsuhdOverNetwork
ip::IPAddr;
data::Socket;
mdEH::Socket;
mdHE::Socket;
rtcSocket::Socket;
rttSocket::Socket;
brSocket::Socket;
end
mutable struct Configuration
carrierFreq::Float64;
Expand All @@ -65,38 +65,27 @@ export UHDOverNetwork;


function initSockets(ip::String)
# Define IPv4 adress
# uhdOverNetwork side --> Used for HE link
e310Adress = IPv4(ip);
# Host adress --> Used for data and feedback
hostAddress = IPv4("0.0.0.0");
# --- Creates the MD socket
# To push config to uhdOverNetwork
# TODO: => Switch the UDP socket for config push to ZMQ
mdHESocket = ZMQ.Socket(PUB)
ZMQ.connect(mdHESocket,"tcp://$e310Adress:1024");
# To get config from uhdOverNetwork
mdEHSocket = Socket(SUB);
tcpSys = string("tcp://$e310Adress:1025");
ZMQ.subscribe(mdEHSocket);
ZMQ.connect(mdEHSocket,tcpSys);
# Rx Data socket
dataSocket = Socket(SUB);
tcpSys = string("tcp://$e310Adress:1026");
ZMQ.subscribe(dataSocket);
ZMQ.connect(dataSocket,tcpSys);
# # Request To Receive (RTR) socket
# rtrSock = Socket(SUB);
# tcpSys = string("tcp://$e310Adress:5555");
# ZMQ.subscribe(rtrSock);
# ZMQ.connect(rtrSock,tcpSys);
# rtr = CustomZMQ(rtrSock,e310Adress,5555);
# Tx data socket
# TODO: => Create ZMQ socket for data push
# --- Connect to socket
# --- Format e310 IP address
e310Address = IPv4(ip);
# --- Configuration Socket
# Socket used to send configuration and get configuration back
rtcSocket = ZMQ.Socket(REQ);
ZMQ.connect(rtcSocket,"tcp://$e310Address:5555");

# --- Tx socket
# Socket used to transmit data from Host to e310 (Tx link)
rttSocket = ZMQ.Socket(REQ);
ZMQ.connect(rttSocket,"tcp://$e310Address:9999");

# --- Rx socket
# Socket used for e310 to broacast Rx stream
brSocket = Socket(SUB); # Define IPv4 adress
tcpSys = string("tcp://$e310Address:1111");
ZMQ.subscribe(brSocket);
ZMQ.connect(brSocket,tcpSys);

# --- Global socket packet
sockets = SocketsuhdOverNetwork(hostAddress,dataSocket,mdEHSocket,mdHESocket);
@info "2"
sockets = SocketsuhdOverNetwork(e310Address,rtcSocket,rttSocket,brSocket);
return sockets
end

Expand Down Expand Up @@ -143,12 +132,12 @@ end
function Base.close(uhdOverNetwork::UHDOverNetwork)
# @info "coucou"
# --- We close here all the related sockets
close(uhdOverNetwork.sockets.mdHE);
close(uhdOverNetwork.sockets.data);
close(uhdOverNetwork.sockets.mdEH);
close(uhdOverNetwork.sockets.rtcSocket);
close(uhdOverNetwork.sockets.rttSocket);
close(uhdOverNetwork.sockets.brSocket);
end

sendConfig(uhdOverNetwork::UHDOverNetwork,mess) = send(uhdOverNetwork.sockets.mdHE,mess)
sendConfig(uhdOverNetwork::UHDOverNetwork,mess) = send(uhdOverNetwork.sockets.rtcSocket,mess)


function updateCarrierFreq!(uhdOverNetwork::UHDOverNetwork,carrierFreq)
Expand Down Expand Up @@ -229,7 +218,7 @@ function recv!(sig::Vector{Complex{Cfloat}},uhdOverNetwork::UHDOverNetwork;packe
# radio.packetSize is the complex size, so x2
(posT+uhdOverNetwork.packetSize> packetSize) ? n = packetSize - posT : n = uhdOverNetwork.packetSize;
# --- UDP recv. This allocs. This is bad. No idea how to use prealloc pointer without rewriting the stack.
tmp = reinterpret(Complex{Cfloat},recv(uhdOverNetwork.sockets.data));
tmp = reinterpret(Complex{Cfloat},recv(uhdOverNetwork.sockets.brSocket));
sig[posT .+ (1:n)] .= @view tmp[1:n];
# --- Update counters
posT += n;
Expand All @@ -240,7 +229,7 @@ function recv!(sig::Vector{Complex{Cfloat}},uhdOverNetwork::UHDOverNetwork;packe
end

function getuhdOverNetworkConfig(uhdOverNetwork::UHDOverNetwork)
receiver = recv(uhdOverNetwork.sockets.mdEH);
receiver = recv(uhdOverNetwork.sockets.rtcSocket);
res = Meta.parse(String(receiver))
config = eval(res);
return Configuration(config...);
Expand All @@ -258,7 +247,7 @@ function getMD(uhdOverNetwork::UHDOverNetwork)
# --- Send the command
sendConfig(uhdOverNetwork,strF);
# --- Get the MD back
receiver = recv(uhdOverNetwork.sockets.mdEH);
receiver = recv(uhdOverNetwork.sockets.rtcSocket);
res = Meta.parse(String(receiver))
# --- Convert to a MD structure
md = eval(res)
Expand Down
51 changes: 20 additions & 31 deletions tests/minimalTransceiver.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ using UHDBindings
using Distributed
using ZMQ
@everywhere using Sockets
# --- Constant definition
# const HOST_ADDRESS = @ip_str "192.168.10.60";
const E310_ADRESS = @ip_str "0.0.0.0";
const PORTHE = 30000;
const PORTEH = 30000;

mutable struct Configuration
carrierFreq::Float64;
Expand Down Expand Up @@ -44,18 +39,15 @@ end
function main(carrierFreq, samplingRate, gain, nbSamples)
# --- Setting a very first configuration
global radio = openUHD(carrierFreq, samplingRate, gain);
# --- Create socket for data transmission
dataSocket = ZMQ.Socket(PUB);
bind(dataSocket,"tcp://*:1026");
# --- Create ZMQ socket for config transmission to Host
configEH = ZMQ.Socket(PUB)
bind(configEH,"tcp://*:1025");
# --- Create ZMQ socket for config reception from Host
configHE = ZMQ.Socket(SUB);
tcpSys = string("tcp://*:1024");
ZMQ.subscribe(configHE);
ZMQ.bind(configHE,tcpSys)
# ZMQ.connect(configHE,tcpSys);
# --- Configuration socket
rtcSocket = ZMQ.Socket(REP);
bind(rtcSocket,"tcp://*:5555");
# --- RTT socket for Tx
rttSocket = ZMQ.Socket(REP);
bind(rttSocket,"tcp://*:9999");
# --- Socket for broadcast Rx
brSocket = ZMQ.Socket(PUB);
bind(brSocket,"tcp://*:1111");
# --- Get samples
sig = zeros(Complex{Cfloat}, nbSamples);
cnt = 0;
Expand All @@ -70,7 +62,7 @@ function main(carrierFreq, samplingRate, gain, nbSamples)
# --- Interruption to update radio config
@async begin
# --- We wait in this @async for a reception
receiver = ZMQ.recv(configHE);
receiver = ZMQ.recv(rtcSocket);
@info "We have receive something from remote PC"
# --- Here, we have receive something
# Raise a flag because something happens
Expand All @@ -81,24 +73,23 @@ function main(carrierFreq, samplingRate, gain, nbSamples)
(requestConfig, requestMD, mode) = updateUHD!(radio, res);
if requestConfig
# --- Sending the configuration to Host
sendConfig(configEH, radio.rx, nbSamples);
sendConfig(rtcSocket, radio.rx, nbSamples);
end
if requestMD
sendMD(configEH, radio.rx);
sendMD(rtcSocket, radio.rx);
end
if mode == :tx
# reevaluate selection
D = eval(res);
# sig = D[:tx];
end
# --- Recreate a new socket due to the new config
end
while (!flag)
if mode == :rx
# --- Direct call to avoid allocation
recv!(sig, radio);
# --- To UDP socket
ZMQ.send(dataSocket,sig)
ZMQ.send(brSocket,sig)
yield();
else
# --- We now transmit data !
Expand All @@ -110,9 +101,9 @@ function main(carrierFreq, samplingRate, gain, nbSamples)
# --- Close UHD
close(radio);
# --- Close sockets
close(dataSocket);
close(configHE);
close(configEH);
close(rtcSocket);
close(rttSocket);
close(brSocket);
# --- Release USRP
@show exception;
end
Expand Down Expand Up @@ -153,20 +144,18 @@ function updateUHD!(radio, res)
end


function sendConfig(configEH, radio, nbSamples)
function sendConfig(rtcSocket, radio, nbSamples)
# --- get Configuration from radio
config = (radio.carrierFreq, radio.samplingRate, radio.gain, radio.antenna, nbSamples);
# --- Send config
strF = "$(config)";
# Sockets.send(configEH, HOST_ADDRESS, PORTEH, strF);
ZMQ.send(configEH, strF);
ZMQ.send(rtcSocket, strF);
end
function sendMD(configEH, radio)
function sendMD(rtcSocket, radio)
md = (getTimestamp(radio)..., Cint(getError(radio)));
# --- Send config
strF = "$(md)";
# Sockets.send(configEH, HOST_ADDRESS, PORTEH, strF);
ZMQ.send(configEH, strF);
ZMQ.send(rtcSocket, strF);
end

end
Expand Down

0 comments on commit 4872dd5

Please sign in to comment.