In [1]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from dotenv import load_dotenv 
import glob
from sklearn.preprocessing import LabelEncoder

In [2]:
# Load settings from the project-level .env file, then derive the directory of
# each dataset split from the FFT data root.
load_dotenv(dotenv_path="../.env")
FFT_FILE_PATH = os.getenv("FFT_DATA_PATH")
if FFT_FILE_PATH is None:
    # os.getenv returns None for an unset variable; fail here with a clear
    # message instead of a cryptic "NoneType + str" TypeError on the lines below.
    raise RuntimeError(
        "Environment variable FFT_DATA_PATH is not set; "
        "define it in ../.env or in the shell environment."
    )
train_path = FFT_FILE_PATH + "/train_dataset"
test_path = FFT_FILE_PATH + "/test_dataset"
validation_path = FFT_FILE_PATH + "/validation_dataset"

In [35]:
def count_labels_in_directory(dir_path: str):
    """Print how many files in ``dir_path`` start with each known label prefix.

    A file counts towards a prefix when its path matches
    ``<dir_path>/<prefix>_*`` (any extension). Nothing is returned; one
    summary line per prefix is written to stdout.

    Args:
        dir_path: Directory whose files are counted.
    """
    label_prefixes = ('error', 'normal', 'zero', 'overcurrent', 'overheating')

    # Tally the matches for every prefix first, then report them all.
    counts_by_prefix = {
        label: len(glob.glob(f"{dir_path}/{label}_*"))
        for label in label_prefixes
    }

    for label, n_files in counts_by_prefix.items():
        print(f"There are {n_files} files with the prefix '{label}' in the directory.")
        
def read_data_from_dir(dir_path):
    """Load the ``Amplitude`` column of every labelled CSV file in a directory.

    Files are matched as ``<dir_path>/<prefix>_*.csv`` for each known label
    prefix, and the prefix is used as the label of the sample. Prefixes are
    visited in a fixed order and, within a prefix, files are processed in
    sorted path order so the returned rows are reproducible between runs.

    Args:
        dir_path: Directory containing the CSV files. Every matched file must
            have an ``Amplitude`` column.

    Returns:
        A tuple ``(amplitude_array, label_array, file_name_array)`` of NumPy
        arrays with one entry per file read: the amplitude values, the label
        prefix, and the file's base name. The arrays are empty when no file
        matches.

    Raises:
        KeyError: If a matched CSV file has no ``Amplitude`` column.
    """
    amplitude_data = []
    label_data = []
    file_name_data = []

    for prefix in ['error', 'normal', 'zero', 'overcurrent', 'overheating']:
        pattern = f"{dir_path}/{prefix}_*.csv"

        # glob.glob returns matches in arbitrary (filesystem-dependent) order;
        # sort them so the sample order does not change between runs/machines.
        for file in sorted(glob.glob(pattern)):
            df = pd.read_csv(file)

            amplitude_data.append(df['Amplitude'].values.tolist())
            label_data.append(prefix)
            file_name_data.append(os.path.basename(file))

    amplitude_array = np.array(amplitude_data)
    label_array = np.array(label_data)
    file_name_array = np.array(file_name_data)

    return amplitude_array, label_array, file_name_array

def check_data_distribution(array):
    """Count how often each distinct value occurs in ``array``.

    Args:
        array: Array-like of values (e.g. class labels).

    Returns:
        A dict mapping each unique value to its number of occurrences, with
        keys in the sorted order produced by ``np.unique``.
    """
    values, occurrences = np.unique(array, return_counts=True)
    return {value: occurrence for value, occurrence in zip(values, occurrences)}

# def plot_freq_spectrum(frequency_data, signal, title_name):
#     plt.stem(frequency_data[1:4501], signal[1:4501], 'b', markerfmt=" ", basefmt="-b")
#     plt.title(f"Frequency spectrum of {title_name}")
#     plt.xlabel('Freq (Hz)')
#     plt.ylabel('FFT Amplitude |X(freq)|')
#     # Set x-axis ticks at the scale of 10^-5
#     ticks = [10**(-5) * i for i in range(10)]
#     plt.xticks(ticks)
#     plt.show()
    
def plot_freq_spectrum(frequency_data, signal, title_name):
    """Draw a stem plot of an FFT amplitude spectrum and show it.

    Only elements ``1:4501`` of both inputs are plotted, i.e. the first
    element of each sequence is skipped.

    Args:
        frequency_data: Sequence of frequency values used for the x-axis.
        signal: Sequence of amplitudes, aligned with ``frequency_data``.
        title_name: Name inserted into the plot title.
    """
    # plt.subplots() returns a (figure, axes) pair; unpack it so the Axes
    # methods below are called on the Axes object rather than on the tuple.
    _, ax = plt.subplots()

    # Blue stems without markers on a solid blue baseline.
    ax.stem(frequency_data[1:4501], signal[1:4501], 'b', markerfmt=" ", basefmt="-b")

    # Ten x-axis ticks at multiples of 1e-5: 0, 1e-5, ..., 9e-5.
    # NOTE(review): this assumes the frequency axis spans roughly 0..1e-4 — confirm.
    ticks = [10**(-5) * i for i in range(10)]
    ax.set_xticks(ticks)

    ax.set_title(f"Frequency spectrum of {title_name}")
    ax.set_xlabel('Freq (Hz)')
    ax.set_ylabel('FFT Amplitude |X(freq)|')

    plt.show()

In [20]:
# Read every split from disk: amplitudes (X), labels (y) and source file names.
X_train, y_train, train_file_names = read_data_from_dir(dir_path=train_path)
X_test, y_test, test_file_names = read_data_from_dir(dir_path=test_path)
X_validation, y_validation, validation_file_names = read_data_from_dir(dir_path=validation_path)

# Report the shape of each split as a quick sanity check.
for split_name, split_X, split_y in (
    ("Train", X_train, y_train),
    ("Test", X_test, y_test),
    ("Validation", X_validation, y_validation),
):
    print(f"{split_name} dataset: X: {split_X.shape}, y: {split_y.shape}")

Train dataset: X: (8400, 4501), y: (8400,)
Test dataset: X: (1800, 4501), y: (1800,)
Validation dataset: X: (1800, 4501), y: (1800,)


In [21]:
# Take the frequency axis from a single reference file.
# NOTE(review): presumably every spectrum file shares this axis — confirm.
# NOTE(review): this path is hard-coded and does not go through FFT_DATA_PATH.
reference_spectrum = pd.read_csv("../../data/fft_data/train_dataset/error_data10.csv")
frequency_data = np.array(reference_spectrum['Frequency'].to_list())

In [22]:
# Stack the three splits row-wise (train, then test, then validation) so the
# whole dataset can be analysed at once; axis 0 is np.concatenate's default.
X_all = np.concatenate([X_train, X_test, X_validation])
y_all = np.concatenate([y_train, y_test, y_validation])

In [23]:
# Sanity check: shapes of the combined feature matrix and label vector.
combined_shapes = (X_all.shape, y_all.shape)
print(*combined_shapes)

(12000, 4501) (12000,)


In [24]:
# Select the rows of X_all belonging to each class via a boolean mask on y_all.
error_data = X_all[y_all == "error"]
normal_data = X_all[y_all == "normal"]
overcurrent_data = X_all[y_all == "overcurrent"]
overheating_data = X_all[y_all == "overheating"]
zero_data = X_all[y_all == "zero"]

In [26]:
# Mean amplitude per column (axis 0 averages over the samples of a class).
average_error_ampls = error_data.mean(axis=0)
average_normal_ampls = normal_data.mean(axis=0)
average_overcurrent_ampls = overcurrent_data.mean(axis=0)
average_overheating_ampls = overheating_data.mean(axis=0)
average_zero_ampls = zero_data.mean(axis=0)

In [38]:
# Pair the shared frequency axis with each class's mean amplitude spectrum.
# The column key is spelled "Frequency" (it was previously misspelled
# "Frequecy", which ended up as the column header in the exported workbook).
average_error_data_dict = {
    "Frequency": frequency_data,
    "Amplitude": average_error_ampls
}
average_normal_data_dict = {
    "Frequency": frequency_data,
    "Amplitude": average_normal_ampls
}
average_overcurrent_data_dict = {
    "Frequency": frequency_data,
    "Amplitude": average_overcurrent_ampls
}
average_overheating_data_dict = {
    "Frequency": frequency_data,
    "Amplitude": average_overheating_ampls
}
average_zero_data_dict = {
    "Frequency": frequency_data,
    "Amplitude": average_zero_ampls
}

# Create DataFrames from the dictionaries
df_error = pd.DataFrame(average_error_data_dict)
df_normal = pd.DataFrame(average_normal_data_dict)
df_overcurrent = pd.DataFrame(average_overcurrent_data_dict)
df_overheating = pd.DataFrame(average_overheating_data_dict)
df_zero = pd.DataFrame(average_zero_data_dict)

# Sheet name -> DataFrame, in the order the sheets are written to the workbook.
average_frames_by_sheet = {
    'error': df_error,
    'normal': df_normal,
    'overcurrent': df_overcurrent,
    'overheating': df_overheating,
    'zero': df_zero,
}

# Write one sheet per class; the context manager saves and closes the file.
with pd.ExcelWriter('output_file.xlsx') as writer:
    for sheet_name, frame in average_frames_by_sheet.items():
        frame.to_excel(writer, sheet_name=sheet_name, index=False)