----------------------------------销售处理和导入----------------------------------

In [1]:
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, DATETIME, DECIMAL, text
import pandas as pd
import numpy as np
import datetime

In [2]:
def mysql_insert(information_dt: pd.DataFrame) -> None:
    """
    Replace one monthly partition of ``new_customer`` with the rows of a DataFrame.

    The partition name ``p<YYYYMM>`` is derived from the first cell of the
    frame. That partition is truncated and every row of the frame is then
    inserted. DataFrame columns are matched to the table's non-auto-increment
    columns purely by position, so their order must follow the table
    definition.

    Failures are reported with ``print`` and are not re-raised.

    :param information_dt: rows to insert; column 0 holds the record date
        (e.g. ``2025-02-01``), from which the partition is derived
    :return: None
    """
    # Validate the input before any connection is opened.
    if information_dt.empty:
        print("插入失败:", "没有可插入的数据")
        return

    # Partition suffix = first six characters of the date once '.' and '-'
    # separators are removed, i.e. YYYYMM for a YYYY-MM-DD style value.
    first_value = information_dt.iloc[0, 0]
    partition = str(first_value).replace('.', '').replace('-', '')[:6]
    # The suffix is concatenated into DDL (identifiers cannot be bound as
    # parameters), so only a plain six-digit value is allowed through.
    if not (len(partition) == 6 and partition.isascii() and partition.isdigit()):
        print("插入失败:", f"无法根据首行日期 {first_value!r} 确定分区")
        return

    # NOTE(review): credentials are hard-coded here; consider moving them to
    # configuration or environment variables.
    config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'root',
        'password': '123456',
        'database': 'sales_information',
    }
    db_url = f"mysql+pymysql://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}"
    engine = create_engine(db_url)
    Base = declarative_base()

    class NewCustomer(Base):
        # ORM mapping used only to build the rows for the bulk insert.
        __tablename__ = 'new_customer'
        id = Column(Integer, primary_key=True, autoincrement=True)
        record_date = Column(DATETIME, nullable=False)
        department = Column(String(50), nullable=False)
        employee_name = Column(String(50), nullable=False)
        ali_tp_performance = Column(DECIMAL(10, 1), default=0)
        ali_tp_orders = Column(DECIMAL(10, 1), default=0)
        ali_shishang_performance = Column(DECIMAL(10, 1), default=0)
        ali_shishang_orders = Column(DECIMAL(10, 1), default=0)
        ali_operation_performance = Column(DECIMAL(10, 1), default=0)
        ali_operation_orders = Column(DECIMAL(10, 1), default=0)
        douyin_performance = Column(DECIMAL(10, 1), default=0)
        douyin_orders = Column(DECIMAL(10, 1), default=0)
        baidu_performance = Column(DECIMAL(10, 1), default=0)
        baidu_orders = Column(DECIMAL(10, 1), default=0)
        baidu_operation_performance = Column(DECIMAL(10, 1), default=0)
        baidu_operation_orders = Column(DECIMAL(10, 1), default=0)
        other_performance = Column(DECIMAL(10, 1), default=0)
        other_orders = Column(DECIMAL(10, 1), default=0)
        shengtong_performance = Column(DECIMAL(10, 1), default=0)
        shengtong_orders = Column(DECIMAL(10, 1), default=0)
        digital_human_performance = Column(DECIMAL(10, 1), default=0)
        digital_human_orders = Column(DECIMAL(10, 1), default=0)
        wangxiaobao_performance_new = Column(DECIMAL(10, 1), default=0)
        wangxiaobao_order_new = Column(DECIMAL(10, 1), default=0)
        wangxiaobao_performance = Column(DECIMAL(10, 1), default=0)
        wangxiaobao_order = Column(DECIMAL(10, 1), default=0)
        fengshen_performance = Column(DECIMAL(10, 1), default=0)
        fengshen_orders = Column(DECIMAL(10, 1), default=0)
        shooting_performance = Column(DECIMAL(10, 1), default=0)
        shooting_orders = Column(DECIMAL(10, 1), default=0)
        deposit_amount = Column(DECIMAL(10, 1), default=0)
        deposit_amount_orders = Column(DECIMAL(10, 1), default=0)
        Addres = Column(String(10), nullable=False)

    session = sessionmaker(bind=engine)()
    try:
        # Reflect the live table and keep every column that is not
        # auto-increment; these are the insert targets, in table order.
        non_autoinc_cols = [
            col["name"] for col in inspect(engine).get_columns('new_customer')
            if not col.get("autoincrement", False)
        ]

        # Positional matching only makes sense when the widths agree.
        if len(information_dt.columns) != len(non_autoinc_cols):
            raise ValueError(
                f"数据列数 ({len(information_dt.columns)}) 与数据库表列数 ({len(non_autoinc_cols)}) 不匹配"
            )

        # Reflected column names are used as constructor keywords, so they
        # must equal the attribute names declared on the model above.
        data = [
            NewCustomer(**dict(zip(non_autoinc_cols, row)))
            for row in information_dt.values.tolist()
        ]

        # Truncate only after the frame has been validated and converted:
        # MySQL commits ALTER TABLE implicitly, so the truncate cannot be
        # rolled back if a later step fails.
        with engine.begin() as conn:
            conn.execute(text("ALTER TABLE new_customer TRUNCATE PARTITION p" + partition + ";"))

        session.bulk_save_objects(data)
        session.commit()
        print("数据插入成功。")

    except Exception as e:
        session.rollback()
        print("插入失败:", e)
    finally:
        session.close()
        # Release the pooled connections owned by this per-call engine.
        engine.dispose()

In [3]:
def data_deal(address: str, sheet: str) -> pd.DataFrame:
    """
    Load the monthly sales sheet and reshape it into rows ready for insertion.

    The sheet's second row is used as the header. Only columns 0-4, 18-35
    and 78-88 (by position) are kept, rows whose first column is blank are
    dropped, and remaining blanks become 0. The first three kept columns are
    read as day, year and month (in that order) and merged into a
    ``YYYY-MM-DD`` string stored in the first column; the ``年度`` and ``月份``
    columns are then removed.

    :param address: path of the Excel workbook
    :param sheet: name of the worksheet to read
    :return: cleaned DataFrame whose first column is the date string
    """
    def as_text(column, width=0):
        # A numeric column that contained blank cells is read as float, so
        # 2025 would render as "2025.0"; strip that suffix before padding.
        rendered = column.astype(str).str.replace(r'\.0$', '', regex=True)
        return rendered.str.zfill(width) if width else rendered

    dt = pd.read_excel(address, sheet, header=1)
    data = dt.iloc[:, np.r_[0:5, 18:36, 78:89]]
    data = data.dropna(subset=[data.columns[0]], how='any')
    data_fill = data.fillna(0).reset_index(drop=True)
    # print(data_fill.head(5))

    day = data_fill.iloc[:, 0]
    year = data_fill.iloc[:, 1]
    month = data_fill.iloc[:, 2]
    # Replace the column by label instead of writing through iloc: putting
    # strings into a numeric column in place is an incompatible-dtype
    # assignment that newer pandas versions deprecate.
    data_fill[data_fill.columns[0]] = (
        as_text(year) + '-' + as_text(month, 2) + '-' + as_text(day, 2)
    )
    data_fill = data_fill.drop(columns=['年度', '月份']).reset_index(drop=True)
    return data_fill

In [4]:
# The workbook is looked up in a folder named <year>_<month>_<day> (no zero
# padding). Read the date once so all three parts come from the same day.
today = datetime.date.today()
# Raw string: '\w' and '\d' are invalid escape sequences in a normal literal.
address = r'E:\work_file\data\{}_{}_{}\2月份表.xlsx'.format(today.year, today.month, today.day)
# Manual override for re-importing a specific day's export:
# address = r'E:\work_file\data\2025_7_31\2月份表.xlsx'
sheet = '每月业绩统计原表格'
data = data_deal(address, sheet)
mysql_insert(data)

数据插入成功。


----------------------------------销售出勤数据处理和导入----------------------------------

In [5]:
def mysql_insert_attendance(information_dt):
    """
    写入数据库（按位置匹配列）
    :param information_dt: 需要插入的数据（DataFrame）
    :return:
    """
    config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'root',
        'password': '123456',
        'database': 'sales_information',
    }
    db_url = f"mysql+pymysql://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}"
    engine = create_engine(db_url)
    Session = sessionmaker(bind=engine)
    session = Session()
    Base = declarative_base()


    partition = str(information_dt.iloc[0,0][:4]+information_dt.iloc[0,0][5:7])
    # print(partition)

    class SalesFirst(Base):
        __tablename__ = 'attendance_sales'
        id = Column(Integer, primary_key=True, autoincrement=True)
        Record_Date = Column(DATETIME, nullable=False)
        Department = Column(String(20), nullable=False)
        Employee_Name = Column(String(20), nullable=False)
        _Address = Column(String(20), default=0)
        Category = Column(String(20), default=0)
        _Weight = Column(DECIMAL(10, 1), default=0)
        Multiple_Sign = Column(Integer, default=0)

    try:
        with engine.connect() as conn:
            with conn.begin():
                
                # 强制全表清空（确保数据干净）
                # conn.execute(text("TRUNCATE TABLE attendance_sales;"))

                conn.execute(text("ALTER TABLE attendance_sales TRUNCATE PARTITION p"+partition+";"))

                # 反射表结构，获取非自增字段列表
                inspector = inspect(engine)
                columns = inspector.get_columns('attendance_sales')
                non_autoinc_cols = [
                    col["name"] for col in columns
                    if not col.get("autoincrement", False)
                ]
                
                # 检查列数是否一致
                if len(information_dt.columns) != len(non_autoinc_cols):
                    raise ValueError(
                        f"数据列数 ({len(information_dt.columns)}) 与数据库表列数 ({len(non_autoinc_cols)}) 不匹配"
                    )
                
                data = [
                    SalesFirst(**dict(zip(non_autoinc_cols, row)))
                    for row in information_dt.values.tolist()
                ]

                session.bulk_save_objects(data)
                session.commit()
                print("数据插入成功。")

    except Exception as e:
        session.rollback()
        print("插入失败:", e)
    finally:
        session.close()

In [6]:
def data_deal_attendance(address: str, sheet: str) -> pd.DataFrame:
    """
    Load the attendance sheet and reshape it into rows ready for insertion.

    The sheet's first row is used as the header. Rows whose first column is
    blank are dropped and remaining blanks become 0. The first three columns
    are read as day, year and month (in that order) and merged into a
    ``YYYY-MM-DD`` string stored in the first column; the ``年度`` and ``月份``
    columns are then removed.

    :param address: path of the Excel workbook
    :param sheet: name of the worksheet to read
    :return: cleaned DataFrame whose first column is the date string
    """
    def as_text(column, width=0):
        # A numeric column that contained blank cells is read as float, so
        # 2025 would render as "2025.0"; strip that suffix before padding.
        rendered = column.astype(str).str.replace(r'\.0$', '', regex=True)
        return rendered.str.zfill(width) if width else rendered

    data = pd.read_excel(address, sheet, header=0)
    data = data.dropna(subset=[data.columns[0]], how='any')
    data_fill = data.fillna(0).reset_index(drop=True)

    day = data_fill.iloc[:, 0]
    year = data_fill.iloc[:, 1]
    month = data_fill.iloc[:, 2]
    # Replace the column by label instead of writing through iloc: putting
    # strings into a numeric column in place is an incompatible-dtype
    # assignment that newer pandas versions deprecate.
    data_fill[data_fill.columns[0]] = (
        as_text(year) + '-' + as_text(month, 2) + '-' + as_text(day, 2)
    )
    data_fill = data_fill.drop(columns=['年度', '月份']).reset_index(drop=True)
    return data_fill

In [7]:
# The workbook is looked up in a folder named <year>_<month>_<day> (no zero
# padding). Read the date once so all three parts come from the same day.
today = datetime.date.today()
# Raw string: '\w' and '\d' are invalid escape sequences in a normal literal.
address = r'E:\work_file\data\{}_{}_{}\2月工作簿1.xls'.format(today.year, today.month, today.day)
# Manual override for re-importing a specific day's export:
# address = r'E:\work_file\data\2025_7_31\2月工作簿1.xls'
sheet = '出勤情况统计'
data = data_deal_attendance(address, sheet)
mysql_insert_attendance(data)

数据插入成功。
