In [None]:
import polars as pl

df = pl.read_csv("中国专利转让数据库1998-2024年.csv")
print(df.shape)
df = df.filter(pl.col("申请日") < "2010-01-01")
print(df.shape)

# 只保留选中的列
df = df.select([
    "申请号",
    "历史法律状态"
])
print(f"筛选列后的形状: {df.shape}")


(4275769, 40)
(600437, 40)


In [7]:
import polars as pl

df = pl.read_csv("中国专利转让数据库1998-2024年.csv")
df = df.select([
    "申请号",
    "IPC分类号",
    "IPC主分类号",
])
df.write_parquet('ipc信息.parquet')


In [2]:
import polars as pl
import time

def expand_legal_status_native(df: pl.DataFrame) -> pl.DataFrame:
    """
    使用Polars原生操作的高性能版本
    """
    print(f"处理 {len(df):,} 行数据...")
    start_time = time.time()
    
    # 使用Polars原生字符串操作和向量化处理
    result = (
        df.with_row_index()
        # 按#分割法律状态记录
        .with_columns(
            pl.col("历史法律状态")
            .str.split("#")
            .alias("status_blocks")
        )
        # 展开每个状态块
        .explode("status_blocks")
        # 过滤空值
        .filter(pl.col("status_blocks").is_not_null() & (pl.col("status_blocks").str.len_chars() > 0))
        # 提取法律状态公告日
        .with_columns([
            pl.col("status_blocks")
            .str.extract(r"法律状态公告日：([^；\n]+)")
            .alias("法律状态公告日"),
            
            # 提取法律状态
            pl.col("status_blocks")
            .str.extract(r"法律状态：([^；\n]+)")
            .alias("当前法律状态"),
            
            # 提取描述信息
            pl.col("status_blocks")
            .str.extract(r"描述信息：([^；\n]+)")
            .alias("状态描述信息"),
            
            # 保留原始数据
            pl.col("历史法律状态").alias("原始历史法律状态")
        ])
        # 重新排列列顺序
        .select([
            "法律状态公告日",
            "当前法律状态", 
            "状态描述信息",
            "申请号",
            "原始历史法律状态"
        ])
    )
    
    total_time = time.time() - start_time
    print(f"处理完成！原始记录: {len(df):,}, 扩展后记录: {len(result):,}")
    print(f"总耗时: {total_time:.2f} 秒")
    
    return result

def expand_legal_status_alternative(df: pl.DataFrame) -> pl.DataFrame:
    """
    替代方案：使用map_elements但批量处理
    """
    print(f"处理 {len(df):,} 行数据...")
    start_time = time.time()
    
    def parse_status_batch(status_text):
        """批量解析单个状态字符串"""
        if not status_text or status_text == "":
            return [{"法律状态公告日": None, "当前法律状态": None, "状态描述信息": None}]
        
        blocks = [block.strip() for block in str(status_text).split('#') if block.strip()]
        if not blocks:
            return [{"法律状态公告日": None, "当前法律状态": None, "状态描述信息": None}]
        
        results = []
        for block in blocks:
            result = {"法律状态公告日": None, "当前法律状态": None, "状态描述信息": None}
            
            lines = block.split('\n')
            for line in lines:
                line = line.strip()
                if '法律状态公告日：' in line:
                    result["法律状态公告日"] = line.split('法律状态公告日：', 1)[1].split('；')[0].strip()
                elif '法律状态：' in line:
                    result["当前法律状态"] = line.split('法律状态：', 1)[1].split('；')[0].strip()
                elif '描述信息：' in line:
                    result["状态描述信息"] = line.split('描述信息：', 1)[1].split('；')[0].strip()
            
            results.append(result)
        
        return results
    
    # 使用map_elements进行向量化处理
    result = (
        df.with_row_index()
        .with_columns(
            pl.col("历史法律状态")
            .map_elements(parse_status_batch, return_dtype=pl.List(pl.Struct({
                "法律状态公告日": pl.Utf8,
                "当前法律状态": pl.Utf8, 
                "状态描述信息": pl.Utf8
            })))
            .alias("parsed_status")
        )
        .explode("parsed_status")
        .unnest("parsed_status")
        .with_columns(pl.col("历史法律状态").alias("原始历史法律状态"))
        .select([
            "法律状态公告日",
            "当前法律状态",
            "状态描述信息", 
            "申请号",
            "原始历史法律状态"
        ])
    )
    
    total_time = time.time() - start_time
    print(f"处理完成！原始记录: {len(df):,}, 扩展后记录: {len(result):,}")
    print(f"总耗时: {total_time:.2f} 秒")
    
    return result

print("已加载Polars原生优化版本")

已加载Polars原生优化版本


In [3]:
expanded_df = expand_legal_status_native(df)
# 使用Polars原生操作版本（推荐）


# 如果上面的版本有问题，可以尝试替代方案
# expanded_df = expand_legal_status_alternative(df)

# expanded_df = expanded_df.unique()
# expanded_df.write_parquet("expanded_df.parquet")

处理 600,437 行数据...
处理完成！原始记录: 600,437, 扩展后记录: 3,116,810
总耗时: 3.96 秒


In [None]:
expanded_df = expanded_df.unique()
expanded_df["当前法律状态"].value_counts().write_csv("当前法律状态.csv")
expanded_df.write_parquet("expanded_df2.parquet")

In [2]:
import polars as pl
import time
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
import os

def expand_legal_status_optimized(df: pl.DataFrame) -> pl.DataFrame:
    """
    优化版本：利用Polars并行特性和分块处理
    """
    print(f"处理 {len(df):,} 行数据...")
    start_time = time.time()
    
    # 优化策略1: 使用更高效的字符串操作
    # 避免复杂正则表达式，使用简单字符串分割
    result = (
        df.with_row_index()
        # 预过滤空值和无效数据
        .filter(
            pl.col("历史法律状态").is_not_null() & 
            (pl.col("历史法律状态").str.len_chars() > 10)
        )
        # 使用更高效的字符串分割
        .with_columns(
            pl.col("历史法律状态")
            .str.replace_all(r"\s+", " ")  # 标准化空白字符
            .str.split("#")
            .alias("status_blocks")
        )
        # 展开状态块
        .explode("status_blocks")
        # 过滤空块
        .filter(
            pl.col("status_blocks").is_not_null() & 
            (pl.col("status_blocks").str.len_chars() > 5)
        )
        # 并行提取字段 - 避免复杂正则，使用简单字符串操作
        .with_columns([
            # 提取公告日 - 使用字符串分割而非正则
            pl.col("status_blocks")
            .str.split("法律状态公告日：").list.get(1, null_on_oob=True)
            .str.split("；").list.get(0, null_on_oob=True)
            .str.strip_chars()
            .alias("法律状态公告日"),
            
            # 提取法律状态
            pl.col("status_blocks")
            .str.split("法律状态：").list.get(1, null_on_oob=True)
            .str.split("；").list.get(0, null_on_oob=True) 
            .str.strip_chars()
            .alias("当前法律状态"),
            
            # 提取描述信息
            pl.col("status_blocks")
            .str.split("描述信息：").list.get(1, null_on_oob=True)
            .str.split("；").list.get(0, null_on_oob=True)
            .str.strip_chars() 
            .alias("状态描述信息"),
            
            pl.col("历史法律状态").alias("原始历史法律状态")
        ])
        # 过滤有效记录
        .filter(
            pl.col("法律状态公告日").is_not_null() |
            pl.col("当前法律状态").is_not_null() |
            pl.col("状态描述信息").is_not_null()
        )
        # 选择最终列
        .select([
            "法律状态公告日",
            "当前法律状态", 
            "状态描述信息",
            "申请号",
            "原始历史法律状态"
        ])
    )
    
    total_time = time.time() - start_time
    print(f"优化版本处理完成！原始记录: {len(df):,}, 扩展后记录: {len(result):,}")
    print(f"总耗时: {total_time:.2f} 秒")
    
    return result

def process_chunk(chunk_data):
    """处理单个数据块的函数"""
    chunk_df, chunk_id = chunk_data
    print(f"处理块 {chunk_id}, 大小: {len(chunk_df):,}")
    return expand_legal_status_optimized(chunk_df)

def expand_legal_status_parallel(df: pl.DataFrame, n_chunks: int = None) -> pl.DataFrame:
    """
    并行分块处理版本
    """
    if n_chunks is None:
        n_chunks = max(1, mp.cpu_count() - 1)  # 保留一个核心
    
    print(f"使用 {n_chunks} 个进程进行并行处理")
    print(f"处理 {len(df):,} 行数据...")
    start_time = time.time()
    
    # 计算每块大小
    chunk_size = len(df) // n_chunks
    if chunk_size == 0:
        return expand_legal_status_optimized(df)
    
    # 分割数据
    chunks = []
    for i in range(n_chunks):
        start_idx = i * chunk_size
        if i == n_chunks - 1:  # 最后一块包含剩余所有数据
            end_idx = len(df)
        else:
            end_idx = (i + 1) * chunk_size
        
        chunk = df.slice(start_idx, end_idx - start_idx)
        chunks.append((chunk, i + 1))
    
    # 并行处理
    results = []
    with ProcessPoolExecutor(max_workers=n_chunks) as executor:
        future_to_chunk = {executor.submit(process_chunk, chunk): chunk[1] 
                          for chunk in chunks}
        
        for future in as_completed(future_to_chunk):
            chunk_id = future_to_chunk[future]
            try:
                result = future.result()
                results.append(result)
                print(f"块 {chunk_id} 处理完成")
            except Exception as exc:
                print(f"块 {chunk_id} 处理失败: {exc}")
    
    # 合并结果
    print("合并处理结果...")
    final_result = pl.concat(results)
    
    total_time = time.time() - start_time
    print(f"并行处理完成！原始记录: {len(df):,}, 扩展后记录: {len(final_result):,}")
    print(f"总耗时: {total_time:.2f} 秒")
    print(f"相比原版本预计提升: ~{24.36/total_time:.1f}x")
    
    return final_result

print("已加载优化版本函数")

已加载优化版本函数


In [3]:
# 性能测试和对比
print("=== 性能对比测试 ===")
print(f"系统CPU核心数: {mp.cpu_count()}")
print(f"可用于并行的核心数: {mp.cpu_count() - 1}")

# 使用小样本先测试优化版本
test_sample = df.sample(n=10000, seed=42)
print(f"\n测试样本大小: {len(test_sample):,} 行")

# 测试优化版本（单线程）
print("\n--- 测试优化版本（单线程）---")
optimized_result = expand_legal_status_optimized(test_sample)

# 测试并行版本
print("\n--- 测试并行版本 ---")
if __name__ == '__main__':  # 避免在Jupyter中的多进程问题
    # 在实际使用时取消注释下面的行
    # parallel_result = expand_legal_status_parallel(test_sample, n_chunks=4)
    print("并行版本需要在Python脚本中运行，Jupyter环境下可能有多进程限制")

print("\n=== 优化建议 ===")
print("1. 字符串分割替代正则表达式 - 性能提升显著")
print("2. 预过滤无效数据 - 减少处理量")
print("3. 并行分块处理 - 充分利用多核CPU")
print("4. 内存优化 - 避免不必要的数据复制")

=== 性能对比测试 ===
系统CPU核心数: 20
可用于并行的核心数: 19

测试样本大小: 10,000 行

--- 测试优化版本（单线程）---
处理 10,000 行数据...
优化版本处理完成！原始记录: 10,000, 扩展后记录: 41,561
总耗时: 0.09 秒

--- 测试并行版本 ---
并行版本需要在Python脚本中运行，Jupyter环境下可能有多进程限制

=== 优化建议 ===
1. 字符串分割替代正则表达式 - 性能提升显著
2. 预过滤无效数据 - 减少处理量
3. 并行分块处理 - 充分利用多核CPU
4. 内存优化 - 避免不必要的数据复制


In [None]:
# 应用优化版本处理完整数据集
print("=== 使用优化版本处理完整数据集 ===")

# 使用优化版本替代原来的方法
expanded_df_optimized = expand_legal_status_optimized(df)

# 去重并保存
expanded_df_optimized = expanded_df_optimized.unique()
print(f"去重后记录数: {len(expanded_df_optimized):,}")

# 保存优化版本结果
expanded_df_optimized.write_parquet("expanded_df_optimized.parquet")
print("优化版本结果已保存至 expanded_df_optimized.parquet")

# 数据质量检查
print("\n=== 数据质量检查 ===")
print("非空值统计:")
for col in ["法律状态公告日", "当前法律状态", "状态描述信息"]:
    non_null_count = expanded_df_optimized.filter(pl.col(col).is_not_null()).height
    print(f"  {col}: {non_null_count:,} ({non_null_count/len(expanded_df_optimized)*100:.1f}%)")

# 显示样本数据
print("\n前5条处理结果:")
print(expanded_df_optimized.head().to_pandas())

=== 使用优化版本处理完整数据集 ===
处理 4,275,769 行数据...
