In [None]:
#DPT+RANSAC
import os
import pandas as pd
import numpy as np
import torch
from transformers import DPTForDepthEstimation, DPTFeatureExtractor
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt
from sklearn.linear_model import RANSACRegressor

# DPT: load the pretrained depth-estimation model and the preprocessor that
# belongs to the same `Intel/dpt-large` checkpoint.
model_name = "Intel/dpt-large"
dpt_model = DPTForDepthEstimation.from_pretrained(model_name)
feature_extractor = DPTFeatureExtractor.from_pretrained(model_name)


# Use the GPU when CUDA is available, otherwise the CPU, and move the model
# there so it lives on the same device as the inputs built later.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dpt_model.to(device)

def load_and_process_data(csv_path, images_folder_paths):
    """Build (predicted_depth, true_depth, location) samples from a CSV.

    For every CSV row the referenced image is located with ``find_image``,
    the DPT model predicts a depth map for it, the map is resized to the
    image resolution with bicubic interpolation, and the value at the
    annotated pixel is paired with the row's ``radius`` value.

    Args:
        csv_path: Path of a CSV file providing the columns ``image_name``,
            ``x``, ``y``, ``radius`` and ``location``.
        images_folder_paths: Folders searched (recursively) for the images.

    Returns:
        A list of ``(predicted_depth, true_depth, location)`` tuples. Rows
        are skipped when a coordinate or the radius is missing, when the
        image cannot be found, or when the pixel lies outside the image.
    """
    df = pd.read_csv(csv_path)
    samples = []

    for _, row in df.iterrows():
        # int() raises on a missing coordinate and a missing radius would
        # poison the regression target, so drop such rows instead of letting
        # one bad row abort the whole run.
        if pd.isna(row['x']) or pd.isna(row['y']) or pd.isna(row['radius']):
            continue

        image_path = find_image(images_folder_paths, row['image_name'])
        if not image_path:
            continue

        # The context manager releases the file handle; convert() returns an
        # independent in-memory copy with three channels, so grayscale or
        # RGBA files reach the model in the same layout as RGB ones.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        x_coord, y_coord = int(row['x']), int(row['y'])
        if not (0 <= x_coord < image.width and 0 <= y_coord < image.height):
            continue

        true_depth = row['radius']
        inputs = feature_extractor(images=image, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = dpt_model(**inputs)
            depth_tensor = outputs.predicted_depth.squeeze()

        # interpolate() expects a 4-D tensor, hence the two unsqueeze calls.
        # PIL reports size as (width, height); reversing it yields the
        # (height, width) order used for the target size.
        depth_map = torch.nn.functional.interpolate(
            depth_tensor.unsqueeze(0).unsqueeze(0),
            size=image.size[::-1],
            mode="bicubic",
            align_corners=False,
        ).squeeze().cpu().numpy()

        # Arrays are indexed [row, column], i.e. [y, x].
        predicted_depth = depth_map[y_coord, x_coord]
        samples.append((predicted_depth, true_depth, row['location']))

    return samples

def find_image(folder_paths, image_name):
    """Return the path of the first file named ``image_name`` found.

    Each folder in ``folder_paths`` is walked recursively, in order.
    ``None`` is returned when no folder contains a file with that name.
    """
    candidates = (
        os.path.join(directory, image_name)
        for base_folder in folder_paths
        for directory, _subdirs, filenames in os.walk(base_folder)
        if image_name in filenames
    )
    # The generator is lazy, so walking stops at the first match.
    return next(candidates, None)

def train_ransac(samples):
    """Fit a RANSAC regressor that maps predicted depth to true depth.

    Args:
        samples: Sequence of tuples whose first item is the predicted depth
            and whose second item is the true depth.

    Returns:
        ``(model, inlier_mask, outlier_mask)``: the fitted regressor, its
        ``inlier_mask_`` attribute and the logical negation of that mask.
    """
    predicted = np.array([sample[0] for sample in samples]).reshape(-1, 1)
    target = np.array([sample[1] for sample in samples])

    regressor = RANSACRegressor(
        max_trials=200,
        min_samples=0.5,
        residual_threshold=2.0,
        random_state=42,
    )
    regressor.fit(predicted, target)

    inliers = regressor.inlier_mask_
    return regressor, inliers, np.logical_not(inliers)

def evaluate_samples(samples, model):
    """Score ``model`` on ``samples``.

    Args:
        samples: Sequence of tuples whose first item is the predicted depth
            (model input) and whose second item is the true depth.
        model: Object with a ``predict`` method taking an ``(n, 1)`` array.

    Returns:
        ``(mae, mse, y_true, y_pred)``.
    """
    features = np.array([sample[0] for sample in samples]).reshape(-1, 1)
    targets = np.array([sample[1] for sample in samples])
    predictions = model.predict(features)
    return (
        mean_absolute_error(targets, predictions),
        mean_squared_error(targets, predictions),
        targets,
        predictions,
    )

def plot_results(train_samples, val_samples, test_samples, model, inlier_mask):
    """Plot predicted-versus-true depth for every split and print fit statistics.

    A first figure holds one scatter panel per split (training, validation,
    test); a second figure repeats the training split on its own. Each plot
    shows the x=y line and a first-degree least-squares line through the
    points, and the angle between the two is printed. Finally the
    coefficients and intercept of ``model.estimator_`` are printed.

    Args:
        train_samples: Tuples of (predicted depth, true depth, ...).
        val_samples: Same layout, validation split.
        test_samples: Same layout, test split.
        model: Fitted regressor exposing ``predict`` and ``estimator_``.
        inlier_mask: Accepted for interface compatibility; not used here.
    """

    def angle_from_identity(line):
        # Angle of the fitted line minus the 45 degrees of the x=y line.
        return np.degrees(np.arctan(line.c[0]) - np.pi / 4)

    def draw_panel(ax, y, y_pred, title):
        ax.scatter(y, y_pred, alpha=0.5, label='Predicted vs True')
        ax.set_xlabel('True Depth')
        ax.set_ylabel('Predicted Depth')
        ax.set_title(title)
        ax.grid(True)
        lower = min(y.min(), y_pred.min())
        upper = max(y.max(), y_pred.max())
        ax.plot([lower, upper], [lower, upper], color='red', linewidth=2, label='x=y')
        line = np.poly1d(np.polyfit(y, y_pred, 1))
        ax.plot(y, line(y), color='blue', linewidth=2, label='Fit Line')
        # Called once all artists exist so every label ends up in the legend.
        ax.legend()
        angle_deg = angle_from_identity(line)
        print(f"Angle between x=y and fit line in {title}: {angle_deg:.2f} degrees")

    plt.figure(figsize=(18, 5))

    titled_splits = (
        ('Training Set', train_samples),
        ('Validation Set', val_samples),
        ('Test Set', test_samples),
    )
    features = [np.array([s[0] for s in split]).reshape(-1, 1) for _, split in titled_splits]
    targets = [np.array([s[1] for s in split]) for _, split in titled_splits]
    predictions = [model.predict(X) for X in features]

    panels = zip(titled_splits, targets, predictions)
    for position, ((title, _), y, y_pred) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        draw_panel(plt.gca(), y, y_pred, title)

    plt.tight_layout()
    plt.show()

    # Draw the fit line of the training set on its own figure.
    y_train, y_train_pred = targets[0], predictions[0]
    plt.figure(figsize=(10, 5))
    plt.scatter(y_train, y_train_pred, alpha=0.5, label='Train Set')
    plt.xlabel('True Depth')
    plt.ylabel('Predicted Depth')
    plt.title('Training Set Fit Line')
    plt.plot([min(y_train), max(y_train)], [min(y_train), max(y_train)], color='red', linewidth=2, label='x=y')
    fit_line = np.poly1d(np.polyfit(y_train, y_train_pred, 1))
    plt.plot(y_train, fit_line(y_train), color='blue', linewidth=2, label='Fit Line')
    plt.legend()
    plt.grid(True)
    plt.show()

    angle_deg = angle_from_identity(fit_line)
    print(f"Angle between x=y and fit line in Training Set: {angle_deg:.2f} degrees")

    print(f"RANSAC model coefficients: {model.estimator_.coef_}")
    print(f"RANSAC model intercept: {model.estimator_.intercept_}")


# Annotation CSV and the folders that are searched for the images it names.
csv_path = r"C:\Users\D-Frank\Desktop\111.csv"
images_folder_paths = [
    r"C:\Users\D-Frank\Desktop\data\104RECNX",
    r"C:\Users\D-Frank\Desktop\data\105RECNX",
    r"C:\Users\D-Frank\Desktop\data\106RECNX",
    r"C:\Users\D-Frank\Desktop\data\107RECNX",
    r"C:\Users\D-Frank\Desktop\data\108RECNX",
    r"C:\Users\D-Frank\Desktop\data\110RECNX",
    r"C:\Users\D-Frank\Desktop\data\100RECNX",
    r"C:\Users\D-Frank\Desktop\data\101RECNX",
    r"C:\Users\D-Frank\Desktop\data\102RECNX",
    r"C:\Users\D-Frank\Desktop\data\103RECNX"
]

print(f"Processing {csv_path}...")
try:
    samples = load_and_process_data(csv_path, images_folder_paths)
    if not samples:
        # Stop here: splitting or fitting an empty list only produces a
        # second, misleading error message.
        print(f"No valid samples found in {csv_path}")
    else:
        # 60 % train, then the remaining 40 % is halved into validation/test.
        train_samples, val_test_samples = train_test_split(samples, test_size=0.4, random_state=42)
        val_samples, test_samples = train_test_split(val_test_samples, test_size=0.5, random_state=42)

        ransac_model, inlier_mask, outlier_mask = train_ransac(train_samples)

        train_mae, train_mse, _, _ = evaluate_samples(train_samples, ransac_model)
        print(f"Train Mean Absolute Error: {train_mae}")
        print(f"Train Mean Squared Error: {train_mse}")

        val_mae, val_mse, _, _ = evaluate_samples(val_samples, ransac_model)
        print(f"Validation Mean Absolute Error: {val_mae}")
        print(f"Validation Mean Squared Error: {val_mse}")

        test_mae, test_mse, y_test, y_test_pred = evaluate_samples(test_samples, ransac_model)
        print(f"Test Mean Absolute Error: {test_mae}")
        print(f"Test Mean Squared Error: {test_mse}")

        plot_results(train_samples, val_samples, test_samples, ransac_model, inlier_mask)

except Exception as e:
    print(f"Error processing {csv_path}: {e}")


In [None]:
#DA+RANSAC
import os
import pandas as pd
import numpy as np
import torch
from transformers import pipeline
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt
from sklearn.linear_model import RANSACRegressor


# Use the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Depth-estimation pipeline for the Depth Anything V2 (small) checkpoint.
# The device is passed explicitly; without it the selection above is never
# used and the pipeline stays on its default device.
depth_estimator = pipeline(
    task="depth-estimation",
    model="depth-anything/Depth-Anything-V2-Small-hf",
    device=device,
)

def load_and_process_data(csv_path, images_folder_paths):
    """Build (predicted_depth, true_depth, location) samples from a CSV.

    For every CSV row the referenced image is located with ``find_image``,
    passed through ``depth_estimator``, and the ``depth`` output is sampled
    at the annotated pixel and paired with the row's ``radius`` value.

    Args:
        csv_path: Path of a CSV file providing the columns ``image_name``,
            ``x``, ``y``, ``radius`` and ``location``.
        images_folder_paths: Folders searched (recursively) for the images.

    Returns:
        A list of ``(predicted_depth, true_depth, location)`` tuples. Rows
        are skipped when a coordinate or the radius is missing, when the
        image cannot be found, when the pixel lies outside the image, or
        when either depth value is negative.
    """
    df = pd.read_csv(csv_path)
    samples = []

    for _, row in df.iterrows():
        # int() raises on a missing coordinate, and a missing radius passes
        # the "< 0" filter below unnoticed, so drop such rows up front.
        if pd.isna(row['x']) or pd.isna(row['y']) or pd.isna(row['radius']):
            continue

        image_path = find_image(images_folder_paths, row['image_name'])
        if not image_path:
            continue

        # The context manager releases the file handle; convert() returns an
        # independent in-memory copy that stays usable afterwards.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        x_coord, y_coord = int(row['x']), int(row['y'])
        if not (0 <= x_coord < image.width and 0 <= y_coord < image.height):
            continue

        true_depth = row['radius']
        depth_map = np.array(depth_estimator(image)['depth'])
        # Arrays are indexed [row, column], i.e. [y, x].
        predicted_depth = depth_map[y_coord, x_coord]

        if predicted_depth < 0 or true_depth < 0:
            continue

        samples.append((predicted_depth, true_depth, row['location']))

    return samples

def find_image(folder_paths, image_name):
    """Return the path of the first file named ``image_name`` found.

    Each folder in ``folder_paths`` is walked recursively, in order.
    ``None`` is returned when no folder contains a file with that name.
    """
    candidates = (
        os.path.join(directory, image_name)
        for base_folder in folder_paths
        for directory, _subdirs, filenames in os.walk(base_folder)
        if image_name in filenames
    )
    # The generator is lazy, so walking stops at the first match.
    return next(candidates, None)

def train_ransac(samples):
    """Fit a RANSAC regressor that maps predicted depth to true depth.

    Args:
        samples: Sequence of tuples whose first item is the predicted depth
            and whose second item is the true depth.

    Returns:
        ``(model, inlier_mask, outlier_mask)``: the fitted regressor, its
        ``inlier_mask_`` attribute and the logical negation of that mask.
    """
    predicted = np.array([sample[0] for sample in samples]).reshape(-1, 1)
    target = np.array([sample[1] for sample in samples])

    regressor = RANSACRegressor(
        max_trials=300,
        min_samples=0.6,
        residual_threshold=1.5,
        random_state=42,
    )
    regressor.fit(predicted, target)

    inliers = regressor.inlier_mask_
    return regressor, inliers, np.logical_not(inliers)

def evaluate_samples(samples, model):
    """Score ``model`` on ``samples``.

    Args:
        samples: Sequence of tuples whose first item is the predicted depth
            (model input) and whose second item is the true depth.
        model: Object with a ``predict`` method taking an ``(n, 1)`` array.

    Returns:
        ``(mae, mse, y_true, y_pred)``.
    """
    features = np.array([sample[0] for sample in samples]).reshape(-1, 1)
    targets = np.array([sample[1] for sample in samples])
    predictions = model.predict(features)
    return (
        mean_absolute_error(targets, predictions),
        mean_squared_error(targets, predictions),
        targets,
        predictions,
    )

def plot_results(train_samples, val_samples, test_samples, model, inlier_mask):
    """Plot predicted-versus-true depth for every split and print fit statistics.

    A first figure holds one scatter panel per split (training, validation,
    test); a second figure repeats the training split on its own. Each plot
    shows the x=y line and a first-degree least-squares line through the
    points, and the angle between the two is printed. Finally the
    coefficients and intercept of ``model.estimator_`` are printed.

    Args:
        train_samples: Tuples of (predicted depth, true depth, ...).
        val_samples: Same layout, validation split.
        test_samples: Same layout, test split.
        model: Fitted regressor exposing ``predict`` and ``estimator_``.
        inlier_mask: Accepted for interface compatibility; not used here.
    """

    def angle_from_identity(line):
        # Angle of the fitted line minus the 45 degrees of the x=y line.
        return np.degrees(np.arctan(line.c[0]) - np.pi / 4)

    def draw_panel(ax, y, y_pred, title):
        ax.scatter(y, y_pred, alpha=0.5, label='Predicted vs True')
        ax.set_xlabel('True Depth')
        ax.set_ylabel('Predicted Depth')
        ax.set_title(title)
        ax.grid(True)
        lower = min(y.min(), y_pred.min())
        upper = max(y.max(), y_pred.max())
        ax.plot([lower, upper], [lower, upper], color='red', linewidth=2, label='x=y')
        line = np.poly1d(np.polyfit(y, y_pred, 1))
        ax.plot(y, line(y), color='blue', linewidth=2, label='Fit Line')
        # Called once all artists exist so every label ends up in the legend.
        ax.legend()
        angle_deg = angle_from_identity(line)
        print(f"Angle between x=y and fit line in {title}: {angle_deg:.2f} degrees")

    plt.figure(figsize=(18, 5))

    titled_splits = (
        ('Training Set', train_samples),
        ('Validation Set', val_samples),
        ('Test Set', test_samples),
    )
    features = [np.array([s[0] for s in split]).reshape(-1, 1) for _, split in titled_splits]
    targets = [np.array([s[1] for s in split]) for _, split in titled_splits]
    predictions = [model.predict(X) for X in features]

    panels = zip(titled_splits, targets, predictions)
    for position, ((title, _), y, y_pred) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        draw_panel(plt.gca(), y, y_pred, title)

    plt.tight_layout()
    plt.show()

    # Draw the fit line of the training set on its own figure.
    y_train, y_train_pred = targets[0], predictions[0]
    plt.figure(figsize=(10, 5))
    plt.scatter(y_train, y_train_pred, alpha=0.5, label='Train Set')
    plt.xlabel('True Depth')
    plt.ylabel('Predicted Depth')
    plt.title('Training Set Fit Line')
    plt.plot([min(y_train), max(y_train)], [min(y_train), max(y_train)], color='red', linewidth=2, label='x=y')
    fit_line = np.poly1d(np.polyfit(y_train, y_train_pred, 1))
    plt.plot(y_train, fit_line(y_train), color='blue', linewidth=2, label='Fit Line')
    plt.legend()
    plt.grid(True)
    plt.show()

    angle_deg = angle_from_identity(fit_line)
    print(f"Angle between x=y and fit line in Training Set: {angle_deg:.2f} degrees")

    print(f"RANSAC model coefficients: {model.estimator_.coef_}")
    print(f"RANSAC model intercept: {model.estimator_.intercept_}")


# Annotation CSV and the folders that are searched for the images it names.
csv_path = r"C:\Users\D-Frank\Desktop\111.csv"
images_folder_paths = [
    r"C:\Users\D-Frank\Desktop\data\104RECNX",
    r"C:\Users\D-Frank\Desktop\data\105RECNX",
    r"C:\Users\D-Frank\Desktop\data\106RECNX",
    r"C:\Users\D-Frank\Desktop\data\107RECNX",
    r"C:\Users\D-Frank\Desktop\data\108RECNX",
    r"C:\Users\D-Frank\Desktop\data\110RECNX",
    r"C:\Users\D-Frank\Desktop\data\100RECNX",
    r"C:\Users\D-Frank\Desktop\data\101RECNX",
    r"C:\Users\D-Frank\Desktop\data\102RECNX",
    r"C:\Users\D-Frank\Desktop\data\103RECNX"
]

print(f"Processing {csv_path}...")
try:
    samples = load_and_process_data(csv_path, images_folder_paths)
    if not samples:
        # Stop here: splitting or fitting an empty list only produces a
        # second, misleading error message.
        print(f"No valid samples found in {csv_path}")
    else:
        # 60 % train, then the remaining 40 % is halved into validation/test.
        train_samples, val_test_samples = train_test_split(samples, test_size=0.4, random_state=42)
        val_samples, test_samples = train_test_split(val_test_samples, test_size=0.5, random_state=42)

        ransac_model, inlier_mask, outlier_mask = train_ransac(train_samples)

        train_mae, train_mse, _, _ = evaluate_samples(train_samples, ransac_model)
        print(f"Train Mean Absolute Error: {train_mae}")
        print(f"Train Mean Squared Error: {train_mse}")

        val_mae, val_mse, _, _ = evaluate_samples(val_samples, ransac_model)
        print(f"Validation Mean Absolute Error: {val_mae}")
        print(f"Validation Mean Squared Error: {val_mse}")

        test_mae, test_mse, y_test, y_test_pred = evaluate_samples(test_samples, ransac_model)
        print(f"Test Mean Absolute Error: {test_mae}")
        print(f"Test Mean Squared Error: {test_mse}")

        plot_results(train_samples, val_samples, test_samples, ransac_model, inlier_mask)

except Exception as e:
    print(f"Error processing {csv_path}: {e}")


In [None]:
#DPT+PVCNN
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import DPTForDepthEstimation, DPTFeatureExtractor
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression

# PVCNN
class PVCNN(nn.Module):
    """Regression network: three kernel-size-1 convolutions, max-pool, two dense layers.

    ``forward`` takes a tensor shaped ``(batch, in_channels, length)`` and
    returns one shaped ``(batch, out_channels)``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Kernel size 1: every position along the last axis is transformed
        # independently of its neighbours.
        self.conv1 = nn.Conv1d(in_channels, 64, kernel_size=1)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=1)
        self.conv3 = nn.Conv1d(128, 256, kernel_size=1)
        self.fc1 = nn.Linear(256, 128)
        self.fc2 = nn.Linear(128, out_channels)

    def forward(self, x):
        for conv in (self.conv1, self.conv2, self.conv3):
            x = nn.functional.relu(conv(x))
        # Collapse the last axis by keeping each channel's maximum.
        x = x.max(dim=2)[0]
        x = nn.functional.relu(self.fc1(x))
        return self.fc2(x)


# Load the pretrained DPT depth-estimation model and the preprocessor that
# belongs to the same `Intel/dpt-large` checkpoint.
model_name = "Intel/dpt-large"
dpt_model = DPTForDepthEstimation.from_pretrained(model_name)
feature_extractor = DPTFeatureExtractor.from_pretrained(model_name)


# Use the GPU when CUDA is available, otherwise the CPU, and move the model
# there so it lives on the same device as the inputs built later.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dpt_model.to(device)

def load_and_process_data(csv_path, images_folder_paths):
    """Build (predicted_depth, true_depth, location) samples from a CSV.

    For every CSV row the referenced image is located with ``find_image``,
    the DPT model predicts a depth map for it, the map is resized to the
    image resolution with bicubic interpolation, and the value at the
    annotated pixel is paired with the row's ``radius`` value.

    Args:
        csv_path: Path of a CSV file providing the columns ``image_name``,
            ``x``, ``y``, ``radius`` and ``location``.
        images_folder_paths: Folders searched (recursively) for the images.

    Returns:
        A list of ``(predicted_depth, true_depth, location)`` tuples. Rows
        are skipped when a coordinate or the radius is missing, when the
        image cannot be found, or when the pixel lies outside the image.
    """
    df = pd.read_csv(csv_path)
    samples = []

    for _, row in df.iterrows():
        # int() raises on a missing coordinate and a missing radius would
        # poison the regression target, so drop such rows instead of letting
        # one bad row abort the whole run.
        if pd.isna(row['x']) or pd.isna(row['y']) or pd.isna(row['radius']):
            continue

        image_path = find_image(images_folder_paths, row['image_name'])
        if not image_path:
            continue

        # The context manager releases the file handle; convert() returns an
        # independent in-memory copy with three channels, so grayscale or
        # RGBA files reach the model in the same layout as RGB ones.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        x_coord, y_coord = int(row['x']), int(row['y'])
        if not (0 <= x_coord < image.width and 0 <= y_coord < image.height):
            continue

        true_depth = row['radius']
        inputs = feature_extractor(images=image, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = dpt_model(**inputs)
            depth_tensor = outputs.predicted_depth.squeeze()

        # interpolate() expects a 4-D tensor, hence the two unsqueeze calls.
        # PIL reports size as (width, height); reversing it yields the
        # (height, width) order used for the target size.
        depth_map = torch.nn.functional.interpolate(
            depth_tensor.unsqueeze(0).unsqueeze(0),
            size=image.size[::-1],
            mode="bicubic",
            align_corners=False,
        ).squeeze().cpu().numpy()

        # Arrays are indexed [row, column], i.e. [y, x].
        predicted_depth = depth_map[y_coord, x_coord]
        samples.append((predicted_depth, true_depth, row['location']))

    return samples

def find_image(folder_paths, image_name):
    """Return the path of the first file named ``image_name`` found.

    Each folder in ``folder_paths`` is walked recursively, in order.
    ``None`` is returned when no folder contains a file with that name.
    """
    candidates = (
        os.path.join(directory, image_name)
        for base_folder in folder_paths
        for directory, _subdirs, filenames in os.walk(base_folder)
        if image_name in filenames
    )
    # The generator is lazy, so walking stops at the first match.
    return next(candidates, None)

def train_pvcnn(train_samples, val_samples, learning_rate=0.001, epochs=100):
    """Train a ``PVCNN(1, 1)`` to map predicted depth to true depth.

    Every epoch performs one full-batch Adam step on the MSE loss over all
    training samples, then evaluates the MSE on the validation samples.
    Both losses are printed per epoch.

    Args:
        train_samples: Tuples of (predicted depth, true depth, ...).
        val_samples: Same layout, used only for the validation loss.
        learning_rate: Learning rate handed to Adam.
        epochs: Number of optimisation steps.

    Returns:
        ``(model, train_losses, val_losses)`` with one loss per epoch.
    """

    def as_tensors(samples):
        # Inputs become (n, 1, 1) for the Conv1d layers, targets (n, 1).
        inputs = torch.tensor([s[0] for s in samples], dtype=torch.float32).view(-1, 1, 1)
        targets = torch.tensor([s[1] for s in samples], dtype=torch.float32).view(-1, 1)
        return inputs.to(device), targets.to(device)

    network = PVCNN(1, 1).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(network.parameters(), lr=learning_rate)

    train_inputs, train_targets = as_tensors(train_samples)
    val_inputs, val_targets = as_tensors(val_samples)

    train_losses, val_losses = [], []
    for epoch in range(epochs):
        network.train()
        optimizer.zero_grad()
        loss = criterion(network(train_inputs), train_targets)
        loss.backward()
        optimizer.step()

        network.eval()
        with torch.no_grad():
            val_loss = criterion(network(val_inputs), val_targets)

        train_losses.append(loss.item())
        val_losses.append(val_loss.item())

        print(f"Epoch {epoch+1}/{epochs}, Training Loss: {loss.item()}, Validation Loss: {val_loss.item()}")

    return network, train_losses, val_losses

def evaluate_samples(samples, model):
    """Return ``(mae, mse)`` of ``model`` on ``samples``.

    Args:
        samples: Tuples of (predicted depth, true depth, ...).
        model: Torch module called on a ``(n, 1, 1)`` float tensor; it is
            switched to eval mode first.
    """
    model.eval()
    targets = np.array([sample[1] for sample in samples])
    inputs = torch.tensor(
        [sample[0] for sample in samples], dtype=torch.float32
    ).view(-1, 1, 1).to(device)

    with torch.no_grad():
        predictions = model(inputs).cpu().numpy().flatten()

    return (
        mean_absolute_error(targets, predictions),
        mean_squared_error(targets, predictions),
    )

def plot_losses(train_losses, val_losses):
    """Show the training and validation loss curves in one figure."""
    plt.figure(figsize=(10, 5))
    curves = (
        (train_losses, 'Training Loss'),
        (val_losses, 'Validation Loss'),
    )
    for values, label in curves:
        plt.plot(values, label=label)
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.grid(True)
    plt.legend()
    plt.show()

def plot_fit(samples, model, title):
    """Scatter the model's predictions against the true depths.

    The figure also shows a least-squares line fitted through the points
    and the x=y line; the angle between the two is printed.

    Args:
        samples: Tuples of (predicted depth, true depth, ...).
        model: Torch module called on a ``(n, 1, 1)`` float tensor.
        title: Split name used in the printed message and the plot title.
    """
    model.eval()
    targets = np.array([sample[1] for sample in samples])
    inputs = torch.tensor(
        [sample[0] for sample in samples], dtype=torch.float32
    ).view(-1, 1, 1).to(device)

    with torch.no_grad():
        predictions = model(inputs).cpu().numpy().flatten()

    plt.figure(figsize=(10, 5))
    plt.scatter(targets, predictions, alpha=0.5, label='Predicted vs True')

    # Regress predictions on true depth and draw the line over the
    # observed range of true depths.
    reg = LinearRegression().fit(targets.reshape(-1, 1), predictions)
    lowest, highest = targets.min(), targets.max()
    grid = np.linspace(lowest, highest, 100)
    plt.plot(grid, reg.predict(grid.reshape(-1, 1)), color='red', linewidth=2, label='Fit Line')

    plt.plot([lowest, highest], [lowest, highest], color='blue', linewidth=2, label='x=y')

    # Angle of the fitted line minus the 45 degrees of the x=y line.
    angle_deg = np.degrees(np.arctan(reg.coef_[0]) - np.pi / 4)
    print(f"Angle between x=y and fit line in {title}: {angle_deg:.2f} degrees")

    plt.title(f'Predicted vs True Depth ({title})')
    plt.xlabel('True Depth')
    plt.ylabel('Predicted Depth')
    plt.grid(True)
    plt.legend()
    plt.show()


# Annotation CSV and the folders that are searched for the images it names.
csv_path = r"C:\Users\D-Frank\Desktop\111.csv"
images_folder_paths = [
    r"C:\Users\D-Frank\Desktop\data\104RECNX",
    r"C:\Users\D-Frank\Desktop\data\105RECNX",
    r"C:\Users\D-Frank\Desktop\data\106RECNX",
    r"C:\Users\D-Frank\Desktop\data\107RECNX",
    r"C:\Users\D-Frank\Desktop\data\108RECNX",
    r"C:\Users\D-Frank\Desktop\data\110RECNX",
    r"C:\Users\D-Frank\Desktop\data\100RECNX",
    r"C:\Users\D-Frank\Desktop\data\101RECNX",
    r"C:\Users\D-Frank\Desktop\data\102RECNX",
    r"C:\Users\D-Frank\Desktop\data\103RECNX"
]

# Number of distinct locations used for training; samples from all other
# locations are shared between validation and test.
num_train_locations = 6

print(f"Processing {csv_path}...")
try:
    samples = load_and_process_data(csv_path, images_folder_paths)
    if not samples:
        # Stop here: splitting or training on an empty list only produces a
        # second, misleading error message.
        print(f"No valid samples found in {csv_path}")
    else:
        # Sort the distinct locations so the split is the same on every run;
        # the iteration order of a set of strings can differ between
        # interpreter sessions. key=str keeps mixed value types sortable.
        locations = sorted({s[2] for s in samples}, key=str)
        train_locations = locations[:num_train_locations]
        val_test_locations = locations[num_train_locations:]
        if not val_test_locations:
            raise ValueError(
                f"need more than {num_train_locations} distinct locations to "
                f"hold some out for validation/test, found {len(locations)}"
            )

        train_samples = [s for s in samples if s[2] in train_locations]
        val_test_samples = [s for s in samples if s[2] in val_test_locations]

        val_samples, test_samples = train_test_split(val_test_samples, test_size=0.5, random_state=42)

        pvcnn_model, train_losses, val_losses = train_pvcnn(train_samples, val_samples)
        torch.save(pvcnn_model.state_dict(), 'pvcnn1.pt')

        train_mae, train_mse = evaluate_samples(train_samples, pvcnn_model)
        print(f"Train Mean Absolute Error: {train_mae}")
        print(f"Train Mean Squared Error: {train_mse}")

        val_mae, val_mse = evaluate_samples(val_samples, pvcnn_model)
        print(f"Validation Mean Absolute Error: {val_mae}")
        print(f"Validation Mean Squared Error: {val_mse}")

        test_mae, test_mse = evaluate_samples(test_samples, pvcnn_model)
        print(f"Test Mean Absolute Error: {test_mae}")
        print(f"Test Mean Squared Error: {test_mse}")

        plot_losses(train_losses, val_losses)
        plot_fit(train_samples, pvcnn_model, 'Training Set')
        plot_fit(val_samples, pvcnn_model, 'Validation Set')
        plot_fit(test_samples, pvcnn_model, 'Test Set')

except Exception as e:
    print(f"Error processing {csv_path}: {e}")


In [None]:
#DA+PVCNN
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import pipeline
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression

# PVCNN
class PVCNN(nn.Module):
    """Regression network: three kernel-size-1 convolutions, max-pool, two dense layers.

    ``forward`` takes a tensor shaped ``(batch, in_channels, length)`` and
    returns one shaped ``(batch, out_channels)``. Dropout (p=0.5) is applied
    before the last layer and the output passes through a ReLU, so it is
    never negative.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Kernel size 1: every position along the last axis is transformed
        # independently of its neighbours.
        self.conv1 = nn.Conv1d(in_channels, 128, kernel_size=1)
        self.conv2 = nn.Conv1d(128, 256, kernel_size=1)
        self.conv3 = nn.Conv1d(256, 512, kernel_size=1)
        self.fc1 = nn.Linear(512, 256)
        self.fc2 = nn.Linear(256, out_channels)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        for conv in (self.conv1, self.conv2, self.conv3):
            x = nn.functional.relu(conv(x))
        # Collapse the last axis by keeping each channel's maximum.
        x = x.max(dim=2)[0]
        x = self.dropout(nn.functional.relu(self.fc1(x)))
        # Final ReLU clamps the regression output at zero.
        return nn.functional.relu(self.fc2(x))


# Use the GPU when CUDA is available, otherwise the CPU. The PVCNN model and
# its tensors are moved to this device further down.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Depth-estimation pipeline for the Depth Anything V2 (small) checkpoint.
# The device is passed explicitly; without it the pipeline does not use the
# selection above and stays on its default device.
depth_estimator = pipeline(
    task="depth-estimation",
    model="depth-anything/Depth-Anything-V2-Small-hf",
    device=device,
)

def load_and_process_data(csv_path, images_folder_paths):
    """Build (predicted_depth, true_depth, location) samples from a CSV.

    For every CSV row the referenced image is located with ``find_image``,
    passed through ``depth_estimator``, and the ``depth`` output is sampled
    at the annotated pixel and paired with the row's ``radius`` value.

    Args:
        csv_path: Path of a CSV file providing the columns ``image_name``,
            ``x``, ``y``, ``radius`` and ``location``.
        images_folder_paths: Folders searched (recursively) for the images.

    Returns:
        A list of ``(predicted_depth, true_depth, location)`` tuples. Rows
        are skipped when a coordinate or the radius is missing, when the
        image cannot be found, when the pixel lies outside the image, or
        when either depth value is negative.
    """
    df = pd.read_csv(csv_path)
    samples = []

    for _, row in df.iterrows():
        # int() raises on a missing coordinate, and a missing radius passes
        # the "< 0" filter below unnoticed, so drop such rows up front.
        if pd.isna(row['x']) or pd.isna(row['y']) or pd.isna(row['radius']):
            continue

        image_path = find_image(images_folder_paths, row['image_name'])
        if not image_path:
            continue

        # The context manager releases the file handle; convert() returns an
        # independent in-memory copy that stays usable afterwards.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        x_coord, y_coord = int(row['x']), int(row['y'])
        if not (0 <= x_coord < image.width and 0 <= y_coord < image.height):
            continue

        true_depth = row['radius']
        depth_map = np.array(depth_estimator(image)['depth'])
        # Arrays are indexed [row, column], i.e. [y, x].
        predicted_depth = depth_map[y_coord, x_coord]

        if predicted_depth < 0 or true_depth < 0:
            continue

        samples.append((predicted_depth, true_depth, row['location']))

    return samples

def find_image(folder_paths, image_name):
    """Return the path of the first file named ``image_name`` found.

    Each folder in ``folder_paths`` is walked recursively, in order.
    ``None`` is returned when no folder contains a file with that name.
    """
    candidates = (
        os.path.join(directory, image_name)
        for base_folder in folder_paths
        for directory, _subdirs, filenames in os.walk(base_folder)
        if image_name in filenames
    )
    # The generator is lazy, so walking stops at the first match.
    return next(candidates, None)

def train_pvcnn(train_samples, val_samples, learning_rate=0.0001, epochs=300, batch_size=32):
    """Train a ``PVCNN(1, 1)`` with mini-batch Adam on the MSE loss.

    Every epoch reshuffles the training samples, performs one optimisation
    step per mini-batch and then evaluates the MSE on the validation
    samples. Both losses are printed per epoch.

    Args:
        train_samples: Tuples of (predicted depth, true depth, ...).
        val_samples: Same layout, used only for the validation loss.
        learning_rate: Learning rate handed to Adam.
        epochs: Number of passes over the training samples.
        batch_size: Maximum number of samples per optimisation step.

    Returns:
        ``(model, train_losses, val_losses)`` with one value per epoch. The
        training value is the sample-weighted mean of the mini-batch losses
        of that epoch (computed in training mode, i.e. with dropout active);
        the validation value is computed in eval mode.

    Raises:
        ValueError: If ``train_samples`` is empty.
    """
    if not train_samples:
        raise ValueError("train_samples must not be empty")

    pvcnn = PVCNN(1, 1).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(pvcnn.parameters(), lr=learning_rate)

    # Inputs become (n, 1, 1) for the Conv1d layers, targets (n, 1).
    train_pred = torch.tensor([s[0] for s in train_samples], dtype=torch.float32).view(-1, 1, 1).to(device)
    train_true = torch.tensor([s[1] for s in train_samples], dtype=torch.float32).view(-1, 1).to(device)

    val_pred = torch.tensor([s[0] for s in val_samples], dtype=torch.float32).view(-1, 1, 1).to(device)
    val_true = torch.tensor([s[1] for s in val_samples], dtype=torch.float32).view(-1, 1).to(device)

    train_losses = []
    val_losses = []
    num_train = train_pred.size(0)

    for epoch in range(epochs):
        pvcnn.train()
        permutation = torch.randperm(num_train)
        epoch_loss_sum = 0.0
        for start in range(0, num_train, batch_size):
            indices = permutation[start:start + batch_size]
            batch_pred, batch_true = train_pred[indices], train_true[indices]

            optimizer.zero_grad()
            outputs = pvcnn(batch_pred)
            loss = criterion(outputs, batch_true)
            loss.backward()
            optimizer.step()

            # Weight by batch size so a short final batch does not skew the
            # epoch mean.
            epoch_loss_sum += loss.item() * batch_pred.size(0)

        # Report the mean over the whole epoch rather than only the last
        # mini-batch, which is a noisy estimate of the training loss.
        train_loss = epoch_loss_sum / num_train

        pvcnn.eval()
        with torch.no_grad():
            val_outputs = pvcnn(val_pred)
            val_loss = criterion(val_outputs, val_true)

        train_losses.append(train_loss)
        val_losses.append(val_loss.item())

        print(f"Epoch {epoch+1}/{epochs}, Training Loss: {train_loss}, Validation Loss: {val_loss.item()}")

    return pvcnn, train_losses, val_losses

def evaluate_samples(samples, model):
    """Return ``(mae, mse)`` of ``model`` on ``samples``.

    Args:
        samples: Tuples of (predicted depth, true depth, ...).
        model: Torch module called on a ``(n, 1, 1)`` float tensor; it is
            switched to eval mode first.
    """
    model.eval()
    targets = np.array([sample[1] for sample in samples])
    inputs = torch.tensor(
        [sample[0] for sample in samples], dtype=torch.float32
    ).view(-1, 1, 1).to(device)

    with torch.no_grad():
        predictions = model(inputs).cpu().numpy().flatten()

    return (
        mean_absolute_error(targets, predictions),
        mean_squared_error(targets, predictions),
    )

def plot_losses(train_losses, val_losses):
    """Show the training and validation loss curves in one figure."""
    plt.figure(figsize=(10, 5))
    curves = (
        (train_losses, 'Training Loss'),
        (val_losses, 'Validation Loss'),
    )
    for values, label in curves:
        plt.plot(values, label=label)
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.grid(True)
    plt.legend()
    plt.show()

def plot_fit(samples, model, title):
    """Scatter the model's predictions against the true depths.

    The figure also shows a least-squares line fitted through the points
    and the x=y line; the angle between the two is printed.

    Args:
        samples: Tuples of (predicted depth, true depth, ...).
        model: Torch module called on a ``(n, 1, 1)`` float tensor.
        title: Split name used in the printed message and the plot title.
    """
    model.eval()
    targets = np.array([sample[1] for sample in samples])
    inputs = torch.tensor(
        [sample[0] for sample in samples], dtype=torch.float32
    ).view(-1, 1, 1).to(device)

    with torch.no_grad():
        predictions = model(inputs).cpu().numpy().flatten()

    plt.figure(figsize=(10, 5))
    plt.scatter(targets, predictions, alpha=0.5, label='Predicted vs True')

    # Regress predictions on true depth and draw the line over the
    # observed range of true depths.
    reg = LinearRegression().fit(targets.reshape(-1, 1), predictions)
    lowest, highest = targets.min(), targets.max()
    grid = np.linspace(lowest, highest, 100)
    plt.plot(grid, reg.predict(grid.reshape(-1, 1)), color='red', linewidth=2, label='Fit Line')

    plt.plot([lowest, highest], [lowest, highest], color='blue', linewidth=2, label='x=y')

    # Angle of the fitted line minus the 45 degrees of the x=y line.
    angle_deg = np.degrees(np.arctan(reg.coef_[0]) - np.pi / 4)
    print(f"Angle between x=y and fit line in {title}: {angle_deg:.2f} degrees")

    plt.title(f'Predicted vs True Depth ({title})')
    plt.xlabel('True Depth')
    plt.ylabel('Predicted Depth')
    plt.grid(True)
    plt.legend()
    plt.show()


# Annotation CSV and the folders that are searched for the images it names.
csv_path = r"C:\Users\D-Frank\Desktop\111.csv"
images_folder_paths = [
    r"C:\Users\D-Frank\Desktop\data\104RECNX",
    r"C:\Users\D-Frank\Desktop\data\105RECNX",
    r"C:\Users\D-Frank\Desktop\data\106RECNX",
    r"C:\Users\D-Frank\Desktop\data\107RECNX",
    r"C:\Users\D-Frank\Desktop\data\108RECNX",
    r"C:\Users\D-Frank\Desktop\data\110RECNX",
    r"C:\Users\D-Frank\Desktop\data\100RECNX",
    r"C:\Users\D-Frank\Desktop\data\101RECNX",
    r"C:\Users\D-Frank\Desktop\data\102RECNX",
    r"C:\Users\D-Frank\Desktop\data\103RECNX"
]

# Number of distinct locations used for training; samples from all other
# locations are shared between validation and test.
num_train_locations = 6

print(f"Processing {csv_path}...")
try:
    samples = load_and_process_data(csv_path, images_folder_paths)
    if not samples:
        # Stop here: splitting or training on an empty list only produces a
        # second, misleading error message.
        print(f"No valid samples found in {csv_path}")
    else:
        # Sort the distinct locations so the split is the same on every run;
        # the iteration order of a set of strings can differ between
        # interpreter sessions. key=str keeps mixed value types sortable.
        locations = sorted({s[2] for s in samples}, key=str)
        train_locations = locations[:num_train_locations]
        val_test_locations = locations[num_train_locations:]
        if not val_test_locations:
            raise ValueError(
                f"need more than {num_train_locations} distinct locations to "
                f"hold some out for validation/test, found {len(locations)}"
            )

        train_samples = [s for s in samples if s[2] in train_locations]
        val_test_samples = [s for s in samples if s[2] in val_test_locations]

        val_samples, test_samples = train_test_split(val_test_samples, test_size=0.5, random_state=42)

        pvcnn_model, train_losses, val_losses = train_pvcnn(train_samples, val_samples)

        torch.save(pvcnn_model.state_dict(), 'pvcnn2.pt')

        train_mae, train_mse = evaluate_samples(train_samples, pvcnn_model)
        print(f"Train Mean Absolute Error: {train_mae}")
        print(f"Train Mean Squared Error: {train_mse}")

        val_mae, val_mse = evaluate_samples(val_samples, pvcnn_model)
        print(f"Validation Mean Absolute Error: {val_mae}")
        print(f"Validation Mean Squared Error: {val_mse}")

        test_mae, test_mse = evaluate_samples(test_samples, pvcnn_model)
        print(f"Test Mean Absolute Error: {test_mae}")
        print(f"Test Mean Squared Error: {test_mse}")

        plot_losses(train_losses, val_losses)
        plot_fit(train_samples, pvcnn_model, 'Training Set')
        plot_fit(val_samples, pvcnn_model, 'Validation Set')
        plot_fit(test_samples, pvcnn_model, 'Test Set')

except Exception as e:
    print(f"Error processing {csv_path}: {e}")
