In [1]:
import numpy as np
import csv
import time

def profile_execution(func):
    """Decorator to measure execution time of functions"""
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        print(f"{func.__name__} took {(end_time - start_time)*1000:.2f} ms")
        return result
    return wrapper

@profile_execution
def find_first_zero_crossing(Bcurr, startIdx):
    """Find the first zero crossing from a given index"""
    for i in range(int(startIdx), len(Bcurr) - 1):
        if Bcurr[i] * Bcurr[i + 1] <= 0:
            return i
    return None

@profile_execution
def find_zero_crossings(Bcurr, Tms):
    """Find three consecutive zero crossings"""
    Bzero_cross_i = find_first_zero_crossing(Bcurr, 1)
    Bsecond_zero_cross_i = find_first_zero_crossing(Bcurr, Bzero_cross_i + (1000/Tms))
    Bthird_zero_cross_i = find_first_zero_crossing(Bcurr, Bsecond_zero_cross_i + (1000/Tms))
    return Bzero_cross_i, Bsecond_zero_cross_i, Bthird_zero_cross_i

def calc_statistics(data):
    # Number of data points
    n = len(data)

    # Calculate mean
    mean_val = np.mean(data)

    # Calculate standard deviation
    std_dev = np.std(data)

    # Find min and max for peak to peak
    min_val = np.min(data)
    max_val = np.max(data)

    # Calculate RMS
    rms_val = np.sqrt(np.mean(np.square(data)))

    # Calculate absolute mean for shape and impulse factors
    mean_abs = np.mean(np.abs(data))

    # Calculate skewness and kurtosis
    skewness_val = (np.mean((data - mean_val) ** 3)) / (std_dev ** 3)
    kurtosis_val = (np.mean((data - mean_val) ** 4)) / (std_dev ** 4) - 3.0

    # Output results
    results = {
        'mean': mean_val,
        'std_dev': std_dev,
        'skewness': skewness_val,
        'kurtosis': kurtosis_val,
        'peak_to_peak': max_val - min_val,  # peak to peak
        'rms': rms_val,
        'crest_factor': max_val / rms_val,  # crest factor
        'shape_factor': rms_val / mean_abs, # shape factor
        'impulse_factor': max_val / mean_abs,  # impulse factor
        'margin_factor': max_val / (mean_abs ** 2), # margin factor
        'energy': np.sum(np.square(data))  # energy
    }

    return results

def main():
    # Define constants for base currents and sampling time
    IbaseR = 4.79
    IbaseY = 4.54
    IbaseB = 4.56
    Tms = 50
    filename = 'H_L2_1.csv'

    # Measure data loading time
    start_time = time.perf_counter()
    data = []
    with open(filename, newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvfile)  # Skip the first two header rows
        next(csvfile)
        for row in csvreader:
            data.append([float(value) for value in row])

    print(f"Loading CSV data took {(time.perf_counter() - start_time)*1000:.2f} ms")

    # Convert to numpy array for easier manipulation
    data = np.array(data)

    # Create time vector
    time_vec = np.linspace(0, Tms/1000, 1002)

    # Extract and normalize currents
    start_time = time.perf_counter()
    Rx, Yx, Bx = data[:, 2], data[:, 4], data[:, 6]
    R1, Y1, B1 = Rx / IbaseR, Yx / IbaseY, Bx / IbaseB
    print(f"Current normalization took {(time.perf_counter() - start_time)*1000:.2f} ms")

    # Apply Park's Transformation
    start_time = time.perf_counter()
    curr_vect = np.vstack((R1, Y1, B1))
    Tcmat = np.sqrt(2/3) * np.array([
        [1, -1/2, -1/2],
        [0, np.sqrt(3)/2, -np.sqrt(3)/2],
        [1/np.sqrt(2), 1/np.sqrt(2), 1/np.sqrt(2)]
    ])
    op = np.dot(Tcmat, curr_vect)
    Id, Iq, Io = op[0, :], op[1, :], op[2, :]
    print(f"Park's transformation took {(time.perf_counter() - start_time)*1000:.2f} ms")

    # Calculate complex vector magnitude
    start_time = time.perf_counter()
    Pv = Id + 1j * Iq
    Pvm = np.abs(Pv)
    print(f"Complex vector calculation took {(time.perf_counter() - start_time)*1000:.2f} ms")

    # Find zero crossings
    Bzero_cross_i, Bsecond_zero_cross_i, Bthird_zero_cross_i = find_zero_crossings(Iq, Tms)

    # Extract and process one cycle
    start_time = time.perf_counter()
    Pv2 = Pvm[Bzero_cross_i:Bthird_zero_cross_i]
    Pv3 = Pv2 - np.mean(Pv2)
    print(f"Signal processing took {(time.perf_counter() - start_time)*1000:.2f} ms")

    # Calculate statistics for the processed signal
    statistics = calc_statistics(Pv3)

    # Write statistics to a CSV file
    output_filename = 'statistics_output.csv'
    with open(output_filename, mode='w', newline='') as csvfile:
        fieldnames = ['statistic', 'value']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        writer.writeheader()
        for key, value in statistics.items():
            writer.writerow({'statistic': key, 'value': value})

    print(f"Writing statistics to CSV file took {(time.perf_counter() - start_time)*1000:.2f} ms")

if __name__ == "__main__":
    main()


Loading CSV data took 53.43 ms
Current normalization took 0.39 ms
Park's transformation took 2.03 ms
Complex vector calculation took 1.66 ms
find_first_zero_crossing took 0.75 ms
find_first_zero_crossing took 1.31 ms
find_first_zero_crossing took 1.52 ms
find_zero_crossings took 5.36 ms
Signal processing took 0.34 ms
Writing statistics to CSV file took 7.49 ms
