In [None]:
### Data collection and transformation
import aiohttp
import asyncio
import glob
import h5py
import numpy as np
import pandas as pd
import re

from aiohttp import ClientSession
from io import BytesIO
from google.cloud import storage
from PIL import Image

### Neural network, cropping and delivering imports
import os, sys, shutil
import torch

from torch.utils.data import DataLoader
from torchvision import datasets, transforms

from facenet_pytorch import MTCNN, InceptionResnetV1, training
from scripts.hd5_dataset import HD5Dataset

In [None]:
### Parameter Cell
# District whose images this run processes. The local output paths and the
# HDF5 dataset names further down are derived from it. Presumably overridden
# by a notebook-parameterisation tool (e.g. papermill) — TODO confirm.
DISTRICT_ID = 1

In [None]:
### Variables
# Remote locations: image download URL template, GCS bucket, blob prefix
# searched for already-processed images, and the metadata CSV path.
BASE_URL = "https://..../{user_id}/{file_name}"
BUCKET_BASE = "..."
PREFIX = "inputs/image-clustering"
METADATASOURCE = "/inputs/image-clustering/image_metadata_ver_01.csv"

# Intermediate artefacts (HDF5 files and face crops) are written to the local
# filesystem under BASE_DIR and removed again by the final cleanup cell.
BASE_DIR = "."

CROPPED_DIR = f"{BASE_DIR}/cropped_{DISTRICT_ID}/"
IMAGES_H5 = f"{BASE_DIR}/images_{DISTRICT_ID}.h5"
IDENTIFIERS_H5 = f"{BASE_DIR}/identifiers_{DISTRICT_ID}.h5"

# Accumulators filled by the download step: flattened pixel arrays and their
# "<gender>/<uid>/<filename>" identifiers.
images, identifiers = [], []

In [None]:
# Google Cloud Storage client; used to list already-stored blobs and to get the
# bucket that the cropped faces are uploaded to.
client = storage.Client()

In [None]:
# Load the image metadata (the CSV is read without a header row; column names
# are assigned here) and keep only the rows for this run's district.
metadata = (
    pd.read_csv(
        f"gs://{BUCKET_BASE}{METADATASOURCE}",
        names=['user_id', 'filename', 'gender', 'district_id'],
    )
    .loc[lambda frame: frame['district_id'] == DISTRICT_ID]
)

In [None]:
# No metadata rows for this district: stop the run early.
if metadata.empty:
    sys.exit(0)

### Checking and removing entries that have already been obtained, cropped, and stored in GCS

In [None]:
# Blobs already in GCS are named ".../<gender>/<user_id>/<filename>.jpg".
# Extract the last three path components of each one.
#  - The dot before the extension is escaped and the filename part cannot span
#    a '/', so every match splits into exactly three columns.
#  - ".jpeg" is accepted too, because the upload cell globs "*.jp*g".
#  - Blobs that do not fit this layout are skipped instead of crashing on a
#    None match.
STORED_IMAGE_PATTERN = re.compile(r"(\d/[0-9]{6,8}/[^/]+\.jpe?g)$")

already_stored_images = []
for blob in client.list_blobs(bucket_or_name=BUCKET_BASE, prefix=PREFIX):
    match = STORED_IMAGE_PATTERN.search(blob.name)
    if match is not None:
        already_stored_images.append(match.group(1).split('/'))

In [None]:
# Tabulate the stored (gender, user_id, filename) triples; user_id is cast to
# int64 so it can be compared with the metadata's user ids.
stored = pd.DataFrame(
    already_stored_images,
    columns=['gender', 'user_id', 'filename'],
).astype({'user_id': 'int64'})

In [None]:
# Drop metadata rows whose image is already in GCS. Match on (user_id, filename)
# rather than filename alone: stored blobs live under <gender>/<user_id>/<filename>,
# so the same filename could belong to different users.
stored_keys = set(zip(stored.user_id, stored.filename))
is_unobtained = ~pd.Series(
    [key in stored_keys for key in zip(metadata.user_id, metadata.filename)],
    index=metadata.index,
    dtype=bool,
)
print(f"The number of unobtained images is: {int(is_unobtained.sum())}")
metadata = metadata[is_unobtained]

# Everything for this district is already stored. Stop here; otherwise the
# later cells would run on empty arrays and fail.
if metadata.shape[0] == 0:
    sys.exit(0)

### Obtain images via asynchronous scraping

In [None]:
def url_and_meta_generator(meta_data):
    """Lazily pair every metadata row with the download URL of its image.

    Each row is unpacked as ``(uid, filename, gender, <ignored>)``. The generator
    yields ``(uid, filename, gender, url)``, where ``url`` is ``BASE_URL`` filled
    in with the row's user id and file name.
    """
    for row in meta_data:
        uid, filename, gender, _unused = row
        url = BASE_URL.format(user_id=uid, file_name=filename)
        yield uid, filename, gender, url

async def get_image_data(url: str, session: ClientSession, **kwargs) -> bytes:
    """GET ``url`` and return the raw response body as bytes.

    kwargs are passed to ``session.request()``.

    Raises the error produced by ``raise_for_status()`` for HTTP error statuses,
    so error pages are never handed to the image decoder.
    """
    # ``async with`` returns the connection to the pool even if reading fails.
    async with session.request(method="GET", url=url, **kwargs) as resp:
        resp.raise_for_status()
        content = await resp.read()  # read() for binary, text() for text
    # NOTE(review): presumably meant as a throttle. All requests are started
    # concurrently by the caller, so this does not limit the request rate.
    await asyncio.sleep(0.1)
    return content

async def convert_and_append_images(
    uid: int, 
    filename:str,
    gender:int,
    url:str, 
    session: ClientSession, 
    **kwargs
    ) -> None:
    """Download one image and record it together with its identifier.

    On success, this function appends two entries:
      - the flattened uint8 RGB pixel array (600 * 600 * 3 values for the
        expected 600x600 inputs) to the module-level ``images`` list;
      - ``"<gender>/<uid>/<filename>"`` to ``identifiers``.
    Both appends happen only after decoding has succeeded, so the two lists
    stay index-aligned. On any download or decode failure, nothing is appended,
    a message is printed, and None is returned.
    """
    try:
        bytes_content = await get_image_data(url=url, session=session, **kwargs)
        with Image.open(BytesIO(bytes_content)) as img:
            # Image.open is lazy: converting here forces the full decode inside
            # the try. It also normalises grayscale/RGBA/palette images to 3
            # channels, so every flattened array has the same length.
            rgb_numeric_array = np.asarray(img.convert('RGB'), dtype='uint8')
    except Exception as e:
        print(f"Skipping {url}: {e!r}")
        return None
    identifiers.append(str(gender) + '/' + str(uid) + '/' + filename)
    images.append(rgb_numeric_array.flatten())

async def main(meta_data):
    """Fetch every image described by ``meta_data`` concurrently over one shared session.

    One task per metadata row is scheduled up front, and the coroutine waits
    for all of them to finish.
    """
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*[
            asyncio.create_task(
                convert_and_append_images(uid, filename, gender, url, session)
            )
            for uid, filename, gender, url in url_and_meta_generator(meta_data)
        ])

In [None]:
await main(metadata.values)

### Local Storage for Processing

In [None]:
def store_many_hdf5(images, identifiers):
    """Write the image array and its identifiers to two separate HDF5 files.

    Parameters
    ----------
    images : np.ndarray
        Image data (one flattened image per row, as built by the calling cell).
        Stored as dataset ``str(DISTRICT_ID)`` in ``IMAGES_H5`` with an unsigned
        8-bit element type.
    identifiers : np.ndarray
        One ``"<gender>/<uid>/<filename>"`` string per image. Stored as dataset
        ``str(DISTRICT_ID)`` in ``IDENTIFIERS_H5`` as variable-length strings.

    Errors raised while creating the datasets are printed, not re-raised.
    Both files are closed in every case, including when opening the second
    file fails.
    """
    # Hierarchy: File -> Dataset (named after the district) -> numpy data.
    with h5py.File(IMAGES_H5, "w") as data, h5py.File(IDENTIFIERS_H5, "w") as meta:
        try:
            data.create_dataset(
                f"{DISTRICT_ID}",
                images.shape,
                h5py.h5t.STD_U8BE,
                data = images
            )
            meta.create_dataset(
                f"{DISTRICT_ID}",
                identifiers.shape,
                h5py.special_dtype(vlen=str),
                data = identifiers
            )
        except Exception as e:
            print(e)

In [None]:
# Stack the per-image arrays for HDF5; identifiers become fixed-width bytes.
images = np.array(images)
identifiers = np.array(identifiers, dtype = 'S')
# The two files are presumably paired by position when read back, so the
# arrays must line up.
assert len(images) == len(identifiers), (
    f"{len(images)} images but {len(identifiers)} identifiers"
)

store_many_hdf5(images, identifiers)

# Spot-check the first identifier; skipped when nothing was downloaded
# (e.g. every request failed), which would otherwise raise IndexError.
if len(identifiers):
    print(identifiers[0])
del images, identifiers

### Crop faces to Facenet's preferred dimensions

In [None]:
# Create the local directory for face crops. exist_ok makes re-running this
# cell harmless and avoids the check-then-create race of exists() + mkdir().
os.makedirs(CROPPED_DIR, exist_ok=True)

In [None]:
# Crop side length in pixels (passed to MTCNN as image_size). batch_size and
# epochs are not used by the cells shown here.
batch_size, epochs, dim = 32, 8, 182

In [None]:
# Prefer the first CUDA GPU, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(f'Running on device: {device}')

In [None]:
# Face detector/cropper from facenet_pytorch; image_size sets the side length
# of the saved crops (see the MTCNN API docs). It runs on the device chosen above.
mtcnn = MTCNN(
    device=device,
    image_size=dim,
    margin=0,
    min_face_size=20,
    factor=0.709,
    thresholds=[0.6, 0.7, 0.7],
)

In [None]:
# Read the downloaded images back out of the HDF5 files written above.
# NOTE(review): image_dims presumably tells HD5Dataset how to un-flatten each
# stored row — confirm against scripts/hd5_dataset.py.
hd5data = HD5Dataset(
    data_path = IMAGES_H5,
    dataset_label = f'{DISTRICT_ID}',
    indices_path = IDENTIFIERS_H5,
    indices_label = f'{DISTRICT_ID}',
    # A bare int resizes the shorter edge to 300 px (600x600 -> 300x300).
    # Written without the misleading "(300)", which is an int, not a tuple.
    transforms = transforms.Resize(300),
    image_dims = (600,600,3)
)

# collate_pil returns (list of images, list of labels) instead of stacked
# tensors. With DataLoader's default batch_size of 1, each list has one element.
loader = DataLoader(
    hd5data,
    collate_fn=training.collate_pil
)

In [None]:
# Detect and save the face crop of every image under CROPPED_DIR, using the
# image's identifier as its relative path; progress is shown on one line.
total = len(loader)
for step, (img, label) in enumerate(loader, start=1):
    mtcnn(img[0], save_path=CROPPED_DIR + label[0])
    print(f'\rImage {step} of {total}', end='')

In [None]:
# Release the dataset's resources (presumably the open HDF5 file handles —
# confirm in HD5Dataset) before the files are deleted in the cleanup cell.
hd5data.close()

### Upload cropped faces to GCS

In [None]:
# Bucket handle that the cropped faces are uploaded into.
bucket = client.get_bucket(BUCKET_BASE)

def upload_cropped_image_to_gcs(bucket, file, blob_name):
    """Upload the local file ``file`` to ``bucket`` as object ``blob_name``, tagged as JPEG."""
    bucket.blob(blob_name).upload_from_filename(file, content_type='image/jpeg')

In [None]:
# Upload every crop as "<base><gender>/<uid>/<filename>".
base = "inputs/image-clustering/cropped_faces/"
for path in glob.glob(f'{CROPPED_DIR}/*/*/*.jp*g'):
    # Path relative to the crop directory, with '/' separators on every OS
    # (GCS object names always use '/'). Unlike a "cropped_<1-2 digits>" regex,
    # this works for any DISTRICT_ID.
    filename = os.path.relpath(path, CROPPED_DIR).replace(os.sep, '/')
    blob_name = base + filename
    try:
        print(blob_name)
        upload_cropped_image_to_gcs(
            bucket,
            file = path,
            blob_name = blob_name
        )
    except Exception as e:
        # Report and continue with the remaining files.
        print(f"{path} has failed with {e}.")

### Cleaning up local files and cropped directory

In [None]:
# Delete the local HDF5 files, then the whole crop directory tree.
for h5_path in (IMAGES_H5, IDENTIFIERS_H5):
    os.remove(h5_path)
shutil.rmtree(CROPPED_DIR)