In [None]:
import pandas as pd

def validate_temperature_range(normal_min, normal_max):
    """Check that the normal temperature band is in bounds and well ordered.

    Args:
        normal_min: Lower bound of the normal range, in °C.
        normal_max: Upper bound of the normal range, in °C.

    Raises:
        ValueError: If either bound is not within [-50, 50], or if
            ``normal_min`` is greater than ``normal_max``.
    """
    # Expressed as "not all within limits" so that values which fail every
    # comparison (e.g. NaN) are rejected as well.
    within_limits = all(-50 <= bound <= 50 for bound in (normal_min, normal_max))
    if not within_limits:
        raise ValueError("Temperatures must be between -50°C and 50°C")

    if normal_min > normal_max:
        raise ValueError("Normal temperature minimum must be less than or equal to maximum")


In [None]:
def calculate_spike_threshold(threshold_type, threshold_input, normal_min, normal_max):
    """Turn the user's threshold choice into concrete spike threshold(s).

    Args:
        threshold_type: 'A' for an absolute threshold or 'P' for a
            percentage threshold (case-insensitive).
        threshold_input: For 'A', the threshold temperature itself. For 'P',
            the percentage of the normal range width that is added above
            ``normal_max`` and subtracted below ``normal_min``.
        normal_min: Lower bound of the normal temperature range.
        normal_max: Upper bound of the normal temperature range.

    Returns:
        A 4-tuple ``(spike_threshold, is_positive_spike, spike_thresh_upper,
        spike_thresh_lower)``.
        For 'A' the last two items are None and ``is_positive_spike`` is True
        when the threshold lies above ``normal_max``, False when it lies below
        ``normal_min``. For 'P' the first two items are None and the last two
        hold the computed upper/lower thresholds (which are also printed).

    Raises:
        ValueError: If the absolute threshold falls inside the normal range
            or outside [-50, 50], if the percentage is negative, or if
            ``threshold_type`` is neither 'A' nor 'P'.
    """
    mode = threshold_type.upper()

    if mode == 'A':
        spike_threshold = threshold_input
        if normal_min <= spike_threshold <= normal_max:
            raise ValueError(f"Spike threshold ({spike_threshold}°C) falls within normal range ({normal_min}°C to {normal_max}°C)")
        # Report an out-of-bounds threshold as such; previously this case fell
        # through to the "> normal max or < normal min" message, which was
        # misleading for a value such as 60 that *is* above the normal max.
        if not -50 <= spike_threshold <= 50:
            raise ValueError("Absolute threshold must be between -50°C and 50°C")
        if spike_threshold > normal_max:
            return spike_threshold, True, None, None
        if spike_threshold < normal_min:
            return spike_threshold, False, None, None
        # Only reachable when the comparisons above are all False
        # (e.g. a NaN bound); keep rejecting such input.
        raise ValueError("Absolute threshold must be either > normal max or < normal min")

    if mode == 'P':
        p = threshold_input
        if p < 0:
            raise ValueError("Percentage threshold must be a positive number")
        normal_range = abs(normal_max - normal_min)
        margin = (p / 100) * normal_range
        spike_thresh_upper = normal_max + margin
        spike_thresh_lower = normal_min - margin
        print(f"Computed spike thresholds from {p}%:")
        print(f"  Spike Upper Threshold: {spike_thresh_upper}°C")
        print(f"  Spike Lower Threshold: {spike_thresh_lower}°C")
        return None, None, spike_thresh_upper, spike_thresh_lower

    raise ValueError("Threshold type must be 'A' for Absolute or 'P' for Percentage")


In [None]:
def load_and_clean_data(file_path):
    """Load the sensor CSV, normalise its columns and fill gaps chronologically.

    The timestamp, temperature and door columns are located by looking for
    the substrings 'time', 'temp' and 'door' in the lower-cased, stripped
    headers, and are renamed to 'Timestamp', 'Temperature' and 'Door Contact'.
    Rows whose timestamp cannot be parsed are dropped. Missing temperature and
    door values are forward-filled from the preceding reading in time.

    Side effect: the cleaned frame is also written to 'filled_data.csv' in
    the current working directory.

    Args:
        file_path: Path (or anything ``pd.read_csv`` accepts) of the CSV file.

    Returns:
        The cleaned DataFrame, sorted by 'Timestamp' with a fresh index.

    Raises:
        ValueError: If one of the three required columns cannot be found.
    """
    df = pd.read_csv(file_path)
    df.columns = df.columns.str.strip().str.lower()

    try:
        col_map = {
            'timestamp': next(col for col in df.columns if 'time' in col),
            'temperature': next(col for col in df.columns if 'temp' in col),
            'door_contact': next(col for col in df.columns if 'door' in col)
        }
    except StopIteration:
        # The StopIteration itself carries no useful information for the user.
        raise ValueError("Required columns (time, temp, door) not found in file headers.") from None

    df = df.rename(columns={
        col_map['timestamp']: 'Timestamp',
        col_map['temperature']: 'Temperature',
        col_map['door_contact']: 'Door Contact'
    })

    # Timestamps may arrive wrapped in stray quote characters.
    df['Timestamp'] = pd.to_datetime(df['Timestamp'].astype(str).str.strip('"').str.strip("'"), errors='coerce')
    df = df.dropna(subset=['Timestamp'])

    # Sort BEFORE forward-filling so each gap is filled from the reading that
    # precedes it in time, not from whichever row happened to precede it in
    # the file. A stable sort keeps file order for duplicate timestamps.
    df = df.sort_values('Timestamp', kind='mergesort').reset_index(drop=True)

    df['Door Contact'] = df['Door Contact'].replace('', pd.NA).ffill()
    df['Temperature'] = pd.to_numeric(df['Temperature'].replace('', pd.NA), errors='coerce').ffill()

    df.to_csv('filled_data.csv', index=False)
    return df


In [None]:
def filter_data(df):
    """Interactively restrict the data to a day, a month or a datetime range.

    Prints a menu, reads the choice with ``input()`` and, depending on it,
    reads the day / month / start and end datetimes. Any choice other than
    '1', '2' or '3' (including '4') returns ``df`` unfiltered.

    Args:
        df: DataFrame with a datetime 'Timestamp' column.

    Returns:
        The filtered DataFrame (range filtering is inclusive at both ends).

    Raises:
        Whatever ``pd.to_datetime`` raises for an unparseable date string.
    """
    print("\nData Filtering Options:")
    print("1. Filter by specific day")
    print("2. Filter by specific month")
    print("3. Filter by date-time range")
    print("4. No filtering")
    # Tolerate accidental whitespace around the menu choice.
    option = input("Select filter option (1-4): ").strip()

    if option == '1':
        day_str = input("Enter day (YYYY-MM-DD): ").strip()
        df = df[df['Timestamp'].dt.date == pd.to_datetime(day_str).date()]
    elif option == '2':
        month_str = input("Enter month (YYYY-MM): ").strip()
        df = df[df['Timestamp'].dt.to_period('M') == pd.to_datetime(month_str).to_period('M')]
    elif option == '3':
        start_str = input("Enter start datetime (YYYY-MM-DD HH:MM): ").strip()
        end_str = input("Enter end datetime (YYYY-MM-DD HH:MM): ").strip()
        # The prompt asks for year-first input, so parse it the same way as
        # options 1 and 2; a day-first preference can swap day and month.
        start_dt = pd.to_datetime(start_str)
        end_dt = pd.to_datetime(end_str)
        df = df[(df['Timestamp'] >= start_dt) & (df['Timestamp'] <= end_dt)]

    return df


In [None]:
def merge_consecutive_states(states):
    """Collapse each run of equal adjacent states into a single entry.

    Args:
        states: Iterable of states (e.g. door status strings).

    Returns:
        A new list in which no two neighbouring items compare equal.
    """
    collapsed = []
    for current in states:
        starts_new_run = (not collapsed) or collapsed[-1] != current
        if starts_new_run:
            collapsed += [current]
    return collapsed

def count_transitions(merged_seq):
    """Return the number of state changes in an already-merged sequence.

    A sequence of k entries contains k - 1 changes; an empty sequence has 0.
    """
    length = len(merged_seq)
    return length - 1 if length > 0 else 0


In [None]:
def analyze_spike_frequency(spike_df):
    """Summarise the gaps between consecutive spikes and print the result.

    A gap is the time, in minutes, from one spike's 'Spike End Time' to the
    next spike's 'Spike Start Time', with spikes ordered by start time.

    Args:
        spike_df: DataFrame with datetime columns 'Spike Start Time' and
            'Spike End Time'.

    Returns:
        Dict mapping metric label to the value rounded to 2 decimals, or an
        empty dict when fewer than two spikes are given.
    """
    if len(spike_df) < 2:
        print("\nNot enough spikes to analyze frequency.")
        return {}

    ordered = spike_df.sort_values('Spike Start Time').reset_index(drop=True)
    # Align "next start" with "previous end" by position.
    next_starts = ordered['Spike Start Time'].iloc[1:].reset_index(drop=True)
    previous_ends = ordered['Spike End Time'].iloc[:-1].reset_index(drop=True)
    gaps_minutes = (next_starts - previous_ends).dt.total_seconds() / 60

    raw_stats = (
        ('Minimum Interval (min)', gaps_minutes.min()),
        ('Maximum Interval (min)', gaps_minutes.max()),
        ('Average Interval (min)', gaps_minutes.mean()),
        ('Std Deviation Interval (min)', gaps_minutes.std()),
    )
    frequency_metrics = {label: round(value, 2) for label, value in raw_stats}

    print("\nSpike Interval Analysis:")
    for label in frequency_metrics:
        print(f"{label:30} {frequency_metrics[label]}")

    return frequency_metrics


In [None]:
def main():
    """Run the interactive temperature-spike analysis end to end.

    Steps:
      1. Prompt for the normal temperature range and the spike threshold
         (absolute value or percentage of the normal range).
      2. Load "Sorted_Data_1.csv" and optionally filter it by date.
      3. Walk the readings in order; a spike starts when a reading crosses a
         spike threshold and ends at the first later reading back inside the
         normal range. Door states seen during the spike are collected.
      4. Print spike counts, door-status counts, insights and interval
         statistics.
      5. Write inputs, summary and spike details to
         'detected_spikes_output.xlsx' (xlsxwriter engine).

    Returns:
        None. Input errors and data loading errors are printed and end the
        run early instead of propagating.
    """
    print("=== Temperature Spike Analysis ===")
    try:
        normal_min = float(input("Enter normal temperature minimum: "))
        normal_max = float(input("Enter normal temperature maximum: "))
        validate_temperature_range(normal_min, normal_max)

        threshold_type = input("Enter threshold type ('A' for absolute, 'P' for percentage): ").strip().upper()
        # Only the prompt differs between the two modes; an invalid type is
        # rejected by calculate_spike_threshold after the value was read.
        if threshold_type == 'P':
            threshold_prompt = "Enter spike threshold percentage (positive number): "
        else:
            threshold_prompt = "Enter spike threshold value: "
        threshold_input = float(input(threshold_prompt))
        spike_threshold, is_positive_spike, spike_thresh_upper, spike_thresh_lower = calculate_spike_threshold(
            threshold_type, threshold_input, normal_min, normal_max)

    except ValueError as e:
        print(f"Input Error: {e}")
        return

    file_path = "Sorted_Data_1.csv"
    try:
        df = load_and_clean_data(file_path)
        df = filter_data(df)
    except Exception as e:
        print(f"Data Loading Error: {e}")
        return

    # Spike detection and door activity analysis
    spikes = []
    in_spike = False
    door_events = []

    for _, row in df.iterrows():
        temp, time, door = row['Temperature'], row['Timestamp'], row['Door Contact']
        # 'P' mode flags readings beyond either computed threshold; 'A' mode
        # flags only the side of the normal range the threshold lies on.
        is_spike = (temp >= spike_thresh_upper or temp <= spike_thresh_lower) if threshold_type == 'P' else (
            (is_positive_spike and temp >= spike_threshold) or (not is_positive_spike and temp <= spike_threshold))

        if not in_spike and is_spike:
            in_spike = True
            start_time, start_temp, start_door = time, temp, door
            door_events = [str(door)]
        elif in_spike:
            if normal_min <= temp <= normal_max:
                # Back inside the normal range: the spike is over.
                end_time, end_temp, end_door = time, temp, door
                # str() matches how every other door state is collected, so
                # a non-string value (e.g. NaN) cannot break .lower()/join.
                door_events.append(str(end_door))

                merged_seq = merge_consecutive_states(door_events)
                recovery_time = (end_time - start_time).total_seconds() / 60

                original_open = sum(s.lower() == 'open' for s in door_events)
                original_close = sum(s.lower() == 'close' for s in door_events)
                merged_open = sum(s.lower() == 'open' for s in merged_seq)
                merged_close = sum(s.lower() == 'close' for s in merged_seq)
                transitions = count_transitions(merged_seq)

                spikes.append({
                    'Spike Start Time': start_time,
                    'Spike End Time': end_time,
                    'Temperature Start': round(start_temp, 2),
                    'Temperature End': round(end_temp, 2),
                    'Recovery Time (min)': round(recovery_time, 2),
                    'Door Status Start': start_door,
                    'Door Status End': end_door,
                    'Original Open Count': original_open,
                    'Original Close Count': original_close,
                    'Merged Open Count': merged_open,
                    'Merged Close Count': merged_close,
                    'Original Sequence': ", ".join(door_events),
                    'Merged Sequence': ", ".join(merged_seq),
                    'Door Transitions Count': transitions
                })
                in_spike = False
            else:
                # Still outside the normal range: keep tracking the door.
                door_events.append(str(door))
    # Note: a spike still in progress when the data ends is not recorded,
    # because records are only created when the temperature recovers.

    spike_df = pd.DataFrame(spikes)
    print(f"\nTotal spikes detected: {len(spike_df)}")

    door_status_start_list = spike_df['Door Status Start'].tolist() if not spike_df.empty else []
    door_status_end_list = spike_df['Door Status End'].tolist() if not spike_df.empty else []

    print("\nDoor Status at Spike Start:")
    start_status_counts = {}
    for status in set(door_status_start_list):
        count = door_status_start_list.count(status)
        start_status_counts[status] = count
        print(f"{status}: {count}")

    print("\nDoor Status at Spike End:")
    end_status_counts = {}
    for status in set(door_status_end_list):
        count = door_status_end_list.count(status)
        end_status_counts[status] = count
        print(f"{status}: {count}")

    if len(spike_df) > 0:
        door_start_open_count = sum(str(s).lower() == 'open' for s in spike_df['Door Status Start'])
        door_start_closed_count = sum(str(s).lower() == 'close' for s in spike_df['Door Status Start'])
        # A tie is reported as "open".
        insight_1 = ("Spikes more frequently begin when the door is open." if door_start_open_count >= door_start_closed_count
                     else "Spikes more frequently begin when the door is closed.")

        # This insight is chosen purely from the mean transition count.
        avg_transitions = spike_df['Door Transitions Count'].mean()
        insight_2 = ("Frequent door activity tends to prolong spikes." if avg_transitions > 1.5 else
                     "Lower door activity tends to correspond with shorter spikes.")

        spike_hours = spike_df['Spike Start Time'].dt.hour
        if len(spike_hours) > 0:
            hour_counts = spike_hours.value_counts()
            most_common_hour = hour_counts.idxmax()
            least_common_hour = hour_counts.idxmin()
            insight_3 = (f"Door-related spikes occur most frequently around {most_common_hour}:00 hours "
                         f"and least frequently around {least_common_hour}:00 hours.")
        else:
            insight_3 = "No spike hour data available for analysis."

        insights = [insight_1, insight_2, insight_3]

        print("\n=== Door-Spike Insights ===")
        for ins in insights:
            print(ins)
    else:
        insights = []
        print("\nNo spikes detected - door-spike insights unavailable.")

    user_inputs = pd.DataFrame([
        ['Normal Min Temperature', normal_min],
        ['Normal Max Temperature', normal_max],
        ['Threshold Type', threshold_type],
        ['Spike Threshold Input', threshold_input],
        ['Spike Upper Threshold', spike_thresh_upper if threshold_type == 'P' else None],
        ['Spike Lower Threshold', spike_thresh_lower if threshold_type == 'P' else None]
    ], columns=['Parameter', 'Value'])

    frequency_metrics = analyze_spike_frequency(spike_df) if not spike_df.empty else {}

    if not spike_df.empty:
        # Work on a copy so the frame written to Excel keeps real datetimes.
        spike_df_with_num = spike_df.copy()
        spike_df_with_num.insert(0, 'Spike #', range(1, len(spike_df) + 1))
        for col in ['Spike Start Time', 'Spike End Time']:
            spike_df_with_num[col] = spike_df_with_num[col].dt.strftime('%Y-%m-%d %H:%M')
        print("\nSpike Details:")
        print(spike_df_with_num.to_string(index=False))

    with pd.ExcelWriter('detected_spikes_output.xlsx', engine='xlsxwriter', engine_kwargs={'options': {'nan_inf_to_errors': True}}) as writer:
        user_inputs.to_excel(writer, sheet_name='User Inputs', index=False)
        # The summary sheet is created by hand so free-form cells can be
        # written above the spike table that pandas adds at the end.
        workbook = writer.book
        worksheet = workbook.add_worksheet('Spike Summary')
        writer.sheets['Spike Summary'] = worksheet

        if insights:
            worksheet.write('A1', "Door-Spike Insights:")
            for i, ins_text in enumerate(insights):
                worksheet.write(i + 1, 0, ins_text)
            start_row_after_insights = len(insights) + 2
        else:
            worksheet.write('A1', "No Door-Spike Insights available")
            start_row_after_insights = 1

        worksheet.write(start_row_after_insights, 0, 'Total Spikes Detected')
        worksheet.write(start_row_after_insights, 1, len(spike_df))

        row_start = start_row_after_insights + 2
        for i, (metric, val) in enumerate(frequency_metrics.items()):
            worksheet.write(row_start + i, 0, metric)
            worksheet.write(row_start + i, 1, val)

        row_after = row_start + len(frequency_metrics)
        worksheet.write(row_after, 0, '')

        worksheet.write(row_after + 1, 0, 'Door Status Counts at Spike Start')
        for i, (status, count) in enumerate(start_status_counts.items()):
            worksheet.write(row_after + 2 + i, 0, status)
            worksheet.write(row_after + 2 + i, 1, count)

        start_end_row = row_after + 2 + len(start_status_counts) + 1
        worksheet.write(start_end_row, 0, 'Door Status Counts at Spike End')
        for i, (status, count) in enumerate(end_status_counts.items()):
            worksheet.write(start_end_row + 1 + i, 0, status)
            worksheet.write(start_end_row + 1 + i, 1, count)

        after_counts_row = start_end_row + 1 + len(end_status_counts)
        worksheet.write(after_counts_row, 0, '')
        # The spike table goes below everything written above.
        spike_df.to_excel(writer, sheet_name='Spike Summary', index=False, startrow=after_counts_row + 2)

    print("\nSpike details and summary saved to 'detected_spikes_output.xlsx'")


In [None]:
# Run the interactive analysis: main() prompts via input() and writes
# 'detected_spikes_output.xlsx' when it completes.
main()


=== Temperature Spike Analysis ===
Enter normal temperature minimum: -3
Enter normal temperature maximum: 2
Enter threshold type ('A' for absolute, 'P' for percentage): p
Enter spike threshold percentage (positive number): 34
Computed spike thresholds from 34.0%:
  Spike Upper Threshold: 3.7°C
  Spike Lower Threshold: -4.7°C

Data Filtering Options:
1. Filter by specific day
2. Filter by specific month
3. Filter by date-time range
4. No filtering
Select filter option (1-4): 4

Total spikes detected: 854

Door Status at Spike Start:
Open: 43
nan: 1
Close: 810

Door Status at Spike End:
Open: 31
Close: 823

=== Door-Spike Insights ===
Spikes more frequently begin when the door is closed.
Frequent door activity tends to prolong spikes.
Door-related spikes occur most frequently around 6:00 hours and least frequently around 11:00 hours.

Spike Interval Analysis:
Minimum Interval (min)         9.98
Maximum Interval (min)         525.45
Average Interval (min)         54.98
Std Deviation Inter

In [1]:
# Colab-only: mount Google Drive at /content/drive.
# NOTE(review): this cell appears after the analysis cells, while main() reads
# the relative path "Sorted_Data_1.csv" — confirm whether the mount (and a
# change of working directory) has to happen before main() is run.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
!pip install xlsxwriter

Collecting xlsxwriter
  Downloading xlsxwriter-3.2.5-py3-none-any.whl.metadata (2.7 kB)
Downloading xlsxwriter-3.2.5-py3-none-any.whl (172 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.3/172.3 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: xlsxwriter
Successfully installed xlsxwriter-3.2.5
