# NOTE: the star import is kept first so that the later ``import time`` always
# binds the stdlib module, whatever names valkka.core happens to export.
from valkka.core import *
import time


class Stream:
    """Read one RTSP source and re-serve it through several LiveThreads.

    Frame flow set up by the constructor::

        (LiveThread:livethread) -----------> {ForkFrameFilterN: fork_framefilter}
                                                      |
                                                      +---> (LiveThread:livethread) retransmitting to 1st port
                                                      |
                                                      +---> (LiveThread:livethread) retransmitting to 2nd port
                                                      |
                                                      ...
                                                      |
                                                      +---> (LiveThread:livethread) retransmitting to Nth port

    Attributes:
        ports: list of the RTSP server ports, in creation order.
        address: URL of the incoming stream.
        slot: slot number used for both the inbound and outbound contexts.
        live_thread_dict: maps each port to a dict with the keys
            ``'livethread'``, ``'live_in_filter'`` and ``'out_ctx'``.
        livethread: the LiveThread that reads the incoming stream.
        fork_framefilter: the ForkFrameFilterN feeding every output thread.
        ctx: the LiveConnectionContext of the incoming stream.
    """

    def __init__(self, address, slot=1, n_ports=20, base_port=60000):
        """Start all threads and begin playing the incoming stream.

        Args:
            address: URL of the source stream.
            slot: slot number passed to the connection contexts.
            n_ports: number of output LiveThreads / RTSP server ports to
                create (defaults to the previously hard-coded 20).
            base_port: first port number; ports are consecutive from here
                (defaults to the previously hard-coded 60000).
        """
        self.ports = []
        self.address = address  # URL
        self.slot = slot
        self.live_thread_dict = {}

        # One output LiveThread per port, each with its own outbound context.
        for port in range(base_port, base_port + n_ports):
            out_thread = LiveThread("livethread")
            out_thread.startCall()
            entry = {
                'livethread': out_thread,
                'live_in_filter': out_thread.getFrameFilter(),
                'out_ctx': LiveOutboundContext(
                    LiveConnectionType_rtsp, "", self.slot, 0
                ),
            }
            # NOTE(review): the RTSP server is configured after the thread has
            # been started (original order preserved) -- confirm against the
            # Valkka documentation that this order is supported.
            out_thread.setRTSPServer(port)
            self.live_thread_dict[port] = entry
            self.ports.append(port)

        # Input LiveThread: reads the source stream.
        self.livethread = LiveThread("livethread")
        self.livethread.startCall()

        # Fan the incoming frames out to every output thread's input filter.
        self.fork_framefilter = ForkFrameFilterN("fork_framefilter")
        for index, port in enumerate(self.ports):
            self.fork_framefilter.connect(
                'fork' + str(index),
                self.live_thread_dict[port]['live_in_filter'],
            )

        # Inbound connection context writing into the fork filter.
        self.ctx = LiveConnectionContext(
            LiveConnectionType_rtsp,
            self.address,
            self.slot,
            self.fork_framefilter,
        )
        # presumably a reconnect timeout in milliseconds -- TODO confirm
        self.ctx.msreconnect = 20000

        # Register the outbound side first, then register and play the input.
        for port in self.ports:
            entry = self.live_thread_dict[port]
            entry['livethread'].registerOutboundCall(entry['out_ctx'])

        self.livethread.registerStreamCall(self.ctx)
        self.livethread.playStreamCall(self.ctx)

    def Stop(self):
        """Stop playback, deregister all contexts and stop every thread.

        Order: the incoming stream is stopped and deregistered, the outbound
        contexts are deregistered, then the input thread and finally the
        output threads are stopped.
        """
        self.livethread.stopStreamCall(self.ctx)
        self.livethread.deregisterStreamCall(self.ctx)

        for port in self.ports:
            entry = self.live_thread_dict[port]
            entry['livethread'].deregisterOutboundCall(entry['out_ctx'])

        self.livethread.stopCall()
        for port in self.ports:
            self.live_thread_dict[port]['livethread'].stopCall()


setLiveOutPacketBuffermaxSize(1000000)

url = 'rtsp://YourUrlHere'
myStream = Stream(url)
try:
    time.sleep(180)
finally:
    # Always shut the threads down, even if the sleep is interrupted
    # (e.g. by Ctrl-C).
    myStream.Stop()