<font size="5.5"><u><i>Convert Strain to Image</i></u></font>

<font size="4">Script to convert strain samples (time series) to image samples (TF scalograms).</font>
<br/>
<font size="4">Author: Manuel David Morales</font>

<i><font size="3.5">Remark: For the moment, this script is worked in Google Colaboratory</font><br/>
https://colab.research.google.com/drive/1WAx41-Hnjuok8OeMKtFgjebaUh2Yo6KN?usp=sharing</i>

In [None]:
# Mount Google Drive repository

from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


## 1) Library imports

In [None]:
# Data analysis
import numpy as np
import pandas as pd

# Visualization
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow

# Scientific computing
import scipy.misc
from scipy import signal

# Files/folders management
import os, glob, io

# To read csv files
import csv

# Garbage collector
import gc

# Image management
from PIL import Image

## 2) Input parameters

In [None]:
# Interferometer for noise data
# ------------------------------------------------------------
detector = "L1"    # Options: "L1", "H1", "V1"
# ------------------------------------------------------------

## 3) Read files

In [None]:
# ------> Specify folder location

data_dir = '/content/drive/MyDrive/Colab Notebooks/GitHub/CCSNeHFGW_ResNetClass/Datasets/'
print("Strain datasets are located at:", data_dir)
print("")

Strain datasets are located at: /content/drive/MyDrive/Colab Notebooks/GitHub/CCSNeHFGW_ResNetClass/Datasets/



In [None]:
# ---------------------------------------------------------------
# ------> Window sample strain files reader
# ---------------------------------------------------------------

def load_straindata(folder_path, class_samples):

    """
    Function to load strain samples for a given class.

    INPUT:
            folder_path    -> Folder path
            class_samples  -> Dictionary, to save strain
                              samples for each class (key)

    OUTPUT:
            class_samples  -> Updated dictionary
            Num_samples    -> No. of loaded samples
    """

    os.chdir(folder_path)
    print(folder_path)

    # Initialize No. of samples count
    Num_samples = 0

    # Loop: Waveform class subfolders
    for subfolder in glob.glob("wfclass_*"):

        print("=======> SCANNING", subfolder, "SUBFOLDER")

        class_label = subfolder[-1]

        os.chdir(folder_path + "/" + subfolder)
        #print(folder_path + "/" + subfolder)

        windows_onelabel = []

        # Loop: Strain window samples
        for file in glob.glob("sample_strain_*"):

            #print("***** READING FILE", file, " *****")
            with open(file) as csv_file:

                sample = pd.read_csv(file)
                sample_arr = sample.to_numpy()
                windows_onelabel.append(sample_arr)

                Num_samples += 1

        class_samples["class " + class_label] = windows_onelabel

    return class_samples, Num_samples

In [None]:
# Initialize list for strain segments
strain_segments = []

# Initialize No. of total injections count
Num_samples_total = 0

# ------> Scan folders

os.chdir(data_dir)

# ------> First loop: Segment folders
for folder in glob.glob(detector + "*"):

    print("SCANNING", folder, "FOLDER")
    print("------------------------------------------")

    # Initialize dictionary to distinguish classes

    class_samples = {}

    # Call function load_straindata
    class_samples, Num_samples = load_straindata(data_dir + folder, class_samples)

    strain_segments.append(class_samples)

    Num_samples_total += Num_samples

    print("")

print("***** Total number of loaded samples: ", Num_samples_total)

SCANNING L1_1257050112 FOLDER
------------------------------------------
/content/drive/MyDrive/Colab Notebooks/GitHub/CCSNeHFGW_ResNetClass/Datasets/L1_1257050112

SCANNING L1_1256783872 FOLDER
------------------------------------------
/content/drive/MyDrive/Colab Notebooks/GitHub/CCSNeHFGW_ResNetClass/Datasets/L1_1256783872

***** Total number of loaded samples:  3053


In [None]:
# ------> Some checks

print("Key of dictionaries in strain_segments lists:")
print("-----------------------------------------------")
print(strain_segments[0].keys())
print(strain_segments[1].keys())

Key of dictionaries in strain_segments lists:
-----------------------------------------------
dict_keys(['class 1', 'class 2', 'class 3'])
dict_keys(['class 1', 'class 3', 'class 2'])


## 4) Prepare Data

In [None]:
# ------> Merge lists of strain samples for each class

samples_class1 = strain_segments[0]["class 1"] + strain_segments[1]["class 1"]
samples_class2 = strain_segments[0]["class 2"] + strain_segments[1]["class 2"]
samples_class3 = strain_segments[0]["class 3"] + strain_segments[1]["class 3"]

In [None]:
# ------> Convert lists to numpy arrays, clear memory from lists

samples_c1 = np.array(samples_class1)
del(samples_class1)
gc.collect()

samples_c2 = np.array(samples_class2)
del(samples_class2)
gc.collect()

samples_c3 = np.array(samples_class3)
del(samples_class3)
gc.collect()

0

In [None]:
# ------> Check dimensions

print("Dimension of samples_c1 array:", samples_c1.shape)
print("Dimension of samples_c2 array:", samples_c2.shape)
print("Dimension of samples_c3 array:", samples_c3.shape)

Dimension of samples_c1 array: (1021, 3971, 2)
Dimension of samples_c2 array: (1018, 3971, 2)
Dimension of samples_c3 array: (1014, 3971, 2)


In [None]:
# ------> Compute sampling frequency

fs = 1 / (np.transpose(samples_c1[0])[0][1] - np.transpose(samples_c1[0])[0][0])

## 4) Samples conversion

The next cell contains functions to compute wavelet transform

In [None]:
# @title
# ---------------------------------------------------------------
# ------> Wavelet time-frequency representation
# ---------------------------------------------------------------

def WaveletTF_transform(h, fs, fstart, fstop, delta_f, width, doplots):

    """
    This function computes the time-frequency representation
    based on a Wavelet transform of a signal (time series)

    INPUT:
            h        -> Signal
            fs       -> Sampling frequency
            fstart   -> Filter's initial frequency
            fstop    -> Filter's final frequency
            delta_f  -> Frequency bin width
            width    -> Width of the wavelet (in cycles)
            doplots  -> Do plots for checks (0: no, 1: yes)

    OUTPUT:
            timeVec  -> Vector of times
            freqVec  -> Vector of frequencies
            WL       -> Wavelet coefficients

    """

    # ------> Time vector and time sampling

    Nsamples = len(h)
    timeVec = np.arange(0, Nsamples)/fs
    ts      = 1/fs

    # ------> Frequency vector

    Nfreq = round((fstop - fstart) / delta_f) + 1
    freqVec = np.linspace(fstart, fstop, Nfreq).reshape((-1, 1))

    # ------> Initialize Wavelet Transform Matrix

    WL = np.zeros((Nfreq, Nsamples))

    #print("Wavelet Transform Matrix shape:", WL.shape)

    # ------> Compute the time-frequency representation

    signal = h
    #signal = detrend(h, axis=-1, type='linear')

    for ifre in range(Nfreq):

        # Compute the Morlet wavelet
        Morlet = Morlet_wavelet(freqVec[ifre], ts, width, doplots);

        # Apply the Morlet wavelet transform
        WLcomplex = np.convolve(signal, Morlet, mode='full')

        # Get indexes
        li = int(np.ceil(len(Morlet) / 2))
        ls = li + Nsamples
        #ls = len(WLcomplex) - int(np.flor(len(Morlet) / 2))

        # Complex coefficients
        WLcomplex = WLcomplex[li:ls]

        if doplots:

            print("Frecuency =", freqVec[ifre])
            print("++++++++++++++++++++++++++++++++++++++++")
            print("")

            plt.figure()
            plt.subplot(3, 1, 1)
            plt.plot(np.real(WLcomplex), 'r')
            plt.plot(np.imag(WLcomplex), 'b')
            plt.legend(['real', 'imag'])
            plt.box(True)
            plt.title('Frequency: ' + str(freqVec[ifre]) + ' Hz')

            plt.subplot(3, 1, 2)
            plt.plot(np.abs(WLcomplex))
            plt.legend(['magnitude'])
            plt.box(True)

            plt.subplot(3, 1, 3)
            plt.plot(np.angle(WLcomplex))
            plt.legend(['phase'])
            plt.box(True)

            plt.tight_layout()
            plt.pause(0.1)  # Pause for a short time to show the plot
            #plt.savefig('wavelet_decomposition.png')

        # Compute the magnitude
        WLmag = 2*(np.abs(WLcomplex)**2)/fs

        # Save wavelet decomposition magnitude
        WL[ifre, :] = WLmag

        WL = np.squeeze(WL)

        if doplots:

            plt.figure()
            X, Y = np.meshgrid(timeVec, freqVec)

            plt.pcolormesh(X, Y, WL[:, :], shading='gouraud')

            plt.axis([np.min(timeVec), np.max(timeVec), np.min(freqVec), np.max(freqVec)])
            plt.xlabel('Time (s)')
            plt.ylabel('Frequency (Hz)')
            plt.title('Time-Frequency representation')
            plt.box(True)
            plt.colorbar()

            plt.show()

    return timeVec, freqVec, WL

# ---------------------------------------------------------------
# ------> Morlet Wavelet time-frequency representation
# ---------------------------------------------------------------

def Morlet_wavelet(fi, ts, morlet_w, doplots):

    """
    Function to compute the Complex Morlet Wavelet for frequency "fi"
    and time "t". This will be normalized so the total energy = 1.

    INPUT:
            fi        -> Frequency
            ts        -> Sampling time
            morlet_w  -> Wavelet's width (width>= 5 is suggested)
            doplots   -> Do plots for checks (0: no, 1: yes)

    OUTPUT:
            Morlet    -> Morlet wavelet
    """

    # Frequency standard deviation
    sf = fi / morlet_w

    # Time standard deviation
    st = 1/(2*np.pi*sf)

    # Wavelet's amplitude
    A  = 1/np.sqrt(st*np.sqrt(np.pi))

    # Time array
    t = np.arange(-3.5 * st, 3.5 * st + ts, ts)

    # Compute Morlet wavelet
    Morlet =  A*np.exp(-t**2/(2*st**2))*np.exp(1j*2*np.pi*fi*t)

    if doplots:

        plt.figure()

        plt.subplot(3, 1, 1)
        plt.plot(t, np.real(Morlet), 'r', linewidth=2)
        plt.plot(t, np.imag(Morlet), 'b', linewidth=2)
        plt.ylabel('Morlet')
        plt.legend(['Real', 'Imag'])
        plt.box(True)
        plt.title('Frequency = ' + str(fi) + 'Hz')

        plt.subplot(3, 1, 2)
        plt.plot(t, np.abs(Morlet), '.-r')
        # plt.axis([-4, 4, 0, 6])
        plt.xlabel('Time (s)')
        plt.ylabel('Magnitude')

        plt.subplot(3, 1, 3)
        plt.plot(t, np.angle(Morlet), '.-b')
        # plt.axis([-4, 4, -4, 4])
        plt.xlabel('Time (s)')
        plt.ylabel('Angle')

        plt.tight_layout()
        plt.show()

    return Morlet

In [None]:
cd "/content/drive/MyDrive/Colab Notebooks/GitHub/CCSNeHFGW_ResNetClass/Datasets/TFsamples_c1/"

/content/drive/MyDrive/Colab Notebooks/GitHub/CCSNeHFGW_ResNetClass/Datasets/TFsamples_c1


In [None]:
pwd

'/content/drive/MyDrive/Colab Notebooks/GitHub/CCSNeHFGW_ResNetClass/Datasets/TFsamples_c1'

In [None]:
# ------> Convert samples to images

# Samples class 1

for i in range(len(samples_c1)):
#for i in range(3):

    # Transpose sample array
    sample = np.transpose(samples_c1[0])

    # Abrir carpeta para guardar muestras de imagenes....

    # Print a message
    print("Converting window sample ", i)

    # Generate TF representation
    time, freq, Sxx = WaveletTF_transform(sample[1], fs, 10, 2000, 10, 7, 0)

    # Plot and save TF representation
    plt.figure(i, figsize=(1,1))
    plt.pcolormesh(time+sample[0][0], freq, Sxx, shading='gouraud')
    plt.xlabel('Time [s]'); plt.ylabel('Freq [Hz]')
    plt.axis("off")

    TF_sample = io.BytesIO()
    plt.savefig(TF_sample, bbox_inches='tight', pad_inches=0, format='jpg')

    # Convert matplotlib figure to PIL image
    img = Image.open(TF_sample)

    # Resize PIL image
    img_resized = img.resize((64, 64))

    TF_sample.close()
    #!rm TF_sample

    # Convert PIL resized image to RGB
    if img_resized.mode != 'RGB':
        img_resized = img_resized.convert('RGB')

    sample_number = str(i).zfill(6)

    # Save PIL resized RGB image
    img_resized.save('TF_sample' + sample_number + '.jpg')

    # Save PIL resized RGB image as numpy array
    img_array = np.array(img_resized)
    np.save('TF_sample_' + sample_number, img_array)

    #plt.show()

    plt.figure(i).clear()
    plt.close()
    gc.collect()

Converting window sample  0
Converting window sample  1
Converting window sample  2
Converting window sample  3
Converting window sample  4
Converting window sample  5
Converting window sample  6
Converting window sample  7
Converting window sample  8
Converting window sample  9
Converting window sample  10
Converting window sample  11
Converting window sample  12
Converting window sample  13
Converting window sample  14
Converting window sample  15
Converting window sample  16
Converting window sample  17
Converting window sample  18
Converting window sample  19
Converting window sample  20
Converting window sample  21
Converting window sample  22
Converting window sample  23
Converting window sample  24
Converting window sample  25
Converting window sample  26
Converting window sample  27
Converting window sample  28
Converting window sample  29
Converting window sample  30
Converting window sample  31
Converting window sample  32
Converting window sample  33
Converting window sample