# Boss 直聘自动化 - 决策与通知 Demo

本 Notebook 演示如下流程：
- 使用 `jobs/criteria.yaml` 录入岗位要求
- 获取在线简历（服务已实现），必要时进行 OCR 转 Markdown
- 结合 YAML 岗位要求 + 简历文本，调用 OpenAI 输出是否「打招呼」决策
- 若决策为 greet，发送钉钉通知（可选，需配置 `DINGTALK_WEBHOOK`）

前置条件：
- 服务已运行，并能打开聊天页与在线简历
- 需设置环境变量 `OPENAI_API_KEY`（决策步骤必需，缺失时会抛出 `RuntimeError`；本地 OCR 失败时的回退也会用到），以及可选的 `DINGTALK_WEBHOOK`


In [1]:
import os
from boss_client import BossClient

# Service endpoint: read from BOSS_SERVICE_URL, otherwise use the local default.
BASE_URL = os.getenv('BOSS_SERVICE_URL', 'http://127.0.0.1:5001')
client = BossClient(BASE_URL)
# Trailing expression so the notebook renders the returned status.
client.get_status()


{'status': 'running',
 'logged_in': False,
 'timestamp': '2025-09-23T13:05:11.066421',
 'notifications_count': 9}

In [2]:
# Load the role requirements from the YAML file.
import yaml
import json

with open('jobs/criteria.yaml', mode='r', encoding='utf-8') as criteria_file:
    criteria = yaml.safe_load(criteria_file)
# Trailing expression so the notebook renders the parsed criteria.
criteria


{'roles': [{'id': 'default',
   'position': '大模型算法工程师',
   'description': '负责大语言模型/多模态模型的训练、微调与落地；\n参与推理性能优化与工程化部署；\n与业务协作将模型能力嵌入产品。\n',
   'target_profile': '- 计算机/数学/电子工程等相关专业，硕士及以上优先；\n- 熟悉 Transformer/Lora/PEFT 等；\n- 3-8 年算法或相关经验，具备端到端项目经验。\n',
   'filters': {'must_have': ['Python',
     {'深度学习框架': ['PyTorch', 'TensorFlow']},
     'LLM/LMM 相关经验'],
    'nice_to_have': ['C++', '分布式训练(DeepSpeed/Megatron)', '向量数据库/检索增强(RAG)'],
    'must_not': ['仅运维/测试背景', '与岗位无关的实习仅有']},
   'keywords': {'positive': ['大模型',
     'LLM',
     'Transformer',
     'Fine-tuning',
     'LoRA',
     'RLHF',
     '知识蒸馏',
     'RAG'],
    'negative': ['爬虫', '外包', '纯NLP规则']},
   'scoring': {'weights': {'experience': 0.35,
     'skills_match': 0.35,
     'projects': 0.2,
     'education': 0.1},
    'threshold': {'greet': 0.7, 'borderline': 0.6}},
   'notes': '可根据岗位变体微调 filters/keywords/weights。\n'}]}

In [6]:
# Fetch the online resume and, when an image came back, preview it
# (OCR local -> OpenAI happens in a later cell).
CHAT_ID = os.environ.get('DEMO_CHAT_ID', '46232784-0')  # replace with a real chat_id
# Use the new optimized API to fetch the resume
fetched = client.get_resume(CHAT_ID, capture_method="auto")
print(fetched.text)
if fetched.image_base64:
    from PIL import Image
    import io
    import base64
    # The result object is read through attributes everywhere else in this
    # notebook, so use attribute access here too instead of subscripting.
    img = Image.open(io.BytesIO(base64.b64decode(fetched.image_base64)))
    # A bare expression nested inside an `if` is not auto-rendered by the
    # notebook (only a trailing top-level expression is), so show it explicitly.
    display(img)


{
  "geekWorkExpList": [
    {
      "startYearMonStr": "2024.07",
      "endYearMonStr": "至今",
      "company": "北京智谱华章科技股份有限公司",
      "positionName": "算法",
      "workYearDesc": "1年2个月"
    },
    {
      "startYearMonStr": "2023.03",
      "endYearMonStr": "2024.06",
      "company": "百度",
      "positionName": "算法工程师",
      "workYearDesc": "1年3个月"
    },
    {
      "startYearMonStr": "2020.06",
      "endYearMonStr": "2023.03",
      "company": "北京奇虎360科技有限公司",
      "positionName": "算法工程师",
      "workYearDesc": "2年9个月"
    },
    {
      "startYearMonStr": "2019.11",
      "endYearMonStr": "2020.01",
      "company": "bigo",
      "positionName": "算法工程师",
      "workYearDesc": "2个月"
    },
    {
      "startYearMonStr": "2018.09",
      "endYearMonStr": "2019.06",
      "company": "北京快手科技有限公司",
      "positionName": "算法工程师",
      "workYearDesc": "9个月"
    },
    {
      "startYearMonStr": "2016.06",
      "endYearMonStr": "2016.09",
      "company": "北京天睿视迅科技有限公司-实习",
      "

In [7]:
# Turn the fetched ResumeResult into Markdown text (md_text)
img_b64 = None
md_text = None

# A failed fetch stops the cell immediately
if not fetched.success:
    raise RuntimeError(f"拉取在线简历失败: {fetched.error}")

# Prefer the text payload (WASM method); fall back to an image for OCR
if fetched.has_text:
    md_text = fetched.text
    print(f"✅ 直接获取到文本数据: {len(md_text)} 字符")
elif fetched.has_image:
    # Single image first, otherwise the first page of the paged screenshots
    if fetched.image_base64:
        img_b64 = fetched.image_base64
    elif fetched.images_base64:
        img_b64 = fetched.images_base64[0]
    print(f"✅ 获取到图片数据，需要OCR处理")
else:
    raise RuntimeError("无有效数据（文本或图片）")

# OCR only runs when an image payload was selected above
if img_b64:
    # Try local OCR first
    local_ocr = client.ocr_local_from_b64(img_b64)
    if local_ocr.get('success'):
        md_text = local_ocr['markdown']
    else:
        # Fall back to OpenAI, which needs an API key
        api_key = os.environ.get('OPENAI_API_KEY')
        if not api_key:
            raise RuntimeError('缺少 OPENAI_API_KEY 且本地OCR失败')
        remote_ocr = client.ocr_openai_from_b64(img_b64, api_key)
        if not remote_ocr.get('success'):
            raise RuntimeError(f"OpenAI OCR失败: {remote_ocr}")
        md_text = remote_ocr['markdown']

    # Preview the first 500 characters of the OCR result
    print(md_text[:500])


✅ 直接获取到文本数据: 2797 字符


In [8]:
# Combine the role YAML with the resume text and ask OpenAI for a match decision
role = criteria['roles'][0]
import requests
import json

api_key = os.environ.get('OPENAI_API_KEY')
if not api_key:
    raise RuntimeError('缺少 OPENAI_API_KEY')

# Build the single user message: instruction + role YAML + resume Markdown
instruction = (
    "你是资深HR。根据以下职位要求(YAML)与候选人简历(Markdown)进行匹配评分，输出JSON："
    "{score: 0-1, decision: 'greet'|'skip'|'borderline', reasons: [..], highlights: [..], risks: [..]}。"
)
role_yaml = yaml.safe_dump(role, allow_unicode=True)
prompt = {
    'role': 'user',
    'content': instruction + "职位YAML:\n" + role_yaml + "\n简历Markdown:\n" + md_text,
}

headers = {
    'Authorization': f'Bearer {api_key}',
    'Content-Type': 'application/json',
}
payload = {
    'model': os.environ.get('OPENAI_TEXT_MODEL', 'gpt-4o-mini'),
    'messages': [prompt],
    # Ask for a JSON object so the reply can be parsed with json.loads below
    'response_format': {"type": "json_object"},
}

resp = requests.post(
    'https://api.openai.com/v1/chat/completions',
    headers=headers,
    json=payload,
    timeout=60,
)
resp.raise_for_status()
first_choice = resp.json()['choices'][0]
content = first_choice['message']['content']
decision = json.loads(content)
# Trailing expression so the notebook renders the parsed decision
decision


RuntimeError: 缺少 OPENAI_API_KEY

In [None]:
# Optional: when the decision is "greet", push a DingTalk text notification.
# Nothing is sent (and nothing is printed) when DINGTALK_WEBHOOK is not set.
if isinstance(decision, dict) and decision.get('decision') == 'greet':
    hook = os.environ.get('DINGTALK_WEBHOOK')
    if hook:
        msg = {
            "msgtype": "text",
            "text": {"content": f"建议打招呼: chat_id={CHAT_ID}\nscore={decision.get('score')}\nreasons={decision.get('reasons')}"}
        }
        try:
            ding_resp = requests.post(hook, json=msg, timeout=10)
            # Without this check an HTTP 4xx/5xx reply would still be
            # reported as a successful notification.
            ding_resp.raise_for_status()
            # NOTE(review): the webhook may also signal failure inside a 200
            # reply body (an error code field) — confirm and check it here.
            print('DingTalk 已通知')
        except Exception as e:
            # Best-effort: a failed notification must not abort the notebook.
            print('DingTalk 通知失败:', e)
else:
    print('决策不是 greet，跳过通知')


In [9]:
# Show the last three entries of the 'notifications' list returned by the service.
# Kept as a trailing expression so the notebook renders the value.
client.get_notifications()['notifications'][-3:]

[{'timestamp': '2025-09-23T13:22:42.734501',
  'level': 'info',
  'message': '导航到聊天页面...'},
 {'timestamp': '2025-09-23T13:22:42.735919',
  'level': 'info',
  'message': '已导航到聊天页面'},
 {'timestamp': '2025-09-23T13:22:42.736561',
  'level': 'success',
  'message': '持久化浏览器会话启动成功！'}]

In [10]:
# Fetch the 'messages' list from the service and report how many entries it has.
messages = client.get_messages()['messages']
print(f'一共{len(messages)}条消息, last 3: ')
# Trailing expression so the notebook renders the last three messages.
messages[-3:]



一共10条消息, last 3: 


[{'chat_id': '714343452-0',
  'candidate': '李元杰',
  'message': 'Boss 你好，我对这个岗位特别感兴趣！非常想要加入，可以看下我的简历，期待回复哈~',
  'status': '—',
  'job_title': '大模型算法工程师',
  'time': '昨天'},
 {'chat_id': '652934751-0',
  'candidate': '李坤',
  'message': '您好，请问贵公司目前还在招聘相关岗位吗？本人具备3年以上的算法研发经验，熟悉大模型的部署与优化，具备 Agent + MCP 的开发实战经验，并且熟练掌握 RAG 技术，深入理解 BERT、Transformer 等核心算法，期待有机会与您进一步沟通。',
  'status': '—',
  'job_title': '大模型算法工程师',
  'time': '昨天'},
 {'chat_id': '643094543-0',
  'candidate': '金久五',
  'message': '您好，我对这份工作非常感兴趣，希望可以有机会与您进一步沟通。',
  'status': '—',
  'job_title': '大模型算法工程师',
  'time': '昨天'}]

In [None]:
# Call the client's request_resume for one chat; the chat_id is hard-coded for the demo.
# NOTE(review): presumably asks this candidate to send a resume — confirm against BossClient.
client.request_resume('6961654-0')

In [12]:
# 🖼️ Demo: fetch the resume specifically as canvas image(s)
print("🖼️ Canvas图像获取演示")
print("=" * 50)

# Target chat_id (from the environment, or edit the default by hand)
DEMO_CHAT_ID = os.environ.get('DEMO_CHAT_ID', '46232784-0')  # replace with a real chat_id
print(f"目标Chat ID: {DEMO_CHAT_ID}")

# Fetch through the new API, forcing capture_method="image" to get screenshots
fetched = client.get_resume(DEMO_CHAT_ID, capture_method="image")

# Summary of the fetch result
print(f"\n📊 获取结果:")
print(f"成功: {fetched.success}")
print(f"方法: {fetched.capture_method}")
print(f"详情: {fetched.details}")

if not fetched.success:
    print(f"❌ 获取失败: {fetched.error}")
elif not fetched.has_image:
    print("❌ 未获取到图像数据")
else:
    print(f"✅ 获取到图像数据")
    print(f"   图像数量: {fetched.image_count}")
    print(f"   尺寸: {fetched.width} x {fetched.height}")

    # Size of the single-image payload, when present
    if fetched.image_base64:
        print(f"   单张图片大小: {len(fetched.image_base64)} 字符")

    # Per-page sizes when several images came back (paged screenshots)
    if fetched.images_base64:
        print(f"   分页图片数量: {len(fetched.images_base64)}")
        for page_no, img_b64 in enumerate(fetched.images_base64, start=1):
            print(f"   - 图片 {page_no}: {len(img_b64)} 字符")


🖼️ Canvas图像获取演示
目标Chat ID: 46232784-0

📊 获取结果:
成功: False
方法: image
详情: 
❌ 获取失败: 未找到指定对话项


In [None]:
# 🖼️ 显示Canvas图像
from PIL import Image
import io
import base64
import matplotlib.pyplot as plt

def display_resume_images(fetched):
    """Render the resume image(s) carried by ``fetched`` in the notebook.

    Decodes ``fetched.image_base64`` (titled "主图像") and each entry of
    ``fetched.images_base64`` (titled "分页 N"). Payloads that fail to decode
    are reported and skipped. One decoded image is shown with ``display``;
    two or more are drawn side by side in a single matplotlib figure.

    Args:
        fetched: Result object exposing ``success``, ``has_image``,
            ``image_base64`` and ``images_base64``.

    Returns:
        None. The function only prints and renders.
    """
    if not fetched.success or not fetched.has_image:
        print("❌ 无图像数据可显示")
        return

    # Gather every payload as (title, label for the failure message, base64),
    # main image first, so a single loop can decode them all.
    payloads = []
    if fetched.image_base64:
        payloads.append(("主图像", "主图像", fetched.image_base64))
    if fetched.images_base64:
        for page_no, page_b64 in enumerate(fetched.images_base64, start=1):
            payloads.append((f"分页 {page_no}", f"分页图像 {page_no} ", page_b64))

    images_to_show = []
    for title, failure_label, payload_b64 in payloads:
        try:
            img = Image.open(io.BytesIO(base64.b64decode(payload_b64)))
        except Exception as e:
            # Best-effort: one undecodable payload must not hide the others.
            print(f"❌ 解码{failure_label}失败: {e}")
        else:
            images_to_show.append((title, img))

    if not images_to_show:
        print("❌ 没有可显示的图像")
        return

    print(f"📸 显示 {len(images_to_show)} 张图像:")

    if len(images_to_show) == 1:
        # A single image is rendered directly by the notebook.
        title, img = images_to_show[0]
        print(f"   {title}: {img.size[0]} x {img.size[1]} pixels")
        display(img)
        return

    # Two or more images: one subplot per image in a single row. This path is
    # only reached with at least two images, so no single-axes special case
    # is needed.
    fig, axes = plt.subplots(1, len(images_to_show), figsize=(15, 5))
    for ax, (title, img) in zip(axes, images_to_show):
        ax.imshow(img)
        ax.set_title(f"{title}\n{img.size[0]}x{img.size[1]}")
        ax.axis('off')
        print(f"   {title}: {img.size[0]} x {img.size[1]} pixels")

    plt.tight_layout()
    plt.show()

# Display only when an earlier cell left a successful `fetched` result behind
if 'fetched' not in locals() or not fetched.success:
    print("⚠️ 请先运行上一个cell获取图像数据")
else:
    display_resume_images(fetched)


In [None]:
# 💾 保存Canvas图像到文件
import os
from datetime import datetime

def save_resume_images(fetched, output_dir="output/canvas_images"):
    """Write the resume image(s) carried by ``fetched`` to PNG files.

    The main image (``fetched.image_base64``) is written as
    ``resume_<timestamp>_main.png`` and each entry of
    ``fetched.images_base64`` as ``resume_<timestamp>_page_<n>.png``. The
    timestamp has one-second resolution, so two calls within the same second
    reuse the same file names. A payload that cannot be decoded or written is
    reported and skipped.

    Args:
        fetched: Result object exposing ``success``, ``has_image``,
            ``image_base64`` and ``images_base64``.
        output_dir: Directory to write into; created if it does not exist.

    Returns:
        List of the file paths that were written, main image first. Empty
        when ``fetched`` is unsuccessful or carries no image.
    """
    # Imported locally so the function does not depend on some other notebook
    # cell having imported base64 first; previously a missing import surfaced
    # as a misleading "save failed" message.
    import base64

    if not fetched.success or not fetched.has_image:
        print("❌ 无图像数据可保存")
        return []

    os.makedirs(output_dir, exist_ok=True)

    # File name prefix derived from the current local time
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    prefix = f"resume_{timestamp}"

    def _write_png(filename, payload_b64):
        """Decode one base64 payload into output_dir/filename; return the path."""
        path = os.path.join(output_dir, filename)
        # Decode before opening so a bad payload does not leave an empty file.
        data = base64.b64decode(payload_b64)
        with open(path, 'wb') as out:
            out.write(data)
        return path

    # Decode errors (ValueError, incl. binascii.Error), wrong payload types
    # and filesystem errors are the expected, skippable failures.
    skippable = (ValueError, TypeError, OSError)
    saved_files = []

    # Main image
    if fetched.image_base64:
        try:
            filepath = _write_png(f"{prefix}_main.png", fetched.image_base64)
        except skippable as e:
            print(f"❌ 保存主图像失败: {e}")
        else:
            saved_files.append(filepath)
            print(f"✅ 保存主图像: {filepath}")

    # Paged images, numbered from 1
    if fetched.images_base64:
        for page_no, page_b64 in enumerate(fetched.images_base64, start=1):
            try:
                filepath = _write_png(f"{prefix}_page_{page_no}.png", page_b64)
            except skippable as e:
                print(f"❌ 保存分页 {page_no} 失败: {e}")
            else:
                saved_files.append(filepath)
                print(f"✅ 保存分页 {page_no}: {filepath}")

    print(f"\n📁 总共保存了 {len(saved_files)} 个文件到 {output_dir}")
    return saved_files

# Save only when an earlier cell left a successful `fetched` result behind
if 'fetched' not in locals() or not fetched.success:
    print("⚠️ 请先运行前面的cell获取图像数据")
else:
    saved_files = save_resume_images(fetched)


In [None]:
# 💾 保存Canvas图像到文件
import os
from datetime import datetime

def save_resume_images(fetched, output_dir="output/canvas_images"):
    """Write the resume image(s) carried by ``fetched`` to PNG files.

    The main image (``fetched.image_base64``) is written as
    ``resume_<timestamp>_main.png`` and each entry of
    ``fetched.images_base64`` as ``resume_<timestamp>_page_<n>.png``. The
    timestamp has one-second resolution, so two calls within the same second
    reuse the same file names. A payload that cannot be decoded or written is
    reported and skipped.

    Args:
        fetched: Result object exposing ``success``, ``has_image``,
            ``image_base64`` and ``images_base64``.
        output_dir: Directory to write into; created if it does not exist.

    Returns:
        List of the file paths that were written, main image first. Empty
        when ``fetched`` is unsuccessful or carries no image.
    """
    # Imported locally so the function does not depend on some other notebook
    # cell having imported base64 first; previously a missing import surfaced
    # as a misleading "save failed" message.
    import base64

    if not fetched.success or not fetched.has_image:
        print("❌ 无图像数据可保存")
        return []

    os.makedirs(output_dir, exist_ok=True)

    # File name prefix derived from the current local time
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    prefix = f"resume_{timestamp}"

    def _write_png(filename, payload_b64):
        """Decode one base64 payload into output_dir/filename; return the path."""
        path = os.path.join(output_dir, filename)
        # Decode before opening so a bad payload does not leave an empty file.
        data = base64.b64decode(payload_b64)
        with open(path, 'wb') as out:
            out.write(data)
        return path

    # Decode errors (ValueError, incl. binascii.Error), wrong payload types
    # and filesystem errors are the expected, skippable failures.
    skippable = (ValueError, TypeError, OSError)
    saved_files = []

    # Main image
    if fetched.image_base64:
        try:
            filepath = _write_png(f"{prefix}_main.png", fetched.image_base64)
        except skippable as e:
            print(f"❌ 保存主图像失败: {e}")
        else:
            saved_files.append(filepath)
            print(f"✅ 保存主图像: {filepath}")

    # Paged images, numbered from 1
    if fetched.images_base64:
        for page_no, page_b64 in enumerate(fetched.images_base64, start=1):
            try:
                filepath = _write_png(f"{prefix}_page_{page_no}.png", page_b64)
            except skippable as e:
                print(f"❌ 保存分页 {page_no} 失败: {e}")
            else:
                saved_files.append(filepath)
                print(f"✅ 保存分页 {page_no}: {filepath}")

    print(f"\n📁 总共保存了 {len(saved_files)} 个文件到 {output_dir}")
    return saved_files

# Save the fetched images, then demo the client's own convenience method
if 'fetched' not in locals() or not fetched.success:
    print("⚠️ 请先运行前面的cell获取图像数据")
else:
    saved_files = save_resume_images(fetched)

    # Also demonstrate the client convenience method
    print("\n🛠️ 客户端便利方法演示:")
    DEMO_CHAT_ID = os.environ.get('DEMO_CHAT_ID', '46232784-0')

    # Fetch and save the image in a single client call
    try:
        quick_path = client.get_resume_image(
            DEMO_CHAT_ID,
            save_path="output/canvas_images/client_quick.png",
        )
        # A falsy return value is reported as a failed save
        print(f"✅ 客户端快速保存: {quick_path}" if quick_path else "❌ 客户端快速保存失败")
    except Exception as e:
        print(f"❌ 客户端方法异常: {e}")
