# Data Mining - Brain-to-text '25

## Overview of the competition

People with ALS or brainstem stroke can lose the ability to move and speak. Speech brain-computer interfaces (BCIs) can restore communication by decoding what someone is trying to say directly from their brain activity. Once decoded, the person’s intended message can be spoken for them or typed as text on a computer.

### Loading hdf5 file in Python

In [None]:
import h5py
import numpy as np
import pandas as pd
import glob
import os
from tqdm import tqdm
import matplotlib.pyplot as plt


In [None]:
LOGIT_PHONE_DEF = [
    'BLANK', 'SIL', # blank and silence
    'AA', 'AE', 'AH', 'AO', 'AW',
    'AY', 'B',  'CH', 'D', 'DH',
    'EH', 'ER', 'EY', 'F', 'G',
    'HH', 'IH', 'IY', 'JH', 'K',
    'L', 'M', 'N', 'NG', 'OW',
    'OY', 'P', 'R', 'S', 'SH',
    'T', 'TH', 'UH', 'UW', 'V',
    'W', 'Y', 'Z', 'ZH'
]
SIL_DEF = ['SIL']

In [None]:
# Define a function to load the data from the hdf5 file
def load_h5py_file(file_path):
    data = {
        'neural_features': [],
        'n_time_steps': [],
        'seq_class_ids': [],
        'seq_len': [],
        'transcriptions': [],
        'sentence_label': [],
        'session': [],
        'block_num': [],
        'trial_num': [],
    }
    # Open the hdf5 file for that day
    with h5py.File(file_path, 'r') as f:

        keys = list(f.keys())

        # For each trial in the selected trials in that day
        for key in keys:
            g = f[key]

            neural_features = g['input_features'][:]
            n_time_steps = g.attrs['n_time_steps']
            seq_class_ids = g['seq_class_ids'][:] if 'seq_class_ids' in g else None
            seq_len = g.attrs['seq_len'] if 'seq_len' in g.attrs else None
            transcription = g['transcription'][:] if 'transcription' in g else None
            sentence_label = g.attrs['sentence_label'][:] if 'sentence_label' in g.attrs else None
            session = g.attrs['session']
            block_num = g.attrs['block_num']
            trial_num = g.attrs['trial_num']

            data['neural_features'].append(neural_features)
            data['n_time_steps'].append(n_time_steps)
            data['seq_class_ids'].append(seq_class_ids)
            data['seq_len'].append(seq_len)
            data['transcriptions'].append(transcription)
            data['sentence_label'].append(sentence_label)
            data['session'].append(session)
            data['block_num'].append(block_num)
            data['trial_num'].append(trial_num)
    return data


In [None]:
DATA_PATH = 'data/t15_copyTask_neuralData/hdf5_data_final'

# Recursively search for all *train.hdf5 files under all subdirectories of DATA_PATH
def get_data_files(data_type='train'):
    """
    Return a list of files matching the given data type ('train', 'val', or 'test').
    """
    return glob.glob(os.path.join(DATA_PATH, '**', f'*{data_type}.hdf5'), recursive=True)

In [None]:
train_files = get_data_files('train')
train_files

In [None]:
# Load all data into a single DataFrame
def load_data(files):
    df = pd.DataFrame()

    for file in tqdm(files, desc="Loading data"):
        data = load_h5py_file(file)
        df = pd.concat([df, pd.DataFrame(data)], ignore_index=True)
    return df


In [None]:
train_df = load_data(train_files)

## Data observation

### Understand the Structure & EDA

In [None]:
train_df.head()

In [None]:
train_df.info()

In [None]:
# Count duplicate sentence_label data
duplicate_count = sum(train_df.duplicated(subset=['sentence_label']))
print(f"Duplicate sentence_label count: {duplicate_count}")

# Count duplicate sentence_label data ratio
duplicate_ratio = train_df.duplicated(subset=['sentence_label']).mean() * 100
print(f"Duplicate sentence_label ratio: {duplicate_ratio:.2f}%")


In [None]:
# Display duplicated sentence_label data
duplicated_sentences = train_df[train_df.duplicated(subset=['sentence_label'], keep=False)].sort_values(by=['sentence_label', 'session'])
duplicated_sentences

In [None]:
# Visualize the n_time_steps differences for duplicated sentence_label (interactive plot)

import plotly.graph_objs as go
import plotly.express as px

# Find duplicated sentence_labels
dup_sent_labels = train_df[train_df.duplicated(subset=['sentence_label'], keep=False)]['sentence_label'].unique()

# Keep only rows with these duplicated sentence_labels
dup_df = train_df[train_df['sentence_label'].isin(dup_sent_labels)].copy()

# Prepare long-format data
plot_df = dup_df[['sentence_label', 'n_time_steps']].copy()
plot_df['Occurrence'] = plot_df.groupby('sentence_label').cumcount() + 1

# Interactive line plot: one line per duplicated sentence
fig = go.Figure()
for label in dup_sent_labels:
    group = plot_df[plot_df['sentence_label'] == label]
    if len(group) > 1:
        fig.add_trace(go.Scatter(
            x=group['Occurrence'],
            y=group['n_time_steps'],
            mode='lines+markers',
            name=label,
            text=[label]*len(group),
            opacity=0.5,
            hovertemplate='sentence: %{text}<br>Occurrence: %{x}<br>n_time_steps: %{y}<extra></extra>'
        ))

fig.update_layout(
    title="n_time_steps differences for duplicated sentence_label (interactive)",
    xaxis_title="Occurrence (different trials of the same sentence)",
    yaxis_title="n_time_steps",
    legend_title="sentence_label",
    height=600,
    width=1200
)
fig.show()

In [None]:
# Interactive boxplot
box_data = plot_df.groupby('sentence_label').filter(lambda x: len(x) > 1)
fig2 = px.box(
    box_data,
    x='sentence_label',
    y='n_time_steps',
    title="Distribution of n_time_steps for duplicated sentence_label (interactive boxplot)",
    labels={'sentence_label': 'sentence_label', 'n_time_steps': 'n_time_steps'}
)
fig2.update_layout(
    xaxis_tickangle=45,
    height=600,
    width=1300
)
fig2.show()


In [None]:
train_df.describe()

In [None]:
# Check the shape of the neural features data
for i in range(5):
    print(f"Shape of data item {i}: ")
    print(f"\t neural_features {np.array(train_df['neural_features'][i]).shape}")
    print(f"\t seq_class_ids len {np.array(train_df['seq_len'][i])}")
    # 忽略 0 並計算長度
    transcriptions = np.array(train_df['transcriptions'][i])
    transcriptions_wo_zero = transcriptions[transcriptions != 0]
    print(f"\t transcriptions len (ignore 0): {len(transcriptions_wo_zero)}")

In [None]:
from random import randint
trl = randint(0, len(train_df))  # 可以改成你想看的 trial 編號

features = np.array(train_df['neural_features'][trl])  # shape: (time, channel)

# 顯示該 trial 的基本資訊
print(f"session:      {train_df['session'][trl]}")
print(f"block_num:    {train_df['block_num'][trl]}")
print(f"trial_num:    {train_df['trial_num'][trl]}")
print(f"sentence:     {train_df['sentence_label'][trl]}")

# 畫出所有 channel 的 heatmap
plt.figure(figsize=(12, 6))
plt.imshow(features.T, aspect='auto', cmap='viridis', interpolation='none')
plt.colorbar(label='Neural Activity')
plt.ylabel('Channel')
plt.xlabel('Time')
plt.title(f" Neural Features Heatmap\nSession: {train_df['session'][trl]}, Trial: {trl}, sentence: \"{train_df['sentence_label'][trl]}\"")
plt.show()


In [None]:
n_time_steps = np.array(train_df['n_time_steps'])
plt.hist(n_time_steps, bins=50)
plt.title("Distribution of time steps per trial")
plt.xlabel("Number of time steps")
plt.ylabel("Count")
plt.show()


In [None]:
seq_len = np.array(train_df['seq_len'])
plt.hist(seq_len, bins=50)
plt.title("Distribution of sequence lengths per trial")
plt.xlabel("Sequence length")
plt.ylabel("Count")
plt.show()

In [None]:
from pandas.plotting import scatter_matrix
scatter_matrix(train_df[['n_time_steps', 'seq_len']], alpha=0.5, figsize=(10, 8), diagonal='kde')
plt.show()


In [None]:
# Plot histogram of class ids
# Concatenate all seq_class_ids from all trials into a single list
all_class_ids = []
for seq in train_df['seq_class_ids']:
    all_class_ids.extend(seq)
all_class_ids = np.array(all_class_ids)

# Count the frequency of each class id
min_id = int(all_class_ids.min())
max_id = int(all_class_ids.max())
class_ids = np.arange(min_id, max_id + 1)
counts = np.array([(all_class_ids == cid).sum() for cid in class_ids])

# Take log on y-axis
log_counts = np.log10(counts + 1)  # Add 1 to avoid log(0)

fig = go.Figure()
fig.add_trace(go.Bar(
    x=class_ids,
    y=log_counts,
    text=counts,  # Show actual frequency
    textposition='outside',
    marker_color='skyblue'
))

fig.update_layout(
    title='Class ID Frequency Distribution (log scale, all trials)',
    xaxis_title='Class ID',
    yaxis_title='log10(Frequency + 1)',
    xaxis=dict(
        tickmode='array',
        tickvals=class_ids,
        ticktext=[str(cid) for cid in class_ids]
    ),
    bargap=0.1,
    showlegend=False,
    height=500,
    width=1000
)

fig.show()

In [None]:
# Directly use all_class_ids to map to phonemes, ensuring the distribution matches class id distribution
def ids_to_phonemes(seq_ids, id2symbol):
    return [id2symbol[i] if 0 <= i < len(id2symbol) else f"UNK_{i}" for i in seq_ids]

# Get all phoneme labels corresponding to class ids
all_phoneme_labels = np.array(ids_to_phonemes(all_class_ids, LOGIT_PHONE_DEF))

# Count the frequency of each class id (corresponds to class_ids, counts above)
min_id = int(all_class_ids.min())
max_id = int(all_class_ids.max())
class_ids = np.arange(min_id, max_id + 1)
phoneme_list = [LOGIT_PHONE_DEF[cid] if 0 <= cid < len(LOGIT_PHONE_DEF) else f"UNK_{cid}" for cid in class_ids]
counts = np.array([(all_class_ids == cid).sum() for cid in class_ids])

# Take log on y-axis
log_counts = np.log10(counts + 1)

fig = go.Figure()
fig.add_trace(go.Bar(
    x=phoneme_list,
    y=log_counts,
    text=counts,  # Show actual frequency
    textposition='outside',
    marker_color='skyblue'
))

fig.update_layout(
    title='Phoneme Frequency Distribution (log scale, all trials)',
    xaxis_title='Phoneme',
    yaxis_title='log10(Frequency + 1)',
    xaxis=dict(
        tickmode='array',
        tickvals=phoneme_list,
        ticktext=phoneme_list
    ),
    bargap=0.1,
    showlegend=False,
    height=500,
    width=1000
)

fig.show()



In [None]:
# Concatenate all time points from all trials into a single (total_time_points, channel) matrix
all_features_matrix = np.concatenate(train_df['neural_features'], axis=0)  # shape: (total_time, channel)
all_features_matrix.shape

In [None]:
sample_n = 50000
idx = np.random.choice(all_features_matrix.shape[0], sample_n, replace=False)
all_features_matrix_sample = all_features_matrix[idx, :]

all_features_matrix_sample.shape

In [None]:
# Use min-max range color bar + mean line for a cleaner distribution visualization
means = np.mean(all_features_matrix_sample, axis=0)      # (channel,)
mins = np.min(all_features_matrix_sample, axis=0)        # (channel,)
maxs = np.max(all_features_matrix_sample, axis=0)        # (channel,)

plt.figure(figsize=(8, 4))
channels = np.arange(all_features_matrix_sample.shape[1])

# Plot min-max range as a color bar
plt.fill_between(channels, mins, maxs, color='skyblue', alpha=0.5, label='min-max range')

# Plot mean line
plt.plot(channels, means, color='navy', linewidth=2, label='mean')

plt.title("Signal distribution per neural feature (min-max + mean)")
plt.xlabel("Feature index (0–511)")
plt.ylabel("Neural activity")
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
corr = np.corrcoef(all_features_matrix_sample.T)

plt.figure(figsize=(8, 8))
plt.imshow(corr, cmap='coolwarm')
plt.colorbar()
plt.title('Feature correlation map')

plt.show()


In [None]:
# Perform PCA on (channel, time) to analyze the distribution across channels
from sklearn.decomposition import PCA

pca = PCA(n_components=2, svd_solver='randomized')
pca_result = pca.fit_transform(all_features_matrix_sample)  # shape: (channel, 2)

# Visualization
plt.figure(figsize=(8,6))
plt.scatter(pca_result[:,0], pca_result[:,1], s=2, alpha=0.5)
plt.title('PCA of All Channel-Time Points)')
plt.xlabel('PCA 1')
plt.ylabel('PCA 2')
plt.show()

In [None]:
# 查看每個 channel 在主成分上的貢獻

loadings = pca.components_

print(loadings.shape)

top_pca1 = np.argsort(np.abs(loadings[0, :]))[-10:]
top_pca2 = np.argsort(np.abs(loadings[1, :]))[-10:]

print("PCA 1 最重要的前 10 個 channel：")
for rank, idx in enumerate(top_pca1, 1):
    print(f"  {rank}. channel {idx}，貢獻 {loadings[0, idx]:.4f}")

print()

print("PCA 2 最重要的前 10 個 channel：")
for rank, idx in enumerate(top_pca2, 1):
    print(f"  {rank}. channel {idx}，貢獻 {loadings[1, idx]:.4f}")



In [None]:
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

best_n_clusters = 0
best_score = -1
best_labels = None
n = 2


kmeans = KMeans(n_clusters=n, random_state=42)
labels = kmeans.fit_predict(all_features_matrix_sample)
score = silhouette_score(all_features_matrix_sample, labels)
print(f"n_clusters={n}，Silhouette Score={score:.4f}")


In [None]:
# Visualization
plt.figure(figsize=(8,6))
plt.scatter(pca_result[:,0], pca_result[:,1], c=labels, cmap='viridis', s=2, alpha=0.5)
plt.title('KMeans Clustering of All Channel-Time Points)')
plt.xlabel('PCA 1')
plt.ylabel('PCA 2')
plt.show()