In [1]:
import pandas as pd
import random
import tensorflow as tf
import numpy as np
from tensorflow_federated import python as tff
from collections import OrderedDict
from tensorflow import keras
import linecache
import nest_asyncio
nest_asyncio.apply()
import time
from os import listdir
from os.path import isfile, join

2023-04-12 23:04:59.376143: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
NUM_EPOCHS = 4 
BATCH_SIZE = 40
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 10
time_steps = 48
interval = 1000 
future_steps = 12
split = 0.8

In [3]:
def preprocess(dataset):
    def batch_format_fn(x_d, y_d):
        return OrderedDict(x=x_d, y=y_d)

    return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER, seed=1).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)
    
def create_dataset_fed(files, lower, upper):
    xs, ys = [], []
    print("\n#######################START", lower, upper)
    for file in files:
        x_t, y_t = [], []
        data = file[lower:upper]
        if data:
            if (len(data) - time_steps - 1 - future_steps) < 0:
                print("EMPTY", (len(data) - time_steps - 1 - future_steps))
            for i in range(len(data) - time_steps - 1 - future_steps):
                v = data[i:(i + time_steps)] 
                z = data[(i + time_steps):(i + time_steps + future_steps)]
                if check_nulls(v) and check_nulls(z):
                    x_t.append(v)
                    y_t.append(z)
                else:
                    print("null-", i, end=" ")
            x_t = np.array(x_t)[:,:,np.newaxis]
            y_t = np.array(y_t)[:,:,np.newaxis]
            xs.append(x_t)
            ys.append(y_t)
        else:
            print("no data" )
    xs = np.array(xs)
    ys = np.array(ys)
    # return [tf.data.Dataset.from_tensor_slices((Xs[x],  np.array(ys[x]))) for x in range(len(Xs))]
    # return [ tf.data.Dataset.from_tensor_slices((Xs[x],  np.array(ys[x]))) for x in range(len(Xs))] - removed brackets, and np.array
    tikva = [tf.data.Dataset.from_tensor_slices((xs[x], ys[x])) for x in range(len(xs))]
    print("\n#######################END", lower, upper)
    return tikva

def make_federated_data(files, lower, upper):
    data = create_dataset_fed(files, lower, upper)
    return [preprocess(x) for x in data if x]

def create_keras_model():
    return tf.keras.models.Sequential([
      keras.layers.LSTM(64, input_shape=(time_steps, 1)),
      keras.layers.Dense(12),
    ])

def model_fn():
    # We _must_ create a new model here, and _not_ capture it from an external
    # scope. TFF will call this within different graph contexts.
    keras_model = create_keras_model()
    return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.MeanSquaredError(),
      metrics=[tf.keras.metrics.RootMeanSquaredError()])

def check_nulls(data):
    if not(all(is_float(ele) for ele in data)):
        return False
    else:
        return True
    
def is_float(element):
    try:
        float(element)
        return True
    except ValueError:
        return False

In [10]:

clusters = [['3081.txt', '1318.txt', '2202.txt', '2574.txt', '4111.txt', '1727.txt', '1055.txt', '3871.txt', '2776.txt', '3050.txt', '3133.txt', '3046.txt', '3195.txt', '3585.txt', '2922.txt', '3427.txt', '1143.txt', '1343.txt', '2474.txt', '1627.txt', '2065.txt', '2529.txt', '2647.txt', '3405.txt', '3820.txt', '3599.txt', '2536.txt', '3167.txt', '1980.txt', '3617.txt', '2680.txt', '3777.txt', '1809.txt', '1660.txt', '1059.txt', '2791.txt', '2828.txt', '2964.txt', '1840.txt', '1711.txt', '1117.txt', '3073.txt'], 
            ['3041.txt', '1447.txt', '3330.txt', '4163.txt', '4121.txt', '2838.txt'], 
            ['4076.txt', '2407.txt', '3296.txt', '3447.txt', '4049.txt', '1904.txt', '1086.txt', '4129.txt', '3497.txt', '3910.txt', '2239.txt', '1081.txt', '1839.txt', '3036.txt', '2315.txt', '2301.txt', '2067.txt', '1180.txt', '1619.txt', '2304.txt', '3346.txt', '3781.txt', '2893.txt', '2501.txt', '1950.txt', '2685.txt', '2121.txt', '2519.txt', '3349.txt', '1827.txt', '2081.txt', '2522.txt', '1610.txt', '3359.txt', '1404.txt', '1834.txt', '2235.txt', '1113.txt', '3843.txt', '2593.txt', '1515.txt', '2424.txt', '4055.txt', '1312.txt', '3931.txt', '1115.txt', '3884.txt', '1063.txt', '2595.txt', '2436.txt', '2153.txt'], 
            ['1330.txt']]



In [11]:
# Read files in a dictionary with key = file name, and value = file

C1086 = len(clusters)
PATH = './ExperementData/'

alloc = {}
for c in range(len(clusters)):
    for name in clusters[c]:
        alloc[name] = c
print(alloc)
        
files_in_cluster = []
for c in range(len(clusters) + 1):
    files_in_cluster.append([])

data = []

randomDict = {}

for cl in clusters:
    for f in cl:
        with open(PATH + f,'r') as reader:
            temp = []
            randomDict[f] = []
            for l, line in enumerate(reader):
                if line.strip() == 'Null':
                    temp.append('Null')
                    randomDict[f].append(l)
                else:
                    temp.append(float(line.strip()))

            print(f, len(randomDict[f]))
            if len(randomDict[f]) < 51:
                print(randomDict[f])
            data.append(temp)
            files_in_cluster[alloc[f]].append(temp)
            if f == "1086.txt":
                files_in_cluster[C1086].append(temp)

test = [d[int(25727*split):] for d in data]

{'3081.txt': 0, '1318.txt': 0, '2202.txt': 0, '2574.txt': 0, '4111.txt': 0, '1727.txt': 0, '1055.txt': 0, '3871.txt': 0, '2776.txt': 0, '3050.txt': 0, '3133.txt': 0, '3046.txt': 0, '3195.txt': 0, '3585.txt': 0, '2922.txt': 0, '3427.txt': 0, '1143.txt': 0, '1343.txt': 0, '2474.txt': 0, '1627.txt': 0, '2065.txt': 0, '2529.txt': 0, '2647.txt': 0, '3405.txt': 0, '3820.txt': 0, '3599.txt': 0, '2536.txt': 0, '3167.txt': 0, '1980.txt': 0, '3617.txt': 0, '2680.txt': 0, '3777.txt': 0, '1809.txt': 0, '1660.txt': 0, '1059.txt': 0, '2791.txt': 0, '2828.txt': 0, '2964.txt': 0, '1840.txt': 0, '1711.txt': 0, '1117.txt': 0, '3073.txt': 0, '3041.txt': 1, '1447.txt': 1, '3330.txt': 1, '4163.txt': 1, '4121.txt': 1, '2838.txt': 1, '4076.txt': 2, '2407.txt': 2, '3296.txt': 2, '3447.txt': 2, '4049.txt': 2, '1904.txt': 2, '1086.txt': 2, '4129.txt': 2, '3497.txt': 2, '3910.txt': 2, '2239.txt': 2, '1081.txt': 2, '1839.txt': 2, '3036.txt': 2, '2315.txt': 2, '2301.txt': 2, '2067.txt': 2, '1180.txt': 2, '1619.txt

In [13]:
example_dataset = create_dataset_fed(files_in_cluster[C1086], 1, 500)[0]
preprocessed_example_dataset = preprocess(example_dataset)
process = [tff.learning.algorithms.build_weighted_fed_avg(
    model_fn,
    client_optimizer_fn=lambda: keras.optimizers.Adam(0.001),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0)) for _ in range(len(clusters))]
          
state = [process[x].initialize() for x in range (len(process))]
for c in range(len(clusters)):
#     max_time = (len(clusters[c])/60)*9000
    max_time = (len(clusters[c])/60)*9
    start_time = time.time()
    while time.time() - start_time < max_time:
        location = random.randint(1, int(25727*split)-interval)
        print(location)
        federated_train_data = make_federated_data(files_in_cluster[c], location, location + interval)
        if len(federated_train_data) > 0:
            result = process[c].next(state[c], federated_train_data)
            state[c] = result.state
            metrics = result.metrics
            print('round {:2d}, metrics={}'.format(c, metrics))
        else:
            print("empty main", location)
#     tensorflow 2.9.0 and federated 0.40.0


#######################START 1 500

#######################END 1 500
10036

#######################START 10036 11036

#######################END 10036 11036
round  0, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('root_mean_squared_error', 0.86599404), ('loss', 0.74994564), ('num_examples', 157752), ('num_batches', 3948)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', OrderedDict([('update_non_finite', 0)]))])
1118

#######################START 1118 2118

#######################END 1118 2118
round  1, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('root_mean_squared_error', 2.8518505), ('loss', 8.13305), ('num_examples', 22536), ('num_batches', 564)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', OrderedDict([('update_non_finite', 0)]))])
8775

#######################START 8775 9775

#################

In [14]:
def create_dataset(data, time_steps=1):
    Xs, ys = [], []
    for i in range(len(data) - time_steps-12):
        v = data[i:(i + time_steps)]
        z = data[(i + time_steps):(i + time_steps+12)]
        if check_nulls(z) and check_nulls(v):
            ys.append(z)
            Xs.append(v)
    return np.array(Xs), np.array(ys)

In [16]:
onlyfiles = [f for f in listdir(PATH) if isfile(join(PATH, f))and f[-4:]==".txt"]
# for i in range(len(onlyfiles)):
#     if onlyfiles[i] == "1086.txt":
#         one_file = onlyfiles[i]
# print(one_file)

for i in range(len(test)):
    X_test, y_test = create_dataset(test[i], time_steps)
    if X_test.size > 0 and y_test.size > 0:
        X_test = X_test[:,:,np.newaxis]
        y_test = y_test[:,:,np.newaxis]
        model_for_inference = create_keras_model()
        weights = state[alloc[onlyfiles[i]]].global_model_weights
        weights.assign_weights_to(model_for_inference)
        y_pred = model_for_inference.predict(X_test)
        dataframe = pd.DataFrame(np.squeeze(np.array(y_pred)))
        dataframe.to_csv(r"./num_clust_four_100/pred-kmeans2/"+onlyfiles[i][:4]+'.csv')
        dataframe = pd.DataFrame(np.squeeze(np.array(y_test)))
        dataframe.to_csv(r"./num_clust_four_100/test-kmeans2/"+onlyfiles[i][:4]+'.csv')


