In [5]:
import pandas as pd
from librosa.util import index_to_slice
from scipy.signal import spectrogram

# meta data file (with syllable information)
from PretermDataLoader import DataLoader as dl
import audioPreprocess as ap
import random
import dataPreprocess as dp

In [27]:
metafile = 'guide_test_syllableInfor.pkl'
datanum = 1000 # assume that you want 10000 tokens

In [8]:
loader = dl()
meta = loader.get_metadata(metafile)

In [28]:
mels,indexes = loader.load_data('mel',datanum)
spectrograms = loader.load_data('spectrogram',datanum,indexes)[0]
soundpaths = [meta['filepath'][i] for i in indexes]

In [None]:
# mels: [datanum, (mel_dim, length)]
# 長度固定，因此可以直接轉成numpy array

(128, 126)

In [20]:
# or you can get a bunch of indexlists that keeps the original distribution first
meta['index'] = meta.index
indexset = meta[['stress_type','index','suid']]
# Split into subsets
subsets_index = dp.split_into_subsets(indexset, num_subsets=100, sortkey='stress_type')
# #Display the resulting subsets
# for i, subset in enumerate(subsets):
#     dp.checkDistribution(subset['stress_type'])

  return bound(*args, **kwds)


In [22]:
subsets_index[0]

Unnamed: 0,stress_type,index,suid
98594,1,98594,446-123502-0022-0002
22107,0,22107,1502-122619-0078-0047
27266,1,27266,2092-145706-0037-0037
69203,0,69203,3240-131232-0006-0042
98418,0,98418,446-123502-0017-0045
...,...,...,...
109917,1,109917,5163-39921-0022-0007
125275,1,125275,78-368-0045-0016
120438,1,120438,7402-59171-0047-0042
80574,0,80574,3807-4955-0009-0038


# Now it seems certain that the loading is not on-the-full. 
Therefore, we can happily design loading functions based on this mechanism. We don't have to design a "load-for-all" mechanism, because, anyway, the time consumption and memory usage is not "the full". 

## Next: 
Design how we pass in data for different dataloaders and how we plan learning curve. 

This should be easy to do just following our old practice. 

In [3]:
import random
from collections import deque, Counter

class LearningPathPlanner:
    """
    A planner for generating a learning path from a pool of dataset IDs, allowing for control over 
    probability of selecting new or old datasets, and imposing revisit limits on old datasets.
    """

    def __init__(self, dataset_ids, total_epochs, p1=0.5, revisit_limit=5):
        """
        Initializes the LearningPathPlanner.

        Args:
            dataset_ids (list): List of dataset IDs representing the pool of available datasets.
            total_epochs (int): The total number of epochs for which to generate a learning path.
            p1 (float): Probability of selecting a new dataset in each epoch (between 0 and 1).
            revisit_limit (int): Maximum number of times each dataset can be revisited before being removed.
        """
        self.dataset_ids = dataset_ids  # Pool of dataset IDs
        self.total_epochs = total_epochs
        self.p1 = p1
        self.revisit_limit = revisit_limit
        self.new_datasets = set(dataset_ids)  # Datasets not yet seen
        self.old_datasets = deque()  # Queue to track datasets that have been used
        self.revisit_count = Counter()  # Counter to track the number of revisits for each dataset

    def get_next_dataset(self):
        """
        Determines the next dataset ID to use based on the learning path logic.

        Returns:
            int: The ID of the next dataset to use for training.
        """
        # Decide whether to select a new or an old dataset based on probability p1
        if self.new_datasets and random.random() < self.p1:
            # Choose a new dataset
            next_dataset = self.new_datasets.pop()
            self.old_datasets.append(next_dataset)  # Move to old datasets
            self.revisit_count[next_dataset] = 0  # Initialize revisit count

        else:
            # Choose an old dataset, if any are available
            if self.old_datasets:
                next_dataset = random.choice(self.old_datasets)
                self.revisit_count[next_dataset] += 1

                # Check if the revisit limit is reached
                if self.revisit_count[next_dataset] >= self.revisit_limit:
                    # Remove dataset if revisit limit is reached
                    self.old_datasets.remove(next_dataset)
                    del self.revisit_count[next_dataset]
            else:
                # Fallback to a new dataset if no old datasets are available
                next_dataset = self.new_datasets.pop()
                self.old_datasets.append(next_dataset)
                self.revisit_count[next_dataset] = 0

        return next_dataset

    def generate_learning_path(self):
        """
        Generates a full learning path for the specified total number of epochs.

        Returns:
            list: A list of dataset IDs representing the planned learning path.
        """
        learning_path = []
        for _ in range(self.total_epochs):
            next_dataset = self.get_next_dataset()
            learning_path.append(next_dataset)
        return learning_path


In [10]:
# Define a pool of dataset IDs
dataset_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9]

# Initialize the planner with total epochs, probability p1, and revisit limit
planner = LearningPathPlanner(dataset_ids, total_epochs=20, p1=0.5, revisit_limit=3)

# Generate a learning path
learning_path = planner.generate_learning_path()
print("Planned Learning Path:", learning_path)


Planned Learning Path: [1, 1, 2, 3, 1, 4, 1, 5, 6, 7, 8, 4, 9, 6, 3, 2, 5, 9, 8, 3]


In [8]:
import random
import math
from collections import deque, Counter

class LearningPathPlanner:
    """
    A planner for generating a learning path from a pool of dataset IDs, where revisit probability exponentially decreases 
    with each visit.
    """

    def __init__(self, dataset_ids, total_epochs, p1=0.5, decay_rate=0.5):
        """
        Initializes the LearningPathPlanner.

        Args:
            dataset_ids (list): List of dataset IDs representing the pool of available datasets.
            total_epochs (int): The total number of epochs for which to generate a learning path.
            p1 (float): Probability of selecting a new dataset in each epoch (between 0 and 1).
            decay_rate (float): The rate at which revisit probability decreases exponentially with each visit.
        """
        self.dataset_ids = dataset_ids  # Pool of dataset IDs
        self.total_epochs = total_epochs
        self.p1 = p1
        self.decay_rate = decay_rate
        self.new_datasets = set(dataset_ids)  # Datasets not yet seen
        self.old_datasets = deque()  # Queue to track datasets that have been used
        self.visit_count = Counter()  # Counter to track the number of visits for each dataset

    def get_exponential_probability(self, visits):
        """
        Calculates the probability weight for a dataset based on its visit count using exponential decay.

        Args:
            visits (int): The number of times the dataset has been visited.

        Returns:
            float: The probability weight for the dataset.
        """
        return math.exp(-self.decay_rate * visits)

    def get_next_dataset(self):
        """
        Determines the next dataset ID to use based on the learning path logic, with exponential decay in revisit probability.

        Returns:
            int: The ID of the next dataset to use for training.
        """
        # Decide whether to select a new or an old dataset based on probability p1
        if self.new_datasets and random.random() < self.p1:
            # Choose a new dataset
            next_dataset = self.new_datasets.pop()
            self.old_datasets.append(next_dataset)  # Move to old datasets
            self.visit_count[next_dataset] = 0  # Initialize visit count

        else:
            # Choose an old dataset with probability exponentially decreasing by visit count
            if self.old_datasets:
                # Calculate weights for old datasets based on exponential decay
                weights = [self.get_exponential_probability(self.visit_count[ds]) for ds in self.old_datasets]
                total_weight = sum(weights)
                probabilities = [w / total_weight for w in weights]

                # Randomly choose an old dataset based on calculated probabilities
                next_dataset = random.choices(list(self.old_datasets), weights=probabilities, k=1)[0]
                self.visit_count[next_dataset] += 1  # Increment visit count

            else:
                # Fallback to a new dataset if no old datasets are available
                next_dataset = self.new_datasets.pop()
                self.old_datasets.append(next_dataset)
                self.visit_count[next_dataset] = 0

        return next_dataset

    def generate_learning_path(self):
        """
        Generates a full learning path for the specified total number of epochs.

        Returns:
            list: A list of dataset IDs representing the planned learning path.
        """
        learning_path = []
        for _ in range(self.total_epochs):
            next_dataset = self.get_next_dataset()
            learning_path.append(next_dataset)
        return learning_path


In [23]:
# Define a pool of dataset IDs
dataset_ids = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"]

# Initialize the planner with total epochs, probability p1, and decay rate
planner = LearningPathPlanner(dataset_ids, total_epochs=200, p1=0.5, decay_rate=0.3)

# Generate a learning path
learning_path = planner.generate_learning_path()
print("Planned Learning Path:", learning_path)


Planned Learning Path: ['D', 'D', 'D', 'F', 'E', 'C', 'B', 'I', 'J', 'J', 'E', 'K', 'G', 'H', 'H', 'B', 'I', 'K', 'A', 'K', 'A', 'G', 'A', 'C', 'J', 'H', 'I', 'H', 'I', 'E', 'G', 'C', 'G', 'D', 'F', 'C', 'K', 'H', 'B', 'C', 'I', 'K', 'B', 'G', 'D', 'A', 'D', 'C', 'H', 'J', 'E', 'A', 'K', 'E', 'G', 'J', 'K', 'C', 'E', 'F', 'F', 'D', 'E', 'D', 'I', 'B', 'B', 'F', 'D', 'G', 'J', 'A', 'E', 'K', 'F', 'B', 'E', 'F', 'A', 'J', 'B', 'J', 'A', 'J', 'F', 'I', 'D', 'H', 'H', 'G', 'C', 'F', 'B', 'H', 'B', 'B', 'I', 'D', 'K', 'C', 'D', 'E', 'J', 'H', 'A', 'F', 'E', 'I', 'A', 'K', 'K', 'G', 'I', 'F', 'B', 'F', 'F', 'A', 'J', 'H', 'C', 'J', 'H', 'C', 'B', 'H', 'I', 'I', 'B', 'J', 'G', 'G', 'D', 'I', 'H', 'H', 'K', 'B', 'C', 'A', 'H', 'G', 'D', 'D', 'H', 'K', 'E', 'E', 'G', 'F', 'A', 'C', 'F', 'E', 'G', 'I', 'G', 'A', 'F', 'E', 'C', 'K', 'A', 'A', 'J', 'B', 'B', 'J', 'E', 'K', 'K', 'D', 'A', 'A', 'J', 'E', 'H', 'C', 'B', 'A', 'I', 'E', 'I', 'K', 'F', 'E', 'H', 'E', 'C', 'B', 'K', 'C', 'A', 'G', 'J', '

In [5]:
for i in range(0, 1):
    print(i)

0


In [1]:
import random
import math
from collections import Counter

class LearningPathPlanner:
    """
    A planner for generating a learning path from a pool of dataset IDs, where the probability of selecting each dataset 
    decreases exponentially with the number of times it has been viewed.
    """

    def __init__(self, dataset_ids, total_epochs, decay_rate=0.5):
        """
        Initializes the LearningPathPlanner.

        Args:
            dataset_ids (list): List of dataset IDs representing the pool of available datasets.
            total_epochs (int): The total number of epochs for which to generate a learning path.
            decay_rate (float): The rate at which revisit probability decreases exponentially with each view count.
        """
        self.dataset_ids = dataset_ids  # Pool of dataset IDs
        self.total_epochs = total_epochs
        self.decay_rate = decay_rate
        self.view_count = Counter({ds_id: 0 for ds_id in dataset_ids})  # Initialize all datasets with a view count of 0

    def get_exponential_probability(self, views):
        """
        Calculates the probability weight for a dataset based on its view count using exponential decay.

        Args:
            views (int): The number of times the dataset has been viewed.

        Returns:
            float: The probability weight for the dataset.
        """
        return math.exp(-self.decay_rate * views)

    def get_next_dataset(self):
        """
        Determines the next dataset ID to use based on exponential decay in probability.

        Returns:
            int: The ID of the next dataset to use for training.
        """
        # Calculate weights for each dataset based on exponential decay
        weights = [self.get_exponential_probability(self.view_count[ds]) for ds in self.dataset_ids]
        total_weight = sum(weights)
        probabilities = [w / total_weight for w in weights]

        # Randomly choose a dataset based on calculated probabilities
        next_dataset = random.choices(self.dataset_ids, weights=probabilities, k=1)[0]
        self.view_count[next_dataset] += 1  # Increment view count for selected dataset

        return next_dataset

    def generate_learning_path(self):
        """
        Generates a full learning path for the specified total number of epochs.

        Returns:
            list: A list of dataset IDs representing the planned learning path.
        """
        learning_path = []
        for _ in range(self.total_epochs):
            next_dataset = self.get_next_dataset()
            learning_path.append(next_dataset)
        return learning_path


In [None]:
# Define a pool of dataset IDs
dataset_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9]

# Initialize the planner with total epochs and decay rate
planner = LearningPathPlanner(dataset_ids, total_epochs=20, decay_rate=0.5)

# Generate a learning path
learning_path = planner.generate_learning_path()
print("Planned Learning Path:", learning_path)


Planned Learning Path: [6, 2, 1, 1, 2, 4, 3, 5, 4, 8, 8, 3, 4, 7, 6, 2, 8, 9, 5, 3]


# Model structure

In [12]:
import torch
import torch.nn as nn

class SmallNetwork(nn.Module):
    def __init__(self):
        super(SmallNetwork, self).__init__()
        
        self.conv = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1), 
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Reduces dimensions by half
            
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Reduces dimensions by another half
        )
        
        # Adaptive pooling to ensure a consistent, small output size regardless of input
        self.ap = nn.AdaptiveAvgPool2d(output_size=(4, 4))  # Downsample to 4x4 spatial size

        # Fully connected layers with a smaller input size
        self.lin_1 = nn.Sequential(
            nn.Linear(32 * 4 * 4, 64),  # Reduced input size
            nn.Dropout(0.5),
            nn.BatchNorm1d(64),
            nn.ReLU(),
        )
        self.lin = nn.Linear(in_features=64, out_features=38)

        # Initialize weights
        self.apply(self.init_weights)

    def init_weights(self, m):
        if isinstance(m, nn.Conv2d):
            torch.nn.init.kaiming_normal_(m.weight, a=0.1)
            if m.bias is not None:
                m.bias.data.zero_()
        elif isinstance(m, nn.Linear):
            torch.nn.init.kaiming_normal_(m.weight, a=0.1)
            m.bias.data.fill_(0.01)

    def forward(self, x):
        x = self.conv(x)
        x = self.ap(x)  # Adaptive pooling to 4x4
        x = x.view(x.size(0), -1)  # Flatten
        x = self.lin_1(x)
        x = self.lin(x)
        return x

    def predict_on_output(self, output): 
        output = nn.Softmax(dim=1)(output)
        preds = torch.argmax(output, dim=1)
        return preds


In [13]:
from torchinfo import summary
model = SmallNetwork()

In [None]:
summary(model, input_size=(129, 1, 128, 126))

Layer (type:depth-idx)                   Output Shape              Param #
SmallNetwork                             [129, 38]                 --
├─Sequential: 1-1                        [129, 32, 32, 31]         --
│    └─Conv2d: 2-1                       [129, 16, 128, 126]       160
│    └─BatchNorm2d: 2-2                  [129, 16, 128, 126]       32
│    └─ReLU: 2-3                         [129, 16, 128, 126]       --
│    └─MaxPool2d: 2-4                    [129, 16, 64, 63]         --
│    └─Conv2d: 2-5                       [129, 32, 64, 63]         4,640
│    └─BatchNorm2d: 2-6                  [129, 32, 64, 63]         64
│    └─ReLU: 2-7                         [129, 32, 64, 63]         --
│    └─MaxPool2d: 2-8                    [129, 32, 32, 31]         --
├─AdaptiveAvgPool2d: 1-2                 [129, 32, 4, 4]           --
├─Sequential: 1-3                        [129, 64]                 --
│    └─Linear: 2-9                       [129, 64]                 32,832
│    └─

: 