In [1]:
# Importowanie bibliotek
import numpy as np
from PIL import Image, ImageOps
from retinaface import RetinaFace # Lub import, którego używa wasza biblioteka
import os
import io
import math
from tqdm.auto import tqdm # Do paska postępu
import tensorflow as tf # Do czytania i pisania w GCS

# --- KONFIGURACJA (Zmieńcie te wartości) ---

# 1. Ścieżki do "Magazynów" (waszych bucketów GCS)
# BUCKET_ORYGINALNY = "kaggle-casia-dataset-original-..." # Wiadro z danymi z Kaggle
# BUCKET_OKLUZJI = "kaggle-casia-dataset-occluded-..." # NOWE, PUSTE wiadro na wyniki

# # 2. Nazwa folderu wejściowego (tam, gdzie `script.sh` wrzucił pliki)
# FOLDER_WEJSCIOWY = "CASIA-WebFace" # Sprawdźcie w GCS, czy ta nazwa się zgadza!

# # 3. Nazwa folderu wyjściowego (jak mają się nazywać nowe dane)
# FOLDER_WYJSCIOWY = "casia-webface-occluded-sunglasses"

# 4. Nazwa pliku z okularami (musi być w GCS)
# OKULARY_PNG_PATH = f"gs://{BUCKET_ORYGINALNY}/assets/sunglasses.png"
OKULARY_PNG_PATH = "okulary/oksy.png"

# --- Koniec Konfiguracji ---


# Inicjalizacja "Maszyn" (globalne instancje)

# 1. Wczytaj asset okularów z GCS RAZ (oszczędza czas)
try:
    print(f"Wczytywanie assetu okularów z: {OKULARY_PNG_PATH}...")
    with tf.io.gfile.GFile(OKULARY_PNG_PATH, 'rb') as f:
        sunglasses_asset_bytes = f.read()
    
    # Konwertujemy na obraz PIL z kanałem alfa (RGBA)
    sunglasses_img = Image.open(io.BytesIO(sunglasses_asset_bytes)).convert("RGBA")
    print("Asset okularów wczytany pomyślnie.")

except Exception as e:
    print(f"BŁĄD KRYTYCZNY: Nie udało się wczytać pliku {OKULARY_PNG_PATH}")
    print("Upewnij się, że wrzuciłaś plik `sunglasses.png` do bucketa GCS!")
    # Tutaj można przerwać działanie notatnika


# 2. Uruchom detektor RetinaFace (niech od razu załaduje model do pamięci GPU)
# To jest KRYTYCZNE dla wydajności - robimy to raz, a nie 490,000 razy w pętli!
try:
    print("Ładowanie modelu RetinaFace do pamięci GPU...")
    # (Poniższa linia to tylko przykład, użyjcie waszej implementacji)
    # Upewnijcie się, że RetinaFace używa GPU (`gpu_id=0`)
    print("Detektor RetinaFace gotowy.")
except Exception as e:
    print(f"BŁĄD KRYTYCZNY: Nie udało się załadować RetinaFace: {e}")
    print("Sprawdź, czy biblioteka jest poprawnie zainstalowana i czy TensorFlow widzi GPU.")


Wczytywanie assetu okularów z: okulary/oksy.png...
Asset okularów wczytany pomyślnie.
Ładowanie modelu RetinaFace do pamięci GPU...
Detektor RetinaFace gotowy.


In [2]:
def load_and_detect_face(image_bytes):
    """
    Wczytuje obraz z bajtów i zwraca kluczowe punkty twarzy.
    
    Zwraca:
    - None: Jeśli wystąpił błąd lub nie znaleziono twarzy.
    - dict: Słownik zawierający 'left_eye', 'right_eye' i 'nose_tip'.
    """
    try:
        # Konwertujemy bajty na obraz PIL i potem na NumPy (format dla detektora)
        image_pil = Image.open(io.BytesIO(image_bytes)).convert("RGB")
        image_np = np.array(image_pil)
    except Exception as e:
        print(f"[BŁĄD WCZYTANIA]: {e}")
        return None

    # Używamy detektora (który jest już w GPU) do znalezienia twarzy
    try:
        # Użyj waszej implementacji RetinaFace.
        # To jest tylko przykład, jak to może wyglądać:
        results = RetinaFace.detect_faces(image_np) 
        # Obsługa błędu: co jeśli twarz nie zostanie wykryta?
        if not results or len(results) == 0:
            return None # Zwracamy None, żeby główna pętla pominęła ten plik

        # Bierzemy tylko pierwszą, najbardziej prawdopodobną twarz
        face_data = results['face_1']
        keypoints = face_data['landmarks']
        
        # Zwracamy tylko te punkty, których potrzebujemy
        return {
            'left_eye': keypoints['left_eye'],   # (x, y)
            'right_eye': keypoints['right_eye'], # (x, y)
            'nose_tip': keypoints['nose'],       # (x, y)
            'image_pil': image_pil # Zwracamy też wczytany obraz, żeby nie robić tego dwa razy
        }
        
    except Exception as e:
        print(f"[BŁĄD DETEKCJI]: {e}")
        return None

In [3]:
def calculate_transformations(face_landmarks):
    """
    Oblicza skalę, kąt obrotu i pozycję dla okularów
    na podstawie punktów kluczowych twarzy.
    """
    # 1. Pobierz współrzędne
    left_eye = np.array(face_landmarks['left_eye'])
    right_eye = np.array(face_landmarks['right_eye'])
    
    # 2. Oblicz KĄT OBROTU
    # Obliczamy różnicę w osi Y i X między oczami
    dX = right_eye[1] - left_eye[1]
    dY = right_eye[0] - left_eye[0]
    
    # Trygonometria (atan2) da nam kąt w radianach
    angle_rad = math.atan2(dY, dX)
    # Konwertujemy na stopnie dla biblioteki PIL
    angle_deg = math.degrees(angle_rad)
    
    # 3. Oblicz SKALĘ (Wielkość okularów)
    # Chcemy, żeby okulary były 1.5x szersze niż odległość między oczami
    # (Możecie dostosować ten mnożnik `1.5`!)
    SCALE_FACTOR = 1.5 
    
    # Odległość Euklidesowa między oczami
    eye_distance = np.linalg.norm(right_eye - left_eye)
    
    # Nasz obrazek `sunglasses.png` ma jakąś oryginalną szerokość
    original_sunglasses_width = sunglasses_img.width 
    
    # Obliczamy nową szerokość i skalę
    target_sunglasses_width = eye_distance * SCALE_FACTOR
    scale = target_sunglasses_width / original_sunglasses_width
    
    # 4. Oblicz POZYCJĘ (Środek okularów)
    # Chcemy, żeby środek okularów wylądował dokładnie
    # na środku linii łączącej oczy.
    eye_center = (left_eye + right_eye) / 2
    
    return {
        'angle': angle_deg,
        'scale': scale,
        'position': (int(eye_center[0]), int(eye_center[1]))
    }

In [4]:
def apply_occlusion(original_face_image, sunglasses_asset, transform):
    """
    Nakłada przygotowany asset okularów na obraz twarzy
    używając obliczonych transformacji.
    """
    try:
        # 1. Pobierz transformacje
        scale = transform['scale']
        angle = transform['angle']
        position_xy = transform['position'] # (x, y) środek

        # 2. Stwórz kopię okularów, żeby nie psuć oryginału
        sunglasses_copy = sunglasses_asset.copy()

        # 3. SKALOWANIE
        # Oblicz nowy rozmiar na podstawie skali
        new_width = int(sunglasses_copy.width * scale)
        new_height = int(sunglasses_copy.height * scale)
        
        # Użyj `Resampling.LANCZOS` dla najlepszej jakości
        sunglasses_resized = sunglasses_copy.resize((new_width, new_height), Image.Resampling.LANCZOS)
        
        # 4. OBRÓT
        # Obracamy wokół środka. `expand=True` zapewnia, że nic nie zostanie obcięte.
        sunglasses_rotated = sunglasses_resized.rotate(angle, expand=True, resample=Image.Resampling.BICUBIC)

        # 5. POZYCJONOWANIE (Obliczanie lewego górnego rogu)
        # PIL wkleja obrazy używając lewego górnego rogu (x_pos, y_pos).
        # My mamy środek (`position_xy`). Musimy to przeliczyć.
        
        center_x = position_xy[0]
        center_y = position_xy[1]
        
        # Obliczamy pozycję lewego górnego rogu
        x_pos = center_x - (sunglasses_rotated.width // 2)
        y_pos = center_y - (sunglasses_rotated.height // 2)
        
        # 6. NAKŁADANIE (Magia kanału Alfa)
        # Stwórz kopię oryginalnego zdjęcia, na której będziemy malować
        output_image = original_face_image.copy()
        
        # `paste` z kanałem alfa (RGBA) jako maską robi dokładnie to, co chcemy
        # - wkleja tylko okulary, zostawiając tło przezroczyste.
        output_image.paste(sunglasses_rotated, (x_pos, y_pos), mask=sunglasses_rotated)
        
        return output_image # Zwracamy gotowy obrazek

    except Exception as e:
        print(f"[BŁĄD NAKŁADANIA]: {e}")
        return None

In [5]:
def process_single_image(input_file_path, base_input_path, base_output_path):
    """
    Pełny potok dla jednego obrazu: Wczytaj, Wykryj, Oblicz, Nałóż, Zapisz.
    """
    try:
        # 1. Wczytaj surowe bajty obrazu z GCS
        with tf.io.gfile.GFile(input_file_path, 'rb') as f:
            image_bytes = f.read()
        
        # 2. Uruchom Krok 2: Detekcja
        detection_data = load_and_detect_face(image_bytes)
        
        # 3. Obsługa błędu (jeśli nie znaleziono twarzy)
        if detection_data is None:
            return "failed_detection" # Zwracamy powód błędu
            
        original_image = detection_data['image_pil']
            
        # 4. Uruchom Krok 3: Geometria
        transform_data = calculate_transformations(detection_data)
        
        # 5. Uruchom Krok 4: Nakładanie
        # (sunglasses_img to nasza globalna zmienna z Kroku 1)
        final_image_pil = apply_occlusion(original_image, sunglasses_img, transform_data)

        if final_image_pil is None:
            return "failed_overlay"
            
        # 6. Zapisz wynik z powrotem do GCS
        
        # Obliczamy ścieżkę zapisu, zachowując strukturę folderów
        relative_path = os.path.relpath(input_file_path, start=base_input_path)
        output_file_path = f"{base_output_path}/{relative_path}"
        print("Where: ", output_file_path)
        
        # Upewnij się, że folder docelowy istnieje w GCS
        output_dir = os.path.dirname(output_file_path)
        if not tf.io.gfile.exists(output_dir):
            tf.io.gfile.makedirs(output_dir)
            
        # Zapisujemy obrazek w pamięci (jako JPEG) i wysyłamy do GCS
        with io.BytesIO() as output_buffer:
            final_image_pil.save(output_buffer, format="JPEG")
            image_data_to_write = output_buffer.getvalue()
            
            with tf.io.gfile.GFile(output_file_path, 'wb') as f_out:
                f_out.write(image_data_to_write)
                
        return "success"
        
    except Exception as e:
        print(f"[BŁĄD KRYTYCZNY] przy pliku {input_file_path}: {e}")
        return "failed_critical"

# --- GŁÓWNA PĘTLA URUCHOMIENIOWA ---
def run_augmentation_pipeline():
    print("Uruchamianie pełnej linii produkcyjnej augmentacji...")
    
    # Używamy ścieżek zdefiniowanych w Kroku 1 (`augmentacja_0_setup.md`)
    # base_input_path = f"gs://{BUCKET_ORYGINALNY}/{FOLDER_WEJSCIOWY}"
    # base_output_path = f"gs://{BUCKET_OKLUZJI}/{FOLDER_WYJSCIOWY}"

    base_input_path = "images"
    base_output_path = "occluded"
    
    print(f"Szukanie plików w: {base_input_path}...")
    
    # Znajdź wszystkie pliki .jpg w folderze i podfolderach (czemu jpg??)
    # all_image_paths = tf.io.gfile.glob(f"{base_input_path}/*.png")
    all_image_paths = ["./images/ewa.png", './images/3_faces.png', "./images/face_with_landmarks.jpg"]

    # Uruchomienie pętli z paskiem postępu tqdm
    # (Można to zrównoleglić, ale na razie zróbmy to sekwencyjnie)
    
    stats = {"success": 0, "failed_detection": 0, "failed_overlay": 0, "failed_critical": 0}
    
    # Możecie zmniejszyć liczbę plików do testów, np. biorąc tylko 1000 pierwszych:
    # all_image_paths = all_image_paths[:1000] 
    
    print(f"Znaleziono {len(all_image_paths)} obrazów. Rozpoczynam przetwarzanie...")
    
    for path in tqdm(all_image_paths, desc="Przetwarzanie obrazów"):
        result_status = process_single_image(path, base_input_path, base_output_path)
        stats[result_status] += 1
        
    print("\n--- ZAKOŃCZONO ---")
    print(f"Pomyślnie przetworzono: {stats['success']}")
    print(f"Pominięto (nie znaleziono twarzy): {stats['failed_detection']}")
    print(f"Pominięto (błąd nakładania): {stats['failed_overlay']}")
    print(f"Pominięto (błąd krytyczny): {stats['failed_critical']}")



In [6]:
# Aby to wszystko uruchomić, po prostu odpal tę funkcję w komórce notatnika:
run_augmentation_pipeline()


Uruchamianie pełnej linii produkcyjnej augmentacji...
Szukanie plików w: images...
Znaleziono 3 obrazów. Rozpoczynam przetwarzanie...


Przetwarzanie obrazów:  33%|███▎      | 1/3 [00:11<00:22, 11.03s/it]

Where:  occluded/ewa.png


Przetwarzanie obrazów:  67%|██████▋   | 2/3 [00:19<00:09,  9.67s/it]

Where:  occluded/3_faces.png


Przetwarzanie obrazów: 100%|██████████| 3/3 [00:28<00:00,  9.58s/it]

Where:  occluded/face_with_landmarks.jpg

--- ZAKOŃCZONO ---
Pomyślnie przetworzono: 3
Pominięto (nie znaleziono twarzy): 0
Pominięto (błąd nakładania): 0
Pominięto (błąd krytyczny): 0



