In [None]:
# Mount Google Drive under /content/drive; later cells read the dataset from a shared drive there.
from google.colab import drive
drive.mount('/content/drive')
# Install the google-genai package (imported just below as `genai`).
!pip install google-genai --quiet
from google import genai
import pandas as pd
import re
from typing import List, Dict
from tqdm import tqdm
from datetime import datetime
import time
import fsspec
from google.cloud import bigquery
from google.genai.types import CreateBatchJobConfig
from google.genai.types import HttpOptions, Part
import mimetypes
import csv
import json
import ast
from pathlib import Path
import vertexai
from vertexai.tuning import sft

# Preparation

In [None]:
# Authenticate
import sys

# The Colab auth helper is only imported and called when the google.colab module is loaded;
# in any other environment this cell does nothing.
if "google.colab" in sys.modules:
    from google.colab import auth

    # presumably provides the credentials the later Google Cloud calls rely on — confirm
    auth.authenticate_user()

In [None]:
# Work from the shared-drive dataset folder so the relative CSV / JSONL file names used below resolve.
%cd "/content/drive/Shareddrives/CS152 Project/dataset/"

In [None]:
# Initialize GCloud
# Pipes the script served at sdk.cloud.google.com into bash, then runs `gcloud init`.
!curl https://sdk.cloud.google.com | bash
!gcloud init

In [None]:
# First copy all dataset images to Google Cloud bucket.
# Later cells address each image as "gs://cs152_images/all" + <Image Path>.
!gsutil -m cp -r "/content/drive/Shareddrives/CS152 Project/dataset/all" gs://cs152_images/

In [None]:
# Long prompt: gives guidance on spotting AI generation across seven categories and asks for a
# detailed answer (classification, confidence percentage, brief justification).
# The "Classification:", "Confidence Score:" and "Brief Justification:" labels in the output
# format are the anchors the regex-extraction cells later in this notebook search for.

normal = """
You are an expert digital forensics analyst trained in Professor Hany Farid's methodologies for detecting AI-generated faces. Analyze the provided facial image for signs of artificial generation.
Examine the following categories systematically:
1. Anatomical Integrity
Count facial features (eyes, nostrils, ears, etc.) - are there duplicates or missing elements?
Check for impossible anatomical configurations
Verify natural placement and proportions of features
Look for missing details (eyelashes, tear ducts, nasal hair, skin pores)
2. Phenotypic Plausibility
Assess if phenotypic combinations are statistically probable (e.g., skin tone vs. eye color)
Check for impossible genetic combinations
Verify age-appropriate features match across the face
3. Geometric Consistency
Analyze facial symmetry (natural faces are slightly asymmetric)
Check perspective consistency across features
Verify consistent facial landmark alignment
Look for warping or morphing artifacts
4. Texture and Detail Analysis
Examine skin texture consistency and realism
Check hair patterns for naturalness and consistent growth direction
Verify consistent detail resolution across facial regions
Look for smoothing or sharpening artifacts
5. Ocular Examination
Verify matching reflections in both eyes
Check iris pattern complexity and uniqueness
Examine pupil shape and size consistency
Look for natural eye moisture and blood vessels
6. Lighting and Shadow Coherence
Verify consistent light source direction across all features
Check shadow placement and softness
Examine specular highlights for consistency
Look for impossible lighting conditions
7. Edge and Transition Analysis
Examine face-to-background transitions
Check for halo effects or unnatural boundaries
Verify natural hair-to-skin transitions
Look for copy-paste or blending artifacts

Output Format:

Classification: [0/1/Don't know]
0 = Real/authentic face
1 = AI-generated face
Don't know = Insufficient information or ambiguous indicators
Confidence Score: [X]% Provide a percentage (0-100%) indicating your confidence in the binary classification.

Brief Justification: In 2-3 sentences, cite the most significant indicators that led to your classification. Keep this concise and do not just repeat my guidance above.
Important Notes:
If image quality is too low to make reliable assessments, output "Don't know"
Weight multiple subtle anomalies more heavily than single obvious features
Consider that some real faces may have unusual features due to medical conditions, cosmetic procedures, or rare genetics
Focus on patterns consistent with known GAN, diffusion model, or other AI generation artifacts"""

In [None]:
# Long-binary prompt: same seven-category guidance as `normal`, but the output format is
# restricted to a bare "0" (Real) or "1" (AI-Generated) with no confidence or justification.

normal_binary = """
You are an expert digital forensics analyst trained in Professor Hany Farid's methodologies for detecting AI-generated faces. Analyze the provided facial image for signs of artificial generation.
Examine the following categories systematically:
1. Anatomical Integrity
Count facial features (eyes, nostrils, ears, etc.) - are there duplicates or missing elements?
Check for impossible anatomical configurations
Verify natural placement and proportions of features
Look for missing details (eyelashes, tear ducts, nasal hair, skin pores)
2. Phenotypic Plausibility
Assess if phenotypic combinations are statistically probable (e.g., skin tone vs. eye color)
Check for impossible genetic combinations
Verify age-appropriate features match across the face
3. Geometric Consistency
Analyze facial symmetry (natural faces are slightly asymmetric)
Check perspective consistency across features
Verify consistent facial landmark alignment
Look for warping or morphing artifacts
4. Texture and Detail Analysis
Examine skin texture consistency and realism
Check hair patterns for naturalness and consistent growth direction
Verify consistent detail resolution across facial regions
Look for smoothing or sharpening artifacts
5. Ocular Examination
Verify matching reflections in both eyes
Check iris pattern complexity and uniqueness
Examine pupil shape and size consistency
Look for natural eye moisture and blood vessels
6. Lighting and Shadow Coherence
Verify consistent light source direction across all features
Check shadow placement and softness
Examine specular highlights for consistency
Look for impossible lighting conditions
7. Edge and Transition Analysis
Examine face-to-background transitions
Check for halo effects or unnatural boundaries
Verify natural hair-to-skin transitions
Look for copy-paste or blending artifacts

Output Format:

Either "0" or "1", where "0" indicates the image is Real and "1" indicates the image is AI-Generated. Do NOT output anything else.
"""

In [None]:
# Short prompt: no forensic guidance at all, only the request for a bare "0" (Real) or
# "1" (AI-Generated) answer on the image.
short = """
Analyze the provided facial image for signs of artificial generation.

Output Format:

Either "0" or "1", where "0" indicates the image is Real and "1" indicates the image is AI-Generated. Do NOT output anything else.
"""

In [None]:
# Choose which dataset CSV to score and which of the prompts defined above goes with every image.

dataset = "train.csv" # @param {type:"string"}
df = pd.read_csv(dataset)
prompt_type = "long" # @param ["long", "long binary", "short binary"] {type:"string"}
# Anything other than the two long variants falls through to the short prompt.
prompt = (
    normal if prompt_type == "long"
    else normal_binary if prompt_type == "long binary"
    else short
)

In [None]:
# Build one batch request per image and write them out as JSON Lines for a Gemini batch call.

# One record per dataset row: the selected prompt plus the image's location in the bucket.
records = [
    {"prompt": prompt, "parts": [{"fileUri": "gs://cs152_images/all" + image_path}]}
    for image_path in df['Image Path']
]

OUTPUT_FILE = f"requests_{prompt_type}_prompt.jsonl"
TEMPERATURE = 0.4


def _media_part(uri):
    """Wrap a file URI as a file_data part, guessing the MIME type from the file extension."""
    guessed_mime, _ = mimetypes.guess_type(uri)
    return {
        "file_data": {
            "file_uri": uri,
            # Fallback used when the extension is not recognised.
            "mime_type": guessed_mime or "application/octet-stream",
        }
    }


with open(OUTPUT_FILE, "w", encoding="utf-8") as fout:
    for rec in records:
        # The prompt text comes first, followed by one file_data part per media entry.
        request_parts = [{"text": rec["prompt"].strip()}]
        request_parts.extend(_media_part(media["fileUri"]) for media in rec["parts"])
        request_line = json.dumps({
            "request": {
                "contents": [{"role": "user", "parts": request_parts}],
                "generationConfig": {"temperature": TEMPERATURE},
            }
        })
        fout.write(request_line + "\n")

print(f"Wrote {len(records)} requests to {OUTPUT_FILE}")

In [None]:
# Send the request .jsonl to the Cloud bucket.
# NOTE(review): the packaging cell above writes `requests_{prompt_type}_prompt.jsonl`, whereas this
# uploads a fixed "test_requests_short_prompt.jsonl" — confirm the file on disk is the intended one.
!gsutil -m cp -r "test_requests_short_prompt.jsonl" gs://cs152_images/

# Batch-call Gemini on dataset

In [None]:
# Set Google Cloud details to use for batch calls.
# INPUT_DATA is the request file passed as `src` to the batch job; BUCKET_URI is passed as its `dest`.

INPUT_DATA = "gs://cs152_images/test_requests_short_prompt.jsonl"  # @param {type:"string"}
PROJECT_ID = "gemini-deepfake-detection"
LOCATION = "us-central1"  # @param {type:"string"}
BUCKET_URI = "gs://cs152_test_output"  # @param {type:"string"}
MODEL_ID = "gemini-2.0-flash-001" # @param {type:"string"}

# Only when BUCKET_URI is still the template placeholder: derive a timestamped bucket name
# from the project ID and create that bucket with `gsutil mb`.
if BUCKET_URI == "[your-cloud-storage-bucket]":
    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    BUCKET_URI = f"gs://{PROJECT_ID}-{TIMESTAMP}"

    ! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

In [None]:
# Initialize the Gemini client in Vertex AI mode for the project and location configured above.
client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)

In [None]:
# Start a batch job on the dataset: requests are read from INPUT_DATA, results go to BUCKET_URI.
gcs_batch_job = client.batches.create(
    model=MODEL_ID,
    src=INPUT_DATA,
    config=CreateBatchJobConfig(dest=BUCKET_URI),
)
# Display the job's resource name; the polling cell below uses it to refresh the job.
gcs_batch_job.name

In [None]:
# Check all existing batch jobs: print the name, creation time and state of each.
for job in client.batches.list():
    print(job.name, job.create_time, job.state)

In [None]:
# Refresh the job until it reaches a terminal state.
# A freshly created job is not yet in JOB_STATE_RUNNING (it starts out queued/pending), so
# looping only "while running" falls straight through and mis-reports the job as failed.
# Poll until one of the end states is reached instead.
TERMINAL_JOB_STATES = (
    "JOB_STATE_SUCCEEDED",
    "JOB_STATE_PARTIALLY_SUCCEEDED",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_PAUSED",
    "JOB_STATE_EXPIRED",
)

while gcs_batch_job.state not in TERMINAL_JOB_STATES:
    time.sleep(5)
    # Refresh first so the printed state is the current one, not the previous poll's.
    gcs_batch_job = client.batches.get(name=gcs_batch_job.name)
    print(f"Job state: {gcs_batch_job.state}")

# Check if the job succeeds
if gcs_batch_job.state == "JOB_STATE_SUCCEEDED":
    print("Job succeeded!")
else:
    print(f"Job failed: {gcs_batch_job.error}")

In [None]:
# Read the output JSONL of the batch job into `df`.
# Fail loudly when there is nothing to read: previously a non-successful job left `df`
# pointing at the dataset frame loaded earlier, which was then displayed and parsed as if it
# were predictions.
if gcs_batch_job.state != "JOB_STATE_SUCCEEDED":
    raise RuntimeError(
        f"Batch job has no predictions to load (state: {gcs_batch_job.state})"
    )

fs = fsspec.filesystem("gcs")
file_paths = fs.glob(f"{gcs_batch_job.dest.gcs_uri}/*/predictions.jsonl")
if not file_paths:
    raise FileNotFoundError(
        f"No predictions.jsonl found under {gcs_batch_job.dest.gcs_uri}"
    )

# The last match is used, as before (presumably the most recent output folder — confirm).
df = pd.read_json(f"gs://{file_paths[-1]}", lines=True)
df

In [None]:
# The following 3 cells will extract the relevant results from the Gemini outputs

def extract_response_text(resp):
    """Return the text of the first part of the first candidate in a response.

    Returns None when the expected nesting is missing or `resp` is not subscriptable.
    """
    try:
        candidates = resp['candidates']
        leading_parts = candidates[0]['content']['parts']
        text = leading_parts[0]['text']
    except (KeyError, IndexError, TypeError):
        text = None
    return text

# Companion to extract_response_text: pulls the image URI back out of the 'request' column.
def extract_request_text(req):
    """Return the file URI stored in the second part of the first content of a request.

    The batch requests put the prompt text in parts[0] and the image in parts[1].
    Returns None when the expected nesting is missing or `req` is not subscriptable.
    """
    try:
        request_parts = req['contents'][0]['parts']
        image_part = request_parts[1]
        uri = image_part['file_data']['file_uri']
    except (KeyError, IndexError, TypeError):
        uri = None
    return uri

# Pull the model's reply out of every prediction row.
df['response_text'] = df['response'].apply(extract_response_text)

# Recover the dataset-relative image path from the request's GCS URI.
# str.lstrip() removes a *set of characters*, not a prefix, so it also swallowed the start of
# any path beginning with characters found in "gs://cs152_images/all" (e.g. "/ai/...",
# "/images/..."). Strip the literal bucket prefix instead. The "/" after "all" is removed too,
# matching what lstrip produced for unaffected paths; the merge cell re-adds it.
df['Image Path'] = (
    df['request']
    .apply(extract_request_text)
    .str.replace(r"^gs://cs152_images/all/", "", regex=True)
)

In [None]:
# In case of the full prompt with justifications:
# capture what follows "Classification:" — digits (optionally wrapped in square brackets) or
# the literal "Don't know" — then drop any surrounding brackets. Rows without a match become NaN.
df['Classification'] = (
    df['response_text'].str.extract(
    r"Classification:\s*((?:\[?\d+\]?|Don't know))",
    expand=False)
    .str.strip("[]")
)

In [None]:
# In case of a prompt that returns just 0/1: the whole (whitespace-trimmed) reply is the label.
# This writes the same 'Classification' column as the long-prompt cell above, so whichever of
# the two cells runs last determines the column's contents.
df['Classification'] = df['response_text'].str.strip()
df

In [None]:
# Capture the number after "Confidence Score:" (integer or decimal, optional square brackets,
# must be followed by "%"). Stored as text; rows without a match become NaN.
df['Confidence'] = df['response_text'].str.extract(
    r'Confidence Score:\s*\[?([\d]+(?:\.\d+)?)%\]?',
    expand=False
)

In [None]:
# Capture everything after "Brief Justification:". With re.DOTALL the match runs to the end of
# the reply, so any text the model emits after the justification is included as well.
df['Justification'] = (
    df['response_text']
      .str.extract(r'Brief Justification:\s*(.+)', flags=re.DOTALL, expand=False)
      .str.strip()
)

In [None]:
# In case of the prompt with confidence and justification: keep the parsed fields and save them.
# `df` is narrowed in place, so the raw request/response columns are gone after this cell.
# NOTE(review): the output name says "binary_prompt" although this branch keeps
# Confidence/Justification — confirm the intended file name.
cols_to_keep = ['Image Path', 'Classification', 'Confidence', 'Justification']
df = df[cols_to_keep]
#df.to_csv("train_with_gemini.csv")
df.to_csv("test_with_gemini_binary_prompt.csv")

In [None]:
# In case of a 0/1-only prompt: keep just the path and label and save them.
# The merge cell below reads this file back.
cols_to_keep = ['Image Path', 'Classification']
df = df[cols_to_keep]
#df.to_csv("train_with_gemini.csv")
df.to_csv("test_with_gemini_short_prompt.csv")

In [None]:
# Display the saved results frame.
df

In [None]:
# Merge the Gemini short-prompt labels into the table holding the other models' results.

df_base = pd.read_csv('test_all_models.csv')
df_results = pd.read_csv('test_with_gemini_short_prompt.csv')
# Prefix every saved path with "/" before joining on it.
df_results['Image Path'] = df_results['Image Path'].astype(str).map(lambda rel_path: '/' + rel_path)
to_merge = df_results.loc[:, ['Image Path', 'Classification']]
# Left join: every base row is kept; images without a Gemini label get NaN.
df_base_enriched = pd.merge(df_base, to_merge, how='left', on='Image Path')

In [None]:
# Display the merged table.
df_base_enriched

In [None]:
# Sanity check: number of base rows that received no Gemini classification from the left merge.
n_missing = df_base_enriched['Classification'].isna().sum()
n_missing

In [None]:
# Do some cleanup of the merged frame, then save (should have results from the model just batch-called).
# The rename only touches columns that are present; missing keys are ignored.
df_base_enriched = df_base_enriched.rename(columns={"Classification": "Gemini Short-Prompt Classification", "Confidence" : "Gemini Confidence", "Justification" : "Gemini Justification"})
# "Don't know" is mapped to 0.5; eval_model thresholds at >= 0.5, so it is scored as an AI prediction.
# NOTE(review): this cleanup targets 'Gemini Classification', not the column renamed just above —
# presumably that column already exists in test_all_models.csv from an earlier run; confirm.
df_base_enriched['Gemini Classification'] = df_base_enriched['Gemini Classification'].replace("Don't know", 0.5)
df_base_enriched = df_base_enriched.dropna(subset=['Gemini Classification'])
# index=False: this file is read back and rewritten by this notebook, and writing the index
# added another "Unnamed: 0" column on every round trip.
df_base_enriched.to_csv("test_all_models.csv", index=False)
df_base_enriched

In [None]:
# Define function to calculate eval metrics
def eval_model(df, column_name):
  """Print confusion-matrix counts and accuracy / precision / recall / F1 for one column.

  Args:
    df: DataFrame with an ``is_ai`` ground-truth column (1 is the positive class, 0 the
      negative class) and the prediction column named by ``column_name``.
    column_name: Name of the prediction column. Every value is converted with ``float()``
      and thresholded at 0.5 (``>= 0.5`` counts as a positive prediction).

  Returns:
    None; all results are printed.

  Raises:
    ValueError, TypeError: if a prediction cannot be converted with ``float()``.
    AssertionError: if some row falls into no bucket (NaN prediction or a label other
      than 0/1).
  """
  true_positives = 0
  false_positives = 0
  true_negatives = 0
  false_negatives = 0
  # Walk the two needed columns directly rather than materialising a Series per row.
  for label, raw_pred in zip(df["is_ai"], df[column_name]):
    pred = float(raw_pred)
    if label == 1 and pred >= 0.5:
      true_positives += 1
    elif label == 1 and pred < 0.5:
      false_negatives += 1
    elif label == 0 and pred >= 0.5:
      false_positives += 1
    elif label == 0 and pred < 0.5:
      true_negatives += 1

  total = len(df)
  assert true_positives + false_positives + true_negatives + false_negatives == total, (
      f"{column_name}: some rows have a NaN prediction or an is_ai label other than 0/1"
  )

  print(f"Model: {column_name}")
  print(f"Dataset size {total}")
  print()
  print(f"True Positives: {true_positives}")
  print(f"False Positives: {false_positives}")
  print(f"True Negatives: {true_negatives}")
  print(f"False Negatives: {false_negatives}")
  print()

  # Every ratio falls back to 0 when its denominator is 0 (an empty frame included).
  if total == 0:
    accuracy = 0
  else:
    accuracy = (true_positives + true_negatives) / total

  if true_positives + false_positives == 0:
    precision = 0
  else:
    precision = true_positives / (true_positives + false_positives)

  if true_positives + false_negatives == 0:
    recall = 0
  else:
    recall = true_positives / (true_positives + false_negatives)

  if precision + recall == 0:
    f1_score = 0
  else:
    f1_score = 2 * (precision * recall) / (precision + recall)

  print(f"Accuracy: {accuracy}")
  print(f"Precision: {precision}")
  print(f"Recall: {recall}")
  print(f"F1 Score: {f1_score}")
  print()
  print()


In [None]:
# Pick the split to report on, reload its all-models table, and score every model column.
train = False # @param {type:"boolean"}
test = not train

if not train:
  print("Performance on test: \n")
  df_base_enriched = pd.read_csv("test_all_models.csv")
else:
  print("Performance on train: \n")
  df_base_enriched = pd.read_csv("train_all_models_fixed.csv")

# Same columns, same order as before: two baseline detectors, then the two Gemini prompts.
for score_column in (
    "dima806_score",
    "wvolf_score",
    "Gemini Classification",
    "Gemini Short-Prompt Classification",
):
  eval_model(df_base_enriched, score_column)

# Supervised Fine-Tuning of Gemini

In [None]:
# Work from the shared-drive dataset folder (train.csv is read and the JSONL files are written here).
%cd "/content/drive/Shareddrives/CS152 Project/dataset/"


In [None]:
# Split train.csv 90/10 into fine-tuning train / validation frames (seeded, so repeatable).
seed = 42
df = pd.read_csv('train.csv')

train_df = df.sample(frac=0.9, random_state=seed)
# Validation set = every row that was not sampled into the training set.
val_df = df.loc[~df.index.isin(train_df.index)]
print(len(train_df), len(val_df))

In [None]:
# Define the prompt used for supervised fine-tuning
# (This is the same as the long-binary prompt used earlier)
# It is passed as `system_instruction` when the train/val JSONL files are written, and it is
# sent alongside each image when the fine-tuned model is called on the test set.
prompt = """
You are an expert digital forensics analyst trained in Professor Hany Farid's methodologies for detecting AI-generated faces. Analyze the provided facial image for signs of artificial generation.
Examine the following categories systematically:
1. Anatomical Integrity
Count facial features (eyes, nostrils, ears, etc.) - are there duplicates or missing elements?
Check for impossible anatomical configurations
Verify natural placement and proportions of features
Look for missing details (eyelashes, tear ducts, nasal hair, skin pores)
2. Phenotypic Plausibility
Assess if phenotypic combinations are statistically probable (e.g., skin tone vs. eye color)
Check for impossible genetic combinations
Verify age-appropriate features match across the face
3. Geometric Consistency
Analyze facial symmetry (natural faces are slightly asymmetric)
Check perspective consistency across features
Verify consistent facial landmark alignment
Look for warping or morphing artifacts
4. Texture and Detail Analysis
Examine skin texture consistency and realism
Check hair patterns for naturalness and consistent growth direction
Verify consistent detail resolution across facial regions
Look for smoothing or sharpening artifacts
5. Ocular Examination
Verify matching reflections in both eyes
Check iris pattern complexity and uniqueness
Examine pupil shape and size consistency
Look for natural eye moisture and blood vessels
6. Lighting and Shadow Coherence
Verify consistent light source direction across all features
Check shadow placement and softness
Examine specular highlights for consistency
Look for impossible lighting conditions
7. Edge and Transition Analysis
Examine face-to-background transitions
Check for halo effects or unnatural boundaries
Verify natural hair-to-skin transitions
Look for copy-paste or blending artifacts

Output Format:

Either "0" or "1", where "0" indicates the image is Real and "1" indicates the image is AI-Generated. Do NOT output anything else.
"""

In [None]:
# Prepare jsonl objects for train/val


def df_to_gemini_jsonl(
    df: pd.DataFrame,
    out_path: str,
    system_instruction: str | None = None,
):
    """
    Convert a DataFrame with image paths & labels to Gemini-SFT JSONL.
    Each row becomes one line in `out_path`.
    """
    # Build the systemInstruction once
    sys_block = (
        {"role": "system", "parts": [{"text": system_instruction}]}
        if system_instruction
        else None
    )

    with open(out_path, "w", encoding="utf-8") as f:
        for _, row in df.iterrows():
            # ----- user message -----
            mime_type = mimetypes.guess_type(row['Image Path'])[0]
            mime_type = mime_type or "application/octet-stream"
            user_msg = {
                "role": "user",
                "parts": [
                    {
                        "fileData": {
                            "mimeType": mime_type,
                            "fileUri": "gs://cs152_images/all" + row['Image Path'],
                        }
                    },
                ],
            }

            # ----- label (model message) -----
            model_msg = {
                "role": "model",
                "parts": [
                    {"text": str(row['is_ai'])}
                ],
            }

            record = {"contents": [user_msg, model_msg]}
            if sys_block:
                record["systemInstruction"] = sys_block

            json.dump(record, f, ensure_ascii=False)
            f.write("\n")

    print(f"✅ Wrote {len(df)} examples to {out_path}")

In [None]:
# Write the training split first, then the validation split, both with the SFT prompt attached.
for split_df, split_path in (
    (train_df, "train_requests.jsonl"),
    (val_df, "val_requests.jsonl"),
):
    df_to_gemini_jsonl(
        df=split_df,
        out_path=split_path,
        system_instruction=prompt,
    )

In [None]:
# Upload both fine-tuning files to the gs://cs152_finetuning/ bucket that the tuning job reads from.
!gsutil -m cp -r "/content/drive/Shareddrives/CS152 Project/dataset/val_requests.jsonl" gs://cs152_finetuning/
!gsutil -m cp -r "/content/drive/Shareddrives/CS152 Project/dataset/train_requests.jsonl" gs://cs152_finetuning/

In [None]:
# Start the fine-tuning job
# PROJECT_ID comes from the cloud-configuration cell earlier in the notebook.

vertexai.init(project=PROJECT_ID, location="us-central1")

# Supervised fine-tuning of the base model on the uploaded train / validation JSONL files.
sft_tuning_job = sft.train(
    source_model="gemini-2.0-flash-001",
    train_dataset="gs://cs152_finetuning/train_requests.jsonl",
    validation_dataset="gs://cs152_finetuning/val_requests.jsonl"
)

# Polling for job completion: refresh once a minute until the job reports it has ended.
# NOTE(review): nothing here checks whether the job ended successfully — confirm before
# relying on the values printed below.
while not sft_tuning_job.has_ended:
    time.sleep(60)
    sft_tuning_job.refresh()

print(sft_tuning_job.tuned_model_name)
print(sft_tuning_job.tuned_model_endpoint_name)
print(sft_tuning_job.experiment)

In [None]:
# Work from the shared-drive dataset folder (test_all_models.csv is read from here).
%cd "/content/drive/Shareddrives/CS152 Project/dataset"

In [None]:
# Call the fine-tuned model on the test dataset
from google.genai.types import GenerateContentConfig

tuning_job_id = "1732516294546161664"
client = genai.Client(http_options=HttpOptions(api_version="v1"), vertexai=True, project=PROJECT_ID, location=LOCATION)
tuning_job_name = f"projects/{PROJECT_ID}/locations/{LOCATION}/tuningJobs/{tuning_job_id}"
tuning_job = client.tunings.get(name=tuning_job_name)

# Query the model in the same shape it was tuned on: the SFT records carry the prompt as the
# system instruction and the user turn holds only the image. Sending the prompt as user text
# instead would give the model an input layout it never saw during tuning.
generation_config = GenerateContentConfig(system_instruction=prompt)

text_classifications = []
df = pd.read_csv("test_all_models.csv")
for image_path in tqdm(df['Image Path'], total=len(df)):
  file_uri = "gs://cs152_images/all" + image_path
  # Same fallback as the request-building cells, so an unknown extension never yields None.
  mime_type = mimetypes.guess_type(image_path)[0] or "application/octet-stream"
  response = client.models.generate_content(
      model=tuning_job.tuned_model.endpoint,
      contents=[Part.from_uri(file_uri=file_uri, mime_type=mime_type)],
      config=generation_config,
  )
  text_classifications.append(response.text)

In [None]:
# Sanity check: list the distinct replies the fine-tuned model produced.
list(set(text_classifications))

In [None]:
# Save predictions and evaluate the fine-tuned model.
# eval_model() converts each stored reply with float(), so a reply that is not numeric text
# will make the evaluation raise.
df['Fine-Tuned Gemini Classification'] = text_classifications
df.to_csv("test_with_finetuned_gemini.csv")
eval_model(df, "Fine-Tuned Gemini Classification")