### 학습용 csv파일 만들기 : clahe 전처리(밝기 조정) -> midas로 평균깊이 추정 -> custom_vision으로 배수구 위치 출력 -> csv파일로 저장

In [None]:
# 하나의 폴더만 clahe 전처리 처리
# CLAHE + Grayscale 변환 (어두운 배수구 주변도 더 뚜렷하게 + ROI 밝기 통계 산출용) 
import os
import cv2
import numpy as np
import pandas as pd
import requests
import torch
from tqdm import tqdm
from PIL import Image

# === MiDaS 모델 불러오기 ===
midas = torch.hub.load("intel-isl/MiDaS", "DPT_Large", trust_repo=True)
midas.eval()
midas_transforms = torch.hub.load("intel-isl/MiDaS", "transforms", trust_repo=True)
transform = midas_transforms.dpt_transform

# === Custom Vision 설정 ===
PREDICTION_KEY = "5sloqMYWiZSpfCd7HZgQ9ZpfuQAXnRWwjSw648WjXGr5Fy5f7imcJQQJ99BFACYeBjFXJ3w3AAAIACOGBDCR"
PREDICTION_URL = "https://7aiteam05ai012-prediction.cognitiveservices.azure.com/customvision/v3.0/Prediction/f360eeb2-f8df-441e-8bed-f27d3ef1797a/detect/iterations/Iteration2/image"

# === 보조 함수 ===
def get_customvision_bboxes(image_path):
    with open(image_path, "rb") as image_file:
        headers = {
            "Prediction-Key": PREDICTION_KEY,
            "Content-Type": "application/octet-stream"
        }
        response = requests.post(PREDICTION_URL, headers=headers, data=image_file)
        response.raise_for_status()
        predictions = response.json()["predictions"]
        return [p["boundingBox"] for p in predictions if p["probability"] > 0.5]

def clahe_normalize(img):
    lab = cv2.cvtColor(img, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    cl = clahe.apply(l)
    limg = cv2.merge((cl, a, b))
    return cv2.cvtColor(limg, cv2.COLOR_LAB2RGB)

def get_bbox_roi(image, bbox, img_w, img_h):
    x = int(bbox["left"] * img_w)
    y = int(bbox["top"] * img_h)
    w = int(bbox["width"] * img_w)
    h = int(bbox["height"] * img_h)
    roi = image[y:y + h, x:x + w]
    return roi, (x, y, w, h)

# === 입력 & 출력 폴더 설정 ===
input_folder = "medium_clean_100"                         # 예측 대상 이미지 폴더 입력!!!!!!
output_folder = "depth_output_medium_clean_100_ver1"      # 결과 저장할 폴더 입력!!!!!
os.makedirs(output_folder, exist_ok=True)

summary = []

# === 전체 이미지 순회 처리 ===
for filename in tqdm(os.listdir(input_folder)):
    if not filename.lower().endswith((".jpg", ".jpeg", ".png")):
        continue

    path = os.path.join(input_folder, filename)
    bgr = cv2.imread(path)
    if bgr is None:
        continue

    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    clahe_img = clahe_normalize(rgb)
    gray = cv2.cvtColor(clahe_img, cv2.COLOR_RGB2GRAY)

    # === MiDaS depth 예측 ===
    input_tensor = transform(clahe_img)
    with torch.no_grad():
        pred = midas(input_tensor)
        pred = torch.nn.functional.interpolate(
            pred.unsqueeze(1), size=rgb.shape[:2], mode="bicubic", align_corners=False
        ).squeeze()
    depth_map = pred.cpu().numpy()

    # === 시각화용 컬러맵 ===
    mean_depth = np.mean(depth_map)
    vmin, vmax = max(0, mean_depth - 5), mean_depth + 5
    clipped = np.clip(depth_map, vmin, vmax)
    norm = ((clipped - vmin) / (vmax - vmin) * 255).astype(np.uint8)
    depth_colored = cv2.applyColorMap(norm, cv2.COLORMAP_INFERNO)

    # === 커스텀비전으로 bbox 예측 ===
    bboxes = get_customvision_bboxes(path)

    for i, bbox in enumerate(bboxes):
        roi_gray, (x, y, w, h) = get_bbox_roi(gray, bbox, rgb.shape[1], rgb.shape[0])
        roi_depth, _ = get_bbox_roi(depth_map, bbox, rgb.shape[1], rgb.shape[0])

        # 중심 축소 (표준편차 기반)
        if np.std(roi_depth) > 5.0:
            x, y = x + w // 4, y + h // 4
            w, h = w // 2, h // 2
            roi_gray = gray[y:y + h, x:x + w]
            roi_depth = depth_map[y:y + h, x:x + w]

        # 통계 수집
        brightness_mean = roi_gray.mean()
        brightness_std = roi_gray.std()
        depth_mean = roi_depth.mean()
        depth_std = roi_depth.std()

        # 박스 시각화
        cv2.rectangle(depth_colored, (x, y), (x + w, y + h), (0, 255, 0), 2)
        cv2.putText(depth_colored, f"{depth_mean:.2f}", (x, y - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)

        summary.append({
            "filename": filename,
            "bbox_index": i,
            "brightness_mean": brightness_mean,
            "brightness_std": brightness_std,
            "depth_mean": depth_mean,
            "depth_std": depth_std,
            "x": x, "y": y, "w": w, "h": h
        })

    # 시각화 이미지 저장
    save_path = os.path.join(output_folder, filename.rsplit('.', 1)[0] + "_depth.jpg")
    cv2.imwrite(save_path, depth_colored)

# === CSV 저장 ===
df = pd.DataFrame(summary)
csv_path = os.path.join(output_folder, "clahe_depth_medium_clean_100_ver1.csv")          # 최종 csv 파일명 입력!!!!!!
df.to_csv(csv_path, index=False)
print(f"✅ 분석 완료: {csv_path}")

### 학습용 모델 만들기 : 위에서 만든 학습용 csv파일 가져오기 -> SVR 모델 학습 -> predicted-score 예측 -> 결과 저장(csv파일)

In [None]:
# 밝기 고려
import pandas as pd
import numpy as np
from sklearn.svm import SVR
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import joblib

# 📁 학습용 CSV 불러오기
file_clean = "depth_output_both_ver1/clahe_depth_m_clean_ver2.csv"                 # 학습용 csv 파일 경로 입력
file_medium_heavy = "depth_output_both_ver1/clahe_depth_m_heavy_ver2.csv"          # 학습용 csv 파일 경로 입력
file_heavy = "depth_output_heavy_62_ver1/clahe_depth_combined_heavy_ver1.csv"      # 학습용 csv 파일 경로 입력

df_clean = pd.read_csv(file_clean)
df_medium_heavy = pd.read_csv(file_medium_heavy)
df_heavy = pd.read_csv(file_heavy)

# 🎯 입력 특성 4개로 확장
features = ['brightness_mean', 'brightness_std', 'depth_mean', 'depth_std']

X_clean = df_clean[features]
X_medium_heavy = df_medium_heavy[features]
X_heavy = df_heavy[features]

# 📌 라벨링: Clean=0, Medium=50, Heavy=100
y_clean = np.full(len(X_clean), 0)
y_medium_heavy = np.full(len(X_medium_heavy), 50)
y_heavy = np.full(len(X_heavy), 100)

# 🔗 전체 데이터 병합
X = pd.concat([X_clean, X_medium_heavy, X_heavy], ignore_index=True)
y = np.concatenate([y_clean, y_medium_heavy, y_heavy])

# ⚙️ SVR 모델 학습
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = make_pipeline(StandardScaler(), SVR(kernel="rbf", C=100, epsilon=1.0))
model.fit(X_train, y_train)

# ✅ 모델 저장
joblib.dump(model, "svr_pollution_model_4features.pkl")

# 🧠 예측 결과 저장
predicted_score = model.predict(X)
X_result = X.copy()
X_result["predicted_score"] = predicted_score
X_result.to_csv("pollution_score_svr_4features.csv", index=False)                  

# 📁 예측 대상 CSV 로드 (test 이미지용 clahe + depth 완료된 CSV)
group = pd.read_csv("test_ver1/clahe_test_ver1.csv")                  # 예측할 csv파일 경로 입력!!!!!
predicted_df = pd.read_csv("pollution_score_svr_4features.csv")

# 병합: group에 predicted_score 붙이기
group_key = group[["filename", "bbox_index"]].reset_index(drop=True)
group_for_merge = pd.concat([group_key, predicted_df["predicted_score"]], axis=1)
group["predicted_score"] = group_for_merge["predicted_score"]

# 💾 저장
group.to_csv("test_ver1/clahe_test_ver1_with_score.csv", index=False)       # 저장할 csv 파일 경로 및 이름 입력!!!!!

# 📊 시각화 (예시: depth 기준 점수 분포)
plt.figure(figsize=(8, 6))
plt.scatter(X_result["depth_mean"], X_result["depth_std"],
            c=X_result["predicted_score"], cmap="coolwarm", edgecolor="k")
plt.colorbar(label="Predicted Pollution Score (0~100)")
plt.xlabel("Depth Mean")
plt.ylabel("Depth Std")
plt.title("SVR with Brightness + Depth Features")
plt.grid(True)
plt.tight_layout()
plt.show()

print("✅ 모델 학습 + 예측 + 시각화 완료 (밝기 포함 4 feature 기반)")


### 테스트용 csv파일 만들기 : clahe 전처리(밝기 조정) -> midas로 평균깊이 추정 -> custom_vision으로 배수구 위치 출력 -> csv파일로 저장

In [None]:
# 하나의 폴더만 clahe 전처리 처리
import os
import cv2
import numpy as np
import pandas as pd
import requests
import torch
from tqdm import tqdm
from PIL import Image

# === MiDaS 모델 불러오기 ===
midas = torch.hub.load("intel-isl/MiDaS", "DPT_Large", trust_repo=True)
midas.eval()
midas_transforms = torch.hub.load("intel-isl/MiDaS", "transforms", trust_repo=True)
transform = midas_transforms.dpt_transform

# === Custom Vision 설정 ===
PREDICTION_KEY = "5sloqMYWiZSpfCd7HZgQ9ZpfuQAXnRWwjSw648WjXGr5Fy5f7imcJQQJ99BFACYeBjFXJ3w3AAAIACOGBDCR"
PREDICTION_URL = "https://7aiteam05ai012-prediction.cognitiveservices.azure.com/customvision/v3.0/Prediction/f360eeb2-f8df-441e-8bed-f27d3ef1797a/detect/iterations/Iteration2/image"

# === 보조 함수 ===
def get_customvision_bboxes(image_path):
    with open(image_path, "rb") as image_file:
        headers = {
            "Prediction-Key": PREDICTION_KEY,
            "Content-Type": "application/octet-stream"
        }
        response = requests.post(PREDICTION_URL, headers=headers, data=image_file)
        response.raise_for_status()
        predictions = response.json()["predictions"]
        if not predictions:
            return []

        # 가장 높은 확률 하나만 추출
        best = max(predictions, key=lambda x: x["probability"])
        if best["probability"] > 0.5:
            return [best["boundingBox"]]
        else:
            return []
        # return [p["boundingBox"] for p in predictions if p["probability"] > 0.5]

def clahe_normalize(img):
    lab = cv2.cvtColor(img, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    cl = clahe.apply(l)
    limg = cv2.merge((cl, a, b))
    return cv2.cvtColor(limg, cv2.COLOR_LAB2RGB)

def get_bbox_roi(image, bbox, img_w, img_h):
    x = int(bbox["left"] * img_w)
    y = int(bbox["top"] * img_h)
    w = int(bbox["width"] * img_w)
    h = int(bbox["height"] * img_h)
    roi = image[y:y + h, x:x + w]
    return roi, (x, y, w, h)

# === 입력 & 출력 폴더 설정 ===
input_folder = "test3"                           # 예측 대상 이미지 폴더 입력!!!!!"
output_folder = "test_ver3"                      # 결과 폴더 입력!!!!!"
os.makedirs(output_folder, exist_ok=True)

summary = []

# === 전체 이미지 순회 처리 ===
for filename in tqdm(os.listdir(input_folder)):
    if not filename.lower().endswith((".jpg", ".jpeg", ".png")):
        continue

    path = os.path.join(input_folder, filename)
    bgr = cv2.imread(path)
    if bgr is None:
        continue

    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    clahe_img = clahe_normalize(rgb)
    gray = cv2.cvtColor(clahe_img, cv2.COLOR_RGB2GRAY)

    # === MiDaS depth 예측 ===
    input_tensor = transform(clahe_img)
    with torch.no_grad():
        pred = midas(input_tensor)
        pred = torch.nn.functional.interpolate(
            pred.unsqueeze(1), size=rgb.shape[:2], mode="bicubic", align_corners=False
        ).squeeze()
    depth_map = pred.cpu().numpy()

    # === 시각화용 컬러맵 ===
    mean_depth = np.mean(depth_map)
    vmin, vmax = max(0, mean_depth - 5), mean_depth + 5
    clipped = np.clip(depth_map, vmin, vmax)
    norm = ((clipped - vmin) / (vmax - vmin) * 255).astype(np.uint8)
    depth_colored = cv2.applyColorMap(norm, cv2.COLORMAP_INFERNO)

    # === 커스텀비전으로 bbox 예측 ===
    bboxes = get_customvision_bboxes(path)

    for i, bbox in enumerate(bboxes):
        roi_gray, (x, y, w, h) = get_bbox_roi(gray, bbox, rgb.shape[1], rgb.shape[0])
        roi_depth, _ = get_bbox_roi(depth_map, bbox, rgb.shape[1], rgb.shape[0])

        # 중심 축소 (표준편차 기반)
        if np.std(roi_depth) > 5.0:
            x, y = x + w // 4, y + h // 4
            w, h = w // 2, h // 2
            roi_gray = gray[y:y + h, x:x + w]
            roi_depth = depth_map[y:y + h, x:x + w]

        # 통계 수집
        brightness_mean = roi_gray.mean()
        brightness_std = roi_gray.std()
        depth_mean = roi_depth.mean()
        depth_std = roi_depth.std()

        # 박스 시각화
        cv2.rectangle(depth_colored, (x, y), (x + w, y + h), (0, 255, 0), 2)
        cv2.putText(depth_colored, f"{depth_mean:.2f}", (x, y - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)

        summary.append({
            "filename": filename,
            "bbox_index": i,
            "brightness_mean": brightness_mean,
            "brightness_std": brightness_std,
            "depth_mean": depth_mean,
            "depth_std": depth_std,
            "x": x, "y": y, "w": w, "h": h
        })

    # 시각화 이미지 저장
    save_path = os.path.join(output_folder, filename.rsplit('.', 1)[0] + "_depth.jpg")
    cv2.imwrite(save_path, depth_colored)

# === CSV 저장 ===
df = pd.DataFrame(summary)
csv_path = os.path.join(output_folder, "clahe_test_ver3.csv")
df.to_csv(csv_path, index=False)
print(f"✅ 분석 완료: {csv_path}")

### 테스트 실행 : test용 csv파일 입력 -> 이미지당 오염수치값 제일높은 bbox만 시각화 -> predicted_score 계산해서 최종 오염수치값 bbox 중앙 표시

In [None]:
# bbox 중심에 빨간 글씨로 점수 시각화 완료
import pandas as pd
import joblib
import cv2
import os

# === [1] 모델 불러오기 ===
model = joblib.load("svr_pollution_model_4features.pkl")

# === [2] CSV 로드 ===
csv_path = "test_ver3/clahe_test_ver3.csv"                   # test용 csv파일 입력!!!!!
df = pd.read_csv(csv_path)

# === [3] 예측 수행 ===
features = ['brightness_mean', 'brightness_std', 'depth_mean', 'depth_std']
X = df[features].values
df["predicted_score"] = model.predict(X)

# === [4] 이미지당 최고 bbox만 시각화 ===
output_folder = "test_ver3"                                  # output_folder 이름 입력!!!!!

for filename, group in df.groupby("filename"):
    best_row = group.loc[group["predicted_score"].idxmax()]
    score = best_row["predicted_score"]
    x, y, w, h = int(best_row.x), int(best_row.y), int(best_row.w), int(best_row.h)

    # 이미지 로드
    image_path = os.path.join(output_folder, filename.rsplit('.', 1)[0] + "_depth.jpg")
    img = cv2.imread(image_path)
    if img is None:
        print(f"❌ 이미지 불러오기 실패: {image_path}")
        continue

    # === bbox 그리기 ===
    cv2.rectangle(img, (x, y), (x + w, y + h), (0, 255, 0), 4)

    # === 텍스트 설정 ===
    label = f"{score:.1f}"
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1.8           # 글씨 크기 조절 (적당히 큼)
    font_thickness = 4
    text_color = (0, 0, 255)   # 빨간색

    # 텍스트 크기 계산
    text_size, _ = cv2.getTextSize(label, font, font_scale, font_thickness)
    text_width, text_height = text_size

    # === 텍스트 위치 (bbox 중앙) ===
    text_x = x + (w - text_width) // 2
    text_y = y + (h + text_height) // 2

    # === 글자 테두리(검정) + 본문(빨강) ===
    cv2.putText(img, label, (text_x, text_y), font, font_scale, (0, 0, 0), font_thickness + 2, cv2.LINE_AA)
    cv2.putText(img, label, (text_x, text_y), font, font_scale, text_color, font_thickness, cv2.LINE_AA)

    # === 이미지 저장 ===
    save_path = os.path.join(output_folder, filename.rsplit('.', 1)[0] + "_labeled.jpg")
    cv2.imwrite(save_path, img)

# === [5] 최고 bbox만 CSV 저장 ===
df_top = df.loc[df.groupby("filename")["predicted_score"].idxmax()].reset_index(drop=True)
df_top.to_csv(os.path.join(output_folder, "clahe_test_best_bbox_only.csv"), index=False)    

print("✅ bbox 중심에 빨간 글씨로 점수 시각화 완료")
