Skip to content

Commit

Permalink
🎉 Source Posthog - migration to low code, from alpha to beta (#18993)
Browse files Browse the repository at this point in the history
* added projects stream, added slicing for all streams (based on projects), annotations stream does not support incremental syncs anymore (due to API limitations)

* added schema for projects streams

* source switched to low code

* added low code yaml file

* added customized components

* code cleanup

* fix SAT

* added insights stream

* fix schema

* fix catalog

* fix yaml

* added schema

* added test

* updated SAT

* updated docs, bumped version

* update docs

* for events stream: added CartesianProductStreamSlicer, added paginator

* added custom slicer

* updated SAT

* fixed formatting

* added unit tests

* updated page_size

* fixed unit tests

* removed obsolete streams.py file

* updated seed file

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
midavadim and octavia-squidington-iii committed Dec 16, 2022
1 parent 7ae05b1 commit f7fc223
Show file tree
Hide file tree
Showing 21 changed files with 1,415 additions and 308 deletions.
Expand Up @@ -1250,11 +1250,11 @@
- name: PostHog
sourceDefinitionId: af6d50ee-dddf-4126-a8ee-7faee990774f
dockerRepository: airbyte/source-posthog
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.com/integrations/sources/posthog
icon: posthog.svg
sourceType: api
releaseStage: alpha
releaseStage: beta
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
Expand Down
Expand Up @@ -11320,7 +11320,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-posthog:0.1.7"
- dockerImage: "airbyte/source-posthog:0.1.8"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/posthog"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-posthog/Dockerfile
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/source-posthog
Expand Up @@ -12,15 +12,13 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams:
- events_sessions
- insights_path
- insights_sessions
- trends
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalog_events_incremental.json"
future_state_path: "integration_tests/future_state.json"
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_events_incremental.json"
future_state_path: "integration_tests/future_state_old.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
@@ -1,17 +1,26 @@
{
"streams": [
{
"stream": {
"name": "projects",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "annotations",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_at"],
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "full_refresh",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
Expand Down Expand Up @@ -74,6 +83,21 @@
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "insights",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": null,
"default_cursor_field": null,
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "full_refresh",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
}
]
}
@@ -0,0 +1,19 @@
{
"streams": [
{
"stream": {
"name": "events",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["timestamp"],
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["timestamp"],
"primary_key": [["id"]]
}
]
}

Large diffs are not rendered by default.

@@ -1,4 +1,7 @@
{
"events": { "timestamp": "2121-04-13T18:13:51.504000+00:00" },
"annotations": { "updated_at": "2121-05-27T14:09:29.961933Z" }
}
"events": {
"2331": {
"timestamp": "2221-12-10T10:21:35.003000+00:00"
}
}
}
@@ -0,0 +1,5 @@
{
"events": {
"timestamp": "2221-12-10T10:21:35.003000+00:00"
}
}
@@ -1,6 +1,7 @@
{
"feature_flags": { "id": 697 },
"events": { "timestamp": "2021-05-31T06:58:11.633000+00:00" },
"persons": { "created_at": "2021-04-13T18:13:54.269000Z" },
"annotations": { "updated_at": "2021-05-27T14:09:29.961933Z" }
}
"events": {
"2331": {
"timestamp": "2021-12-10T10:21:35.003000+00:00"
}
}
}
6 changes: 2 additions & 4 deletions airbyte-integrations/connectors/source-posthog/setup.py
Expand Up @@ -5,9 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
]
MAIN_REQUIREMENTS = ["airbyte-cdk"]

TEST_REQUIREMENTS = [
"pytest~=6.1",
Expand All @@ -21,6 +19,6 @@
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=MAIN_REQUIREMENTS,
package_data={"": ["*.json", "schemas/*.json"]},
package_data={"": ["*.json", "*.yaml", "schemas/*.json"]},
extras_require={"tests": TEST_REQUIREMENTS},
)
@@ -0,0 +1,126 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Iterable, Mapping, MutableMapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice


@dataclass
class EventsSimpleRetriever(SimpleRetriever):
    """Retriever for the PostHog /events endpoint with link-style pagination."""

    def request_params(
        self,
        stream_state: StreamSlice,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Build query params, letting the server's 'next' link win over the slice.

        The Events API returns records newest-first, ~100 per page. The
        'limit'/'offset' params it advertises are ignored; instead each
        response carries a 'next' URL whose query string already holds the
        'after'/'before' datetime range for the next (older) page, e.g.:

            {
                "next": "https://app.posthog.com/api/projects/2331/events?after=...&before=...",
                "results": [{id ...}, {id ...}]
            }

        So whenever a next_page_token is present, the stream slice's own
        'after'/'before' params must be dropped in favor of the token's.
        """

        # The pagination token carries its own datetime window; an empty
        # slice keeps the slicer from re-adding conflicting 'after'/'before'.
        effective_slice: Optional[StreamSlice] = {} if next_page_token else stream_slice

        return self._get_request_options(
            effective_slice,
            next_page_token,
            self.requester.get_request_params,
            self.paginator.get_request_params,
            self.stream_slicer.get_request_params,
        )


@dataclass
class EventsCartesianProductStreamSlicer(CartesianProductStreamSlicer):
    """Cartesian slicer (projects x datetime windows) keeping per-project state.

    Connector requires support of nested state - each project should have own
    timestamp value, like:
    {
        "project_id1": {
            "timestamp": "2021-02-01T10:21:35.003000Z"
        },
        "project_idX": {
            "timestamp": "2022-11-17T00:00:00.000000Z"
        }
    }
    we also have to support old-style (before 0.1.8) states, like:
    {
        "timestamp": "2021-01-17T10:21:35.003000Z"
    }
    Slicer also produces separate datetime slices for each project.
    """

    def __post_init__(self, options: Mapping[str, Any]):
        # NOTE(review): intentionally does not delegate to the parent's
        # __post_init__ — state is tracked here as a per-project dict rather
        # than inside the wrapped slicers; confirm the parent needs no setup.
        self._cursor = {}
        self._options = options

    def get_stream_state(self) -> Mapping[str, Any]:
        """Return the per-project cursor mapping as the stream state."""
        return self._cursor or {}

    def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None):
        """Advance the cursor for the slice's project from an emitted record.

        Called with last_record=None once at startup to seed the cursor with
        the initial state, and per record during the sync.
        """

        if not last_record:
            # this is actually initial stream state from CLI
            self._cursor = stream_slice
            return

        project_id = str(stream_slice.get("project_id", ""))
        if project_id:
            current_cursor_value = self._cursor.get(project_id, {}).get("timestamp", "")
            new_cursor_value = last_record.get("timestamp", "")

            # Lexicographic max — assumes ISO-8601 timestamps in a uniform
            # format/offset so string order matches chronological order.
            self._cursor[project_id] = {"timestamp": max(current_cursor_value, new_cursor_value)}

    def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
        """Since each project has its own state, then we need to have a separate
        datetime slices for each project.

        Returns a flat list of slices, each carrying 'start_time', 'end_time'
        and 'project_id'.
        """

        slices = []

        # self.stream_slicers order is fixed by the YAML manifest:
        # projects first, datetime second.
        project_slicer, datetime_slicer = self.stream_slicers

        # support of old style state: it contains only a single 'timestamp' field
        old_style_state = stream_state if "timestamp" in stream_state else {}

        for project_slice in project_slicer.stream_slices(sync_mode, stream_state):
            project_id = str(project_slice.get("project_id", ""))

            # use old_style_state if state does not contain states for each project
            project_state = stream_state.get(project_id, {}) or old_style_state

            # Each project should have own datetime slices depends on its state
            project_datetime_slices = datetime_slicer.stream_slices(sync_mode, project_state)

            # fix date ranges: start_time of next slice must be equal to end_time of previous slice
            if project_datetime_slices and project_state:
                project_datetime_slices[0]["start_time"] = project_state["timestamp"]
                for i, datetime_slice in enumerate(project_datetime_slices[1:], start=1):
                    datetime_slice["start_time"] = project_datetime_slices[i - 1]["end_time"]

            # Add project id to each slice
            for datetime_slice in project_datetime_slices:
                datetime_slice["project_id"] = project_id

            slices.extend(project_datetime_slices)

        return slices

0 comments on commit f7fc223

Please sign in to comment.