# 物流分布图

In [None]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go


import pandas as pd
import requests
import time
import uuid
import hashlib
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
conn = create_engine("mysql+pymysql://root:000000@localhost/demo")
p_by_r = pd.read_sql('select 地区,省份 from provinces_by_region', con=conn)


def generate_jxnrequrl(pageindex, conditions=None):
    """
    从 API 获取数据并转换为 DataFrame
    """
    # 基本参数
    tenant = "laifen"
    api_name = "api/vlist/ExecuteQuery"
    timestamp = str(int(time.time() * 1000))
    reqid = str(uuid.uuid1())
    appid = "AS_department"
    key = "u7BDpKHA6VSqTScpEqZ4cPKmYVbQTAxgTBL2Gtit"
    is_user_query = "true"
    is_preview = "false"
    paging = "true"
    queryid = "6b8b0a54-813f-a029-0000-07043254fb90"

    pagesize = "5000"

    args = [appid, pageindex, pagesize, paging, reqid, tenant, timestamp, is_preview, is_user_query, queryid, key]

    """
    生成签名
    """

    sign_str = "".join(args)
    sign = hashlib.sha256(sign_str.encode('utf-8')).hexdigest().upper()
    # 构建 URL
    url = (
        f"https://ap6-openapi.fscloud.com.cn/t/{tenant}/open/{api_name}"
        f"?$tenant={tenant}&$timestamp={timestamp}&$reqid={reqid}&$appid={appid}"
        f"&queryid={queryid}&isUserQuery={is_user_query}&isPreview={is_preview}"
        f"&$pageindex={pageindex}&$pagesize={pagesize}&$paging={paging}"
        f"&$sign={sign}"
    )

    return url


def fetch_api_data(url):
    # 发送 GET 请求
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"API 请求失败，状态码: {response.status_code}")

    # 解析 JSON 数据
    data = response.json()
    entities = data["Data"]["Entities"]

    df = pd.DataFrame(entities)

    return df


def extract_need_data(df):
    df = df.assign(
        创建时间=df['FormattedValues'].apply(lambda x: x.get("createdon", None)),
        上门取件结束时间=df['FormattedValues'].apply(lambda x: x.get("new_pickupendon", None)),
        申请类别=df['FormattedValues'].apply(lambda x: x.get("new_applytype", None)),
        单号=df['new_name'],
        单据来源=df['FormattedValues'].apply(lambda x: x.get('new_fromsource', None)),
        省份=df['new_province_id'].apply(lambda x: x.get('name', None) if pd.notnull(x) else None)
    )
    #    # 选择需要的列
    df = df[[
        '单号', '创建时间', '上门取件结束时间', '申请类别', '单据来源', '省份'
    ]]

    return df


def getdata():
    pageindex = "1"
    url = generate_jxnrequrl(pageindex)
    rs = requests.get(url)
    count = rs.json()['Data']['TotalRecordCount']
    datas = []

    for i in range(1, count // 5000 + 2):
        url = generate_jxnrequrl(str(i))
        data = fetch_api_data(url)
    
        datas.append(data)

    df = pd.concat(datas, ignore_index=True)
    df = extract_need_data(df)
    return df


def make_sendgoods_data():
    data = getdata()
    data['上门取件结束时间'] = pd.to_datetime(data['上门取件结束时间'])
    data['创建时间'] = pd.to_datetime(data['创建时间'])
    data['月份'] = data['创建时间'].dt.month

    df = data.query("上门取件结束时间.notnull()").copy()
    df1 = data.query("单据来源 == '聚水潭' and 月份 !=12").copy()
    df = pd.concat([df, df1])

    df['取件至今'] = (pd.to_datetime('today') - df['上门取件结束时间']).dt.days
    df['取件天数'] = np.where(df['取件至今'] < 3, '0-3天内', np.where(df['取件至今'] < 7, '3-7天内',
                                                                      np.where(df['取件至今'] < 10, '7-10天内',
                                                                               '超10天')))
    df['月份'] = df['上门取件结束时间'].dt.month
    df = pd.merge(left=df, right=p_by_r, left_on='省份', right_on='省份')
    df = df.pivot_table(index=['取件天数', '地区', '省份'], values='单号', aggfunc='count')
    df = df.reset_index()
    df = df.rename(columns={'单号': '数量'})
    df = df.sort_values(['数量', '取件天数'], ascending=False)
    return df


def generate_asd_wc_image(df, title='wdnmd'):

    # 获取唯一的取件天数分类
    categories = df['取件天数'].unique().tolist()
    categories.sort()
    
    # 创建子图
    fig = make_subplots(
        rows=2,  # 两行布局
        cols=4,  # 四列布局
        row_heights=[0.8, 0.2],  # 第一行高度占 80%，第二行高度占 20%
        vertical_spacing=0.05,  # 减小行间距
        specs=[
            [{"type": "pie"}, {"type": "pie"}, {"type": "pie"}, {"type": "pie"}],  # 第一行是 4 个饼图
            [{"type": "domain", "colspan": 4}, None, None, None]  # 第二行用于文字，跨 4 列
        ],
        subplot_titles=[f"{category} (总数: {df[df['取件天数'] == category]['数量'].sum()})" for category in categories]  # 子标题，显示总数
    )

    # 动态添加饼图
    for i, category in enumerate(categories, start=1):
        filtered_df = df.query(f"取件天数 == '{category}'")
        labels = filtered_df['地区'].tolist()
        values = filtered_df['数量'].tolist()

        fig.add_trace(
            go.Pie(
                labels=labels,
                values=values,
                hole=0.35,
                textinfo='label+value',
                textposition='inside',
                sort=True,
                rotation=180,  # 从正上方开始
                direction='clockwise'  # 顺时针排列
            ),
            row=1,
            col=i
        )

    # 添加文字说明
    text = """
    1. 华北地区：北京市、天津市、河北省、山西省、内蒙古自治区<br>
    2. 华东地区：上海市、江苏省、浙江省、安徽省、福建省、江西省、山东省、台湾省<br>
    3. 华中地区：河南省、湖北省、湖南省<br>
    4. 华南地区：广东省、广西壮族自治区、海南省、香港特别行政区、澳门特别行政区<br>
    5. 西南地区：重庆市、四川省、贵州省、云南省、西藏自治区<br>
    6. 西北地区：陕西省、甘肃省、青海省、宁夏回族自治区、新疆维吾尔自治区<br>
    7. 东北地区：辽宁省、吉林省、黑龙江省<br>
    """

    fig.add_annotation(
        x=0.5,  # 文字水平居中
        y=0.05,  # 文字垂直位置（靠近底部）
        text=text,  # 文字内容
        showarrow=False,  # 不显示箭头
        font=dict(size=12, color="black"),  # 字体样式
        xref="paper",  # 使用相对坐标
        yref="paper",  # 使用相对坐标
        align="left",  # 文字左对齐
        xanchor="center",  # 文字水平锚点居中
        yanchor="top"  # 文字垂直锚点顶部对齐
    )

    # 更新布局
    fig.update_layout(
        title_text=title,
        title_x=0.5,  # 主标题居中
        title_y=0.95,  # 调整主标题的垂直位置
        showlegend=True,
        width=1000,  # 增加宽度以容纳 4 个饼图
        height=500,  # 增加高度以容纳文字
        margin=dict(l=20, r=20, t=80, b=150),  # 增加底部边距以容纳文字
        title_font=dict(size=28)
    )

    # 调整子标题位置
    for annotation in fig.layout.annotations:
        if annotation.text.startswith(tuple(categories)):  # 饼图子标题
            annotation.update(y=0.15, font=dict(size=16))
    fig.show()
    # 显示图表
    # logger.info(f'图片生成成功，保存至{path}')
    # 保存图表
    # fig.write_image(path, scale=3)

df= make_sendgoods_data()
generate_asd_wc_image(df)


# 一天内创单量

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
df = pd.read_excel(r'E:\Dev\AS_Bot\data\input\瑞云积压数据2025-01-21 17-44.xlsx')
df = df[['单号','单据来源','创建时间','旧件签收时间']].copy()

df = df.rename(columns={'单据来源':'平台'})
df = df[['单号','平台','创建时间','旧件签收时间']]
df['创建时间'] = pd.to_datetime(df['创建时间'],errors='coerce')
df['旧件签收时间'] = pd.to_datetime(df['旧件签收时间'],errors='coerce')
df['时间差'] = (df['旧件签收时间'] - df['创建时间']).dt.total_seconds() / (24 * 3600)
df['日期'] = df["旧件签收时间"].dt.strftime("%Y-%m-%d")
# 
bf =  df[df["时间差"] <= 1].copy()
bf['数量'] = 1
x = bf.pivot_table(index='日期',columns='平台',values='数量',aggfunc='sum',fill_value=0)
x.reset_index(inplace=True)

# 假设 df_filled 是你的数据
def makd_image(df):
    
    df_filled = df
    df_filled['日期'] = pd.to_datetime(df_filled['日期'])
    df_filled.set_index("日期", inplace=True)
    
    # 将日期列转换为中文格式的字符串
    df_filled.index = df_filled.index.strftime("%m月%d日")
    
    # 创建子图对象（双 Y 轴）
    fig = make_subplots(specs=[[{"secondary_y": True}]])  # 启用右侧 Y 轴
    
    # 添加柱状图（晓多和聚水潭，使用右侧 Y 轴）
    fig.add_trace(go.Bar(
        x=df_filled.index,  # X 轴：日期（已转换为中文格式）
        y=df_filled["晓多"],   # Y 轴：晓多列数据
        name="晓多",           # 图例名称
        marker_color="blue",  # 柱状图颜色
        text=df_filled["晓多"],  # 数据标签
        textposition="outside"  # 数据标签位置
    ), secondary_y=True)  # 使用右侧 Y 轴
    
    fig.add_trace(go.Bar(
        x=df_filled.index,
        y=df_filled["聚水潭"],
        name="聚水潭",
        marker_color="green",
        text=df_filled["聚水潭"],  # 数据标签
        textposition="outside"  # 数据标签位置
    ), secondary_y=True)  # 使用右侧 Y 轴
    
    # 添加折线图（CRM，使用左侧 Y 轴）
    fig.add_trace(go.Scatter(
        x=df_filled.index,  # X 轴：日期（已转换为中文格式）
        y=df_filled["CRM"],   # Y 轴：CRM 列数据
        name="CRM",           # 图例名称
        mode="lines+markers+text",  # 折线 + 数据点 + 数据标签
        line=dict(color="red", width=2),  # 折线样式
        marker=dict(size=10),  # 数据点样式
        text=df_filled["CRM"],  # 数据标签
        textposition="top center"  # 数据标签位置
    ), secondary_y=False)  # 使用左侧 Y 轴
    
    # 设置左侧 Y 轴范围
    fig.update_yaxes(
        range=[0, 500],  # 设置左侧 Y 轴范围为 0 到 450
        secondary_y=False
    )
    
    # 设置右侧 Y 轴范围
    fig.update_yaxes(
        range=[0, 350],  # 设置右侧 Y 轴范围为 0 到 50
        secondary_y=True
    )
    
    # 更新布局
    fig.update_layout(
        title="一天内创单量",  # 图表标题
        title_x=0.5, 
        title_font=dict(size=28),
        xaxis_title="日期",              # X 轴标题
        yaxis_title="CRM 的数量",   # 左侧 Y 轴标题
        yaxis2_title="晓多和聚水潭的数量",       # 右侧 Y 轴标题
        barmode="group",                 # 柱状图分组显示
        template="plotly_white",          # 主题样式
        height = 500,
        width = 1500,
    )
    
    # 设置 X 轴显示中文“月-日”格式
    fig.update_xaxes(
        tickangle=45,        # 旋转日期标签
        tickmode="auto",     # 自动调整刻度
        nticks=len(df_filled) # 显示所有日期
    )
    
    # 显示图表
    fig.write_image(r'E:\Dev\AS_Bot\data\image\data2.png',scale=3)
    fig.show()
makd_image(x)

In [None]:
import pandas as pd
df = pd.read_excel(r'E:\Dev\AS_Bot\data\input\瑞云积压数据2025-01-21 17-44.xlsx')
df = df[['单号','单据来源','创建时间','旧件签收时间']].copy()

df = df.rename(columns={'单据来源':'平台'})
df = df[['单号','平台','创建时间','旧件签收时间']]
df['创建时间'] = pd.to_datetime(df['创建时间'],errors='coerce')
df['旧件签收时间'] = pd.to_datetime(df['旧件签收时间'],errors='coerce')
df['时间差'] = (df['旧件签收时间'] - df['创建时间']).dt.total_seconds() / (24 * 3600)
df['日期'] = df["旧件签收时间"].dt.strftime("%Y-%m-%d")


In [None]:
import requests
from datetime import datetime

url = "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=user_id"
import json

def build_message(
    receive_id: str,
    template_id: str,
    template_version: str,
    template_vars: dict,
    msg_type: str = "interactive"
) -> dict:
    """
    构造可动态配置的嵌套JSON消息
    
    :param receive_id: 接收方ID
    :param template_id: 模板ID
    :param template_version: 模板版本
    :param template_vars: 模板变量字典（可自由增减字段）
    :param msg_type: 消息类型，默认为 interactive
    :return: 完整消息字典
    """
    # 构造内层 content 结构
    content = {
        "type": "template",
        "data": {
            "template_id": template_id,
            "template_version_name": template_version,
            "template_variable": template_vars  # 动态变量部分
        }
    }
    
    # 构造完整消息
    return {
        "receive_id": receive_id,
        "msg_type": msg_type,
        "content": json.dumps(content, ensure_ascii=False)
    }


mock_data = ([
  {
    "type": "S1",
    "value": 340
  },
  {
    "type": "S2",
    "value": 170
  },
  {
    "type": "S3",
    "value": 150
  },
  {
    "type": "S4",
    "value": 120
  },
  {
    "type": "S5",
    "value": 100
  }
])
data = ( {
          "type": "pie",
          "percent":"true",
          "title": {
            "text": "寄修时效占比"
          },
          "data": {
            "values":  mock_data 
          },
          "valueField": "value",
          "categoryField": "type",
          "outerRadius": 0.8,
          "innerRadius": 0.4,
          "padAngle": 0.6,
          "legends": {
            "visible": "true",
            "orient": "right"
          },
          "padding": {
            "left": 10,
            "top": 10,
            "bottom": 5,
            "right": 0
          },
          "label": {
            "visible": "true"
          },
        "pie": {
        "style": {
          "cornerRadius": 8
        },
        "state": {
          "hover": {
            "outerRadius": 0.85,
            "stroke": '#000',
            "lineWidth": 1
          },
          "selected": {
            "outerRadius": 0.85,
            "stroke": '#000',
            "lineWidth": 1
          }
        }
      },
})
# 使用示例 ---------------------------
# 定义模板变量（可自由增减字段）
variables = {
    "title": "寄修当月时效占比",
    "datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "table": data,
}

# 生成消息
payload = build_message(
    receive_id="10201470",
    template_id="AAqBc0EeBjtyz",
    template_version="1.0.7",
    template_vars=variables
)

# 转换为JSON字符串（indent参数用于美化输出）
json_output = json.dumps(payload, indent=2)

headers = {
  'Content-Type': 'application/json',
  'Authorization': 'Bearer t-g10431fu3MTAXU5G6U3R7XOTEJSMMOJ23BEW7XUL'
}

response = requests.request("POST", url, headers=headers, data=json_output)

print(response.text)

print(payload)

# feishu crad

In [None]:
import requests
from datetime import datetime

url = "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=user_id"
import json

def build_message(
    receive_id: str,
    template_id: str,
    template_version: str,
    template_vars: dict,
    msg_type: str = "interactive"
) -> dict:
    """
    构造可动态配置的嵌套JSON消息
    
    :param receive_id: 接收方ID
    :param template_id: 模板ID
    :param template_version: 模板版本
    :param template_vars: 模板变量字典（可自由增减字段）
    :param msg_type: 消息类型，默认为 interactive
    :return: 完整消息字典
    """
    # 构造内层 content 结构
    content = {
        "type": "template",
        "data": {
            "template_id": template_id,
            "template_version_name": template_version,
            "template_variable": template_vars  # 动态变量部分
        }
    }
    
    # 构造完整消息
    return {
        "receive_id": receive_id,
        "msg_type": msg_type,
        "content": json.dumps(content, ensure_ascii=False)
    }


mock_data = ([
  {
    "type": "S1",
    "value": 340
  },
  {
    "type": "S2",
    "value": 170
  },
  {
    "type": "S3",
    "value": 150
  },
  {
    "type": "S4",
    "value": 120
  },
  {
    "type": "S5",
    "value": 100
  }
])
data = ( {
          "type": "pie",
          "percent":"true",
          "title": {
            "text": "寄修时效占比"
          },
          "data": {
            "values":  mock_data 
          },
          "valueField": "value",
          "categoryField": "type",
          "outerRadius": 0.8,
          "innerRadius": 0.4,
          "padAngle": 0.6,
          "legends": {
            "visible": "true",
            "orient": "right"
          },
          "padding": {
            "left": 10,
            "top": 10,
            "bottom": 5,
            "right": 0
          },
          "label": {
            "visible": "true"
          },
        "pie": {
        "style": {
          "cornerRadius": 8
        },
        "state": {
          "hover": {
            "outerRadius": 0.85,
            "stroke": '#000',
            "lineWidth": 1
          },
          "selected": {
            "outerRadius": 0.85,
            "stroke": '#000',
            "lineWidth": 1
          }
        }
      },
})
# 使用示例 ---------------------------
# 定义模板变量（可自由增减字段）
variables = {
    "title": "寄修当月时效占比",
    "datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "table": data,
}

# 生成消息
payload = build_message(
    receive_id="10201470",
    template_id="AAqBc0EeBjtyz",
    template_version="1.0.7",
    template_vars=variables
)

# 转换为JSON字符串（indent参数用于美化输出）
json_output = json.dumps(payload, indent=2)

headers = {
  'Content-Type': 'application/json',
  'Authorization': 'Bearer t-g10431fu3MTAXU5G6U3R7XOTEJSMMOJ23BEW7XUL'
}

response = requests.request("POST", url, headers=headers, data=json_output)

print(response.text)

print(payload)

# 读取分拣签收数据当月

In [3]:
import requests
import time
import uuid
import hashlib
import numpy as np
from asbot.my_utility import logger
import pandas as pd

def generate_requrl(pageindex):
    logger.info(f"正在生成第{pageindex}页的URL")
    """
    从 API 获取数据并转换为 DataFrame
    """
    # 基本参数
    tenant = "laifen"
    api_name = "api/vlist/ExecuteQuery"
    timestamp = str(int(time.time() * 1000))
    reqid = str(uuid.uuid1())
    appid = "AS_department"
    queryid = "38c53a54-813f-a0e0-0000-06f40ebdeca5"
    is_user_query = "true"
    is_preview = "false"
    pagesize = "5000"
    paging = "true"
    key = "u7BDpKHA6VSqTScpEqZ4cPKmYVbQTAxgTBL2Gtit"
    # orderby = "createdon descending"
    # extendConditions = quote([{"name":"new_checkon","val":"this-month","op":"this-month"}], safe='')
    # additionalConditions = quote({"createdon":"","new_signedon":"","new_checkon":"","laifen_qualityrecordtime":"","laifen_servicecompletetime":""}, safe='')
    # extendConditions = '[{"name":"new_checkon","val":"this-month","op":"this-month"}]'
    extendConditions = '[{"name":"new_checkon","val":"yesterday","op":"yesterday"}]'
    

    args = [ appid,extendConditions, pageindex, pagesize, paging, reqid, tenant, timestamp, is_preview, is_user_query, queryid, key]
    
    """
    生成签名
    """
    
    sign_str = "".join(args)
    sign = hashlib.sha256(sign_str.encode('utf-8')).hexdigest().upper()
    #构建 URL
    url = (
        f"https://ap6-openapi.fscloud.com.cn/t/{tenant}/open/{api_name}"
        f"?$tenant={tenant}&$timestamp={timestamp}&$reqid={reqid}&$appid={appid}"
        f"&queryid={queryid}&isUserQuery={is_user_query}&isPreview={is_preview}"
        f"&$pageindex={pageindex}&$pagesize={pagesize}&$paging={paging}"
        f"&$extendConditions={extendConditions}&$sign={sign}"
    )
    logger.info(f"成功生成第{pageindex}页的URL: {url}")
    return url

def fetch_api_data(url,page):
    logger.info(f"正在获取第{page}页数据")
    # 发送 GET 请求
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"API 请求失败，状态码: {response.status_code}")
    
    # 解析 JSON 数据
    data = response.json()
    entities = data["Data"]["Entities"]

    df = pd.DataFrame(entities)
    logger.info(f"第{page}页数据，已通过API获取成功获取")
    return df
    
def extract_need_data(df):
    df = df.assign(
    产品类型=df["new_productmodel_id"].apply(lambda x: x.get("name", None)),
    产品名称=df["new_product_id"].apply(lambda x: x.get("name", None)),
    旧件签收时间=df["FormattedValues"].apply(lambda x: x.get("new_signedon", None)),
    检测时间=df["FormattedValues"].apply(lambda x: x.get("new_checkon", None)),
    申请类别=df["FormattedValues"].apply(lambda x: x.get("new_srv_rma_0.new_applytype", None)),
    一检时间=df["FormattedValues"].apply(lambda x: x.get("laifen_onechecktime", None)),
    维修完成时间=df["FormattedValues"].apply(lambda x: x.get("laifen_servicecompletetime", None)),
    质检完成时间=df["FormattedValues"].apply(lambda x: x.get("laifen_qualityrecordtime", None)),
    单号 = df['new_rma_id'].apply(lambda x: x.get('name', None)),
    分拣人员 = df['laifen_systemuser2_id'].apply(lambda x: x.get('name', None) if pd.notnull(x) else None),
    处理状态=df["FormattedValues"].apply(lambda x: x.get("new_srv_rma_0.new_status", None)), 
    旧件处理状态=df["FormattedValues"].apply(lambda x: x.get("new_returnstatus", None)), 
    检测结果=df["FormattedValues"].apply(lambda x: x.get("new_solution", None)),
    故障现象= df['new_error_id'].apply(lambda x: x.get('name', None) if pd.notnull(x) else None),
    发货时间 = df['new_deliveriedon'],
    一检人员 = df['laifen_systemuser_id'].apply(lambda x: x.get('name', None) if pd.notnull(x) else None),
    发货状态 = df['FormattedValues'].apply(lambda x: x.get('new_srv_rma_0.new_deliverstatus', None)),
    物流单号 = df['new_srv_rma_0.new_returnlogisticsnumber'],
    产品序列号 = df['new_userprofilesn'],
    服务人员 = df['new_srv_workorder_1.new_srv_worker_id'].apply(lambda x: x.get('name', None) if pd.notnull(x) else None),
    单据来源 = df["FormattedValues"].apply(lambda x: x.get("new_srv_rma_0.new_fromsource", None)),
    创建时间 = df["FormattedValues"].apply(lambda x: x.get("createdon", None)),
)
#    # 选择需要的列
    df = df[[ 
       '单号','产品类型', '产品名称', '处理状态', '旧件处理状态', '检测结果', '申请类别', '旧件签收时间',
       '检测时间', '一检时间', '维修完成时间', '质检完成时间', '故障现象','发货时间','发货状态',
       '一检人员','产品序列号','物流单号','分拣人员','服务人员','单据来源','创建时间'
    ]]
    logger.info(f"成功提取所需数据,共{df.shape[1]}列")
    return df

def get_sf_data():
    logger.info(f"正在下载当月的数据")
    url = generate_requrl("1")
    rs = requests.get(url)
    # print(rs.text)
    count = rs.json()['Data']['TotalRecordCount']
    logger.info(f"当月分拣业务量共{count}单,共{count//5000+2}页数据")
    datas = []

    for i in range(1, count//5000+2):
        url = generate_requrl(str(i))
        data = fetch_api_data(url,i)
        logger.info(f"第{i}页数据已获取")
        datas.append(data)
    
    df = pd.concat(datas, ignore_index=True)
    df = extract_need_data(df)
    logger.info(f"已成功下载当月数据")
    return df

df = get_sf_data()
   

2025-03-15 15:07:37,523 - INFO - 正在下载当月的数据
2025-03-15 15:07:37,523 - INFO - 正在生成第1页的URL
2025-03-15 15:07:37,524 - INFO - 成功生成第1页的URL: https://ap6-openapi.fscloud.com.cn/t/laifen/open/api/vlist/ExecuteQuery?$tenant=laifen&$timestamp=1742022457524&$reqid=2d384349-016c-11f0-958a-5084929d8019&$appid=AS_department&queryid=38c53a54-813f-a0e0-0000-06f40ebdeca5&isUserQuery=true&isPreview=false&$pageindex=1&$pagesize=5000&$paging=true&$extendConditions=[{"name":"new_checkon","val":"yesterday","op":"yesterday"}]&$sign=20146E3E5C0262DD64B9332BECB658C9D37E16108A8E02C1B5CEFA967FEA5E45
2025-03-15 15:07:38,964 - INFO - 当月分拣业务量共3227单,共2页数据
2025-03-15 15:07:38,964 - INFO - 正在生成第1页的URL
2025-03-15 15:07:38,965 - INFO - 成功生成第1页的URL: https://ap6-openapi.fscloud.com.cn/t/laifen/open/api/vlist/ExecuteQuery?$tenant=laifen&$timestamp=1742022458965&$reqid=2e141c0d-016c-11f0-bf6f-5084929d8019&$appid=AS_department&queryid=38c53a54-813f-a0e0-0000-06f40ebdeca5&isUserQuery=true&isPreview=false&$pageindex=1&$pagesize

# 处理分拣时效

In [3]:
import pandas as pd
from datetime import date
data = df.query("检测时间.notnull() & 旧件签收时间.notnull() & 申请类别 != '寄修/返修'").copy()
data['旧件签收时间'] = pd.to_datetime(data['旧件签收时间'])
data['检测时间'] = pd.to_datetime(data['检测时间'])
data['日期'] = data['检测时间'].dt.date
data['时效'] = (data['检测时间'] - data['旧件签收时间']).dt.total_seconds() / 3600
data['时效类型'] = pd.cut(data['时效'], bins=[0,4,8,12,2480],labels=['4小时内','4-8小时','8-12小时','超12小时'])
history = data[data['日期'] < date.today()]
today = data[data['日期'] == date.today()]
history = pd.DataFrame(history['时效类型'].value_counts())
history['占比'] = history['count'] / history['count'].sum()
history = history.rename(columns={'count':'数量'})
history = history.fillna(0)
history['占比'] = history['占比'].apply(lambda x:f'{round(x*100,2)}%')
print(history)
today = pd.DataFrame(today['时效类型'].value_counts())
today['占比'] = today['count'] / today['count'].sum()
today['比率'] = today['count'] / today['count'].sum()
today = today.rename(columns={'count':'数量'})
today = today.fillna(0)
today['占比'] = today['占比'].apply(lambda x:f'{round(x * 100,2)}%')
today


          数量      占比
时效类型                
4小时内    7169   74.6%
超12小时   1675  17.43%
4-8小时    731   7.61%
8-12小时    35   0.36%


Unnamed: 0_level_0,数量,占比,比率
时效类型,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
4小时内,356,97.8%,0.978022
超12小时,8,2.2%,0.021978
4-8小时,0,0.0%,0.0
8-12小时,0,0.0%,0.0


# 读取存储 质检记录

In [5]:
import requests
import time
import uuid
import hashlib
import pandas as pd
from sqlalchemy import create_engine
from my_utility import logger
import json
from datetime import datetime, timedelta, UTC
import re

def clean_string(s):
    # 去掉所有空格、非字母数字字符并将所有字符转为大写
    return re.sub(r'\s+', '', str(s)).upper()

# 获取当前日期
def get_time_interverl_condition():
    current_date = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)

    # 计算起始日期（当前日期减去一天）
    start_date = current_date - timedelta(days=1)

    # 格式化为ISO 8601格式，包含时区信息
    start_iso = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_iso = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")

    # 构造JSON对象
    json_obj = [{
        "name": "createdon",
        "val": [start_iso, end_iso],
        "op": "between"
    }]

    # 生成JSON字符串
    json_str = json.dumps(json_obj)
    logger.info(f'查询字符串为--{json_str}')
    return json_str


def generate_requrl(pageindex):
    logger.info(f"正在生成第{pageindex}页的URL")
    """
    从 API 获取数据并转换为 DataFrame
    """
    # 基本参数
    tenant = "laifen"
    api_name = "api/vlist/ExecuteQuery"
    timestamp = str(int(time.time() * 1000))
    reqid = str(uuid.uuid1())
    appid = "AS_department"
    queryid = "cf4c2854-813f-a095-0000-06ffe67a4b77"
    is_user_query = "true"
    is_preview = "false"
    pagesize = "5000"
    paging = "true"
    key = "u7BDpKHA6VSqTScpEqZ4cPKmYVbQTAxgTBL2Gtit"
    # orderby = "createdon descending"
    # extendConditions = quote([{"name":"new_checkon","val":"this-month","op":"this-month"}], safe='')
    # additionalConditions = quote({"createdon":"","new_signedon":"","new_checkon":"","laifen_qualityrecordtime":"","laifen_servicecompletetime":""}, safe='')
    # extendConditions = '[{"name":"createdon","val":["2025-03-13T00:00:00.000Z","2025-03-15T00:00:00.000Z"],"op":"between"}]'
    # extendConditions = '[{"name":"createdon","val":"before-today","op":"before-today"},{"name":"createdon","val":"1","op":"last-x-days"}]'
    # extendConditions = get_time_interverl_condition()
    extendConditions = '[{"name":"createdon","val":"yesterday","op":"yesterday"}]'

    args = [appid, extendConditions, pageindex, pagesize, paging, reqid, tenant, timestamp, is_preview, is_user_query,
            queryid, key]

    """
    生成签名
    """

    sign_str = "".join(args)
    sign = hashlib.sha256(sign_str.encode('utf-8')).hexdigest().upper()
    # 构建 URL
    url = (
        f"https://ap6-openapi.fscloud.com.cn/t/{tenant}/open/{api_name}"
        f"?$tenant={tenant}&$timestamp={timestamp}&$reqid={reqid}&$appid={appid}"
        f"&queryid={queryid}&isUserQuery={is_user_query}&isPreview={is_preview}"
        f"&$pageindex={pageindex}&$pagesize={pagesize}&$paging={paging}"
        f"&$extendConditions={extendConditions}&$sign={sign}"
    )
    logger.info(f"成功生成第{pageindex}页的URL: {url}")
    return url


def fetch_api_data(url, page):
    logger.info(f"正在获取第{page}页数据")
    # 发送 GET 请求
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"API 请求失败，状态码: {response.status_code}")

    # 解析 JSON 数据
    data = response.json()
    entities = data["Data"]["Entities"]
    df = pd.DataFrame(entities)
    logger.info(f"第{page}页数据，已通过API获取成功获取")
    return df


def extract_need_data(df):
    api_data = pd.DataFrame()
    api_data = api_data.assign(
        服务单=df['new_workorder_id'].apply(lambda x: x.get("name", None) if pd.notnull(x) else None),
        创建时间=df["FormattedValues"].apply(lambda x: x.get("createdon", None)),
        质检结果=df['FormattedValues'].apply(lambda x: x.get("new_result", None)),
        负责人=df['ownerid'].apply(lambda x: x.get("name", None) if pd.notnull(x) else None),
        服务人员=df['new_srv_workorder_0.new_srv_worker_id'].apply(
            lambda x: x.get('name', None) if pd.notnull(x) else None),
        产品类别=df['laifen_productgroup_id'].apply(lambda x: x.get("name", None) if pd.notnull(x) else None),
        故障现象=df['new_srv_workorder_0.laifen_error_id'].apply(
            lambda x: x.get("name", None) if pd.notnull(x) else None),
        质检说明=df['new_memo'].apply(lambda x: x if pd.notnull(x) else None),
    )
    #    # 选择需要的列
    api_data['质检说明'] = api_data['质检说明'].map(clean_string)
    logger.info(f"成功提取所需数据,共{api_data.shape[1]}列")
    return api_data


def get_qcrecord_data():
    logger.info(f"正在下载质检的数据")
    url = generate_requrl("1")
    rs = requests.get(url)
    count = rs.json()['Data']['TotalRecordCount']
    logger.info(f"当天质检数量共{count}单,共{count // 5000 + 2}页数据")
    datas = []

    for i in range(1, count // 5000 + 2):
        url = generate_requrl(str(i))
        data = fetch_api_data(url, i)
        logger.info(f"第{i}页数据已获取")
        datas.append(data)

    df = pd.concat(datas, ignore_index=True)
    df = extract_need_data(df)
    logger.info(f"已成功下载质检数据")
    return df


# 获取当前日期


def sync_qcrecord_data():
    conn = create_engine("mysql+pymysql://root:000000@localhost/demo")
    qcdata = get_qcrecord_data()

    rows = qcdata.to_sql('qc_record', conn, if_exists='append', index=False)
    if rows:
        from asbot import AsBot
        asbot = AsBot('人机黄乾')
        asbot.send_text_to_group(f'{datetime.now().date()}成功插入{rows}条质检记录')
        logger.info(f'成功插入{rows}条质检记录')
    else:
        logger.info('更换配件数据更新失败')


if '__main__' == __name__:
    get_qcrecord_data()



2025-04-05 08:24:57,100 - INFO - 正在下载当月的数据
2025-04-05 08:24:57,101 - INFO - 正在生成第1页的URL
2025-04-05 08:24:57,102 - INFO - 成功生成第1页的URL: https://ap6-openapi.fscloud.com.cn/t/laifen/open/api/vlist/ExecuteQuery?$tenant=laifen&$timestamp=1743812697102&$reqid=67292439-11b4-11f0-af94-5084929d8019&$appid=AS_department&queryid=cf4c2854-813f-a095-0000-06ffe67a4b77&isUserQuery=true&isPreview=false&$pageindex=1&$pagesize=5000&$paging=true&$extendConditions=[{"name":"createdon","val":"yesterday","op":"yesterday"}]&$sign=1EF0431F711BA4AD6D85E2152B1BBDBE90E84B4993ED642F4999CB0955618202
2025-04-05 08:24:57,348 - INFO - 当月分拣业务量共0单,共2页数据
2025-04-05 08:24:57,348 - INFO - 正在生成第1页的URL
2025-04-05 08:24:57,349 - INFO - 成功生成第1页的URL: https://ap6-openapi.fscloud.com.cn/t/laifen/open/api/vlist/ExecuteQuery?$tenant=laifen&$timestamp=1743812697349&$reqid=674ed85a-11b4-11f0-8daa-5084929d8019&$appid=AS_department&queryid=cf4c2854-813f-a095-0000-06ffe67a4b77&isUserQuery=true&isPreview=false&$pageindex=1&$pagesize=5000

{"ErrorCode":0,"Message":null,"Data":{"EntityName":null,"MinActiveRowVersion":null,"MoreRecords":true,"PagingCookie":null,"TotalRecordCount":0,"TotalRecordCountLimitExceeded":false,"CountType":0,"Entities":[]}}


2025-04-05 08:24:57,588 - INFO - 第1页数据，已通过API获取成功获取
2025-04-05 08:24:57,589 - INFO - 第1页数据已获取
2025-04-05 08:24:57,590 - INFO - 已成功下载当月数据


Empty DataFrame
Columns: []
Index: []


In [4]:
import json
from datetime import datetime, timedelta,UTC

# 获取当前日期
def get_time_interverl_condition():
    current_date = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
    
    # 计算起始日期（当前日期减去一天）
    start_date = current_date - timedelta(days=1)
    
    current_date = current_date
    
    # 格式化为ISO 8601格式，包含时区信息
    start_iso = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_iso = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")
    
    # 构造JSON对象
    json_obj = [{
        "name": "createdon",
        "val": [start_iso, end_iso],
        "op": "between"
    }]
    
    # 生成JSON字符串
    json_str = json.dumps(json_obj)

    return json_str

get_time_interverl_condition()
    
    


'[{"name": "createdon", "val": ["2025-03-08T16:00:00Z", "2025-03-09T16:00:00Z"], "op": "between"}]'

# 读取存储服务单更换配件明细

In [None]:
import requests
import time
import uuid
import hashlib
import pandas as pd
from my_utility import logger
import json
from sqlalchemy import create_engine
conn = create_engine("mysql+pymysql://root:000000@localhost/demo")


def generate_requrl(pageindex):
    logger.info(f"正在生成第{pageindex}页的URL")
    """
    从 API 获取数据并转换为 DataFrame
    """
    # 基本参数
    tenant = "laifen"
    api_name = "api/vlist/ExecuteQuery"
    timestamp = str(int(time.time() * 1000))
    reqid = str(uuid.uuid1())
    appid = "AS_department"
    queryid = "a8532354-813f-a0c1-0000-070e7581f6a1"
    is_user_query = "true"
    is_preview = "false"
    pagesize = "5000"
    paging = "true"
    key = "u7BDpKHA6VSqTScpEqZ4cPKmYVbQTAxgTBL2Gtit"
    # orderby = "createdon descending"
    # extendConditions = quote([{"name":"new_checkon","val":"this-month","op":"this-month"}], safe='')
    # additionalConditions = quote({"createdon":"","new_signedon":"","new_checkon":"","laifen_qualityrecordtime":"","laifen_servicecompletetime":""}, safe='')
    # extendConditions = '[{"name":"createdon","val":["2025-03-05T00:00:00.000Z","2025-03-06T00:00:00.000Z"],"op":"between"}]'
    extendConditions = '[{"name":"createdon","val":["2024-12-31T16:00:00.000Z","2025-03-07T16:00:00.000Z"],"op":"between"}]'

    args = [appid, extendConditions, pageindex, pagesize, paging, reqid, tenant, timestamp, is_preview, is_user_query,
            queryid, key]

    """
    生成签名
    """

    sign_str = "".join(args)
    sign = hashlib.sha256(sign_str.encode('utf-8')).hexdigest().upper()
    # 构建 URL
    url = (
        f"https://ap6-openapi.fscloud.com.cn/t/{tenant}/open/{api_name}"
        f"?$tenant={tenant}&$timestamp={timestamp}&$reqid={reqid}&$appid={appid}"
        f"&queryid={queryid}&isUserQuery={is_user_query}&isPreview={is_preview}"
        f"&$pageindex={pageindex}&$pagesize={pagesize}&$paging={paging}"
        f"&$extendConditions={extendConditions}&$sign={sign}"
    )
    logger.info(f"成功生成第{pageindex}页的URL: {url}")
    return url

def fetch_api_data(url, page):
    logger.info(f"正在获取第{page}页数据")
    # 发送 GET 请求
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"API 请求失败，状态码: {response.status_code}")

    # 解析 JSON 数据
    data = response.json()
    entities = data["Data"]["Entities"]
    df = pd.DataFrame(entities)
    logger.info(f"第{page}页数据，已通过API获取成功获取")
    return df

def extract_need_data(df):
    api_data = pd.DataFrame()
    api_data = api_data.assign(
        服务单=df["new_srv_workorder_0.new_name"],
        创建时间 = df['FormattedValues'].apply(lambda x: x.get('createdon',None) if pd.notnull(x) else None),
        备件名称 = df['new_product_id'].apply(lambda x: x.get("name", None) if pd.notnull(x) else None),
        备件编码=df["new_name"].apply(lambda x: x if pd.notnull(x) else None),
        创建者 = df['createdby'].apply(lambda x: x.get("name", None) if pd.notnull(x) else None),
        产品名称 = df['new_srv_workorder_0.laifen_product_id'].apply(lambda x: x.get('name',None) if pd.notnull(x) else None),
        产品类别 = df['new_srv_workorder_0.laifen_productgroup_id'].apply(lambda x: x.get("name", None) if pd.notnull(x) else None),
        故障现象 = df['new_srv_workorder_0.laifen_error_id'].apply(lambda x: x.get("name", None) if pd.notnull(x) else None),
        数量=df["new_qty"].apply(lambda x: x if pd.notnull(x) else None),
    )
    #    # 选择需要的列

    logger.info(f"成功提取所需数据,共{api_data.shape[1]}列")
    return api_data

def get_cg_efficiency_data():
    logger.info(f"正在下载当月的数据")
    url = generate_requrl("1")
    rs = requests.get(url)
    print(rs.text)
    count = rs.json()['Data']['TotalRecordCount']
    logger.info(f"当月分拣业务量共{count}单,共{count // 5000 + 2}页数据")
    datas = []

    for i in range(1, count // 5000 + 2):
        url = generate_requrl(str(i))
        data = fetch_api_data(url, i)
        logger.info(f"第{i}页数据已获取")
        datas.append(data)

    df = pd.concat(datas, ignore_index=True)
    df = extract_need_data(df)
    logger.info(f"已成功下载当月数据")
    return df


qcdata = get_cg_efficiency_data()
qcdata.to_sql('srv_change_components', conn, if_exists='replace', index=False)


In [8]:
import pandas as pd
from sqlalchemy import create_engine
pd.read_excel(r"E:\业务量BI数据\业务量数据.xlsx").to_sql('product_bv',create_engine('mysql+pymysql://root:000000@localhost/demo'),if_exists='replace',index=True,index_label='id')

600

In [5]:
import time
print(time.time())
print('dwe')

1742277517.3538322
