In [2]:
%reset
!rm -rf '/content/gaf_output'
!rm '/content/gaf_output.zip'
%cd /content/

Once deleted, variables cannot be recovered. Proceed (y/[n])? n
Nothing done.
rm: cannot remove '/content/gaf_output.zip': No such file or directory
/content


In [3]:
# Constants

# --- Output layout: root folder, one sub-folder per split, one label CSV per split
folder_name = "gaf_output"
training_folder_name = "training"
test_folder_name = "test"
training_output_dataset_filename = "gaf_dataset_training.csv"
test_output_dataset_filename = "gaf_dataset_test.csv"

# --- Input data: Google Drive file id, local file name, column turned into images
file_id = "1Ez3wPfDYCxVQIX_6fqt2DAf5z3YTGC-M"
filename = "ukdale_channel_1.dat"
parameter_name = "pow"
data_batch = 70_000_000  # maximum number of rows kept from the input file

# --- Image generation
GAF_RESOLUTION = 128  # maximum number of samples used per image
IMAGE_SIZE = (9, 9)  # in inches

# --- Class labels and their numeric codes
NORMAL_CLASS_LABEL, NORMAL_CLASS_LABEL_NO = "normal", 0
ABNORMAL_CLASS_LABEL, ABNORMAL_CLASS_LABEL_NO = "abnormal", 1

# --- Fractions of the available dates used for each split
eval_split = 0.1
training_split = 0.9

In [4]:
# Install required libaries
!pip install -q pyts

[K     |████████████████████████████████| 2.5 MB 5.1 MB/s 
[?25h

In [5]:
# Imports
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
from google.colab import files

import numpy as np
import pandas as pd
from datetime import datetime
import os
import matplotlib
# `matplotlib` has to be imported before its backend can be selected; the
# non-interactive 'Agg' backend is chosen before pyplot is loaded.
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import ImageGrid
from pyts.image import GramianAngularField
from typing import *
from multiprocessing import Pool
import datetime as dt

In [6]:
# Import data
# Authenticate the Colab user and hand the application-default credentials to PyDrive.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)
# Reference the Drive file identified by `file_id` and save its content locally as `filename`.
dataset_downloaded = drive.CreateFile({'id': file_id})
dataset_downloaded.GetContentFile(filename)  

In [7]:
# Preprocessing
# The input is a whitespace-separated file with two columns: epoch seconds and power.
col_name = ['datetime','pow']
convert_dict = {'datetime': int,
                'pow': int}

# Only the first `data_batch` rows are needed, so stop reading there instead of
# loading the whole file and truncating afterwards.
df = pd.read_csv(filename, names=col_name, header=None, sep=r"\s+", nrows=data_batch)
df = df.astype(convert_dict)

# Vectorised epoch-seconds -> 'YYYY-MM-DD HH:MM:SS' (UTC) conversion. A Python-level
# `apply` with one `datetime` call per row does not finish in reasonable time on
# tens of millions of rows.
df['datetime'] = pd.to_datetime(df['datetime'], unit='s').dt.strftime('%Y-%m-%d %H:%M:%S')
print(df.shape)
df.head()

KeyboardInterrupt: ignored

In [None]:
# Folder setup
print('Creating Directories...')

# Everything is written below the current working directory.
PATH = os.path.abspath('')
GAF_PATH = os.path.join(PATH, folder_name)

# One sub-folder per split ...
TRAINING_OUTPUT_PATH = os.path.join(GAF_PATH, training_folder_name)
TEST_OUTPUT_PATH = os.path.join(GAF_PATH, test_folder_name)

# ... and, for the training split only, one sub-folder per class label.
TRAINING_NORMAL_PATH = os.path.join(TRAINING_OUTPUT_PATH, NORMAL_CLASS_LABEL)
TRAINING_ABNORMAL_PATH = os.path.join(TRAINING_OUTPUT_PATH, ABNORMAL_CLASS_LABEL)

# Create the folders in the same order as they are declared above.
for _directory in (TRAINING_OUTPUT_PATH, TEST_OUTPUT_PATH, TRAINING_NORMAL_PATH, TRAINING_ABNORMAL_PATH):
    os.makedirs(_directory, exist_ok=True)

In [None]:
# Create and save GAFs - supporting functions
def create_gaf(ts) -> Dict[str, Any]:
    """
    Build the Gramian Angular Summation Field of a single series.

    :param ts: one-dimensional series; its length (``ts.shape[0]``) is used as the image size
    :return: dict holding the GASF matrix under ``'gasf'`` and the mean of its
             column means under ``'gasf_mean'``
    """
    transformer = GramianAngularField(method='summation', image_size=ts.shape[0])
    # The series is presented as a single-row frame; only the first (and only)
    # transformed image is kept.
    single_row = pd.DataFrame(ts).T
    field = transformer.fit_transform(single_row)[0]
    column_means = np.mean(field, axis=0)
    return {
        'gasf': field,
        'gasf_mean': np.mean(column_means, axis=0),
    }


# Create images of the bundle that we pass
def create_images(X_plots: Any, image_name: str, output_path: str, destination: str, image_matrix: tuple =(1, 1), mix_classes: bool = False) -> Dict[str, str]:
    """
    Draw the given matrices on an image grid and save the figure as a PNG file.

    :param X_plots: iterable of images, one per grid cell, each passed to ``imshow``
    :param image_name: file name of the image, without extension
    :param output_path: folder of the split the image belongs to
    :param destination: class label of the image; also the sub-folder of
                        ``output_path`` the file is saved in unless ``mix_classes`` is set
    :param image_matrix: ``(rows, columns)`` of the image grid
    :param mix_classes: when True the file is saved directly in ``output_path``
    :return: record ``{'image_name': <file name including '.png'>, 'label': destination}``
    """
    # Use one explicit file name for both the file on disk and the returned
    # record, instead of relying on matplotlib to append the extension.
    file_name = image_name + '.png'
    repo = output_path if mix_classes else os.path.join(output_path, destination)

    fig = plt.figure(figsize=IMAGE_SIZE)
    try:
        grid = ImageGrid(fig,
                         111,
                         axes_pad=(0,0),
                         cbar_pad=0,
                         nrows_ncols=image_matrix,
                         share_all=True,
                         )
        for image, ax in zip(X_plots, grid):
            ax.set_xticks([])
            ax.set_yticks([])
            ax.imshow(image, cmap='rainbow', origin='lower')
        fig.savefig(os.path.join(repo, file_name))
    finally:
        # Always release the figure, even when drawing or saving fails;
        # many figures are created in a loop.
        plt.close(fig)
    return {'image_name': file_name, 'label': destination}

def generate_gaf(images_data: Dict[str, Any]) -> None:
    """
    Render one GAF image per entry of ``images_data`` and write the label CSV.

    :param images_data: maps a split name to a list of ``[date_string, [series, ...]]``
                        entries; the first key decides which CSV file is written
    :return: None. Images are saved through ``create_images`` and a CSV with the
             columns ``image_name``, ``label`` and ``label_no`` is written to ``GAF_PATH``.
    """
    main_decision = list(images_data.keys())[0]

    records = []
    for decision, data in images_data.items():
        is_training = decision == training_folder_name
        output_path = TRAINING_OUTPUT_PATH if is_training else TEST_OUTPUT_PATH
        for date_label, series_list in data:
            # Compute each GAF once and reuse it for both the picture and its mean.
            gafs = [create_gaf(series) for series in series_list]
            to_plot = [gaf['gasf'] for gaf in gafs]
            # The label is decided by the mean of the first series only.
            classification = NORMAL_CLASS_LABEL if gafs[0]['gasf_mean'] < 0 else ABNORMAL_CLASS_LABEL
            records.append(create_images(X_plots=to_plot,
                                         image_name=date_label.replace('-', '_'),
                                         output_path=output_path,
                                         destination=classification,
                                         # Only the training split is sorted into per-class folders.
                                         mix_classes=not is_training))

    # Build the frame in one go rather than appending row by row.
    output_dataset = pd.DataFrame(records, columns=['image_name', 'label'])
    # Any label other than the normal one maps to the abnormal code.
    output_dataset['label_no'] = np.where(output_dataset['label'] == NORMAL_CLASS_LABEL,
                                          NORMAL_CLASS_LABEL_NO,
                                          ABNORMAL_CLASS_LABEL_NO)

    # Convert output_dataset to csv
    csv_name = training_output_dataset_filename if main_decision == training_folder_name else test_output_dataset_filename
    output_dataset.to_csv(os.path.join(GAF_PATH, csv_name), index=False)

In [None]:
# Create and save GAFs
def data_to_image_preprocess(data: pd.DataFrame, create_eval: bool = False, destination_folder: str = 'training') -> None:
    """
    Resample the readings to hourly means and pass them on to ``set_gaf_data``.

    :param data: frame with a ``'datetime'`` column of parseable timestamps plus
                 the numeric reading columns; the frame itself is not modified
    :param create_eval: forwarded to ``set_gaf_data``
    :param destination_folder: forwarded to ``set_gaf_data``
    :return: None
    """
    print('Processing dataset...')
    timestamps = pd.to_datetime(data['datetime'])
    # Work on a copy holding only the numeric columns: the caller's frame stays
    # untouched, and the textual 'datetime' column cannot break the mean.
    hourly = (
        data.select_dtypes(include='number')
        .assign(DateTime=timestamps)
        .groupby(pd.Grouper(key='DateTime', freq='1h'))
        .mean()
        # Hours without any reading come out as NaN; fill them by interpolation.
        .interpolate()
        .reset_index()
    )
    # Send to slicing
    set_gaf_data(hourly, create_eval, destination_folder)


def set_gaf_data(df: pd.DataFrame, create_eval: bool = False, destination_folder: str = 'training') -> None:
    """
    Cut the series into one window per date and generate a GAF image for each.

    The distinct dates of ``df['DateTime']`` are split chronologically: the
    training run uses the leading ``training_split`` share, the evaluation run
    uses up to ``eval_split`` of the dates starting at the training cut-off
    date. Because a window only holds samples strictly between its start date
    and the last date of its list, the two runs never share a sample.

    :param df: frame with a datetime ``'DateTime'`` column and the ``parameter_name`` column
    :param create_eval: select the evaluation dates instead of the training dates
    :param destination_folder: split name used as key of the data handed to ``generate_gaf``
    :return: None
    """
    dates = df['DateTime'].dt.date
    dates = dates.drop_duplicates()
    list_dates = dates.apply(str).tolist()
    print('original list_dates:', len(list_dates))
    eval_split_len = round(len(list_dates) * eval_split) + 1
    training_split_len = round(len(list_dates) * training_split) + 1
    if create_eval:
        # Start at the training cut-off rather than at the first date, otherwise
        # every evaluation image would be built from data also used for training.
        eval_start = max(training_split_len - 1, 0)
        list_dates = list_dates[eval_start:eval_start + eval_split_len]
    else:
        list_dates = list_dates[:training_split_len]
    print('final list_dates:', len(list_dates))
    box_size = GAF_RESOLUTION
    print('list_dates:', list_dates)

    # Container to store data_slice for the creation of GAF
    windows = []
    if len(list_dates) > 1:
        # The upper bound is the same for every window, so build its mask once.
        before_end = df['DateTime'] < list_dates[-1]
        # The last date only serves as upper bound; it does not start a window.
        for start_date in list_dates[:-1]:
            # Select appropriate timeframe
            # NOTE(review): the strict `>` leaves out the 00:00 sample of the start date - confirm this is intended.
            data_slice = df.loc[before_end & (df['DateTime'] > start_date)]
            gafs = []
            # Group data_slice by time frequency
            for freq in ['1h']:
                group_dt = data_slice.groupby(pd.Grouper(key='DateTime', freq=freq)).mean().reset_index()
                group_dt = group_dt.dropna()
                # Keep at most `box_size` samples per image.
                gafs.append(group_dt[parameter_name].head(box_size))
            windows.append([start_date, gafs])
    decision_map = {destination_folder: windows}

    print('Generating GAF images...')
    # Generate the images from processed data_slice
    generate_gaf(decision_map)
    images_created = len(windows)
    print("========GAF REPORT========:\nTotal Images Created: {0}".format(images_created))

def main():
    """Generate the training images, then the test images, from the module-level ``df``."""
    print('CONVERTING TIME-SERIES TO GAF IMAGES...')
    # (heading, create_eval flag, destination folder, closing banner) for each run
    runs = (
        ('Training dataset:', False, training_folder_name,
         "-----------------------------------------------Training dataset created----------------------------------"),
        ('Test dataset:', True, test_folder_name,
         "-----------------------------------------------Test dataset created--------------------------------------"),
    )
    for heading, is_eval, folder, banner in runs:
        print(heading)
        data_to_image_preprocess(data=df, create_eval=is_eval, destination_folder=folder)
        print(banner)

main()

In [None]:
# Save output images as a compressed file and dataset as CSV
# Recursively (-r) and quietly (-q) archive the whole output folder, then hand
# the archive to the browser for download.
!zip -r -q /content/gaf_output.zip /content/gaf_output
files.download("/content/gaf_output.zip")

In [None]:
# Copy the generated archive into Google Drive.
# NOTE(review): this import rebinds `drive`, the name the "Import data" cell uses for its PyDrive client.
from google.colab import drive
drive.mount('/gdrive', force_remount=False)
# Replace any previous `gaf_output` folder in the Drive data directory with a
# fresh extraction of the archive.
%cd /gdrive/My\ Drive/Colab\ Notebooks/data/
%rm -rf gaf_output 
%mkdir gaf_output 
%cd gaf_output
!unzip -q /content/gaf_output.zip