# Gemini API とオープンソースによる動画処理・動画分析の手法

このハンズオンでは、動画処理のタスクを Cloud Run Jobs にデプロイして、バッチジョブとして実行する方法を学びます。

## 事前準備

### パッケージインストール

In [1]:
pip install google-cloud-run==0.10.17

Collecting google-cloud-run==0.10.17
  Downloading google_cloud_run-0.10.17-py3-none-any.whl.metadata (9.3 kB)
Downloading google_cloud_run-0.10.17-py3-none-any.whl (332 kB)
Installing collected packages: google-cloud-run
Successfully installed google-cloud-run-0.10.17
Note: you may need to restart the kernel to use updated packages.


インストールしたパッケージを利用可能にするためにカーネルを再起動します。

In [2]:
# Restart the kernel so that the package installed above can be imported.
import IPython
app = IPython.Application.instance()
_ = app.kernel.do_shutdown(True)  # True = restart after shutting down

### モジュールのインポートと初期設定

In [1]:
import json, os, tempfile, uuid
from IPython.display import Video
from google.cloud import storage

# Resolve the active gcloud project ID and number (IPython "!" shell capture).
[PROJECT_ID] = !gcloud config list --format 'value(core.project)'
[PROJECT_NUMBER] = !gcloud projects describe {PROJECT_ID} --format="value(projectNumber)"
LOCATION = 'us-central1'
# Bucket holding source videos (original/), processed videos (processed/) and job requests (job_request/).
BUCKET_NAME = f'{PROJECT_ID}_video_files'

### 補助関数の定義

In [2]:
def upload_blob(source_path, destination_path, bucket_name=BUCKET_NAME):
    """Upload the local file *source_path* to gs://<bucket_name>/<destination_path>."""
    target_blob = storage.Client().bucket(bucket_name).blob(destination_path)
    target_blob.upload_from_filename(source_path)
    
def download_blob(source_path, destination_path, bucket_name=BUCKET_NAME):
    """Download gs://<bucket_name>/<source_path> to the local file *destination_path*."""
    source_blob = storage.Client().bucket(bucket_name).blob(source_path)
    source_blob.download_to_filename(destination_path)

## 動画の前処理タスクの実装

動画の前処理を行うコードを単一のファイルにまとめます。

In [3]:
%%bash
mkdir -p preprocess_job
cd preprocess_job
# Quoted delimiter: bash writes the Python source verbatim (no $, backtick or backslash expansion).
cat <<'EOF' >video_preprocessor.py
import copy, json, os, pprint, sys, tempfile, threading, time, uuid
import cv2
from datetime import datetime, timedelta
from tqdm import tqdm
from moviepy import VideoFileClip, TextClip, CompositeVideoClip, VideoClip, ColorClip

from google.cloud import storage
from google import auth

# Project ID taken from Application Default Credentials (google.auth.default()).
_, PROJECT_ID = auth.default()
LOCATION = 'us-central1'
# Same bucket naming convention as the notebook: <project>_video_files.
BUCKET_NAME = f'{PROJECT_ID}_video_files'


def upload_blob(source_path, destination_path, bucket_name=BUCKET_NAME):
    """Upload the local file source_path to gs://bucket_name/destination_path."""
    target_blob = storage.Client().bucket(bucket_name).blob(destination_path)
    target_blob.upload_from_filename(source_path)
    

def download_blob(source_path, destination_path, bucket_name=BUCKET_NAME):
    """Download gs://bucket_name/source_path to the local file destination_path."""
    source_blob = storage.Client().bucket(bucket_name).blob(source_path)
    source_blob.download_to_filename(destination_path)


def add_timestamp_overlay(
    video_path,
    output_path,
    video_start_datetime='2025-01-01 00:00:00',
):
    """Burn a running wall-clock timestamp into a video stored in the bucket.

    Downloads video_path from BUCKET_NAME, resizes it to 640 px width,
    draws a semi-transparent black box in the top-left corner and, on top
    of it, one text clip per second showing video_start_datetime plus the
    elapsed seconds. The result is encoded with libx264 (libvorbis audio
    when the source has audio) and uploaded to output_path.

    Args:
        video_path: Blob name of the source video in BUCKET_NAME.
        output_path: Blob name the processed video is uploaded to.
        video_start_datetime: Wall-clock time of the first frame, in the
            format '%Y-%m-%d %H:%M:%S'.

    Returns:
        The video_start_datetime string that was passed in.

    Raises:
        ValueError: If video_start_datetime does not match the format.
    """
    # Parse first so that a malformed start time fails before any download.
    start_datetime = datetime.strptime(video_start_datetime, '%Y-%m-%d %H:%M:%S')

    with tempfile.TemporaryDirectory() as temp_dir:
        video_source = os.path.join(temp_dir, video_path)
        video_dest = os.path.join(temp_dir, output_path)
        # Blob names may contain "/", so mirror their directories locally.
        for local_path in (video_source, video_dest):
            dirname = os.path.dirname(local_path)
            if dirname:
                os.makedirs(dirname, exist_ok=True)

        download_blob(video_path, video_source)

        video = bg_clip = final_video = None
        text_clips = []
        try:
            # Load video and resize it
            video = VideoFileClip(video_source).resized(width=640)

            # Background of timecode area
            bg_clip = (
                ColorClip(
                    size=(225, 38),   # Timecode area size
                    color=(0, 0, 0),  # Black color
                    duration=video.duration,
                )
                .with_opacity(0.5)
                .with_position(('left', 'top'))
            )

            # One timecode clip per second; the last one may be shorter.
            total_seconds = int(video.duration)
            for i in range(total_seconds + 1):
                clip_duration = 1.0
                if i == total_seconds:
                    clip_duration = video.duration - total_seconds
                    if clip_duration <= 0:
                        continue

                current_time = start_datetime + timedelta(seconds=i)
                text_clips.append(
                    TextClip(
                        text=current_time.strftime('%Y-%m-%d %H:%M:%S'),
                        font_size=19,
                        color='white',
                        font='DejaVuSans',
                        method='label',
                    )
                    .with_duration(clip_duration)
                    .with_start(i)
                    .with_position((10, 12))
                )

            final_video = CompositeVideoClip([video, bg_clip] + text_clips)

            write_options = {
                'codec': 'libx264',
                'preset': 'faster',
                'threads': os.cpu_count(),
                'remove_temp': True,
            }
            if video.audio is None:
                write_options['audio'] = False
            else:
                write_options['audio_codec'] = 'libvorbis'
                # Keep moviepy's intermediate audio file inside temp_dir.
                write_options['temp_audiofile'] = os.path.join(temp_dir, 'temp-audio.ogg')
            final_video.write_videofile(video_dest, **write_options)

            upload_blob(video_dest, output_path)
        finally:
            # Close every clip, as on the success path, even if encoding or upload fails.
            for clip in [video, bg_clip, *text_clips, final_video]:
                if clip is not None:
                    clip.close()

    return video_start_datetime


def detect_motion_events(
    video_path,
    sampling_fps=1,
    mog2_history=500,
    mog2_var_threshold=16,
    motion_threshold=0.5,
):
    """Detect motion in a video stored in the bucket using OpenCV MOG2.

    Downloads video_path from BUCKET_NAME and processes roughly
    sampling_fps frames per second. For each sampled frame, it computes
    the percentage of non-zero pixels in the morphologically cleaned
    foreground mask (the "active rate").

    Args:
        video_path: Blob name of the video in BUCKET_NAME.
        sampling_fps: Frames per second to analyse. Must not exceed the
            video's FPS.
        mog2_history: history argument of createBackgroundSubtractorMOG2.
        mog2_var_threshold: varThreshold argument of
            createBackgroundSubtractorMOG2.
        motion_threshold: Active rate (percent of the frame area) above
            which a sampled frame is recorded as motion.

    Returns:
        dict with keys:
          "video_duration": frame count divided by FPS, in seconds.
          "event_timestamp": times (seconds) of sampled frames with motion.
          "active_rate": active rate of every sampled frame, in order.

    Raises:
        RuntimeError: If OpenCV cannot open the downloaded video.
        ValueError: If the video's FPS is lower than sampling_fps.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        video_source = os.path.join(temp_dir, video_path)
        dirname = os.path.dirname(video_source)
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        download_blob(video_path, video_source)

        cap = cv2.VideoCapture(video_source)
        try:
            if not cap.isOpened():
                raise RuntimeError(f"Failed to open video: {video_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            # Written as "not <=" so that a NaN FPS is rejected as well.
            if not sampling_fps <= fps:
                raise ValueError(
                    f"Video FPS ({fps}) is lower than sampling FPS ({sampling_fps})"
                )
            frame_skip_interval = max(1, int(round(fps / sampling_fps)))
            print(
                f"Video FPS: {fps}, Sampling FPS: {sampling_fps}, Frame Skip Interval: {frame_skip_interval}"
            )

            # Initialize MOG2 background subtractor
            fgbg_mog2 = cv2.createBackgroundSubtractorMOG2(
                history=mog2_history, varThreshold=mog2_var_threshold, detectShadows=True
            )
            kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            mog2_motion_timestamps = []
            active_rate_list = []
            for frame_count in tqdm(
                range(total_frames), desc="Processing video frames", unit="frames"
            ):
                ret, frame = cap.read()
                if not ret:
                    break

                # Process with sampling_fps
                if frame_count % frame_skip_interval != 0:
                    continue

                # Every sampled frame updates the background model.
                fgmask_mog2 = fgbg_mog2.apply(frame)

                if frame_count == 0:
                    # The first frame only seeds the background model, so its
                    # mask is not a motion signal: rate 0, no event recorded.
                    active_rate = 0
                else:
                    fgmask_mog2 = cv2.morphologyEx(fgmask_mog2, cv2.MORPH_OPEN, kernel)
                    fgmask_mog2 = cv2.morphologyEx(fgmask_mog2, cv2.MORPH_CLOSE, kernel)
                    mog2_motion_pixels = cv2.countNonZero(fgmask_mog2)
                    active_rate = (mog2_motion_pixels * 100) / (frame.shape[0] * frame.shape[1])
                    if active_rate > motion_threshold:
                        mog2_motion_timestamps.append(frame_count / fps)
                active_rate_list.append(active_rate)
        finally:
            cap.release()

        video_duration = total_frames / fps if fps > 0 else 0.0

    return {
        "video_duration": video_duration,
        "event_timestamp": mog2_motion_timestamps,
        "active_rate": active_rate_list,
    }


def analyze_motion_intervals(
        mog2_motion_timestamps,
        video_duration,
        time_threshold=1.5,
        min_event_duration=3.0,
        max_event_duration=180.0,
        min_continuation_duration=5,
):
    """Merge motion timestamps into gap-free event / no_event intervals.

    Timestamps no more than time_threshold seconds apart are joined into
    one event. An event shorter than min_event_duration is recorded as
    no_event. An event that reaches max_event_duration is split. The
    piece after a split is appended to the previous event when it is
    shorter than min_continuation_duration; otherwise it is added as a
    new event. Consecutive no_event intervals are merged. A final event
    ending within time_threshold of the end of the video is extended to
    video_duration.

    Args:
        mog2_motion_timestamps: Motion timestamps in seconds. Assumed to be
            ascending, as produced by detect_motion_events.
        video_duration: Length of the video in seconds.
        time_threshold: Maximum gap (seconds) that keeps an event going.
        min_event_duration: Shortest run (seconds) kept as an event.
        max_event_duration: Length (seconds) at which an event is split.
        min_continuation_duration: After a split, continuations shorter
            than this (seconds) extend the previous event.

    Returns:
        List of {'start_time', 'end_time', 'type'} dicts covering
        [0, video_duration] consecutively; 'type' is 'event' or 'no_event'.
        Empty only when video_duration is 0 and no event was found.

    Raises:
        ValueError: If min_event_duration is not below max_event_duration.
    """
    from collections import deque

    if not min_event_duration < max_event_duration:
        raise ValueError(
            f'min_event_duration ({min_event_duration}) must be smaller than '
            f'max_event_duration ({max_event_duration})'
        )

    def add_event(event_list, start_time, end_time, event_type):
        # The main logic should append event periods consecutively.
        assert start_time <= end_time
        if event_list:
            assert event_list[-1]['end_time'] == start_time
        else:
            assert start_time == 0

        # Ignore if start_time == end_time
        if start_time == end_time:
            return

        # Join consecutive no_event periods
        if (
            event_type == 'no_event'
            and event_list
            and event_list[-1]['type'] == 'no_event'
        ):
            event_list[-1]['end_time'] = end_time
            return

        # Add an event period
        event_list.append(
            {'start_time': start_time, 'end_time': end_time, 'type': event_type}
        )

    event_intervals = []
    # deque: O(1) pops from the left (list.pop(0) is O(n)).
    timestamps = deque(mog2_motion_timestamps)
    current_start_time = current_end_time = 0

    while timestamps:
        event_timestamp = timestamps.popleft()
        if (
            event_timestamp - current_end_time <= time_threshold
        ):  # Event is continuing
            current_end_time = event_timestamp
            if (
                timestamps
                and current_end_time - current_start_time < max_event_duration
            ):
                continue

        # Now split events as:
        #   A. (current_start_time, current_end_time): event (or no_event if event_duration < min_event_duration)
        #   B. (current_end_time, event_timestamp): no_event

        # Add A.
        event_duration = current_end_time - current_start_time
        if (
            event_intervals and event_intervals[-1]['type'] == 'event'
        ):  # Continuation of an event that was split at max_event_duration
            if event_duration < min_continuation_duration:
                # Extend the last event if the current event period is too short
                event_intervals[-1]['end_time'] = current_end_time
            else:
                add_event(
                    event_intervals, current_start_time, current_end_time, 'event'
                )
        elif event_duration >= min_event_duration:
            add_event(
                event_intervals, current_start_time, current_end_time, 'event'
            )
        else:
            add_event(
                event_intervals, current_start_time, current_end_time, 'no_event'
            )

        # Add B.
        add_event(event_intervals, current_end_time, event_timestamp, 'no_event')

        current_start_time = current_end_time = event_timestamp

    # Add the final no_event (or extend the last event) until the end of the video.
    # event_intervals can be empty here (no motion at all, or only zero-length
    # periods), so guard before looking at its last element.
    last_is_event = bool(event_intervals) and event_intervals[-1]['type'] == 'event'
    if last_is_event and video_duration - current_end_time <= time_threshold:
        event_intervals[-1]['end_time'] = video_duration
    else:
        add_event(event_intervals, current_end_time, video_duration, 'no_event')

    return event_intervals


def detect_events(
    video_path,
    video_path_ts_overlay,
    video_start_datetime='2025-01-01 00:00:00'
):
    """Run the whole preprocessing pipeline for one video.

    Steps:
      1. add_timestamp_overlay: burn timestamps into video_path and upload
         the result as video_path_ts_overlay.
      2. detect_motion_events: detect motion on the overlaid video.
      3. analyze_motion_intervals: merge motion timestamps into intervals.

    Returns:
        dict with keys 'video_path_ts_overlay', 'video_start_datetime',
        'merged_events' and 'active_rate'.
    """
    print(f'# Preprocess: {video_path}')
    add_timestamp_overlay(video_path, video_path_ts_overlay, video_start_datetime)

    print(f'# Detect events: {video_path_ts_overlay}')
    motion = detect_motion_events(video_path_ts_overlay)
    timestamps = motion.get('event_timestamp')
    duration = motion.get("video_duration")
    intervals = analyze_motion_intervals(timestamps, duration)

    return dict(
        video_path_ts_overlay=video_path_ts_overlay,
        video_start_datetime=video_start_datetime,
        merged_events=intervals,
        active_rate=motion.get('active_rate'),
    )

#####

def start_job():
    """Execute one job request and terminate the process.

    The request is a JSON object in GCS. Its URI is read from the
    GCS_PAYLOAD_URI environment variable and must point into BUCKET_NAME
    (gs://BUCKET_NAME/...). Required keys are "event_type", "job_id" and
    "analysis_request". Only event_type "video_preprocess" is supported.
    Its analysis_request needs "video_path", "video_path_ts_overlay" and
    "video_start_datetime".

    Exits with status 0 on success and 1 on any error. Unexpected
    exceptions also print a traceback.
    """
    import traceback

    gcs_payload_uri = os.environ.get("GCS_PAYLOAD_URI")
    if not gcs_payload_uri:
        print("GCS_PAYLOAD_URI environment variable not found. Cannot load job payload.")
        sys.exit(1)

    print(f"Attempting to download payload from: {gcs_payload_uri}")
    uri_prefix = f"gs://{BUCKET_NAME}/"
    if not gcs_payload_uri.startswith(uri_prefix):
        print(f"GCS_PAYLOAD_URI is not in the expected format or bucket. URI: {gcs_payload_uri}")
        sys.exit(1)
    blob_name = gcs_payload_uri[len(uri_prefix):]

    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(BUCKET_NAME)
        json_payload_string = bucket.blob(blob_name).download_as_text()
    except Exception as e:
        print(f"Error downloading or processing payload from GCS URI {gcs_payload_uri}: {e}")
        # print() has no exc_info option; emit the traceback (to stderr) explicitly.
        traceback.print_exc()
        sys.exit(1)

    if not json_payload_string:
        print(f"Downloaded payload from GCS is empty. URI: {gcs_payload_uri}")
        sys.exit(1)
    print(f"Successfully downloaded and read payload from {gcs_payload_uri}")

    # sys.exit raises SystemExit, which "except Exception" does not catch.
    try:
        input_data = json.loads(json_payload_string)
        event_type = input_data["event_type"]
        job_id = input_data["job_id"]
        analysis_request = input_data['analysis_request']

        print(f"Event type: {event_type}")
        print(f"Job id: {job_id}")
        print(f"Analysis request: {analysis_request}")

        if event_type == "video_preprocess":
            video_path = analysis_request["video_path"]
            video_path_ts_overlay = analysis_request["video_path_ts_overlay"]
            video_start_datetime = analysis_request["video_start_datetime"]

            ret = detect_events(
                video_path,
                video_path_ts_overlay,
                video_start_datetime=video_start_datetime
            )
            # NOTE(review): the result (merged_events, active_rate) is not
            # persisted anywhere; store it if later steps need it.
            if not ret:
                print("Video analysis job is failed")
                sys.exit(1)
        else:
            print(f"Not supported event type: {event_type}")
            sys.exit(1)

    except Exception as e:
        print(f"Error at executing job: {e}")
        traceback.print_exc()
        sys.exit(1)

    print("Success to process long-running job ...")
    sys.exit(0)


if __name__ == "__main__":
    # Script entry point (the container runs "python video_preprocessor.py").
    start_job()
EOF

### 実行例

このコードは、リクエスト内容を記述した JSON ファイルをあらかじめ GCS バケットに保存しておき、そのファイルを読み込んで処理を実行します。

次のセルでは、リクエストファイルを作成して GCS バケットにアップロードし、そのファイルの URI を環境変数 `GCS_PAYLOAD_URI` にセットしています。

In [4]:
# Build a preprocessing request, upload it to the bucket and expose its URI
# to the job script through GCS_PAYLOAD_URI.
job_id = str(uuid.uuid4())
request_file = f'{job_id}.json'
request_blob = f'job_request/{request_file}'
gcs_payload_uri = f'gs://{BUCKET_NAME}/{request_blob}'

os.environ['GCS_PAYLOAD_URI'] = gcs_payload_uri

analysis_request = {
    'video_path': 'original/room_camera_example.mov',
    'video_path_ts_overlay': 'processed/room_camera_example.mp4',
    'video_start_datetime': '2025-01-01 12:30:00',
}
request_content = {
    'event_type': 'video_preprocess',
    'job_id': job_id,
    'analysis_request': analysis_request,
}

with tempfile.TemporaryDirectory() as temp_dir:
    tmp_request_file = os.path.join(temp_dir, request_file)
    with open(tmp_request_file, 'wt') as f:
        json.dump(request_content, f)
    upload_blob(tmp_request_file, request_blob)

既存の前処理済みファイル `room_camera_example.mp4` を削除しておきます。

In [5]:
# Remove the previous output so the run below demonstrably recreates it;
# the ls is expected to report "One or more URLs matched no objects".
!gsutil rm gs://{BUCKET_NAME}/processed/room_camera_example.mp4
!gsutil ls -l gs://{BUCKET_NAME}/processed/room_camera_example.mp4

Removing gs://video-analysis-handson00_video_files/processed/room_camera_example.mp4...
/ [1 objects]                                                                   
Operation completed over 1 objects.                                              
CommandException: One or more URLs matched no objects.


この状態でコードを実行すると、リクエストファイルに従った処理が行われます。

In [7]:
%%bash
# Run the job script locally; it reads the request URI from GCS_PAYLOAD_URI.
echo $GCS_PAYLOAD_URI
python preprocess_job/video_preprocessor.py

gs://video-analysis-handson00_video_files/job_request/9b2e868b-0640-465d-ac61-246330e3646d.json
Attempting to download payload from: gs://video-analysis-handson00_video_files/job_request/9b2e868b-0640-465d-ac61-246330e3646d.json
Successfully downloaded and read payload from gs://video-analysis-handson00_video_files/job_request/9b2e868b-0640-465d-ac61-246330e3646d.json
Event type: video_preprocess
Job id: 9b2e868b-0640-465d-ac61-246330e3646d
Analysis request: {'video_path': 'original/room_camera_example.mov', 'video_path_ts_overlay': 'processed/room_camera_example.mp4', 'video_start_datetime': '2025-01-01 12:30:00'}
# Preprocess: original/room_camera_example.mov
{'video_found': True, 'audio_found': True, 'metadata': {'major_brand': 'qt', 'minor_version': '0', 'compatible_brands': 'qt', 'creation_time': '2025-07-27T00:24:34.000000Z', 'com.apple.quicktime.make': 'Apple', 'com.apple.quicktime.model': 'Mac15,12', 'com.apple.quicktime.software': 'macOS 15.5 (24F74)', 'com.apple.quicktime.cre

frame_index:  67%|██████▋   | 2607/3893 [01:29<00:43, 29.43it/s, now=None]

MoviePy - Done !
MoviePy - video ready /var/tmp/tmpejqe6c68/processed/room_camera_example.mp4
# Detect events: processed/room_camera_example.mp4
Video FPS: 59.95, Sampling FPS: 1, Frame Skip Interval: 60


Processing video frames: 100%|██████████| 3893/3893 [00:01<00:00, 3725.64frames/s]


Success to process long-running job ...


In [8]:
# Confirm that the processed video was uploaded.
!gsutil ls -l gs://{BUCKET_NAME}/processed/room_camera_example.mp4

   1445927  2025-07-27T04:32:38Z  gs://video-analysis-handson00_video_files/processed/room_camera_example.mp4
TOTAL: 1 objects, 1445927 bytes (1.38 MiB)


In [None]:
# Download the processed video to a unique local temp file and preview it inline.
processed_video_path = 'processed/room_camera_example.mp4'
tmpfile = os.path.join(tempfile.gettempdir(), f'tmp_movie_{uuid.uuid4()}.mp4')
download_blob(processed_video_path, tmpfile)
Video(tmpfile, embed=True, width=480)

## Cloud Run Jobs へのデプロイ

`Dockerfile` と `requirements.txt` を用意します。

In [10]:
%%bash
cd preprocess_job
# Quoted delimiters: both files are written verbatim (no shell expansion).
cat <<'EOF' >Dockerfile
FROM python:3.12-slim

# Runtime libraries for OpenCV (libGL, GLib) and the DejaVu font used by the
# timestamp overlay. The transitional package libgl1-mesa-glx is no longer
# available in recent Debian releases (the base of the python slim images);
# libgl1 provides the libGL library OpenCV needs.
RUN apt-get update && apt-get install -y \
    libgl1 \
    libglib2.0-0 \
    fontconfig \
    fonts-dejavu \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "video_preprocessor.py"]
EOF

cat <<'EOF' >requirements.txt
# NOTE(review): this first group is not imported by video_preprocessor.py;
# consider dropping it to slim the image.
fastapi[all]==0.115.12
loguru==0.7.3
requests==2.32.3
grpcio==1.67.1
grpcio-status==1.67.1
python-jose==3.3.0
passlib==1.7.4
msgspec==0.19.0

# GCP dependency
google-cloud-logging==3.12.1
google-cloud-run==0.10.17
google-cloud-storage==2.14.0

# Used for Video rendering
opencv-python==4.11.0.86
moviepy==2.1.2
tqdm==4.67.1
EOF

ビルド用のリポジトリを作成します。

In [11]:
# Artifact Registry repository that will hold the job's container image.
REPO_NAME = 'cloud-run-source-deploy'
REPO = f'{LOCATION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}'

# Create the repository only when "describe" reports an error (treated as "does not exist yet").
result = !gcloud artifacts repositories describe --location {LOCATION} {REPO_NAME}
if result and result[0].startswith('ERROR: '):
    !gcloud artifacts repositories create {REPO_NAME} \
        --repository-format docker --location {LOCATION}

# Name of the Cloud Run job (also used as the image name).
JOB_NAME = 'video-preprocess-job'

Create request issued for: [cloud-run-source-deploy]
Waiting for operation [projects/video-analysis-handson00/locations/us-central1/
operations/bc2f13bd-6171-48d7-a5a1-3f08f677610f] to complete...done.           
Created repository [cloud-run-source-deploy].


コンテナイメージをビルドして、Cloud Run Jobs にデプロイします。

In [None]:
!cd preprocess_job && gcloud builds submit --tag {REPO}/{JOB_NAME}
!gcloud run jobs deploy {JOB_NAME} \
    --image {REPO}/{JOB_NAME} \
    --region us-central1 \
    --task-timeout 3600s \
    --cpu=4 \
    --memory=8Gi | cat

### 実行例

既存の前処理済みファイル `room_camera_example.mp4` を削除しておきます。

In [13]:
# Remove the previous output so the Cloud Run execution demonstrably recreates it;
# the ls is expected to report "One or more URLs matched no objects".
!gsutil rm gs://{BUCKET_NAME}/processed/room_camera_example.mp4
!gsutil ls -l gs://{BUCKET_NAME}/processed/room_camera_example.mp4

Removing gs://video-analysis-handson00_video_files/processed/room_camera_example.mp4...
/ [1 objects]                                                                   
Operation completed over 1 objects.                                              
CommandException: One or more URLs matched no objects.


次のように、`RunJobRequest.Overrides()` で環境変数の値をセットしてジョブの実行をリクエストします。

In [16]:
from google.cloud.run_v2.services.jobs import JobsClient
from google.cloud.run_v2.types import RunJobRequest

# Point this execution at the request file uploaded earlier by overriding
# the container's GCS_PAYLOAD_URI environment variable.
gcs_payload_uri = f'gs://{BUCKET_NAME}/job_request/{request_file}'
payload_env = {"name": "GCS_PAYLOAD_URI", "value": gcs_payload_uri}
container_override = RunJobRequest.Overrides.ContainerOverride(env=[payload_env])
overrides = RunJobRequest.Overrides(container_overrides=[container_override])

# Start the execution; the returned operation's metadata describes it.
job_path = f'projects/{PROJECT_ID}/locations/{LOCATION}/jobs/{JOB_NAME}'
jobs_client = JobsClient()
request = RunJobRequest(name=job_path, overrides=overrides)
operation = jobs_client.run_job(request=request)
operation.metadata

name: "projects/video-analysis-handson00/locations/us-central1/jobs/video-preprocess-job/executions/video-preprocess-job-vlg4b"
uid: "40c5d398-5b80-4ef6-bac1-9eb8c4599da0"
generation: 1
create_time {
  seconds: 1753591078
  nanos: 921655000
}
update_time {
  seconds: 1753591078
  nanos: 921655000
}
launch_stage: GA
job: "video-preprocess-job"
parallelism: 1
task_count: 1
template {
  containers {
    image: "us-central1-docker.pkg.dev/video-analysis-handson00/cloud-run-source-deploy/video-preprocess-job"
    env {
      name: "GCS_PAYLOAD_URI"
      value: "gs://video-analysis-handson00_video_files/job_request/9b2e868b-0640-465d-ac61-246330e3646d.json"
    }
    resources {
      limits {
        key: "cpu"
        value: "4"
      }
      limits {
        key: "memory"
        value: "8Gi"
      }
    }
  }
  max_retries: 3
  timeout {
    seconds: 3600
  }
  service_account: "127334181968-compute@developer.gserviceaccount.com"
  execution_environment: EXECUTION_ENVIRONMENT_GEN2
}
rec

Cloud Console の Cloud Run Jobs [管理ページ](https://console.cloud.google.com/run/jobs) からジョブの実行状況を確認してください。


## 演習課題

Cloud Run Jobs を用いて、動画の前処理に加えて、Gemini API による分析処理も行えるように実装してください。