In [None]:
pip install apache-airflow

In [6]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import os
import time
from glob import glob
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.models import Model

from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision,Accuracy

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import numpy as np
import cv2
from glob import glob
from sklearn.utils import shuffle
import tensorflow as tf
H=256
W=256
dim=(H,W)


In [32]:
dag = DAG(
    dag_id='data_loading_and_preprocessing',
    start_date=days_ago(1),
    schedule_interval=None
)

# Define the tasks
def load_data_task():
    path = "/kaggle/input/kidneykits19/PNG_Slices_Segmented/PNG_Slices_Segmented"
    images = sorted(glob(f"{path}/*/Images/*.jpg"))
    segmentations = sorted(glob(f"{path}/*/Segmentation/*.png"))
    print(len(images), len(segmentations))
    
    split = 0.3
    split_size = int(len(images) * split)
    train_x, valid_x = train_test_split(images, test_size=split_size, random_state=42)
    train_y, valid_y = train_test_split(segmentations, test_size=split_size, random_state=42)

    return (train_x, train_y), (valid_x, valid_y)

load_data_task = PythonOperator(
    task_id='load_data_task',
    python_callable=load_data_task,
    dag=dag
)

def read_image(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = cv2.resize(x, dim)
    x = x/255.0
    x = x.astype(np.float32)
    return x

def read_mask(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    x = cv2.resize(x, dim)
    x = x/255.0
    x = x > 0.5
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis=-1)
    return x

def shuffling(x, y):
    x, y = shuffle(x, y, random_state=42)
    return x, y

def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y

    x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
    x.set_shape([H, W, 3])
    y.set_shape([H, W, 1])
    return x, y

def tf_dataset(x, y, batch=8):
    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    dataset = dataset.map(tf_parse)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(10)
    return dataset

train_x = []
train_y = []
valid_x = []
valid_y = []

def preprocess_data_task(**context):
    (train_x, train_y), (valid_x, valid_y) = context['task_instance'].xcom_pull(task_ids='load_data_task')
    train_x, train_y = shuffling(train_x, train_y)
    valid_x, valid_y = shuffling(valid_x, valid_y)
    train_dataset = tf_dataset(train_x, train_y)
    valid_dataset = tf_dataset(valid_x, valid_y)
    # Use the processed data as needed

preprocess_data_task = PythonOperator(
    task_id='preprocess_data_task',
    python_callable=preprocess_data_task,
    provide_context=True,
    dag=dag)



In [34]:
def Conv_Block(input,num_of_filter):
    x=Conv2D(num_of_filter,3,padding="same")(input)
    x=BatchNormalization()(x)
    x=Activation('ReLU')(x)

    x=Conv2D(num_of_filter,3,padding="same")(input)
    x=BatchNormalization()(x)
    x=Activation('ReLU')(x)
    return x


def Encoder(input,num_of_filter):
    x = Conv_Block(input, num_of_filter) #Skip connection
    p = MaxPool2D((2, 2))(x)  #Feature selection
    return x, p

def Decoder(input, skip_features, num_of_filter):
    x = Conv2DTranspose(num_of_filter, (2, 2), strides=2, padding="same")(input)
    x = Concatenate()([x, skip_features])
    x = Conv_Block(x, num_of_filter)
    return x


def build_unet_model(input_shape):
    inputs=Input(input_shape)

  #as you go down in the encoder,resolution decreases , number of filters doubles
    s1,p1=Encoder(inputs,32)
    s2,p2=Encoder(p1,64)
    s3,p3=Encoder(p2,128)
    s4,p4=Encoder(p3,256)

    b1=Conv_Block(p4,512)

    d1 = Decoder(b1, s4, 256)
    d2 = Decoder(d1, s3, 128)
    d3 = Decoder(d2, s2, 64)
    d4 = Decoder(d3, s1, 32)

    outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(d4)  #our data is in grayscale, gives 0,1 output

    model = Model(inputs, outputs, name="U-Net")
    return model 
    
    Build_model_task = PythonOperator(
    task_id='build_model_task',
    python_callable=build_unet_model,
    provide_context=True,
    dag=dag)


In [37]:
preprocess_data_task.set_upstream(load_data_task)




In [38]:
#Metrices
import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K

def iou(y_true, y_pred):
    def f(y_true, y_pred):
        intersection = (y_true * y_pred).sum()
        union = y_true.sum() + y_pred.sum() - intersection
        x = (intersection + 1e-15) / (union + 1e-15)
        x = x.astype(np.float32)
        return x
    return tf.numpy_function(f, [y_true, y_pred], tf.float32)

smooth = 1e-15
def dice_coef(y_true, y_pred):
    y_true = tf.keras.layers.Flatten()(y_true)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    intersection = tf.reduce_sum(y_true * y_pred)
    return (2.0 * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)

def dice_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)

In [42]:
def train_model():
    num_epochs = 15
    model = UNET_Build((H, W, 3))
    metrics = [dice_coef, iou, Recall(), Precision()]
    model.compile(loss=dice_loss, optimizer=Adam(lr), metrics=metrics)
    model_weights = "/kaggle/working/UNET_FYP_FINAL_15.h5"
    csv_weights = "/kaggle/working/UNET_FYP_Final_15.csv"

    callbacks = [
        ModelCheckpoint(model_weights, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=2, min_lr=1e-7, verbose=1),
        TensorBoard(),
        CSVLogger(csv_weights),
        EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=False),
    ]
    start = time.time()
    model.fit(train_dataset, epochs=num_epochs, validation_data=valid_dataset, callbacks=callbacks, shuffle=False)
    stop = time.time()
    print(f"Training time: {stop - start}s")
    
    

    train_model_task = PythonOperator(
        task_id='train_model',
        python_callable=train_model
    )
    
#     train_model_task

In [46]:
train_model_task.set_upstream(Build_model_task)

NameError: name 'train_model_task' is not defined