In [181]:
import numpy as np
import json

# Classes

In [182]:
# Convolution
class Convolution:
    def __init__(self, name, kernel, padding, stride, nb_filter, fct_activation):
        self.name = name
        self.kernel = kernel
        self.padding = padding
        self.stride = stride
        self.nb_filter = nb_filter
        self.fct_activation = fct_activation

In [183]:
# input
class InputLayer:
    def __init__(self, shape):
        self.name = "I"
        self.shape = shape

In [184]:
# Pooling Avg/Max
class Pooling:
    def __init__(self, name, op, kernel=2, padding="valid", stride=None):
        self.name = name
        self.op = op # is the operation wanted (avg/max)
        self.kernel = kernel
        self.padding = padding
        self.stride = stride

In [185]:
# Class Flatten
class Flatten:
    def __init__(self):
        self.name = "f"

In [186]:
# Dense --> Fully connected layer
class Dense:
    def __init__(self, name, nb_neurones, fct_activation):
        self.name = name
        self.nb_neurones = nb_neurones
        self.fct_activation = fct_activation

# Functions

In [187]:
# read json file 
# return list of layer + graph
def readJsonFile(json_file):
    with open(json_file) as archi_json:
        file = json.load(archi_json)
    
    archi =file[0]['archi']
    G_json = file[0]['graph']
    
    # instantiate class
    list_layer = []
    for layer in archi:
        parameters = layer["parameters"] # get parameters
        # if is Input class
        if(layer["class"] == "InputLayer"):
            # instantiate class and add into the list
            list_layer.append(InputLayer(parameters["shape"]))

        
        # if is Pooling class
        elif(layer["class"] == "Pooling"):
            try:
                padding = int(parameters["padding"])
            except:
                padding = "'" + parameters["padding"] + "'"
                
            list_layer.append(Pooling(parameters['name'],
                                      parameters["op"],
                                      parameters["kernel"],
                                      padding,
                                      parameters["stride"]))
            
        # if is Convolution class
        elif(layer["class"] == "Convolution"):
            try:
                padding = int(parameters["padding"])
            except:
                padding = "'" + parameters["padding"] + "'"
                
                
            list_layer.append(Convolution(parameters["name"],
                                          parameters["kernel"],
                                          padding,
                                          parameters["stride"],
                                          parameters["nb_filter"],
                                          parameters["fct_activation"]))
            
        # if is Flatten class
        elif(layer["class"] == "Flatten"):
            list_layer.append(Flatten())
        
        # if is Dense class
        elif(layer["class"] == "Dense"):
            list_layer.append(Dense(parameters["name"],
                                    parameters["nb_neurones"],
                                    parameters["fct_activation"]))

        else : print("Error")
    return list_layer, G_json


In [188]:
# return import of the py file
def writeImport():
    return """
import tensorflow as tf
from tensorflow import keras
import numpy as np
from tensorflow.keras.models import Sequential, Model,load_model
from tensorflow.keras.layers import Input, Add, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, GlobalMaxPooling2D,MaxPool2D
from tensorflow.keras.initializers import glorot_uniform
from tensorflow.keras.utils import plot_model
import sys
import traceback
import csv
from time import time
"""

In [189]:
# write mnist data set
def writeMnistDataset():
    return """(train_x, train_y), (test_x, test_y) = keras.datasets.mnist.load_data()

# normaliser les pixel 0-255 -> 0-1
train_x = train_x / 255.0
test_x = test_x / 255.0

train_x = tf.expand_dims(train_x, 3)
test_x = tf.expand_dims(test_x, 3)

val_x = train_x[:5000]
val_y = train_y[:5000]

"""

In [190]:
# write cifar data set
def writecifar10Dataset():
    return """
# load dataset
(train_x, train_y), (test_x, test_y) = keras.datasets.cifar10.load_data()

# normalize to range 0-1
train_x = train_x / 255.0
test_x = test_x / 255.0

val_x = train_x[:5000]
val_y = train_y[:5000]
    
"""

In [191]:
# write function for identity block
def write_identity_block():
    return """
    def id_block(X, f, filters):
   
        X_shortcut = X

        X = Conv2D(filters=filters, kernel_size=(1, 1), strides=(1, 1), padding='same', kernel_initializer=glorot_uniform(seed=0))(X)
        # X = BatchNormalization(axis=3)(X)
        X = Activation('relu')(X)


        X = Conv2D(filters=filters, kernel_size=(f, f), strides=(1, 1), padding='same', kernel_initializer=glorot_uniform(seed=0))(X)
        # X = BatchNormalization(axis=3)(X)

        X = Add()([X, X_shortcut])# SKIP Connection
        X = Activation('relu')(X)

        return X
    """

In [192]:
def write_conv_block():
    return"""
    def conv_block(X, f, filters, s=2):
    
        X_shortcut = X

        X = Conv2D(filters=filters, kernel_size=(1, 1), strides=(s, s), padding='valid', kernel_initializer=glorot_uniform(seed=0))(X)
        # X = BatchNormalization(axis=3)(X)
        X = Activation('relu')(X)

        X = Conv2D(filters=filters, kernel_size=(f, f), strides=(1, 1), padding='same', kernel_initializer=glorot_uniform(seed=0))(X)
        # X = BatchNormalization(axis=3)(X)

        X_shortcut = Conv2D(filters=filters, kernel_size=(1, 1), strides=(s, s), padding='valid', kernel_initializer=glorot_uniform(seed=0))(X_shortcut)
        # X_shortcut = BatchNormalization(axis=3)(X_shortcut)


        X = Add()([X, X_shortcut])
        X = Activation('relu')(X)

        return X
    """

In [193]:
# return string of the wanted layer
def write_layer(layer):
    
    # if is Intput
    if isinstance(layer, InputLayer):
        return "        X_input = X = Input({})\n".format(
                layer.shape)
        
            
    # if is Convolution
    elif isinstance(layer, Convolution):
        return "        X = Conv2D({}, kernel_size={}, strides={}, activation='{}', padding={})(X)\n".format(
                layer.nb_filter,
                layer.kernel,
                layer.stride,
                layer.fct_activation,
                layer.padding)
            
    # if is Pooling
    elif isinstance(layer, Pooling):
        if(layer.op == "avg"): # avg Pooling 
            return "        X = AveragePooling2D(pool_size={}, strides={}, padding={})(X)\n".format(
                    layer.kernel,
                    layer.stride,
                    layer.padding)
                
        else : # Max Pooling
            return  "        X = MaxPooling2D(pool_size={}, strides={}, padding={})(X)\n".format(
                    layer.kernel,
                    layer.stride,
                    layer.padding)
    # Not possible
    else : print("Not Possible")

In [202]:
def end_CNN(archi, nodes, i, j, str_model_cnn):
    # if is Dense (aka Fully connected layer)
    i+=1
    while((i < len(nodes)) and (j <= len(archi))):
        print(nodes[i])
        print(archi[j].name)
        layer = archi[j]
        if isinstance(layer, Dense):
            str_model_cnn += "    head_model = Dense({}, activation='{}')(head_model)\n".format(
                            layer.nb_neurones, 
                            layer.fct_activation)
            i+=1
            j+=1
        # if is flatten
        elif isinstance(layer, Flatten):
            str_model_cnn += "    head_model = Flatten()(head_model)\n"
            i+=1
            j+=1
        # Not possible
        else : print("Not Possible")
    return i, j, str_model_cnn

In [203]:
def create_CNN(archi, g_json):
    nodes = g_json['nodes']
    resnet_module = False
    str_model_cnn = "def ResNet():\n"
    i=0
    j=0
    print(len(nodes))
    print(len(archi))
    
    while ((i < len(nodes)) and (j <= len(archi))):
        if(nodes[i]['id'] == "add0"): # block resnet begin
            resnet_module = True
            
        if resnet_module == False:
            str_model_cnn += write_layer(archi[i])
            i+=1
            j+=1
        else :
            if(( (i+4) < len(nodes)) and ("add" in nodes[i+3]['id'])):
                str_model_cnn += "        X = id_block(X, {}, {})\n".format(archi[j+1].kernel, archi[j].nb_filter)
                i += 3
                j += 2
            elif (( (i+4) < len(nodes)) and ("add" in nodes[i+4]['id'])):
                str_model_cnn += "        X = conv_block(X, {}, {}, {})\n".format(archi[j+1].kernel, archi[j].nb_filter, archi[j].stride)
                i += 4
                j += 3
            else:
                str_model_cnn += "        model = Model(inputs=X_input, outputs=X, name='ResNet18')\n"
                str_model_cnn += "        return model\n\n"
                str_model_cnn += "    Input = ResNet()\n"
                str_model_cnn += "    head_model = Input.output\n"
                i, j, str_model_cnn = end_CNN(archi, nodes, i, j, str_model_cnn)

    str_model_cnn += "    model = Model(inputs=Input.input, outputs=head_model)\n"
    return str_model_cnn

In [211]:
def create_py_file(json_file):
    s = json_file.split(".")
    file_name = "archi_resnet_test" #s[0].split("/")[1]
    architecture, g_json = readJsonFile(json_file)
    
    # python directory
    py_dir = "architecture_py/"
    
    # log directory
    log_dir = "../architecture_log/"
    
    # png directory
    png_dir = "../architecture_img/"
    
    # reset file
    file_py = open(py_dir + file_name + ".py", "w")
    file_py.close()
    
    file_py = open(py_dir + file_name + ".py", "a") # Open file in writting (a --> append)
    
    # write import
    file_py.write(writeImport())
    
    # write train/test data 
    file_py.write(writeMnistDataset())
    
    file_py.write("""

# init training time
training_time = 0
# init result test/train
test_result_loss = ""
test_result_acc = ""

train_result_loss = ""
train_result_acc = ""

nb_layers = "not build"

""")
    
    # try
    file_py.write("""try:
    """)
    
    # write function for id block
    file_py.write(write_identity_block())
    
    # write function for conv block
    file_py.write(write_conv_block())
    
    # write architecture model
    file_py.write(create_CNN(architecture, g_json))
    
    
    # write : create png of the model
    file_py.write("    plot_model(model, show_shapes=True, to_file=\"%s\")\n" % (png_dir+file_name+".png"))
    
    # write compiler
    file_py.write("""    model.compile(optimizer='adam', loss=keras.losses.sparse_categorical_crossentropy, metrics=['accuracy'])\n""")
    
    # write way for time computation
    file_py.write("""    start = time()\n""")
    
    # write model training
    file_py.write("""    model.fit(train_x, train_y, epochs=5, validation_data=(val_x, val_y))\n""")
    
    # write time computation
    file_py.write("""    training_time = time()-start\n""")
    
    # write model evaluation
    file_py.write("""    print(model.evaluate(test_x, test_y))\n""")
    
    
    # all is great
    log_file = log_dir + file_name +".log"
    file_py.write("""
    log_file = open(\"""" + log_file + """\" , "w")
    
    # save test result
    log_file.write('test result : ' + str(model.evaluate(test_x, test_y)))
    test_result_loss = model.evaluate(test_x, test_y)[0]
    test_result_acc = model.evaluate(test_x, test_y)[1]
    
    # save train result
    log_file.write('train result : ' + str(model.evaluate(test_x, test_y)))
    train_result_loss = model.evaluate(train_x, train_y)[0]
    train_result_acc = model.evaluate(train_x, train_y)[1]
    
    print('OK: file """ + log_file +""" has been create')
    
    nb_layers = len(model.layers)
    log_file.close()
""")
    
    # something go wrong 
    error_file = log_dir + file_name + "_error.log"
    file_py.write("""except:
    print('error: file """ + error_file +""" has been create')
    error_file = open(\"""" + error_file + """\" , "w")
    traceback.print_exc(file=error_file)
    result_loss = "Error"
    result_acc = "Error"
    error_file.close()
""")
    
    file_py.write("""finally:
    file = open('../architecture_results_resnet.csv', 'a', newline ='')
    with file: 

        # identifying header   
        header = ['file_name', 'training_time(s)', 'test_result_loss', 'test_result_acc', 'train_result_acc', 'train_result_loss', 'nb_layers'] 
        writer = csv.DictWriter(file, fieldnames = header) 
      
        # writing data row-wise into the csv file 
        writer.writeheader() 
        writer.writerow({'file_name' : '"""+ file_name + """',  
                         'training_time(s)': training_time,  
                         'test_result_loss': test_result_loss,
                         'test_result_acc': test_result_loss,
                         'train_result_acc': train_result_acc,
                         'train_result_loss': train_result_loss,
                         'nb_layers': nb_layers}) 
        print('add line into architecture_results.csv')
    file.close()
    """)
    
    
    
    
    # close
    file_py.close()

# Test

In [205]:
directory = "architecture_json/"
archi, G_json = readJsonFile(directory+"archi_resnet_test.json")

In [206]:
G_json

{'directed': True,
 'multigraph': False,
 'graph': {},
 'nodes': [{'id': 'I'},
  {'id': 'C1'},
  {'id': 'P1'},
  {'id': 'add0'},
  {'id': 'C2'},
  {'id': 'C3'},
  {'id': 'add1'},
  {'id': 'C4'},
  {'id': 'C5'},
  {'id': 'C6'},
  {'id': 'add2'},
  {'id': 'C7'},
  {'id': 'C8'},
  {'id': 'add3'},
  {'id': 'C9'},
  {'id': 'C10'},
  {'id': 'C11'},
  {'id': 'add4'},
  {'id': 'F'},
  {'id': 'D1'},
  {'id': 'D2'},
  {'id': 'D3'},
  {'id': 'D4'},
  {'id': 'D5'}],
 'links': [{'source': 'I', 'target': 'C1'},
  {'source': 'C1', 'target': 'P1'},
  {'source': 'P1', 'target': 'add0'},
  {'source': 'add0', 'target': 'C2'},
  {'source': 'add0', 'target': 'add1'},
  {'source': 'C2', 'target': 'C3'},
  {'source': 'C3', 'target': 'add1'},
  {'source': 'add1', 'target': 'C4'},
  {'source': 'add1', 'target': 'C6'},
  {'source': 'C4', 'target': 'C4'},
  {'source': 'C4', 'target': 'C5'},
  {'source': 'C5', 'target': 'add2'},
  {'source': 'C6', 'target': 'add2'},
  {'source': 'add2', 'target': 'C7'},
  {'sourc

In [207]:
string = create_CNN(archi, G_json)

24
19
{'id': 'F'}
f
{'id': 'D1'}
D1
{'id': 'D2'}
D2
{'id': 'D3'}
D3
{'id': 'D4'}
D4
{'id': 'D5'}
D5


In [208]:
string

"def ResNet():\n        X_input = X = Input([32, 32, 3])\n        X = Conv2D(18, kernel_size=7, strides=2, activation='selu', padding='valid')(X)\n        X = MaxPooling2D(pool_size=3, strides=2, padding='valid')(X)\n        X = id_block(X, 3, 18)\n        X = conv_block(X, 3, 36, 2)\n        X = id_block(X, 3, 36)\n        X = conv_block(X, 3, 72, 2)\n        model = Model(inputs=X_input, outputs=X, name='ResNet18')\n        return model\n\n    Input = ResNet()\n    head_model = Input.output\n    head_model = Flatten()(head_model)\n    head_model = Dense(33, activation='relu')(head_model)\n    head_model = Dense(28, activation='tanh')(head_model)\n    head_model = Dense(22, activation='tanh')(head_model)\n    head_model = Dense(14, activation='tanh')(head_model)\n    head_model = Dense(10, activation='softmax')(head_model)\n    model = Model(inputs=Input.input, outputs=head_model)\n"

In [212]:
create_py_file(directory+"archi_resnet_test.json")

22
17
{'id': 'F'}
f
{'id': 'D1'}
D1
{'id': 'D2'}
D2
{'id': 'D3'}
D3
