In [None]:
import pywt
import numpy as np
import joblib
import random
import pickle
from sklearn import preprocessing
import scipy
import pandas as pd

In [None]:
def compute_wavelet(signal, i, j, scales, data_cwt, start, waveletname='morl'):
    """Store one cropped CWT magnitude map in ``data_cwt[j, :, :, i]``.

    The continuous wavelet transform of ``signal`` is computed with
    ``pywt.cwt``. From the resulting coefficient matrix only a fixed window is
    kept: the first 127 rows and the 127 columns that begin at column
    ``60 + start``. The absolute value of that window is written in place into
    ``data_cwt``; nothing is returned.

    Parameters
    ----------
    signal : 1-D sequence handed unchanged to ``pywt.cwt``.
    i : index along the last axis of ``data_cwt``.
    j : index along the first axis of ``data_cwt``.
    scales : scales handed unchanged to ``pywt.cwt``.
    data_cwt : 4-D array that receives the result (modified in place).
    start : column offset, added on top of the fixed 60-column skip.
    waveletname : wavelet name forwarded to ``pywt.cwt``.
    """
    first_col = 60 + start
    # pywt.cwt returns (coefficients, frequencies); the frequencies are unused.
    cwt_matrix, _ = pywt.cwt(signal, scales, waveletname)
    window = cwt_matrix[:127, first_col:first_col + 127]
    data_cwt[j, :, :, i] = np.abs(window)

In [None]:
# Noise levels that the detection and classification loops below draw from
# (one random choice per sample). 0 selects the branch that adds no noise; any
# other value is passed to add_all_noises as its last argument
# (presumably an SNR in dB — TODO confirm).
# NOTE: this name is reassigned to a longer list in the Localisation section.
snr_levels = [0, 20, 30]

## Detection

In [None]:
# Load the raw detection samples. The detection loop iterates this object and
# calls .items() on each element, so it is presumably a sequence of pandas
# DataFrames — TODO confirm.
# NOTE: unpickling can execute arbitrary code; only load trusted files.
with open("X_detect.pickle", "rb") as detect_file:
    X_detect = pickle.load(detect_file)

In [None]:
size1 = 40000
X_detect_input = np.ndarray(shape=(size1, 127, 127, 6), dtype=np.float32)

In [None]:
# Fill X_detect_input: for every sample in X_detect and every column of that
# sample, optionally add noise and low-pass filter, normalise, then store a
# cropped CWT magnitude map via compute_wavelet.
# NOTE(review): add_all_noises and noise_factors are not defined in any visible
# cell of this notebook; they must already exist in the kernel — confirm and
# import/define them explicitly so Restart & Run All works.

# The low-pass design is identical for every signal, so build it once.
b, a = scipy.signal.cheby1(9, 1, 100, fs=2000, btype='lowpass')
cwt_scales = np.arange(1, 128)

for j, items in enumerate(X_detect):
    # One noise level and one window offset (0..199) per sample, shared by all
    # of its columns.
    snr = np.random.choice(snr_levels)
    start = int(random.random()*0.1*2000)
    for i, (columnName, columnData) in enumerate(items.items()):
        input_signal = columnData[:1000].values
        # Signals whose peak is below 1 are scaled up by 666 (the notebook does
        # not explain the constant — presumably a per-unit conversion, confirm).
        if max(input_signal) < 1:
            input_signal = input_signal*666
        if snr == 0:
            # Noise-free sample: no noise injection and no filtering.
            filtered_signal = input_signal
        else:
            signal_original = add_all_noises(input_signal/5, noise_factors, snr)
            # Columns whose name contains "I" are scaled back down by 666.
            if "I" in columnName:
                signal_original = signal_original/666
            filtered_signal = scipy.signal.filtfilt(b, a, signal_original)
        # normalize expects a 2-D input, hence the one-row wrapper and the
        # reshape back to 1-D (scikit-learn's default norm is L2).
        signal_normalised = preprocessing.normalize([filtered_signal])
        compute_wavelet(signal_normalised.reshape(-1), i, j, scales=cwt_scales,
                        data_cwt=X_detect_input, start=start)

In [None]:
# Persist the filled detection tensor to disk with joblib.
joblib.dump(value=X_detect_input, filename="X_detect_input_noisy_varied1.pkl")

## Classification

In [None]:
# Load the raw classification samples. The classification loop iterates this
# object and calls .items() on each element, so it is presumably a sequence of
# pandas DataFrames — TODO confirm.
# NOTE: unpickling can execute arbitrary code; only load trusted files.
with open("X_classify.pickle", "rb") as classify_file:
    X_classify = pickle.load(classify_file)

In [None]:
size2 = 22000
X_classify_input = np.ndarray(shape=(size2, 127, 127, 6), dtype=np.float32)

In [None]:
# Fill X_classify_input: for every sample in X_classify and every column of
# that sample, optionally add noise and low-pass filter, normalise, then store
# a cropped CWT magnitude map via compute_wavelet.
# NOTE(review): add_all_noises and noise_factors are not defined in any visible
# cell of this notebook; they must already exist in the kernel — confirm and
# import/define them explicitly so Restart & Run All works.

# The low-pass design is identical for every signal, so build it once.
b, a = scipy.signal.cheby1(9, 1, 100, fs=2000, btype='lowpass')
cwt_scales = np.arange(1, 128)

for j, items in enumerate(X_classify):
    # One noise level and one window offset (0..199) per sample, shared by all
    # of its columns.
    snr = np.random.choice(snr_levels)
    start = int(random.random()*0.1*2000)
    for i, (columnName, columnData) in enumerate(items.items()):
        input_signal = columnData[:1000].values
        # Signals whose peak is below 1 are scaled up by 666 (the notebook does
        # not explain the constant — presumably a per-unit conversion, confirm).
        if max(input_signal) < 1:
            input_signal = input_signal*666
        if snr == 0:
            # Noise-free sample: no noise injection and no filtering.
            filtered_signal = input_signal
        else:
            signal_original = add_all_noises(input_signal/5, noise_factors, snr)
            # Columns whose name contains "I" are scaled back down by 666.
            if "I" in columnName:
                signal_original = signal_original/666
            filtered_signal = scipy.signal.filtfilt(b, a, signal_original)
        # normalize expects a 2-D input, hence the one-row wrapper and the
        # reshape back to 1-D (scikit-learn's default norm is L2).
        signal_normalised = preprocessing.normalize([filtered_signal])
        compute_wavelet(signal_normalised.reshape(-1), i, j, scales=cwt_scales,
                        data_cwt=X_classify_input, start=start)

In [None]:
# Persist the filled classification tensor to disk with joblib.
joblib.dump(value=X_classify_input, filename="X_classify_input_noisy_varied1.pkl")

## Localisation

In [None]:
def compute_wavelet(signal, i, j, scales, data_cwt, waveletname='morl'):
    """Store the top-left 40x40 CWT magnitude map in ``data_cwt[j, :, :, i]``.

    The continuous wavelet transform of ``signal`` is computed with
    ``pywt.cwt``; the first 40 rows and first 40 columns of the coefficient
    matrix are kept and their absolute value is written in place into
    ``data_cwt``. Nothing is returned.

    NOTE: this reuses the name of the function defined near the top of the
    notebook (which takes an extra ``start`` argument and crops 127x127), so
    running this cell replaces that earlier definition in the kernel.

    Parameters
    ----------
    signal : 1-D sequence handed unchanged to ``pywt.cwt``.
    i : index along the last axis of ``data_cwt``.
    j : index along the first axis of ``data_cwt``.
    scales : scales handed unchanged to ``pywt.cwt``.
    data_cwt : 4-D array that receives the result (modified in place).
    waveletname : wavelet name forwarded to ``pywt.cwt``.
    """
    # pywt.cwt returns (coefficients, frequencies); the frequencies are unused.
    cwt_matrix, _ = pywt.cwt(signal, scales, waveletname)
    data_cwt[j, :, :, i] = np.abs(cwt_matrix[:40, :40])

In [None]:
# Noise levels used by the localisation loops: 0, 10, ..., 90. 0 selects the
# noise-free branch; other values are passed to add_all_noises
# (presumably an SNR in dB — TODO confirm).
snr_levels = list(range(0, 100, 10))

### Training

In [None]:
# Number of rows reserved in the localisation training tensor.
size2 = 1010
# The localisation compute_wavelet crops every map to 40x40, so the buffer must
# be 40x40 per map; an 80x80 slot makes the in-place assignment fail with a
# broadcast error. This also matches the buffer used in the Testing section.
# np.zeros (not the bare np.ndarray constructor) guarantees that rows the loop
# never fills are zeros rather than uninitialised memory.
X_locate_input = np.zeros((size2, 40, 40, 6), dtype=np.float32)

In [None]:
# Build the localisation training tensors: for every line listed in the CSV and
# every SNR level, fill X_locate_input from that line's training pickle and
# write it to its own file.
# NOTE(review): add_all_noises and noise_factors are not defined in any visible
# cell of this notebook; they must already exist in the kernel — confirm.
# NOTE(review): the CSV path is absolute and user-specific; move it to a
# configurable data directory.
# NOTE(review): inputs are read from "Location Data2" but outputs are written
# to "LocationData" — confirm this is intentional.
lengths = pd.read_csv("C:\\Users\\Aimee Simons\\Desktop\\2024\\Lectures\\Semester 2\\Final Thesis\\Model\\Fault Localisation\\Tester Model\\Line Lengths.csv")
lines = lengths['Line'].values

# The low-pass design is identical for every signal, so build it once.
b, a = scipy.signal.cheby1(9, 1, 100, fs=2000, btype='lowpass')
cwt_scales = np.arange(1, 81)

for line in lines:
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(f"Location Data2\\{line}\\X_locate_{line}_training.pickle", 'rb') as f:
        X_locate = pickle.load(f)
    for snr in snr_levels:
        noise_free = snr == 0
        # Filename suffix: the numeric level, or "_noNoise" for level 0. Kept
        # separate from `snr` so the loop variable is never overwritten.
        snr_suffix = '_noNoise' if noise_free else snr
        # The buffer is shared across lines and SNR levels; clear it so rows
        # not filled in this pass cannot carry data over from a previous pass.
        X_locate_input.fill(0)
        for j, items in enumerate(X_locate):
            for i, (columnName, columnData) in enumerate(items.items()):
                input_signal = columnData.values
                # Scale so the peak is 400 before noise is added; the scaling
                # is undone afterwards. A zero peak gives a non-finite factor.
                factor = 400/max(input_signal)
                input_signal = input_signal*factor
                if noise_free:
                    # No noise and no filtering: just undo the scaling.
                    filtered_signal = input_signal/factor
                else:
                    signal_original = add_all_noises(input_signal/5, noise_factors, snr)
                    signal_original = signal_original/factor
                    filtered_signal = scipy.signal.filtfilt(b, a, signal_original)
                # normalize expects a 2-D input, hence the one-row wrapper and
                # the reshape back to 1-D (scikit-learn's default norm is L2).
                signal_normalised = preprocessing.normalize([filtered_signal])
                compute_wavelet(signal_normalised.reshape(-1), i, j,
                                scales=cwt_scales, data_cwt=X_locate_input)
        with open(f"LocationData\\{line}\\X_locate_input_train{snr_suffix}.pickle", 'wb') as f:
            pickle.dump(X_locate_input, f)

### Testing

In [None]:
# Fault-type labels, in the order used to index the testing pickle
# (X_locate[k] is paired with the k-th label) and to name the output files.
orderOfTypes = [
    'ABC',
    'ABG',
    'AB',
    'ACG',
    'AC',
    'AG',
    'BCG',
    'BC',
    'BG',
    'CG',
]

In [None]:
# Number of rows reserved in the localisation testing tensor.
size2 = 15
# Output buffer indexed as [row, 40, 40, 6], matching the 40x40 crop applied by
# the localisation compute_wavelet.
# np.zeros (not the bare np.ndarray constructor) guarantees that rows the loop
# never fills are zeros rather than uninitialised memory.
X_locate_input = np.zeros((size2, 40, 40, 6), dtype=np.float32)

In [None]:
# Build the localisation testing tensors: for every line listed in the CSV and
# every fault type, fill X_locate_input from that line's testing pickle (one
# random SNR level per sample) and write it to its own file.
# NOTE(review): add_all_noises and noise_factors are not defined in any visible
# cell of this notebook; they must already exist in the kernel — confirm.
# NOTE(review): the CSV path is absolute and user-specific; move it to a
# configurable data directory.
# NOTE(review): inputs are read from "Location Data2" but outputs are written
# to "LocationData" — confirm this is intentional.
lengths = pd.read_csv("C:\\Users\\Aimee Simons\\Desktop\\2024\\Lectures\\Semester 2\\Final Thesis\\Model\\Fault Localisation\\Tester Model\\Line Lengths.csv")
lines = lengths['Line'].values

# The low-pass design is identical for every signal, so build it once.
b, a = scipy.signal.cheby1(9, 1, 100, fs=2000, btype='lowpass')
cwt_scales = np.arange(1, 81)

for line in lines:
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(f"Location Data2\\{line}\\X_locate_{line}_testing.pickle", 'rb') as f:
        X_locate = pickle.load(f)
    for k, fault in enumerate(orderOfTypes):
        # The buffer is shared across lines and fault types; clear it so rows
        # not filled in this pass cannot carry data over from a previous pass.
        X_locate_input.fill(0)
        for j, items in enumerate(X_locate[k]):
            # One noise level per sample, shared by all of its columns.
            snr = np.random.choice(snr_levels)
            noise_free = snr == 0
            for i, (columnName, columnData) in enumerate(items.items()):
                input_signal = columnData.values
                # Scale so the peak is 400 before noise is added; the scaling
                # is undone afterwards. A zero peak gives a non-finite factor.
                factor = 400/max(input_signal)
                input_signal = input_signal*factor
                if noise_free:
                    # No noise and no filtering: just undo the scaling.
                    filtered_signal = input_signal/factor
                else:
                    signal_original = add_all_noises(input_signal/5, noise_factors, snr)
                    signal_original = signal_original/factor
                    filtered_signal = scipy.signal.filtfilt(b, a, signal_original)
                # normalize expects a 2-D input, hence the one-row wrapper and
                # the reshape back to 1-D (scikit-learn's default norm is L2).
                signal_normalised = preprocessing.normalize([filtered_signal])
                compute_wavelet(signal_normalised.reshape(-1), i, j,
                                scales=cwt_scales, data_cwt=X_locate_input)
        with open(f"LocationData\\{line}\\X_locate_input_test_{fault}.pickle", 'wb') as f:
            pickle.dump(X_locate_input, f)