In [23]:
import bagpy
from bagpy import bagreader
import os
import pandas as pd
from dateutil.parser import parse as parsedate
import matplotlib.pyplot as plt
from rosbags.highlevel import AnyReader as RosBagReader
from pathlib import Path
import logging
from dotenv import dotenv_values
from dotenv import load_dotenv
import ast 

%load_ext autoreload
%autoreload 2

# Read key/value settings through python-dotenv (presumably from a local
# .env file -- confirm where the file is expected to live).
config = dotenv_values()
# Normalize the configured data root; raises KeyError if DATA_PATH is absent.
data_path = os.path.normpath(config['DATA_PATH'])
# Export the normalized path: determine_trial_info below reads
# os.environ['DATA_PATH'] for its default `path`, so this must run first.
os.environ['DATA_PATH'] = data_path
print(f"DATA_PATH is set to: {os.environ['DATA_PATH']}")

# Lookup table of trial types.
# Each value is ['Start Chamber', 'Left Cue', 'Right Cue', 'Sound Cue'].
# Types are numbered 1-16: four consecutive types per start chamber
# ('1', '3', '5', '7'), cycling through the same four cue/sound patterns.
_CUE_SOUND_PATTERNS = (
    ('Triangle', 'No_Cue', 'White_Noise'),
    ('No_Cue', 'Triangle', '5KHz'),
    ('Triangle', 'No_Cue', '5KHz'),
    ('No_Cue', 'Triangle', 'White_Noise'),
)

trial_types = {
    4 * chamber_idx + pattern_idx + 1: [chamber, left_cue, right_cue, sound_cue]
    for chamber_idx, chamber in enumerate(('1', '3', '5', '7'))
    for pattern_idx, (left_cue, right_cue, sound_cue) in enumerate(_CUE_SOUND_PATTERNS)
}

def biases_over_days(rat, path, number_of_days):
    """Collect the /rosout messages of a rat's most recent session folders.

    Args:
        rat: Animal number, as an int or a numeric string.
        path: Root data directory that contains the 'NC4xxxx' animal folders.
        number_of_days: Maximum number of most recently modified session
            folders to process.

    Returns:
        dict mapping each session folder name to the list form of what
        determine_trial_info returns for it, i.e. [messages, bag_file].
    """
    if isinstance(rat, str):
        rat = int(rat)

    anim_folder = os.path.join(path, 'NC4%04d' % rat)
    training_folders = get_latest_folders(anim_folder, number_of_days)

    statistics = {}
    for folder in training_folders:
        folder_name = os.path.basename(folder)
        # determine_trial_info takes (rat, date, path): the session folder
        # name is the date and `path` is the data root, in that order.
        biases = determine_trial_info(rat, folder_name, path)
        statistics[folder_name] = list(biases)
    return statistics
        
def get_latest_folders(directory, n=7):
    """Return the paths of the *n* most recently modified subdirectories.

    Only immediate children of *directory* that are directories are
    considered. The result is ordered newest first by modification time and
    holds at most *n* entries.
    """
    candidates = []
    for entry in os.listdir(directory):
        full_path = os.path.join(directory, entry)
        if os.path.isdir(full_path):
            candidates.append(full_path)

    newest_first = sorted(candidates, key=os.path.getmtime, reverse=True)
    return newest_first[:n]
    
def determine_trial_info(rat, date, path=None):
    """Read the /rosout messages from the bag file of one recording session.

    Args:
        rat: Animal number, as an int or a numeric string; it selects the
            'NC4xxxx' animal folder under *path*.
        date: Session folder name such as '240722'. A value containing '-'
            (e.g. '24-Jul-24') is parsed and converted to the 'yymmdd' form.
            Non-string values are converted with str() first.
        path: Root data directory. When omitted, os.environ['DATA_PATH'] is
            read at call time.

    Returns:
        (msg_list, bag_file): the `msg` attribute of every deserialized
        message on the '/rosout' topic, and the path of the bag file read.

    Raises:
        KeyError: *path* is omitted and DATA_PATH is not set.
        FileNotFoundError: the session folder does not exist or holds no
            .bag file.
    """
    # Resolve the default lazily: a default written in the signature would be
    # frozen when the function is defined and would fail there if the
    # environment variable were not set yet.
    if path is None:
        path = os.environ['DATA_PATH']
    print(f"Using path: {path}")

    if isinstance(rat, str):
        rat = int(rat)

    # Look in the animal folder named after the number assigned to the animal.
    anim_folder = os.path.join(path, 'NC4%04d' % rat)
    print(f"Looking in folder: {anim_folder}")

    # Accept numeric dates such as 240722 as well as strings.
    date = str(date)
    if '-' in date:  # e.g. date = '24-Jul-24'
        date = parsedate(date).strftime('%y%m%d')

    date_folder = os.path.join(anim_folder, date)
    print(f"Looking in folder: {date_folder}")

    # Sort the names so the same bag is chosen on every run when a folder
    # holds several; os.listdir returns entries in arbitrary order.
    bag_names = sorted(name for name in os.listdir(date_folder) if name.endswith('.bag'))
    if not bag_names:
        raise FileNotFoundError("No .bag file found in the specified folder.")
    bag_file = os.path.join(date_folder, bag_names[0])

    msg_list = []
    with RosBagReader([Path(bag_file)]) as reader:
        connections = [x for x in reader.connections if x.topic == '/rosout']
        for connection, timestamp, rawdata in reader.messages(connections=connections):
            msg = reader.deserialize(rawdata, connection.msgtype)
            msg_list.append(msg.msg)

    return msg_list, bag_file

def filter_start_of_trial_messages(messages):
    """Keep only the messages that begin with one of the trial keywords.

    The order of the input is preserved.
    """
    prefixes = ('Chamber', 'Current trial number', 'START OF TRIAL', 'SUCCESS', 'ERROR', 'Right', 'Left')
    kept = []
    for message in messages:
        # str.startswith accepts a tuple and matches any of its entries.
        if message.startswith(prefixes):
            kept.append(message)
    return kept

def remove_specific_message(messages):
    """Return a new list without the messages equal to 'ERROR_END'."""
    return list(filter(lambda message: message != 'ERROR_END', messages))

def create_dataframe_from_messages(messages):
    """Arrange a flat list of messages into a table with one row per trial.

    Messages are consumed five at a time, in order, filling the columns
    'Start Chamber', 'Trial Number', 'Trial', 'Result' and 'Choice'. The
    'Number' column holds the 1-based row number. When the last group has
    fewer than five messages, its missing cells are filled with ''.
    """
    per_trial = 5
    columns = ['Number', 'Start Chamber', 'Trial Number', 'Trial', 'Result', 'Choice']
    df = pd.DataFrame(columns=columns)

    # Ceiling division: a partial trailing group still gets its own row.
    total_rows = -(-len(messages) // per_trial)

    for row_idx in range(total_rows):
        offset = row_idx * per_trial
        chunk = messages[offset:offset + per_trial]

        # Pad an incomplete final group up to five entries.
        shortfall = per_trial - len(chunk)
        if shortfall > 0:
            chunk.extend([''] * shortfall)

        df.loc[row_idx] = [row_idx + 1] + chunk

    return df

def extract_trial_info(trial_str):
    """Extract the first three items of the list in a 'START OF TRIAL' message.

    The text after 'START OF TRIAL ' is parsed as a Python literal with
    ast.literal_eval. The caller in this file stores the three returned items
    as the 'Left Cue', 'Right Cue' and 'Sound Cue' columns.

    Args:
        trial_str: A message such as "START OF TRIAL ['a', 'b', 'c', ...]".

    Returns:
        A 3-tuple with the first three items of the parsed list, or
        (None, None, None) when the input is not a string, lacks the marker,
        cannot be parsed, is not indexable, or has fewer than three items.
    """
    try:
        # Text following the marker; IndexError if the marker is missing.
        list_str = trial_str.split('START OF TRIAL ')[1]

        # Turn the textual list into a real Python object.
        trial_list = ast.literal_eval(list_str)

        if len(trial_list) < 3:
            return None, None, None

        return trial_list[0], trial_list[1], trial_list[2]

    except (AttributeError, IndexError, KeyError, SyntaxError, TypeError, ValueError) as e:
        # AttributeError: input is not a string (e.g. None or NaN).
        # TypeError/KeyError: the parsed literal is not an indexable sequence.
        # One malformed row must not abort a whole DataFrame.apply pass.
        print(f"Error processing: {trial_str} - {e}")
        return None, None, None
    
def determine_trial_type(row):
    """Return the trial-type number whose definition matches *row*.

    The row's 'Start Chamber Number', 'Left Cue', 'Right Cue' and 'Sound Cue'
    values are compared, as a list, with each entry of the module-level
    trial_types table. The first matching key is returned, or None when no
    entry matches.
    """
    wanted_columns = ('Start Chamber Number', 'Left Cue', 'Right Cue', 'Sound Cue')
    observed = [row[column] for column in wanted_columns]

    return next(
        (type_id for type_id, expected in trial_types.items() if observed == expected),
        None,
    )


# Session to analyse.
rat_number = 6
date = '240722'

messages, bag_file = determine_trial_info(rat_number, date)

# Both result files live next to the bag file.
extracted_biases_file_path = os.path.join(os.path.dirname(bag_file), 'extracted_biases.csv')
csv_trial_type_summary = os.path.join(os.path.dirname(bag_file), 'trial_type_summary.csv')

# Reuse the cached results only when BOTH files exist; if either is missing,
# rebuild both from the bag messages instead of failing on the missing one.
if os.path.exists(extracted_biases_file_path) and os.path.exists(csv_trial_type_summary):
    # Per-trial information extracted previously.
    extracted_biases = pd.read_csv(extracted_biases_file_path)
    # Per-trial-type summary extracted previously.
    trial_type_summary = pd.read_csv(csv_trial_type_summary)
    print('The dataframes have been created previously')
else:
    filtered_masseages = filter_start_of_trial_messages(messages)

    final_messages = remove_specific_message(filtered_masseages)

    df = create_dataframe_from_messages(final_messages)

    # Split each trial's 'Left Cue', 'Right Cue' and 'Sound Cue' into three new columns.
    df[['Left Cue', 'Right Cue', 'Sound Cue']] = df['Trial'].apply(lambda x: pd.Series(extract_trial_info(x)))

    df['Start Chamber Number'] = df['Start Chamber'].str.extract(r'Chamber (\d+)')

    # Select and reorder the columns. Take an explicit copy so the column
    # added below is written to an independent frame rather than to a slice
    # of df (avoids pandas' SettingWithCopyWarning).
    new_df = df[['Number', 'Start Chamber Number', 'Left Cue', 'Right Cue', 'Sound Cue', 'Result', 'Choice']].copy()

    new_df['Trial Type'] = new_df.apply(determine_trial_type, axis=1)

    new_df.to_csv(extracted_biases_file_path, index=False)

    print(f"DataFrame saved to {extracted_biases_file_path}")

    # Count the occurrences of each 'Result' value per 'Trial Type'.
    grouped_df = new_df.groupby(['Trial Type', 'Result']).size().unstack(fill_value=0)

    # Rename columns for clarity.
    # NOTE(review): this assumes exactly two distinct 'Result' values that
    # sort as error first, success second; any other set of values raises a
    # length-mismatch ValueError here -- confirm against real session data.
    grouped_df.columns = ['Error_Count', 'Success_Count']

    # Total repetitions of each trial type.
    grouped_df['Total_Repetitions'] = grouped_df.sum(axis=1)

    # Turn 'Trial Type' back into a regular column.
    grouped_df = grouped_df.reset_index()

    # Save the aggregated data to a new CSV file.
    output_csv_path = csv_trial_type_summary
    grouped_df.to_csv(output_csv_path, index=False)

    print(f"Summary saved to {output_csv_path}")





The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
DATA_PATH is set to: \\10.34.1.59\big_gulp\nc4_rat_data\Maze_Rats
Using path: \\10.34.1.59\big_gulp\nc4_rat_data\Maze_Rats
Using path: \\10.34.1.59\big_gulp\nc4_rat_data\Maze_Rats
Looking in folder: \\10.34.1.59\big_gulp\nc4_rat_data\Maze_Rats\NC40006
Looking in folder: \\10.34.1.59\big_gulp\nc4_rat_data\Maze_Rats\NC40006\240722
