<a href="https://colab.research.google.com/github/LC1332/Speaker-Grouping/blob/main/notebook/%E6%9E%84%E9%80%A0%E5%A2%9E%E9%87%8F%E8%81%9A%E7%B1%BB%E7%9A%84%E5%90%8E%E5%8F%B0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

- [x] 构造parquet数据，一个增量，一个有待标注（抹掉speaker信息）
- [x] 编写load函数
- [ ] 编写get_current_table
- [ ] （大任务） 编写compute_speaker
- [ ] 编写label_row
- [ ] 增加一个index 再增加一个单纯用speaker做的

这里我们有一个VideoData的类，他里面实际上核心的数据是一个表格

# 需求

### __init__( inference_table, folder [, previous_tables ] )
见load

### load( inference_table, folder [, previous_tables ] )
inference_table 是当前有待标注的parquet， 包含所有的音频、视频、以及对应的特征信息
folder, 音频和视频的存储目录
previous_tables 当标注连续剧时，可以载入一个list of filename， 每一个是对应的之前标注的parquet

### get_current_table( )
获得当前 需要显示在标注工具左侧的table
- 如果从来没有自动计算过speaker，这个时候会调用compute_speaker() 来进行计算

### compute_spearker( )
对整体的表格重新计算 自动推荐的speaker标注
-  如果有previous_tables， 会优先考虑标注为 之前的speaker，当然对于陌生的新人，也有概率自动聚类为speakerX
- 如果没有previous_tables，都会自动聚类为speakerX

### label_row( index, speaker ) 方法
将index对应的行标记为speaker
并且更新index相关的M近邻的自动标注信息
返回整个更新后的表格

### label_rows( indexes , speakers )
将indexes对应的行进行标记
并且更新indexes的M近邻的并集 减去 indexes 之后行的自动标注信息
返回更新后的整个表格

### get_row_default( index )
给出当前行的speaker的自动标注进行返回

### get_image_fname( index )
给出index对应图片的绝对目录

### get_audio_fname( index )
给出index对应音频的绝对目录

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# 实现增广并查集

```python
class UnionFind:
    def __init__(self, size):
        self.root = list(range(size))
        self.rank = [1] * size

    def find(self, x):
        if self.root[x] != x:
            self.root[x] = self.find(self.root[x])  # Path compression
        return self.root[x]

    def union(self, x, y):
        rootX = self.find(x)
        rootY = self.find(y)

        if rootX != rootY:
            if self.rank[rootX] > self.rank[rootY]:
                self.root[rootY] = rootX
            elif self.rank[rootX] < self.rank[rootY]:
                self.root[rootX] = rootY
            else:
                self.root[rootY] = rootX
                self.rank[rootX] += 1

    def connected(self, x, y):
        return self.find(x) == self.find(y)
```

我希望在这个基础上实现一个增广的并查集类，因为我现在合并的时候是使用cosine similarity合并的，所以我希望把union函数 增广为 union( x,y, similarity )

并且root上会额外记录最后一次合并的similarity，可以使用get_last_similarity( x )  查询x对应的类最后一次合并的similarity

另外增加一个set_root_name 的功能， 支持对某个group对应的类进行命名

也有get_root_name(x) 这样的功能，查询x对应的root的name
所有的名字一开始初始化为default

In [3]:
class AugmentedUnionFind:
    def __init__(self, size):
        self.root = list(range(size))
        self.rank = [1] * size
        self.size = [1] * size  # Each component initially has one element
        self.last_similarity = [-1] * size  # Initialize with -1 or any default value indicating no merge yet
        self.root_name = ['default'] * size  # Initialize names as 'default'

    def find(self, x):
        if self.root[x] != x:
            self.root[x] = self.find(self.root[x])  # Path compression
        return self.root[x]

    def union(self, x, y, similarity):
        rootX = self.find(x)
        rootY = self.find(y)

        if rootX != rootY:
            if self.rank[rootX] > self.rank[rootY]:
                self.root[rootY] = rootX
                self.last_similarity[rootX] = similarity  # Store the similarity of the last merge
                self.size[rootX] += self.size[rootY]  # Update the size of the new root
            elif self.rank[rootX] < self.rank[rootY]:
                self.root[rootX] = rootY
                self.last_similarity[rootY] = similarity  # Store the similarity of the last merge
                self.size[rootY] += self.size[rootX]  # Update the size of the new root
            else:
                self.root[rootY] = rootX
                self.rank[rootX] += 1
                self.last_similarity[rootX] = similarity  # Store the similarity of the last merge
                self.size[rootX] += self.size[rootY]  # Update the size of the new root

        if self.get_root_name( rootX ) == "default" and self.get_root_name( rootY ) != "default":
            self.set_root_name( rootX, self.get_root_name( rootY ) )

    def connected(self, x, y):
        return self.find(x) == self.find(y)

    def get_last_similarity(self, x):
        rootX = self.find(x)
        return self.last_similarity[rootX]

    def set_root_name(self, x, name):
        rootX = self.find(x)
        self.root_name[rootX] = name

    def get_root_name(self, x):
        rootX = self.find(x)
        return self.root_name[rootX]

    def get_size(self, x):
        rootX = self.find(x)
        return self.size[rootX]

# Example usage
uf = AugmentedUnionFind(10)
uf.union(1, 2, 0.9)
uf.union(2, 5, 0.85)
uf.union(5, 6, 0.88)
uf.set_root_name(1, "Group1")

print(uf.get_last_similarity(1))  # Output: 0.88
print(uf.get_root_name(1))  # Output: "Group1"
print(uf.get_size(1))  # Output: 4 (since elements 1, 2, 5, 6 are in the same component)


0.88
Group1
4


构造测试数据

In [4]:
previous_tables = [
    '/content/drive/MyDrive/Speaker/feature/liangjian_10_feature.parquet',
    '/content/drive/MyDrive/Speaker/feature/亮剑12.parquet',
    '/content/drive/MyDrive/Speaker/feature/亮剑13.parquet']

inference_table = '/content/drive/MyDrive/Speaker/feature/亮剑20.parquet'

# 这里我们需要把inference_table copy到/content/ 然后把 "人物" 这一列全部变为none (na)，用pandas读取之后清空后保存到/content

import os
import pandas as pd
import shutil

def remove_names(original_path):
    # Extract the filename from the original path
    filename = os.path.basename(original_path)
    # Construct the new path under /content/
    destination_path = os.path.join('/content/', filename)

    # Copy the file from the original location to /content/
    shutil.copy(original_path, destination_path)

    # Read the parquet file using pandas
    df = pd.read_parquet(destination_path)

    # Replace the '人物' column with None (NA)
    df['人物'] = None

    # Save the modified DataFrame back to the new path
    df.to_parquet(destination_path)

    return destination_path

ground_truth_table = inference_table
inference_table = remove_names('/content/drive/MyDrive/Speaker/feature/亮剑20.parquet')
print(f"Modified file saved to: {inference_table}")

Modified file saved to: /content/亮剑20.parquet


In [12]:
def clean_name(text):
    # Split the text at the first underscore
    parts = text.split('_', 1)
    # If an underscore was found, return the part before it; otherwise, return the entire text
    return parts[0]

# Example usage:
sample_text = "example_name_with_underscores"
print(clean_name(sample_text))  # Output: 'example'

sample_text_no_underscore = "exampleName"
print(clean_name(sample_text_no_underscore))  # Output: 'exampleName'


example
exampleName


In [29]:
import pandas as pd
import os
from sklearn.neighbors import NearestNeighbors
import numpy as np


class VideoData:
    def __init__(self, inference_table, folder, previous_tables=[]):

        self.load_inference(inference_table, folder)
        self.load_previous_tables(previous_tables)

        self.M = 5
        self.minimal_self_group_size = 3
        self.name_epsilon = 0.1
        self.stop_threshold = 0.45

    def load_inference(self, inference_table, folder ):

        self.fname_table = inference_table

        # Load the inference table assuming it's already processed and path updated
        self.table = pd.read_parquet(self.fname_table)

        self.folder = folder

        print(f"Inference table loaded from {self.fname_table}")

    def load_previous_tables(self, previous_tables ):
        self.previous_table_fnames = previous_tables
        self.merge_and_clean( previous_tables )

    def merge_and_clean(self, previous_table_fnames):
        # Concatenate all tables if there are previous tables
        if len(previous_table_fnames) == 0:
            self.previous_data = None
            return

        previous_tables = [pd.read_parquet(fname) for fname in previous_table_fnames]

        all_data = pd.concat(previous_tables, ignore_index=True)

        # Clean data: remove rows where '人物' is NA or 'audio_feature' length is 0
        all_data.dropna(subset=['人物'], inplace=True)
        all_data = all_data[all_data['audio_feature'].apply(len) != 0]

        # Assign the cleaned data back to self.table
        self.previous_data = all_data
        print("Data merged and cleaned. Rows with empty '人物' or 'audio_feature' removed.")

    def compute_speaker(self):
        self.build_edge_map()
        self.sort_edges()
        self.group_all()
        self.append_similarity()
        self.compute_knn_result()

    def compute_knn_result(self):
        self.table['knn_speaker'] = None
        self.table['knn_similarity'] = None
        for index, edges in self.candidate_edges_on_previous.items():
            if edges:  # Ensure there is at least one edge to process
                top_match_index, top_match_similarity = edges[0]
                speaker_name = self.previous_data.iloc[top_match_index]['人物']
                self.table.at[index, 'knn_speaker'] = speaker_name
                self.table.at[index, 'knn_similarity'] = 1 - top_match_similarity
        print("KNN speakers and similarities computed.")


    def build_edge_map(self):
        # Extract and normalize audio features from self.table
        audio_features_self = np.stack(self.table['audio_feature'])
        norms_self = np.linalg.norm(audio_features_self, axis=1, keepdims=True)
        normalized_audio_features_self = audio_features_self / norms_self

        # Fit and query NearestNeighbors for self.table
        knn_self = NearestNeighbors(n_neighbors=self.M + 1, metric='cosine')
        knn_self.fit(normalized_audio_features_self)
        distances_self, indices_self = knn_self.kneighbors(normalized_audio_features_self)

        # Store the indices and distances for self, ignoring the point itself in the indices
        self.candidate_edges_on_self = {i: list(zip(indices_self[i][1:self.M+1], distances_self[i][1:self.M+1])) for i in range(len(self.table))}

        if self.previous_data is not None:
            # Extract and normalize audio features from self.previous_data
            audio_features_previous = np.stack(self.previous_data['audio_feature'])
            norms_previous = np.linalg.norm(audio_features_previous, axis=1, keepdims=True)
            normalized_audio_features_previous = audio_features_previous / norms_previous

            # Fit and query NearestNeighbors for self.previous_data
            knn_previous = NearestNeighbors(n_neighbors=self.M, metric='cosine')
            knn_previous.fit(normalized_audio_features_previous)
            distances_previous, indices_previous = knn_previous.kneighbors(normalized_audio_features_self)

            # Store the indices and distances for previous data
            self.candidate_edges_on_previous = {i: list(zip(indices_previous[i], distances_previous[i])) for i in range(len(self.table))}

        print("Edge maps built for self and previous data with distances included.")

    def append_similarity(self):
        # Extract the list of unique speakers from previous_data
        if self.previous_data is not None:
            speakers_set = set(self.previous_data['人物'])
        else:
            speakers_set = set()

        # Iterate over each row in self.table to append similarity to the 'estimated_speaker' column
        for idx, row in self.table.iterrows():
            estimated_speaker = row['estimated_speaker']
            if estimated_speaker not in speakers_set:
                # If the estimated speaker is not in the speakers list, continue to next row
                continue

            # Find the first match from self.candidate_edges_on_previous that corresponds to this speaker
            for edge in self.candidate_edges_on_previous.get(idx, []):
                target_index, cosine_distance = edge
                target_speaker = self.previous_data.iloc[target_index]['人物']
                if target_speaker == estimated_speaker:
                    # Calculate the cosine similarity and format it
                    cosine_similarity = 1 - cosine_distance
                    formatted_similarity = "{:.2f}".format(cosine_similarity)
                    self.table.at[idx, 'estimated_speaker'] = f"{estimated_speaker}_{formatted_similarity}"
                    break
            else:
                # If no matching speaker was found, set similarity to 0
                self.table.at[idx, 'estimated_speaker'] = f"{estimated_speaker}_0.00"

        print("Similarity values appended to the estimated speakers.")


    def sort_edges(self):
        # Aggregate and sort edges
        all_edges = []

        # Process edges on self
        for source_index, edges in self.candidate_edges_on_self.items():
            all_edges.extend((source_index, "self", target_index, 1 - dist) for target_index, dist in edges)

        # Process edges on previous if available
        if self.previous_data is not None:
            for source_index, edges in self.candidate_edges_on_previous.items():
                all_edges.extend((source_index, "previous", target_index, 1 - dist) for target_index, dist in edges)

        # Sort edges by similarity (1 - distance) in descending order
        self.sorted_edges = sorted(all_edges, key=lambda x: x[3], reverse=True)
        print("Edges sorted by descending similarity.")

    def group_all(self):
        # Initialize the union-find structure for the size of the table
        uf = AugmentedUnionFind(len(self.table))
        self.table['estimated_speaker'] = None

        # If there are marked '人物' (speaker) values, copy them to 'estimated_speaker'
        if '人物' in self.table.columns:
            self.table['estimated_speaker'] = self.table['人物']

        # Initialize undeal_count and visited list
        undeal_count = len(self.table) - self.table['estimated_speaker'].count()
        visited = [False] * len(self.table)

        count = 0
        count_empty_merge = 0

        # Process each edge in the sorted list
        for source_index, table_type, target_index, similarity in self.sorted_edges:
            if visited[source_index]:
                continue

            if similarity < self.stop_threshold:
                break

            # Check if the node has been dealt with
            root_name = uf.get_root_name(source_index)


            if table_type == "self":
                target_name = uf.get_root_name(target_index)
                if root_name == "default" or target_name == "default" or root_name == target_name:
                    # 这个时候要检查target是不是有root_name
                    uf.union(source_index, target_index, similarity)

                if root_name == "default" and target_name == "default":
                    count_empty_merge += 1

            elif table_type == "previous":
                target_name = self.previous_data.iloc[target_index]['人物']
                last_similarity = uf.get_last_similarity(source_index)

                if similarity >= last_similarity - self.name_epsilon or uf.get_size(source_index) < self.minimal_self_group_size:
                    if root_name == "default":
                        uf.set_root_name(source_index, target_name)

            root_name = uf.get_root_name(source_index)

            if root_name != 'default':
                self.table.at[source_index, 'estimated_speaker'] = root_name

            # continue

            new_size = uf.get_size(source_index)

            if not visited[source_index] and (root_name != "default" or  new_size >= self.minimal_self_group_size):
                visited[source_index] = True
                undeal_count -= 1

            count += 1

            if undeal_count == 0:
                break

        print("无名合并次数", count_empty_merge)
        print("合并次数",count)

        # Final pass to ensure all entries have correct labels
        for i in range(len(self.table)):
            root_name = uf.get_root_name(i)
            if root_name != 'default':
                self.table.at[i, 'estimated_speaker'] = root_name
            else:
                root_id = uf.find(i)
                self.table.at[i, 'estimated_speaker'] = str(root_id)

        print("All groups computed and speakers estimated.")

    # Add this method in the class where you define the rest of your methods.

    def get_current_table(self):
        # Check if 'estimated_speaker' column exists, compute if not
        if 'estimated_speaker' not in self.table.columns:
            self.compute_speaker()

        # Check and select the necessary columns to be returned
        required_columns = ['knn_speaker', 'knn_similarity', 'estimated_speaker', '人物', '人物台词', '开始时间']
        # Ensure that all required columns exist in the DataFrame
        available_columns = [col for col in required_columns if col in self.table.columns]

        # Assemble 'knn_result' from 'knn_speaker' and 'knn_similarity'
        self.table['knn_result'] = self.table['knn_speaker'].astype(str) + "_" + self.table['knn_similarity'].apply(lambda x: f"{x:.2f}")

        # Reset the index to add it as a column in the DataFrame
        result_table = self.table[available_columns + ['knn_result']].reset_index()
        # Rename 'index' column to something more descriptive if desired, e.g., 'Row Index'
        result_table.rename(columns={'index': 'Row Index'}, inplace=True)

        # Reset the index to add it as a column in the DataFrame
        result_table = self.table[['knn_result'] + available_columns].reset_index()
        # Drop 'knn_speaker' and 'knn_similarity' from the result table
        result_table.drop(columns=['knn_speaker', 'knn_similarity'], inplace=True)

        # Rename 'index' column to something more descriptive if desired, e.g., 'Row Index'
        result_table.rename(columns={'index': 'Row Index'}, inplace=True)

        # Move 'Row Index' to be the last column if you want it at the end instead of the beginning
        cols = result_table.columns.tolist()  # Convert columns to list
        # Ensure 'knn_result' is the first column
        cols = [cols[1]] + cols[2:] + [cols[0]]  # Skip 'Row Index' and append it at the end
        result_table = result_table[cols]

        # Return the DataFrame with the modified columns
        return result_table

    def label_row(self, index, speaker, if_return = True):
        # Update the specified row's speaker
        self.table.at[index, '人物'] = speaker

        # Extract the audio feature for the specified index row
        audio_feature_index = np.array(self.table.at[index, 'audio_feature'])

        # Normalize the audio feature vector for cosine similarity calculation
        norm_index = np.linalg.norm(audio_feature_index)
        if norm_index > 0:
            audio_feature_index /= norm_index

        # Iterate over all rows except the index row itself
        for idx, row in self.table.iterrows():
            if idx == index:
                continue

            # Extract and normalize the audio feature of the current row
            audio_feature_current = np.array(row['audio_feature'])
            norm_current = np.linalg.norm(audio_feature_current)
            if norm_current > 0:
                audio_feature_current /= norm_current

            # Calculate cosine similarity
            similarity = np.dot(audio_feature_index, audio_feature_current)

            # Check if this similarity is greater than the existing knn_similarity
            if similarity > row['knn_similarity']:
                self.table.at[idx, 'knn_speaker'] = speaker
                self.table.at[idx, 'knn_similarity'] = similarity

                # Assemble 'knn_result' from 'knn_speaker' and 'knn_similarity'
                self.table.at[idx, 'estimated_speaker'] = speaker + "_" + f"%.2f" % similarity

        if if_return:
            # Return the updated table
            return self.get_current_table()
        else:
            return None

    def label_rows(self, indexes, speakers ):
        # 这里有点懒了直接批量调用
        for index, speaker in zip(indexes, speakers):
            self.label_row(index, speaker, False)

        return self.get_current_table()


# Usage example
folder_path = None
inference_table_path = '/content/亮剑20.parquet'
previous_tables_paths = [
    '/content/drive/MyDrive/Speaker/feature/亮剑12.parquet',
    '/content/drive/MyDrive/Speaker/feature/亮剑13.parquet'
]

video_data = VideoData(inference_table_path, folder_path, previous_tables_paths)
print(video_data.previous_data.info())

video_data.compute_speaker()

print(video_data.candidate_edges_on_self[5])
print(video_data.candidate_edges_on_previous[5])

print(video_data.sorted_edges[:15])


vis_table = video_data.get_current_table()
print(vis_table)

# Specify the path where you want to save the CSV file
output_path = '/content/vis_table.csv'

vis_table = video_data.label_row(0,"赵刚")
vis_table = video_data.label_row(1,"赵刚")
vis_table = video_data.label_row(2,"赵刚")
vis_table = video_data.label_row(3,"赵刚")

vis_table = video_data.label_row(139,"罗主任")
vis_table = video_data.label_row(146,"雨田")

# Save the DataFrame to CSV
vis_table.to_csv(output_path, index=False)  # Set index=False to not include row indices in the CSV file

print(f"Table saved to {output_path}")




Inference table loaded from /content/亮剑20.parquet
Data merged and cleaned. Rows with empty '人物' or 'audio_feature' removed.
<class 'pandas.core.frame.DataFrame'>
Index: 1422 entries, 0 to 1859
Data columns (total 8 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   人物               1422 non-null   object
 1   人物台词             1422 non-null   object
 2   开始时间             1422 non-null   object
 3   结束时间             1422 non-null   object
 4   audio_file       1422 non-null   object
 5   screenshot_file  1422 non-null   object
 6   visual_feature   1422 non-null   object
 7   audio_feature    1422 non-null   object
dtypes: object(8)
memory usage: 100.0+ KB
None
Edge maps built for self and previous data with distances included.
Edges sorted by descending similarity.
无名合并次数 878
合并次数 1017
All groups computed and speakers estimated.
Similarity values appended to the estimated speakers.
KNN speakers and similarities computed.
[(6, 0.45110

我把边表（这里包括了待标注剧集自己到自己 和  自己到之前有标注的spearker）排序后 依次合并

- 如果合并两个没有标注的数据，则合并
- 如果合并一个有名字的group，一个没有名字的group
    - 如果没有名字的group大于等于3个数据 则不合并，保留新group
    - 如果没有名字的group小于3个数据，则合并
- 循环直到所有数据都加入至少3个数据的group或者被标记上名字

In [11]:
print(video_data.inverse_edge_map[5:10])

[[6, 62], [2, 5], [14, 40], [4, 10, 12, 15, 24, 43], [47, 61, 72]]
