From adc6febf7b770251170c897ee686957095ca45d9 Mon Sep 17 00:00:00 2001 From: Chris Date: Fri, 21 Oct 2022 20:52:44 -0500 Subject: [PATCH 01/27] WIP adding second websocket handling for cameras --- photon-client/src/main.js | 5 +- .../server/CameraSocketHandler.java | 158 ++++++++++++++++++ .../server/CameraSocketMessageType.java | 46 +++++ ...ketHandler.java => DataSocketHandler.java} | 14 +- ...geType.java => DataSocketMessageType.java} | 10 +- .../java/org/photonvision/server/Server.java | 21 ++- .../server/UIOutboundSubscriber.java | 4 +- 7 files changed, 236 insertions(+), 22 deletions(-) create mode 100644 photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java create mode 100644 photon-server/src/main/java/org/photonvision/server/CameraSocketMessageType.java rename photon-server/src/main/java/org/photonvision/server/{SocketHandler.java => DataSocketHandler.java} (97%) rename photon-server/src/main/java/org/photonvision/server/{SocketMessageType.java => DataSocketMessageType.java} (84%) diff --git a/photon-client/src/main.js b/photon-client/src/main.js index 81248f7c72..4f8f5077f6 100644 --- a/photon-client/src/main.js +++ b/photon-client/src/main.js @@ -15,11 +15,12 @@ if (process.env.NODE_ENV === "production") { Vue.prototype.$address = location.hostname + ":5800"; } -const wsURL = '//' + Vue.prototype.$address + '/websocket'; +const wsDataURL = '//' + Vue.prototype.$address + '/websocket_data'; +const wsCamerasURL = '//' + Vue.prototype.$address + '/websocket_cameras'; import VueNativeSock from 'vue-native-websocket'; -Vue.use(VueNativeSock, wsURL, { +Vue.use(VueNativeSock, wsDataURL, { reconnection: true, reconnectionDelay: 100, connectManually: true, diff --git a/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java b/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java new file mode 100644 index 0000000000..f0d6fe0159 --- /dev/null +++ b/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java @@ -0,0 +1,158 @@ +/* + * Copyright (C) Photon Vision. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.photonvision.server; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.javalin.websocket.WsBinaryMessageContext; +import io.javalin.websocket.WsCloseContext; +import io.javalin.websocket.WsConnectContext; +import io.javalin.websocket.WsContext; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.commons.lang3.tuple.Pair; +import org.msgpack.jackson.dataformat.MessagePackFactory; +import org.photonvision.common.dataflow.DataChangeDestination; +import org.photonvision.common.dataflow.DataChangeService; +import org.photonvision.common.dataflow.events.IncomingWebSocketEvent; +import org.photonvision.common.hardware.HardwareManager; +import org.photonvision.common.logging.LogGroup; +import org.photonvision.common.logging.Logger; +import org.photonvision.vision.pipeline.PipelineType; + +@SuppressWarnings("rawtypes") +public class CameraSocketHandler { + private final Logger logger = new Logger(CameraSocketHandler.class, LogGroup.WebServer); + private final List users = new CopyOnWriteArrayList<>(); + private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); + + public static class UIMap extends HashMap {} + + private static class ThreadSafeSingleton { + private static final CameraSocketHandler INSTANCE = new CameraSocketHandler(); + } + + public static CameraSocketHandler getInstance() { + return CameraSocketHandler.ThreadSafeSingleton.INSTANCE; + } + + private CameraSocketHandler() { + + } + + public void onConnect(WsConnectContext context) { + context.session.setIdleTimeout(Long.MAX_VALUE); // TODO: determine better value + var insa = context.session.getRemote().getInetSocketAddress(); + var host = insa.getAddress().toString() + ":" + insa.getPort(); + logger.info("New websocket connection from " + host); + users.add(context); + } + + protected void onClose(WsCloseContext context) { + var insa = context.session.getRemote().getInetSocketAddress(); + var host = insa.getAddress().toString() + ":" + insa.getPort(); + var reason = context.reason() != null ? context.reason() : "Connection closed by client"; + logger.info("Closing websocket connection from " + host + " for reason: " + reason); + users.remove(context); + + if (users.size() == 0) { + logger.info("All websocket connections are closed. Setting inputShouldShow to false."); + } + } + + @SuppressWarnings({"unchecked"}) + public void onBinaryMessage(WsBinaryMessageContext context) { + try { + Map deserializedData = + objectMapper.readValue(context.data(), new TypeReference<>() {}); + + // Special case the current camera index + var camIndexValue = deserializedData.get("cameraIndex"); + Integer cameraIndex = null; + if (camIndexValue instanceof Integer) { + cameraIndex = (Integer) camIndexValue; + deserializedData.remove("cameraIndex"); + } + + for (Map.Entry entry : deserializedData.entrySet()) { + try { + var entryKey = entry.getKey(); + var entryValue = entry.getValue(); + var socketMessageType = CameraSocketMessageType.fromEntryKey(entryKey); + + logger.trace( + () -> + "Got WS message: [" + + socketMessageType + + "] ==> [" + + entryKey + + "], [" + + entryValue + + "]"); + + if (socketMessageType == null) { + logger.warn("Got unknown socket message type: " + entryKey); + continue; + } + + switch (socketMessageType) { + case CSMT_CHANGEPIPELINETYPE: + { + // TODO: what is this event? + var data = (HashMap) entryValue; + + break; + } + } + } catch (Exception e) { + logger.error("Failed to parse message!", e); + } + } + } catch (IOException e) { + logger.error("Failed to deserialize message!", e); + } + } + + private void sendMessage(Object message, WsContext user) throws JsonProcessingException { + ByteBuffer b = ByteBuffer.wrap(objectMapper.writeValueAsBytes(message)); + user.send(b); + } + + public void broadcastMessage(Object message, WsContext userToSkip) + throws JsonProcessingException { + if (userToSkip == null) { + for (WsContext user : users) { + sendMessage(message, user); + } + } else { + var skipUserPort = userToSkip.session.getRemote().getInetSocketAddress().getPort(); + for (WsContext user : users) { + var userPort = user.session.getRemote().getInetSocketAddress().getPort(); + if (userPort != skipUserPort) { + sendMessage(message, user); + } + } + } + } +} diff --git a/photon-server/src/main/java/org/photonvision/server/CameraSocketMessageType.java b/photon-server/src/main/java/org/photonvision/server/CameraSocketMessageType.java new file mode 100644 index 0000000000..6e0269cf69 --- /dev/null +++ b/photon-server/src/main/java/org/photonvision/server/CameraSocketMessageType.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) Photon Vision. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.photonvision.server; + +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; + +@SuppressWarnings("unused") +public enum CameraSocketMessageType { + CSMT_DRIVERMODE("todo_1"), + CSMT_CHANGEPIPELINETYPE("todo_1"); + + public final String entryKey; + + CameraSocketMessageType(String entryKey) { + this.entryKey = entryKey; + } + + private static final Map entryKeyToValueMap = new HashMap<>(); + + static { + for (var value : EnumSet.allOf(CameraSocketMessageType.class)) { + entryKeyToValueMap.put(value.entryKey, value); + } + } + + public static CameraSocketMessageType fromEntryKey(String entryKey) { + return entryKeyToValueMap.get(entryKey); + } +} diff --git a/photon-server/src/main/java/org/photonvision/server/SocketHandler.java b/photon-server/src/main/java/org/photonvision/server/DataSocketHandler.java similarity index 97% rename from photon-server/src/main/java/org/photonvision/server/SocketHandler.java rename to photon-server/src/main/java/org/photonvision/server/DataSocketHandler.java index 0a70379f93..4d769e7a5c 100644 --- a/photon-server/src/main/java/org/photonvision/server/SocketHandler.java +++ b/photon-server/src/main/java/org/photonvision/server/DataSocketHandler.java @@ -42,8 +42,8 @@ import org.photonvision.vision.pipeline.PipelineType; @SuppressWarnings("rawtypes") -public class SocketHandler { - private final Logger logger = new Logger(SocketHandler.class, LogGroup.WebServer); +public class DataSocketHandler { + private final Logger logger = new Logger(DataSocketHandler.class, LogGroup.WebServer); private final List users = new CopyOnWriteArrayList<>(); private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); private final DataChangeService dcService = DataChangeService.getInstance(); @@ -54,14 +54,14 @@ public class SocketHandler { public static class UIMap extends HashMap {} private static class ThreadSafeSingleton { - private static final SocketHandler INSTANCE = new SocketHandler(); + private static final DataSocketHandler INSTANCE = new DataSocketHandler(); } - public static SocketHandler getInstance() { - return SocketHandler.ThreadSafeSingleton.INSTANCE; + public static DataSocketHandler getInstance() { + return DataSocketHandler.ThreadSafeSingleton.INSTANCE; } - private SocketHandler() { + private DataSocketHandler() { dcService.addSubscribers( uiOutboundSubscriber, new UIInboundSubscriber()); // Subscribe outgoing messages to the data change service @@ -117,7 +117,7 @@ public void onBinaryMessage(WsBinaryMessageContext context) { try { var entryKey = entry.getKey(); var entryValue = entry.getValue(); - var socketMessageType = SocketMessageType.fromEntryKey(entryKey); + var socketMessageType = DataSocketMessageType.fromEntryKey(entryKey); logger.trace( () -> diff --git a/photon-server/src/main/java/org/photonvision/server/SocketMessageType.java b/photon-server/src/main/java/org/photonvision/server/DataSocketMessageType.java similarity index 84% rename from photon-server/src/main/java/org/photonvision/server/SocketMessageType.java rename to photon-server/src/main/java/org/photonvision/server/DataSocketMessageType.java index ff5ba3a36e..256695f2c8 100644 --- a/photon-server/src/main/java/org/photonvision/server/SocketMessageType.java +++ b/photon-server/src/main/java/org/photonvision/server/DataSocketMessageType.java @@ -22,7 +22,7 @@ import java.util.Map; @SuppressWarnings("unused") -public enum SocketMessageType { +public enum DataSocketMessageType { SMT_DRIVERMODE("driverMode"), SMT_CHANGECAMERANAME("changeCameraName"), SMT_CHANGEPIPELINENAME("changePipelineName"), @@ -40,19 +40,19 @@ public enum SocketMessageType { public final String entryKey; - SocketMessageType(String entryKey) { + DataSocketMessageType(String entryKey) { this.entryKey = entryKey; } - private static final Map entryKeyToValueMap = new HashMap<>(); + private static final Map entryKeyToValueMap = new HashMap<>(); static { - for (var value : EnumSet.allOf(SocketMessageType.class)) { + for (var value : EnumSet.allOf(DataSocketMessageType.class)) { entryKeyToValueMap.put(value.entryKey, value); } } - public static SocketMessageType fromEntryKey(String entryKey) { + public static DataSocketMessageType fromEntryKey(String entryKey) { return entryKeyToValueMap.get(entryKey); } } diff --git a/photon-server/src/main/java/org/photonvision/server/Server.java b/photon-server/src/main/java/org/photonvision/server/Server.java index 05bd00121e..7f7db5d111 100644 --- a/photon-server/src/main/java/org/photonvision/server/Server.java +++ b/photon-server/src/main/java/org/photonvision/server/Server.java @@ -61,15 +61,24 @@ public static void main(int port) { }))); }); - var socketHandler = SocketHandler.getInstance(); - /*Web Socket Events */ + /*Web Socket Events for Data Exchage */ + var dsHandler = DataSocketHandler.getInstance(); app.ws( - "/websocket", + "/websocket_data", ws -> { - ws.onConnect(socketHandler::onConnect); - ws.onClose(socketHandler::onClose); - ws.onBinaryMessage(socketHandler::onBinaryMessage); + ws.onConnect(dsHandler::onConnect); + ws.onClose(dsHandler::onClose); + ws.onBinaryMessage(dsHandler::onBinaryMessage); + }); + /*Web Socket Events for Camera Streaming */ + var camDsHandler = CameraSocketHandler.getInstance(); + app.ws( + "/websocket_cameras", + ws -> { + ws.onConnect(camDsHandler::onConnect); + ws.onClose(camDsHandler::onClose); + ws.onBinaryMessage(camDsHandler::onBinaryMessage); }); /*API Events*/ app.post("/api/settings/import", RequestHandler::onSettingUpload); diff --git a/photon-server/src/main/java/org/photonvision/server/UIOutboundSubscriber.java b/photon-server/src/main/java/org/photonvision/server/UIOutboundSubscriber.java index 18f9528ea9..e502b5eec9 100644 --- a/photon-server/src/main/java/org/photonvision/server/UIOutboundSubscriber.java +++ b/photon-server/src/main/java/org/photonvision/server/UIOutboundSubscriber.java @@ -35,9 +35,9 @@ class UIOutboundSubscriber extends DataChangeSubscriber { Logger logger = new Logger(UIOutboundSubscriber.class, LogGroup.WebServer); - private final SocketHandler socketHandler; + private final DataSocketHandler socketHandler; - public UIOutboundSubscriber(SocketHandler socketHandler) { + public UIOutboundSubscriber(DataSocketHandler socketHandler) { super(DataChangeSource.AllSources, Collections.singletonList(DataChangeDestination.DCD_UI)); this.socketHandler = socketHandler; } From d4f9a9d85ba396226558081c857e2e3cac9907cc Mon Sep 17 00:00:00 2001 From: Chris Date: Sat, 22 Oct 2022 09:25:52 -0500 Subject: [PATCH 02/27] just more WIP --- .../vision/processes/VisionModule.java | 17 +++ .../vision/videoStream/SocketVideoStream.java | 84 +++++++++++++ .../videoStream/SocketVideoStreamManager.java | 79 ++++++++++++ .../server/CameraSocketHandler.java | 112 +++++++++--------- .../server/CameraSocketMessageType.java | 4 +- .../server/DataSocketHandler.java | 14 +-- .../java/org/photonvision/server/Server.java | 1 + 7 files changed, 243 insertions(+), 68 deletions(-) create mode 100644 photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStream.java create mode 100644 photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java diff --git a/photon-core/src/main/java/org/photonvision/vision/processes/VisionModule.java b/photon-core/src/main/java/org/photonvision/vision/processes/VisionModule.java index 2bcad2a172..cf3685340e 100644 --- a/photon-core/src/main/java/org/photonvision/vision/processes/VisionModule.java +++ b/photon-core/src/main/java/org/photonvision/vision/processes/VisionModule.java @@ -48,6 +48,8 @@ import org.photonvision.vision.pipeline.result.CVPipelineResult; import org.photonvision.vision.target.TargetModel; import org.photonvision.vision.target.TrackedTarget; +import org.photonvision.vision.videoStream.SocketVideoStream; +import org.photonvision.vision.videoStream.SocketVideoStreamManager; /** * This is the God Class @@ -82,6 +84,9 @@ public class VisionModule { FileSaveFrameConsumer inputFrameSaver; FileSaveFrameConsumer outputFrameSaver; + SocketVideoStream inputVideoStreamer; + SocketVideoStream outputVideoStreamer; + public VisionModule(PipelineManager pipelineManager, VisionSource visionSource, int index) { logger = new Logger( @@ -168,6 +173,8 @@ public VisionModule(PipelineManager pipelineManager, VisionSource visionSource, private void destroyStreams() { dashboardInputStreamer.close(); dashboardOutputStreamer.close(); + SocketVideoStreamManager.getInstance().removeStream(inputVideoStreamer); + SocketVideoStreamManager.getInstance().removeStream(outputVideoStreamer); } private void createStreams() { @@ -188,6 +195,13 @@ private void createStreams() { outputFrameSaver = new FileSaveFrameConsumer( visionSource.getSettables().getConfiguration().nickname, "output"); + + inputVideoStreamer = new SocketVideoStream(inputStreamPort); + outputVideoStreamer = new SocketVideoStream(outputStreamPort); + SocketVideoStreamManager.getInstance().addStream(inputVideoStreamer); + SocketVideoStreamManager.getInstance().addStream(outputVideoStreamer); + + } private void recreateFpsLimitedResultConsumers() { @@ -196,6 +210,9 @@ private void recreateFpsLimitedResultConsumers() { rawResultConsumers.add((in, out, tgts) -> inputFrameSaver.accept(in)); fpsLimitedResultConsumers.add(result -> outputFrameSaver.accept(result.outputFrame)); + rawResultConsumers.add((in, out, tgts) -> inputVideoStreamer.accept(in)); + fpsLimitedResultConsumers.add(result -> outputVideoStreamer.accept(result.outputFrame)); + fpsLimitedResultConsumers.add( result -> { if (this.pipelineManager.getCurrentPipelineSettings().inputShouldShow) diff --git a/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStream.java b/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStream.java new file mode 100644 index 0000000000..06770f420c --- /dev/null +++ b/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStream.java @@ -0,0 +1,84 @@ +package org.photonvision.vision.videoStream; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +import org.opencv.core.MatOfByte; +import org.opencv.imgcodecs.Imgcodecs; +import org.photonvision.vision.frame.Frame; + +import io.javalin.websocket.WsContext; + +public class SocketVideoStream implements Consumer { + + int portID = 0; // Align with cscore's port for unique identification of stream + MatOfByte jpegBytes = null; + + // Gets set to true when another class reads out valid jpeg bytes at least once + // Set back to false when another frame is freshly converted + // Should eliminate synchronization issues of differeing rates of putting frames in + // and taking them back out + boolean frameWasConsumed = false; + + //Synclock around manipulating the jpeg bytes from multiple threads + Lock jpegBytesLock = new ReentrantLock(); + + Set subscribedUsers = new HashSet(); + + public SocketVideoStream(int portID){ + this.portID = portID; + } + + @Override + public void accept(Frame frame) { + if (subscribedUsers.size() > 0){ + if(jpegBytesLock.tryLock()){ + try{ + //Does a single-shot frame recieve and convert to JPEG for efficency + // Will not capture/convert again until convertNextFrame() is called + if(frame != null && !frame.image.getMat().empty() && jpegBytes == null) { + frameWasConsumed = false; + jpegBytes = new MatOfByte(); + Imgcodecs.imencode(".jpg", frame.image.getMat(), jpegBytes); + } + } finally { + jpegBytesLock.unlock(); + } + } + } + } + + public MatOfByte getJPEGBytes(){ + frameWasConsumed = true; + return jpegBytes; + } + + public void convertNextFrame(){ + if(frameWasConsumed){ + jpegBytesLock.lock(); + if(jpegBytes != null){ + jpegBytes.release(); + jpegBytes = null; + } + jpegBytesLock.unlock(); + } + } + + public void subscribeUser(WsContext user){ + subscribedUsers.add(user); + } + + public void unsubscribeUser(WsContext user){ + subscribedUsers.remove(user); + } + + public boolean userIsSubscribed(WsContext user){ + return subscribedUsers.contains(user); + } + +} diff --git a/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java b/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java new file mode 100644 index 0000000000..1d8323629f --- /dev/null +++ b/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java @@ -0,0 +1,79 @@ +package org.photonvision.vision.videoStream; + +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; + +import org.opencv.core.MatOfByte; + +import edu.wpi.first.math.Pair; +import io.javalin.websocket.WsContext; + +public class SocketVideoStreamManager { + + private Map streams = new Hashtable(); + + private static class ThreadSafeSingleton { + private static final SocketVideoStreamManager INSTANCE = new SocketVideoStreamManager(); + } + + public static SocketVideoStreamManager getInstance() { + return ThreadSafeSingleton.INSTANCE; + } + + private SocketVideoStreamManager(){ + + } + + // Register a new available camera stream + public void addStream(SocketVideoStream newStream){ + streams.put(newStream.portID, newStream); + } + + // Remove a previously-added camera stream, and unsubscribe all users + public void removeStream(SocketVideoStream oldStream){ + streams.remove(oldStream.portID); + } + + // Indicate a user would like to subscribe to a camera stream and get frames from it periodically + public void addSubscription(WsContext user, int streamPortID){ + var stream = streams.get(streamPortID); + stream.subscribeUser(user); + } + + // Indicate a user would like to stop receiving one camera stream + public void removeSubscription(WsContext user, int streamPortID){ + var stream = streams.get(streamPortID); + stream.unsubscribeUser(user); + } + + // Indicate a user no longer should get any camera streams + public void removeAllSubscriptions(WsContext user){ + for (SocketVideoStream stream : streams.values()){ + stream.unsubscribeUser(user); + } + } + + // For a given user, return a list of ports and jpeg byte mats to transmit + public List> getSendFrames(WsContext user){ + var retList = new ArrayList>(); + + for (SocketVideoStream stream : streams.values()){ + if(stream.userIsSubscribed(user)){ + retList.add(new Pair(stream.portID, stream.getJPEGBytes())); + } + } + + return retList; + } + + // Causes all streams to "re-trigger" and recieve and convert their next mjpeg frame + // Only invoke this after all returned jpeg MatOfBytes have been used. + public void allStreamConvertNextFrame(){ + for (SocketVideoStream stream : streams.values()){ + stream.convertNextFrame(); + } + } + +} diff --git a/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java b/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java index f0d6fe0159..c76069c85b 100644 --- a/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java +++ b/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java @@ -19,33 +19,32 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.javalin.websocket.WsBinaryMessageContext; import io.javalin.websocket.WsCloseContext; import io.javalin.websocket.WsConnectContext; import io.javalin.websocket.WsContext; +import io.javalin.websocket.WsMessageContext; + import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.commons.lang3.tuple.Pair; import org.msgpack.jackson.dataformat.MessagePackFactory; -import org.photonvision.common.dataflow.DataChangeDestination; -import org.photonvision.common.dataflow.DataChangeService; -import org.photonvision.common.dataflow.events.IncomingWebSocketEvent; -import org.photonvision.common.hardware.HardwareManager; import org.photonvision.common.logging.LogGroup; import org.photonvision.common.logging.Logger; -import org.photonvision.vision.pipeline.PipelineType; +import org.photonvision.vision.videoStream.SocketVideoStreamManager; -@SuppressWarnings("rawtypes") public class CameraSocketHandler { private final Logger logger = new Logger(CameraSocketHandler.class, LogGroup.WebServer); private final List users = new CopyOnWriteArrayList<>(); - private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); + private final SocketVideoStreamManager svsManager = SocketVideoStreamManager.getInstance(); + + private Thread cameraBroadcastThread; + public static class UIMap extends HashMap {} @@ -58,14 +57,15 @@ public static CameraSocketHandler getInstance() { } private CameraSocketHandler() { - + cameraBroadcastThread = new Thread(this::broadcastFramesTask); + cameraBroadcastThread.start(); } public void onConnect(WsConnectContext context) { context.session.setIdleTimeout(Long.MAX_VALUE); // TODO: determine better value var insa = context.session.getRemote().getInetSocketAddress(); var host = insa.getAddress().toString() + ":" + insa.getPort(); - logger.info("New websocket connection from " + host); + logger.info("New camera websocket connection from " + host); users.add(context); } @@ -73,37 +73,27 @@ protected void onClose(WsCloseContext context) { var insa = context.session.getRemote().getInetSocketAddress(); var host = insa.getAddress().toString() + ":" + insa.getPort(); var reason = context.reason() != null ? context.reason() : "Connection closed by client"; - logger.info("Closing websocket connection from " + host + " for reason: " + reason); + logger.info("Closing camera websocket connection from " + host + " for reason: " + reason); + svsManager.removeAllSubscriptions(context); users.remove(context); - - if (users.size() == 0) { - logger.info("All websocket connections are closed. Setting inputShouldShow to false."); - } } - + @SuppressWarnings({"unchecked"}) - public void onBinaryMessage(WsBinaryMessageContext context) { + public void onMessage(WsMessageContext context) { + var messageStr = context.message(); + ObjectMapper mapper = new ObjectMapper(); try { - Map deserializedData = - objectMapper.readValue(context.data(), new TypeReference<>() {}); - - // Special case the current camera index - var camIndexValue = deserializedData.get("cameraIndex"); - Integer cameraIndex = null; - if (camIndexValue instanceof Integer) { - cameraIndex = (Integer) camIndexValue; - deserializedData.remove("cameraIndex"); - } + JsonNode actualObj = mapper.readTree(messageStr); - for (Map.Entry entry : deserializedData.entrySet()) { + for (var entry : actualObj) { try { - var entryKey = entry.getKey(); - var entryValue = entry.getValue(); + var entryKey = entry.textValue(); + var entryValue = entry.elements(); var socketMessageType = CameraSocketMessageType.fromEntryKey(entryKey); logger.trace( () -> - "Got WS message: [" + "Got Camera WS message: [" + socketMessageType + "] ==> [" + entryKey @@ -117,11 +107,22 @@ public void onBinaryMessage(WsBinaryMessageContext context) { } switch (socketMessageType) { - case CSMT_CHANGEPIPELINETYPE: + case CSMT_SUBSCRIBE: { - // TODO: what is this event? - var data = (HashMap) entryValue; - + while(entryValue.hasNext()){ + var portSpec = entryValue.next(); + int portId = portSpec.get("port").asInt(); + svsManager.addSubscription(context, portId); + } + break; + } + case CSMT_UNSUBSCRIBE: + { + while(entryValue.hasNext()){ + var portSpec = entryValue.next(); + int portId = portSpec.get("port").asInt(); + svsManager.removeSubscription(context, portId); + } break; } } @@ -129,29 +130,34 @@ public void onBinaryMessage(WsBinaryMessageContext context) { logger.error("Failed to parse message!", e); } } - } catch (IOException e) { - logger.error("Failed to deserialize message!", e); + } catch (JsonProcessingException e) { + logger.warn("Could not parse message \"" + messageStr + "\""); + e.printStackTrace(); + return; } + } - private void sendMessage(Object message, WsContext user) throws JsonProcessingException { - ByteBuffer b = ByteBuffer.wrap(objectMapper.writeValueAsBytes(message)); - user.send(b); + + @SuppressWarnings({"unchecked"}) + public void onBinaryMessage(WsBinaryMessageContext context) { + return; // ignoring binary messages for now } - public void broadcastMessage(Object message, WsContext userToSkip) - throws JsonProcessingException { - if (userToSkip == null) { - for (WsContext user : users) { - sendMessage(message, user); + private void broadcastFramesTask() { + // Background camera image broadcasting thread + while (!Thread.currentThread().isInterrupted()) { + + for(var user : users){ + var frames = svsManager.getSendFrames(user); } - } else { - var skipUserPort = userToSkip.session.getRemote().getInetSocketAddress().getPort(); - for (WsContext user : users) { - var userPort = user.session.getRemote().getInetSocketAddress().getPort(); - if (userPort != skipUserPort) { - sendMessage(message, user); - } + + svsManager.allStreamConvertNextFrame(); + + try { + Thread.sleep(5); + } catch (InterruptedException e) { + logger.error("Exception waiting for camera stream broadcast semaphor", e); } } } diff --git a/photon-server/src/main/java/org/photonvision/server/CameraSocketMessageType.java b/photon-server/src/main/java/org/photonvision/server/CameraSocketMessageType.java index 6e0269cf69..74841e2022 100644 --- a/photon-server/src/main/java/org/photonvision/server/CameraSocketMessageType.java +++ b/photon-server/src/main/java/org/photonvision/server/CameraSocketMessageType.java @@ -23,8 +23,8 @@ @SuppressWarnings("unused") public enum CameraSocketMessageType { - CSMT_DRIVERMODE("todo_1"), - CSMT_CHANGEPIPELINETYPE("todo_1"); + CSMT_SUBSCRIBE("subscribe"), + CSMT_UNSUBSCRIBE("unsubscribe"); public final String entryKey; diff --git a/photon-server/src/main/java/org/photonvision/server/DataSocketHandler.java b/photon-server/src/main/java/org/photonvision/server/DataSocketHandler.java index 4d769e7a5c..be8286208e 100644 --- a/photon-server/src/main/java/org/photonvision/server/DataSocketHandler.java +++ b/photon-server/src/main/java/org/photonvision/server/DataSocketHandler.java @@ -24,6 +24,7 @@ import io.javalin.websocket.WsCloseContext; import io.javalin.websocket.WsConnectContext; import io.javalin.websocket.WsContext; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -84,19 +85,6 @@ protected void onClose(WsCloseContext context) { var reason = context.reason() != null ? context.reason() : "Connection closed by client"; logger.info("Closing websocket connection from " + host + " for reason: " + reason); users.remove(context); - - if (users.size() == 0) { - logger.info("All websocket connections are closed. Setting inputShouldShow to false."); - - // cameraIndex -1 means the event is received by all cameras - dcService.publishEvent( - new IncomingWebSocketEvent<>( - DataChangeDestination.DCD_ACTIVEPIPELINESETTINGS, - "inputShouldShow", - false, - -1, - null)); - } } @SuppressWarnings({"unchecked"}) diff --git a/photon-server/src/main/java/org/photonvision/server/Server.java b/photon-server/src/main/java/org/photonvision/server/Server.java index 7f7db5d111..98e86f4461 100644 --- a/photon-server/src/main/java/org/photonvision/server/Server.java +++ b/photon-server/src/main/java/org/photonvision/server/Server.java @@ -79,6 +79,7 @@ public static void main(int port) { ws.onConnect(camDsHandler::onConnect); ws.onClose(camDsHandler::onClose); ws.onBinaryMessage(camDsHandler::onBinaryMessage); + ws.onMessage(camDsHandler::onMessage); }); /*API Events*/ app.post("/api/settings/import", RequestHandler::onSettingUpload); From 8aded76f2010ab98426470c361cc46608a9c3314 Mon Sep 17 00:00:00 2001 From: Chris Date: Sat, 22 Oct 2022 09:52:14 -0500 Subject: [PATCH 03/27] even more wip. Most java-side framework completed, but not yet debugged --- photon-client/src/main.js | 2 +- .../common/configuration/ConfigManager.java | 2 +- .../server/CameraSocketHandler.java | 28 +++++++++++++++---- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/photon-client/src/main.js b/photon-client/src/main.js index 4f8f5077f6..1b01de79b3 100644 --- a/photon-client/src/main.js +++ b/photon-client/src/main.js @@ -16,7 +16,7 @@ if (process.env.NODE_ENV === "production") { } const wsDataURL = '//' + Vue.prototype.$address + '/websocket_data'; -const wsCamerasURL = '//' + Vue.prototype.$address + '/websocket_cameras'; +//const wsCamerasURL = '//' + Vue.prototype.$address + '/websocket_cameras'; import VueNativeSock from 'vue-native-websocket'; diff --git a/photon-core/src/main/java/org/photonvision/common/configuration/ConfigManager.java b/photon-core/src/main/java/org/photonvision/common/configuration/ConfigManager.java index 4a0a2a9943..94aab34bf1 100644 --- a/photon-core/src/main/java/org/photonvision/common/configuration/ConfigManager.java +++ b/photon-core/src/main/java/org/photonvision/common/configuration/ConfigManager.java @@ -438,7 +438,7 @@ private void saveAndWriteTask() { try { Thread.sleep(1000); } catch (InterruptedException e) { - logger.error("Exception waiting for settings semaphor", e); + logger.error("Exception waiting for settings semaphore", e); } } } diff --git a/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java b/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java index c76069c85b..0ec09a7862 100644 --- a/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java +++ b/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java @@ -18,22 +18,20 @@ package org.photonvision.server; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + import io.javalin.websocket.WsBinaryMessageContext; import io.javalin.websocket.WsCloseContext; import io.javalin.websocket.WsConnectContext; import io.javalin.websocket.WsContext; import io.javalin.websocket.WsMessageContext; -import java.io.IOException; -import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; -import org.msgpack.jackson.dataformat.MessagePackFactory; import org.photonvision.common.logging.LogGroup; import org.photonvision.common.logging.Logger; import org.photonvision.vision.videoStream.SocketVideoStreamManager; @@ -149,7 +147,25 @@ private void broadcastFramesTask() { while (!Thread.currentThread().isInterrupted()) { for(var user : users){ + ObjectMapper mapper = new ObjectMapper(); + + ObjectNode sendData = mapper.createObjectNode(); + var frames = svsManager.getSendFrames(user); + var frameSendList = sendData.putArray("frameData"); + for(var frame : frames){ + var port = frame.getFirst(); + var bytes = frame.getSecond(); + if(bytes != null){ + ObjectNode frameData = mapper.createObjectNode(); + frameData.put("port", port); + frameData.put("data", bytes.toArray()); //todo actual encoding? + frameSendList.add(frameData); + } + } + //TODO - string encoding needed?? + user.send(sendData); + } svsManager.allStreamConvertNextFrame(); @@ -157,7 +173,7 @@ private void broadcastFramesTask() { try { Thread.sleep(5); } catch (InterruptedException e) { - logger.error("Exception waiting for camera stream broadcast semaphor", e); + logger.error("Exception waiting for camera stream broadcast semaphore", e); } } } From ddedeb4868ce530b1e93d14dc02ecb71684b2758 Mon Sep 17 00:00:00 2001 From: Chris Date: Sat, 22 Oct 2022 11:37:44 -0500 Subject: [PATCH 04/27] IT LIVES. Still needs lots of cleanup. But we're transferring and displaying data! --- photon-client/src/main.js | 1 - photon-client/src/plugins/ColorPicker.js | 2 +- .../src/plugins/WebsocketVideoStream.js | 33 ++++++++ photon-client/src/views/PipelineView.vue | 10 ++- .../vision/videoStream/SocketVideoStream.java | 28 ++++--- .../videoStream/SocketVideoStreamManager.java | 10 +-- .../server/CameraSocketHandler.java | 79 ++++++++----------- 7 files changed, 96 insertions(+), 67 deletions(-) create mode 100644 photon-client/src/plugins/WebsocketVideoStream.js diff --git a/photon-client/src/main.js b/photon-client/src/main.js index 1b01de79b3..fd1bcba833 100644 --- a/photon-client/src/main.js +++ b/photon-client/src/main.js @@ -16,7 +16,6 @@ if (process.env.NODE_ENV === "production") { } const wsDataURL = '//' + Vue.prototype.$address + '/websocket_data'; -//const wsCamerasURL = '//' + Vue.prototype.$address + '/websocket_cameras'; import VueNativeSock from 'vue-native-websocket'; diff --git a/photon-client/src/plugins/ColorPicker.js b/photon-client/src/plugins/ColorPicker.js index b887fd2da0..8c58235d24 100644 --- a/photon-client/src/plugins/ColorPicker.js +++ b/photon-client/src/plugins/ColorPicker.js @@ -5,7 +5,7 @@ function initColorPicker() { if (!canvas) canvas = document.createElement('canvas'); - image = document.querySelector('#normal-stream'); + image = document.querySelector('#raw-stream'); if (image !== null) { canvas.width = image.width; canvas.height = image.height; diff --git a/photon-client/src/plugins/WebsocketVideoStream.js b/photon-client/src/plugins/WebsocketVideoStream.js new file mode 100644 index 0000000000..00572cac95 --- /dev/null +++ b/photon-client/src/plugins/WebsocketVideoStream.js @@ -0,0 +1,33 @@ +var image = undefined; +const wsCamerasURL = "ws://" + window.location.host + "/websocket_cameras"; + + +function initVideoStream(port, drawDiv) { + + image = document.getElementById(drawDiv); + + var webSocket = new WebSocket(wsCamerasURL); + + webSocket.onopen = () => { + webSocket.send(JSON.stringify({"cmd": "subscribe", "port":1181})); + }; + + webSocket.onmessage = (event) => { + const msg = JSON.parse(event.data); + var images = msg["frameData"]; + + for(var img of images){ + if(img['port'] == port){ + image.setAttribute( + 'src', `data:image/jpeg;base64,${img['data']}` + ); + } + } + + + } + +} + + +export default {initVideoStream} diff --git a/photon-client/src/views/PipelineView.vue b/photon-client/src/views/PipelineView.vue index 30613b146e..f60a1f94bc 100644 --- a/photon-client/src/views/PipelineView.vue +++ b/photon-client/src/views/PipelineView.vue @@ -58,7 +58,7 @@ >
{ + self.streamer.initVideoStream(1181, "raw-stream"); + //self.streamer.initVideoStream(1182, "processed-stream"); + }); + }, created() { this.$store.state.connectedCallbacks.push(this.reloadStreams) }, diff --git a/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStream.java b/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStream.java index 06770f420c..5a3fe8ced4 100644 --- a/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStream.java +++ b/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStream.java @@ -1,8 +1,7 @@ package org.photonvision.vision.videoStream; +import java.util.Base64; import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -37,7 +36,7 @@ public SocketVideoStream(int portID){ @Override public void accept(Frame frame) { if (subscribedUsers.size() > 0){ - if(jpegBytesLock.tryLock()){ + if(jpegBytesLock.tryLock()){ //we assume frames are coming in frequently. Just skip this frame if we're locked doing something else. try{ //Does a single-shot frame recieve and convert to JPEG for efficency // Will not capture/convert again until convertNextFrame() is called @@ -53,20 +52,23 @@ public void accept(Frame frame) { } } - public MatOfByte getJPEGBytes(){ - frameWasConsumed = true; - return jpegBytes; + public String getJPEGBase64EncodedStr(){ + String sendStr = null; + jpegBytesLock.lock(); + if(jpegBytes != null){ + sendStr = Base64.getEncoder().encodeToString(jpegBytes.toArray()); + } + jpegBytesLock.unlock(); + return sendStr; } public void convertNextFrame(){ - if(frameWasConsumed){ - jpegBytesLock.lock(); - if(jpegBytes != null){ - jpegBytes.release(); - jpegBytes = null; - } - jpegBytesLock.unlock(); + jpegBytesLock.lock(); + if(jpegBytes != null){ + jpegBytes.release(); + jpegBytes = null; } + jpegBytesLock.unlock(); } public void subscribeUser(WsContext user){ diff --git a/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java b/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java index 1d8323629f..09f3009a14 100644 --- a/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java +++ b/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java @@ -5,8 +5,6 @@ import java.util.List; import java.util.Map; -import org.opencv.core.MatOfByte; - import edu.wpi.first.math.Pair; import io.javalin.websocket.WsContext; @@ -56,12 +54,12 @@ public void removeAllSubscriptions(WsContext user){ } // For a given user, return a list of ports and jpeg byte mats to transmit - public List> getSendFrames(WsContext user){ - var retList = new ArrayList>(); + public List> getSendFrames(WsContext user){ + var retList = new ArrayList>(); for (SocketVideoStream stream : streams.values()){ if(stream.userIsSubscribed(user)){ - retList.add(new Pair(stream.portID, stream.getJPEGBytes())); + retList.add(new Pair(stream.portID, stream.getJPEGBase64EncodedStr())); } } @@ -69,7 +67,7 @@ public List> getSendFrames(WsContext user){ } // Causes all streams to "re-trigger" and recieve and convert their next mjpeg frame - // Only invoke this after all returned jpeg MatOfBytes have been used. + // Only invoke this after all returned jpeg Strings have been used. public void allStreamConvertNextFrame(){ for (SocketVideoStream stream : streams.values()){ stream.convertNextFrame(); diff --git a/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java b/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java index 0ec09a7862..f266fe7acb 100644 --- a/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java +++ b/photon-server/src/main/java/org/photonvision/server/CameraSocketHandler.java @@ -29,6 +29,7 @@ import io.javalin.websocket.WsMessageContext; import java.util.ArrayList; +import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -83,51 +84,38 @@ public void onMessage(WsMessageContext context) { try { JsonNode actualObj = mapper.readTree(messageStr); - for (var entry : actualObj) { - try { - var entryKey = entry.textValue(); - var entryValue = entry.elements(); - var socketMessageType = CameraSocketMessageType.fromEntryKey(entryKey); - - logger.trace( - () -> - "Got Camera WS message: [" - + socketMessageType - + "] ==> [" - + entryKey - + "], [" - + entryValue - + "]"); - - if (socketMessageType == null) { - logger.warn("Got unknown socket message type: " + entryKey); - continue; - } + try { + var entryCmd = actualObj.get("cmd").asText(); + var socketMessageType = CameraSocketMessageType.fromEntryKey(entryCmd); - switch (socketMessageType) { - case CSMT_SUBSCRIBE: - { - while(entryValue.hasNext()){ - var portSpec = entryValue.next(); - int portId = portSpec.get("port").asInt(); - svsManager.addSubscription(context, portId); - } - break; - } - case CSMT_UNSUBSCRIBE: - { - while(entryValue.hasNext()){ - var portSpec = entryValue.next(); - int portId = portSpec.get("port").asInt(); - svsManager.removeSubscription(context, portId); - } - break; - } - } - } catch (Exception e) { - logger.error("Failed to parse message!", e); + logger.trace( + () -> + "Got Camera WS message: [" + + socketMessageType + + "]"); + + if (socketMessageType == null) { + logger.warn("Got unknown socket message command: " + entryCmd); } + + switch (socketMessageType) { + case CSMT_SUBSCRIBE: + { + int portId = actualObj.get("port").asInt(); + svsManager.addSubscription(context, portId); + break; + } + case CSMT_UNSUBSCRIBE: + { + int portId = actualObj.get("port").asInt(); + svsManager.removeSubscription(context, portId); + break; + } + } + } catch (Exception e) { + logger.error("Failed to parse message!", e); } + } catch (JsonProcessingException e) { logger.warn("Could not parse message \"" + messageStr + "\""); e.printStackTrace(); @@ -155,11 +143,12 @@ private void broadcastFramesTask() { var frameSendList = sendData.putArray("frameData"); for(var frame : frames){ var port = frame.getFirst(); - var bytes = frame.getSecond(); - if(bytes != null){ + var sendStr = frame.getSecond(); + if(sendStr != null){ ObjectNode frameData = mapper.createObjectNode(); frameData.put("port", port); - frameData.put("data", bytes.toArray()); //todo actual encoding? + + frameData.put("data", sendStr); //todo actual encoding? frameSendList.add(frameData); } } From 364516187880c64a45957f0d2bce5d252478514c Mon Sep 17 00:00:00 2001 From: Chris Date: Sat, 22 Oct 2022 14:12:57 -0500 Subject: [PATCH 05/27] moved down an architecture layer. Improved multiple-camera handling --- .../src/components/common/cv-image.vue | 12 +-- .../src/plugins/WebsocketVideoStream.js | 90 ++++++++++++++++--- photon-client/src/store/index.js | 3 + photon-client/src/views/CamerasView.vue | 2 + photon-client/src/views/PipelineView.vue | 11 +-- .../videoStream/SocketVideoStreamManager.java | 20 ++++- 6 files changed, 105 insertions(+), 33 deletions(-) diff --git a/photon-client/src/components/common/cv-image.vue b/photon-client/src/components/common/cv-image.vue index 366cc0827c..71df1c62aa 100644 --- a/photon-client/src/components/common/cv-image.vue +++ b/photon-client/src/components/common/cv-image.vue @@ -13,7 +13,7 @@ export default { name: "CvImage", // eslint-disable-next-line vue/require-prop-types - props: ['address', 'scale', 'maxHeight', 'maxHeightMd', 'maxHeightLg', 'maxHeightXl', 'colorPicking', 'id', 'disconnected'], + props: ['address', 'port', 'scale', 'maxHeight', 'maxHeightMd', 'maxHeightLg', 'maxHeightXl', 'colorPicking', 'id', 'disconnected'], data() { return { seed: 1.0, @@ -46,18 +46,14 @@ return ret; } }, - src: { - get() { - return this.disconnected ? require("../../assets/noStream.jpg") : this.address + "?" + this.seed // This prevents caching - }, - }, }, mounted() { - this.reload(); // Force reload image on creation + var wsvs = require('../../plugins/WebsocketVideoStream'); + this.wsStream = new wsvs.WebsocketVideoStream(this.port, this.id); }, methods: { reload() { - this.seed = new Date().getTime(); + this.wsStream.setPort(this.port); } }, } diff --git a/photon-client/src/plugins/WebsocketVideoStream.js b/photon-client/src/plugins/WebsocketVideoStream.js index 00572cac95..e02342980e 100644 --- a/photon-client/src/plugins/WebsocketVideoStream.js +++ b/photon-client/src/plugins/WebsocketVideoStream.js @@ -1,33 +1,95 @@ -var image = undefined; -const wsCamerasURL = "ws://" + window.location.host + "/websocket_cameras"; -function initVideoStream(port, drawDiv) { +export class WebsocketVideoStream{ - image = document.getElementById(drawDiv); - var webSocket = new WebSocket(wsCamerasURL); + constructor(streamPort, drawDiv) { - webSocket.onopen = () => { - webSocket.send(JSON.stringify({"cmd": "subscribe", "port":1181})); - }; + this.image = document.getElementById(drawDiv); + this.streamPort = streamPort + this.serverAddr = "ws://" + window.location.host + "/websocket_cameras"; + this.setNoStream(); + this.ws_connect(); + } + + setNoStream() { + this.image.setAttribute('src', require("../assets/noStream.jpg")); + } + + startStream() { + if(this.serverConnectionActive == true){ + this.ws.send(JSON.stringify({"cmd": "subscribe", "port":this.streamPort})); + } + } + + stopStream() { + if(this.serverConnectionActive == true){ + this.ws.send(JSON.stringify({"cmd": "unsubscribe", "port":this.streamPort})); + } + } + + setPort(streamPort){ + this.stopStream(); + this.streamPort = streamPort + this.startStream(); + } + + ws_onOpen() { + // Set the flag allowing general server communication + this.serverConnectionActive = true; + console.log("Connected!"); + + this.startStream(); + } - webSocket.onmessage = (event) => { - const msg = JSON.parse(event.data); + ws_onClose(e) { + this.setNoStream(); + + //Clear flags to stop server communication + this.ws = null; + this.serverConnectionActive = false; + + console.log('Camera Socket is closed. Reconnect will be attempted in 0.5 second.', e.reason); + setTimeout(this.ws_connect.bind(this), 500); + + if(!e.wasClean){ + console.error('Socket encountered error!'); + } + + } + + ws_onError(e){ + e; //prevent unused failure + this.ws.close(); + } + + ws_onMessage(e){ + const msg = JSON.parse(e.data); var images = msg["frameData"]; for(var img of images){ - if(img['port'] == port){ - image.setAttribute( + if(img['port'] == this.streamPort){ + this.image.setAttribute( 'src', `data:image/jpeg;base64,${img['data']}` ); } } + } - + ws_connect() { + this.ws = new WebSocket(this.serverAddr); + this.ws.binaryType = "arraybuffer"; + this.ws.onopen = this.ws_onOpen.bind(this); + this.ws.onmessage = this.ws_onMessage.bind(this); + this.ws.onclose = this.ws_onClose.bind(this); + this.ws.onerror = this.ws_onError.bind(this); + console.log("Connecting to server " + this.serverAddr); } } -export default {initVideoStream} + + + +export default {WebsocketVideoStream} diff --git a/photon-client/src/store/index.js b/photon-client/src/store/index.js index 086b75260e..45be180a3e 100644 --- a/photon-client/src/store/index.js +++ b/photon-client/src/store/index.js @@ -259,6 +259,9 @@ export default new Vuex.Store({ streamAddress: state => ["http://" + location.hostname + ":" + state.cameraSettings[state.currentCameraIndex].inputStreamPort + "/stream.mjpg", "http://" + location.hostname + ":" + state.cameraSettings[state.currentCameraIndex].outputStreamPort + "/stream.mjpg"], + streamPort: state => + [state.cameraSettings[state.currentCameraIndex].inputStreamPort, + state.cameraSettings[state.currentCameraIndex].outputStreamPort], currentPipelineResults: state => { return state.pipelineResults; }, diff --git a/photon-client/src/views/CamerasView.vue b/photon-client/src/views/CamerasView.vue index 39a8f9dfdf..d71d35ec78 100644 --- a/photon-client/src/views/CamerasView.vue +++ b/photon-client/src/views/CamerasView.vue @@ -291,7 +291,9 @@ >