Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 4 additions & 14 deletions examples/custom_source_hn/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class HackerNewsSource(SourceSpec):
"""Source spec for HackerNews API."""

tag: str | None = None
max_results: int = 100
max_results: int = 200


@source_connector(
Expand All @@ -77,24 +77,16 @@ async def create(spec: HackerNewsSource) -> "HackerNewsConnector":
"""Create a HackerNews connector from the spec."""
return HackerNewsConnector(spec, aiohttp.ClientSession())

async def _ensure_session(self) -> aiohttp.ClientSession:
"""Ensure we have an active HTTP session."""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session

async def list(
self,
) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]:
"""List HackerNews threads using the search API."""
session = await self._ensure_session()

# Use HackerNews search API
search_url = "https://hn.algolia.com/api/v1/search_by_date"
params: dict[str, Any] = {"hitsPerPage": self._spec.max_results}
if self._spec.tag:
params["tags"] = self._spec.tag
async with session.get(search_url, params=params) as response:
async with self._session.get(search_url, params=params) as response:
response.raise_for_status()
data = await response.json()
for hit in data.get("hits", []):
Expand All @@ -114,12 +106,10 @@ async def get_value(
self, key: _HackerNewsThreadKey
) -> PartialSourceRowData[_HackerNewsThread]:
"""Get a specific HackerNews thread by ID using the items API."""
session = await self._ensure_session()

# Use HackerNews items API to get full thread with comments
item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}"

async with session.get(item_url) as response:
async with self._session.get(item_url) as response:
response.raise_for_status()
data = await response.json()

Expand Down Expand Up @@ -183,7 +173,7 @@ def hackernews_flow(

# Add the custom source to the flow
data_scope["threads"] = flow_builder.add_source(
HackerNewsSource(tag="story", max_results=500),
HackerNewsSource(tag="story", max_results=200),
refresh_interval=timedelta(minutes=1),
)

Expand Down
16 changes: 3 additions & 13 deletions examples/hn_trending_topics/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class HackerNewsSource(SourceSpec):
"""Source spec for HackerNews API."""

tag: str | None = None
max_results: int = 100
max_results: int = 200


@source_connector(
Expand All @@ -110,24 +110,16 @@ async def create(spec: HackerNewsSource) -> "HackerNewsConnector":
"""Create a HackerNews connector from the spec."""
return HackerNewsConnector(spec, aiohttp.ClientSession())

async def _ensure_session(self) -> aiohttp.ClientSession:
"""Ensure we have an active HTTP session."""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session

async def list(
self,
) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]:
"""List HackerNews threads using the search API."""
session = await self._ensure_session()

# Use HackerNews search API
search_url = "https://hn.algolia.com/api/v1/search_by_date"
params: dict[str, Any] = {"hitsPerPage": self._spec.max_results}
if self._spec.tag:
params["tags"] = self._spec.tag
async with session.get(search_url, params=params) as response:
async with self._session.get(search_url, params=params) as response:
response.raise_for_status()
data = await response.json()
for hit in data.get("hits", []):
Expand All @@ -147,12 +139,10 @@ async def get_value(
self, key: _HackerNewsThreadKey
) -> PartialSourceRowData[_HackerNewsThread]:
"""Get a specific HackerNews thread by ID using the items API."""
session = await self._ensure_session()

# Use HackerNews items API to get full thread with comments
item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}"

async with session.get(item_url) as response:
async with self._session.get(item_url) as response:
response.raise_for_status()
data = await response.json()

Expand Down