In [2]:
from prefect.blocks.notifications import CustomWebhookNotificationBlock

custom_webhook_block = CustomWebhookNotificationBlock.load("feishu")

custom_webhook_block.notify("Hello from Prefect!")

AttributeError: 'coroutine' object has no attribute 'notify'

In [34]:
# -*- coding: utf-8 -*-
# @Time : 2025/5/29 18:01
# @Author : Garry-Host
# @FileName: send_file_tjh

import hashlib
import time
import uuid
import requests
import pandas as pd
import json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datetime import datetime



# 获取寄修数据积压数据

def get_session():
    session = requests.Session()
    retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[502, 503, 504])
    adapter = HTTPAdapter(max_retries=retries)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session


def generate_requrl(pageindex, page, day):
    print(f"正在生成第{page}页的URL")
    """
    从 API 获取数据并转换为 DataFrame
    """
    # 基本参数

    tenant = "laifen"
    api_name = "api/vlist/ExecuteQuery"
    timestamp = str(int(time.time() * 1000))
    reqid = str(uuid.uuid1())
    appid = "AS_department"
    queryid = "38c53a54-813f-a0e0-0000-06f40ebdeca5"
    is_user_query = "true"
    is_preview = "false"
    pagesize = "5000"
    paging = "true"
    key = "u7BDpKHA6VSqTScpEqZ4cPKmYVbQTAxgTBL2Gtit"
    orderby = "createdon descending"
    # extendConditions = f'[{{"name":"new_checkon","val":"{day}","op":"last-x-days"}},{"name":"new_deliveriedon","val":"null","op":"null"},{"name":"new_solution","val":30,"op":"eq"}]'
    conditions = [
        {"name": "new_checkon", "val": day, "op": "last-x-days"},
        {"name": "new_deliveriedon", "val": 'null', "op": "null"},  # None 将自动转为 null
        {"name": "new_solution", "val": 30, "op": "eq"}
    ]

    extendConditions = json.dumps(conditions, ensure_ascii=False)
    args = [appid, extendConditions, orderby, pageindex, pagesize, paging, reqid, tenant, timestamp, is_preview,
            is_user_query, queryid, key]

    """
    生成签名
    """

    sign_str = "".join(args)
    sign = hashlib.sha256(sign_str.encode('utf-8')).hexdigest().upper()
    # 构建 URL
    url = (
        f"https://ap6-openapi.fscloud.com.cn/t/{tenant}/open/{api_name}"
        f"?$tenant={tenant}&$timestamp={timestamp}&$reqid={reqid}&$appid={appid}"
        f"&queryid={queryid}&isUserQuery={is_user_query}&isPreview={is_preview}"
        f"&$pageindex={pageindex}&$pagesize={pagesize}&$paging={paging}"
        f"&$extendConditions={extendConditions}&$orderby={orderby}&$sign={sign}"
    )
    print(f"成功生成第{page}页的URL: {url}")
    return url


def fetch_api_data(url, page):
    print(f"正在获取第{page}页数据")
    # 发送 GET 请求
    session = get_session()
    response = session.get(url, timeout=30)
    if response.status_code != 200:
        raise Exception(f"API 请求失败，状态码: {response.status_code}")

    # 解析 JSON 数据
    data = response.json()
    entities = data["Data"]["Entities"]

    df = pd.DataFrame(entities)
    print(f"第{page}页数据，已通过API获取成功获取")
    return df


def extract_need_data(df):
    df = df.assign(
        产品类型=df["new_productmodel_id"].apply(lambda x: x.get("name", None)),
        产品名称=df["new_product_id"].apply(lambda x: x.get("name", None)),
        旧件签收时间=df["FormattedValues"].apply(lambda x: x.get("new_signedon", None)),
        检测时间=df["FormattedValues"].apply(lambda x: x.get("new_checkon", None)),
        申请类别=df["FormattedValues"].apply(lambda x: x.get("new_srv_rma_0.new_applytype", None)),
        一检时间=df["FormattedValues"].apply(lambda x: x.get("laifen_onechecktime", None)),
        维修完成时间=df["FormattedValues"].apply(lambda x: x.get("laifen_servicecompletetime", None)),
        质检完成时间=df["FormattedValues"].apply(lambda x: x.get("laifen_qualityrecordtime", None)),
        单号=df['new_rma_id'].apply(lambda x: x.get('name', None)),
        分拣人员=df['laifen_systemuser2_id'].apply(lambda x: x.get('name', None) if pd.notnull(x) else None),
        处理状态=df["FormattedValues"].apply(lambda x: x.get("new_srv_rma_0.new_status", None)),
        旧件处理状态=df["FormattedValues"].apply(lambda x: x.get("new_returnstatus", None)),
        检测结果=df["FormattedValues"].apply(lambda x: x.get("new_solution", None)),
        故障现象=df['new_error_id'].apply(lambda x: x.get('name', None) if pd.notnull(x) else None),
        # 发货时间=df['new_deliveriedon'],
        一检人员=df['laifen_systemuser_id'].apply(lambda x: x.get('name', None) if pd.notnull(x) else None),
        发货状态=df['FormattedValues'].apply(lambda x: x.get('new_srv_rma_0.new_deliverstatus', None)),
        产品序列号=df['new_userprofilesn'],
        服务人员=df['new_srv_workorder_1.new_srv_worker_id'].apply(
            lambda x: x.get('name', None) if pd.notnull(x) else None),
        单据来源=df["FormattedValues"].apply(lambda x: x.get("new_srv_rma_0.new_fromsource", None)),
        创建时间=df["FormattedValues"].apply(lambda x: x.get("createdon", None)),
    )
    #    # 选择需要的列
    df = df[[
        '单号', '产品类型', '产品名称', '处理状态', '旧件处理状态', '检测结果', '申请类别', '旧件签收时间',
        '检测时间', '一检时间', '维修完成时间', '质检完成时间', '故障现象', '发货状态',
        '一检人员', '产品序列号', '分拣人员', '服务人员', '单据来源', '创建时间'
    ]]
    print(f"成功提取所需数据,共{df.shape[1]}列")
    return df


def get_sf_data(days=15):
    print(f"正在下载最近{days}天的数据")
    pageindex = "1"
    url = generate_requrl(pageindex, '0', days)

    rs = requests.get(url)
    count = rs.json()['Data']['TotalRecordCount']
    print(f"最近{days}天签收业务量共{count}单,共{count // 5000 + 2}页数据")
    datas = []

    for i in range(1, count // 5000 + 2):
        url = generate_requrl(str(i), i, days)
        data = fetch_api_data(url, i)
        print(f"第{i}页数据已获取")
        datas.append(data)

    df = pd.concat(datas, ignore_index=True)
    df = extract_need_data(df)
    print(f"已成功下载最近{days}天的数据")
    return df


def deal_data():
    t1 = datetime.today().date()
    t2 = datetime(2025, 6, 1).date()
    dt = (t1 - t2).days + 1
    data = get_sf_data(dt)
    

    return data

if __name__ == '__main__':
    x = deal_data()
    

正在下载最近34天的数据
正在生成第0页的URL
成功生成第0页的URL: https://ap6-openapi.fscloud.com.cn/t/laifen/open/api/vlist/ExecuteQuery?$tenant=laifen&$timestamp=1751617361028&$reqid=0d5ea8e3-58b0-11f0-ac3a-5084929d801a&$appid=AS_department&queryid=38c53a54-813f-a0e0-0000-06f40ebdeca5&isUserQuery=true&isPreview=false&$pageindex=1&$pagesize=5000&$paging=true&$extendConditions=[{"name": "new_checkon", "val": 34, "op": "last-x-days"}, {"name": "new_deliveriedon", "val": "null", "op": "null"}, {"name": "new_solution", "val": 30, "op": "eq"}]&$orderby=createdon descending&$sign=C2308A9E82664582DA611320EC012B40A5EDC8C1DB2E7B7BCAA62340FB403E11
最近34天签收业务量共12066单,共4页数据
正在生成第1页的URL
成功生成第1页的URL: https://ap6-openapi.fscloud.com.cn/t/laifen/open/api/vlist/ExecuteQuery?$tenant=laifen&$timestamp=1751617364728&$reqid=0f933a64-58b0-11f0-8709-5084929d801a&$appid=AS_department&queryid=38c53a54-813f-a0e0-0000-06f40ebdeca5&isUserQuery=true&isPreview=false&$pageindex=1&$pagesize=5000&$paging=true&$extendConditions=[{"name": "new_che

In [None]:
import json

with open(r"E:\Dev\myprefect\data\jx\over_power.json", "r", encoding="utf-8") as f:
    bef = json.load(f)  # bef['yd'] 是上次的“昨日总量”


from datetime import timedelta
end_date1 = datetime.today().date() - timedelta(6)
end_date2 = datetime.today().date() - timedelta(5)

data = x
data['检测时间'] = pd.to_datetime(data['检测时间'])

first = data[
    (data['检测时间'] >= '2025-06-01') &
    (data['检测时间'] <= str(end_date1)) &
    (data['产品类型'].isin(['产成品-吹风机', '产成品-电动牙刷', '产成品-剃须刀']))
]['单号'].count()

last = data[
    (data['检测时间'] >= '2025-06-01') &
    (data['检测时间'] <= str(end_date2)) &
    (data['产品类型'].isin(['产成品-吹风机', '产成品-电动牙刷', '产成品-剃须刀']))
]['单号'].count()

less = bef['yd'] - first
more = last - bef['yd']

bef['yd'] = int(last)
with open(r"E:\Dev\myprefect\data\jx\over_power.json", "w", encoding="utf-8") as f:
    json.dump(bef, f, ensure_ascii=False, indent=2)
    
file_path = f"./data/jx/2025-06-01至{end_date2}分拣未发货数据明细.xlsx"
data[
    (data['检测时间'] >= '2025-06-01') &
    (data['检测时间'] <= str(end_date2)) &
    (data['产品类型'].isin(['产成品-吹风机', '产成品-电动牙刷', '产成品-剃须刀']))
].to_excel(file_path,index=False)

print(f"截止至{end_date2}号之前分拣未维修发货的数据，寄修严重超时机器共{last}台,较{end_date1}号之前，减少{less}台,增长{more}台")

截止至2025-06-29号之前分拣未维修发货的数据，寄修严重超时机器共3058,较2025-06-28号之前，减少736台,增长0台


In [44]:
# -*- coding: utf-8 -*-
# @Time : 2025/7/10 9:47
# @Author : Garry-Host
# @FileName: send_hour_bv
import dataframe_image as dfi
from datetime import datetime,timedelta
from sqlalchemy import create_engine
import pandas as pd
from prefect import task,flow
from utils import asbot
conn = create_engine("mysql+pymysql://wty:laifen03@localhost:3306/demo")


query_qs = """
    SELECT DATE_ADD(DATE(`旧件签收时间`), INTERVAL HOUR(`旧件签收时间`) HOUR) AS `时间`, count(`单号`) AS `数量`
    FROM maintenance_ruiyun_realtime
    WHERE `业务类型` = '签收'
      AND `产品类型` IN ('产成品-吹风机', '产成品-电动牙刷', '产成品-剃须刀')
      AND `申请类别` IN ('寄修/返修')
    GROUP BY DATE_ADD(DATE(`旧件签收时间`), INTERVAL HOUR(`旧件签收时间`) HOUR)
    ORDER BY 时间 DESC
    LIMIT 1;"""

query_fj = """
    SELECT DATE_ADD(DATE(`检测时间`), INTERVAL HOUR(`检测时间`) HOUR) AS `时间`, count(`单号`) AS `数量`
    FROM maintenance_ruiyun_realtime
    WHERE `业务类型` = '分拣'
      AND `产品类型` IN ('产成品-吹风机', '产成品-电动牙刷', '产成品-剃须刀')
      AND `申请类别` IN ('寄修/返修')
    GROUP BY DATE_ADD(DATE(`检测时间`), INTERVAL HOUR(`检测时间`) HOUR)
    ORDER BY 时间 DESC
    LIMIT 1;"""

query_wx = """
    SELECT DATE_ADD(DATE(`维修完成时间`), INTERVAL HOUR(`维修完成时间`) HOUR) AS `时间`, count(`单号`) AS `数量`
    FROM demo.maintenance_ruiyun_realtime
    WHERE `业务类型` = '维修'
      AND `产品类型` IN ('产成品-吹风机', '产成品-电动牙刷', '产成品-剃须刀')
      AND `申请类别` IN ('寄修/返修')
    GROUP BY DATE_ADD(DATE(`维修完成时间`), INTERVAL HOUR(`维修完成时间`) HOUR)
    ORDER BY 时间 DESC
    LIMIT 1;"""

query_zj = """
    SELECT DATE_ADD(DATE(`质检完成时间`), INTERVAL HOUR(`质检完成时间`) HOUR) AS `时间`, count(`单号`) AS `数量`
    FROM demo.maintenance_ruiyun_realtime
    WHERE `业务类型` = '质检'
      AND `产品类型` IN ('产成品-吹风机', '产成品-电动牙刷', '产成品-剃须刀')
      AND `申请类别` IN ('寄修/返修')
    GROUP BY DATE_ADD(DATE(`质检完成时间`), INTERVAL HOUR(`质检完成时间`) HOUR)
    ORDER BY 时间 DESC
    LIMIT 1;"""

query_fh = """
    SELECT DATE_ADD(DATE(`发货时间`), INTERVAL HOUR(`发货时间`) HOUR) AS `时间`, count(`单号`) AS `数量`
    FROM demo.maintenance_ruiyun_realtime
    WHERE `业务类型` = '发货'
      AND `产品类型` IN ('产成品-吹风机', '产成品-电动牙刷', '产成品-剃须刀')
      AND `申请类别` IN ('寄修/返修')
    GROUP BY DATE_ADD(DATE(`发货时间`), INTERVAL HOUR(`发货时间`) HOUR)
    ORDER BY 时间 DESC
    LIMIT 1;"""

@flow(name='发送寄修全链路业务量')
def getdata():
    qs = pd.read_sql(query_qs, conn)
    qs['类型']= '签收'
    fj = pd.read_sql(query_fj, conn)
    fj['类型'] = "分拣"
    wx = pd.read_sql(query_wx, conn)
    wx['类型'] = "维修"
    zj = pd.read_sql(query_zj, conn)
    zj['类型'] = "质检"
    fh = pd.read_sql(query_fh, conn)
    fh['类型'] = "发货"
    data = pd.concat([fj, wx, zj, fh])
    data = data.rename(columns={'数量':'完成量'})
    now = datetime.now()
    this_hour = now.replace(minute=0, second=0, microsecond=0)
    last_hour = this_hour - timedelta(hours=1)
    bot = asbot.AsBot('人机')
    new_add = [int(qs.iloc[0,1]),int(fj.iloc[0,1]),int(wx.iloc[0,1]),int(zj.iloc[0,1])]
    # pure_add = [int(qs.iloc[0,1])-int(fj.iloc[0,1]),int(qs.iloc[0,1])-int(fj.iloc[0,1]),int(qs.iloc[0,1])-int(fj.iloc[0,1]),int(qs.iloc[0,1])-int(fj.iloc[0,1])]
    data['新增量'] = new_add
    data['净增长'] = data['新增量'] - data['完成量']
    data = data.rename(columns={'数量':'完成量'})
    msg =  {
        "zh_cn": {
            "title": f"寄修全链路业务量播报：{last_hour.strftime('%H:%M')}-{this_hour.strftime('%H:%M')}",
            "content": [
                [
                    {
                        "tag": "text",
                        "text": f"分拣量：新增{int(qs.iloc[0,1])}台，完成{int(fj.iloc[0,1])}台，净增长{int(qs.iloc[0,1])-int(fj.iloc[0,1])}台\n",
                    }
                ],
                [
                    {
                        "tag": "text",
                        "text": f"维修量：新增{int(fj.iloc[0,1])}台，完成{int(wx.iloc[0,1])}台，净增长{int(qs.iloc[0,1])-int(fj.iloc[0,1])}台\n",
                    },
                ],
                [
                    {
                        "tag": "text",
                        "text": f"质检量：新增{int(wx.iloc[0,1])}台，完成{int(zj.iloc[0,1])}台，净增长{int(qs.iloc[0,1])-int(fj.iloc[0,1])}台\n",
                    },
                ],
                [
                    {
                        "tag": "text",
                        "text": f"发货量：新增{int(zj.iloc[0,1])}台，完成{int(fh.iloc[0, 1])}台，净增长{int(qs.iloc[0,1])-int(fj.iloc[0,1])}台\n",
                    },
                ],
                [
                    {
                        "tag": "at",
                        "user_id": "6e4997ed",
                    }
                ]
            ]
        }
    }
    bot.send_post_to_group(msg)
    return data
if __name__ == '__main__':
    df = getdata()


2025-07-10 15:18:19 [INFO] as_bot: 成功获取bearer token
2025-07-10 15:18:19 [INFO] as_bot: 成功获取人机的chat_id


{'receive_id': 'oc_cd60e5dbbb246891c63270d37acfe1e4', 'msg_type': 'post', 'content': '{"zh_cn": {"title": "\\u5bc4\\u4fee\\u5168\\u94fe\\u8def\\u4e1a\\u52a1\\u91cf\\u64ad\\u62a5\\uff1a14:00-15:00", "content": [[{"tag": "text", "text": "\\u5206\\u62e3\\u91cf\\uff1a\\u65b0\\u589e275\\u53f0\\uff0c\\u5b8c\\u621096\\u53f0\\uff0c\\u51c0\\u589e\\u957f179\\u53f0\\n"}], [{"tag": "text", "text": "\\u7ef4\\u4fee\\u91cf\\uff1a\\u65b0\\u589e96\\u53f0\\uff0c\\u5b8c\\u6210168\\u53f0\\uff0c\\u51c0\\u589e\\u957f179\\u53f0\\n"}], [{"tag": "text", "text": "\\u8d28\\u68c0\\u91cf\\uff1a\\u65b0\\u589e168\\u53f0\\uff0c\\u5b8c\\u6210211\\u53f0\\uff0c\\u51c0\\u589e\\u957f179\\u53f0\\n"}], [{"tag": "text", "text": "\\u53d1\\u8d27\\u91cf\\uff1a\\u65b0\\u589e211\\u53f0\\uff0c\\u5b8c\\u6210290\\u53f0\\uff0c\\u51c0\\u589e\\u957f179\\u53f0\\n"}], [{"tag": "at", "user_id": "6e4997ed"}]]}}'}


2025-07-10 15:18:19 [INFO] as_bot: 200-发送成功


In [46]:
import plotly.graph_objects as go

now = datetime.now()
this_hour = now.replace(minute=0, second=0, microsecond=0)
last_hour = this_hour - timedelta(hours=1)
title = f"寄修全链路业务量播报：{last_hour.strftime('%H:%M')}-{this_hour.strftime('%H:%M')}"
df = df[['类型','完成量','新增量','净增长']]
def export_dataframe_to_image_v2(
    df,
    output_path,
    title="DataFrame Export",
    image_size=(800, 500),
    font_family="Arial",  # 确保兼容的字体
    font_size=18
):
    """
    将 pandas DataFrame 导出为图片，增强样式显示效果。

    参数：
    - df: pandas DataFrame，需要导出的数据
    - output_path: str，图片保存路径
    - title: str，图片标题
    - image_size: tuple，图片尺寸 (宽, 高)，默认 (800, 500)
    - font_family: str，字体样式，默认 "Arial"
    - font_size: int，字体大小，默认 18
    """
    # 配色方案
    header_color = [ "#007BFF", '#007BFF', '#007BFF', '#8081cf', '#8081cf', '#3cc08e', '#3cc08e']  # 表头背景色
    header_font_color = "white"  # 表头字体颜色
    cell_fill_colors = ["#F9F9F9", "#FFFFFF"]  # 单元格条纹背景
    cell_font_color = "#333333"  # 默认单元格字体颜色

    # 动态设置字体颜色：带 + 号的数字为红色
    font_colors = []
    for col in df.columns:
        col_colors = []
        for value in df[col]:
            if isinstance(value, str) and "+" in value:  # 判断是否包含 + 号
                col_colors.append("red")  # 红色
            # elif isinstance(value, str) and "-" in value: # 判断是否包含 - 号
            #     col_colors.append("00ff80")  # 绿色
            else:
                col_colors.append(cell_font_color)  # 默认颜色
        font_colors.append(col_colors)

    # 创建表格
    table = go.Figure(data=[go.Table(
        columnwidth=[1.5,1, 1.2, 1, 1.2, 1, 1.2],  # 列宽设置
        header=dict(
            values=[f"<b>{col}</b>" for col in df.columns],
            fill_color=header_color,
            align="center",
            font=dict(family=font_family, size=font_size, color=header_font_color),
            line_color="white",  # 表头边框颜色
            height=40  # 增大表头高度
        ),
        cells=dict(
            values=[df[col] for col in df.columns],
            fill_color=[cell_fill_colors * (len(df) // 2 + 1)],
            align="center",
            font=dict(family=font_family, size=font_size - 2),
            line_color="#E5E5E5",  # 单元格边框颜色
            height=30,  # 增大单元格高度
            font_color=font_colors  # 动态设置字体颜色
        )
    )])

    # 布局设置
    table.update_layout(
        title=dict(
            text=f"<b>{title}</b>",
            font=dict(family=font_family, size=font_size + 4, color="#007BFF"),
            x=0.5,
            xanchor="center"
        ),
        margin=dict(l=20, r=20, t=70, b=20),  # 边距优化
        paper_bgcolor="white",  # 背景色
        width=image_size[0],
        height=image_size[1]
    )

    # 保存图片
    table.write_image(output_path, width=image_size[0], height=image_size[1], scale=3)
    print(f"图片已保存到: {output_path}")

export_dataframe_to_image_v2(df, 'data/table_output.png', title=title)
print("✅ 已成功导出为表格图像 table_output.png")

图片已保存到: table_output.png
✅ 已成功导出为表格图像 table_output.png


In [45]:
df

Unnamed: 0,时间,完成量,类型,新增量,净增长
0,2025-07-10 14:00:00,96,分拣,275,179
0,2025-07-10 14:00:00,168,维修,96,-72
0,2025-07-10 14:00:00,211,质检,168,-43
0,2025-07-10 14:00:00,290,发货,211,-79
