In [1]:
from collections.abc import Iterator

import numpy as np
import pandas as pd

In [2]:
# Subtitle timestamps; the first two CSV columns become the
# (Speaker, subtitle number) hierarchical index.
timestamps_csv = "../ClipSpeechSegmenter/data/ichinose_tamaki_taidan_timestamps.csv"
df_ts = pd.read_csv(timestamps_csv, header=0, index_col=[0, 1])
df_ts.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Start,End,Start_seconds,Start_samples,End_seconds,End_samples
Speaker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
Ichinose,3,0:00:08.85,0:00:10.00,8.85,390285,10.0,441000
Ichinose,9,0:00:22.45,0:00:29.11,22.45,990045,29.11,1283751
Ichinose,10,0:00:29.11,0:00:33.82,29.11,1283751,33.82,1491462
Ichinose,11,0:00:33.82,0:00:36.39,33.82,1491462,36.39,1604799
Ichinose,12,0:00:36.39,0:00:38.91,36.39,1604799,38.91,1715930


## Problem statement

The data contains timestamps of subtitles. It has a hierarchical index containing the `Speaker` and each line's index in the original subtitle file. For example, the index `(Ichinose, 3)` corresponds to the 4th entry in the original subtitle file. 

There are a number of lines that were split across several consecutive subtitle entries. For example, lines 10-13 can be concatenated to form one continuous line. We want to merge each such run into a single entry and combine the result with the rest of the data. 

The processed dataframe will be used to create segments from a corresponding audio file, which will be used to train a neural network for voice classification, but this is beyond the scope of the current notebook.

### Final implementation

In [4]:
class TruesGrouper:
    """Merge runs of consecutively numbered rows of one speaker into single rows.

    The input frame is expected to carry a two-level index: the speaker on
    level 0 and an integer line number on level 1. Rows of a speaker whose
    line numbers increase by exactly one form a "run"; each run is collapsed
    into one row, keyed by the line number of its first member.
    """

    def __init__(self) -> None:
        # Shorthand used to build MultiIndex slicers in ``select_speaker``.
        self.idx = pd.IndexSlice

    def select_speaker(self, df: pd.DataFrame, speaker: str) -> pd.DataFrame:
        """Return the rows of ``df`` whose level-0 index value is ``speaker``."""
        return df.loc[self.idx[speaker, :], :]

    @staticmethod
    def get_true_masks(inds: pd.Index) -> tuple[np.ndarray, np.ndarray]:
        """Build the two boolean masks that describe runs of consecutive indices.

        Parameters
        ----------
        inds : pd.Index
            Integer line numbers, in row order.

        Returns
        -------
        mask : np.ndarray
            ``True`` where an element is exactly one greater than its
            predecessor, i.e. where it *continues* a run. The first element
            has no predecessor and is therefore always ``False``.
        mask_1L : np.ndarray
            ``mask`` with the first ("one left") member of every run also
            set to ``True``, i.e. ``True`` for every member of a run.
        """
        values = np.asarray(inds)
        follows_prev = np.diff(values) == 1

        # The first element can never continue a run. Seeding it with True
        # (as when it merely *starts* a run) would break the run numbering
        # in ``get_true_groups`` and silently drop a leading run.
        mask = np.zeros(values.shape[0], dtype=bool)
        mask[1:] = follows_prev

        # An element also belongs to a run when its successor continues one.
        mask_1L = mask.copy()
        mask_1L[:-1] |= follows_prev

        return mask, mask_1L

    @staticmethod
    def get_true_groups(mask: np.ndarray, mask_1L: np.ndarray) -> Iterator[pd.Index]:
        """Yield, for every run, the row positions of its members.

        ``~mask`` is ``True`` at the start of every run (and at every isolated
        row), so its cumulative sum numbers the runs. Rows that do not belong
        to a run of consecutive indices (``~mask_1L``) are blanked to NaN.
        """
        trues = (
            pd.Series(~mask)
            .cumsum()
            .mask(mask)
            .ffill()
            .mask(~mask_1L)
        )

        for u in trues.unique():
            if pd.isna(u):
                # NaN marks rows that are not part of any run.
                continue
            yield trues.loc[trues == u].index

    def group_trues(
        self,
        df_: pd.DataFrame,
        inds: pd.Index,
        mask: np.ndarray,
        mask_1L: np.ndarray,
        speaker: str
    ) -> pd.DataFrame:
        """Collapse every run of consecutive rows of ``df_`` into a single row.

        Columns whose name contains ``'Start'`` take their value from the first
        row of the run; every other column takes it from the last row. The
        resulting row is keyed by the line number of the run's first member.

        ``speaker`` is kept for interface compatibility; values are looked up
        by row position, so it is not needed to address the rows.
        """
        df_dict: dict[str, dict] = {col: {} for col in df_.columns}
        for grp in self.get_true_groups(mask, mask_1L):
            if grp.shape[0] < 2:
                continue
            first_pos, last_pos = grp[0], grp[-1]
            key = inds[first_pos]
            for col_pos, col in enumerate(df_.columns):
                row_pos = first_pos if 'Start' in col else last_pos
                # Scalar positional access keeps each column's own dtype.
                df_dict[col][key] = df_.iat[row_pos, col_pos]

        return pd.DataFrame.from_dict(df_dict, orient='columns')

    def concat_trues_falses(self, df_: pd.DataFrame, trues: pd.DataFrame, mask_1L: np.ndarray) -> pd.DataFrame:
        """Combine the rows outside any run with the collapsed runs, sorted by line number."""
        singles = df_.loc[~mask_1L].droplevel(0)
        # Leave empty frames out of the concatenation so they cannot
        # influence the dtypes of the result.
        frames = [frame for frame in (singles, trues) if not frame.empty]
        if not frames:
            return singles
        return pd.concat(frames, axis=0).sort_index()

    def group_consecutive_trues(self, df: pd.DataFrame, speaker: str) -> pd.DataFrame:
        """Return ``speaker``'s rows of ``df`` with consecutive lines merged.

        The result is indexed by line number only (the speaker level is dropped).
        """
        df_ = self.select_speaker(df, speaker)
        inds = df_.index.get_level_values(level=1)
        mask, mask_1L = self.get_true_masks(inds)
        grouped = self.group_trues(df_, inds, mask, mask_1L, speaker)
        return self.concat_trues_falses(df_, grouped, mask_1L)

In [5]:
# Collapse each run of consecutive subtitle rows spoken by 'Ichinose' into one row.
grouper = TruesGrouper()
grouper.group_consecutive_trues(df_ts, speaker='Ichinose')

Unnamed: 0,Start,End,Start_seconds,Start_samples,End_seconds,End_samples
3,0:00:08.85,0:00:10.00,8.85,390285,10.00,441000
9,0:00:22.45,0:00:38.91,22.45,990045,38.91,1715930
14,0:00:41.20,0:00:45.60,41.20,1816920,45.60,2010960
20,0:00:59.08,0:01:03.37,59.08,2605428,63.37,2794617
24,0:01:12.22,0:01:19.48,72.22,3184902,79.48,3505068
...,...,...,...,...,...,...
1080,0:56:51.20,0:56:55.65,3411.20,150433920,3415.65,150630165
1083,0:56:59.60,0:57:06.50,3419.60,150804360,3426.50,151108650
1087,0:57:12.65,0:57:15.50,3432.65,151379865,3435.50,151505550
1089,0:57:24.40,0:57:27.30,3444.40,151898040,3447.30,152025930


### Mechanism

`mask` is a boolean array that is `True` when indices differ only by one. However, this means that the first member of a consecutive set will be lost. Thus, another array, `mask_1L`, is created in which these first members are `True`. The need for two arrays will be clear by looking at the core method, `TruesGrouper.get_true_groups`:

```python
@staticmethod
def get_true_groups(mask: np.ndarray, mask_1L: np.ndarray) -> list[pd.Index]:
    # Note: this is a generator; it yields one index of row positions per group.
    #
    # Chain, step by step:
    #   ~mask          -> True where an element does NOT continue a run
    #   cumsum()       -> running count of those elements, i.e. a run number
    #   mask(mask)     -> blank (NaN) the elements that continue a run
    #   ffill()        -> refill them with the number of the run they continue
    #   mask(~mask_1L) -> blank every element that is not part of a consecutive set
    trues = pd.Series(~mask).\
        cumsum().\
        mask(mask).\
        ffill().\
        mask(~mask_1L)
    
    # Each remaining distinct value identifies one consecutive set.
    for u in trues.unique():
        if np.isnan(u): continue  # NaN = not part of any consecutive set
        yield trues.loc[trues == u].index 
```

We will first construct the two mask arrays before applying the function. 

#### Preparing masks

In [10]:
idx = pd.IndexSlice

# Keep only the sample-offset columns of the 'Ichinose' rows.
sample_cols = ['Start_samples', 'End_samples']
uruha = df_ts.loc[idx["Ichinose", :], sample_cols]
uruha.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Start_samples,End_samples
Speaker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Ichinose,3,390285,441000
Ichinose,9,990045,1283751
Ichinose,10,1283751,1491462
Ichinose,11,1491462,1604799
Ichinose,12,1604799,1715930


In [13]:
inds = uruha.index.get_level_values(level=1)

# True where a row's subtitle number is exactly one more than the previous row's.
follows_prev = np.diff(inds.to_numpy()) == 1

# The first row has no predecessor, so it can never *continue* a run: it is
# always False. (Seeding it from the first difference would mark a leading run
# as "continued", which breaks the cumulative-sum numbering used later.)
mask = np.zeros(len(inds), dtype=bool)
mask[1:] = follows_prev

# "One left": additionally flag the first member of every consecutive run,
# i.e. every row whose successor continues a run.
mask_1L = mask.copy()
mask_1L[:-1] |= follows_prev

`mask` is `True` when consecutive indices differ by exactly one. There are two caveats, which we address above:

1. The pairwise differences compare each index with the one before it, so they have only `N-1` elements, where `N` is the number of rows in the original dataframe, `uruha`. We therefore prepend one element for the first row, so that `mask` has `N` elements and lines up with the rows of `uruha`. 
2. As mentioned above, it is `False` for the first member of a consecutive set of indices. We don't adjust `mask` itself, but a copy of it, `mask_1L`. The `_1L` is for "one left", which reflects that the first/left-most element in a consecutive set of indices will be adjusted from `False` to `True`.

The dataframe below shows that indices 9-12 in the original dataframe are consecutive, but, in `mask`, the element for '9' is `False`, whereas the same element is `True` in `mask_1L`.

In [16]:
# First five rows: subtitle number next to both masks.
pd.DataFrame(
    {
        "original": inds[:5],
        "mask": mask[:5],
        "mask_1L": mask_1L[:5],
    }
)

Unnamed: 0,original,mask,mask_1L
0,3,False,False
1,9,False,True
2,10,True,True
3,11,True,True
4,12,True,True


#### Assigning unique numeric values to index each consecutive set of elements

The columns of the dataframe below show the sequence of operations used to assign unique indices to each element of each consecutive set of indices.
That was a mouthful, but, essentially:

1. Take `~mask` (the negation of `mask`), which is `True` for every non-consecutive index and for the first member of each consecutive set, i.e. wherever a new run starts. 
2. To obtain unique numeric values for each set of consecutive indices, we use `cumsum`, ie a cumulative sum. Importantly, there is no increment between the first and last members of a consecutive set. 
3. We use `~mask_1L` to set all non-consecutive indices, but not the first members of consecutive sequences, to `NaN`. This completes the operation.

In [25]:
C = pd.Series(~mask)
run_ids = C.cumsum()  # one number per run; constant within a consecutive set

# One column per intermediate step, aligned row by row.
sidebyside = pd.DataFrame(
    {
        'inds': pd.Series(inds),
        '~mask': C,
        '~mask.cumsum': run_ids,
        '~mask.~mask_1L': run_ids.mask(~mask_1L),
    }
)

sidebyside

Unnamed: 0,inds,~mask,~mask.cumsum,~mask.~mask_1L
0,3,True,1,
1,9,True,2,2.0
2,10,False,2,2.0
3,11,False,2,2.0
4,12,False,2,2.0
...,...,...,...,...
550,1083,True,292,292.0
551,1084,False,292,292.0
552,1087,True,293,
553,1089,True,294,


To extract groups of consecutive indices, we only need to do conditional indexing. The values are as follows:

In [31]:
# Distinct run numbers that survived the masking, as integers.
group_ids = sidebyside["~mask.~mask_1L"].dropna()
U = group_ids.unique().astype(int)
U

array([  2,   5,   6,   8,   9,  12,  18,  21,  28,  29,  30,  38,  39,
        40,  43,  46,  47,  48,  49,  52,  56,  58,  62,  64,  66,  70,
        73,  74,  79,  81,  82,  86,  88,  89,  90,  92,  99, 101, 103,
       104, 105, 109, 110, 111, 113, 114, 118, 124, 125, 129, 130, 135,
       137, 138, 142, 147, 148, 149, 151, 153, 155, 159, 164, 166, 173,
       178, 181, 182, 183, 185, 188, 189, 190, 192, 193, 194, 200, 201,
       204, 205, 208, 213, 214, 216, 217, 218, 221, 222, 225, 227, 230,
       235, 236, 237, 241, 243, 244, 249, 251, 253, 254, 255, 259, 260,
       261, 265, 266, 268, 270, 271, 272, 273, 274, 275, 280, 283, 287,
       288, 290, 291, 292])

For example, the row positions in the original dataframe corresponding to the consecutive set with value `8` are:

In [38]:
# Row positions whose run number equals the fourth entry of U.
in_group = sidebyside['~mask.~mask_1L'] == U[3]
pos = sidebyside.loc[in_group].index
pos

Int64Index([12, 13, 14], dtype='int64')

Using these to index the original dataframe, we see that this corresponds to subtitles 48-50:

In [39]:
# Positional row lookup; all columns are kept.
uruha.iloc[pos]

Unnamed: 0_level_0,Unnamed: 1_level_0,Start_samples,End_samples
Speaker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Ichinose,48,6526800,6757884
Ichinose,49,6757884,6973533
Ichinose,50,6973533,7108919
