In [None]:
import os
import json
from typing import List

from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_sls20201230 import models as sls_20201230_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
import os
import pandas as pd
from odps import ODPS
from datetime import datetime, timedelta

# Credentials come from the environment; a missing variable raises KeyError here.
ALIBABA_CLOUD_ACCESS_KEY_ID = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]
ALIBABA_CLOUD_ACCESS_KEY_SECRET = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]

# Length, in hours, of the look-back window used by the SLS log query.
HOURS_TO_RUN = 10

# MaxCompute (ODPS) entry object used for the dimension-table lookups.
odps = ODPS(
    ALIBABA_CLOUD_ACCESS_KEY_ID,
    ALIBABA_CLOUD_ACCESS_KEY_SECRET,
    project="summerfarm_ds_dev",
    endpoint="http://service.cn-hangzhou.maxcompute.aliyun.com/api",
)

# SLS (Log Service) client. For the list of endpoints see
# https://api.aliyun.com/product/Sls
config = open_api_models.Config(
    access_key_id=ALIBABA_CLOUD_ACCESS_KEY_ID,
    access_key_secret=ALIBABA_CLOUD_ACCESS_KEY_SECRET,
)
config.endpoint = "cn-hangzhou.log.aliyuncs.com"
sls_client = Sls20201230Client(config)

def get_odps_sql_result_as_df(sql):
    """Execute ``sql`` through the module-level ``odps`` object and return a DataFrame.

    The statement is submitted with the hive-compatible and odps2 type-system
    hints, the call waits via ``wait_for_success()``, and the result is read
    with ``open_reader(tunnel=True)`` and converted with ``to_pandas()``.

    Args:
        sql: SQL text to run.

    Returns:
        The frame produced by ``to_pandas()``. Before returning, the SQL and
        the frame's columns are printed. ``None`` is returned (without
        printing) if the reader yields ``None``.
    """
    sql_hints = {"odps.sql.hive.compatible": True, "odps.sql.type.system.odps2": True}
    job = odps.execute_sql(sql, hints=sql_hints)
    job.wait_for_success()

    with job.open_reader(tunnel=True) as result_reader:
        result_df = result_reader.to_pandas()

    if result_df is None:
        return None

    print(f"sql:\n{sql}\ncolumns:{result_df.columns}")
    return result_df

In [None]:
# Fetch the price-change query result from SLS (Log Service).

headers = {
    "accept": "application/json",
    "user-agent": "AlibabaCloud API Workbench",
    "x-log-apiversion": "0.6.0",
    "x-log-bodyrawsize": "0",
    "x-log-signaturemethod": "hmac-sha256",
    "content-type": "application/json",
}

# Sample the clock once so both bounds of the window come from the same instant.
now_ts = datetime.now().timestamp()
from_time = int(now_ts - 3600 * HOURS_TO_RUN)
to_time = int(now_ts)

print(from_time, to_time)

get_logs_v2headers = sls_20201230_models.GetLogsV2Headers(
    common_headers=headers, accept_encoding="lz4"
)

# The search part selects "message sent successfully" log lines for area_sku
# that carry a body. The SQL part parses the JSON after 'body:', keeps UPDATE
# events with a non-null old price, groups them per updater/sku/price pair and
# formatted time (the '%Y-%m-%d %H:%i' format goes down to the minute, despite
# the hour_of_day alias), and keeps rows whose absolute change is >= 50%.
query="""
msg:"消息发送成功" and inboundFlag:area_sku and msg:"body" | 
select 
    sku,old_price,current_price,array_agg(distinct area_no)area_no_list,updater
    ,date_format(time,'%Y-%m-%d %H:%i') hour_of_day,current_price - old_price as price_diff
    ,abs(round(100.0*(current_price - old_price)/old_price,2)) as price_diff_ratio 
    from (
        select cast(json_extract(split(msg,'body:')[2],'$.type') as varchar) type
        ,cast(json_extract(split(msg,'body:')[2],'$.old[0].price') as double) as old_price
        ,cast(json_extract(split(msg,'body:')[2],'$.data[0].sku') as varchar) sku
        ,cast(json_extract(split(msg,'body:')[2],'$.data[0].area_no') as varchar) area_no
        ,cast(json_extract(split(msg,'body:')[2],'$.data[0].price') as double) current_price
        ,cast(json_extract(split(msg,'body:')[2],'$.data[0].updater') as varchar) updater,time from log) 
    where type like 'UPDATE' and old_price is not null 
    group by updater,sku,old_price,current_price,hour_of_day 
    having price_diff_ratio >=50.0 
    order by price_diff_ratio desc
"""
print(query)
get_logs_v2request = sls_20201230_models.GetLogsV2Request(
    from_=from_time, to=to_time, query=query
)
runtime = util_models.RuntimeOptions()

alert_data_list = []
try:
    response = sls_client.get_logs_v2with_options(
        "k8s-log-c7d28cba17d0a416ca4f52459592b8d38",
        "prod-dts-stdout-log",
        get_logs_v2request,
        get_logs_v2headers,
        runtime,
    )
    alert_data_list = response.body.data
    print(alert_data_list)
except Exception as error:
    # Show the error, then re-raise. Swallowing it would leave
    # alert_data_list empty, and the following cell would report
    # "no price anomalies" even though the query never ran.
    print(error)
    raise

In [None]:
# With no alert rows there is nothing to enrich or send, so stop here.
if not alert_data_list:
    print(f"恭喜啦，过去{HOURS_TO_RUN}小时没有发现价格异常:{alert_data_list}")
    raise SystemExit

# One SKU per alert row (duplicates kept), and one decoded area list per row.
sku_array = [alert["sku"] for alert in alert_data_list]
area_no_list = [json.loads(alert["area_no_list"]) for alert in alert_data_list]

# Flatten the per-row area lists into a single de-duplicated set.
area_no_list_all = set().union(*area_no_list)


# Placeholders; both are assigned once the lookup helpers have been called.
area_df = sku_df = None


def get_area_info_by_ids(area_no_set):
    """Query area_no, area_name and administrative_area for the given area numbers.

    Args:
        area_no_set: Collection of area numbers, given as ints or as strings
            that ``int()`` accepts.

    Returns:
        Whatever ``get_odps_sql_result_as_df`` returns for the lookup query,
        or ``None`` when ``area_no_set`` is empty or ``None``.

    Raises:
        ValueError, TypeError: From ``int()`` when an element is not an
            integer value; such an element is never placed into the SQL text.
    """
    if not area_no_set:
        return None
    # The numbers originate from parsed log lines. Forcing each one through
    # int() both accepts int/str inputs and guarantees that only digits end up
    # in the statement. Sorting makes the generated SQL deterministic.
    area_numbers = sorted({int(area_no) for area_no in area_no_set})
    in_clause = ",".join(str(area_no) for area_no in area_numbers)
    area_sql = f"""SELECT area_no,area_name,administrative_area
    FROM summerfarm_tech.ods_area_df WHERE ds=MAX_PT('summerfarm_tech.ods_area_df') and area_no in ({in_clause});"""
    return get_odps_sql_result_as_df(area_sql)


def get_sku_info_by_ids(sku_array):
    """Query sku_id and a 'spu_name:sku_spec' display name for the given SKUs.

    Args:
        sku_array: Iterable of SKU identifiers; each is rendered with ``str()``.
            Duplicates are allowed and are sent only once.

    Returns:
        Whatever ``get_odps_sql_result_as_df`` returns for the lookup query,
        or ``None`` when ``sku_array`` is empty or ``None``.
    """
    if not sku_array:
        return None

    def _sql_literal(value):
        # SKU ids come from parsed log lines: backslash-escape backslashes and
        # single quotes so a value cannot terminate the string literal.
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    # dict.fromkeys drops repeated SKUs while keeping first-seen order.
    unique_skus = dict.fromkeys(str(sku) for sku in sku_array)
    in_clause = ",".join(_sql_literal(sku) for sku in unique_skus)
    sku_sql = f"""SELECT sku_id,concat(spu_name,':',sku_spec) sku_name
    FROM summerfarm_tech.dim_sku_df WHERE ds=MAX_PT('summerfarm_tech.dim_sku_df') and sku_id in ({in_clause});"""
    return get_odps_sql_result_as_df(sku_sql)


# Resolve display metadata for every area and SKU referenced by the alerts.
area_df = get_area_info_by_ids(area_no_set=area_no_list_all)
sku_df = get_sku_info_by_ids(sku_array=sku_array)

In [None]:
# Attach human-readable SKU and area names to every alert row.

# Shown in place of a SKU name that the dimension table did not return.
UNKNOWN_SKU_NAME = "unknown"

# sku id (as str) -> display name; stays empty when the lookup returned None.
sku_map = {}
if sku_df is not None:
    sku_map = {
        str(sku_id): sku_name
        for sku_id, sku_name in zip(sku_df["sku_id"], sku_df["sku_name"])
    }

# area number (as int) -> area name. Keys are normalised with int() so the
# lookup below matches whether the column arrives as integers or digit strings.
area_map = {}
if area_df is not None:
    area_map = {
        int(area_no): area_name
        for area_no, area_name in zip(area_df["area_no"], area_df["area_name"])
    }

# Kept for compatibility; not read by any cell visible in this notebook.
data_list = []
for row in alert_data_list:
    # Rows are updated in place because the alert cell reads
    # 'sku_name' and 'area_names' from alert_data_list.
    # A missing or empty name must not abort the whole alert: fall back to a
    # placeholder for the SKU and to the raw number for an area.
    row["sku_name"] = sku_map.get(str(row["sku"])) or UNKNOWN_SKU_NAME
    area_names = [
        area_map.get(int(area_no)) or str(area_no)
        for area_no in json.loads(row["area_no_list"])
    ]
    row["area_names"] = ",".join(area_names)
    print(row)

In [None]:
# Send the alert card to the Feishu group bot.
import requests

# NOTE(review): the webhook URL embeds the bot token and is effectively a
# secret. Prefer supplying it through FEISHU_WEBHOOK_URL; the literal is kept
# only as a backward-compatible fallback and should be rotated and removed.
feishu_url = os.environ.get(
    "FEISHU_WEBHOOK_URL",
    "https://open.feishu.cn/open-apis/bot/v2/hook/60222790-74a1-4bfb-a0f9-b6fe464ad3ab",
)

# Upper bound for the webhook call so a stalled connection cannot hang the run.
FEISHU_TIMEOUT_SECONDS = 10

# Treat a missing result (None) like an empty one.
alerts = alert_data_list or []

# One markdown section per alert, joined once at the end.
markdown_sections = []
for alert_data in alerts:
    # price_diff_ratio is an absolute value in the query, so the sign is
    # restored from price_diff.
    is_negative = "-" if float(alert_data["price_diff"]) < 0 else ""
    markdown_sections.append(
        f"**【<font color='blue'>SKU:{alert_data['sku']}, {alert_data['sku_name']}</font>】**\n\n"
        f"- 价格波动:**{alert_data['price_diff']}**\n"
        f"- 波动幅度:**<font color='red'>{is_negative}{alert_data['price_diff_ratio']}%</font>**\n"
        f"- 当前价格:{alert_data['current_price']}\n"
        f"- 历史价格:{alert_data['old_price']}\n"
        f"- 运营区域:{alert_data['area_names']}\n"
        f"- 最近操作人:{alert_data['updater']}\n"
        f"- 操作时间:{alert_data['hour_of_day']}\n\n\n"
    )
markdown_content = "".join(markdown_sections)

data = {
    "msg_type": "interactive",
    "card": {
        "header": {
            "template": "red",
            "title": {
                "content": f"**<font color='red'>过去 {HOURS_TO_RUN}小时发现了 {len(alerts)}条SKU价格波动异常</font>**",
                "tag": "lark_md",
            },
        },
        "elements": [
            {
                "tag": "markdown",
                "content": markdown_content,
            }
        ],
    },
}

print(data)

headers = {"Content-Type": "application/json"}
if alerts:
    # TLS certificate verification is left at the requests default (enabled).
    feishu_result = requests.post(
        url=feishu_url,
        json=data,
        headers=headers,
        timeout=FEISHU_TIMEOUT_SECONDS,
    ).json()
    print(feishu_result)
    # A non-zero "code" in the webhook reply is treated as a rejected message;
    # fail loudly so an undelivered alert does not go unnoticed.
    if feishu_result.get("code", 0) != 0:
        raise RuntimeError(f"Feishu webhook rejected the alert: {feishu_result}")
else:
    print(f"恭喜，过去 {HOURS_TO_RUN} 小时未发现价格异常。")