In [None]:
import os
import torch.nn as nn
import torch.utils.data
import torch.nn.functional as F
import torchvision
import numpy as np
import torch
import pandas as pd
import torchvision.utils as vutils
import matplotlib.pyplot as plt

from torchvision import transforms
from torchvision.utils import save_image
from PIL import Image, ImageFont, ImageDraw
from matplotlib import pyplot
from sklearn.model_selection import train_test_split, KFold, GridSearchCV
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from tensorflow.keras.layers import Dense, LeakyReLU, Dropout, Input, BatchNormalization, Conv2D, MaxPooling2D
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.utils import plot_model
from numpy import asarray, savetxt
from tensorflow import keras
from matplotlib import pyplot
from torch.utils.data import TensorDataset, DataLoader
from PIL import Image as im
from numpy import asarray, savetxt
from torch import nn, optim
from sklearn.preprocessing import MinMaxScaler

print(os.getcwd())


In [None]:

dir_name = 'C:/'
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
print("Using Device:", DEVICE)

batch_size = 3

# Data loader

trans = transforms.Compose([transforms.Grayscale(), transforms.Resize((280,280)), transforms.ToTensor(),transforms.Normalize(0.5, 0.5)])
image = torchvision.datasets.ImageFolder(root=dir_name,transform=trans)

data_loader = torch.utils.data.DataLoader(dataset=image,
                                          batch_size=batch_size,
                                          shuffle=True)
labels_map = {0:"best", 
             1:"normal",
             2:"worst",
             }

figure = plt.figure(figsize=(8, 8))
cols, rows = 3, 1
for i in range(1, cols * rows + 1):
    sample_idx = torch.randint(len(data_loader), size=(1,)).item()
    img, label = image[sample_idx]
    figure.add_subplot(rows, cols, i)
    plt.title(labels_map[label])
    plt.axis("off")
    plt.imshow(img.squeeze(), cmap="gray")
plt.show()

# Hyper-parameters & Variables setting

num_epoch = 500
learning_rate = 0.0001

img_size = 280 * 280
num_channel = 3
dir_name = "CDGAN"
noise_size = 3
hidden_size1 = 128
hidden_size2 = 256
hidden_size3 = 512
hidden_size4 = 1024


condition_size = 3


# Device setting
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Now using {} devices".format(device))


# Create a directory for saving samples
if not os.path.exists(dir_name):
    os.makedirs(dir_name)

# Define discriminator
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()

        self.linear1 = nn.Linear(img_size + condition_size, hidden_size4)
        self.linear2 = nn.Linear(hidden_size4, hidden_size3)
        self.linear3 = nn.Linear(hidden_size3, hidden_size2)
        self.linear4 = nn.Linear(hidden_size2, hidden_size1)
        self.linear5 = nn.Linear(hidden_size1, 3)
        self.leaky_relu = nn.LeakyReLU(0.1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.leaky_relu(self.linear1(x))
        x = self.leaky_relu(self.linear2(x))
        x = self.leaky_relu(self.linear3(x))
        x = self.leaky_relu(self.linear4(x))
        x = self.linear5(x)
        x = self.sigmoid(x)
        return x
    
    

# Define generator
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()

        self.linear1 = nn.Linear(noise_size + condition_size, hidden_size1)
        self.linear2 = nn.Linear(hidden_size1, hidden_size2)
        self.linear3 = nn.Linear(hidden_size2, hidden_size3)
        self.linear4 = nn.Linear(hidden_size3, hidden_size4)
        self.linear5 = nn.Linear(hidden_size4, img_size)
        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()
    def forward(self, x):
        x = self.relu(self.linear1(x))
        x = self.relu(self.linear2(x))
        x = self.relu(self.linear3(x))
        x = self.relu(self.linear4(x))
        x = self.linear5(x)
        x = self.tanh(x)
        return x


# For checking validity in final step
def check_condition(_generator):
    test_image = torch.empty(0).to(device)

    for i in range(3):
        test_label = torch.tensor([0])
        test_label_encoded = F.one_hot(test_label, num_classes=3).to(device)

        # create noise(latent vector) 'z'
        _z = torch.randn(1, noise_size).to(device)
        _z_concat = torch.cat((_z, test_label_encoded), 1)

        test_image = torch.cat((test_image, _generator(_z_concat)), 0)

    _result = test_image.reshape(1, 3, 280, 280)
    save_image(_result, os.path.join(dir_name, 'result.png'), nrow=1)



# Initialize generator/Discriminator
discriminator = Discriminator()
generator = Generator()

# Device setting
discriminator = discriminator.to(device)
generator = generator.to(device)

# Loss function & Optimizer setting
criterion = nn.BCELoss()
d_optimizer = torch.optim.Adam(discriminator.parameters(), lr=learning_rate)
g_optimizer = torch.optim.Adam(generator.parameters(), lr=learning_rate)


"""
Training part
"""
for epoch in range(num_epoch):
    for i, (images, label) in enumerate(data_loader):

        # make ground truth (labels) -> 1 for real, 0 for fake
        real_label = torch.full((batch_size, 3), 1, dtype=torch.float32).to(device)
        fake_label = torch.full((batch_size, 3), 0, dtype=torch.float32).to(device)

        # reshape real images 
        real_images = images.reshape(batch_size, -1).to(device)

       
        label_encoded = F.one_hot(label, num_classes=3).to(device)
        # concat real images with 'label encoded vector'
        real_images_concat = torch.cat((real_images, label_encoded), 1)

        # +---------------------+
        # |   train Generator   |
        # +---------------------+

        # Initialize grad
        g_optimizer.zero_grad()
        d_optimizer.zero_grad()

        # make fake images with generator & noise vector 'z'
        z = torch.randn(batch_size, noise_size).to(device)

        # concat noise vector z with encoded labels
        z_concat = torch.cat((z, label_encoded), 1)
        fake_images = generator(z_concat)
        fake_images_concat = torch.cat((fake_images, label_encoded), 1)

        # Compare result of discriminator with fake images & real labels
        # If generator deceives discriminator, g_loss will decrease
        g_loss = criterion(discriminator(fake_images_concat), real_label)

        # Train generator with backpropagation
        g_loss.backward()
        g_optimizer.step()

        # +---------------------+
        # | train Discriminator |
        # +---------------------+

        # Initialize grad
        d_optimizer.zero_grad()
        g_optimizer.zero_grad()

        # make fake images with generator & noise vector 'z'
        z = torch.randn(batch_size, noise_size).to(device)

        # concat noise vector z with encoded labels
        z_concat = torch.cat((z, label_encoded), 1)
        fake_images = generator(z_concat)
        fake_images_concat = torch.cat((fake_images, label_encoded), 1)

        # Calculate fake & real loss with generated images above & real images
        fake_loss = criterion(discriminator(fake_images_concat), fake_label)
        real_loss = criterion(discriminator(real_images_concat), real_label)
        d_loss = (fake_loss + real_loss) / 2

        # Train discriminator with backpropagation
        # In this part, we don't train generator
        d_loss.backward()
        d_optimizer.step()

        d_performance = discriminator(real_images_concat).mean()
        g_performance = discriminator(fake_images_concat).mean()

        if (i + 1) % 150 == 0:
            print("Epoch [ {}/{} ]  Step [ {}/{} ]  d_loss : {:.5f}  g_loss : {:.5f}"
                  .format(epoch + 1, num_epoch, i+1, len(data_loader), d_loss.item(), g_loss.item()))

    # print discriminator & generator's performance
    print(" Epock {}'s discriminator performance : {:.2f}  generator performance : {:.2f}"
          .format(epoch + 1, d_performance, g_performance))

    # Save fake images in each epoch
    
    label = label.tolist()
    label = label[:3]
    label = [str(l) for l in label]
    label_text = ", ".join(label)
    
    samples = fake_images.reshape(batch_size, 1, 280, 280)
    save_image(samples, os.path.join(dir_name, 'fake_samples{}({}).png'.format(epoch + 1,label_text)))

check_condition(generator)

In [None]:
target_y = pd.read_csv(".csv")

target_x = pd.read_csv(".csv", usecols=['x1','x2','x3','x4','x6','y_1','y_2'])

target_x


# normalize dataset with MinMaxScaler
scaler = MinMaxScaler(feature_range=(0, 255))
target_x = pd.DataFrame(scaler.fit_transform(target_x))
target_x
target_y=target_y.drop('Unnamed: 0',axis=1)
target_y

target_y_np = target_y.values
target_x_np = target_x.values

target_y_np_reshape = target_y_np.reshape(target_y_np.shape[0],-1,280,280)
target_x_np_reshape = target_x_np.reshape(target_x_np.shape[0],-1,1,7)
print(target_y_np_reshape.shape)
print(target_x_np.shape)

In [None]:
x = torch.rand(6,1,280, 280)
y = torch.rand(6,7)

    
class TensorData(Dataset):
    def __init__(self, x_data, y_data):
        self.x_data = torch.FloatTensor(x_data)
        self.y_data = torch.FloatTensor(y_data)
        self.len = self.y_data.shape[0]
        
    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]
    
    def __len__(self):
        return self.len
    
    
trainsets = TensorData(x, y)
trainloader = DataLoader(trainsets, batch_size = 6, shuffle = True)
print(trainloader)

class MultiLayerPerceptron(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=8, kernel_size=(10,10), stride=2, padding=0, bias=True),
            nn.BatchNorm2d(8),
            nn.ReLU(),
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=8, out_channels=16, kernel_size=(10,10), stride=2, padding=0, bias=True),
            nn.BatchNorm2d(16),
            nn.ReLU(),
        )
        self.layer3 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=32,  kernel_size=(10,10), stride=2, padding=0, bias=True),
            nn.BatchNorm2d(32),
            nn.ReLU(),
        )
        self.layer4 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64,  kernel_size=(10,10), stride=2, padding=0, bias=True),
            nn.BatchNorm2d(64),
            nn.ReLU(),
        )
        self.layer5 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=1,  kernel_size=(10,10), stride=2, padding=0, bias=True),
            nn.BatchNorm2d(1),
            nn.ReLU(),
        )
        

        self.fc1 = nn.Linear(1,136,bias=False)
        self.fc2 = nn.Linear(136,64,bias=False)
        self.fc3 = nn.Linear(64,28,bias=False)
        self.fc4 = nn.Linear(28,10,bias=False)
        self.fc5 = nn.Linear(10,7,bias=False)
        
        
    def forward(self,x):
#               
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        x = self.fc5(x)
        return x



def tt_split(X_1,Y_1):
    
    
    sc_X = StandardScaler()
    sc_Y = StandardScaler()
    
    
    X_train, X_test, Y_train, Y_test = train_test_split(X_1,Y_1,test_size=0.2)
    
    trainsets = TensorData(X_train, Y_train)
    trainloader = DataLoader(trainsets, batch_size = 6, shuffle = True)
    testsets = TensorData(X_test, Y_test)
    testloader = DataLoader(testsets, batch_size = 6, shuffle = False)
    
    return trainloader, testloader, sc_X, sc_Y



def evaluation(dataloader, model, sc_X, sc_Y):
    predictions = torch.tensor([], dtype=torch.float)
    actual = torch.tensor([],dtype=torch.float)
    with torch.no_grad():
        model.eval()
        for data in dataloader:
            inputs, values = data
            outputs = model(inputs)
            predictions = torch.cat((predictions, outputs), 0)
            actual = torch.cat((actual, values), 0)
    
    predictions = predictions.numpy()
    actual = actual.numpy()


    rmse = np.sqrt(mean_squared_error(predictions.reshape(predictions.shape[0],7), actual.reshape(actual.shape[0],7)))
        
    print(predictions)
    print(actual)
    
    return rmse




def MLP(X,Y):
    model = MultiLayerPerceptron()
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr = 0.0001, weight_decay = 1e-4)
    
    trainloader, testloader, sc_X, sc_Y = tt_split(X,Y)

    loss_ = []
    n = len(trainloader)
    
    for epoch in range(1000):
        running_loss = 0.0
        for data in trainloader:
            inputs, values, = data
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, values)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            
        loss_.append(running_loss/n)
        print(" Epock {} loss {}".format(epoch + 1, loss))
        
    plt.plot(loss_)
    plt.title('Training Loss')
    plt.xlabel('epoch')
    plt.show()
    
    train_rmse = evaluation(trainloader, model, sc_X, sc_Y)
    test_rmse = evaluation(testloader, model, sc_X, sc_Y)
    print('Train RMSE: ', train_rmse)
    print('Test RMSE: ', test_rmse)

    
    torch.save(model, 'C:/.pt')

MLP(target_y_np_reshape, target_x_np_reshape)


In [None]:
#Defining results statistically

model = torch.load('C:/.pt')

#Signal = np.load('Augmented_signal.npy')
test_y = pd.read_csv("all.csv")
test_y=test_y.drop('Unnamed: 0',axis=1)
test_y_np = test_y.values
test_y_np_reshape = test_y_np.reshape(test_y_np.shape[0],-1,280,280)
print(test_y_np_reshape.shape)
model.eval()
predictions = torch.tensor([], dtype=torch.float)
actual = torch.tensor([],dtype=torch.float)
sc_Y = StandardScaler()
testsets = torch.FloatTensor(test_y_np_reshape)
testloader = DataLoader(testsets, batch_size = 1, shuffle = False)

for data in trainloader:
    inputs, values = data

with torch.no_grad():
    for data in testloader:
        inputs = data
        outputs = model(inputs)
        predictions = torch.cat((predictions, outputs), 0)
        actual = torch.cat((actual, values), 0)

predictions = predictions.numpy()
actual = actual.numpy()
a= predictions.T

df1 = pd.DataFrame(a[0][0][0],columns = ['x1'])
df2 = pd.DataFrame(a[1][0][0],columns = ['x2'])
df3 = pd.DataFrame(a[2][0][0],columns = ['x3'])
df4 = pd.DataFrame(a[3][0][0],columns = ['x4'])
df5 = pd.DataFrame(a[4][0][0],columns = ['x5'])
df6 = pd.DataFrame(a[4][0][0],columns = ['y2'])
df7 = pd.DataFrame(a[4][0][0],columns = ['y1'])

fin_df = pd.concat([df1,df2,df3,df4,df5,df6,df7],axis=1)
new = scaler.inverse_transform(fin_df)
new
df = pd.DataFrame(new)
df.to_csv('./var.csv')
print("Mean=", new.mean(axis=0))
print("Max=", new.max(axis=0))
print("Min=", new.min(axis=0))
print(new)