In [2]:

# 打开数据库
# 导入wx zfb# requirements
# pip install rich
# pip install tabulate
# 跳过列
# 某些列格式处理 错误时间列

import csv
import sqlite3
import re


# 核心方法
# region
def import_csv_to_sqlite(csv_file_path, db_path, table_name, strategy, header_row=0, start_row=0, skip_columns=None, params=None):
    """
    导入CSV文件到SQLite数据库，并根据策略执行相应的SQL
    :param csv_file_path: CSV文件路径
    :param db_path: 数据库文件路径
    :param table_name: 临时表名
    :param strategy: 导入策略 ('refund', 'income', 'expense', 'transfer', 'other')
    :param header_row: 表头所在的行号（从0开始）
    :param start_row: 数据开始的行号（从0开始）
    :param skip_columns: 要跳过的列索引列表
    :param params: 策略所需的额外参数，如 monthyear 等
    """
    if skip_columns is None:
        skip_columns = []
    if params is None:
        params = {}

    try:
        encodings = ['gb2312', 'gbk', 'gb18030', 'utf-8']
        # encodings = ['utf-8']

        for encoding in encodings:
            try:
                conn = sqlite3.connect(db_path)
                cursor = conn.cursor()

                # 首先导入CSV到临时表
                with open(csv_file_path, 'r', encoding=encoding) as csvfile:
                    csv_reader = csv.reader(csvfile)

                    # 跳过header_row之前的行
                    for _ in range(header_row):
                        next(csv_reader)

                    # 读取表头行
                    headers = next(csv_reader)

                    # 过滤掉要跳过的列
                    filtered_headers = [h for i, h in enumerate(headers) if i not in skip_columns]
                    cleaned_headers = [sanitize_column_name(header) for header in filtered_headers]

                    # 创建临时表
                    strategy_table_name = f"t_{table_name}_{strategy}"

                    # 使用该方法
                    if rename_existing_table(cursor, strategy_table_name):
                        print(f"表 {table_name} 已存在")
                    else:
                        print(f"表 {table_name} 不存在或重命名失败")

                        # 创建临时表
                        create_strategy_table_query = f"""
                            CREATE TABLE IF NOT EXISTS {strategy_table_name} (
                                {', '.join([f'[{col}] {get_column_type(col)}' for col in cleaned_headers])}
                            )
                            """
                        cursor.execute(create_strategy_table_query)

                        # 跳过start_row之前的行
                        remaining_rows_to_skip = start_row - (header_row + 1)
                        if remaining_rows_to_skip > 0:
                            for _ in range(remaining_rows_to_skip):
                                next(csv_reader)

                        # 插入数据到临时表
                        for row in csv_reader:
                            if not row:  # 跳过空行
                                continue

                            filtered_row = []

                            for i, value in enumerate(row):
                                if i not in skip_columns:
                                    # 如果对应的header是金额列，进行数据清理
                                    if i < len(cleaned_headers) and is_money_column(cleaned_headers[i]):
                                        # print(cleaned_headers[i])
                                        filtered_row.append(clean_money_value(value))
                                    else:
                                        (filtered_row).append(value)

                            if len(filtered_row) == len(cleaned_headers):
                                placeholders = ','.join(['?' for _ in filtered_row])
                                insert_query = f"INSERT INTO {strategy_table_name} VALUES ({placeholders})"
                                try:
                                    cursor.execute(insert_query, filtered_row)
                                except Exception as e:
                                    print(f"插入失败: {e}")
                                    print(f"插入的数据: {filtered_row}")

                # 根据策略执行相应的SQL将数据导入正式表
                create_table_query = sql_createScheme_payMonthYear(table_name)
                cursor.execute(create_table_query)

                if strategy == 'wx':
                    # 微信策略SQL

                    # 导入消费
                    sql1 = f"""
                    INSERT INTO [{table_name}] (id, pay_time, pay_monthyear, pay_source, pay_note, pay_money, pay_tag, app_source)
                         SELECT
                         strftime('%s',[交易时间]) as id,
                         [交易时间] as pay_time,
                         '{params.get('monthyear')}' as pay_monthyear,
                         [交易对方] as pay_source,
                         [商品] as pay_note,
                         cast([金额元] as real)  as pay_money,
                         'N' as 'pay_tag',
                         'wx' as 'app_source'
                         FROM '{strategy_table_name}' where [收支]='支出'
                    """

                    # 导入收入-红包退款-红包转账
                    sql2 = f"""
                    INSERT INTO [{table_name}] (id, pay_time, pay_monthyear, pay_source, pay_note, pay_money, pay_tag, app_source)
                         SELECT
                         strftime('%s',[交易时间]) as id,
                         [交易时间] as pay_time,
                         '{params.get('monthyear')}' as pay_monthyear,
                         [交易对方] as pay_source,
                         ([商品] || [当前状态]) as pay_note,
                         cast(-[金额元] as real)  as pay_money, 
                         'N' as 'pay_tag',
                         'wx' as 'app_source'
                         FROM '{strategy_table_name}' where [收支]='收入'
                    """
                    cursor.execute(sql1)
                    cursor.execute(sql2)
                    # pass

                elif strategy == 'zfb':
                    # 支付宝策略SQL

                    # 导入消费
                    sql3 = f"""
                    INSERT INTO [{table_name}] (id, pay_time, pay_monthyear, pay_source, pay_note, pay_money, pay_tag, app_source) 
                         SELECT
                         strftime('%s',[交易创建时间]) as id,
                         [交易创建时间] as pay_time,
                         '{params.get('monthyear')}' as pay_monthyear,
                         [交易对方] as pay_source,
                         [商品名称] as pay_note,
                         [金额元] as pay_money,
                         'N' as 'pay_tag',
                         'ali' as 'app_source'
                         FROM '{strategy_table_name}'  where [收支] like '支出%'  and [交易状态] like '交易成功%' and [成功退款元]=0
                    """

                    # 导入退款
                    sql4 = f"""
                    INSERT INTO [{table_name}] (id, pay_time, pay_monthyear, pay_source, pay_note, pay_money, pay_tag, app_source) 
                         SELECT
                         strftime('%s',[交易创建时间]) as id,
                         [交易创建时间] as pay_time,
                         '{params.get('monthyear')}' as pay_monthyear,
                         [交易对方] as pay_source, 
                         [商品名称] as pay_note, 
                         [金额元]-[成功退款元]  as pay_money,
                         'N' as 'pay_tag',
                         'ali' as 'app_source'
                         FROM '{strategy_table_name}'  where  [交易状态] like '交易成功%' and [成功退款元]>0
                    """
                    cursor.execute(sql3)
                    cursor.execute(sql4)
                    # pass

                elif strategy == 'yh':
                    # 银行信用卡策略SQL
                    # 在这里添加支出策略的SQL
                    pass

                elif strategy == 'other':
                    # 其他SQL
                    # 在这里添加转账策略的SQL
                    pass

                # 提交事务
                conn.commit()

                # 删除临时表
                # cursor.execute(f"DROP TABLE IF EXISTS {strategy_table_name}")
                # conn.commit()

                print(f"成功使用 {encoding} 编码导入CSV并执行{strategy}策略")
                return

            except UnicodeDecodeError:
                conn.close()
                continue
            except sqlite3.Error as e:
                print(f"SQLite错误: {e}")
                conn.close()

        print("无法使用指定的任何编码解码文件")

    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        if 'conn' in locals():
            conn.close()
# endregion


# 工具方法和策略
# region
def sanitize_column_name(name):
    # 清理列名，确保符合SQLite标识符规范
    # 删除非字母数字字符，确保以字母开头
    cleaned_name = re.sub(r'\W+', '', name)
    if cleaned_name[0].isdigit():
        cleaned_name = '' + cleaned_name
    return cleaned_name.lower()


def sql_createScheme_payMonthYear(table_name: str):
    return f"""
    CREATE TABLE IF NOT EXISTS {table_name}  (
      id BIGINT NOT NULL,
      pay_time VARCHAR NOT NULL,
      pay_monthyear VARCHAR NOT NULL,
      pay_source VARCHAR,
      pay_note VARCHAR,
      pay_money NUMERIC NOT NULL,
      pay_tag VARCHAR,
      app_source VARCHAR,
      PRIMARY KEY (id, pay_monthyear, pay_money)
    )
    """


def get_column_type(column_name):
    """
    根据列名确定数据类型
    """
    # 金额相关的列名关键字
    money_keywords = ['金额', '退款', '价格', '费用', '余额', '服务费']

    # 检查列名是否包含金额相关关键字
    if any(keyword in column_name for keyword in money_keywords):
        return 'REAL'
    return 'TEXT'


def is_money_column(column_name):
    """判断是否为金额列"""
    money_keywords = ['金额', '退款', '价格', '费用', '余额', '服务费']
    return any(keyword in column_name for keyword in money_keywords)


def clean_money_value(value):
    """清理金额数据"""
    if not value:
        return 0.0

    # 清理金额字符串
    cleaned_value = value.replace('¥', '').replace(',', '').strip()

    try:
        return float(cleaned_value)
    except ValueError:
        return cleaned_value


def rename_existing_table(cursor, table_name):
    """
    检查表是否存在，如果存在则将其重命名（添加下划线后缀）

    Args:
        cursor: SQLite游标对象
        table_name: 要检查的表名

    Returns:
        bool: 如果表存在并被重命名返回True，否则返回False
    """
    try:
        # 检查表是否存在
        cursor.execute("""
            SELECT name 
            FROM sqlite_master 
            WHERE type='table' AND name=?
        """, (table_name,))

        if cursor.fetchone():
            # 表存在，执行重命名
            new_table_name = f"{table_name}_"

            # 检查新表名是否已存在，如果存在则递归添加下划线
            while True:
                cursor.execute("""
                    SELECT name 
                    FROM sqlite_master 
                    WHERE type='table' AND name=?
                """, (new_table_name,))

                if not cursor.fetchone():
                    break
                new_table_name += "_"

            # 执行重命名操作
            # cursor.execute(f"ALTER TABLE {table_name} RENAME TO {new_table_name}")
            return True

        return False

    except Exception as e:
        print(f"重命名表时发生错误: {e}")
        return False

# endregion


# 测试微信调用函数
import_csv_to_sqlite(
    '/home/luany/桌面/moneycount/data/wx/202506.csv',
    'moneycount.db',
    'pay_202506',
    strategy='wx',    # 使用微信策略
    header_row=16,     # 表头在第3行（索引为2）
    start_row=17,      # 从第4行开始导入数据（索引为3）
    skip_columns=[8, 9],
    params={
        'monthyear': '2025_06'  # 指定月份
    }         # 传入额外参数
)

# 测试支付宝调用函数
import_csv_to_sqlite(
    '/home/luany/桌面/moneycount/data/zfb/202506.csv',
    'moneycount.db',
    'pay_202506',
    strategy='zfb',    # 使用微信策略
    header_row=4,     # 表头在第3行（索引为2）
    start_row=5,      # 从第4行开始导入数据（索引为3）
    skip_columns=[0, 1, 3, 16],
    params={
        'monthyear': '2025_06'  # 指定月份
    }         # 传入额外参数
)

表 pay_202506 不存在或重命名失败
成功使用 utf-8 编码导入CSV并执行wx策略
表 pay_202506 不存在或重命名失败
成功使用 gb2312 编码导入CSV并执行zfb策略
