-
Notifications
You must be signed in to change notification settings - Fork 0
Messages
Message retrieval is done through ServerClient. You will need a server API token.
import postmark
client = postmark.ServerClient("your-server-token")

import asyncio
from datetime import datetime
async def main():
    """Fetch one page of outbound messages and print a summary line for each."""
    # Request 25 messages starting at offset 0.
    result = await client.outbound.list(count=25, offset=0)
    print(f"{result.total} total messages, showing {len(result.items)}")
    for msg in result.items:
        print(f" {msg.received_at:%Y-%m-%d} {msg.subject} → {msg.recipients}")
asyncio.run(main())

All filter parameters are explicitly typed — IDE autocomplete and type checkers will catch misspelled or invalid values:
result = await client.outbound.list(
count=50,
offset=0,
recipient="alice@example.com", # filter by recipient
from_email="you@example.com", # filter by sender
tag="transactional", # filter by tag
status="Sent", # filter by status
subject="Welcome", # filter by subject
message_stream="outbound", # filter by stream
from_date=datetime(2024, 1, 1), # start of date range
to_date=datetime(2024, 1, 31), # end of date range
metadata={"campaign": "launch"}, # filter by one metadata key/value
)

Use the async generator to auto-paginate through all outbound messages. Accepts the same typed filter parameters as list():
import asyncio
async def main():
    """Iterate over outbound messages tagged "transactional" and print each ID and subject."""
    # max_messages=200 presumably caps how many messages are yielded — confirm
    # against the client's stream() documentation.
    async for msg in client.outbound.stream(max_messages=200, tag="transactional"):
        print(f" {msg.message_id} {msg.subject}")
asyncio.run(main())

Retrieve full details and event history for a specific message.
async def main():
detail = await client.outbound.get("message-id-here")
print(f"Status: {detail.status}")
print(f"Subject: {detail.subject}")
for event in detail.message_events:
print(f" {event.received_at} {event.type}")

Retrieve the raw content of a sent message.
async def main():
dump = await client.outbound.get_dump("message-id-here")
print(dump.body)

Returns Page[OpenEvent]. All filter parameters are typed keyword arguments:
async def main():
result = await client.outbound.list_opens(count=25)
for o in result.items:
print(f" {o.received_at} {o.client.name} {o.os.name}")

Available filters: recipient, tag, client_name, client_company, client_family, os_name, os_family, os_company, platform, country, region, city, message_stream, from_date, to_date.
Returns Page[OpenEvent]:
async def main():
result = await client.outbound.list_message_opens("message-id-here")
for o in result.items:
print(f" {o.received_at} {o.client.name}")

Returns Page[ClickEvent]. All filter parameters are typed keyword arguments:
async def main():
result = await client.outbound.list_clicks(count=25)
for c in result.items:
print(f" {c.received_at} {c.original_link}")

Available filters: recipient, tag, client_name, client_company, client_family, os_name, os_family, os_company, platform, country, region, city, message_stream, from_date, to_date.
Returns Page[ClickEvent]:
async def main():
result = await client.outbound.list_message_clicks("message-id-here")
for c in result.items:
print(f" {c.received_at} {c.original_link}")

import asyncio
async def main():
    """List inbound messages and print ID, subject, sender, status and date for each."""
    result = await client.inbound.list(count=10)
    print(f"Total inbound messages: {result.total}")
    for msg in result.items:
        # All four lines describe the current message, so they stay inside the loop.
        print(f" [{msg.message_id}] {msg.subject}")
        print(f" From: {msg.from_email}")
        print(f" Status: {msg.status}")
        print(f" Date: {msg.date}")
asyncio.run(main())

async def main():
detail = await client.inbound.get("message-id-here")
print(f"From: {detail.from_email}")
print(f"Subject: {detail.subject}")
print(f"Body: {detail.text_body}")

Re-trigger the inbound webhook for a message if delivery failed.
async def main():
await client.inbound.retry("message-id-here")
print("Webhook retried")

Force processing of a message that was blocked by inbound rules.
async def main():
    """Bypass inbound rules for one blocked message, then print a confirmation."""
    await client.inbound.bypass("message-id-here")
    print("Message bypassed")