In [12]:
import cv2

def main():
    # RTSP URL
    rtsp_url = "rtsp://192.168.1.25:8554/main.264"

    # Open the RTSP stream
    cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
    cap.set(cv2.CAP_PROP_BUFFERSIZE,0)
    if not cap.isOpened():
        print("Error: Unable to open the RTSP stream")
        return -1

    # Get the video FPS
    fps = cap.get(cv2.CAP_PROP_FPS)
    print(f"FPS: {fps}")

    while True:
        # Read a frame from the RTSP stream
        ret, frame = cap.read()
        if not ret:
            print("Error: Unable to read frame from RTSP stream")
            break
        frame = cv2.resize(frame, (640, 480))

        # Display the frame
        cv2.imshow("RTSP Stream", frame)

        # Press 'q' to exit
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # Release resources
    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

FPS: 30.0


KeyboardInterrupt: 

In [1]:
import cv2 as cv
import queue
import threading

# 使用多线程读取视频帧，防止缓存累积
class VideoCapture:
    def __init__(self, name):
        self.cap = cv.VideoCapture(name)
        self.state = True
        self.q = queue.Queue()
        self.running = True  # Flag to indicate if the thread should keep running
        self.t = threading.Thread(target=self._reader)
        self.t.daemon = True
        self.t.start()

    def _reader(self):
        while self.running:
            ret, frame = self.cap.read()
            if not ret:
                break
            if not self.q.empty():
                try:
                    self.q.get_nowait()   # 丢弃前一个帧，保持最新
                except queue.Empty:
                    pass
            self.q.put(frame)
            self.state = ret

    def isOpened(self):
        return self.cap.isOpened()

    def read(self):
        return self.state, self.q.get()

    def stop(self):
        self.running = False
        self.t.join()  # 等待线程退出
        self.cap.release()

if __name__ == "__main__":
    # 使用 RTSP 流地址作为视频源
    rtsp_url = "rtsp://192.168.1.25:8554/main.264"  # 替换成您的 RTSP 地址

    # 初始化 VideoCapture 对象
    video = VideoCapture(rtsp_url)

    if not video.isOpened():
        print("无法打开 RTSP 流!")
    else:
        while True:
            ret, frame = video.read()
            if not ret:
                print("无法读取视频帧!")
                break
            
            cv.imshow("RTSP Stream", frame)
            
            # 按 'q' 键退出
            if cv.waitKey(1) & 0xFF == ord('q'):
                break

        # 释放视频资源
        video.stop()
        cv.destroyAllWindows()

QObject::moveToThread: Current thread (0xaaaadac03300) is not the object's thread (0xaaaadb96bf80).
Cannot move to target thread (0xaaaadac03300)

QObject::moveToThread: Current thread (0xaaaadac03300) is not the object's thread (0xaaaadb96bf80).
Cannot move to target thread (0xaaaadac03300)

QObject::moveToThread: Current thread (0xaaaadac03300) is not the object's thread (0xaaaadb96bf80).
Cannot move to target thread (0xaaaadac03300)

QObject::moveToThread: Current thread (0xaaaadac03300) is not the object's thread (0xaaaadb96bf80).
Cannot move to target thread (0xaaaadac03300)

QObject::moveToThread: Current thread (0xaaaadac03300) is not the object's thread (0xaaaadb96bf80).
Cannot move to target thread (0xaaaadac03300)

QObject::moveToThread: Current thread (0xaaaadac03300) is not the object's thread (0xaaaadb96bf80).
Cannot move to target thread (0xaaaadac03300)

QObject::moveToThread: Current thread (0xaaaadac03300) is not the object's thread (0xaaaadb96bf80).
Cannot move to tar

Original code

In [3]:


import cv2 as cv2
import socket
import threading
import datetime
import time


udp = ''
tcp_connect_flag = False


class UDP_SOCKET(object):
    def __init__(self, server_addr=('172.29.91.253', 8888)):
        self.addr = server_addr
        self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def send_frame(self, frame, addr=()):
        if addr:
            self.addr = addr
        self.udp.sendto(frame, self.addr)

    def close(self):
        self.udp.close()


class TCP_SOCKET(object):
    def __init__(self, server_addr=('172.29.91.253', 9999)):
        global tcp_connect_flag
        self.addr = server_addr
        self.tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcp.settimeout(60)
        print('正在连接服务器...')
        while True:
            try:
                self.tcp.connect(server_addr)
                print('tcp connect success!')
                tcp_connect_flag = True
                break
            except Exception:
                pass

    def send_data(self):
        global tcp_connect_flag
        while True:
            if tcp_connect_flag is False:
                print('tcp 已断开连接！')
                break
            current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            self.tcp.send(current_time.encode())
            time.sleep(2)

    def recv_data(self):
        global tcp_connect_flag
        while True:
            data = self.tcp.recv(1024)
            if len(data) != 0:
                print('server: '+str(data.decode()))
            else:
                print('服务端已退出!')
                tcp_connect_flag = False
                break

    def close(self):
        self.tcp.close()


class IMAGE_FUNC(object):
    def __init__(self, compress_quality=89):
        self.encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), compress_quality]

    def compress(self, img, quality=89):
        self.encode_param[1] = quality
        times = 0
        while True:
            _, encode_img = cv2.imencode('.jpg', img, self.encode_param)
            times += 1
            if len(encode_img) <= 65500:
                return encode_img
            if times > 3:
                self.encode_param[1] -= 1
                times = 0

    def send_video(self):
        global udp, tcp_connect_flag

        cam = cv2.VideoCapture('rtsp://192.168.1.25:8554/main.264')
        if cam.isOpened():
            while True:
                try:
                    _, frame = cam.read()
                    cv2.imshow('frame', frame)  # h:480 w:640 s:3
                    udp.send_frame(self.compress(frame))
                except Exception as e:
                    print(e)
                    break
                key = cv2.waitKey(20)
                if key == ord('q'):
                    break
                elif tcp_connect_flag is False:
                    print('tcp 已断开连接！')
                    break
        else:
            print('cam error!')
        cam.release()


tcp = TCP_SOCKET(('192.168.54.59', 9999))
udp = UDP_SOCKET(('192.168.54.59', 8888))
img = IMAGE_FUNC()

tcp_recv_data = threading.Thread(target=tcp.recv_data)
tcp_send_data = threading.Thread(target=tcp.send_data)
udp_video = threading.Thread(target=img.send_video)

tcp_recv_data.start()
udp_video.start()
tcp_send_data.start()
tcp_recv_data.join()
udp_video.join()
tcp_send_data.join()

udp.close()
tcp.close()
cv2.destroyAllWindows()

正在连接服务器...
tcp connect success!


KeyboardInterrupt: 

In [4]:
import cv2 as cv
import socket
import threading
import datetime
import time
import queue

# 初始化UDP
udp = None  
tcp_connect_flag = False

# 用于主线程和子线程间通信的队列
frame_queue = queue.Queue()

class UDP_SOCKET(object):
    def __init__(self, server_addr=('172.29.91.253', 8888)):
        self.addr = server_addr
        self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def send_frame(self, frame, addr=()):
        if addr:
            self.addr = addr
        self.udp.sendto(frame, self.addr)

    def close(self):
        self.udp.close()


class TCP_SOCKET(object):
    def __init__(self, server_addr=('172.29.91.253', 9999)):
        global tcp_connect_flag
        self.addr = server_addr
        self.tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcp.settimeout(60)
        print('正在连接服务器...')
        while True:
            try:
                self.tcp.connect(server_addr)
                print('tcp connect success!')
                tcp_connect_flag = True
                break
            except Exception as e:
                print(f"TCP连接错误: {e}")
                time.sleep(1)

    def send_data(self):
        global tcp_connect_flag
        while tcp_connect_flag:
            try:
                current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                self.tcp.send(current_time.encode())
                time.sleep(2)
            except Exception as e:
                print(f"发送数据时出错: {e}")
                break

    def recv_data(self):
        global tcp_connect_flag
        while tcp_connect_flag:
            try:
                data = self.tcp.recv(1024)
                if len(data) != 0:
                    print('服务器: ' + str(data.decode()))
                else:
                    print('服务端已退出!')
                    tcp_connect_flag = False
                    break
            except Exception as e:
                print(f"接收数据时出错: {e}")
                break

    def close(self):
        self.tcp.close()


class IMAGE_FUNC(object):
    def __init__(self, compress_quality=89):
        self.encode_param = [int(cv.IMWRITE_JPEG_QUALITY), compress_quality]

    def compress(self, img, quality=89):
        self.encode_param[1] = quality
        times = 0
        while True:
            _, encode_img = cv.imencode('.jpg', img, self.encode_param)
            times += 1
            if len(encode_img) <= 65500:
                return encode_img
            if times > 3:
                self.encode_param[1] -= 1
                times = 0

    def send_video(self):
        global udp, tcp_connect_flag, frame_queue
        rtsp_url = 'rtsp://192.168.1.25:8554/main.264'
        cam = cv.VideoCapture(rtsp_url)
        cam.set(cv.CAP_PROP_BUFFERSIZE,0)
        if cam.isOpened():
            while True:
                try:
                    # 捕获图像帧
                    _, frame = cam.read()
                    if not _:
                        print("Error: No frame received!")
                        break

                    # 强制调整图像为 640x480
                    frame = cv.resize(frame, (640, 480))

                    # 将帧传送给主线程（通过队列）
                    frame_queue.put(frame)

                    # 压缩并发送帧
                    udp.send_frame(self.compress(frame))

                except Exception as e:
                    print(f"捕获帧时出错: {e}")
                    break

                if not tcp_connect_flag:
                    print('tcp 已断开连接！')
                    break
        else:
            print('摄像头错误!')
        cam.release()


def show_video():
    """主线程用来显示视频帧"""
    while True:
        if not frame_queue.empty():
            # 从队列中获取最新的帧
            frame = frame_queue.get()

            # 显示图像帧
            cv.imshow('RTSP Stream', frame)

            # 按 'q' 键退出
            key = cv.waitKey(1) & 0xFF
            if key == ord('q'):
                break

    cv.destroyAllWindows()


# 初始化 UDP 和 TCP 套接字
udp = UDP_SOCKET(('192.168.54.59', 8888))
tcp = TCP_SOCKET(('192.168.54.59', 9999))
img = IMAGE_FUNC()

# 启动线程处理 TCP 和 UDP 通信
tcp_recv_data = threading.Thread(target=tcp.recv_data)
tcp_send_data = threading.Thread(target=tcp.send_data)
udp_video = threading.Thread(target=img.send_video)
#video_display = threading.Thread(target=show_video)

# 启动所有线程
tcp_recv_data.start()
udp_video.start()
tcp_send_data.start()
#video_display.start()

# 等待线程结束
tcp_recv_data.join()
udp_video.join()
tcp_send_data.join()
#video_display.join()

# 关闭套接字并清理
udp.close()
tcp.close()
cv.destroyAllWindows()

正在连接服务器...
tcp connect success!
接收数据时出错: timed out
服务器: 111

服务端已退出!
tcp 已断开连接！


In [None]:
import cv2 as cv
import socket
import threading
import datetime
import time
import queue

udp = None  
tcp_connect_flag = False

# 用于主线程和子线程间通信的队列
frame_queue = queue.Queue()

class UDP_SOCKET(object):
    def __init__(self, server_addr=('172.29.91.253', 8888)):
        self.addr = server_addr
        self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def send_frame(self, frame, addr=()):
        if addr:
            self.addr = addr
        self.udp.sendto(frame, self.addr)

    def close(self):
        self.udp.close()


class TCP_SOCKET(object):
    def __init__(self, server_addr=('172.29.91.253', 9999)):
        global tcp_connect_flag
        self.addr = server_addr
        self.tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcp.settimeout(60)
        print('正在连接服务器...')
        while True:
            try:
                self.tcp.connect(server_addr)
                print('tcp connect success!')
                tcp_connect_flag = True
                break
            except Exception as e:
                print(f"TCP连接错误: {e}")
                time.sleep(1)

    def send_data(self):
        global tcp_connect_flag
        while tcp_connect_flag:
            try:
                current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                self.tcp.send(current_time.encode())
                time.sleep(2)
            except Exception as e:
                print(f"发送数据时出错: {e}")
                break

    def recv_data(self):
        global tcp_connect_flag
        while tcp_connect_flag:
            try:
                data = self.tcp.recv(1024)
                if len(data) != 0:
                    print('服务器: ' + str(data.decode()))
                else:
                    print('服务端已退出!')
                    tcp_connect_flag = False
                    break
            except Exception as e:
                print(f"接收数据时出错: {e}")
                break

    def close(self):
        self.tcp.close()


class IMAGE_FUNC(object):
    def __init__(self, compress_quality=89):
        self.encode_param = [int(cv.IMWRITE_JPEG_QUALITY), compress_quality]

    def compress(self, img, quality=89):
        self.encode_param[1] = quality
        times = 0
        while True:
            _, encode_img = cv.imencode('.jpg', img, self.encode_param)
            times += 1
            if len(encode_img) <= 65500:
                return encode_img
            if times > 3:
                self.encode_param[1] -= 1
                times = 0

    def send_video(self, video_capture):
        global udp, tcp_connect_flag, frame_queue
        while video_capture.isOpened() and tcp_connect_flag:
            try:
                # 读取视频帧
                ret, frame = video_capture.read()
                #if not ret:
                #    print("Error: No frame received!")
                 #   break

                # 强制调整图像为 640x480
                frame = cv.resize(frame, (640, 480))

                # 将帧传送给主线程（通过队列）
                frame_queue.put(frame)

                # 压缩并发送帧
                udp.send_frame(self.compress(frame))

                del(ret)
                del(frame)

            except Exception as e:
                print(f"捕获帧时出错: {e}")
                break


def show_video():
    """主线程用来显示视频帧"""
    while True:
        if not frame_queue.empty():
            # 从队列中获取最新的帧
            frame = frame_queue.get()

            # 显示图像帧
            cv.imshow('RTSP Stream', frame)

            # 按 'q' 键退出
            key = cv.waitKey(1) & 0xFF
            if key == ord('q'):
                break

    cv.destroyAllWindows()


class VideoCapture:
    def __init__(self, name):
        self.cap = cv.VideoCapture(name)
        self.state = False
        self.q = queue.Queue()
        self.lock = threading.Lock()
        self.running = True  # Flag to indicate if the thread should keep running
        self.t = threading.Thread(target=self._reader)
        self.t.daemon = True
        self.t.start()

    def _reader(self):
        while self.running:
            ret, frame = self.cap.read()
            #if not ret:
             #   break
            if not self.q.empty():
                try:
                    self.q.get_nowait()  # Discard previous frame
                except queue.Empty:
                    pass
            self.q.put(frame)
            self.state = ret

    def isOpened(self):
        return self.cap.isOpened()

    def read(self):
        return self.state, self.q.get()

    def stop(self):
        self.running = False
        self.t.join()  # Wait for the thread to exit


# 初始化 UDP 和 TCP 套接字
udp = UDP_SOCKET(('192.168.54.59', 8888))
tcp = TCP_SOCKET(('192.168.54.59', 9999))
img = IMAGE_FUNC()

# 使用自定义的 VideoCapture 类读取视频
video_capture = VideoCapture('rtsp://192.168.1.25:8554/main.264')

# 启动线程处理 TCP 和 UDP 通信
tcp_recv_data = threading.Thread(target=tcp.recv_data)
tcp_send_data = threading.Thread(target=tcp.send_data)
udp_video = threading.Thread(target=img.send_video, args=(video_capture,))
#video_display = threading.Thread(target=show_video)

# 启动所有线程
tcp_recv_data.start()
udp_video.start()
tcp_send_data.start()
#video_display.start()

# 等待线程结束
tcp_recv_data.join()
udp_video.join()
tcp_send_data.join()
#video_display.join()

# 停止视频捕获并关闭套接字
video_capture.stop()
udp.close()
tcp.close()
cv.destroyAllWindows()

正在连接服务器...
tcp connect success!


[h264 @ 0xaaaaf3ea7fb0] error while decoding MB 40 19, bytestream -5


服务端已退出!


[ WARN:13@153.997] global cap_ffmpeg_impl.hpp:453 _opencv_ffmpeg_interrupt_callback Stream timeout triggered after 30039.248229 ms


KeyboardInterrupt: 

[ WARN:13@184.047] global cap_ffmpeg_impl.hpp:453 _opencv_ffmpeg_interrupt_callback Stream timeout triggered after 30043.399837 ms


In [2]:
import cv2 as cv
import socket
import threading
import datetime
import time
import queue

udp = None  
tcp_connect_flag = False

# 用于主线程和子线程间通信的队列
frame_queue = queue.Queue()

class UDP_SOCKET(object):
    def __init__(self, server_addr=('172.29.91.253', 8888)):
        self.addr = server_addr
        self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def send_frame(self, frame, addr=()):
        if addr:
            self.addr = addr
        self.udp.sendto(frame, self.addr)

    def close(self):
        self.udp.close()


class TCP_SOCKET(object):
    def __init__(self, server_addr=('172.29.91.253', 9999)):
        global tcp_connect_flag
        self.addr = server_addr
        self.tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcp.settimeout(60)
        print('正在连接服务器...')
        while True:
            try:
                self.tcp.connect(server_addr)
                print('tcp connect success!')
                tcp_connect_flag = True
                break
            except Exception as e:
                print(f"TCP连接错误: {e}")
                time.sleep(1)

    def send_data(self):
        global tcp_connect_flag
        while tcp_connect_flag:
            try:
                current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                self.tcp.send(current_time.encode())
                time.sleep(2)
            except Exception as e:
                print(f"发送数据时出错: {e}")
                break

    def recv_data(self):
        global tcp_connect_flag
        while tcp_connect_flag:
            try:
                data = self.tcp.recv(1024)
                if len(data) != 0:
                    print('服务器: ' + str(data.decode()))
                else:
                    print('服务端已退出!')
                    tcp_connect_flag = False
                    break
            except Exception as e:
                print(f"接收数据时出错: {e}")
                break

    def close(self):
        self.tcp.close()


class IMAGE_FUNC(object):
    def __init__(self, compress_quality=89):
        self.encode_param = [int(cv.IMWRITE_JPEG_QUALITY), compress_quality]

    def compress(self, img, quality=89):
        self.encode_param[1] = quality
        times = 0
        while True:
            _, encode_img = cv.imencode('.jpg', img, self.encode_param)
            times += 1
            if len(encode_img) <= 65500:
                return encode_img
            if times > 3:
                self.encode_param[1] -= 1
                times = 0

    def send_video(self, video_capture):
        global udp, tcp_connect_flag, frame_queue
        while video_capture.isOpened() and tcp_connect_flag:
            try:
                # 读取视频帧
                ret, frame = video_capture.read()
                #if not ret:
                #    print("Error: No frame received!")
                 #   break

                # 强制调整图像为 640x480
                frame = cv.resize(frame, (640, 480))

                # 将帧传送给主线程（通过队列）
                frame_queue.put(frame)

                # 压缩并发送帧
                udp.send_frame(self.compress(frame))

                del(ret)
                del(frame)

            except Exception as e:
                print(f"捕获帧时出错: {e}")
                break


def show_video():
    """主线程用来显示视频帧"""
    while True:
        if not frame_queue.empty():
            # 从队列中获取最新的帧
            frame = frame_queue.get()

            # 显示图像帧
            cv.imshow('RTSP Stream', frame)

            # 按 'q' 键退出
            key = cv.waitKey(1) & 0xFF
            if key == ord('q'):
                break

    cv.destroyAllWindows()


class VideoCapture:
    def __init__(self, name):
        self.cap = cv.VideoCapture(name)
        self.state = False
        self.q = queue.Queue()
        self.lock = threading.Lock()
        self.running = True  # Flag to indicate if the thread should keep running
        self.t = threading.Thread(target=self._reader)
        self.t.daemon = True
        self.t.start()

    def _reader(self):
        while self.running:
            ret, frame = self.cap.read()
            #if not ret:
             #   break
            if not self.q.empty():
                try:
                    self.q.get_nowait()  # Discard previous frame
                except queue.Empty:
                    pass
            self.q.put(frame)
            self.state = ret

    def isOpened(self):
        return self.cap.isOpened()

    def read(self):
        return self.state, self.q.get()

    def stop(self):
        self.running = False
        self.t.join()  # Wait for the thread to exit


# 初始化 UDP 和 TCP 套接字
udp = UDP_SOCKET(('192.168.54.59', 8888))
tcp = TCP_SOCKET(('192.168.54.59', 9999))
img = IMAGE_FUNC()

# 使用自定义的 VideoCapture 类读取视频
video_capture = VideoCapture('rtsp://192.168.1.25:8554/main.264')

# 启动线程处理 TCP 和 UDP 通信
tcp_recv_data = threading.Thread(target=tcp.recv_data)
tcp_send_data = threading.Thread(target=tcp.send_data)
udp_video = threading.Thread(target=img.send_video, args=(video_capture,))
#video_display = threading.Thread(target=show_video)

# 启动所有线程
tcp_recv_data.start()
udp_video.start()
tcp_send_data.start()
#video_display.start()

# 等待线程结束
tcp_recv_data.join()
udp_video.join()
tcp_send_data.join()
#video_display.join()

# 停止视频捕获并关闭套接字
video_capture.stop()
udp.close()
tcp.close()
cv.destroyAllWindows()

正在连接服务器...
tcp connect success!
服务器: 2000
接收数据时出错: timed out
发送数据时出错: [Errno 32] Broken pipe


KeyboardInterrupt: 