Skip to content

Commit

Permalink
feat: add ingestion_metadata field (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyang1520 authored Sep 7, 2022
1 parent 8d5699e commit a74a943
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 12 deletions.
2 changes: 1 addition & 1 deletion examples/flask_example/flaskapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def track_revenue(user_id):
@app.route("/flush")
def flush_event():
amp_client.flush()
return f"<p>All events flushed</p>"
return "<p>All events flushed</p>"


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion src/amplitude/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from amplitude.client import Amplitude
from amplitude.event import BaseEvent, EventOptions, Identify, Revenue, IdentifyEvent, \
GroupIdentifyEvent, RevenueEvent, Plan
GroupIdentifyEvent, RevenueEvent, Plan, IngestionMetadata
from amplitude.config import Config
from amplitude.constants import PluginType
from amplitude.plugin import EventPlugin, DestinationPlugin
7 changes: 5 additions & 2 deletions src/amplitude/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Optional, Callable

from amplitude import constants
from amplitude.event import BaseEvent, Plan
from amplitude.event import BaseEvent, Plan, IngestionMetadata
from amplitude.storage import InMemoryStorageProvider, StorageProvider, Storage


Expand All @@ -33,6 +33,7 @@ class Config:
storage_provider (amplitude.storage.StorageProvider, optional): Default to InMemoryStorageProvider.
Provide storage instance for events buffer.
plan (amplitude.event.Plan, optional): Tracking plan information. Default to None.
ingestion_metadata (amplitude.event.IngestionMetadata, optional): Ingestion metadata. Default to None.
Properties:
options: A dictionary contains minimum id length information. None if min_id_length not set.
Expand All @@ -55,7 +56,8 @@ def __init__(self, api_key: str = None,
use_batch: bool = False,
server_url: Optional[str] = None,
storage_provider: StorageProvider = InMemoryStorageProvider(),
plan: Plan = None):
plan: Plan = None,
ingestion_metadata: IngestionMetadata = None):
"""The constructor of Config class"""
self.api_key: str = api_key
self._flush_queue_size: int = flush_queue_size
Expand All @@ -71,6 +73,7 @@ def __init__(self, api_key: str = None,
self.storage_provider: StorageProvider = storage_provider
self.opt_out: bool = False
self.plan: Plan = plan
self.ingestion_metadata: IngestionMetadata = ingestion_metadata

def get_storage(self) -> Storage:
"""Use configured StorageProvider to create a Storage instance then return.
Expand Down
71 changes: 68 additions & 3 deletions src/amplitude/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
BaseEvent: Basic event class. Subclass of EventOptions.
Identify: A class used to create identify and group identify event.
IdentifyEvent: A special event class. Used to update user properties without an actual event.
GroupIdentifyEvent: A special event class. Used to update group properties without an actual event
GroupIdentifyEvent: A special event class. Used to update group properties without an actual event.
Revenue: A class used to create revenue event.
RevenueEvent: A special event class. Used to record revenue information.
Plan: Tracking plan info includes branch, source, version, version_id.
IngestionMetadata: Ingestion metadata includes source name, source version.
"""

import copy
Expand Down Expand Up @@ -71,7 +72,52 @@ def get_plan_body(self):
result[PLAN_KEY_MAPPING[key][0]] = self.__dict__[key]
else:
logger.error(
f"Plan.{key} expected {PLAN_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.")
f"{type(self).__name__}.{key} expected {PLAN_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.")
return result


INGESTION_METADATA_KEY_MAPPING = {
"source_name": ["source_name", str],
"source_version": ["source_version", str],
}


class IngestionMetadata:
"""IngestionMetadata holds metadata information. Instance of IngestionMetadata class can be value of event's `ingestion_metadata` attribute.
Args:
source_name (str, optional): Name of the ingestion source in metadata.
source_version (str, optional): Version of the ingestion source in metadata.
Methods:
get_body(): return a dict object that contains ingestion metadata information.
"""

def __init__(self, source_name: Optional[str] = None, source_version: Optional[str] = None):
"""The constructor for the IngestionMetadata class
Args:
source_name (str, optional): Name of the ingestion source in metadata.
source_version (str, optional): Version of the ingestion source in metadata.
"""
self.source_name: Optional[str] = source_name
self.source_version: Optional[str] = source_version

def get_body(self):
"""Convert this object instance to dict instance
Returns:
A dictionary with data of this object instance
"""
result = {}
for key in INGESTION_METADATA_KEY_MAPPING:
if not self.__dict__[key]:
continue
if isinstance(self.__dict__[key], INGESTION_METADATA_KEY_MAPPING[key][1]):
result[INGESTION_METADATA_KEY_MAPPING[key][0]] = self.__dict__[key]
else:
logger.error(
f"{type(self).__name__}.{key} expected {INGESTION_METADATA_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.")
return result


Expand Down Expand Up @@ -113,6 +159,7 @@ def get_plan_body(self):
"insert_id": ["insert_id", str],
"library": ["library", str],
"plan": ["plan", Plan],
"ingestion_metadata": ["ingestion_metadata", IngestionMetadata],
"group_properties": ["group_properties", dict],
"partner_id": ["partner_id", str],
"version_name": ["version_name", str]
Expand Down Expand Up @@ -158,6 +205,7 @@ class EventOptions:
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
we have already seen before within the past 7 days will be deduplicated.
plan (Plan, optional): Tracking plan properties.
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
partner_id (str, optional): The partner id.
version_name (str, optional): The version name.
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
Expand All @@ -167,7 +215,7 @@ class EventOptions:
retry (int): The retry attempt of the event instance.
Methods:
get_event_body(): Retrun a dictionary with data of the event instance
get_event_body(): Return a dictionary with data of the event instance
callback(code, message): Trigger callback method of the event instance.
"""

Expand Down Expand Up @@ -203,6 +251,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id: Optional[int] = None,
insert_id: Optional[str] = None,
plan: Optional[Plan] = None,
ingestion_metadata: Optional[IngestionMetadata] = None,
partner_id: Optional[str] = None,
version_name: Optional[str] = None,
callback=None):
Expand Down Expand Up @@ -240,6 +289,7 @@ def __init__(self, user_id: Optional[str] = None,
self.insert_id: Optional[str] = None
self.library: Optional[str] = None
self.plan: Optional[Plan] = None
self.ingestion_metadata: Optional[IngestionMetadata] = None
self.partner_id: Optional[str] = None
self.version_name: Optional[str] = None
self["user_id"] = user_id
Expand Down Expand Up @@ -274,6 +324,7 @@ def __init__(self, user_id: Optional[str] = None,
self["session_id"] = session_id
self["insert_id"] = insert_id
self["plan"] = plan
self["ingestion_metadata"] = ingestion_metadata
self["partner_id"] = partner_id
self["version_name"] = version_name
self.event_callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = callback
Expand Down Expand Up @@ -309,6 +360,8 @@ def get_event_body(self) -> dict:
event_body[value[0]] = self[key]
if "plan" in event_body:
event_body["plan"] = event_body["plan"].get_plan_body()
if "ingestion_metadata" in event_body:
event_body["ingestion_metadata"] = event_body["ingestion_metadata"].get_body()
for properties in ["user_properties", "event_properties", "group_properties"]:
if properties in event_body:
for key, value in event_body[properties].items():
Expand Down Expand Up @@ -394,6 +447,7 @@ class BaseEvent(EventOptions):
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
we have already seen before within the past 7 days will be deduplicated.
plan (Plan, optional): Tracking plan properties.
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
partner_id (str, optional): The partner id.
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
parameters: an event instance, an integer code of response status, an optional string message.
Expand Down Expand Up @@ -439,6 +493,7 @@ def __init__(self, event_type: str,
session_id: Optional[int] = None,
insert_id: Optional[str] = None,
plan: Optional[Plan] = None,
ingestion_metadata: Optional[IngestionMetadata] = None,
partner_id: Optional[str] = None,
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None):
"""The constructor of the BaseEvent class"""
Expand Down Expand Up @@ -474,6 +529,7 @@ def __init__(self, event_type: str,
session_id=session_id,
insert_id=insert_id,
plan=plan,
ingestion_metadata=ingestion_metadata,
partner_id=partner_id,
callback=callback)
self.event_type: str = event_type
Expand Down Expand Up @@ -730,6 +786,7 @@ class GroupIdentifyEvent(BaseEvent):
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
we have already seen before within the past 7 days will be deduplicated.
plan (Plan, optional): Tracking plan properties.
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
partner_id (str, optional): The partner id.
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
parameters: an event instance, an integer code of response status, an optional string message.
Expand Down Expand Up @@ -772,6 +829,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id: Optional[int] = None,
insert_id: Optional[str] = None,
plan: Optional[Plan] = None,
ingestion_metadata: Optional[IngestionMetadata] = None,
partner_id: Optional[str] = None,
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None,
identify_obj: Optional[Identify] = None):
Expand Down Expand Up @@ -812,6 +870,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id=session_id,
insert_id=insert_id,
plan=plan,
ingestion_metadata=ingestion_metadata,
partner_id=partner_id,
callback=callback)
if identify_obj:
Expand Down Expand Up @@ -862,6 +921,7 @@ class IdentifyEvent(BaseEvent):
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
we have already seen before within the past 7 days will be deduplicated.
plan (Plan, optional): Tracking plan properties.
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
partner_id (str, optional): The partner id.
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
parameters: an event instance, an integer code of response status, an optional string message.
Expand Down Expand Up @@ -904,6 +964,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id: Optional[int] = None,
insert_id: Optional[str] = None,
plan: Optional[Plan] = None,
ingestion_metadata: Optional[IngestionMetadata] = None,
partner_id: Optional[str] = None,
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None,
identify_obj: Optional[Identify] = None):
Expand Down Expand Up @@ -943,6 +1004,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id=session_id,
insert_id=insert_id,
plan=plan,
ingestion_metadata=ingestion_metadata,
partner_id=partner_id,
callback=callback)
if identify_obj:
Expand Down Expand Up @@ -1082,6 +1144,7 @@ class RevenueEvent(BaseEvent):
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
we have already seen before within the past 7 days will be deduplicated.
plan (Plan, optional): Tracking plan properties.
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
partner_id (str, optional): The partner id.
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
parameters: an event instance, an integer code of response status, an optional string message.
Expand Down Expand Up @@ -1124,6 +1187,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id: Optional[int] = None,
insert_id: Optional[str] = None,
plan: Optional[Plan] = None,
ingestion_metadata: Optional[IngestionMetadata] = None,
partner_id: Optional[str] = None,
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None,
revenue_obj: Optional[Revenue] = None):
Expand Down Expand Up @@ -1164,6 +1228,7 @@ def __init__(self, user_id: Optional[str] = None,
session_id=session_id,
insert_id=insert_id,
plan=plan,
ingestion_metadata=ingestion_metadata,
partner_id=partner_id,
callback=callback)
if revenue_obj:
Expand Down
13 changes: 10 additions & 3 deletions src/amplitude/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,10 @@ class ContextPlugin(Plugin):
Methods:
apply_context_data(event): Add SDK name and version to event.library.
execute(event): Set event default timestamp and insert_id if not set elsewhere.
Add SDK name and version to event.library.
execute(event):
- Set event default timestamp and insert_id if not set elsewhere.
- Add SDK name and version to event.library.
- Mount plan, ingestion_metadata if not yet.
"""

def __init__(self):
Expand All @@ -201,7 +203,10 @@ def apply_context_data(self, event: BaseEvent):
event.library = self.context_string

def execute(self, event: BaseEvent) -> BaseEvent:
"""Set event default timestamp and insert_id if not set elsewhere. Add SDK name and version to event.library.
"""
- Set event default timestamp and insert_id if not set elsewhere.
- Add SDK name and version to event.library.
- Mount plan, ingestion_metadata if not yet.
Args:
event (BaseEvent): The event to be processed.
Expand All @@ -212,6 +217,8 @@ def execute(self, event: BaseEvent) -> BaseEvent:
event["insert_id"] = str(uuid.uuid4())
if self.configuration.plan and (not event.plan):
event["plan"] = self.configuration.plan
if self.configuration.ingestion_metadata and (not event.ingestion_metadata):
event["ingestion_metadata"] = self.configuration.ingestion_metadata
self.apply_context_data(event)
return event

Expand Down
Loading

0 comments on commit a74a943

Please sign in to comment.