minify は、PDF処理・テキスト整形・LLM推論をノードとして組み合わせるコンパクトなワークフロービルダーです。
React ベースのノードエディタで DAG(有向非巡回グラフ)構造のワークフローを構築し、FastAPI バックエンドで非同期に実行・状態監視できます。
このリポジトリは private で開発していたプロトタイプを public 向けに整理したものです。 元の開発履歴は含めていません。
データモデルおよびグラフ構造の設計は、Dify および n8n の実装を参考にしています。
- ブラウザ上のノードエディタでワークフローを GUI 構築
- PDF アップロード → テキスト抽出 → フォーマット → LLM 推論 をパイプライン化
- 複数 LLM ノードを並列・直列に配置した DAG ワークフローの実行
- ノード間の出力を変数マッピングで参照(
{{variable}}形式) - 非同期実行によるノンブロッキング API + ポーリングによる進捗確認
- OpenAI / Anthropic / Google Gemini のモデルを統一インターフェースで切り替え
| カテゴリ | 技術 |
|---|---|
| 言語 | Python 3.12+ |
| フレームワーク | FastAPI |
| ORM | SQLAlchemy 2.0 (async) |
| データベース | PostgreSQL 16 |
| タスクキュー | ARQ + Redis |
| LLM | LiteLLM |
| PDF処理 | PyMuPDF |
| ストレージ | S3 / LocalStack |
| パッケージ管理 | uv |
| カテゴリ | 技術 |
|---|---|
| フレームワーク | React 18 + TypeScript |
| ビルドツール | Vite |
| UI | React Flow + Tailwind CSS |
| 状態管理 | Zustand |
| 技術 | 選定理由 |
|---|---|
| ARQ | Celery に比べ設定が軽量で Python ネイティブな非同期ワーカーを実現できる。Redis をブローカー兼ストアとして使うためインフラが最小構成で済む |
| LiteLLM | OpenAI・Anthropic・Gemini を単一インターフェースで呼び出せるため、モデル切り替えをノード設定の変更だけで完結させられる |
| SQLAlchemy async + asyncpg | FastAPI の async エンドポイントと一貫して非同期 I/O を維持し、DB 待ちでイベントループをブロックしない |
| LocalStack | S3 互換のローカル環境を Docker で再現し、AWS クレデンシャルなしで PDF ストレージ機能を開発・テストできる |
| React Flow | ノード・エッジの描画とドラッグ操作が組み込みで提供されており、DAG エディタの実装コストを大幅に削減できる |
| uv | pip/Poetry より高速なパッケージ解決と uv.lock による再現性を両立できる |
- Python 3.12以上
- Node.js 20以上
- Docker & Docker Compose
- uv (
curl -LsSf https://astral.sh/uv/install.sh | sh)
git clone <repository-url>
cd minifycd server
cp .env.example .env
# .env を編集して LLM API キーを設定
# OPENAI_API_KEY=sk-your-key
# ANTHROPIC_API_KEY=your-key
# GEMINI_API_KEY=your-keycd server
# 依存関係のインストール
uv sync
# インフラ起動(PostgreSQL + Redis + LocalStack)
make up
# データベース初期化(マイグレーション実行)
make db-init
# 開発サーバー起動
make devcd server
make workercd client
npm install
npm run dev| サービス | コマンド | URL |
|---|---|---|
| バックエンド | cd server && make dev |
http://localhost:8000 |
| ワーカー | cd server && make worker |
- |
| フロントエンド | cd client && npm run dev |
http://localhost:5173 |
| API ドキュメント | - | http://localhost:8000/docs |
| メソッド | パス | 説明 |
|---|---|---|
| POST | /workflows |
ワークフロー作成 |
| GET | /workflows |
ワークフロー一覧取得 |
| GET | /workflows/{workflow_id} |
ワークフロー詳細取得 |
| PUT | /workflows/{workflow_id} |
ワークフロー更新 |
| DELETE | /workflows/{workflow_id} |
ワークフロー削除 |
| メソッド | パス | 説明 |
|---|---|---|
| POST | /workflows/{workflow_id}/nodes |
ノード追加 |
| PUT | /workflows/{workflow_id}/nodes/{node_id} |
ノード更新(config, variables) |
| DELETE | /workflows/{workflow_id}/nodes/{node_id} |
ノード削除 |
| メソッド | パス | 説明 |
|---|---|---|
| POST | /workflows/{workflow_id}/edges |
エッジ追加 |
| DELETE | /workflows/{workflow_id}/edges/{edge_id} |
エッジ削除 |
| メソッド | パス | 説明 |
|---|---|---|
| POST | /workflows/{workflow_id}/executions |
ワークフロー実行開始(非同期) |
| GET | /executions/{execution_id} |
実行状態取得 |
| GET | /workflows/{workflow_id}/executions |
実行履歴一覧 |
| メソッド | パス | 説明 |
|---|---|---|
| POST | /files |
PDFファイルアップロード |
| メソッド | パス | 説明 |
|---|---|---|
| GET | /health |
アプリケーション稼働確認 |
| GET | /health/db |
データベース接続確認 |
ワークフローのエントリーポイント。入力データをそのまま後続ノードに渡す。
{
"node_type": "start",
"config": {}
}PDFファイルからテキストを抽出する。
{
"node_type": "extract_text",
"config": {
"file_key": "file_id"
}
}LLM APIを使用してテキストを生成する。
{
"node_type": "generative_ai",
"config": {
"model": "claude-3-5-haiku-20241022",
"system_prompt": "簡潔に回答するアシスタント。",
"user_prompt": "「{{text}}」を1文で要約してください。"
}
}対応モデル例:
- OpenAI:
gpt-4o,gpt-4o-mini - Anthropic:
claude-sonnet-4-20250514,claude-3-5-haiku-20241022 - Google:
gemini-1.5-pro,gemini-1.5-flash
テキストを整形する。複数のルールを同時に適用可能。
{
"node_type": "formatter",
"config": {
"to_uppercase": false,
"to_lowercase": true,
"to_fullwidth": false,
"to_halfwidth": true,
"remove_line_break": true
}
}curl -X POST http://localhost:8000/workflows/{workflow_id}/nodes \
-H "Content-Type: application/json" \
-d '{"node_type": "start", "config": {}}'curl -X POST http://localhost:8000/workflows/{workflow_id}/edges \
-H "Content-Type: application/json" \
-d '{"source": "<source_node_id>", "target": "<target_node_id>"}'curl -X PUT http://localhost:8000/workflows/{workflow_id}/nodes/{node_id} \
-H "Content-Type: application/json" \
-d '{
"variables": [
{"local_name": "text", "source_node_id": "<prev_node_id>", "source_key": "text"}
]
}'curl -X POST http://localhost:8000/workflows/{workflow_id}/executions \
-H "Content-Type: application/json" \
-d '{"input_data": {"file_id": "<file_id>"}}'curl -X POST http://localhost:8000/files \
-F "file=@sample.pdf"cd server
# 全テスト実行
make test
# E2Eテストのみ(要: make up + LLM APIキー)
make test-e2e以下のような簡易的なUIを構築し正常系について動作確認しました。
アップロードしたPDFファイルは以下のようなものです。
動画: https://github.com/user-attachments/assets/2a1f2c40-43da-4ca9-8a7a-e554cc1b3aca
以下のE2Eテストで全機能の動作確認を行いました。
テスト: test_pdf_extract_formatter_multi_llm_dag
| # | 確認内容 | API |
|---|---|---|
| 1 | PDFファイルアップロード | POST /files |
| 2 | ワークフロー作成 | POST /workflows |
| 3 | ノード作成(7ノード) | POST /workflows/{id}/nodes |
| 4 | エッジ追加(8エッジ) | POST /workflows/{id}/edges |
| 5 | ノード更新(variables設定) | PUT /workflows/{id}/nodes/{id} |
| 6 | ワークフロー非同期実行 | POST /workflows/{id}/executions |
| 7 | 実行状態ポーリング | GET /executions/{id} |
Start(file_id) → ExtractText → Formatter
↓
┌────────────────┼────────────────┐
↓ ↓ ↓
LLM(章1) LLM(章2) LLM(章3)
haiku haiku haiku
↓ ↓ ↓
└────────────────┼────────────────┘
↓
LLM(まとめ)
haiku
結果: ✅ PASSED (14.63s)
| ノード | 処理時間 | 出力 |
|---|---|---|
| Start | 5ms | (入力データをパススルー) |
| ExtractText | 69ms | 第1章 AIの基礎 ... |
| Formatter | 3ms | 第1章 aiの基礎ai(人工知能)は... |
| LLM(章1) | 1,417ms | AIは、ニューラルネットワークを活用したmachine learningとdeep learningを基盤とする技術である。 |
| LLM(章2) | 1,613ms | ワークフローは、DAG構造を用いて、ノードとエッジによる複雑な処理フローを効率的に表現するための設計手法である。 |
| LLM(章3) | 1,419ms | 実装のベストプラクティスは、テストの必須化、コードレビュー、継続的インテグレーションの導入により、AIシステムの品質を担保することです。 |
| LLM(まとめ) | 2,241ms | AIシステムの開発において、ニューラルネットワークを基盤とする先進的な技術と、DAGによる効率的なワークフロー設計を... |
ワークフローのノード実行順序はトポロジカルソートで決定します。DFS ベースの再帰実装ではなくキューベースの Kahn のアルゴリズム を採用しました。
- 各ノードの 入次数(in-degree) をカウントし、入次数 0 のノードをキューに積む
- キューから取り出したノードを実行し、その後続ノードの入次数を 1 減らす
- 入次数が 0 になった後続ノードをキューに追加、これを繰り返す
- 最終的に処理済みノード数がグラフのノード数と一致しない場合は 循環参照 として例外を投げる
再帰を使わないため、大規模グラフでのスタックオーバーフローがなく、循環検出も自然に行えます。
# domain/workflow.py
def topological_sort(self) -> list[BaseNode]:
in_degree = {nid: 0 for nid in self.nodes}
for edge in self.edges:
in_degree[edge.target] += 1
queue = deque([nid for nid, deg in in_degree.items() if deg == 0])
result = []
while queue:
node_id = queue.popleft()
result.append(self.nodes[node_id])
for edge in self.edges:
if edge.source == node_id:
in_degree[edge.target] -= 1
if in_degree[edge.target] == 0:
queue.append(edge.target)
if len(result) != len(self.nodes):
raise ValueError("Graph has a cycle")
return resultノードは 宣言的な変数マッピング(variables フィールド)で入力を定義します。
{
"variables": [
{ "local_name": "text", "source_node_id": "<prev_node_id>", "source_key": "text" }
]
}実行時は VariablePool が各ノードの出力を node_id → {key: value} の辞書として保持し、resolve() でマッピングに従って input_data を組み立ててから次のノードに渡します。これにより、ノード自体はグローバル状態を持たず、DAG の任意の位置から別ノードの出力を参照できます。
# domain/variable_pool.py
def resolve(self, mappings: list[VariableMapping]) -> dict[str, Any]:
return {
m.local_name: self.get(m.source_node_id, m.source_key)
for m in mappings
}ワークフローの実行は POST /executions で即座に execution_id を返し、実際の処理は ARQ ワーカー に委譲します。クライアントは GET /executions/{id} でポーリングして状態を確認します。
ARQ を選んだ理由は、Celery に比べて設定が軽量で Python の asyncio と自然に統合できる点です。Redis を唯一の外部依存として、ブローカー・結果ストアを兼用できます。
POST /executions → execution_id 即時返却
↓
ARQ worker が実行を非同期処理
↓
GET /executions/{id} でポーリング → PENDING / RUNNING / COMPLETED
GenerativeAI ノードの model フィールドを変えるだけで、OpenAI・Anthropic・Gemini を切り替えられます。LiteLLM が各 SDK の差異を吸収するため、バックエンドのコードはモデル名の文字列だけを扱います。プロンプト内の {{variable}} はノード実行前にテンプレート展開されます。
domain/ 層は Pydantic / dataclass による純粋なビジネスロジックのみで構成し、SQLAlchemy モデルは infra/db/ に分離しています。これによりドメインロジックが ORM に依存せず、ユニットテストで DB のモックが不要になっています。
- E2Eテストや実際のLLMノード実行には有効なAPIキーが必要です
.envファイルに設定してください- 対応プロバイダ: OpenAI, Anthropic, Google (Gemini)
- PDFアップロード機能はLocalStack(S3互換)を使用します
make upでLocalStackコンテナが起動します- バケットはアプリ起動時に自動作成されます
- 非同期タスク処理にRedisを使用します
make upでRedisコンテナが起動します- ワーカー(
make worker)の起動が必要です
.
├── README.md
├── docs/
│ ├── architecture.md # アーキテクチャ設計
│ ├── development.md # 開発ガイド
│ └── coding_standards.md
├── server/
│ ├── domain/ # ドメインモデル(ORM非依存)
│ ├── infra/ # インフラ層(DB, LLM, Storage)
│ ├── services/ # 実行エンジン, タスクキュー
│ ├── tests/ # テスト(unit, api, e2e)
│ ├── main.py # FastAPIエントリポイント
│ ├── worker.py # ARQワーカー
│ └── Makefile
└── client/
└── src/
├── components/ # UIコンポーネント
├── store/ # 状態管理(Zustand)
└── hooks/
cd server
# 開発
make dev # APIサーバー起動
make worker # バックグラウンドワーカー起動
# インフラ
make up # 全サービス起動
make down # 全サービス停止
make db-init # DB初期化
make db-reset # DBリセット
# テスト
make test # 全テスト
make test-e2e # E2Eテスト
make test-cov # カバレッジ付き
# コード品質
make lint # ruff check
make format # ruff format
make typecheck # mypy