In [1]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import BatchNormalization, Conv2D, MaxPooling2D, UpSampling2D
from tensorflow.keras.layers import Activation, Input, Dense, Flatten, Dropout
from tensorflow.keras.optimizers import SGD, Adam, Adadelta
from sklearn.model_selection import train_test_split
from tensorflow.keras import backend as K
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import numpy as np

In [2]:
train_data_path = '/home/evren/classified_data/clean_data/'
noisy_15dB_path = '/home/evren/classified_data/noisy_data/15dB'
noisy_20dB_path = '/home/evren/classified_data/noisy_data/20dB'
noisy_25dB_path = '/home/evren/classified_data/noisy_data/25dB'
noisy_30dB_path = '/home/evren/classified_data/noisy_data/30dB'
noisy_100dB_path = '/home/evren/classified_data/noisy_data/100dB'

In [3]:
# class_mode='input': images identical to input images (mainly used to work with autoencoders)
# seed: Optional random seed for shuffling and transformations.
# it yields float32 tensors of shape (batch_size, *target_size, channels)
# all imgs height is 64, width changes between approximately 100-500

train_generator = ImageDataGenerator(rescale=1./255).flow_from_directory(
                           train_data_path,target_size=(64,128), shuffle=False, class_mode='input', seed=42)

test_15dB_generator = ImageDataGenerator(rescale=1./255).flow_from_directory(
                           noisy_15dB_path,target_size=(64,128), shuffle=False, class_mode='input', seed=42)

test_20dB_generator = ImageDataGenerator(rescale=1./255).flow_from_directory(
                           noisy_20dB_path,target_size=(64,128), shuffle=False, class_mode='input', seed=42)

test_25dB_generator = ImageDataGenerator(rescale=1./255).flow_from_directory(
                           noisy_25dB_path,target_size=(64,128), shuffle=False, class_mode='input', seed=42)

test_30dB_generator = ImageDataGenerator(rescale=1./255).flow_from_directory(
                           noisy_30dB_path,target_size=(64,128), shuffle=False, class_mode='input', seed=42)

test_100dB_generator = ImageDataGenerator(rescale=1./255).flow_from_directory(
                           noisy_100dB_path,target_size=(64,128), shuffle=False, class_mode='input', seed=42)

Found 69071 images belonging to 1 classes.
Found 69071 images belonging to 1 classes.
Found 69071 images belonging to 1 classes.
Found 69071 images belonging to 1 classes.
Found 69071 images belonging to 1 classes.
Found 69071 images belonging to 1 classes.


In [4]:
train_data=np.concatenate([train_generator.next()[0] for i in range(train_generator.__len__())])
print(train_data.shape)
print(len(train_data))

test_15dB_data=np.concatenate([test_15dB_generator.next()[0] for i in range(test_15dB_generator.__len__())])
print(test_15dB_data.shape)
print(len(test_15dB_data))

(69071, 64, 128, 3)
69071
(69071, 64, 128, 3)
69071


In [5]:
noisy_train, x_test, clean_train, y_test = train_test_split(test_15dB_data, train_data,
                                                           test_size=20, random_state=0)

In [6]:
#train_data=[]
#train_generator.reset()
#for i in range(train_generator.__len__()):
#    a,b = train_generator.next()
#    train_data.append(a)

#train_data=np.array(train_data)
#print(len(train_data))


#test_15dB_data=[]
#test_15dB_generator.reset()
#for i in range(test_15dB_generator.__len__()):
#    a,b = test_15dB_generator.next()
#    test_15dB_data.append(a)

#test_15dB_data=np.array(test_15dB_data)
#print(test_15dB_data)

In [7]:
steps_per_ep = train_generator.n // train_generator.batch_size
# validation_st = val_generator.n // val_generator.batch_size

test_15dB_steps= test_15dB_generator.n // test_15dB_generator.batch_size
test_20dB_steps= test_20dB_generator.n // test_20dB_generator.batch_size
test_25dB_steps= test_25dB_generator.n // test_25dB_generator.batch_size
test_30dB_steps= test_30dB_generator.n // test_30dB_generator.batch_size
test_100dB_steps= test_100dB_generator.n // test_100dB_generator.batch_size

In [8]:
model = Sequential([
    
    Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=(64,128,3)),
    BatchNormalization(),
    MaxPooling2D((2, 2), padding='same'),
    Conv2D(8, (3, 3), activation='relu', padding='same'), 
    BatchNormalization(),
    MaxPooling2D((2, 2), padding='same'),
    Conv2D(8, (3, 3), activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling2D((2, 2), padding='same'),
    
    #Flatten(),
 
    Conv2D(8, (3, 3), activation='relu', padding='same'),
    UpSampling2D((2, 2)),
    Conv2D(8, (3, 3), activation='relu', padding='same'),
    UpSampling2D((2, 2)),
    Conv2D(32, (3, 3), activation='relu', padding='same'),
    UpSampling2D((2, 2)),
    Conv2D(1, (3, 3), activation='relu', padding='same')

])

In [9]:
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 64, 128, 32)       896       
_________________________________________________________________
batch_normalization (BatchNo (None, 64, 128, 32)       128       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 32, 64, 32)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 32, 64, 8)         2312      
_________________________________________________________________
batch_normalization_1 (Batch (None, 32, 64, 8)         32        
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 16, 32, 8)         0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 16, 32, 8)         5

In [10]:
model.compile(optimizer='Adam', loss='mse',metrics=['accuracy'])

In [11]:
history = model.fit(test_15dB_generator, train_generator,
                epochs=20
                )

ValueError: `y` argument is not supported when using `keras.utils.Sequence` as input.

In [None]:
model.save('model1.h5')

train_loss, train_acc = model.evaluate(train_generator)
val_loss, val_acc = model.evaluate(val_generator) 

In [None]:
plt.rcParams['figure.figsize'] = (17,6)
fig, ax1 = plt.subplots()
xepochs = range(1,len(history.history['loss']) + 1)

plt.plot(xepochs, history.history['loss'], label = 'loss')
plt.plot(xepochs, history.history['val_loss'], label = 'val_loss')
plt.title('Loss')
plt.legend()
plt.show()

plt.plot(xepochs, history.history['accuracy'], label = 'accuracy')
plt.plot(xepochs, history.history['val_accuracy'], label = 'val_accuracy')
plt.title('Metrics (Accuracy)')
plt.legend()
plt.show()

In [None]:
prediction_15dB = model.predict(test_15dB_generator, 
                           steps=test_15dB_steps)

prediction_20dB = model.predict(test_20dB_generator, 
                           steps=test_20dB_steps)

prediction_25dB = model.predict(test_25dB_generator, 
                           steps=test_25dB_steps)

prediction_30dB = model.predict(test_30dB_generator, 
                           steps=test_30dB_steps)

prediction_100dB = model.predict(test_100dB_generator, 
                           steps=test_100dB_steps)

In [None]:
test_100dB_arr, labels = next(test_100dB_generator)

In [None]:
def plotImages(images_arr):
    fig, axes = plt.subplot(1,10, figsize=(20,20))
    axes = axes.flatten()
    for img, ax in zip(images_arr, axes):
        ax.imshow(img)
        ax.axis('off')
    plt.tight_layout()
    plt.show()

In [None]:
plotImages(test_100dB_generator)
plotImages(prediction_100dB)

In [None]:
def PSNR(original, compressed): 
    mse = np.mean((original - compressed) ** 2) 
    if(mse == 0):  # MSE is zero means no noise is present in the signal . 
                  # Therefore PSNR have no importance. 
        return 100
    max_pixel = 255.0
    psnr = 20 * log10(max_pixel / sqrt(mse)) 
    return psnr

In [None]:
# PSNR(test_100dB_generator, prediction_100dB)