<a href="https://colab.research.google.com/github/akiyamatakanori/Deploy-and-Manage-Cloud-Environments-with-Google-Cloud-Challenge-Lab/blob/main/%E7%89%A9%E4%BD%93%E6%A4%9C%E7%9F%A5%E3%82%A2%E3%83%97%E3%83%AA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Object Detection Demo

In [14]:
# ステップ1: 必要なライブラリのインストール
print("📦 必要なライブラリをインストールしています...")
!pip install -q streamlit opencv-python-headless numpy matplotlib torch torchvision pillow transformers

📦 必要なライブラリをインストールしています...


In [15]:
# ステップ2: アプリケーションファイルの作成
print("\n📝 アプリケーションファイルを作成しています...")

app_code = '''
import streamlit as st

# ページ設定（最初に実行する必要がある）
try:
    st.set_page_config(
        page_title="物体検知アプリ",
        page_icon="🎯",
        layout="wide",
        initial_sidebar_state="expanded"
    )
except:
    pass

import torch
from transformers import DetrImageProcessor, DetrForObjectDetection
import cv2
import numpy as np
from PIL import Image
import tempfile
import os
import warnings
warnings.filterwarnings('ignore')

# カスタムCSS
st.markdown("""
<style>
    .main { padding: 0rem 1rem; }
    .stButton>button {
        width: 100%;
        background-color: #4CAF50;
        color: white;
        font-weight: bold;
        border-radius: 5px;
        border: none;
        padding: 0.5rem 1rem;
        margin: 0.5rem 0;
    }
    .stButton>button:hover { background-color: #45a049; }
    h1 { color: #2e7d32; text-align: center; padding: 1rem 0; }
    h2 { color: #1976d2; border-bottom: 2px solid #1976d2; padding-bottom: 0.5rem; }
</style>
""", unsafe_allow_html=True)

# セッション状態の初期化
if 'detector' not in st.session_state:
    st.session_state.detector = None
if 'processed_images' not in st.session_state:
    st.session_state.processed_images = []
if 'processed_videos' not in st.session_state:
    st.session_state.processed_videos = []

@st.cache_resource
def load_detector(model_name="facebook/detr-resnet-50"):
    """モデルを読み込む（キャッシュ対応）"""
    try:
        processor = DetrImageProcessor.from_pretrained(model_name)
        model = DetrForObjectDetection.from_pretrained(model_name)
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model.to(device)
        model.eval()
        return processor, model, device
    except Exception as e:
        st.error(f"モデルの読み込みに失敗しました: {str(e)}")
        return None, None, None

def detect_objects(processor, model, device, image, confidence_threshold=0.5):
    """画像から物体を検出"""
    try:
        if isinstance(image, np.ndarray):
            image_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        else:
            image_pil = image

        inputs = processor(images=image_pil, return_tensors="pt").to(device)

        with torch.no_grad():
            outputs = model(**inputs)

        target_sizes = torch.tensor([image_pil.size[::-1]])
        results = processor.post_process_object_detection(
            outputs, target_sizes=target_sizes, threshold=confidence_threshold
        )[0]

        return results
    except Exception as e:
        st.error(f"物体検出エラー: {str(e)}")
        return None

def draw_detections(image, results, model):
    """検出結果を画像に描画"""
    if isinstance(image, Image.Image):
        image = np.array(image)

    image_with_boxes = image.copy()
    colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0),
              (255, 0, 255), (0, 255, 255), (128, 0, 0), (0, 128, 0)]

    for idx, (score, label, box) in enumerate(zip(results["scores"], results["labels"], results["boxes"])):
        box = [int(i) for i in box.tolist()]
        class_name = model.config.id2label[label.item()]
        color = colors[label.item() % len(colors)]

        cv2.rectangle(image_with_boxes, (box[0], box[1]), (box[2], box[3]), color, 3)

        label_text = f"{class_name}: {score:.2f}"
        label_size, _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
        label_y = box[1] - 10 if box[1] - 10 > 10 else box[1] + label_size[1] + 10

        cv2.rectangle(
            image_with_boxes,
            (box[0], label_y - label_size[1] - 5),
            (box[0] + label_size[0], label_y + 5),
            color, -1
        )

        cv2.putText(
            image_with_boxes, label_text,
            (box[0], label_y),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.8, (255, 255, 255), 2
        )

    return image_with_boxes

def main():
    st.markdown("<h1>🎯 Object Detection App</h1>", unsafe_allow_html=True)
    st.markdown("<p style='text-align: center; color: #666;'>DETR (Detection Transformer) を使用した物体検知システム</p>", unsafe_allow_html=True)

    # サイドバー
    with st.sidebar:
        st.markdown("## ⚙️ 設定")

        model_options = {
            "DETR ResNet-50": "facebook/detr-resnet-50",
            "DETR ResNet-101": "facebook/detr-resnet-101",
        }
        selected_model = st.selectbox("モデルを選択", options=list(model_options.keys()))

        confidence_threshold = st.slider(
            "信頼度閾値", min_value=0.1, max_value=1.0, value=0.5, step=0.05
        )

        st.markdown("### 🎬 動画処理設定")
        max_frames = st.number_input(
            "処理する最大フレーム数", min_value=1, max_value=100, value=30
        )

        device = "cuda" if torch.cuda.is_available() else "cpu"
        st.info(f"実行デバイス: {device.upper()}")

    # メインコンテンツ
    tabs = st.tabs(["📷 画像処理", "🎬 動画処理", "📊 処理結果"])

    # 画像処理タブ
    with tabs[0]:
        st.markdown("## 📷 画像ファイルのアップロード")

        uploaded_images = st.file_uploader(
            "画像ファイルを選択してください",
            type=['png', 'jpg', 'jpeg', 'bmp', 'tiff'],
            accept_multiple_files=True
        )

        if uploaded_images:
            st.markdown(f"### 📁 アップロードされたファイル: {len(uploaded_images)}個")

            selected_images = []
            cols = st.columns(3)
            for idx, uploaded_file in enumerate(uploaded_images):
                with cols[idx % 3]:
                    image = Image.open(uploaded_file)
                    st.image(image, caption=uploaded_file.name, use_column_width=True)
                    if st.checkbox(f"処理する", key=f"img_{idx}"):
                        selected_images.append((uploaded_file, image))

            if st.button("🚀 選択した画像を処理", type="primary"):
                if not selected_images:
                    st.warning("処理する画像を選択してください")
                else:
                    # モデルの読み込み
                    with st.spinner("モデルを読み込んでいます..."):
                        processor, model, device = load_detector(model_options[selected_model])

                    if model is None:
                        return

                    # 画像処理
                    st.markdown("### 🔄 処理中...")
                    progress_bar = st.progress(0)

                    processed_results = []
                    for idx, (file, image) in enumerate(selected_images):
                        st.text(f"処理中: {file.name}")
                        image_np = np.array(image)

                        results = detect_objects(processor, model, device, image_np, confidence_threshold)
                        if results is not None:
                            processed_img = draw_detections(image_np, results, model)

                            detections = []
                            for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
                                class_name = model.config.id2label[label.item()]
                                detections.append({
                                    'class': class_name,
                                    'confidence': float(score),
                                    'box': [int(i) for i in box.tolist()]
                                })

                            processed_results.append({
                                'name': file.name,
                                'original': image_np,
                                'processed': processed_img,
                                'detections': detections
                            })

                        progress_bar.progress((idx + 1) / len(selected_images))

                    st.session_state.processed_images = processed_results
                    st.success(f"✅ {len(processed_results)}個の画像処理が完了しました")

    # 動画処理タブ
    with tabs[1]:
        st.markdown("## 🎬 動画ファイルのアップロード")

        uploaded_videos = st.file_uploader(
            "動画ファイルを選択してください",
            type=['mp4', 'avi', 'mov', 'mkv'],
            accept_multiple_files=True
        )

        if uploaded_videos:
            st.markdown(f"### 📁 アップロードされたファイル: {len(uploaded_videos)}個")

            selected_videos = []
            for idx, uploaded_file in enumerate(uploaded_videos):
                col1, col2 = st.columns([3, 1])
                with col1:
                    st.text(f"📹 {uploaded_file.name}")
                with col2:
                    if st.checkbox(f"処理する", key=f"vid_{idx}"):
                        selected_videos.append(uploaded_file)

            if st.button("🚀 選択した動画を処理", type="primary", key="process_videos"):
                if not selected_videos:
                    st.warning("処理する動画を選択してください")
                else:
                    # モデルの読み込み
                    with st.spinner("モデルを読み込んでいます..."):
                        processor, model, device = load_detector(model_options[selected_model])

                    if model is None:
                        return

                    # 動画処理
                    st.markdown("### 🔄 処理中...")

                    processed_results = []
                    for video_file in selected_videos:
                        st.text(f"処理中: {video_file.name}")

                        # 一時ファイルに保存
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file:
                            tmp_file.write(video_file.read())
                            tmp_path = tmp_file.name

                        # 動画処理
                        cap = cv2.VideoCapture(tmp_path)
                        fps = int(cap.get(cv2.CAP_PROP_FPS))
                        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

                        frames_to_process = min(max_frames, total_frames)
                        processed_frames = []
                        frame_detections = []

                        progress_bar = st.progress(0)
                        for i in range(frames_to_process):
                            ret, frame = cap.read()
                            if not ret:
                                break

                            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                            results = detect_objects(processor, model, device, frame_rgb, confidence_threshold)

                            if results is not None:
                                frame_with_boxes = draw_detections(frame_rgb, results, model)
                                processed_frames.append(frame_with_boxes)

                                detections = []
                                for score, label in zip(results["scores"], results["labels"]):
                                    class_name = model.config.id2label[label.item()]
                                    detections.append({
                                        'class': class_name,
                                        'confidence': float(score),
                                        'frame': i
                                    })
                                frame_detections.append(detections)

                            progress_bar.progress((i + 1) / frames_to_process)

                        cap.release()
                        os.unlink(tmp_path)

                        if processed_frames:
                            processed_results.append({
                                'name': video_file.name,
                                'frames': processed_frames,
                                'detections': frame_detections,
                                'fps': fps
                            })

                    st.session_state.processed_videos = processed_results
                    st.success(f"✅ {len(processed_results)}個の動画処理が完了しました")

    # 処理結果タブ
    with tabs[2]:
        st.markdown("## 📊 処理結果")

        # 画像の結果表示
        if st.session_state.processed_images:
            st.markdown("### 🖼️ 画像処理結果")

            for result in st.session_state.processed_images:
                st.markdown(f"#### 📄 {result['name']}")

                col1, col2 = st.columns(2)
                with col1:
                    st.markdown("**オリジナル画像**")
                    st.image(result['original'], use_column_width=True)

                with col2:
                    st.markdown("**検出結果**")
                    st.image(result['processed'], use_column_width=True)

                if result['detections']:
                    with st.expander("🔍 検出されたオブジェクトの詳細"):
                        for det in result['detections']:
                            st.write(f"- **{det['class']}** (信頼度: {det['confidence']:.2%})")
                else:
                    st.info("オブジェクトが検出されませんでした")

                st.markdown("---")

        # 動画の結果表示
        if st.session_state.processed_videos:
            st.markdown("### 🎬 動画処理結果")

            for result in st.session_state.processed_videos:
                st.markdown(f"#### 📹 {result['name']}")
                st.info(f"FPS: {result['fps']}, 処理フレーム数: {len(result['frames'])}")

                frame_idx = st.slider(
                    "表示するフレーム", 0, len(result['frames']) - 1, 0,
                    key=f"slider_{result['name']}"
                )

                st.image(result['frames'][frame_idx], use_column_width=True)

                if result['detections'][frame_idx]:
                    with st.expander("🔍 このフレームの検出オブジェクト"):
                        for det in result['detections'][frame_idx]:
                            st.write(f"- **{det['class']}** (信頼度: {det['confidence']:.2%})")

                st.markdown("---")

        if not st.session_state.processed_images and not st.session_state.processed_videos:
            st.info("まだ処理された結果がありません。")

if __name__ == "__main__":
    main()
'''

# アプリケーションファイルを保存
with open('streamlit_app.py', 'w', encoding='utf-8') as f:
    f.write(app_code)

print("✅ アプリケーションファイルが作成されました: streamlit_app.py")


📝 アプリケーションファイルを作成しています...
✅ アプリケーションファイルが作成されました: streamlit_app.py


In [28]:
# Install pyngrok
!pip install -q pyngrok

In [29]:
# ステップ3: Streamlitの実行方法を表示
print("\n" + "="*60)
print("🚀 Streamlitアプリの実行方法")
print("="*60)

print("""
以下のいずれかの方法でアプリを実行してください：

### 方法1: ローカルURL（推奨）
次のセルで以下を実行:
!streamlit run streamlit_app.py --server.port 8501 --server.address localhost

その後、表示されるローカルURLにアクセス
""")

print("""
### 方法2: ngrokを使用（外部アクセス可能）
# ================================================
# Streamlit物体検知アプリ用 ngrok設定
# ================================================
# 1. pyngrokのインストール
""")
print("!pip install -q pyngrok") # Print the install command as a separate line

print("""
# 2. ngrokとStreamlitの起動
""")

# The actual ngrok setup code should be here or in a separate cell,
# not inside a printed multi-line string that represents instructions.
# I will move the relevant code outside this print block.

print('''
以下のコマンドを別々のセルで実行することもできます:

# セル1: Streamlitを起動
!streamlit run streamlit_app.py --server.port 8501 &

# セル2: ngrokでトンネルを作成
from pyngrok import ngrok
ngrok.set_auth_token("2zmXKSu9aeYc9fK2c3WloIz30sn_81oUvaMFw3r7eV8zHXJAo")
public_url = ngrok.connect(8501)
print(f"Public URL: {public_url}")
''')

# The actual code to run Streamlit and ngrok should be outside the print statements.
# I will add the necessary code for Method 2 below the instructions print out.

print("\n" + "="*60)
print("🚀 ngrok経由でのStreamlitアプリ起動:")
print("="*60)

import subprocess
import time
from pyngrok import ngrok

# ngrokの認証トークンを設定（既存のトークンを使用）
ngrok.set_auth_token("2zmXKSu9aeYc9fK2c3WloIz30sn_81oUvaMFw3r7eV8zHXJAo")

# Streamlitアプリをバックグラウンドで起動
# ポート8501（Streamlitのデフォルト）を使用
print("🚀 Streamlitアプリを起動しています...")
process = subprocess.Popen(
    ["streamlit", "run", "streamlit_app.py", "--server.port", "8501"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE
)

# Streamlitの起動を待つ
print("⏳ Streamlitの起動を待機中...")
time.sleep(5)

# ngrokトンネルを作成（ポート8501に接続）
try:
    # 既存のトンネルがある場合は閉じる
    ngrok.kill()
except:
    pass

# 新しいトンネルを作成
print("🌐 ngrokトンネルを作成中...")
public_url = ngrok.connect(8501)

print("\n" + "="*60)
print("✅ アプリケーションが起動しました！")
print("="*60)
print(f"\n🔗 公開URL: {public_url}")
print(f"🔗 ローカルURL: http://localhost:8501")
print("\n上記のURLをクリックしてアプリにアクセスしてください。")
print("\n⚠️  注意事項:")
print("- 初回アクセス時はモデルのダウンロードに時間がかかります")
print("- セッションを終了する場合は、ランタイムを再起動してください")

# プロセスの状態を確認
import psutil
import os

print("\n📊 実行中のプロセス:")
for proc in psutil.process_iter(['pid', 'name']):
    if 'streamlit' in proc.info['name'].lower():
        print(f"- Streamlit (PID: {proc.info['pid']})")


🚀 Streamlitアプリの実行方法

以下のいずれかの方法でアプリを実行してください：

### 方法1: ローカルURL（推奨）
次のセルで以下を実行:
!streamlit run streamlit_app.py --server.port 8501 --server.address localhost

その後、表示されるローカルURLにアクセス


### 方法2: ngrokを使用（外部アクセス可能）
# Streamlit物体検知アプリ用 ngrok設定
# 1. pyngrokのインストール

!pip install -q pyngrok

# 2. ngrokとStreamlitの起動


以下のコマンドを別々のセルで実行することもできます:

# セル1: Streamlitを起動
!streamlit run streamlit_app.py --server.port 8501 &

# セル2: ngrokでトンネルを作成
from pyngrok import ngrok
ngrok.set_auth_token("2zmXKSu9aeYc9fK2c3WloIz30sn_81oUvaMFw3r7eV8zHXJAo")
public_url = ngrok.connect(8501)
print(f"Public URL: {public_url}")


🚀 ngrok経由でのStreamlitアプリ起動:
🚀 Streamlitアプリを起動しています...
⏳ Streamlitの起動を待機中...
🌐 ngrokトンネルを作成中...

✅ アプリケーションが起動しました！

🔗 公開URL: NgrokTunnel: "https://7a9bae3419b8.ngrok-free.app" -> "http://localhost:8501"
🔗 ローカルURL: http://localhost:8501

上記のURLをクリックしてアプリにアクセスしてください。

⚠️  注意事項:
- 初回アクセス時はモデルのダウンロードに時間がかかります
- セッションを終了する場合は、ランタイムを再起動してください

📊 実行中のプロセス:
- Streamlit (PID: 14186)
