In [5]:
import pandas as pd
import sqlite3
import os
import tkinter as tk
from tkinter import filedialog
import xml.etree.ElementTree as ET
from datetime import datetime

# Ask user to select folder containing the Session XML file
root = tk.Tk()
root.withdraw()  # Hide the root window
selected_folder = filedialog.askdirectory(initialdir='D:/Tramp Test/Data/')
if not selected_folder:
    raise ValueError("No folder was selected.")

# Extract the test_date from the selected folder name
folder_name = os.path.basename(selected_folder)
test_date = folder_name.split('_', 1)[0]  # Extract '2024-08-13' from '2024-08-13_105_Growth Plate_'

# Find the XML file titled "Session" in the selected folder
xml_file_path = ''
for r, dirs, files in os.walk(selected_folder):
    for file in files:
        if file.lower().startswith('session') and file.lower().endswith('.xml'):
            xml_file_path = os.path.join(r, file)
            break
    if xml_file_path:
        break

if not xml_file_path:
    raise FileNotFoundError("No 'Session' XML file found in the selected folder.")

# Parse the XML file
tree = ET.parse(xml_file_path)
root_xml = tree.getroot()

# Extract required fields from XML
name = root_xml.find(".//Name").text
dob = root_xml.find(".//DOB").text
height = root_xml.find(".//Height").text
weight = root_xml.find(".//Weight").text
pre_post = root_xml.find(".//Pre_Post").text.lower()
exp_control = root_xml.find(".//Exp_Control").text.lower()
creation_date = root_xml.find(".//Creation_date").text
comments = root_xml.find(".//Comments").text

# Calculate age from DOB
dob_date = datetime.strptime(dob, "%Y-%m-%d")
today = datetime.today()
age = today.year - dob_date.year - ((today.month, today.day) < (dob_date.month, dob_date.day))

# Connect to the SQLite database
db_path = 'D:/Tramp Test/Tramp_Test.sqlite'
conn = sqlite3.connect(db_path)

# Create necessary tables
conn.execute('''CREATE TABLE IF NOT EXISTS Participants (
    participant_id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT,
    dob DATE,
    height REAL,
    weight REAL,
    participant_group TEXT
)''')

conn.execute('''CREATE TABLE IF NOT EXISTS Sessions (
    session_id INTEGER PRIMARY KEY AUTOINCREMENT,
    participant_id INTEGER,
    test_date DATE,
    pre_post TEXT,
    exp_control TEXT,
    comments TEXT,
    FOREIGN KEY (participant_id) REFERENCES Participants(participant_id)
)''')

conn.execute('''CREATE TABLE IF NOT EXISTS Movements (
    movement_id INTEGER PRIMARY KEY AUTOINCREMENT,
    movement_name TEXT
)''')

# Create the Results table with additional name columns
conn.execute("""
    CREATE TABLE IF NOT EXISTS Results (
        result_id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id INTEGER,
        movement_id INTEGER,
        participant_id INTEGER,
        Trial_Num INTEGER,
        name TEXT,
        movement TEXT,
        JH_IN REAL,
        LEWIS_PEAK_POWER REAL,
        NORM_LEWIS_PEAK_POWER_KG REAL,
        Max_Force REAL,
        FOREIGN KEY (session_id) REFERENCES Sessions(session_id),
        FOREIGN KEY (movement_id) REFERENCES Movements(movement_id),
        FOREIGN KEY (participant_id) REFERENCES Participants(participant_id)
    );
""")


# Insert participant if not already in the database
cursor = conn.cursor()
cursor.execute("SELECT participant_id FROM Participants WHERE name = ?", (name,))
participant = cursor.fetchone()
if participant is None:
    # Combine pre_post and exp_control to define the participant group
    participant_group = f"{exp_control}_{pre_post}"
    cursor.execute("INSERT INTO Participants (name, dob, height, weight, participant_group) VALUES (?, ?, ?, ?, ?)",
                   (name, dob, height, weight, participant_group))
    participant_id = cursor.lastrowid
else:
    participant_id = participant[0]

# Insert session
cursor.execute("INSERT INTO Sessions (participant_id, test_date, pre_post, exp_control, comments) VALUES (?, ?, ?, ?, ?)",
               (participant_id, test_date, pre_post, exp_control, comments))
session_id = cursor.lastrowid

# Define movements and ensure they are in the Movements table
movements = ['cmj', 'dj', 'ppu']
movement_ids = {}
for movement in movements:
    cursor.execute("SELECT movement_id FROM Movements WHERE movement_name = ?", (movement,))
    result = cursor.fetchone()
    if result is None:
        cursor.execute("INSERT INTO Movements (movement_name) VALUES (?)", (movement,))
        movement_ids[movement] = cursor.lastrowid
    else:
        movement_ids[movement] = result[0]

# Insert results with placeholders for additional columns
for movement in movements:
    cursor.execute("""
        INSERT INTO Results (
            session_id, movement_id, participant_id, name, movement, 
            JH_IN, LEWIS_PEAK_POWER, NORM_LEWIS_PEAK_POWER_KG, Max_Force
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (session_id, movement_ids[movement], participant_id, name, movement, None, None, None, None))
conn.commit()
conn.close()

# At the end of the first code block
global_pre_post = pre_post  # Set this as a global variable
print(f"Global pre_post set to: {global_pre_post}")

print(f"Data for participant '{name}' with test date '{test_date}' has been inserted.")


Global pre_post set to: post
Data for participant 'Chandler Seagel' with test date '2025-01-02' has been inserted.


In [6]:
import re
import pandas as pd
import sqlite3
import os

def extract_test_date_from_ascii(ascii_file_path: str) -> str:
    """
    Extracts the test date in 'YYYY-MM-DD' format from the first file path in the ASCII file.
    
    Args:
        ascii_file_path (str): Path to the ASCII file.
    
    Returns:
        str: Extracted test date in 'YYYY-MM-DD' format.
    """
    with open(ascii_file_path, 'r') as file:
        lines = file.readlines()
        # Extract the first file path
        first_file_path = lines[0].strip().split('\t')[0]
        # Split the path to navigate the folder structure
        parts = first_file_path.split('\\')  # Split by folder structure
        if len(parts) > 4:  # Ensure we have enough subfolders
            date_folder = parts[4]  # Get the folder containing the date
            # Use regex to extract 'YYYY-MM-DD' format
            match = re.match(r'^\d{4}-\d{2}-\d{2}', date_folder)
            if match:
                return match.group(0)  # Return the matched date
            else:
                raise ValueError(f"Unable to extract test date from folder: {date_folder}")
        else:
            raise ValueError("Unexpected file path structure: Unable to extract test date.")

def process_ascii_data(df_ascii):
    """
    Process ASCII DataFrame to handle multiple trials dynamically.

    Args:
        df_ascii (pd.DataFrame): Input DataFrame with trial data.

    Returns:
        pd.DataFrame: Processed DataFrame with trials split into rows.
    """
    # Identify the fixed columns and dynamic trial columns
    fixed_columns = ['Trial_ID']
    trial_columns = df_ascii.columns[1:]  # Exclude the Trial_ID column
    num_metrics = 4  # Number of metrics per trial (e.g., JH_IN, LEWIS_PEAK_POWER, etc.)

    # Split columns into groups of metrics for each trial
    trials = []
    for i in range(0, len(trial_columns), num_metrics):
        trial_group = trial_columns[i:i + num_metrics]
        trials.append(trial_group)

    # Prepare a list to store the processed rows
    processed_rows = []

    # Iterate through each row in the DataFrame
    for _, row in df_ascii.iterrows():
        for trial_num, trial_group in enumerate(trials, start=1):
            # Extract data for the current trial
            trial_data = row[trial_group].to_dict()
            trial_data['Trial_ID'] = row['Trial_ID']
            trial_data['Trial_Num'] = trial_num
            processed_rows.append(trial_data)

    # Combine processed rows into a DataFrame
    return pd.DataFrame(processed_rows)

# Use the global variable from the first code block
if 'global_pre_post' in globals():
    pre_post = global_pre_post
    print(f"Using global pre_post: {pre_post}")
else:
    raise ValueError("pre_post value not found. Ensure the first code block was executed.")

# Directory containing the ASCII files
ascii_dir = 'D:/Tramp Test/Output Files/'

# Connect to the database
conn = sqlite3.connect('D:/Tramp Test/Tramp_Test.sqlite')
cursor = conn.cursor()

# Ensure the Results table has a Trial_Num column
try:
    cursor.execute("""
        ALTER TABLE Results ADD COLUMN Trial_Num INTEGER;
    """)
except sqlite3.OperationalError:
    # Ignore if the column already exists
    pass

# Process each movement file
movements = ['cmj', 'dj', 'ppu']
for movement in movements:
    file_path = os.path.join(ascii_dir, f"{movement}.txt")
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Extract the test date
    test_date = extract_test_date_from_ascii(file_path)
    print(f"Extracted test date: {test_date} for {movement}")

    # Read the ASCII file and extract data
    with open(file_path, 'r') as file:
        lines = file.readlines()
        data = [line.strip().split('\t') for line in lines]

    # Adjust the header to include the missing Trial_ID column
    header = ['Trial_ID'] + lines[1].strip().split('\t')

    # Create the DataFrame using the adjusted header
    df_ascii = pd.DataFrame(data[5:], columns=header)

    # Replace hyphens with underscores in column names
    df_ascii.columns = [col.replace('-', '_') for col in df_ascii.columns]

    # Use the process_ascii_data function to handle multiple trials
    df_processed = process_ascii_data(df_ascii)

    # Fetch participant_name dynamically based on test_date
    cursor.execute("""
        SELECT Participants.name
        FROM Participants
        JOIN Sessions ON Participants.participant_id = Sessions.participant_id
        WHERE Sessions.test_date = ?;
    """, (test_date,))
    participant = cursor.fetchone()

    if not participant:
        raise ValueError(f"No participant found for test_date: {test_date}")

    participant_name = participant[0]
    print(f"Using participant_name: {participant_name}")

    # Fetch the participant_id dynamically
    cursor.execute("""
        SELECT participant_id FROM Participants WHERE name = ?;
    """, (participant_name,))
    participant = cursor.fetchone()

    if not participant:
        raise ValueError(f"No participant found with name: {participant_name}")

    participant_id = participant[0]
    print(f"Using participant_id: {participant_id} for participant_name: {participant_name}")

    # Fetch `pre_post` dynamically based on the session
    cursor.execute("""
        SELECT pre_post FROM Sessions 
        WHERE test_date = ? AND participant_id = ?;
    """, (test_date, participant_id))
    pre_post_value = cursor.fetchone()

    if not pre_post_value:
        raise ValueError(f"No pre_post value found for test_date: {test_date} and participant_id: {participant_id}")

    pre_post = pre_post_value[0]
    print(f"Using pre_post: {pre_post}")

    # Fetch the correct session_id dynamically
    cursor.execute("""
        SELECT session_id FROM Sessions 
        WHERE test_date = ? AND participant_id = ? AND pre_post = ?;
    """, (test_date, participant_id, pre_post))
    session = cursor.fetchone()

    if not session:
        raise ValueError(f"No session found for test_date: {test_date}, participant_id: {participant_id}, pre_post: {pre_post}")

    session_id = session[0]
    print(f"Processing session_id: {session_id} for test_date: {test_date}, pre_post: {pre_post}")

    # Fetch movement_id and movement_name dynamically
    cursor.execute("""
        SELECT movement_id, movement_name 
        FROM Movements
        WHERE movement_name = ?;
    """, (movement,))
    movement_result = cursor.fetchone()
    if not movement_result:
        raise ValueError(f"Movement {movement} not found in database.")
    movement_id, movement_name = movement_result

    # Insert or update processed data in the database
    for _, row in df_processed.iterrows():
        # Check if a matching placeholder row exists
        cursor.execute("""
            SELECT result_id FROM Results 
            WHERE session_id = ? AND movement_id = ? AND Trial_Num IS NULL;
        """, (session_id, movement_id))
        existing_row = cursor.fetchone()

        if existing_row:
            # Update the existing placeholder row
            update_sql = """
                UPDATE Results
                SET Trial_Num = ?, JH_IN = ?, LEWIS_PEAK_POWER = ?, 
                    NORM_LEWIS_PEAK_POWER_KG = ?, Max_Force = ?
                WHERE result_id = ?;
            """
            params = [
                row['Trial_Num'],
                row['JH_IN'],
                row['LEWIS_PEAK_POWER'],
                row['NORM_LEWIS_PEAK_POWER_KG'],
                row['Max_Force'],
                existing_row[0],  # result_id
            ]
            cursor.execute(update_sql, params)
        else:
            # Insert a new row
            insert_sql = """
                INSERT INTO Results (
                    session_id, movement_id, participant_id, Trial_Num, 
                    name, movement, JH_IN, LEWIS_PEAK_POWER, 
                    NORM_LEWIS_PEAK_POWER_KG, Max_Force
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
            """
            params = [
                session_id,
                movement_id,
                participant_id,
                row['Trial_Num'],
                participant_name,
                movement_name,
                row['JH_IN'],
                row['LEWIS_PEAK_POWER'],
                row['NORM_LEWIS_PEAK_POWER_KG'],
                row['Max_Force']
            ]
            cursor.execute(insert_sql, params)

# Commit the transaction
conn.commit()
conn.close()

print("All trials processed and inserted/updated in the database.")


Using global pre_post: post
Extracted test date: 2025-01-02 for cmj
Using participant_name: Chandler Seagel
Using participant_id: 1 for participant_name: Chandler Seagel
Using pre_post: pre
Processing session_id: 1 for test_date: 2025-01-02, pre_post: pre
Extracted test date: 2025-01-02 for dj
Using participant_name: Chandler Seagel
Using participant_id: 1 for participant_name: Chandler Seagel
Using pre_post: pre
Processing session_id: 1 for test_date: 2025-01-02, pre_post: pre
Extracted test date: 2025-01-02 for ppu
Using participant_name: Chandler Seagel
Using participant_id: 1 for participant_name: Chandler Seagel
Using pre_post: pre
Processing session_id: 1 for test_date: 2025-01-02, pre_post: pre
All trials processed and inserted/updated in the database.


In [None]:
# Reorders the database to be in alphabetical order

import sqlite3


db_path = "D:/Tramp Test/Tramp_Test.sqlite" 
sort_column = "name"     

def reorder_all_tables(db_path, sort_column):
    try:
        # Connect to the database
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        
        # Fetch all table names in the database
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = cursor.fetchall()

        for table in tables:
            table_name = table[0]

            # Skip system tables like sqlite_sequence
            if table_name.startswith("sqlite_"):
                continue

            print(f"Processing table: {table_name}")

            # Check if the column exists in the current table
            cursor.execute(f"PRAGMA table_info({table_name});")
            columns = [info[1] for info in cursor.fetchall()]
            if sort_column not in columns:
                print(f"Skipping table '{table_name}' - Column '{sort_column}' not found.")
                continue

            # Create a new sorted table
            temp_table = f"{table_name}_sorted"
            cursor.execute(f"CREATE TABLE {temp_table} AS SELECT * FROM {table_name} ORDER BY {sort_column} ASC;")
            
            # Drop the old table
            cursor.execute(f"DROP TABLE {table_name};")
            
            # Rename the new table to the original name
            cursor.execute(f"ALTER TABLE {temp_table} RENAME TO {table_name};")
            print(f"Table '{table_name}' reordered successfully.")

        # Commit changes
        conn.commit()
        print("All tables processed.")
    except sqlite3.Error as e:
        print(f"An error occurred: {e}")
    finally:
        conn.close()

reorder_all_tables(db_path, sort_column)
