An unofficial async Python client for the Flowith.io web application.
Disclaimer: This library reverse-engineers the Flowith.io browser API. It is not affiliated with or endorsed by Flowith. Use responsibly.
- Text generation — single-turn and multi-turn, streaming and non-streaming
- Image generation — Gemini, GPT Image, and more
- Video generation — Seedance, Kling, etc.
- File upload — `POST /file/store` with multipart support
- Conversation management — create, list, rename, delete
- Flow canvas — read node graph for any conversation
- Credits & models — fetch balance, browse model catalog
- Templates — list and filter prompt templates
- Profile & subscriptions — read/update profile, list grants
- Supabase content — notices, projects, discussions, oracle stream
- Daily rewards — claim and check status
- Account auth — OAuth URL, auth user, refresh token
- Real-time SSE — listen on `/user_stream/stream` for live generation events
- Async-first — built on `aiohttp`, zero blocking calls

```bash
pip install flowith_webapi
```

Requires Python 3.10+.

- Sign in at flowith.io with Google.
- After the OAuth redirect, copy the `access_token` from the URL hash: `https://flowith.io/#access_token=eyJhbGci...&...`
- Your user ID (the `sub` claim in the JWT) is also needed for SSE streaming; it is parsed automatically from the JWT if you do not supply it explicitly.

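The token handling described above can be sketched with the standard library alone. The snippet pulls `access_token` out of a redirect URL fragment and reads the `sub` claim from the JWT payload without verifying the signature. The helper names `token_from_redirect` and `user_id_from_jwt` are hypothetical and not part of `flowith_webapi`, and the sample token is a dummy built only for illustration.

```python
import base64
import json
from urllib.parse import parse_qs, urlsplit


def token_from_redirect(url: str) -> str:
    """Return the access_token carried in the URL fragment (the part after '#')."""
    params = parse_qs(urlsplit(url).fragment)
    return params["access_token"][0]


def user_id_from_jwt(token: str) -> str:
    """Read the 'sub' claim from a JWT payload. The signature is NOT verified."""
    payload_b64 = token.split(".")[1]
    # base64url segments in a JWT omit padding; restore it before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    return claims["sub"]


def _b64url(obj: dict) -> str:
    """Encode a dict as an unpadded base64url JSON segment (demo helper)."""
    raw = json.dumps(obj).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


# Dummy, unsigned token used only to exercise the helpers.
demo_token = ".".join([_b64url({"alg": "none"}), _b64url({"sub": "user-123"}), "sig"])
demo_url = f"https://flowith.io/#access_token={demo_token}&token_type=bearer"

assert token_from_redirect(demo_url) == demo_token
print(user_id_from_jwt(demo_token))  # user-123
```

Decoding without verification is enough to read your own user ID, but it should not be used to decide whether a token is trustworthy.
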
To enable token rotation, also capture the refresh token from Supabase.
Generate the OAuth URL with access_type=offline and prompt=consent so
Google issues one on the first consent screen:

```python
# Assumes `client` is an open FlowithClient instance.
url = client.get_oauth_url(
    provider="google",
    redirect_to="https://flowith.io",
    access_type="offline",
    prompt="consent",
)
print(url)
```

```python
import os

TOKEN = os.environ["FLOWITH_TOKEN"]
USER_ID = os.environ.get("FLOWITH_USER_ID")  # optional; parsed from the JWT when omitted
REFRESH = os.environ.get("FLOWITH_REFRESH_TOKEN", "")
```

Keep these secret: they grant full access to your Flowith account.

The Flowith web app maintains two persistent SSE connections:
| Endpoint | Purpose |
|---|---|
| `GET /user_stream/sse?user_id=…` | Keepalive — emits `{"heartbeat": …}` every ~5 s |
| `GET /user_stream/stream?user_id=…` | Data channel — emits `text_delta`, `result`, image/video URLs |

Both connections send the JWT as a bare token in the `authorization` header (no `Bearer` prefix), which matches what the edge server expects.

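To make the two-channel setup concrete, here is a minimal sketch of how such events might be handled on the client side. It builds the bare-token header and parses raw SSE lines, dropping heartbeats by default. It assumes each event arrives as a single `data:` line containing a JSON object with flat `nodeId` and `text_delta` keys, which the description above does not confirm; `auth_headers` and `iter_events` are hypothetical helpers, not library functions.

```python
import json
from typing import Iterable, Iterator


def auth_headers(token: str) -> dict:
    # The JWT is sent as-is, without a "Bearer " prefix.
    return {"authorization": token}


def iter_events(lines: Iterable[str], keep_heartbeats: bool = False) -> Iterator[dict]:
    """Yield decoded JSON payloads from SSE 'data:' lines."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators, comments, and other SSE fields
        try:
            event = json.loads(line[len("data:"):].strip())
        except json.JSONDecodeError:
            continue  # ignore frames that are not valid JSON
        if not isinstance(event, dict):
            continue  # only JSON objects are treated as events
        if "heartbeat" in event and not keep_heartbeats:
            continue
        yield event


# Hand-written sample frames; real payloads may carry more fields.
sample = [
    'data: {"heartbeat": 1779127465201}',
    "",
    'data: {"nodeId": "n1", "text_delta": "War"}',
    'data: {"nodeId": "n1", "text_delta": "saw"}',
]
events = list(iter_events(sample))
print("".join(e["text_delta"] for e in events))  # Warsaw
```
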
When `generate()` is called, the client:

- Creates the Supabase conversation and user/AI node rows.
- POSTs the completion request to `/completion/async` (returns immediately).
- Listens on `/user_stream/stream` for events whose `nodeId` matches the AI node.
- Falls back to polling `flow_node` on Supabase if the stream closes early.

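The listen-then-fall-back logic in the last two steps can be sketched as follows. This illustrates the control flow only and is not the library's implementation: `collect_reply`, `fake_stream`, and `fake_poll` are hypothetical stand-ins for the SSE listener and the Supabase `flow_node` lookup, and the event fields (`nodeId`, `text_delta`, `result`) are assumed to be flat JSON keys.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Optional


async def collect_reply(
    events: AsyncIterator[dict],
    node_id: str,
    poll: Callable[[str], Awaitable[Optional[str]]],
    poll_interval: float = 0.01,
    max_polls: int = 5,
) -> str:
    """Accumulate text for one AI node; poll if the stream ends without a result."""
    parts = []
    async for event in events:
        if event.get("nodeId") != node_id:
            continue  # event belongs to another generation
        if "text_delta" in event:
            parts.append(event["text_delta"])
        if "result" in event:
            return event["result"]  # final text arrived on the stream
    # Stream closed early: fall back to polling the stored node.
    for _ in range(max_polls):
        text = await poll(node_id)
        if text is not None:
            return text
        await asyncio.sleep(poll_interval)
    return "".join(parts)  # best effort: whatever was streamed so far


async def fake_stream() -> AsyncIterator[dict]:
    yield {"nodeId": "other", "text_delta": "ignored"}
    yield {"nodeId": "ai-1", "text_delta": "War"}
    # The stream stops here without a "result" event.


async def fake_poll(node_id: str) -> Optional[str]:
    return "Warsaw" if node_id == "ai-1" else None


reply = asyncio.run(collect_reply(fake_stream(), "ai-1", fake_poll))
print(reply)  # Warsaw
```

Bounding the number of polls keeps a stalled generation from hanging the caller indefinitely; a real client would more likely raise a timeout error at that point than return partial text.
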
```python
import asyncio

from flowith_webapi import FlowithClient


async def main():
    async with FlowithClient(TOKEN, user_id=USER_ID, refresh_token=REFRESH) as client:
        r = await client.generate("What is the capital of Poland?")
        print(r.text)  # "Warsaw"


asyncio.run(main())
```

The snippets below are meant to run inside an `async` function with an open `client`, as in `main()` above.

```python
# Single-turn generation
r = await client.generate("Explain quantum entanglement in one sentence.")
print(r.text)
print(r.model)    # model ID used
print(r.conv_id)  # conversation UUID
```

```python
# Streaming generation
async for chunk in client.generate_stream("Write me a haiku about async Python."):
    print(chunk.text_delta, end="", flush=True)
print()
```

```python
# Multi-turn conversation
session = client.start_conversation(model="gemini-3.1-pro-preview")
r1 = await session.send("My name is Alice.")
r2 = await session.send("What is my name?")
print(r2.text)  # "Your name is Alice."

async for chunk in session.send_stream("Tell me a fun fact about Alice."):
    print(chunk.text_delta, end="", flush=True)
print()
```

```python
from flowith_webapi import ImageModel, AspectRatio

result = await client.generate_image(
    "A ripe banana on a marble surface, studio lighting",
    model=ImageModel.GEMINI_3_1_FLASH_IMAGE,
    aspect_ratio=AspectRatio.SQUARE,
)
await result.image.save("./outputs")
print(result.image.url)
```

```python
from flowith_webapi import VideoModel

video = await client.generate_video(
    "A cat walking on a sunny beach",
    model=VideoModel.SEEDANCE_2_FAST,
    timeout=300.0,
)
await video.save("./outputs")
print(video.video_url)
```

```python
# Upload a local file and get a public CDN URL
record = await client.upload_file("my_game.html")
print(record.url)  # https://r2-bucket.flowith.net/f/…/my_game.html

# Reference the file in a generation
r = await client.generate(
    f"Review this HTML file: [[file:html|my_game.html|{record.url}]]"
)
```

```python
# Rotate the access token (uses stored refresh token if not supplied)
if REFRESH:
    await client.refresh_access_token()
```

```python
# List recent conversations
convs = await client.list_conversations(limit=10)
for c in convs:
    print(c.conv_id, c.title)

# Read the flow-canvas node graph
# conv_id is the UUID of an existing conversation, e.g. convs[0].conv_id
nodes = await client.get_flow_nodes(conv_id)
for n in nodes:
    print(n.node_type, n.text[:80])

# Rename / delete
await client.rename_conversation(conv_id, "New Title")
await client.delete_conversation(conv_id)
```

```python
# Generation-data stream (/user_stream/stream), heartbeats filtered out
async for evt in client.stream_user_stream(timeout=60):
    print(evt)

# Keepalive stream (/user_stream/sse), heartbeats included
async for evt in client.stream_user_sse(timeout=60):
    print(evt)  # {"heartbeat": 1779127465201}
```

```python
credits = await client.get_credits()
total = sum(c.remain_quota for c in credits)
print(f"Credits: {total:.2f}")

models = await client.list_models()
for m in models:
    print(m.model_id, m.title)

# Active subscriptions
subs = await client.list_subscription_user_own()
print(len(subs))

# Profile
profile = await client.get_user_profile()
print(profile.language)
```

| Method | Description |
|---|---|
| `generate(prompt, ...)` | Single-turn text generation (waits for full result) |
| `generate_stream(prompt, ...)` | Streaming text generation (yields token-by-token) |
| `generate_image(prompt, ...)` | Image generation |
| `generate_video(prompt, ...)` | Video generation |
| `generate_title(messages)` | Auto-generate conversation title |
| `upload_file(file_path, ...)` | Upload file to CDN |
| `start_conversation(...)` | Create a `ConversationSession` for multi-turn use |
| `list_conversations(...)` | List user conversations |
| `get_conversation(conv_id)` | Fetch single conversation |
| `rename_conversation(conv_id, title)` | Rename conversation |
| `delete_conversation(conv_id)` | Soft-delete conversation |
| `get_flow_nodes(conv_id)` | Read flow-canvas node graph |
| `get_cooperators(conv_id)` | List conversation collaborators |
| `get_credits()` | Fetch credit balances |
| `get_credit_type()` | Return credit-type string |
| `get_auth_user()` | Return Supabase auth user object |
| `get_oauth_url(...)` | Build Supabase OAuth authorization URL |
| `refresh_access_token(refresh_token=None)` | Rotate access token |
| `logout(scope)` | Revoke auth session |
| `get_user_profile(user_id)` | Fetch profile row |
| `update_user_profile(**fields)` | Update profile fields |
| `list_user_upload_records(...)` | List uploaded file records |
| `list_subscription_user_own(...)` | List active subscription grants |
| `list_public_notices()` | List public notices |
| `list_flow_projects()` | List flow projects |
| `get_conv_novel_editor(conv_id)` | Fetch novel-editor content |
| `list_discuss(conv_id)` | List discussion entries |
| `get_oracle_stream(conv_id)` | Fetch oracle stream entries |
| `get_migration_info()` | Fetch migration version |
| `get_featurebase_status()` | Featurebase changelog status |
| `list_models(...)` | Browse model catalog |
| `list_templates(...)` | Browse prompt templates |
| `get_discord_invite()` | Return Discord invite URL |
| `get_daily_rewards_status()` | Daily rewards status |
| `claim_daily_reward()` | Claim today's reward |
| `check_campaign(model_id)` | Check campaign eligibility |
| `check_banana_campaign()` | Banana-2025 campaign status |
| `get_enterprise_dashboard()` | Enterprise dashboard info |
| `get_online_count()` | Current online user count |
| `upsert_online_session(payload)` | Upsert online session (RPC) |
| `remove_online_session(session_id)` | Remove online session (RPC) |
| `stream_user_stream(...)` | Stream live generation events from `/user_stream/stream` |
| `stream_user_sse(...)` | Stream keepalive heartbeat events from `/user_stream/sse` |

| Method / Property | Description |
|---|---|
| `send(prompt, ...)` | Send message, await full reply |
| `send_stream(prompt, ...)` | Send message, stream reply token-by-token |
| `generate_image(prompt, ...)` | Generate image in this conversation |
| `generate_video(prompt, ...)` | Generate video in this conversation |
| `delete()` | Delete the server-side conversation |
| `.conv_id` | Conversation UUID (created lazily on first send) |
| `.model` | Session model ID |
| `.turn` | Number of turns sent so far |

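As an illustration of the lazy `conv_id` and the `turn` counter listed in the table, here is a small self-contained sketch. It is not the library's `ConversationSession`: `LazySession` and its `_create_conversation` hook are hypothetical, and the conversation ID is faked with a local UUID instead of a server call.

```python
import asyncio
import uuid
from typing import Optional


class LazySession:
    """Toy session that creates its conversation only when first used."""

    def __init__(self, model: str) -> None:
        self.model = model
        self.conv_id: Optional[str] = None  # unset until the first send()
        self.turn = 0

    async def _create_conversation(self) -> str:
        # Stand-in for the server call that would create the conversation.
        return str(uuid.uuid4())

    async def send(self, prompt: str) -> str:
        if self.conv_id is None:
            self.conv_id = await self._create_conversation()
        self.turn += 1
        return f"[{self.model}] reply {self.turn} to: {prompt}"


async def demo() -> LazySession:
    session = LazySession(model="demo-model")
    assert session.conv_id is None  # nothing created yet
    await session.send("My name is Alice.")
    first_id = session.conv_id
    await session.send("What is my name?")
    assert session.conv_id == first_id  # the same conversation is reused
    return session


session = asyncio.run(demo())
print(session.turn)  # 2
```

Deferring creation until the first message means a session that is started but never used leaves nothing behind on the server.
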
| Type | Key fields |
|---|---|
| `GenerationOutput` | `text`, `text_delta`, `node_id`, `conv_id`, `model` |
| `ImageOutput` | `images: list[GeneratedImage]`, `image` (first item) |
| `GeneratedImage` | `url`, `model`; `.save(dir)` downloads to directory |
| `VideoOutput` | `video_url`, `model`; `.save(dir)` downloads to directory |
| `FileRecord` | `url`, `file_type` |
| `ConversationRecord` | `conv_id`, `title`, `preview` |
| `FlowNode` | `node_id`, `node_type`, `text`, `model` |
| `UserCredits` | `remain_quota`, `init_quota`, `sub_type` |
| `ModelInfo` | `model_id`, `title`, `media`, `tier` |
| `TemplateRecord` | `template_id`, `name`, `category`, `prompt` |
| `CooperatorRecord` | `user_id`, `user_name`, `role` |
| `UserProfile` | `user_id`, `language` |
| `UserUploadRecord` | `record_id`, `name`, `file_type`, `created_at` |
| `SubscriptionUserOwn` | `record_id`, `remain_quota`, `subscription_type` |
| `PublicNotice` | `notice_id`, `title`, `content` |
| `FlowProject` | `project_id`, `title`, `updated_at` |
| `ConvNovelEditor` | `conv_id`, `content` |
| `DiscussRecord` | `discuss_id`, `conv_id`, `content` |
| `OracleStreamRecord` | `record_id`, `workflow`, `is_stop` |
| `MigrationInfo` | `version` |

```python
from flowith_webapi import (
    AuthenticationError,
    RateLimitError,
    ValidationError,
    APIError,
    TimeoutError,
    FileUploadError,
)

try:
    r = await client.generate("Hello!")
except AuthenticationError:
    print("Invalid or expired token.")
except RateLimitError as e:
    print(f"Rate limited — retry in {e.retry_after_s}s")
except TimeoutError:
    print("Generation timed out.")
except APIError as e:
    print(f"API error {e.status_code}: {e}")
```

```python
import flowith_webapi

flowith_webapi.set_log_level("DEBUG")  # DEBUG | INFO | WARNING | ERROR
```

- 1MinAI-API — sister project for 1min.AI
- Claude-API — unofficial Claude.ai client