# **WhatTheGIF - PreProcessing**

This notebook builds a ***PARQUET*** file containing the key frames extracted from each GIF, stored as flattened NumPy arrays. The GIFs are stored in a GCP bucket. We use Dask to parallelize reading the GIFs and extracting their frames.

## **Loading libraries**

In [1]:
import os
import pandas as pd
import numpy as np
import dask
from dask import delayed, persist
from dask.distributed import Client, wait
import dask.bag as db
from google.cloud import storage
from PIL import Image
from io import BytesIO
import imageio
import cv2
from skimage.metrics import structural_similarity as ssim
import tempfile
import json
import time

## **Dask Configuration**

**Dask** is a Python library for parallel and distributed computing.

We deployed Dask using GCP on a Dataproc cluster, with the following steps:
1. Create a Dataproc Cluster (whatthegif-96f2) on GCP
	- 1 master N workers
	- Running Ubuntu 22, Python 3
	- 1 Master HDD 200GB
	- 2 Nodes HDD 150GB
	- Machines Internal IPs:
		- [Master] 10.128.0.15
		- [Node 0] 10.128.0.14
		- [Node 1] 10.128.0.16
2. Through SSH install dask and dependencies (master and nodes)
	pip install dask[complete] google-cloud-storage gcsfs
3. Through SSH start the scheduler on master. Take note of IP:port
	dask scheduler
	Master scheduler IP:port=tcp://10.128.0.15:8786
4. Through SSH on nodes, start the workers
	dask worker 10.128.0.15:8786
5. From the local machine, connect to the scheduler through the master's external address
	35.223.219.111:8786
	Add a firewall rule that allows inbound TCP traffic on port 8786
6. Install the client-side packages on the local machine
	pip install google-cloud-storage gcsfs dask-cloudprovider
	These packages are needed to submit a Dask job from the local machine
	Note: Keep the Dask and Distributed versions the same on the local machine, the master (scheduler) and the nodes (workers).
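
The version note in step 6 can be checked programmatically. As a minimal sketch, assuming the nested dictionary layout that `Client.get_versions()` returns (`client`, `scheduler` and `workers` entries, each with a `packages` mapping), the helper below lists the packages whose versions differ. `find_version_mismatches` is a hypothetical name, and the sample dictionary is hand-written with illustrative values rather than fetched from a live cluster.

```python
def find_version_mismatches(versions):
    """Return {package: {role: version}} for packages whose versions differ.

    Assumed layout: {"client": {"packages": {...}},
                     "scheduler": {"packages": {...}},
                     "workers": {address: {"packages": {...}}}}
    """
    per_role = {
        "client": versions["client"]["packages"],
        "scheduler": versions["scheduler"]["packages"],
    }
    for address, info in versions["workers"].items():
        per_role[f"worker {address}"] = info["packages"]

    mismatches = {}
    all_packages = set().union(*(pkgs.keys() for pkgs in per_role.values()))
    for package in sorted(all_packages):
        found = {role: pkgs.get(package) for role, pkgs in per_role.items()}
        if len(set(found.values())) > 1:
            mismatches[package] = found
    return mismatches


# Illustrative values only; the worker port and the dask version are made up.
sample = {
    "client": {"packages": {"numpy": "1.26.0", "dask": "2024.8.0"}},
    "scheduler": {"packages": {"numpy": "1.26.4", "dask": "2024.8.0"}},
    "workers": {
        "tcp://10.128.0.14:40001": {
            "packages": {"numpy": "1.26.4", "dask": "2024.8.0"}
        },
    },
}

mismatches = find_version_mismatches(sample)
print(mismatches)
```

Against a running cluster, the same helper could be called as `find_version_mismatches(client.get_versions())`.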

### **Initialize Dask Client**

In [2]:
def create_dask_client(max_retries=3):
    for attempt in range(max_retries):
        try:
            client = Client("tcp://34.123.134.129:8786", timeout='15s')
            print(f"Successfully connected to Dask scheduler. Cluster info: {client}")
            return client
        except Exception as e:
            print(f"Connection attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(15)  # Wait 15 seconds before retrying
            else:
                raise


# Connect to the Dask cluster
client = create_dask_client()

Successfully connected to Dask scheduler. Cluster info: <Client: 'tcp://10.128.0.15:8786' processes=2 threads=4, memory=14.90 GiB>



+---------+----------------+----------------+----------------+
| Package | Client         | Scheduler      | Workers        |
+---------+----------------+----------------+----------------+
| numpy   | 1.26.0         | 1.26.4         | 1.26.4         |
| pandas  | 2.1.1          | 2.1.4          | 2.1.4          |
| python  | 3.11.5.final.0 | 3.11.8.final.0 | 3.11.8.final.0 |
| toolz   | 0.12.1         | 1.0.0          | 1.0.0          |
+---------+----------------+----------------+----------------+


## **Set up Google Cloud Storage client**

In [3]:
# Service account credentials
key_path = 'gcs_access.json'
bucket_name = "gif-bucket-1000"

# Load the service account JSON content into a variable
with open(key_path, 'r') as f:
    service_account_key_content = f.read()

def initialize_gcs_client():
    """Initialize a Google Cloud Storage client from the in-memory credentials content."""
    # from_service_account_info returns a storage.Client, not a credentials object
    return storage.Client.from_service_account_info(json.loads(service_account_key_content))

## **Defining functions needed to PreProcessing**

### **resize_and_normalize_frame** function

Resize and normalize the frame while preserving aspect ratio.

In [4]:
def resize_and_normalize_frame(frame, target_size=(256, 256)):
    original_height, original_width = frame.shape[:2]
    aspect_ratio = original_width / original_height

    if aspect_ratio > 1:  # Wider than tall
        new_width = target_size[0]
        new_height = int(target_size[0] / aspect_ratio)
    else:  # Taller than wide or square
        new_width = int(target_size[1] * aspect_ratio)
        new_height = target_size[1]

    resized_frame = cv2.resize(frame, (new_width, new_height))

    # Calculate padding
    delta_w = target_size[0] - new_width
    delta_h = target_size[1] - new_height
    top, bottom = delta_h // 2, delta_h - (delta_h // 2)
    left, right = delta_w // 2, delta_w - (delta_w // 2)

    # Add padding to maintain the target size
    normalized_frame = cv2.copyMakeBorder(
        resized_frame, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0]
    )
    normalized_frame = normalized_frame.astype(np.float32) / 255.0

    return normalized_frame
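
To make the resize-and-pad arithmetic concrete, here is a small sketch that reproduces only the size and padding calculation from `resize_and_normalize_frame`, without OpenCV. `letterbox_geometry` is a hypothetical helper name introduced for illustration; the input sizes are arbitrary examples.

```python
def letterbox_geometry(width, height, target_size=(256, 256)):
    """Compute the resized dimensions and padding used to fit a frame
    into target_size while preserving its aspect ratio."""
    aspect_ratio = width / height

    if aspect_ratio > 1:  # Wider than tall: width fills the target
        new_width = target_size[0]
        new_height = int(target_size[0] / aspect_ratio)
    else:  # Taller than wide or square: height fills the target
        new_width = int(target_size[1] * aspect_ratio)
        new_height = target_size[1]

    delta_w = target_size[0] - new_width
    delta_h = target_size[1] - new_height
    return {
        "new_size": (new_width, new_height),
        "top": delta_h // 2,
        "bottom": delta_h - delta_h // 2,
        "left": delta_w // 2,
        "right": delta_w - delta_w // 2,
    }


wide = letterbox_geometry(512, 256)   # 2:1 frame, padded above and below
tall = letterbox_geometry(100, 200)   # 1:2 frame, padded left and right
odd = letterbox_geometry(300, 100)    # odd padding is split unevenly
print(wide)
print(tall)
print(odd)
```

When the padding is odd, the extra pixel goes to the bottom or right edge, so the output is always exactly `target_size`.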

### **augment_frame** function

Apply random data augmentation to the frame: a horizontal flip and a small rotation, each applied with probability 0.5.

In [5]:
def augment_frame(frame):
    if np.random.rand() < 0.5:
        frame = cv2.flip(frame, 1)  # Horizontal flip
    if np.random.rand() < 0.5:
        rows, cols = frame.shape[:2]
        rotation_angle = np.random.randint(-10, 10)  # Integer degrees in [-10, 9]; the upper bound is exclusive
        M = cv2.getRotationMatrix2D((cols / 2, rows / 2), rotation_angle, 1)
        frame = cv2.warpAffine(frame, M, (cols, rows))
    return frame
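
Because `augment_frame` draws new random values on every call, consecutive frames of the same GIF can be flipped or rotated differently, which may lower their structural similarity even when the scene has not changed. One possible alternative, shown here only as a sketch, is to draw the parameters once per GIF and reuse them for every frame. `draw_augmentation_params` is a hypothetical helper, and it uses the standard-library `random` module instead of `np.random` so that it can be seeded per GIF.

```python
import random


def draw_augmentation_params(rng):
    """Draw one set of augmentation parameters to reuse for all frames of a GIF."""
    flip = rng.random() < 0.5
    rotate = rng.random() < 0.5
    # random.Random.randint is inclusive on both ends, unlike np.random.randint
    angle = rng.randint(-10, 10) if rotate else 0
    return {"flip": flip, "angle": angle}


# A fixed seed makes the choice reproducible for a given GIF
params = draw_augmentation_params(random.Random(42))
same_params = draw_augmentation_params(random.Random(42))
print(params)
```

The returned `flip` and `angle` values would then be passed to the same `cv2.flip` and `cv2.warpAffine` calls for each frame.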

### **extract_key_frames** function

Extract key frames based on structural similarity.

In [6]:
def extract_key_frames(frames, ssim_threshold=0.95, min_scene_change=10, max_key_frames=10):
    key_frames = []
    prev_frame = None
    scene_change_counter = 0  # Frames seen since the last key frame
    scene_changes = []  # Indices of frames selected as scene changes

    for i, frame in enumerate(frames):
        if not isinstance(frame, np.ndarray):
            print(f"Frame {i}: Not a valid NumPy array.")
            continue

        if frame.size == 0 or frame.ndim != 3:
            print(f"Frame {i}: Empty or has invalid dimensions: {frame.shape}")
            continue

        gray_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)

        # Scale to [0, 1] only when the frame is still 8-bit; frames coming from
        # resize_and_normalize_frame are already float32 in [0, 1]
        if gray_frame.dtype == np.uint8:
            gray_frame = gray_frame.astype(np.float32) / 255.0
        else:
            gray_frame = gray_frame.astype(np.float32)

        if prev_frame is None:
            key_frames.append(frame)
            prev_frame = gray_frame
            continue

        similarity_score = ssim(prev_frame, gray_frame, data_range=1.0)

        if (
            similarity_score < ssim_threshold
            and scene_change_counter >= min_scene_change
            and len(key_frames) < max_key_frames
        ):
            key_frames.append(frame)
            scene_changes.append(i)
            scene_change_counter = 0
        else:
            # Count this frame toward the minimum gap between key frames
            scene_change_counter += 1

        prev_frame = gray_frame

    # Repeat the last key frame until max_key_frames is reached
    # (skipped when no valid frame was found, so key_frames[-1] cannot fail)
    while key_frames and len(key_frames) < max_key_frames:
        key_frames.append(key_frames[-1])

    return key_frames, scene_changes
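
The selection rule this function aims for can be illustrated without images. The sketch below works on a plain list of similarity scores, where `similarities[k]` is assumed to be the SSIM between frame `k + 1` and the frame before it. A frame is selected when its score is below the threshold, at least `min_gap` frames have passed since the last selection, and the cap has not been reached. `select_key_indices` and the scores are hypothetical, chosen only to exercise the rule.

```python
def select_key_indices(similarities, threshold=0.95, min_gap=10, max_keys=10):
    """Return the indices of frames chosen as key frames.

    Frame 0 is always a key frame; similarities[k] compares frame k + 1
    with frame k.
    """
    keys = [0]
    since_last = 0  # Frames seen since the last key frame
    for i, score in enumerate(similarities, start=1):
        if score < threshold and since_last >= min_gap and len(keys) < max_keys:
            keys.append(i)
            since_last = 0
        else:
            since_last += 1
    return keys


# Twelve near-identical frames, a scene change, three similar frames, another change
scores = [0.99] * 12 + [0.5] + [0.99] * 3 + [0.4]

default_gap = select_key_indices(scores)            # second change is too close
short_gap = select_key_indices(scores, min_gap=2)   # both changes are kept
print(default_gap)  # [0, 13]
print(short_gap)    # [0, 13, 17]
```

With the default gap of 10, the second drop in similarity is ignored because only three frames have passed since the previous key frame.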


### **process_gif** function

Preprocesses a GIF from GCS and extracts 10 key frames.

In [7]:
def process_gif(gif_file_name):
    """Preprocesses a GIF from GCS and extracts 10 key frames."""
    try:
        client = initialize_gcs_client()
        bucket = client.bucket(bucket_name)
        gif_blob = bucket.blob(f"gifs/{gif_file_name}")  # GCS object names always use '/', regardless of the local OS
        gif_bytes = gif_blob.download_as_bytes()

        with imageio.get_reader(BytesIO(gif_bytes), 'gif') as reader:
            processed_frames = []
            for frame in reader:
                if frame.shape[-1] == 4:  # Convert RGBA to RGB
                    frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)
                resized_frame = resize_and_normalize_frame(frame)
                augmented_frame = augment_frame(resized_frame)
                processed_frames.append(augmented_frame)

        key_frames, _ = extract_key_frames(processed_frames)
        key_frames_np = [frame.flatten() for frame in key_frames]  # Flatten frames for storage
        return key_frames_np

    except Exception as e:
        print(f"Error processing GIF {gif_file_name}: {e}")
        return None
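
Each frame returned by `process_gif` is a flattened float32 array, so it helps to know how large one GIF's output is before collecting many of them in memory. The following back-of-the-envelope sketch uses the values from the code above (256x256 RGB frames, 10 key frames per GIF); the count of 2000 GIFs is the sample size mentioned in this notebook and is used here only as an upper bound.

```python
FRAME_SHAPE = (256, 256, 3)   # Height, width, channels after resizing
BYTES_PER_VALUE = 4           # float32
KEY_FRAMES_PER_GIF = 10

values_per_frame = FRAME_SHAPE[0] * FRAME_SHAPE[1] * FRAME_SHAPE[2]
bytes_per_gif = values_per_frame * BYTES_PER_VALUE * KEY_FRAMES_PER_GIF
mib_per_gif = bytes_per_gif / 2**20


def total_gib(num_gifs):
    """Uncompressed in-memory size of the key frames for num_gifs GIFs."""
    return num_gifs * bytes_per_gif / 2**30


print(values_per_frame)          # 196608
print(mib_per_gif)               # 7.5
print(round(total_gib(2000), 2))
```

These figures are for uncompressed arrays in memory; the size of the Parquet file on disk depends on its compression.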

### **process_gif_safe** function

Safely process a GIF, with error handling.

In [8]:
import logging

logging.basicConfig(level=logging.INFO)

def process_gif_safe(gif_file_name):
    """Safely process a GIF, with error handling."""
    try:
        return process_gif(gif_file_name)
    except Exception as e:
        logging.error(f"Error processing {gif_file_name}: {e}")
        return None

### **process_split_in_batches** function

Downloads a split file from the bucket and processes its lines, each in the form `gif_file: description`, in batches of `batch_size`.

In [9]:
def process_split_in_batches(file_path, batch_size=1):
    client = initialize_gcs_client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(file_path)
    
    try:
        file_content = blob.download_as_bytes().decode('utf-8')  # download_as_string is a deprecated alias
    except Exception as e:
        logging.error(f"Error downloading {file_path}: {e}")
        return []

    lines = file_content.splitlines()
    results = []
    for i in range(0, len(lines), batch_size):
        batch = lines[i:i + batch_size]
        for line in batch:
            try:
                gif_file, description = line.strip().split(': ', 1)  # Split once so descriptions may contain ': '
                key_frames = process_gif_safe(gif_file + '.gif')
                if key_frames:
                    results.append({
                        "gif_file": gif_file, 
                        "description": description, 
                        "key_frames": key_frames
                    })
            except Exception as e:
                logging.error(f"Error processing line {line}: {e}")
    return results
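
The line format that the code expects, `gif_file: description`, can be parsed defensively so that blank or malformed lines are skipped instead of raising. This is a minimal sketch; `parse_split_line` is a hypothetical helper and the sample lines are invented for illustration.

```python
def parse_split_line(line):
    """Return (gif_file, description), or None if the line is malformed.

    Only the first ': ' is treated as the separator, so the description
    itself may contain ': '.
    """
    gif_file, separator, description = line.strip().partition(": ")
    if not separator or not gif_file or not description:
        return None
    return gif_file, description


# Invented sample lines
print(parse_split_line("cat_001: a cat jumps off a table"))
print(parse_split_line("dog_002: caption: a dog runs in circles"))
print(parse_split_line(""))
```

A caller can then skip `None` results without wrapping each line in a `try`/`except`.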

## **Reading data from bucket**

We took a sample of 2000 GIFs from the original dataset and split it into train, test and validation files. Only the test and validation splits are processed here.

In [10]:
# Specify split files to process
split_files = ['test_with_description.txt', 'val_with_description.txt']

def process_gif_for_bag(line):
    try:
        gif_file, description = line.strip().split(': ', 1)  # Split once so descriptions may contain ': '
        key_frames = process_gif_safe(gif_file + '.gif')
        if key_frames:
            return {
                "gif_file": gif_file, 
                "description": description, 
                "key_frames": key_frames
            }
    except Exception as e:
        logging.error(f"Error processing line {line}: {e}")
        return None

def process_split_files(split_files):
    results = []
    for file_path in split_files:
        client = initialize_gcs_client()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(file_path)
        file_content = blob.download_as_bytes().decode('utf-8')  # download_as_string is a deprecated alias
        
        bag = db.from_sequence(file_content.splitlines())
        processed = bag.map(process_gif_for_bag).compute()
        results.extend([r for r in processed if r is not None])
    
    return pd.DataFrame(results)

df = process_split_files(split_files)

In [11]:
# Save the DataFrame as Parquet directly to GCS
output_path = "gs://gif-bucket-1000/output/key_frames_data.parquet"
df.to_parquet(output_path, engine='pyarrow', storage_options={"token": key_path})

print("Parquet file saved successfully to GCS.")

Parquet file saved successfully to GCS.
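
Since the key frames are stored flattened, anyone reading the file needs to reshape them before use. The sketch below shows one way to do that for a single row's `key_frames` value, assuming each entry is a flat sequence of `256 * 256 * 3` float values. `restore_frames` is a hypothetical helper, and the example uses a tiny synthetic shape rather than data read from the bucket.

```python
import numpy as np


def restore_frames(flat_frames, frame_shape=(256, 256, 3)):
    """Stack flattened frames back into an array of shape (n_frames, H, W, C)."""
    return np.stack(
        [np.asarray(flat, dtype=np.float32).reshape(frame_shape) for flat in flat_frames]
    )


# Synthetic stand-in for one row's key_frames column: 10 tiny 2x2 RGB frames
small_shape = (2, 2, 3)
original = [np.full(small_shape, i, dtype=np.float32) for i in range(10)]
flat = [frame.flatten() for frame in original]

restored = restore_frames(flat, frame_shape=small_shape)
print(restored.shape)  # (10, 2, 2, 3)
```

For the real data, the default `frame_shape` matches the `target_size` used in `resize_and_normalize_frame`.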
