In [13]:
import os
import pandas as pd
import json
import signal
import sys

# 全局变量用于存储处理状态和唯一值字典
state = {'processed_files': []}
unique_values_dict = {}
state_file_path = None

def save_state():
    """保存当前状态和唯一值字典到状态文件"""
    if state_file_path:
        with open(state_file_path, 'w') as f:
            json.dump({'state': state, 'unique_values_dict': unique_values_dict}, f)
        print(f"State saved to {state_file_path}")

def signal_handler(sig, frame):
    """捕获信号并保存状态和唯一值字典"""
    print('Interrupt signal received, saving state...')
    save_state()
    sys.exit(0)

def metadatasep(folder_path, folder_path_out, state_file=None):
    global state, state_file_path, unique_values_dict
    state_file_path = state_file if state_file else os.path.join(folder_path_out, 'state.json')

    # 加载状态文件，如果存在的话
    if os.path.exists(state_file_path):
        with open(state_file_path, 'r') as f:
            saved_data = json.load(f)
            state = saved_data.get('state', {'processed_files': []})
            unique_values_dict = saved_data.get('unique_values_dict', {})
    else:
        state = {'processed_files': []}

    # 获取文件夹中所有文件的列表
    files = os.listdir(folder_path)

    # 过滤列表，只包含 parquet 文件
    parquet_files = [f for f in files if f.endswith('.parquet')]

    # 仅处理尚未处理的文件
    parquet_files_to_process = [f for f in parquet_files if f not in state['processed_files']]

    try:
        for file in parquet_files_to_process:
            file_path = os.path.join(folder_path, file)
            df = pd.read_parquet(file_path)

            # 找出仅有一个唯一值的列
            unique_value_columns = [col for col in df.columns if df[col].nunique() == 1]

            if unique_value_columns:
                # 创建数据框以存储唯一值列
                unique_values_df = df[unique_value_columns].drop_duplicates().reset_index(drop=True)

                # 将数据框转换为字典并存储在唯一值字典中
                unique_values_dict[file] = unique_values_df.iloc[0].to_dict()

                # 从主数据框中删除这些列
                df = df.drop(columns=unique_value_columns)

            # 定义主数据输出 CSV 文件路径
            file_path_out = os.path.join(folder_path_out, file)
            output_file_path = os.path.splitext(file_path_out)[0] + '.parquet'

            # 将主数据框保存为 CSV
            df.to_parquet(output_file_path, index=True)
            print(f"Converted {file} to CSV (data).")

            # 更新状态文件，添加已处理文件
            state['processed_files'].append(file)

        # 合并所有唯一值字典到一个数据框中
        if unique_values_dict:
            combined_unique_values_df = pd.DataFrame.from_dict(unique_values_dict, orient='index')

            # 定义合并唯一值输出 CSV 文件路径
            combined_unique_values_csv_file_path = os.path.join(folder_path_out, 'combined_unique_values.csv')

            # 将合并唯一值数据框保存为 CSV
            combined_unique_values_df.to_csv(combined_unique_values_csv_file_path)
            print(f"Saved combined unique values to {combined_unique_values_csv_file_path}")

    except Exception as e:
        print(f"An error occurred: {e}")
        save_state()
        sys.exit(1)

    finally:
        # 程序正常结束时保存状态
        save_state()

# 设置信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

<function __main__.signal_handler(sig, frame)>

In [15]:

# Specify the path to the folder containing parquet files
folder_path = f'D:\\2min-resample'
folder_path_out =f"D:\\2min-resample\MetaDataSeparation"
# Convert all parquet files in the specified folder to CSV
metadatasep(folder_path,folder_path_out)

Converted 2ZIA54byai4uCJ_CkrdzuA==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 2ZZt6PDipUzG4ut17BV5cw==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 2_tsJqWwHHjOxik1R9Z2DA==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 3+oIb6ZmXA9wQuxlOh9_ZA==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 308k5F0DfUf6qgJS1EnJQw==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 311onSNSqZ+KynasAYkzRA==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 32jpUMlqgb8kWVy7rm3iXQ==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 34GngwJP4o8LjIVCpnwfug==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 34sql0ebUxmJkonIDzyqqw==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 363VcQXt+5PXVeYTlvUZKQ==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 37qASUINKucqkmIlyykJfw==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 38aOFeWNTTPkklG8xXQr8g==.asfreqffill.2min.snappy.parquet to CSV (data).
Converted 395_XJ