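## Federated Model

Trains a PM10 regression model with federated averaging (TensorFlow Federated), treating each of four air-quality monitoring stations as one client.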

In [None]:
import pandas as pd
import tensorflow as tf
import missingno as msno
import re
import os
import datetime
import IPython
import IPython.display
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
!pip install --quiet tensorflow_federated
import tensorflow_federated as tff
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
# Default figure size and no background grid for every plot in this notebook.
mpl.rcParams.update({'figure.figsize': (8, 6), 'axes.grid': False})

[K     |████████████████████████████████| 522kB 8.8MB/s 
[K     |████████████████████████████████| 112kB 12.5MB/s 
[K     |████████████████████████████████| 3.0MB 15.8MB/s 
[K     |████████████████████████████████| 153kB 42.1MB/s 
[K     |████████████████████████████████| 174kB 43.1MB/s 
[K     |████████████████████████████████| 1.1MB 45.7MB/s 
[?25h  Building wheel for absl-py (setup.py) ... [?25l[?25hdone
[31mERROR: datascience 0.10.6 has requirement folium==0.2.1, but you'll have folium 0.8.3 which is incompatible.[0m


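## Read Air Pollutant Data

Each station's semicolon-separated CSV export is loaded, and its pollutant column headers are shortened to canonical names such as `NO2` and `PM10`.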


In [None]:
# Canonical pollutant names. A raw header that starts with one of these
# (e.g. the name followed by units) is shortened to exactly that prefix.
_POLLUTANT_PREFIXES = ('Black Carbon', 'CO', 'O3', 'NO2', 'NOX as NO2', 'PM10', 'PM2.5')


def create_df(PATH, sep=';', skiprows=12, prefixes=_POLLUTANT_PREFIXES):
    """Load one station's CSV export and shorten its pollutant column names.

    Args:
        PATH: file path (or file-like object) accepted by ``pandas.read_csv``.
        sep: field delimiter of the export (``';'`` by default).
        skiprows: number of leading metadata lines before the header row.
        prefixes: canonical column names; a column whose header starts with
            one of them is renamed to that prefix (first match wins).

    Returns:
        DataFrame without the ``'Slut'`` column (presumably the interval end
        timestamp -- confirm) and with shortened pollutant column names.
    """
    df = pd.read_csv(PATH, sep=sep, skiprows=skiprows)
    df = df.drop(columns=['Slut'])

    def canonical(name):
        # Anchored prefix match: a column that merely *contains* a pollutant
        # name (e.g. 'Sensor CO status') is left untouched, unlike the old
        # unanchored re.sub applied to every column.
        if isinstance(name, str):
            for prefix in prefixes:
                if name.startswith(prefix):
                    return prefix
        return name

    return df.rename(columns=canonical)

# One CSV export per monitoring station (Colab upload directory).
DATA_DIR = '/content'
PATH1 = f'{DATA_DIR}/shair-8779-1-6-3.csv'
PATH2 = f'{DATA_DIR}/shair-8780-1-6-3.csv'
PATH3 = f'{DATA_DIR}/shair-8781-1-6-1.csv'
PATH4 = f'{DATA_DIR}/shair-18644-1-6-3.csv'

# Load every station in order: station1 <- PATH1, ..., station4 <- PATH4.
station1, station2, station3, station4 = [create_df(path) for path in (PATH1, PATH2, PATH3, PATH4)]

# Empty results table; presumably filled with per-station sMAPE scores in a later cell.
smape_values = pd.DataFrame(columns=['Station','NO2','NOX as NO2','PM10','PM2.5','Average'])

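## Impute Missing Values

Negative readings are set to 0 and gaps are filled by forward linear interpolation. All four stations are then stacked, and NO2, NOX as NO2, PM2.5 and the PM10 target are standardized.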

In [None]:
# Print zero, negative and null counts per station (diagnostic helper).
def missingstats(stations=None):
  """Print zero, negative and null entry counts for each station's float64 columns.

  Args:
    stations: optional sequence of DataFrames. Defaults to the module-level
      station1..station4 frames, looked up at call time. Stations are
      numbered 1, 2, ... in the order given.

  Returns:
    None; the function only prints.
  """
  if stations is None:
    stations = [station1, station2, station3, station4]

  for number, frame in enumerate(stations, start=1):
    numeric = frame.select_dtypes(include=['float64'])
    print('Station {} Missing Values Stats:'.format(number))
    print('----------------------------------------------')
    print('\nNo of Zeros Entries:\n', (numeric == 0).astype(int).sum(axis=0))
    print('\nNo of Negative Entries:\n', (numeric < 0).astype(int).sum(axis=0))
    print('\nNo of Null Entries:\n', numeric.isnull().astype(int).sum(axis=0))
    print('\n\n')
  return None
  
def _clean_station(df, station_id):
  """Clip negative readings to 0, fill gaps, and tag rows with a station id.

  Args:
    df: frame returned by create_df, with a 'Start' column and numeric
      pollutant columns.
    station_id: value written to a new 'Station' column.

  Returns:
    A new frame with 'Start' as a regular column again; the input is not modified.
  """
  cleaned = df.set_index('Start')
  # Negative concentrations are treated as 0 (NaNs are left for interpolation).
  cleaned = cleaned.mask(cleaned < 0, 0)
  # Interpolate while 'Start' is the index so the object-dtype timestamp column
  # is never passed to interpolate(). method='linear' ignores index values, so
  # the result matches interpolating after reset_index.
  # NOTE(review): limit_direction='forward' leaves leading gaps as NaN.
  cleaned = cleaned.interpolate(method='linear', limit_direction='forward')
  cleaned = cleaned.reset_index(drop=False)
  cleaned['Station'] = station_id
  return cleaned


station1 = _clean_station(station1, 1)
station2 = _clean_station(station2, 2)
station3 = _clean_station(station3, 3)
station4 = _clean_station(station4, 4)
# Call missingstats() here to inspect zero/negative/null counts per station.

# Stack all stations into one frame. Each station keeps its own row labels, so
# index values repeat across stations. DataFrame.append was removed in pandas
# 2.0; pd.concat is the supported equivalent.
station = pd.concat([station1, station2, station3, station4])
to_be_normalized_columns = ['NO2', 'NOX as NO2','PM2.5']

target_column = "PM10"
# Standardize the feature columns and the target together (zero mean, unit variance).
# NOTE(review): the scaler is fit on every row, including the rows that
# make_federated_data later holds out as each station's test split, so test
# statistics leak into the normalization. Consider fitting on training rows only.
standard_scaler_x = StandardScaler(with_mean=True, with_std=True)
scaled_columns = to_be_normalized_columns + [target_column]
station[scaled_columns] = standard_scaler_x.fit_transform(station[scaled_columns])

In [None]:
# Preview the first rows of the stacked, scaled frame instead of rendering it all.
station.head()

Unnamed: 0,Start,NO2,NOX as NO2,PM10,PM2.5,Station
0,2015-01-01 00:00,-0.046829,0.192921,3.845831,6.227233,1
1,2015-01-01 01:00,-0.329492,-0.298143,-0.466392,0.169195,1
2,2015-01-01 02:00,-0.631784,-0.512280,-0.480977,0.154974,1
3,2015-01-01 03:00,-0.643562,-0.386696,-0.286514,0.112312,1
4,2015-01-01 04:00,-0.922299,-0.570241,-0.369161,-0.314311,1
...,...,...,...,...,...,...
43819,2019-12-31 19:00,-0.642353,-0.553591,-0.189561,-1.042729,4
43820,2019-12-31 20:00,-0.879216,-0.683478,-0.558982,-1.153335,4
43821,2019-12-31 21:00,-0.903386,-0.698859,-0.626622,-1.153335,4
43822,2019-12-31 22:00,-0.743866,-0.639043,-0.444514,-1.061163,4


In [None]:
import collections
from sklearn.model_selection import train_test_split

# Client-side tf.data pipeline settings read by preprocess().
NUM_EPOCHS, BATCH_SIZE = 5, 20             # local passes over the data / examples per batch
SHUFFLE_BUFFER, PREFETCH_BUFFER = 100, 10   # tf.data buffer sizes

def preprocess(dataset, num_epochs=None, batch_size=None, shuffle_buffer=None,
               prefetch_buffer=None, shuffle=True, seed=None):
  """Turn a dataset of {'x', 'y'} examples into batched client input.

  Pipeline: repeat -> (optional) shuffle -> batch -> OrderedDict -> prefetch.
  Each size argument defaults to the matching module-level constant
  (NUM_EPOCHS, BATCH_SIZE, SHUFFLE_BUFFER, PREFETCH_BUFFER), read at call time.
  As a result, ``preprocess(ds)`` behaves exactly as before.

  Args:
    dataset: object with the tf.data.Dataset chaining API whose elements are
      dicts with 'x' and 'y' keys.
    num_epochs: number of times the data is repeated.
    batch_size: examples per batch.
    shuffle_buffer: buffer size passed to shuffle().
    prefetch_buffer: buffer size passed to prefetch().
    shuffle: set False to keep the original order (e.g. for evaluation data).
    seed: optional shuffle seed for reproducible batches.

  Returns:
    The transformed dataset, yielding OrderedDict(x=..., y=...) batches.
  """
  if num_epochs is None:
    num_epochs = NUM_EPOCHS
  if batch_size is None:
    batch_size = BATCH_SIZE
  if shuffle_buffer is None:
    shuffle_buffer = SHUFFLE_BUFFER
  if prefetch_buffer is None:
    prefetch_buffer = PREFETCH_BUFFER

  def batch_format_fn(element):
    # Keep only the x and y entries, as an OrderedDict.
    return collections.OrderedDict(x=element['x'], y=element['y'])

  pipeline = dataset.repeat(num_epochs)
  if shuffle:
    # Shuffling after repeat() mixes examples across epoch boundaries.
    pipeline = pipeline.shuffle(shuffle_buffer, seed=seed)
  return pipeline.batch(batch_size).map(batch_format_fn).prefetch(prefetch_buffer)


def make_federated_data(split_index=41592, target_column='PM10', feature_columns=None):
  """Split the stacked `station` frame into one train/test dataset pair per station.

  Each station is one federated client. Rows are split by position: the first
  `split_index` rows of a station are training data and the rest are test data.
  This assumes rows are in time order (TODO confirm).

  Args:
    split_index: positional row index where each station's test split begins.
    target_column: column used as the label `y`.
    feature_columns: columns used as inputs `x`. Defaults to every column
      except 'Start' and 'Station'. That default still includes the target
      column, exactly as before (see NOTE below).

  Returns:
    (train_datasets, test_datasets): lists with one preprocessed dataset per
    station, ordered by station id.

  Raises:
    ValueError: if `split_index` would leave a station with an empty train or
      test split.
  """
  # NOTE(review): with the default feature_columns, `x` contains the label
  # itself (target leakage), so the model can simply copy it. To fix, pass
  # feature_columns=['NO2', 'NOX as NO2', 'PM2.5'] and change the input size in
  # build_model to match.
  train_datasets = []
  test_datasets = []
  for station_id, df in station.groupby('Station'):
    if not 0 < split_index < len(df):
      raise ValueError(
          'split_index={} leaves an empty train or test split for station {} ({} rows)'
          .format(split_index, station_id, len(df)))

    if feature_columns is None:
      features = df.drop(columns=['Start', 'Station'])
    else:
      features = df[list(feature_columns)]
    labels = df[[target_column]]

    train_dataset = tf.data.Dataset.from_tensor_slices(
        {'x': features.iloc[:split_index].to_numpy(),
         'y': labels.iloc[:split_index].to_numpy()})
    test_dataset = tf.data.Dataset.from_tensor_slices(
        {'x': features.iloc[split_index:].to_numpy(),
         'y': labels.iloc[split_index:].to_numpy()})

    train_datasets.append(preprocess(train_dataset))
    # NOTE(review): the test split also goes through preprocess(), so it is
    # repeated NUM_EPOCHS times and shuffled before evaluation.
    test_datasets.append(preprocess(test_dataset))

  return train_datasets, test_datasets

In [None]:
train_datasets, test_datasets = make_federated_data()

In [None]:
print(train_datasets[0])

<PrefetchDataset shapes: OrderedDict([(x, (None, 4)), (y, (None, 1))]), types: OrderedDict([(x, tf.float64), (y, tf.float64)])>


In [None]:
def build_model(input_dim=4, hidden_units=64):
  """Build the small feed-forward regressor shared by all clients.

  Args:
    input_dim: number of input features per example. Must match the last
      dimension of the client datasets' `x` tensors (4 with the current data).
    hidden_units: width of the single hidden ReLU layer.

  Returns:
    An uncompiled keras.Sequential model with a single linear output unit.
  """
  return keras.Sequential([
      layers.Dense(hidden_units, activation='relu', input_shape=[input_dim]),
      layers.Dense(1),
  ])

In [None]:
def create_tff_model():
  """Wrap a freshly built Keras model for TFF, trained and scored with MAE.

  The input spec is taken from the first client's training dataset, so every
  client dataset is expected to share that element structure.
  """
  keras_model = build_model()
  client_spec = train_datasets[0].element_spec
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=client_spec,
      loss=tf.keras.losses.MeanAbsoluteError(),
      metrics=[tf.keras.metrics.MeanAbsoluteError()])

In [None]:
def client_optimizer_fn():
  """Optimizer factory for TFF: returns a new SGD optimizer with learning rate 0.002."""
  return tf.keras.optimizers.SGD(0.002)


print("Create averaging process")
iterative_process = tff.learning.build_federated_averaging_process(
    model_fn=create_tff_model,
    client_optimizer_fn=client_optimizer_fn)

Create averaging process


In [None]:
!pip install nest_asyncio
import nest_asyncio
nest_asyncio.apply()



In [None]:
NUM_ROUNDS = 10  # number of federated averaging rounds

print("Initialize averaging process")
state = iterative_process.initialize()

print("Start iterations")
train_history = []  # one metrics structure per round, kept for later inspection
for round_num in range(1, NUM_ROUNDS + 1):
  state, metrics = iterative_process.next(state, train_datasets)
  train_history.append(metrics)
  print('round {:2d}, metrics={}'.format(round_num, metrics))

Initzialize averaging process
Start iterations
metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('value_sum_process', ()), ('weight_sum_process', ())])), ('train', OrderedDict([('mean_absolute_error', 0.024144776), ('loss', 0.024144776)]))])
metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('value_sum_process', ()), ('weight_sum_process', ())])), ('train', OrderedDict([('mean_absolute_error', 0.006381416), ('loss', 0.006381416)]))])
metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('value_sum_process', ()), ('weight_sum_process', ())])), ('train', OrderedDict([('mean_absolute_error', 0.004773584), ('loss', 0.004773584)]))])
metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('value_sum_process', ()), ('weight_sum_process', ())])), ('train', OrderedDict([('mean_absolute_error', 0.0042899773), ('loss', 0.0042899773)]))])
metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('value_sum_process', 

In [None]:
# Build the federated evaluation computation here so this cell does not depend
# on an `evaluation` variable left over from another (possibly deleted) cell.
evaluation = tff.learning.build_federated_evaluation(create_tff_model)

# Global model evaluated per individual client
for station_id, test_dataset in enumerate(test_datasets, start=1):
    test_metrics = evaluation(state.model, [test_dataset])
    print('Station {}: {}'.format(station_id, test_metrics))

OrderedDict([('mean_absolute_error', 0.0016565085), ('loss', 0.0016565085)])
OrderedDict([('mean_absolute_error', 0.0016345803), ('loss', 0.0016345803)])
OrderedDict([('mean_absolute_error', 0.0013538589), ('loss', 0.0013538589)])
OrderedDict([('mean_absolute_error', 0.0015299591), ('loss', 0.0015299591)])
