In [1]:
from pathlib import Path
import pandas as pd
import xlwings as xw
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from rdp import rdp

In [2]:
#Access input folder
input_dir1 = Path ("C:/Users/z5379606/OneDrive - UNSW/Viperblast/Reflected_pressure/Near_field_time_history_txt_files_dummy")
print ("1",input_dir1)

#Access folders inside input folder
input_dir2 =  [folder for folder in input_dir1.iterdir() if folder.is_dir()]
print ("2",input_dir2)

# Create output directory
output_dir1 = Path("C:/Users/z5379606/OneDrive - UNSW/Viperblast/Reflected_pressure/Near_field_time_history_dummy2")
output_dir1.mkdir(exist_ok = True)

# Define current work directory
current_dir = Path.cwd()

# Access the template to be used to generate files
excel_template = current_dir/ "Template.xlsx"

1 C:\Users\z5379606\OneDrive - UNSW\Viperblast\Reflected_pressure\Near_field_time_history_txt_files_dummy
2 [WindowsPath('C:/Users/z5379606/OneDrive - UNSW/Viperblast/Reflected_pressure/Near_field_time_history_txt_files_dummy/1.25m'), WindowsPath('C:/Users/z5379606/OneDrive - UNSW/Viperblast/Reflected_pressure/Near_field_time_history_txt_files_dummy/2.75m')]


In [3]:
# Identify peaks with tolerance parameters
height_threshold = 200  # Minimum height of peaks to detect
prominence = 0.01  # Minimum prominence of peaks in pressure units
width = 8  # Minimum width of peaks in time units

In [4]:
def reduce_points(time, pressure, i):
    
    lowest_index1 = np.argmin(pressure[:])
    zero_crossing_index = next((i for i, val in enumerate(pressure[:lowest_index1], start=0) if val <= 0), None)
    
    # Combine time and pressure into a single array of 2D points
    points = np.column_stack((time, pressure))
    
    segment1 = points[:zero_crossing_index]  # Example segmentation
    segment2 = points[zero_crossing_index:]
    
    reduced_segment1 = rdp(segment1, epsilon=0.001)
    reduced_segment2 = rdp(segment2, epsilon=0.5)
    
    reduced_points = np.vstack((reduced_segment1, reduced_segment2))
    
    # Separate the reduced points into time and pressure
    reduced_time = reduced_points[:, 0]
    reduced_pressure = reduced_points[:, 1]
    
    # Plot the original and reduced data
    plt.figure(figsize=(4, 4))
    plt.plot(time, pressure, label='Original Data', alpha=0.5)
    plt.plot(reduced_time, reduced_pressure, label='Reduced Data', marker='o')
    plt.xlabel('Time')
    plt.ylabel('Pressure')
    plt.legend()
    
    # Save the plot as an image
    plt.savefig(output_dir3/f'{i}_reduced.png', dpi=150, bbox_inches='tight', facecolor='white')
        
    # Close the plot to free up memory
    plt.close()
    
    return reduced_time, reduced_pressure

In [5]:
def get_phase_array(time, pressure, CT, CP, TT, TP):
    a=0
    b=0
    phase = []
    for t, p in zip(time, pressure):
        if round (t, 7) < round (CT, 7):
            phase.append('positive')#{t} {change_time}')
        elif round (CT, 7) <= round (t, 7) < round(TT, 7):
            a+=1
            if a == 1 and abs(p - CP) <0.000001:
                phase.append('negative_increasing')
            elif a == 1 and abs(p - CP) >0.000001:
                phase.append('error')
                print (f'error- p={p} , change p ={CP}, t ={t}, change t = {CT}')
            else:
                phase.append('negative_increasing')# {t} {trough_time}')
    
        elif round (TT, 7) <= round (t, 7):
            b+=1
            if b == 1 and abs(p - TP) < 0.000001:
                phase.append('negative_decreasing')
            elif b == 1 and abs(p - TP) > 0.000001:
                phase.append('error')
                print (f'error- p={p} , trough p ={TP}, t ={t}, trough t = {TT}')
            else:
                phase.append('negative_decreasing')
        else:
            phase.append('something wrong')
            print ('something wrong')
                
    return phase

In [6]:
def find_CTE(reduced_pressure, reduced_time):
    print (len(reduced_pressure))
    lowest_index2 = np.argmin(reduced_pressure[:])
    # TROUGH
    TT = reduced_time[lowest_index2]
    TP = reduced_pressure[lowest_index2]
    
    if reduced_pressure[lowest_index2] < -0.2:
        Status = "if"
        
        # Find the first zero crossing between peak and lowest value
        zero_crossing_index_1 = next((i for i, val in enumerate(reduced_pressure[:lowest_index2], start=0) if val <= 0), None)
    
        zero_crossing_index_2 = next((i for i, val in enumerate(reduced_pressure[lowest_index2:], start=lowest_index2) if val > -0.2), None)
        #zero_crossing_index_2 = next((i for i, val in enumerate(smoothed_pressure[lowest_index:], start=lowest_index) if val > -0.1), None)
        
        if zero_crossing_index_2 is None:
            Status = "if_if"
            zero_crossing_index_2 = min(range(lowest_index2, len(reduced_pressure)),key=lambda i: abs(reduced_pressure[i]))
                        
    elif -0.2 < reduced_pressure[lowest_index2] < 0:
        Status = "elif"
        # Find the first zero crossing between peak and lowest value
        zero_crossing_index_1 = next((i for i, val in enumerate(reduced_pressure[:lowest_index2], start=0) if val <= 0), None) 
        
        zero_crossing_index_2 = next((i for i, val in enumerate(reduced_pressure[lowest_index2:], start=lowest_index2) if val > -0.05), None)
        
    else:
        Status = "else"
        zero_crossing_index_1 = lowest_index2
        zero_crossing_index_2 = lowest_index2
    # CHANGE AND END
    CT = reduced_time[zero_crossing_index_1]
    CP = reduced_pressure[zero_crossing_index_1]
    ET = reduced_time[zero_crossing_index_2]
    EP = reduced_pressure[zero_crossing_index_2]
    final_time = reduced_time[: zero_crossing_index_2+1]
    final_pressure = reduced_pressure[: zero_crossing_index_2+1]
    
    phase = get_phase_array(final_time, final_pressure, CT, CP, TT, TP)
    print (zero_crossing_index_1, lowest_index2, zero_crossing_index_2)
    
    # Create a dictionary with the arrays
    data1 = {"Mass": [EM] * len(final_time),
             "Standoff distance": [SD] * len(final_time),
             "Angle": [A] * len(final_time),
             'Time': final_time,
             "Phase": phase,
             'Pressure': final_pressure}
    
    return TT, TP, CT, CP, ET, EP, Status, data1, final_time, final_pressure

In [7]:
A_mapping = {1: 0, 2: 15, 3: 30, 4: 45}
df_all = pd.DataFrame(columns=["Mass", "Standoff distance", "Angle", "Arrival time", "AP", "Change time", "CP", "Trough time", "TP", "End time", "EP", "Status", 'S/NS'])

for folder1 in input_dir2: # folder1-1.00m

    input_dir3 =  [folder2 for folder2 in folder1.iterdir() if folder2.is_dir()]
    # Create output folders
    output_dir2 = output_dir1/ folder1.name
    output_dir2.mkdir(exist_ok = True)
    print ("OUTPUT 2", output_dir2)
    
    for folder2 in input_dir3: # folder2-1.00m00.5kg
        # SD AND EM
        SD = float(folder2.name[0:4])
        EM = float(folder2.name[5:9])
        
        # Create output folders
        output_dir3 = output_dir2/ folder2.name
        output_dir3.mkdir(exist_ok = True)
        print ("OUTPUT 3", output_dir3)
        
        # Make a list of data file names
        txt_files = list(folder2.rglob("*.txt"))
        file1 = txt_files[0]
        print (file1)
        
        # Read data from Excel file
        df1 = pd.read_csv(file1, delimiter='\s+', header=None) 
        
        # Initiate xlwings library
        with xw.App (visible = False) as app:
        
            for i in range (1,5):
                # A
                A = A_mapping.get(i, None) 
                
                time = df1.iloc[:,0].dropna().values*1000
                pressure = df1.iloc[:,i].dropna().values/1000
                
                wb = app.books.open(excel_template)
                
                peak_index = np.argmax(pressure)
                # ARRIVAL
                AT = time[peak_index]
                AP = pressure[peak_index]
                
                pressure = pressure[peak_index:]
                time = time[peak_index:]
                
                # Find peaks
                peaks, properties = find_peaks(pressure, height=height_threshold, prominence=prominence, width=width)
            
                # Ensure there are at least two peaks
                if len(peaks) >= 1:
                    #first_peak_index = peaks[0]
                
                    # Create a copy of pressure data to modify
                    smoothed_pressure = np.copy(pressure)
                
                    # Define a window around each peak after the first peak to apply smoothing
                    window_radius_r = 50
                    window_radius_l = 50 # Adjust the window radius as needed
                
                    for peak_index in peaks[:]:
                        start = max(peak_index - window_radius_l, 0)
                        end = min(peak_index + window_radius_r + 1, len(pressure))
                
                        # Extract values just outside the window
                        if start > 0 and end < len(pressure):
                            start_value = pressure[start - 1]
                            end_value = pressure[end]
                        elif start > 0:
                            start_value = pressure[start - 1]
                            end_value = start_value
                        elif end < len(pressure):
                            end_value = pressure[end]
                            start_value = end_value
                        else:
                            start_value = end_value = np.mean(pressure)  # Fallback in case there are no valid points
                
                        # Create a linearly spaced array between start_value and end_value
                        interpolated_values = np.linspace(start_value, end_value, end - start)
                
                        # Apply the interpolated values to the points within the window
                        smoothed_pressure[start:end] = interpolated_values
                        SS= 'S'
                        
                        reduced_time, reduced_pressure = reduce_points(time, smoothed_pressure, i)
                
                        TT, TP, CT, CP, ET, EP, Status, data1, final_time, final_pressure = find_CTE(reduced_pressure, reduced_time)
                        
                else:
                    SS = 'NS'
                    reduced_time, reduced_pressure =reduce_points(time, pressure, i)
                    TT, TP, CT, CP, ET, EP, Status, data1, final_time, final_pressure = find_CTE(reduced_pressure, reduced_time)
                    

                # Creating a new DataFrame to append
                row_all = pd.DataFrame({"Mass": [EM], "Standoff distance": [SD], "Angle": [A],
                                        "Arrival time": [AT],"AP": [AP],
                                        "Change time": [CT], "CP": [CP],
                                        "Trough time": [TT], "TP": [TP],
                                        "End time": [ET], "EP": [EP],
                                        "Status": [Status], 'S/NS': [SS],
                                        "Original":[len(pressure)], "Reduced": [len(reduced_pressure)]})
                df_all = pd.concat([df_all, row_all], ignore_index=True)
                   
                # Create DataFrames from dictionaries
                df2 = pd.DataFrame(data1)
                    
                # Write dataframe in excel template
                wb.sheets[0].range("A1").options(index=False).value = df2
                
                # Create files in output directory
                wb.save(output_dir3/f"{i}.xlsx")
                    
                print ("file_name",i)
                
                # Create the plot
                plt.figure(figsize=(4, 4))
                plt.plot(time, pressure, color='b', linestyle=':')
                plt.plot(final_time, final_pressure, color='r', linestyle='-')
                # Save the plot as an image
                plt.savefig(output_dir3/f'{i}.png', dpi=150, bbox_inches='tight', facecolor='white')
                    
                # Close the plot to free up memory
                plt.close()
                    

OUTPUT 2 C:\Users\z5379606\OneDrive - UNSW\Viperblast\Reflected_pressure\Near_field_time_history_dummy2\1.25m
OUTPUT 3 C:\Users\z5379606\OneDrive - UNSW\Viperblast\Reflected_pressure\Near_field_time_history_dummy2\1.25m\1.25m02.5kg
C:\Users\z5379606\OneDrive - UNSW\Viperblast\Reflected_pressure\Near_field_time_history_txt_files_dummy\1.25m\1.25m02.5kg\viper3d_th_overpressure.txt
45 47 48
file_name 1
49 51 52
file_name 2
65 69 70
file_name 3
55 57 58
file_name 4
OUTPUT 3 C:\Users\z5379606\OneDrive - UNSW\Viperblast\Reflected_pressure\Near_field_time_history_dummy2\1.25m\1.25m08.5kg
C:\Users\z5379606\OneDrive - UNSW\Viperblast\Reflected_pressure\Near_field_time_history_txt_files_dummy\1.25m\1.25m08.5kg\viper3d_th_overpressure.txt
44 46 59
file_name 1
47 49 61
file_name 2
49 57 60
file_name 3
131 134 138
file_name 4
OUTPUT 3 C:\Users\z5379606\OneDrive - UNSW\Viperblast\Reflected_pressure\Near_field_time_history_dummy2\1.25m\1.25m20.5kg
C:\Users\z5379606\OneDrive - UNSW\Viperblast\Reflecte

KeyboardInterrupt: 

<Figure size 288x288 with 0 Axes>

In [None]:
for folder1 in input_dir2: # folder1-1.00m

    input_dir3 =  [folder2 for folder2 in folder1.iterdir() if folder2.is_dir()]
    # Create output folders
    output_dir2 = output_dir1/ folder1.name
    output_dir2.mkdir(exist_ok = True)
    #print ("OUTPUT 2", output_dir2)
    
    for folder2 in input_dir3: # folder2-1.00m00.5kg
        # SD AND EM
        SD = float(folder2.name[0:4])
        EM = float(folder2.name[5:9])
        
        # Create output folders
        output_dir3 = output_dir2/ folder2.name
        output_dir3.mkdir(exist_ok = True)
        #print ("OUTPUT 3", output_dir3)
        
        # Make a list of data file names
        txt_files = list(folder2.rglob("*.txt"))
        file1 = txt_files[0]
        #print (file1)
        
        # Read data from Excel file
        df1 = pd.read_csv(file1, delimiter='\s', header=None) 
        print (df1.iloc[0,0])

In [None]:
# Write dataframes in excel template
all_path = 'RP_near_field_all.xlsx'

df_all.to_excel(all_path, index=False)