
4. Developing new connections


Developer workflow

Requirements

docker-compose: follow the official installation instructions to install it.

Run Application

./run_tightlock.sh - Initialization script. When run for the first time, it prompts the user for an API Key and stores it in a local .env file along with other variables.

For the normal developer workflow, it should be run without options, both on the first run and on subsequent usage.

Command Options

  • -e: Environment flag. Controls which docker-compose command to use. Pass -e prod when deploying to GCP.

  • -i: Interactive flag. Pass -i non-interactive if there's no need for the interactive initialization prompt. Ignored when a .env file already exists in the folder.

  • -k: API Key flag. Pass -k PROVIDED_API_KEY when providing an API key in non-interactive mode. Ignored when a .env file already exists in the folder (see the example below).
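
For example, a non-interactive run that supplies the key up front could look like this (MY_API_KEY is a placeholder):

./run_tightlock.sh -i non-interactive -k MY_API_KEY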

Troubleshooting commands

docker-compose down -v - Cleans up containers and removes all volumes (existing configs will be wiped).

docker ps - Checks which containers are running and lets you inspect their names and IDs.

docker logs ea5e7d0e277e - Shows logs for the container with the provided ID.

docker exec -ti tightlock_tightlock-api_1 bash - Runs an interactive bash shell in the API container (useful for running migrations or inspecting logs).

When making changes to the underlying Postgres DB in the API context:

  • alembic revision --autogenerate -m "Some message" - Creates a new revision with the changes made to the data models.
  • alembic upgrade head - Applies migrations to the DB.
  • docker exec -ti tightlock_postgres_1 psql -U tightlock -W tightlock - Runs the Postgres REPL inside the DB container (useful for inspecting tables).

Dev debugging URLs

  • localhost:8080: Airflow Webserver UI (user airflow and password airflow in dev) - for troubleshooting connections (triggering, logs, etc.)
  • localhost:8047: Drill web UI - for troubleshooting queries to the local_files source

Common initialization errors

If you see an error related to "password authentication", it is probably a problem with the Postgres initialization script. You can run the following commands to fix it and restart Tightlock:

chmod 755 postgres_init.sh
docker-compose down -v
./run_tightlock.sh

Connections development

Basic steps for developing a new connection

  1. Write an implementation of the relevant protocol for the connection (destination_proto for destinations, source_proto for sources). Put the implementation under the dag/[sources/destinations] folder and choose a relevant name for your file. The implementation must be a class named Source or Destination inside this file (see the bigquery source or the ga4mp destination as examples).

Note: You can skip implementing the schema and validate methods while validating the data flow from source to destination in the backend. These methods must be implemented for integration with the 1PD Scheduler Frontend.

  2. Test the implementation (both the data flow from source to destination and the integration with the frontend).

The following snippet is a minimal example of a configuration you can use for tests:

{
  "value": {
    "sources": {
      "new_conn_source": {
        "location": "new_conn_sample.csvh",
        "type": "local_file"
      }
    },
    "activations": [
      {
        "name": "new_conn_test",
        "source": {
          "$ref": "#/sources/new_conn_source"
        },
        "schedule": "",
        "destination": {
          "$ref": "#/destinations/new_conn_destination"
        }
      }
    ],
    "destinations": {
      "new_conn_destination": {
        "type": "NEW_CONN",
        "custom_field_1": "abc",
        "custom_field_2": 42,
        "custom_field_3": ""
      }
    }
  },
  "label": "Test Config 1"
}

You can save this configuration in a JSON file and submit it through the Tightlock API as follows:

curl -H "Content-Type: application/json" -X POST -H 'X-Api-Key: {EXAMPLE_API_KEY}'  localhost:8081/api/v1/configs -d @example.json

Note that the config label is a unique constraint and needs to be updated for each new config submission.

To verify the current config being used by Tightlock, you can call the getLatest endpoint:

curl -H "Content-Type: application/json" -H 'X-Api-Key: {EXAMPLE_API_KEY}' localhost:8081/api/v1/configs:getLatest                      

Once the connection is saved, you can trigger it via the trigger endpoint as follows. The dry_run argument is optional:

curl -X POST -H 'X-API-Key: {EXAMPLE_API_KEY}' -H 'Content-Type: application/json' -d '{"dry_run": 1}' -o - -i http://localhost:8081/api/v1/activations/new_conn_test:trigger

Make sure that you have created a file containing sample data and have saved it to sample_data/new_conn_sample.csvh with the proper data schema.
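
For reference, .csvh is the extension Drill uses for CSV files with a header row. A minimal sketch of such a sample file, assuming the hypothetical NEW_CONN destination consumes id and email columns (the actual columns depend on your destination's data schema):

id,email
1,user1@example.com
2,user2@example.com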

Follow the guidelines below to decide where each connection's data/metadata should be retrieved from:

  • Config Schema: The fields that are fixed for each implementation of your new source or destination. The "custom_field_[123]" keys that appear in the example can be changed to any key, and there is no limit on the number of custom fields you can add. See the GA4 Config Schema for reference.

  • Data Schema: This applies only to new destinations and is defined as the schema of the "tables" that are required by the destination. Data that goes here can change at the "event" level (for instance, in GA4 each row of the table can contain a different client_id, so a single GA4MP destination connection can send data to multiple client_ids).

  3. Write Config and Data Schema documentation in the Wiki (Data Schema is required for Destinations only).

Common elements

Schemas

  @staticmethod
  def schema() -> Optional[ProtocolSchema]:
    """Returns the required metadata for this source/destination config.
    
    Returns:
      An optional ProtocolSchema object that defines the 
      required and optional metadata used by the implementation
      of this protocol.
    """
    ...

ProtocolSchema

class ProtocolSchema:
  """Class that defines the schema of a source or destination protocol."""

  class_name: str
  fields: Sequence[Tuple[str, type] | Tuple[str, type, field]]
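
As a sketch, a schema() implementation for the hypothetical NEW_CONN destination from the test config above could look like the following, assuming ProtocolSchema is a dataclass constructed positionally (this uses only the two-element tuple form; the optional third element carries the field attributes described below):

  @staticmethod
  def schema() -> Optional[ProtocolSchema]:
    # Hypothetical schema for the NEW_CONN example destination; see
    # existing sources/destinations for the exact construction used.
    return ProtocolSchema(
        "NEW_CONN",
        [
            ("custom_field_1", str),
            ("custom_field_2", int),
            ("custom_field_3", str),
        ],
    )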

Schema reserved fields

For now, the only reserved field is name, which is added to all schemas automatically (code for reference).

Schema fields attributes

  • description (required): Description of the field that shows up in the frontend.
  • type (required): Attribute type; available options are described below. Example usage: GA4 enum or BigQuery RawJSON.
  • default (optional): Default value for optional fields. Example usage: ga4mp non_personalized_ads field.
  • validation (optional): ECMAScript-compliant regex used to validate inputs in the frontend. Example usage: bigquery dataset field.
  • condition_field (optional): Field that controls whether or not the current field should be shown. Example usage: the mutually exclusive measurement_id and firebase_app_id fields in ga4mp.
  • condition_target (optional): Target value of the condition_field for the current field to be shown. Example usage: the mutually exclusive measurement_id and firebase_app_id fields in ga4mp.
  • immutable (optional): Indicates whether this field should be editable or not. Example usage: the automatically generated "name" attribute.

Available attribute types

Each type is rendered as a different element in the 1PD Scheduler FE, following the mapping below:

  • str, int, float: rendered as <input>. Built-in types.
  • bool: rendered as <select>. Built-in type.
  • RawJSON: rendered as <textarea>. Use utils.SchemaUtils.raw_json_type().
  • KeyValue: rendered as a Form Array Input. Use utils.SchemaUtils.key_value_type().
  • Enum: rendered as <select>. Use enum.Enum("CustomEnumType", {"OPTION1": "option1", "OPTION2": "option2"}).
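
For instance, a fields sequence mixing built-in and custom types might look like the following sketch (the field names here are hypothetical; see the codebase for the exact SchemaUtils helpers):

fields = [
    ("api_key", str),
    ("debug", bool),
    ("payload_template", utils.SchemaUtils.raw_json_type()),
    ("event_type", enum.Enum("EventType", {"PAGE_VIEW": "page_view", "PURCHASE": "purchase"})),
]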

Validation

  def validate(self) -> ValidationResult:
    """Validates the provided config.
    
    Returns:
      A ValidationResult for the provided config.
    """
    ...

ValidationResult

class ValidationResult:
  """Class for reporting of validation results."""

  is_valid: bool
  messages: Sequence[str]
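
A sketch of a validate() implementation for the hypothetical NEW_CONN destination, assuming __init__ stored the parsed config on self.config (real implementations may also probe the destination API):

  def validate(self) -> ValidationResult:
    # Hypothetical check: custom_field_1 must be set before sending data.
    messages = []
    if not self.config.get("custom_field_1"):
      messages.append("custom_field_1 is required.")
    return ValidationResult(is_valid=not messages, messages=messages)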

Sources

class SourceProto(Protocol):
  """Common set of methods that must be implemented by all sources."""

  def __init__(self, config: Dict[str, Any]):
    """Init method for SourceProto.
    
    Args:
      config: A dictionary parsed from the connection config, containing
        the metadata required for the target source.
    """
    ...

  def get_data(
      self,
      fields: Sequence[str],
      offset: int,
      limit: int,
      reusable_credentials: Optional[Sequence[Mapping[str, Any]]],
  ) -> List[Mapping[str, Any]]:
    """Retrieves data from the target source.
    
    Args:
      fields: A list of fields to be retrieved from the 
        underlying source.
      offset: The offset for the query.
      limit: The maximum number of records to return.
      reusable_credentials: An auxiliary list of reusable credentials
        that may be shared by multiple sources.
    Returns:
      A list of field-value mappings retrieved from the target data source.
    """
    ...
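
To make the contract concrete, here is a toy in-memory source sketch (purely illustrative; real sources such as bigquery or local_file query an actual backend):

from typing import Any, Dict, List, Mapping, Optional, Sequence

class Source:
  """Toy source illustrating the SourceProto contract."""

  def __init__(self, config: Dict[str, Any]):
    self.config = config
    # Hypothetical: pretend the config carries its rows inline.
    self._rows = config.get("rows", [])

  def get_data(
      self,
      fields: Sequence[str],
      offset: int,
      limit: int,
      reusable_credentials: Optional[Sequence[Mapping[str, Any]]],
  ) -> List[Mapping[str, Any]]:
    # Page through the rows, keeping only the requested fields.
    return [
        {field: row.get(field) for field in fields}
        for row in self._rows[offset:offset + limit]
    ]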

Destinations

class DestinationProto(Protocol):
  """Common set of methods that must be implemented by all destinations."""

  def __init__(self, config: Dict[str, Any]):
    """Init method for DestinationProto.
    
    Args:
      config: A dictionary parsed from the connection config, containing
        the metadata required for the target destination.
    """
    ...

  def send_data(
      self, input_data: List[Mapping[str, Any]], dry_run: bool
  ) -> Optional[RunResult]:
    """Sends data to the underlying destination API.
    
    Args:
      input_data: A list of key-value mappings representing inputs for the
        target destination. The mapping keys must be a subset of the list 
        retrieved by calling the destination's "fields" method.
      dry_run: A boolean indicating whether or not to actually send
        the data to the destination or if it should only be validated
        (when validation is available).

    Returns:
      An optional RunResult object, which can only be
      None in the event of an unexpected error.
    """
    ...

  def fields(self) -> Sequence[str]:
    """Lists required fields for the destination input data.
    
    Returns:
      A sequence of fields.
    """
    ...

  def batch_size(self) -> int:
    """Returns the required batch_size for the underlying destination API.

    Returns:
      An int representing the batch_size.
    """
    ...
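
Putting the two protocols together, a simplified driver loop could look like the sketch below (illustrative only; the real orchestration is handled by the Airflow DAGs in the Tightlock codebase):

# Hypothetical wiring: `source` and `destination` are instances of
# SourceProto / DestinationProto implementations.
offset = 0
batch = destination.batch_size()
fields = destination.fields()
while True:
  rows = source.get_data(fields, offset, batch, None)
  if not rows:
    break
  destination.send_data(rows, dry_run=True)  # flip to False to really send
  offset += batch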

Design principles

For code reusability, use Mixins. The local_file class, for example, uses a DrillMixin that provides ready-made Drill utility functions (instead of inheriting from an abstract "DrillSource" class), as sketched below.
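
A minimal illustration of the pattern (the query_drill helper and some_table are hypothetical stand-ins for the real DrillMixin internals):

class DrillMixin:
  # Shared Drill helpers, reusable by any source that speaks Drill.
  def query_drill(self, sql):
    ...

class Source(DrillMixin):
  # A local_file-style source composes behavior via the mixin instead
  # of inheriting from an abstract "DrillSource" base class.
  def get_data(self, fields, offset, limit, reusable_credentials):
    sql = f"SELECT {', '.join(fields)} FROM some_table LIMIT {limit} OFFSET {offset}"
    return self.query_drill(sql)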

  • Keep it Simple - Avoid complexity and premature optimization. Favor code that is explicit, short, and straight to the point. If some refinement is needed in the future, we can always refactor.

  • Don't repeat yourself - When designing connections that reuse a module or an API, coordinate with other contributors to avoid code duplication.