In [None]:
import json
import time
import requests

In [None]:
CS1 = {"GFD":12340,"GHE":123}
dict(CS1)

{'GFD': 12340, 'GHE': 123}

In [None]:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""基于 aes文件 基础加密功能 封装 openapi签名算法"""
import orjson
from typing import Union
from aes import aes_encrypt, md5_encrypt


class SignBase(object):

    @classmethod
    def generate_sign(cls, encrypt_key: str, request_params: dict) -> str:
        """
        生成签名
        """
        canonical_querystring = cls.format_params(request_params)
        md5_str = md5_encrypt(canonical_querystring).upper()
        sign = aes_encrypt(encrypt_key, md5_str)
        return sign

    @classmethod
    def format_params(cls, request_params: Union[None, dict] = None) -> str:
        """
        格式化 params
        """
        if not request_params or not isinstance(request_params, dict):
            return ''

        canonical_strs = []
        sort_keys = sorted(request_params.keys())
        for k in sort_keys:
            v = request_params[k]
            if v == "":
                continue
            elif isinstance(v, (dict, list)):
                # 如果直接使用 json, 则必须使用separators=(',',':'), 去除序列化后的空格, 否则 json中带空格就导致签名异常
                # 使用 option=orjson.OPT_SORT_KEYS 保证dict进行有序 序列化(因为最终要转换为 str进行签名计算, 需要保证有序)
                canonical_strs.append(f"{k}={orjson.dumps(v, option=orjson.OPT_SORT_KEYS).decode()}")
            else:
                canonical_strs.append(f"{k}={v}")
        return "&".join(canonical_strs)


In [None]:
class lingxingapi():

    def __init__(self) -> None:
        self.appId = "ak_zmVq5LaPMxZzx"
        self.appSecret = "01FgWJe56AkTEug9FE+W9g=="

    # 获取登录令牌
    def __getAccessToken__(self) -> str:
        url = "https://openapi.lingxing.com/api/auth-server/oauth/access-token"
        payload = {
            "appId":self.appId,
            "appSecret":self.appSecret
        }
        headers = {}
        response = requests.request("POST", url, headers=headers, data=payload)
        return response.json()["data"]["access_token"]

    # 获取查询亚马逊Listing
    def __getAmzListing__(self, sid : str):
        result_response = []
        offset = 0
        while True:
            payload = {
                "app_key":self.appId,
                "access_token":self.__getAccessToken__(),
                "timestamp":int(time.time()),
                "offset": offset,
                "length": 1000,
                "sid":sid
            }
            payload.update({"sign":SignBase.generate_sign(self.appId,payload)})
            headers = {}
            api_url = "/erp/sc/data/mws/listing"
            url = f"https://openapi.lingxing.com{api_url}?" + "&".join([key + "=" + str(payload[key]) for key in payload])
            response = requests.request("POST", url, headers=headers, json=payload).json()
            result_response.extend(response["data"])
            if len(response["data"]) != 1000:
                break
            else:
                offset += 1000
        return result_response

    # 查询亚马逊店铺列表
    def __AmazonStore__(self) -> json:
        payload = {
            "app_key":self.appId,
            "access_token":self.__getAccessToken__(),
            "timestamp":int(time.time()),
            "is_parant_asin_merge":1
        }
        payload.update({"sign":SignBase.generate_sign(self.appId,payload)})
        headers = {}
        api_url = "/erp/sc/data/seller/lists"
        url = f"https://openapi.lingxing.com{api_url}?" + "&".join([key + "=" + str(payload[key]) for key in payload])
        response = requests.request("POST", url, headers=headers, json=payload).json()
        return response
        
    # 销量统计
    def __SalesStatistics__(self,start_date,end_date) -> json:
        result_response = []
        offset = 0
        while True:
            payload = {
                "app_key":self.appId,
                "access_token":self.__getAccessToken__(),
                "timestamp":int(time.time()),
                "offset": offset,
                "length": 10000,
                "result_type":1,
                "start_date":start_date,
                "end_date":end_date,
                "date_unit":4,
                "data_type":2
            }
            payload.update({"sign":SignBase.generate_sign(self.appId,payload)})
            headers = {}
            api_url = "/basicOpen/platformStatisticsV2/saleStat/pageList"
            url = f"https://openapi.lingxing.com{api_url}?" + "&".join([key + "=" + str(payload[key]) for key in payload])
            response = requests.request("POST", url, headers=headers, json=payload).json()
            result_response.extend(response["data"])
            if len(response["data"]) != 10000:
                break
            else:
                offset += 10000
        return result_response

In [None]:
from datetime import datetime, timedelta

def generate_date_range():
    # 起始日期
    start_date = datetime(2023, 1, 1)
    # 获取今天的日期
    end_date = datetime.now()
    
    # 计算日期范围的天数
    delta = end_date - start_date
    
    # 生成日期列表
    date_list = []
    for i in range(delta.days + 1):
        date = start_date + timedelta(days=i)
        date_list.append(date.strftime('%Y-%m-%d'))
    
    return date_list

# 调用函数获取日期列表
dates = generate_date_range()

In [None]:
dates

In [None]:
result_dict = {}
for _date in dates:
    time.sleep(13)
    response_1 = lingxingapi().__SalesStatistics__(start_date=_date,end_date=_date)
    for _data in response_1:
        if len(_data["parentAsin"]) == 0 or len(_data["store_name"]) == 0:
            continue
        if _data["parentAsin"][0] + "|" + _data["store_name"][0] in result_dict:
            result_dict[_data["parentAsin"][0] + "|" + _data["store_name"][0]][_date] = _data["volumeTotal"]
        else:
            result_dict.update({_data["parentAsin"][0] + "|" + _data["store_name"][0]:{_date:_data["volumeTotal"]}})

In [None]:
import pandas as pd
import os
name_list = os.listdir(r"C:\Project\zlwl_pure_backend\PMC\static\ParentAsin_fiels")
for _data in result_dict:
    if str(_data).replace("|",",") + ".xlsx" in name_list:
        continue
    print(_data)
    df = pd.DataFrame(columns=["日期","销量"])
    for _data_1 in result_dict[_data]:
        df.loc[len(df)] = [_data_1,result_dict[_data][_data_1]]
    df.to_excel(os.path.join(r"C:\Project\zlwl_pure_backend\PMC\static\ParentAsin_fiels",str(_data).replace("|",",").replace("/","-")+".xlsx"),index=False)

In [None]:
response_2 = lingxingapi().__SalesStatistics__(start_date='2025-01-01',end_date='2025-02-20')
len(response_2)

In [None]:
import pandas as pd
import os
from datetime import datetime

def process_excel_files(folder_path):
    # 获取所有Excel文件
    excel_files = [f for f in os.listdir(folder_path) if f.endswith(('.xlsx', '.xls'))]
    
    for file in excel_files:
        file_path = os.path.join(folder_path, file)
        try:
            # 读取Excel文件
            df = pd.read_excel(file_path)
            
            # 检查是否有"日期"列
            if '日期' not in df.columns:
                print(f"文件 {file} 中未找到'日期'列，跳过处理")
                continue
            
            # 转换日期列格式
            try:
                df['日期'] = pd.to_datetime(df['日期'])
            except:
                print(f"文件 {file} 的'日期'列格式无法转换，尝试其他格式")
                # 尝试其他可能的日期格式
                try:
                    df['日期'] = pd.to_datetime(df['日期'], errors='coerce')
                except:
                    print(f"文件 {file} 的'日期'列格式无法识别，跳过处理")
                    continue
            
            # 删除日期为空的行
            df = df.dropna(subset=['日期'])
            
            # 按日期排序
            df = df.sort_values(by='日期')
            
            # 删除重复日期的行，保留第一个
            df = df.drop_duplicates(subset='日期', keep='first')
            
            # 原地保存（覆盖原文件）
            df.to_excel(file_path, index=False)
            
            print(f"文件 {file} 处理完成并已原地保存")
            
        except Exception as e:
            print(f"处理文件 {file} 时出错: {str(e)}")

# 使用示例
if __name__ == "__main__":
    # 输入文件夹路径，存放原始Excel文件
    input_folder = r"C:\Project\zlwl_pure_backend\PMC\static\msku_files"
    
    process_excel_files(input_folder)

In [None]:
import pandas as pd
import os
from datetime import datetime

def process_excel_files(folder_path, target_date="2025-05-10"):
    # 获取所有Excel文件
    excel_files = [f for f in os.listdir(folder_path) if f.endswith(('.xlsx', '.xls'))]
    
    for file in excel_files:
        file_path = os.path.join(folder_path, file)
        try:
            # 读取Excel文件
            df = pd.read_excel(file_path)
            
            # 检查是否有"日期"列
            if '日期' not in df.columns:
                print(f"文件 {file} 中未找到'日期'列，跳过处理")
                continue
            
            # 转换日期列格式
            try:
                df['日期'] = pd.to_datetime(df['日期'])
            except:
                print(f"文件 {file} 的'日期'列格式无法转换，尝试其他格式")
                # 尝试其他可能的日期格式
                try:
                    df['日期'] = pd.to_datetime(df['日期'], errors='coerce')
                except:
                    print(f"文件 {file} 的'日期'列格式无法识别，跳过处理")
                    continue
            
            # 删除日期为空的行
            df = df.dropna(subset=['日期'])
            
            # 删除包含目标日期的所有行
            target_date_obj = pd.to_datetime(target_date)
            rows_before = len(df)
            df = df[df['日期'].dt.date != target_date_obj.date()]
            rows_deleted = rows_before - len(df)
            
            print(f"从文件 {file} 中删除了 {rows_deleted} 行包含日期 {target_date} 的记录")
            
            # 按日期排序
            df = df.sort_values(by='日期')
            
            # 删除重复日期的行，保留第一个
            df = df.drop_duplicates(subset='日期', keep='first')
            
            # 原地保存（覆盖原文件）
            df.to_excel(file_path, index=False)
            
            print(f"文件 {file} 处理完成并已原地保存")
            
        except Exception as e:
            print(f"处理文件 {file} 时出错: {str(e)}")

# 使用示例
if __name__ == "__main__":
    # 输入文件夹路径，存放原始Excel文件
    input_folder = r"C:\Project\zlwl_pure_backend\PMC\static\ParentAsin_fiels"
    
    # 指定要删除的目标日期
    target_date = "2025-05-10"
    
    process_excel_files(input_folder, target_date)

In [None]:
from datetime import datetime, timedelta

def get_dates_in_range(date_range_str):
    """从日期区间字符串获取详细日期列表"""
    # 分割日期区间
    start_str, end_str = date_range_str.split('-')
    
    # 解析开始和结束日期（处理可能的不同分隔符）
    start_date = datetime.strptime(start_str, '%Y.%m.%d')
    end_date = datetime.strptime(end_str, '%Y.%m.%d')
    
    # 生成日期列表
    date_list = []
    current_date = start_date
    while current_date <= end_date:
        date_list.append(current_date.strftime('%Y-%m-%d'))
        current_date += timedelta(days=1)
    
    return date_list

# 示例使用
date_range = "2025.05.05-2025.05.11"
dates = get_dates_in_range(date_range)

print(f"日期区间 {date_range} 中的详细日期:")
for date in dates:
    print(date)

In [37]:
import requests
qd = {
    "亚马逊目标业绩-父ASIN": "http://192.168.1.126:8000/TargetPerformance/amazon_partASIN/",
    "亚马逊目标业绩-子ASIN": "http://192.168.1.126:8000/TargetPerformance/amazon_asin/",
    # "沃尔玛目标业绩": "http://192.168.1.126:8000/TargetPerformance/waller_target/", # 周三
    "沃尔玛WFS库存更新": "http://192.168.1.126:8000/TargetPerformance/waller_wfs/",
    "亚马逊PMC备货计划": "http://192.168.1.126:8000/PMC/pmc_bhdata/", # 周一 下午5点
    "亚马逊PMC采购计划": "http://192.168.1.126:8000/PMC/pmc_cgdata/",
    # "AI自主学习模型": "http://192.168.1.126:8000/PMC/pmc_aimodel/", # 周日
    "领星库存明细": "http://192.168.1.126:8000/PMC/pmc_warehouse/",
    "亚马逊PMC采购计划-采购单": "http://192.168.1.126:8000/PMC/cg_orderpurchase/",
    "财务PMC成本定价": "http://192.168.1.126:8000/PMC/cw_costbasedpricing/"
}

In [38]:
response_1 = requests.post(url=qd["亚马逊PMC采购计划-采购单"])
print(response_1.text)

请等待几分钟!


In [23]:
from datetime import datetime, timedelta  
# 获取未来第12周的周一与周日日期
monday = datetime.now() - timedelta(days=datetime.now().weekday())
next_monday = monday + timedelta(weeks=11)
next_sunday = next_monday + timedelta(days=6)

In [24]:
monday,next_monday,next_sunday

(datetime.datetime(2025, 6, 2, 10, 0, 37, 862747),
 datetime.datetime(2025, 8, 18, 10, 0, 37, 862747),
 datetime.datetime(2025, 8, 24, 10, 0, 37, 862747))

In [None]:
import schedule
import time
from datetime import datetime

def job():
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"任务执行时间：{current_time}")
    for _data in qd:
        if _data == "AI自主学习模型":
            continue
        response_1 = requests.post(url=qd[_data])
        print(response_1.text)
        time.sleep(900)
    # 执行完毕后返回 CancelJob 取消任务
    return schedule.CancelJob

# 设定定时规则：每天凌晨6点执行
schedule.every().day.at("07:00").do(job)

print("定时器已启动，等待执行任务...")
while True:
    schedule.run_pending()
    # 如果没有待执行的任务，退出循环
    if not schedule.jobs:
        print("任务已执行完毕，程序退出")
        break
    time.sleep(1)

定时器已启动，等待执行任务...
任务执行时间：2025-05-31 07:00:00
请等待几分钟!
请等待几分钟!
请等待几分钟!
请等待几分钟!
请等待几分钟!
请等待几分钟!
任务已执行完毕，程序退出


In [3]:
import schedule
import time
from datetime import datetime

def job():
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"任务执行时间：{current_time}")
    response_1 = requests.post(url=qd["AI自主学习模型"])
    print(response_1.text)
    # 执行完毕后返回 CancelJob 取消任务
    return schedule.CancelJob

# 设定定时规则：每天凌晨6点执行
schedule.every().day.at("10:00").do(job)

print("定时器已启动，等待执行任务...")
while True:
    schedule.run_pending()
    # 如果没有待执行的任务，退出循环
    if not schedule.jobs:
        print("任务已执行完毕，程序退出")
        break
    time.sleep(1)

定时器已启动，等待执行任务...
任务执行时间：2025-05-31 10:00:00


ConnectionError: HTTPConnectionPool(host='192.168.1.126', port=8000): Max retries exceeded with url: /PMC/pmc_aimodel/ (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x00000148C5096E10>: Failed to establish a new connection: [WinError 10061] 由于目标计算机积极拒绝，无法连接。'))

In [None]:
print("11")

In [1]:
import hashlib
import json
import time
import requests

# 固定参数
app_key = "2aa6ce6a155fdceebec72edec9c7abf8"
app_name = "sales"
sid = "sales"

# 动态参数
timestamp = str(int(time.time()))
request_body = {
    "tradeStatusCode": 2,
    "pageNo": 1,
    "pageSize": 10,
    "createTimeBegin": "2025-04-01 00:00:00",
    "createTimeEnd": "2025-04-11 00:00:00"
}
json_body = json.dumps(request_body, separators=(',', ':'))

# 构建签名参数（保持顺序）
params = [
    ("appName", app_name),
    ("body", json_body),
    ("sid", sid),
    ("timestamp", timestamp)
]

# 生成签名字符串
sign_str = app_key
for key, value in params:
    sign_str += key + value
sign_str += app_key

# 计算MD5签名
signature = hashlib.md5(sign_str.encode('utf-8')).hexdigest()

# 构造请求
url = "https://openapi.qizhishangke.com/api/openservices/trade/v1/getSalesTradeList"
query_params = {
    "sid": sid,
    "appName": app_name,
    "timestamp": timestamp,
    "sign": signature
}

headers = {"Content-Type": "application/json"}
response = requests.post(
    url,
    params=query_params,
    data=json_body,
    headers=headers
)

print("Status Code:", response.status_code)
print("Response:", response.json())

Status Code: 200
Response: {'code': 200, 'data': {'currentPage': 1, 'data': [{'badDelivery': False, 'buyerMessage': '11   212', 'buyerNick': '', 'commission': '0.0000', 'country': '美国', 'created': '2025-04-09T10:59:16', 'csRemark': '11   111', 'currencyCode': 'USD', 'deliveryMobile': '', 'deliveryType': '款到发货', 'discount': '0.0000', 'discountCodes': '', 'erpRemark': '11   211', 'estimatedVat': '0.0000', 'exchangeRate': '7.169600', 'flagList': [], 'frozen': False, 'logisticsId': 3, 'logisticsNo': '', 'logisticsText': '递四方海外仓物流', 'modified': '2025-04-18T11:40:23', 'outerNo': '', 'payTime': '2025-04-01T16:54:36', 'platformId': 7, 'platformMaskList': ['shopify平台危险订单标识_中', 'shopify平台危险订单标识_低'], 'postAmount': 6.0, 'postCost': '0.0000', 'receivable': 82.98, 'received': 82.98, 'receiverAddress': '5329 N 33rd St', 'receiverAddress2': '', 'receiverCity': 'Gainesville', 'receiverDistrict': '', 'receiverMobile': '', 'receiverName': 'Yvette McCarthy', 'receiverProvince': 'Georgia', 'receiverTelno':