# Survey Deployment & Collection Agents

## Initial Imports

In [3]:
import warnings
warnings.filterwarnings('ignore')

import os
import requests
import yaml

from crewai import Agent, Task, Crew
from crewai.tools import BaseTool

# Directly set environment variables (no .env)
os.environ['OPENAI_API_KEY'] = 
os.environ['OPENAI_MODEL_NAME'] = 'gpt-4o-mini'
os.environ['SURVEY_API_KEY']    = 
os.environ['SURVEY_API_URL']    = 'https://api.typeform.com'


In [4]:
from pydantic import BaseModel, Field

class PublishSurveyInput(BaseModel):
    survey_definition: dict = Field(
        ..., description="Typeform 表单定义 JSON"
    )

class FetchSurveyInput(BaseModel):
    survey_id: str = Field(
        ..., description="要获取的表单 ID"
    )

import os, requests
from crewai.tools import BaseTool
from typing import Type

class PublishSurveyTool(BaseTool):
    name: str = "publish_survey"
    description: str = "Publish a new survey via Typeform API"
    args_schema: Type[PublishSurveyInput] = PublishSurveyInput

    def _run(self, survey_definition: dict) -> dict:
        headers = {
            'Authorization': f"Bearer {os.environ['SURVEY_API_KEY']}",
            'Content-Type': 'application/json'
        }
        resp = requests.post(
            f"{os.environ['SURVEY_API_URL']}/forms",
            headers=headers,
            json=survey_definition
        )
        resp.raise_for_status()
        return resp.json()


class FetchSurveyTool(BaseTool):
    name: str = "fetch_survey"
    description: str = "Fetch survey metadata and all responses"
    args_schema: Type[FetchSurveyInput] = FetchSurveyInput

    def _run(self, survey_id: str) -> dict:
        headers = {'Authorization': f"Bearer {os.environ['SURVEY_API_KEY']}"}
        meta = requests.get(
            f"{os.environ['SURVEY_API_URL']}/forms/{survey_id}",
            headers=headers
        )
        meta.raise_for_status()
        resp = requests.get(
            f"{os.environ['SURVEY_API_URL']}/forms/{survey_id}/responses",
            headers=headers
        )
        resp.raise_for_status()
        return {'survey': meta.json(), 'responses': resp.json()}



In [5]:
with open("config/agents/deploy_survey_agent.yaml") as f:
    deploy_cfg = yaml.safe_load(f)["deploy_survey_agent"]
with open("config/agents/collect_survey_agent.yaml") as f:
    collect_cfg = yaml.safe_load(f)["collect_survey_agent"]
with open("config/tasks/post_survey_task.yaml") as f:
    post_cfg = yaml.safe_load(f)["post_survey"]
with open("config/tasks/get_survey_task.yaml") as f:
    get_cfg = yaml.safe_load(f)["get_survey"]

In [10]:
publish_tool = PublishSurveyTool()
fetch_tool   = FetchSurveyTool()
deploy_agent  = Agent(config=deploy_cfg, tools=[publish_tool])
collect_agent = Agent(config=collect_cfg, tools=[fetch_tool])
post_task = Task(config=post_cfg, agent=deploy_agent, run=publish_tool.run)
get_task  = Task(config=get_cfg,  agent=collect_agent, run=fetch_tool.run)


#示例 survey 定义
survey_definition = {
    "title": "Customer Satisfaction Survey",
    "fields": [
        {
            "title": "How satisfied are you with our service?",
            "type": "multiple_choice",
            "properties": {
                "choices": [
                    {"label": "Very Satisfied"},
                    {"label": "Satisfied"},
                    {"label": "Neutral"},
                    {"label": "Dissatisfied"}
                ]
            }
        }
    ]
}


direct_pub = publish_tool.run(survey_definition)
print("Direct publish output:")
print(direct_pub)

form_id = direct_pub["id"]
print("Published form_id =", form_id)

Using Tool: publish_survey
Direct publish output:
{'id': 'fJnSqdOU', 'type': 'quiz', 'title': 'Customer Satisfaction Survey', 'workspace': {'href': 'https://api.typeform.com/workspaces/Psh3Cz'}, 'theme': {'href': 'https://api.typeform.com/themes/qHWOQ7'}, 'settings': {'language': 'en', 'progress_bar': 'proportion', 'meta': {'allow_indexing': False}, 'hide_navigation': False, 'is_public': True, 'is_trial': False, 'show_progress_bar': True, 'show_typeform_branding': True, 'are_uploads_public': False, 'show_time_to_complete': True, 'show_number_of_submissions': False, 'show_cookie_consent': False, 'show_question_number': True, 'show_key_hint_on_choices': True, 'autosave_progress': True, 'free_form_navigation': False, 'use_lead_qualification': False, 'pro_subdomain_enabled': False, 'auto_translate': True, 'partial_responses_to_all_integrations': False}, 'thankyou_screens': [{'id': 'DefaultTyScreen', 'ref': 'default_tys', 'title': "Thanks for completing this typeform\nNow *create your own* 

In [14]:
direct_fetch = fetch_tool.run(form_id)
print("Direct fetch output:")
# direct_fetch
direct_fetch['responses']['items']

Using Tool: fetch_survey
Direct fetch output:


[{'landing_id': 'q82fnsrq1qfgvnel44q82fnsr9u61pxk',
  'token': 'q82fnsrq1qfgvnel44q82fnsr9u61pxk',
  'response_id': 'q82fnsrq1qfgvnel44q82fnsr9u61pxk',
  'response_type': 'completed',
  'landed_at': '2025-04-29T16:04:53Z',
  'submitted_at': '2025-04-29T16:04:53Z',
  'metadata': {'user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
   'platform': 'other',
   'referer': 'https://form.typeform.com/to/fJnSqdOU',
   'network_id': '2018d24b75',
   'browser': 'default'},
  'hidden': {},
  'calculated': {'score': 0},
  'answers': [{'field': {'id': 'TTowXoGzjljy',
     'type': 'multiple_choice',
     'ref': '01JT14N3SDJ264S9WXAA5N53CD'},
    'type': 'choice',
    'choice': {'id': '3KT7GMA6EJqX',
     'ref': '01JT14N3SDZM6R50507ZVPRJ9A',
     'label': 'Very Satisfied'}}],
  'thankyou_screen_ref': 'default_tys'}]

In [None]:
# attmept achiving this with agents

# publish_tool = PublishSurveyTool()
# fetch_tool   = FetchSurveyTool()

# deploy_agent  = Agent(config=deploy_cfg,  tools=[publish_tool])
# collect_agent = Agent(config=collect_cfg, tools=[fetch_tool])

# post_task = Task(
#     config=post_cfg,
#     agent=deploy_agent,
#     run=publish_tool.run
# )
# get_task = Task(
#     config=get_cfg,
#     agent=collect_agent,
#     run=fetch_tool.run
# )

# if __name__ == "__main__":
#     crew = Crew(
#         agents=[deploy_agent, collect_agent],
#         tasks =[post_task, get_task],
#         verbose=True
#     )

#     survey_definition = {
#         "title": "Customer Satisfaction Survey",
#         "fields": [
#             {
#                 "title": "How satisfied are you with our service?",
#                 "type": "multiple_choice",
#                 "properties": {
#                     "choices": [
#                         {"label": "Very Satisfied"},
#                         {"label": "Satisfied"},
#                         {"label": "Neutral"},
#                         {"label": "Dissatisfied"}
#                     ]
#                 }
#             }
#         ]
#     }

#     direct_pub = publish_tool.run(survey_definition)
#     print("Direct publish output:", direct_pub)
#     fid = direct_pub["id"]
#     direct_fetch = fetch_tool.run(fid)
#     print("Direct fetch output:", direct_fetch)

#     # print("\n=== Using Crew.kickoff ===")
#     # deploy_out  = crew.kickoff(inputs={"survey_definition": survey_definition})
#     # print("Crew deployed:", deploy_out)
#     # fid = deploy_out["id"]
#     # collect_out = crew.kickoff(inputs={"survey_id": fid})
#     # print("Crew collected:", collect_out)
