## Convert X-Ray Images into a Spark SQL DataFrame
This notebook uses the Chest X-Ray dataset, which has 112,120 grayscale images, each 1024 by 1024 pixels.
Each X-ray image shows one or more thorax diseases, and there are 14 disease classes in total. This makes the task a multi-class, multi-label problem.
The dataset is around 42 GB in size.
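
To make the multi-label setup concrete, here is a minimal plain-Python sketch (no Spark required) of how a pipe-separated findings string can be turned into a 14-element vector. The class names and the `"No Finding"` marker follow the code used later in this notebook. The helper name `encode_findings` and the sample string are illustrative assumptions.

```python
# Illustrative sketch only: plain Python, independent of Spark.
LABELS = ["Atelectasis", "Cardiomegaly", "Effusion", "Infiltration", "Mass",
          "Nodule", "Pneumonia", "Pneumothorax", "Consolidation", "Edema",
          "Emphysema", "Fibrosis", "Pleural_Thickening", "Hernia"]
INDEX = {name: i for i, name in enumerate(LABELS)}


def encode_findings(text):
    """Return a multi-hot vector with 1.0 at the position of each named class."""
    vec = [0.0] * len(LABELS)
    for name in text.split("|"):
        if name != "No Finding":  # "No Finding" leaves the vector all zeros
            vec[INDEX[name]] = 1.0
    return vec


# Hypothetical sample: an image annotated with two diseases at once.
example = encode_findings("Cardiomegaly|Effusion")
healthy = encode_findings("No Finding")
print(example)
print(healthy)
```

Because several positions can be set in one vector, a single image can belong to several classes at the same time.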

In this example, all images are converted into a Spark SQL DataFrame and saved to HDFS.

In the following cell, import all the required packages and libraries.

In [None]:
import os  # used by os.path.basename when extracting image file names

from bigdl.nn.criterion import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, DoubleType, StringType
from zoo.common.nncontext import *
from zoo.pipeline.nnframes import *

## Generate one hot encoding 
This part of the code generates a Spark SQL DataFrame with one column per class from the multi-label vector of each image, as a one-hot style encoding, and then saves it as a CSV file.

In [None]:
label_length = 14
label_texts = ["Atelectasis", "Cardiomegaly", "Effusion", "Infiltration", "Mass", "Nodule", "Pneumonia", "Pneumothorax",
               "Consolidation", "Edema", "Emphysema", "Fibrosis", "Pleural_Thickening", "Hernia"]
label_map = {k: v for v, k in enumerate(label_texts)}

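def write_to_csv(df, label_col="label"):
    # Add one column per class: column i holds a[i] * (i + 1),
    # i.e. 0.0 when the class is absent and (i + 1) when it is present.
    for i in range(label_length):
        # Bind i as a default argument so each UDF keeps its own class index.
        get_Kth = udf(lambda a, i=i: a[i] * (i + 1), DoubleType())
        df = df.withColumn(str(i) + " th", get_Kth(col(label_col)))

    df.show()
    # Drop the original array column before writing.
    df = df.drop(label_col)
    # Spark writes a directory named "label.csv" containing part files.
    df.write.csv("label.csv")
    return df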
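
The per-class expansion above can be illustrated without Spark. The sketch below applies the same rule, `a[i] * (i + 1)`, to one label vector in plain Python. The helper name `expand_label` and the sample vector are assumptions made for illustration.

```python
# Illustrative sketch only: mirrors the per-column rule in plain Python.
def expand_label(vec):
    """Column i receives vec[i] * (i + 1): 0.0 if absent, (i + 1) if present."""
    return [v * (i + 1) for i, v in enumerate(vec)]


# Hypothetical label vector with classes 1 and 2 present (0-based positions).
row = [0.0, 1.0, 1.0] + [0.0] * 11
cols = expand_label(row)

# Column names follow the same pattern as the notebook: "0 th", "1 th", ...
header = [str(i) + " th" for i in range(len(row))]
print(dict(zip(header, cols)))
```

Note that a present class is stored as its 1-based position rather than as 1.0, so the nonzero value in each column also identifies the class.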

## Class distribution 
This function counts, for the k-th class, how many images carry that label and how many do not.

In [None]:
def get_count_for_kth_class(k, df, label_col="label"):
    print(k, "th class distribution \t (", label_texts[k], ") is:")
    get_Kth = udf(lambda a: a[k], DoubleType())
    extracted_df = df.withColumn("kth_label", get_Kth(col(label_col)))
    extracted_df.groupby("kth_label").count().show()
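
The same group-and-count idea can be sketched in plain Python with `collections.Counter`. The function name `class_distribution` and the three-class sample vectors are hypothetical; the real data has 14 classes.

```python
# Illustrative sketch only: group-by-and-count on the k-th label, without Spark.
from collections import Counter


def class_distribution(label_vectors, k):
    """Count how often each value (0.0 or 1.0) occurs at position k."""
    return Counter(vec[k] for vec in label_vectors)


# Hypothetical toy data: four images, three classes.
samples = [
    [1.0, 0.0, 0.0],
    [0.0, 0.0, 1.0],
    [1.0, 1.0, 0.0],
    [0.0, 0.0, 0.0],
]
dist0 = class_distribution(samples, 0)
dist1 = class_distribution(samples, 1)
print(dist0[1.0], "of", len(samples), "images show class 0")
```

A strongly skewed count for a class is a hint that the class is rare and may need special handling during training.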

## Convert images and their labels into SQL spark dataframe 
This part reads all images and their labels (from a CSV file) and generates a Spark SQL DataFrame. A new label column is added to the DataFrame. The resulting data is split into a training DataFrame and a test DataFrame, and both are saved in HDFS. The number of images in each DataFrame is also displayed.

In [None]:
if __name__ == "__main__":
    image_path = "hdfs:///datasets/xray_files/xray/all_images"  # sys.argv[1] #"/home/yuhao/workspace/data/xray/middle_images"
    label_path = "/datasets/xray_files"  # sys.argv[2] #"/home/yuhao/workspace/data/xray/Data_Entry_2017.csv"
    save_path = "hdfs:///datasets/xray_files/DataFrames"  # sys.argv[3] #"./save_model"

    sparkConf = create_spark_conf().setAppName("test_dell_x_ray")
    sc = init_nncontext(sparkConf)
    spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
    print(sc.master)

    def text_to_label(text):
        arr = [0.0] * len(label_texts)
        for l in text.split("|"):
            if l != "No Finding":
                arr[label_map[l]] = 1.0
        return arr

    getLabel = udf(lambda x: text_to_label(x), ArrayType(DoubleType()))
    getName = udf(lambda row: os.path.basename(row[0]), StringType())
    imageDF = NNImageReader.readImages(image_path, sc, resizeH=256, resizeW=256, image_codec=1) \
        .withColumn("Image_Index", getName(col('image')))
    imageDF=imageDF.withColumnRenamed('Image_Index', 'Image_Index')
    labelDF = spark.read.load(label_path + "/Data_Entry_2017.csv", format="csv", sep=",", inferSchema="true", header="true") \
        .select("Image_Index", "Finding_Labels") \
        .withColumn("label", getLabel(col('Finding_Labels'))) \
        .withColumnRenamed('Image_Index', 'Image_Index') \
        .select("Image_Index", "label")
    labelDF.printSchema()
    train_df = imageDF.join(labelDF, on="Image_Index", how="inner")

    trainingList = spark.read.text(label_path + "/train_val_list.txt").withColumnRenamed("value", "Image_Index")
    testList = spark.read.text(label_path + "/test_list.txt").withColumnRenamed("value", "Image_Index")

    trainingDF = train_df.join(trainingList, on="Image_Index")
    testDF = train_df.join(testList, on="Image_Index")

    trainingDF.write.save(save_path + "/trainingDFjan29")
    testDF.write.save(save_path + "/testDF")

    print("data saved at ", save_path)

    loadedTrainingDF = spark.read.load(save_path + "/trainingDFjan29")
    loadedTestDF = spark.read.load(save_path + "/testDF")  # must match the path used when saving testDF
    print("trainingDF count: ", loadedTrainingDF.count())
    print("testDF count: ", loadedTestDF.count())
    loadedTrainingDF.show()
    loadedTestDF.show()
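
The join-and-split logic of this cell can be sketched in plain Python with dictionaries keyed by file name. All file names and values below are hypothetical placeholders, and `inner_join` is an illustrative helper, not part of Spark or Analytics Zoo.

```python
# Illustrative sketch only: inner join on the image name, then split by list.
images = {"a.png": "img-a", "b.png": "img-b", "c.png": "img-c"}
labels = {"a.png": [1.0, 0.0], "b.png": [0.0, 0.0], "d.png": [0.0, 1.0]}
train_names = {"a.png"}           # stands in for train_val_list.txt
test_names = {"b.png", "d.png"}   # stands in for test_list.txt


def inner_join(left, right):
    """Keep only keys present on both sides, pairing their values."""
    return {k: (left[k], right[k]) for k in left if k in right}


# Images without a label (c.png) and labels without an image (d.png) drop out.
joined = inner_join(images, labels)
train = {k: v for k, v in joined.items() if k in train_names}
test = {k: v for k, v in joined.items() if k in test_names}
print(sorted(train), sorted(test))
```

As in the Spark version, an entry reaches the training or test set only if it has an image, a label, and a name in the corresponding list.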