In [1]:
import cv2
import numpy as np
import math
import socket

# ROI函数
def roi(frame):
    height, width = frame.shape[:2]

    polygons = np.array([[(1*width//10, 1*height//10), (9*width//10, 1*height//10),
                          (7*width//10, 8*height//10), (3*width//10, 8*height//10)]])

    mask = np.zeros_like(frame)
    cv2.fillPoly(mask, polygons, 255)
    return cv2.bitwise_and(frame, mask)

# 滑动窗口车道线拟合函数
def fit_lanes_with_sliding_window(edges, lane_distance=100):
    try:
        height, width = edges.shape
        nwindows = 20
        margin = 10
        minpix = 1
        
        # 直方图峰值检测
        histogram = np.sum(edges[height//2:, :], axis=0)
        midpoint = width // 2
        leftx_base = np.argmax(histogram[:midpoint])
        rightx_base = np.argmax(histogram[midpoint:]) + midpoint

        # 滑动窗口追踪
        window_height = height // nwindows
        left_lane_inds = []
        right_lane_inds = []
        current_leftx = leftx_base
        current_rightx = rightx_base

        for window in range(nwindows):
            # 窗口垂直范围
            win_y_low = height - (window + 1) * window_height
            win_y_high = height - window * window_height
            
            # 左窗口
            win_xleft_low = max(0, current_leftx - margin)
            win_xleft_high = min(width, current_leftx + margin)
            left_window = edges[win_y_low:win_y_high, win_xleft_low:win_xleft_high]
            left_y, left_x = left_window.nonzero()
            left_x += win_xleft_low
            left_y += win_y_low
            left_lane_inds.extend(zip(left_x, left_y))
            
            # 右窗口
            win_xright_low = max(0, current_rightx - margin)
            win_xright_high = min(width, current_rightx + margin)
            right_window = edges[win_y_low:win_y_high, win_xright_low:win_xright_high]
            right_y, right_x = right_window.nonzero()
            right_x += win_xright_low
            right_y += win_y_low
            right_lane_inds.extend(zip(right_x, right_y))
            
            # 调整窗口位置
            if len(left_x) > minpix:
                current_leftx = np.int32(np.mean(left_x))
            if len(right_x) > minpix:
                current_rightx = np.int32(np.mean(right_x))

        # 检查是否有足够的点进行拟合
        if len(left_lane_inds) < minpix or len(right_lane_inds) < minpix:
            # 返回边缘检测图像而不是None
            result_img = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
            return result_img, None, None, None, None

        # 拟合多项式
        left_x = np.array([x for x, y in left_lane_inds])
        left_y = np.array([y for x, y in left_lane_inds])
        right_x = np.array([x for x, y in right_lane_inds])
        right_y = np.array([y for x, y in right_lane_inds])
        
        left_fit = np.polyfit(left_y, left_x, 2)
        right_fit = np.polyfit(right_y, right_x, 2)
        
        # 生成拟合曲线
        ploty = np.linspace(0, height-1, height)
        left_fitx = left_fit[0]*ploty**2 + left_fit[1]*ploty + left_fit[2]
        right_fitx = right_fit[0]*ploty**2 + right_fit[1]*ploty + right_fit[2]
        
        # 准备返回结果
        result_img = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        pts_left = np.array([np.transpose(np.vstack([left_fitx, ploty]))], dtype=np.int32)
        pts_right = np.array([np.flipud(np.transpose(np.vstack([right_fitx, ploty])))], dtype=np.int32)
        
        return result_img, left_fit, right_fit, pts_left, pts_right
    
    except Exception as e:
        print(f"车道线检测错误: {e}")
        # 返回边缘检测图像而不是None
        result_img = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        return result_img, None, None, None, None

# 中心线拟合和目标点计算函数
def calculate_and_draw_centerline_v2(result_img, left_fit, right_fit, ref_point, dist):
    try:
        height = result_img.shape[0]
        ploty = np.linspace(0, height-1, height)
        
        left_fitx = left_fit[0]*ploty**2 + left_fit[1]*ploty + left_fit[2]
        right_fitx = right_fit[0]*ploty**2 + right_fit[1]*ploty + right_fit[2]
        mid_fitx = (left_fitx + right_fitx) / 2
        
        center_fit = np.polyfit(ploty, mid_fitx, 2)
        center_fitx = center_fit[0]*ploty**2 + center_fit[1]*ploty + center_fit[2]
        pts_center = np.array([np.transpose(np.vstack([center_fitx, ploty]))], dtype=np.int32)
        
        # 查找目标点
        ref_x, ref_y = ref_point
        target = None
        min_diff = float('inf')
        candidate_ys = np.arange(height-1, -1, -1)
        
        for y in candidate_ys:
            x = center_fit[0]*y**2 + center_fit[1]*y + center_fit[2]
            if x < 0 or x >= result_img.shape[1]:
                continue
            current_dist = math.hypot(x - ref_x, y - ref_y)
            diff = abs(current_dist - dist)
            if diff < min_diff:
                min_diff = diff
                target = (x, y)
            if diff < 2:
                break
        
        return result_img, center_fit, pts_center, target if min_diff < dist*0.1 else None
    
    except Exception as e:
        print(f"中心线计算错误: {e}")
        return result_img, None, None, None


# 初始化视频流
url = 'http://192.168.4.1'
cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
matrix = np.load("perspective_matrix.npy")

# 视频保存配置
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) or 30
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output_original.mp4', fourcc, fps, (frame_width, frame_height))

# 转向控制参数，实际1cm在图像中为4像素
ref_point = (frame_width//2, frame_height)
dist = 120
tread = 60
left_fit = None
right_fit = None
last_steer_angle = 0.0
alpha = 0.5

# 初始化网络连接
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('192.168.4.1', 81))


# 主循环
frame_counter = 0
while True:
    ret, frame = cap.read()
    if not ret: break

    # 预测逻辑，每2帧预测一次
    frame_counter += 1
    if frame_counter % 2 == 0:
    
        # 透视变换
        warped = cv2.warpPerspective(frame, matrix, (frame_width, frame_height))
        
        # 预处理
        gray = cv2.cvtColor(warped, cv2.COLOR_BGR2GRAY)
        blur = cv2.GaussianBlur(gray, (5,5), 0)
        edges = cv2.Canny(blur, 10, 50)
        edges = roi(edges)
        
        # 车道线检测 - 现在result_img永远不会是None
        result_img, new_left, new_right, pts_left, pts_right = fit_lanes_with_sliding_window(edges)
        
        # 更新车道参数
        if new_left is not None and new_right is not None:
            left_fit, right_fit = new_left, new_right
        
        # 中心线与目标点计算
        final_img = warped.copy()
        overlay = result_img.copy()  # 现在result_img保证不是None
        
        if left_fit is not None and right_fit is not None:
            
            result_img, center_fit, pts_center, target = calculate_and_draw_centerline_v2(
                result_img, left_fit, right_fit, ref_point, dist)
                        
            if target is not None:
                target_x, target_y = target
                cv2.circle(overlay, (int(target_x), int(target_y)), 3, (0,0,255), -1)
                dx = target_x - frame_width//2
                steer_angle = 50 * tread * dx / dist**2
                last_steer_angle = steer_angle
            else:
                last_steer_angle = 0.0  # 目标点无效时归零
            
            cv2.putText(frame, f"Steer:{last_steer_angle:.0f}",
                       (frame_width-80, 50),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)
        else:
            last_steer_angle = 0.0  # 车道线无效时归零
        
        # 发送转向角度
        sock.send(f"{last_steer_angle:.0f}\n".encode())
        
        # 显示与保存
        cv2.imshow("Lane Detection", frame)
        out.write(frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

cap.release()
out.release()
sock.close()
cv2.destroyAllWindows()
