# Image Featurization
Start with setup and imports (apologies for the vast quantity of imports)

In [1]:
from pyspark.sql import SparkSession


# Build the Spark session. No more than 8 cores were available for the second
# half of the semester; this configuration performs relatively well under that
# constraint, although the application would clearly benefit from more cores.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "14g")
    .config("spark.executor.instances", "3")
    .config("spark.executor.cores", "2")
    .config("spark.driver.memory", "14g")
    .getOrCreate()
)




In [2]:
# Handle to the SparkContext underlying the session created above.
sc = spark.sparkContext


In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
import pandas as pd
from PIL import Image
import numpy as np
import io
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
import os
from spark_tensorflow_distributor import MirroredStrategyRunner
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
import re
from pyspark.ml.feature import *
from pyspark.ml.classification import GBTClassifier
from spark_tensorflow_distributor import MirroredStrategyRunner
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import *

# Read in images to spark image Dataframe

We also set up file locations. Everything should be accessible in the project folder. 

In [4]:
# Project data directory; it is also made the working directory of this
# Python process.
data_dir = "/project/ds5559/BioNerds/ham"
os.chdir(data_dir)

# Saved model written by the ResNet notebook: <folder>/<model name>.
ResNet50_Folder = "/project/ds5559/BioNerds/saved_data/ResNet50_testing/"
model_path = "Final_model"
model_dir = os.path.join(ResNet50_Folder, model_path)

# Saved MobileNet checkpoint.
mobilenet_path = '/home/aaw3ff/HAM/Saved_Networks/mnet/mobilenet_tl-0001.ckpt'

In [5]:
# The JPEGs are spread over several sub-directories, so enable recursive
# lookup and keep only files matching *.jpg. Each row of the resulting
# DataFrame carries the file path and its raw bytes ("content").
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load("/project/ds5559/BioNerds/ham")
)

# Featurize images

Assign features to images using our resnet 50, xception, and MobileNet models. 



## Begin with defining UDFs to featurize our spark dataframe

Each UDF calls a preprocess function to open the image, and a model function to return the model used to featurize the image. It returns an array of floats corresponding to the activations of the layer immediately preceding the top. 

In [6]:

def model_fn_xcep():
    """
    Return a topless Xception transfer-learning model with loaded weights.

    The layer stack is: augmentation -> Xception preprocessing -> Xception base
    -> global average pooling -> Dense(1024) -> Dense(256). The final 7-class
    layer used in training is omitted, so the 256-unit layer is the output.

    Returns:
        tf.keras.Model: compiled model expecting inputs of shape (150, 150, 3).
    """
    # NOTE(review): imported locally, presumably so the import runs in whichever
    # Python process calls this function (e.g. a Spark worker) -- confirm.
    import tensorflow as tf

    #Define augmentation layer, no trainable weights but want identical structure
    #to the trained network so the checkpoint lines up
    data_augmentation = tf.keras.Sequential([
        tf.keras.layers.experimental.preprocessing.RandomFlip('horizontal_and_vertical'),
        tf.keras.layers.experimental.preprocessing.RandomRotation(.99),
    ])

    #Define global_average_layer for handling xception output
    global_average_layer = tf.keras.layers.GlobalAveragePooling2D()

    #Import the Xception base (no top) with preloaded imagenet weights
    base_model = tf.keras.applications.Xception(
        weights='imagenet',  # Load weights pre-trained on ImageNet.
        input_shape=(150, 150, 3),
        include_top=False,
        classifier_activation='softmax'
    )

    #Define model
    inputs = tf.keras.Input(shape=(150, 150, 3))
    x = data_augmentation(inputs)
    x = tf.keras.applications.xception.preprocess_input(x)
    #Feed data to Xception
    x = base_model(x)
    x = global_average_layer(x)
    #Add postprocessing layers, omit 7 class final layer
    x = tf.keras.layers.Dense(1024, activation='relu')(x)
    outputs = tf.keras.layers.Dense(256, activation='relu')(x)
    model = tf.keras.Model(inputs, outputs)

    #Load weights from file, expect partial since we're missing the top layer.
    #The checkpoint is named "inception_tl" for historical reasons but holds the
    #Xception weights; the filename must stay as-is for the weights to load.
    model.load_weights('/project/ds5559/BioNerds/Saved_Networks/xcep/inception_tl-0058.ckpt').expect_partial()

    #Compile the model so that it can be used for prediction
    model.compile(optimizer=tf.keras.optimizers.Adam(.001),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])
    return model


In [7]:

def model_fn_mnet():
    """
    Return a topless MobileNet transfer-learning model with loaded weights.

    The layer stack is: augmentation -> MobileNet preprocessing -> MobileNet
    base (max pooling) -> Dense(1024) -> Dense(256). The final layer used in
    training is omitted, so the 256-unit layer is the output.

    Returns:
        tf.keras.Model: compiled model expecting inputs of shape (224, 224, 3).
    """
    # NOTE(review): imported locally, presumably so the import runs in whichever
    # Python process calls this function (e.g. a Spark worker) -- confirm.
    import tensorflow as tf

    #Define augmentation layer, no trainable weights but want identical structure
    #to the trained network so the checkpoint lines up
    data_augmentation = tf.keras.Sequential([
        tf.keras.layers.experimental.preprocessing.RandomFlip('horizontal_and_vertical'),
        tf.keras.layers.experimental.preprocessing.RandomRotation(.99),
    ])

    #Import the MobileNet base (no top) with dropout=0.1 and preloaded imagenet weights
    mnet2 = tf.keras.applications.MobileNet(
        input_shape=None,
        alpha=1.0,
        depth_multiplier=1,
        dropout=0.1,
        include_top=False,
        weights='imagenet',
        input_tensor=None,
        pooling='max',
        classifier_activation="softmax"
    )

    #Define model
    inputs = tf.keras.Input(shape=(224, 224, 3))
    #Augmentation layer used in training
    x = data_augmentation(inputs)
    #Preprocess and feed to mobilenet
    x = tf.keras.applications.mobilenet.preprocess_input(x)
    x = mnet2(x)
    #Postprocessing layers, omit final layer, see mobilenet notebook for details
    x = tf.keras.layers.Dense(units=1024, activation='relu')(x)
    outputs = tf.keras.layers.Dense(units=256, activation='relu')(x)
    model = tf.keras.Model(inputs, outputs)

    #Load weights from file, expect partial since we're missing the top layer
    model.load_weights('/project/ds5559/BioNerds/Saved_Networks/mnet/mobilenet_tl-0001.ckpt').expect_partial()

    #Compile the model so that it can be used
    model.compile(optimizer=tf.keras.optimizers.Adam(.001),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])
    return model


In [8]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction, resize to 224 for mobilenet/resnet.

    :param content: raw bytes of one encoded image file.
    :return: array for the 224x224 image after ``preprocess_input``.
    """
    # Force three colour channels: a grayscale or CMYK JPEG would otherwise
    # give an array that cannot be stacked with, or fed alongside, RGB images.
    img = Image.open(io.BytesIO(content)).convert("RGB").resize((224, 224))
    arr = img_to_array(img)
    # NOTE(review): preprocess_input here is the ResNet50 one imported at the top
    # of the notebook -- confirm this matches how each model was trained.
    return preprocess_input(arr)

def preprocess_xcep(content):
    """
    Preprocesses raw image bytes for prediction, resize to 150 for xception.

    :param content: raw bytes of one encoded image file.
    :return: array for the 150x150 image after ``preprocess_input``.
    """
    # Force three colour channels: a grayscale or CMYK JPEG would otherwise
    # give an array that cannot be stacked with, or fed alongside, RGB images.
    img = Image.open(io.BytesIO(content)).convert("RGB").resize((150, 150))
    arr = img_to_array(img)
    # NOTE(review): preprocess_input here is the ResNet50 one imported at the top
    # of the notebook -- confirm this matches how the Xception model was trained.
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Run ``model`` over a batch of raw images and return one flat feature
    vector per image.

    :param model: object exposing ``predict``; its output is used as features.
    :param content_series: pandas Series of raw image bytes.
    :return: pandas Series whose elements are flattened prediction arrays.
    """
    batch = np.stack(content_series.map(preprocess))
    predictions = model.predict(batch)
    # Model outputs may be multidimensional; flatten each one to 1-D.
    return pd.Series([prediction.flatten() for prediction in predictions])

def featurize_series_xcep(model, content_series):
    """
    Run ``model`` over a batch of raw images prepared with ``preprocess_xcep``
    and return one flat feature vector per image.

    :param model: object exposing ``predict``; its output is used as features.
    :param content_series: pandas Series of raw image bytes.
    :return: pandas Series whose elements are flattened prediction arrays.
    """
    batch = np.stack(content_series.map(preprocess_xcep))
    predictions = model.predict(batch)
    # Model outputs may be multidimensional; flatten each one to 1-D.
    return pd.Series([prediction.flatten() for prediction in predictions])


In [9]:
def model_fn_r50():
    """
    Load the saved ResNet50 transfer-learning model and return a copy of it
    without its top (last) layer.

    See the ResNet50 notebook for how the saved model was created.
    """
    trained = tf.keras.models.load_model(model_dir)
    # Keep every layer except the last one.
    headless_layers = trained.layers[:-1]
    return tf.keras.models.Sequential(headless_layers)

In [10]:
# The UDF kind is given explicitly through PandasUDFType.SCALAR_ITER.
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_mnet_udf(content_series_iter):
    """
    Scalar Iterator pandas UDF that featurizes images with the MobileNet model.

    The decorator declares the returned Spark column as ``array<float>``.

    :param content_series_iter: iterator over batches of data, where each
                                batch is a pandas Series of image data.
    :return: yields one pandas Series of feature arrays per input batch.
    """
    # Load the model once and reuse it for every batch; this amortizes the
    # overhead of loading a big model.
    mnet_model = model_fn_mnet()
    yield from (featurize_series(mnet_model, batch) for batch in content_series_iter)



In [11]:
# The UDF kind is given explicitly through PandasUDFType.SCALAR_ITER.
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_r50_udf(content_series_iter):
    """
    Scalar Iterator pandas UDF that featurizes images with the ResNet50 model.

    The decorator declares the returned Spark column as ``array<float>``.

    :param content_series_iter: iterator over batches of data, where each
                                batch is a pandas Series of image data.
    :return: yields one pandas Series of feature arrays per input batch.
    """
    # Load the model once and reuse it for every batch; this amortizes the
    # overhead of loading a big model.
    r50_model = model_fn_r50()
    yield from (featurize_series(r50_model, batch) for batch in content_series_iter)

In [12]:
# The UDF kind is given explicitly through PandasUDFType.SCALAR_ITER.
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_xcep_udf(content_series_iter):
    """
    Scalar Iterator pandas UDF that featurizes images with the Xception model.

    The decorator declares the returned Spark column as ``array<float>``.

    :param content_series_iter: iterator over batches of data, where each
                                batch is a pandas Series of image data.
    :return: yields one pandas Series of feature arrays per input batch.
    """
    # Load the model once and reuse it for every batch; this amortizes the
    # overhead of loading a big model.
    xcep_model = model_fn_xcep()
    yield from (featurize_series_xcep(xcep_model, batch) for batch in content_series_iter)

## Call said udfs

Here, we call our UDFs to featurize the images in the dataframe.

In [13]:
#Define a udf that takes a path and returns the image ID for joining with other dataframes
#The lambda looks up toClass only when the udf is invoked, so toClass may be defined below

pathToID = udf(lambda z: toClass(z), StringType())
#Also register the udf with the Spark session under the name "pathToID"
spark.udf.register("pathToID", pathToID)
def toClass(s):
    """
    Extract the ISIC image ID (e.g. ``ISIC_0024306``) from a path or filename.

    :param s: string expected to contain an ``ISIC_<digits>`` token, or None.
    :return: the first ``ISIC_<digits>`` match, or None when ``s`` is None or
             contains no such token (previously this raised AttributeError).
    """
    if s is None:
        return None
    #just use regex to grab the ID from the filename
    match = re.search("ISIC_[0-9]*", s)
    return match.group(0) if match else None




In [14]:
# Featurize the images once per network. Each frame keeps the file path, the
# feature array produced by that network's UDF, and an ID column (derived from
# the path) that is used for the dataframe joins.
mnet_df = (
    images.repartition(12)
    .select(col("path"), featurize_mnet_udf("content").alias("mnet_features"))
    .withColumn('ID', pathToID('path'))
)
r50_df = (
    images.repartition(12)
    .select(col("path"), featurize_r50_udf("content").alias("r50_features"))
    .withColumn('ID', pathToID('path'))
)
xcep_df = (
    images.repartition(12)
    .select(col("path"), featurize_xcep_udf("content").alias("xcep_features"))
    .withColumn('ID', pathToID('path'))
)

In [15]:
# Inner-join the three feature frames on the image ID, then drop the path
# column(s), which are not needed downstream.
all_feats = (
    mnet_df
    .join(r50_df, on='ID', how='inner')
    .join(xcep_df, on='ID', how='inner')
    .drop('path')
)

all_feats.show(5)


+------------+--------------------+--------------------+--------------------+
|          ID|       mnet_features|        r50_features|       xcep_features|
+------------+--------------------+--------------------+--------------------+
|ISIC_0024536|[0.0, 0.0, 0.0, 0...|[0.03389938, 0.0,...|[0.0, 0.0, 0.0, 0...|
|ISIC_0024549|[0.0, 0.19771415,...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|
|ISIC_0024738|[0.0, 0.0, 0.0, 0...|[0.31574133, 0.0,...|[0.0, 0.0, 0.0, 0...|
|ISIC_0024766|[0.0, 0.0, 0.0, 0...|[1.1685055, 0.0, ...|[0.0, 0.0, 0.0, 0...|
|ISIC_0025187|[0.0, 0.0, 0.0, 0...|[0.1397682, 0.0, ...|[0.0, 0.0, 0.0, 0...|
+------------+--------------------+--------------------+--------------------+
only showing top 5 rows



# Integration with metadata

There is also a file with patient metadata. We will integrate the relevant portions with our features. The metadata contains the diagnosis (our variable of interest) as well as patient age, patient sex, and location of the lesion. Dx-type is included in the file, but is excluded as a predictive variable since it is not a predictor available at time of diagnosis. Ideally, we want to have some idea what these lesions are before we cut them off entirely. 

In [16]:
# Load the HAM10000 metadata CSV; the first row is a header and column types
# are inferred from the data.
meta_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/project/ds5559/BioNerds/ham/HAM10000_metadata.csv')
)

In [17]:
# Preview the first five metadata rows.
meta_df.show(5)

+-----------+------------+---+-------+----+----+------------+
|  lesion_id|    image_id| dx|dx_type| age| sex|localization|
+-----------+------------+---+-------+----+----+------------+
|HAM_0000118|ISIC_0027419|bkl|  histo|80.0|male|       scalp|
|HAM_0000118|ISIC_0025030|bkl|  histo|80.0|male|       scalp|
|HAM_0002730|ISIC_0026769|bkl|  histo|80.0|male|       scalp|
|HAM_0002730|ISIC_0025661|bkl|  histo|80.0|male|       scalp|
|HAM_0001466|ISIC_0031633|bkl|  histo|75.0|male|         ear|
+-----------+------------+---+-------+----+----+------------+
only showing top 5 rows



## Join Metadata and Features

We've already conveniently converted our paths to image IDs, so here we'll just join.

In [18]:
# Attach the metadata to the features: feature ID must equal metadata image_id.
all_df = all_feats.join(
    meta_df,
    on=all_feats["ID"] == meta_df["image_id"],
    how="inner",
)

In [19]:
# Inspect the joined schema: feature arrays plus metadata columns.
all_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- mnet_features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- r50_features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- xcep_features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- lesion_id: string (nullable = true)
 |-- image_id: string (nullable = true)
 |-- dx: string (nullable = true)
 |-- dx_type: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- localization: string (nullable = true)



## Vectorize the output of our UDFs 

Explicit vector UDT types are required for vector assembler and downstream work. We then drop the non-useful portions of the dataframe. 

In [20]:
# UDF turning an array column into an ML dense vector (VectorUDT).
to_vector = udf(lambda values: Vectors.dense(values), VectorUDT())

# Append one vector column per network next to the existing columns.
all_df = (
    all_df
    .select("*", to_vector("mnet_features").alias("mnet_vec"))
    .select("*", to_vector("r50_features").alias("r50_vec"))
    .select("*", to_vector("xcep_features").alias("xcep_vec"))
)

In [21]:
# Drop the raw array columns now that the vector versions exist. 'path' was
# already dropped when the feature frames were joined, so it is a no-op here.
all_df = all_df.drop('path', 'r50_features', 'xcep_features', 'mnet_features')

In [22]:
# Confirm that only the ID, metadata, and vector columns remain.
all_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- lesion_id: string (nullable = true)
 |-- image_id: string (nullable = true)
 |-- dx: string (nullable = true)
 |-- dx_type: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- localization: string (nullable = true)
 |-- mnet_vec: vector (nullable = true)
 |-- r50_vec: vector (nullable = true)
 |-- xcep_vec: vector (nullable = true)



In [23]:
# Mark the frame for caching so later actions can reuse the computed features.
all_df.cache()

DataFrame[ID: string, lesion_id: string, image_id: string, dx: string, dx_type: string, age: double, sex: string, localization: string, mnet_vec: vector, r50_vec: vector, xcep_vec: vector]

# Save featurized images for future use

Write a parquet with the featurized images and associated metadata. 

In [24]:
# Persist the featurized images and metadata. mode("overwrite") makes this
# cell re-runnable: with the default save mode the write fails when the target
# path already exists, which forces a new file name on every run.
all_df.write.mode("overwrite").parquet("clean_features.parquet2")

In [29]:
#Did it work? Read the parquet back to check.
new = spark.read.parquet("clean_features.parquet2")

In [32]:
#Yep: preview the first five rows of the reloaded data.
new.show(5)

+------------+-----------+------------+---+---------+----+------+---------------+--------------------+--------------------+--------------------+
|          ID|  lesion_id|    image_id| dx|  dx_type| age|   sex|   localization|            mnet_vec|             r50_vec|            xcep_vec|
+------------+-----------+------------+---+---------+----+------+---------------+--------------------+--------------------+--------------------+
|ISIC_0024306|HAM_0000550|ISIC_0024306| nv|follow_up|45.0|  male|          trunk|[0.0,0.0,0.0,0.0,...|[0.24354705214500...|[0.0,0.0,0.0,0.0,...|
|ISIC_0024387|HAM_0004156|ISIC_0024387| nv|    histo|65.0|female|lower extremity|[0.0,1.3627424240...|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|ISIC_0024397|HAM_0001501|ISIC_0024397| nv|follow_up|65.0|  male|          trunk|[0.0,0.0,0.0,0.0,...|[1.18369388580322...|[0.0,0.0,0.0,0.0,...|
|ISIC_0024551|HAM_0002629|ISIC_0024551| nv|follow_up|50.0|  male|        abdomen|[0.0,0.0,0.0,0.0,...|[0.01115676853805...|[0.0,0.