In [None]:
!pip install -r requirements.txt

In [1]:
PROJECT = 'ex-vivo-confocal'
BUCKET = 'ex-vivo-confocal'
REGION = 'europe-west1'

# Cloud Storage folders, written as full gs:// URIs built from BUCKET.
YES_FOLDER = f'gs://{BUCKET}/datalake/STUDY_2021/HELDOUT/YES'
NO_FOLDER = f'gs://{BUCKET}/datalake/STUDY_2021/HELDOUT/NO'
PROCESSED_FOLDER = f'gs://{BUCKET}/processed/STUDY_2021/HELDOUT'
# Local folder that receives the staged CSV file.
LOCAL_STAGING_FOLDER = 'staging/heldout'

In [2]:
import os

# Export the notebook configuration as environment variables; the shell cells
# below refer to them as ${PROJECT}, ${YES_FOLDER}, and so on.
os.environ.update({
    'PROJECT': PROJECT,
    'BUCKET': BUCKET,
    'REGION': REGION,
    'YES_FOLDER': YES_FOLDER,
    'NO_FOLDER': NO_FOLDER,
    'GOOGLE_APPLICATION_CREDENTIALS': 'credentials/key.json',
})

In [3]:
# Activate the ml-service service account for the gcloud CLI, using the key file
# and project that were exported through os.environ above.
!gcloud auth activate-service-account \
    ml-service@ex-vivo-confocal.iam.gserviceaccount.com \
    --key-file=${GOOGLE_APPLICATION_CREDENTIALS} \
    --project=${PROJECT}

Activated service account credentials for: [ml-service@ex-vivo-confocal.iam.gserviceaccount.com]


---

In [4]:
def save_dataset(df, path, header=True):
    """
    Write a dataframe to a CSV file, replacing any file already at `path`.

    Arguments:
      df -- pandas DataFrame to save; its index is not written.
      path -- destination CSV path. Missing parent folders are created.
      header -- whether to write the column names as the first row.
    """
    parent_dir = os.path.dirname(path)
    if os.path.exists(path):
        os.remove(path)
    # A bare file name has no parent folder to create.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    df.to_csv(path, index=False, header=header)
    print("Dataframe saved into", path)

In [5]:
import pandas as pd

def createDataFrame(yes_paths, no_paths):
    """
    Build one labelled dataframe out of two lists of image paths.

    Arguments:
      yes_paths -- paths of the positive images ('yes', it is cancer).
      no_paths -- paths of the negative images ('no', it is not cancer).

    Returns:
      A dataframe with the columns 'image' and 'label'. The label is the string
      'yes' or 'no'; the 'yes' rows come first and the index is renumbered from 0.
    """
    labelled_frames = [
        pd.DataFrame(paths, columns=['image']).assign(label=label)
        for label, paths in (('yes', yes_paths), ('no', no_paths))
    ]
    return pd.concat(labelled_frames, ignore_index=True)

---

## Preparing the dataset

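The original object names contain spaces (and other special characters), which would break the whitespace-based parsing of the file listings further down. We therefore rename the objects first, replacing every space with an underscore.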

In [None]:
%%bash

# Replace every space in an object name with an underscore.
# `IFS= read -r` keeps each listed name exactly as printed (no trimming, no
# backslash processing); names without a space are skipped so that gsutil is
# never asked to move an object onto itself.
gsutil ls gs://ex-vivo-confocal/datalake/STUDY_2021/HELDOUT/**/*.* | \
  while IFS= read -r f; do
    renamed="${f// /_}"
    if [[ "$f" != "$renamed" ]]; then
      gsutil -m mv "$f" "$renamed"
    fi
  done

In [22]:
# DecompressionBombError: Image size (204871680 pixels) exceeds limit of 178956970 pixels, could be decompression bomb DOS attack. [while running 'Synthesise samples']
#
# To stay clear of that error, only the listing rows whose first field is below
# 157286400 (150 * 1024 * 1024) are kept; presumably that field is the object
# size in bytes -- TODO confirm against `gsutil ls -l`. awk then prints the last
# field of each kept row, which is the gs:// path of the object.

yes_paths = !gsutil ls -l ${YES_FOLDER}/**/*.* | awk '{if ($1 < 157286400) print $NF}'
no_paths = !gsutil ls -l ${NO_FOLDER}/**/*.* | awk '{if ($1 < 157286400) print $NF}'

In [23]:
# Merge both listings into one labelled table and preview five random rows.
df = createDataFrame(yes_paths=yes_paths, no_paths=no_paths)
df.sample(n=5)

Unnamed: 0,image,label
8,gs://ex-vivo-confocal/datalake/STUDY_2021/HELD...,yes
11,gs://ex-vivo-confocal/datalake/STUDY_2021/HELD...,yes
45,gs://ex-vivo-confocal/datalake/STUDY_2021/HELD...,no
53,gs://ex-vivo-confocal/datalake/STUDY_2021/HELD...,no
3,gs://ex-vivo-confocal/datalake/STUDY_2021/HELD...,yes


In [25]:
# Class balance of the held-out set, taken from the two listings.
NUM_SAMPLES_YES, NUM_SAMPLES_NO = len(yes_paths), len(no_paths)

print('Number of positive cases:', NUM_SAMPLES_YES)
print('Number of negative cases:', NUM_SAMPLES_NO)

Number of positive cases: 45
Number of negative cases: 52


In [26]:
# resize_samples unpacks every CSV row as (label, path), so the label column is
# written first and no header row is emitted.
path = f'{LOCAL_STAGING_FOLDER}/heldout.csv'
save_dataset(df.loc[:, ['label', 'image']], path, header=False)

Dataframe saved into staging/heldout/heldout.csv


---

In [27]:
import gcsfs
import numpy as np
import matplotlib.pyplot as plt

from PIL import Image

def load_image(path):
    """
    Loads an image from either the local filesystem or Google Cloud Storage
    as a Numpy array.

    Arguments:
      path -- path to the image file; a 'gs://' prefix selects Google Cloud Storage.

    Returns:
      img -- a Numpy array with the pixel data. For an RGB image the shape is
             (height, width, 3).
    """
    if path.startswith('gs://'):
        # The filesystem is bound to the GCP project, not to the bucket name.
        fs = gcsfs.GCSFileSystem(project=PROJECT)
        # The array is built inside the `with` block, so both the remote handle
        # and the PIL image can be released before returning.
        with fs.open(path, 'rb') as handle, Image.open(handle) as img:
            return np.asarray(img)
    with Image.open(path) as img:
        return np.asarray(img)

The images need to be resized so that they all have the same dimensions.

In [29]:
# Side length, in pixels, of the square images produced by the resize step below.
IMAGE_SIZE = 300

In [30]:
import apache_beam as beam

def resize_samples(input_data, output_folder,
                       save_format='jpeg',
                       target_size=(256, 256),
                       job_mame='resize_samples',
                       runner='DirectRunner'):
    """
    Run an Apache Beam pipeline that resizes every image listed in a CSV file and
    stores the result in a per-label subfolder of `output_folder`.

    Arguments:
    input_data -- path to a header-less CSV file with one image per line:
        label,path_to_image_file
    output_folder -- local or gs:// folder that receives the resized images. Each
        image is written to <output_folder>/<LABEL>/<16 random characters>.<save_format>,
        where <LABEL> is the label in upper case with spaces replaced by underscores.
    save_format -- image format used when saving, also used as file extension.
        Images saved as 'jpeg' are converted to RGB first.
    target_size -- (width, height) of the resized images.
    job_mame -- prefix of the Beam job name; a timestamp is appended. The spelling
        of the parameter is kept so that existing keyword callers keep working.
    runner -- two possible values are accepted:
        DirectRunner -- executes the pipeline in the same machine that is running this method. Default.
        DataflowRunner -- executes the pipeline on GCP (Google Cloud Platform) using the Dataflow service.

    The call blocks until the pipeline has finished, so the output is complete
    when it returns.
    """
    # Dataflow needs the imports here
    import os, datetime, random, string
    import numpy as np

    from PIL import Image
    from io import BytesIO
    from apache_beam.io.gcp import gcsio

    def load_img(path):
        # PIL opens images lazily; the pixel data is read here so that the remote
        # handle can be closed before the image leaves this function.
        if path.startswith('gs://'):
            with gcsio.GcsIO().open(path) as handle:
                img = Image.open(handle)
                img.load()
        else:
            img = Image.open(path)
            img.load()
        return img

    def resize(element):
        # element is one CSV row already split into [label, path].
        label, path = element
        img = load_img(path).resize(target_size, Image.NEAREST)
        save_to_dir = os.path.join(
                output_folder, label.upper().replace(' ', '_'))
        return (save_to_dir, np.asarray(img))

    def to_uint8(pixels):
        # The arrays produced by resize() come straight from PIL images and are
        # normally already 8-bit values in 0..255; multiplying those by 255 wraps
        # around in uint8 and corrupts the picture. Only float arrays (assumed to
        # be normalised to [0, 1]) are rescaled.
        if np.issubdtype(pixels.dtype, np.floating):
            pixels = pixels * 255
        return pixels.astype(np.uint8)

    class WriteImage(beam.DoFn):
        """Encodes each (directory, pixel array) pair and writes it under a random file name."""

        def __init__(self, filesystems):
            self.filesystems = filesystems

        def process(self, image_tuple):
            directory, img = image_tuple
            filename = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
            filename = '{}.{}'.format(filename, save_format)
            path = os.path.join(directory, filename)
            img = Image.fromarray(to_uint8(img))
            if save_format == 'jpeg':
                img = img.convert('RGB')
            file = BytesIO()
            img.save(file, format=save_format)
            if not os.path.isdir(directory):
                self.filesystems.FileSystems.mkdirs(directory)
            writer = self.filesystems.FileSystems.create(path)
            try:
                writer.write(file.getvalue())
            finally:
                # Close the writer even when the write fails.
                writer.close()

    timestamp = datetime.datetime.now().strftime("%y%m%d-%H%M%S")

    #dictionary of pipeline options
    opts = {
        # "staging_location": PROCESSED_FOLDER,
        "temp_location": "gs://{}/temp".format(BUCKET),
        "job_name": "{}-{}".format(job_mame, timestamp),
        "project": PROJECT,
        "region": REGION,
        "max_num_workers": 8,
        "runner": runner
    }

    #instantiate PipelineOptions object using options dictionary
    options = beam.pipeline.PipelineOptions(flags = [], **opts)
    p = beam.Pipeline(options=options)

    # Split on the first comma only, so that a path containing commas stays intact.
    (p | 'Read dataset'   >> beam.io.ReadFromText(input_data)
       | 'Split data'     >> beam.Map(lambda x: x.split(',', 1))
       | 'Resize samples' >> beam.Map(resize)
       | 'Save Image'     >> beam.ParDo(WriteImage(beam.io.filesystems))
    )

    # Wait for completion: later cells read the output folder.
    p.run().wait_until_finish()

In [None]:
# Resize the images listed in the staged CSV into IMAGE_SIZE x IMAGE_SIZE JPEG files.
resize_samples(
    input_data='{}/heldout.csv'.format(LOCAL_STAGING_FOLDER),
    output_folder=PROCESSED_FOLDER,
    target_size=(IMAGE_SIZE, IMAGE_SIZE),
    save_format='jpeg',
)

In [None]:
# Download the processed (resized) held-out images from the bucket to local disk.
!gsutil -m cp -r gs://ex-vivo-confocal/processed/STUDY_2021/HELDOUT data/2021_STUDY

---