# Strava insights

## Build a simple Prefect flow

TODO: add an exercise below

In [None]:
from prefect import Flow, task
from prefect.run_configs.docker import DockerRun
from prefect.storage.local import Local

# Docker run configuration: run the flow in the `viadot:latest` image on an
# agent carrying the "dev" label, with SOME_VAR set in the container env.
RUN_CONFIG = DockerRun(
    labels=["dev"],
    env={"SOME_VAR": "value"},
    image="viadot:latest",
)

# Empty skeleton flow (local storage + the Docker run config above); tasks for
# the exercise go inside the `with` block.
flow = Flow("My first Prefect flow", storage=Local(), run_config=RUN_CONFIG)
with flow:
    pass

## Build a Prefect flow to run an Airbyte sync

In [11]:
# `requests` is otherwise only imported in a later cell; import it here so this
# cell also works when the notebook is run top to bottom.
import requests

airbyte_server_host = "webapp"
airbyte_server_port = "80"
airbyte_api_version = "v1"
airbyte_base_url = (
    f"http://{airbyte_server_host}:"
    f"{airbyte_server_port}/api/{airbyte_api_version}"
)
connection_id = "4a6d8e08-936a-4820-bfb6-3c676866adc4"
# `web_backend` endpoint: its payload includes the source/destination objects
# and the sync catalog of the connection.
url = airbyte_base_url + "/web_backend/connections/get/"

# Close the session's connection pool when done, and fail loudly on an HTTP
# error instead of displaying the error body as if it were the connection.
with requests.Session() as session:
    response = session.post(url, json={"connectionId": connection_id})
    response.raise_for_status()
response.json()

{'connectionId': '4a6d8e08-936a-4820-bfb6-3c676866adc4',
 'name': 'default',
 'namespaceDefinition': 'source',
 'namespaceFormat': '${SOURCE_NAMESPACE}',
 'prefix': '',
 'sourceId': 'a33c9f0f-f51b-4a4a-b712-af055b175333',
 'destinationId': '286b5093-0455-4c3c-9d99-816edb562f3e',
 'operationIds': [],
 'syncCatalog': {'streams': [{'stream': {'name': 'pokemon',
     'jsonSchema': {'type': 'object',
      '$schema': 'http://json-schema.org/draft-07/schema#',
      'properties': {'id': {'type': ['null', 'integer']},
       'name': {'type': ['null', 'string']},
       'forms': {'type': ['null', 'array'],
        'items': {'type': ['null', 'object'],
         'properties': {'url': {'type': ['null', 'string']},
          'name': {'type': ['null', 'string']}}}},
       'moves': {'type': ['null', 'array'],
        'items': {'type': ['null', 'object'],
         'properties': {'move': {'type': ['null', 'object'],
           'properties': {'url': {'type': ['null', 'string']},
            'name': {'

In [19]:
from time import sleep
import uuid

import requests
from requests import RequestException

from prefect import Task
from prefect.engine.signals import FAIL
from prefect.utilities.tasks import defaults_from_attrs
import logging
import re
from typing import List, Tuple


class ConnectionNotFoundException(Exception):
    """Raised when the Airbyte server answers 404 for the requested connection."""


class AirbyteServerNotHealthyException(Exception):
    """Raised when a request to the Airbyte server fails or it reports itself unhealthy."""


class JobNotFoundException(Exception):
    """Raised when the Airbyte server answers 404 for the requested sync job."""


class AirbyteConnectionTask2(Task):
    """
    Task for triggering Airbyte Connections, where "A connection is
    a configuration for syncing data between a source and a destination."
    For more information refer to the
    [Airbyte docs](https://docs.airbyte.io/understanding-airbyte/connections)

    This task assumes Airbyte Open-Source is used, since "For
    Airbyte Open-Source you don't need the API Token for
    Authentication! All endpoints are possible to access using the
    API without it."
    For more information refer to the [Airbyte docs](https://docs.airbyte.io/api-documentation)

    Args:
        - airbyte_server_host (str, optional): Hostname of Airbyte server where connection is configured.
            Defaults to localhost.
        - airbyte_server_port (int, optional): Port that the Airbyte server is listening on.
            Defaults to 8000.
        - airbyte_api_version (str, optional): Version of Airbyte API to use to trigger connection sync.
            Defaults to v1.
        - connection_id (str, optional): Default connection id to
            use for sync jobs, if none is specified to `run`.
        - display_airbyte_logs (bool, optional): Default for whether the log lines of
            a successful sync attempt are forwarded to the Prefect logger.
            Defaults to False.
        - **kwargs (Any, optional): additional kwargs to pass to the
            base Task constructor
    """

    # Connection statuses
    CONNECTION_STATUS_ACTIVE = "active"
    CONNECTION_STATUS_INACTIVE = "inactive"
    CONNECTION_STATUS_DEPRECATED = "deprecated"

    # Job statuses
    # pending┃running┃incomplete┃failed┃succeeded┃cancelled
    JOB_STATUS_SUCCEEDED = "succeeded"
    JOB_STATUS_FAILED = "failed"
    JOB_STATUS_PENDING = "pending"
    JOB_STATUS_CANCELLED = "cancelled"
    # Statuses at which polling stops. "cancelled" must be included, otherwise a
    # cancelled job would be polled forever.
    JOB_TERMINAL_STATUSES = (
        JOB_STATUS_SUCCEEDED,
        JOB_STATUS_FAILED,
        JOB_STATUS_CANCELLED,
    )

    # Attempt statuses
    ATTEMPT_STATUS_SUCCEEDED = "succeeded"
    ATTEMPT_STATUS_FAILED = "failed"
    ATTEMPT_STATUS_RUNNING = "running"

    # ANSI escape sequences embedded in Airbyte log lines; compiled once.
    _ANSI_ESCAPE = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]")

    def __init__(
        self,
        airbyte_server_host: str = "localhost",
        airbyte_server_port: int = 8000,
        airbyte_api_version: str = "v1",
        connection_id: str = None,
        display_airbyte_logs: bool = False,
        **kwargs,
    ):
        self.airbyte_server_host = airbyte_server_host
        self.airbyte_server_port = airbyte_server_port
        self.airbyte_api_version = airbyte_api_version
        self.connection_id = connection_id
        self.display_airbyte_logs = display_airbyte_logs
        super().__init__(**kwargs)

    def _check_health_status(self, session: requests.Session) -> bool:
        """Check that the Airbyte server's `/health/` endpoint reports `db` as truthy.

        Returns:
            - True when the server is healthy.
        Raises:
            - AirbyteServerNotHealthyException: if the request fails or `db` is falsy.
        """
        health_url = self.airbyte_base_url + "/health/"
        try:
            response = session.get(health_url)
            health_status = response.json()["db"]
        except RequestException as e:
            raise AirbyteServerNotHealthyException(e) from e
        if not health_status:
            raise AirbyteServerNotHealthyException(
                f"Airbyte Server health status: {health_status}"
            )
        return True

    def _get_connection_source_and_dest(
        self, session: requests.Session
    ) -> Tuple[str, str]:
        """Get source and destination names for the connection ID.

        Note `web_backend` in the URL: that endpoint's payload includes the
        source and destination objects.

        Raises:
            - AirbyteServerNotHealthyException: if the request fails.
        """
        connection_url = self.airbyte_base_url + "/web_backend/connections/get/"
        try:
            response = session.post(
                connection_url, json={"connectionId": self.connection_id}
            )
            connection = response.json()
        except RequestException as e:
            raise AirbyteServerNotHealthyException(e) from e
        return connection["source"]["name"], connection["destination"]["name"]

    def _get_connection_status(self, session: requests.Session) -> str:
        """Return the connection's status, removing its schedule first if it has one.

        An existing schedule is removed by posting a Connection update with
        `schedule: None` (presumably so the connection is only synced when
        triggered manually by this task). Failure to remove it is only logged.

        Raises:
            - AirbyteServerNotHealthyException: if a request fails.
        """
        connection_url = self.airbyte_base_url + "/connections/get/"

        # TODO - Missing authentication because Airbyte servers currently do not support authentication
        try:
            response = session.post(
                connection_url, json={"connectionId": self.connection_id}
            )
            connection = response.json()

            if connection["schedule"]:
                self.logger.warning("Found existing Connection schedule, removing ...")

                # syncCatalog and status are mandatory fields for a Connection update.
                update_url = self.airbyte_base_url + "/connections/update/"
                update_response = session.post(
                    update_url,
                    json={
                        "connectionId": self.connection_id,
                        "syncCatalog": connection["syncCatalog"],
                        "schedule": None,
                        "status": connection["status"],
                    },
                )

                if update_response.status_code == 200:
                    self.logger.info("Schedule removed.")
                else:
                    self.logger.warning("Schedule not removed.")
                    self.logger.warning(update_response.json())
        except RequestException as e:
            raise AirbyteServerNotHealthyException(e) from e

        return connection["status"]

    def _trigger_manual_sync_connection(
        self, session: requests.Session
    ) -> Tuple[int, int]:
        """
        Trigger a manual sync of the Connection, see:
        https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc
        -api-docs.html#post-/v1/connections/sync

        Args:
            session: requests session with which to make call to Airbyte server
        Returns: job_id, job_created_at - timestamp of sync job creation
        Raises:
            - ConnectionNotFoundException: if the server answers 404.
            - AirbyteServerNotHealthyException: if the request fails or the server
                answers with any other non-200 status code.
        """
        sync_url = self.airbyte_base_url + "/connections/sync/"

        # TODO - missing authentication ...
        try:
            response = session.post(sync_url, json={"connectionId": self.connection_id})
            if response.status_code == 404:
                raise ConnectionNotFoundException(
                    f"Connection {self.connection_id} not found, please double "
                    f"check the connection_id ..."
                )
            if response.status_code != 200:
                # Previously fell through and returned None, which surfaced as a
                # confusing unpacking TypeError in `run`.
                raise AirbyteServerNotHealthyException(
                    f"Unexpected status code {response.status_code} when triggering "
                    f"a sync of connection {self.connection_id}: {response.text}"
                )
            job = response.json()["job"]
        except RequestException as e:
            raise AirbyteServerNotHealthyException(e) from e
        return job["id"], job["createdAt"]

    @staticmethod
    def _escape_ansi(line: str) -> str:
        """Return `line` with ANSI escape sequences removed."""
        return AirbyteConnectionTask2._ANSI_ESCAPE.sub("", line)

    @staticmethod
    def _find_prefect_stream_handler():
        """Return the first handler on the "prefect" logger whose exact type is
        `logging.StreamHandler` (subclasses such as FileHandler are skipped), or
        None when there is no such handler."""
        prefect_logger = logging.getLogger("prefect")
        return next(
            (h for h in prefect_logger.handlers if type(h) is logging.StreamHandler),
            None,
        )

    def _get_stripped_logger(self):
        """Helper method for _log_airbyte_logs().

        Switches the "prefect" StreamHandler to a bare "%(message)s" format and
        returns `(child_logger, previous_formatter)`. `previous_formatter` is
        None when the handler had no formatter or no such handler exists.
        """
        handler = self._find_prefect_stream_handler()
        prev_formatter = None
        if handler is not None:
            prev_formatter = handler.formatter
            handler.setFormatter(logging.Formatter(fmt="%(message)s"))

        stripped_logger = logging.getLogger("prefect").getChild(self.name)
        return stripped_logger, prev_formatter

    def _recover_prefect_logger(self, prev_formatter: logging.Formatter) -> None:
        """Helper method for _log_airbyte_logs(): restore the handler's formatter."""
        handler = self._find_prefect_stream_handler()
        if handler is not None:
            handler.setFormatter(prev_formatter)

    def _log_airbyte_logs(self, logs: List[str]) -> None:
        """Take a list of Airbyte logs and log them with the Prefect logger.

        ANSI codes are stripped, and the Prefect prefix format is suppressed while
        logging. The original formatter is restored even if logging raises.
        """
        logger, prev_formatter = self._get_stripped_logger()
        try:
            for log in logs:
                logger.info(self._escape_ansi(log))
        finally:
            self._recover_prefect_logger(prev_formatter)

    def _get_job_status(
        self,
        session: requests.Session,
        job_id: int,
        display_airbyte_logs: bool = None,
    ) -> Tuple[str, int, int]:
        """Fetch the status and timestamps of sync job `job_id`.

        When the job has succeeded and `display_airbyte_logs` is truthy (None
        falls back to `self.display_airbyte_logs`), the log lines of the
        successful attempt are forwarded to the Prefect logger.

        Returns: job_status, job_created_at, job_updated_at
        Raises:
            - JobNotFoundException: if the server answers 404.
            - AirbyteServerNotHealthyException: if the request fails or the server
                answers with any other non-200 status code.
        """
        if display_airbyte_logs is None:
            display_airbyte_logs = self.display_airbyte_logs

        job_url = self.airbyte_base_url + "/jobs/get/"
        try:
            response = session.post(job_url, json={"id": job_id})
            if response.status_code == 404:
                self.logger.error(f"Job {job_id} not found...")
                raise JobNotFoundException(f"Job {job_id} not found...")
            if response.status_code != 200:
                raise AirbyteServerNotHealthyException(
                    f"Unexpected status code {response.status_code} when fetching "
                    f"job {job_id}: {response.text}"
                )
            payload = response.json()
        except RequestException as e:
            raise AirbyteServerNotHealthyException(e) from e

        job = payload["job"]
        if job["status"] == self.JOB_STATUS_SUCCEEDED and display_airbyte_logs:
            successful_attempt = next(
                (
                    a
                    for a in payload["attempts"]
                    if a["attempt"]["status"] == self.ATTEMPT_STATUS_SUCCEEDED
                ),
                None,
            )
            if successful_attempt is None:
                self.logger.warning(
                    f"Job {job_id} succeeded but has no successful attempt; "
                    f"no Airbyte logs to display."
                )
            else:
                self._log_airbyte_logs(successful_attempt["logs"]["logLines"])

        return job["status"], job["createdAt"], job["updatedAt"]

    def _wait_for_job(
        self,
        session: requests.Session,
        job_id: int,
        display_airbyte_logs: bool,
        poll_interval_s: int,
    ) -> Tuple[str, int, int]:
        """Poll job `job_id` every `poll_interval_s` seconds until it reaches a
        terminal status, log the outcome and return (status, created_at, updated_at)."""
        while True:
            job_status, job_created_at, job_updated_at = self._get_job_status(
                session, job_id, display_airbyte_logs
            )
            if job_status in self.JOB_TERMINAL_STATUSES:
                break
            # wait for next poll interval
            sleep(poll_interval_s)

        if job_status == self.JOB_STATUS_SUCCEEDED:
            self.logger.info(f"Job {job_id} succeeded.")
        elif job_status == self.JOB_STATUS_FAILED:
            self.logger.error(f"Job {job_id} failed.")
        else:
            self.logger.error(f"Job {job_id} was cancelled.")
        return job_status, job_created_at, job_updated_at

    @defaults_from_attrs(
        "airbyte_server_host",
        "airbyte_server_port",
        "airbyte_api_version",
        "connection_id",
        "display_airbyte_logs",
    )
    def run(
        self,
        airbyte_server_host: str = None,
        airbyte_server_port: int = None,
        airbyte_api_version: str = None,
        connection_id: str = None,
        display_airbyte_logs: bool = None,
        poll_interval_s: int = 15,
    ) -> dict:
        """
        Task run method for triggering an Airbyte Connection.

        *It is assumed that the user will have previously configured
        a Source & Destination into a Connection.*
        e.g. MySql -> CSV

        An invocation of `run` will attempt to start a sync job for
        the specified `connection_id` representing the Connection in
        Airbyte.

        `run` will poll Airbyte Server for the job status and
        will only complete when the sync has finished (succeeded, failed or
        cancelled) or when it receives an error status code from an API call.

        Args:
            - airbyte_server_host (str, optional): Hostname of Airbyte server where connection is
                configured. Will overwrite the value provided at init if provided.
            - airbyte_server_port (int, optional): Port that the Airbyte server is listening on.
                Will overwrite the value provided at init if provided.
            - airbyte_api_version (str, optional): Version of Airbyte API to use to trigger connection
                sync. Will overwrite the value provided at init if provided.
            - connection_id (str, optional): if provided,
                will overwrite the value provided at init.
            - display_airbyte_logs (boolean, optional): Whether to display detailed sync logs
                from Airbyte. Will overwrite the value provided at init if provided.
            - poll_interval_s (int, optional): this task polls the
                Airbyte API for status, if provided this value will
                override the default polling time of 15 seconds.

        Returns:
            - dict: connection_id, status (connection status), job_status,
                job_created_at and job_updated_at. A failed or cancelled job is
                reported through `job_status`, not raised.

        Raises:
            - ValueError: if `connection_id` is missing or not a valid UUID.
            - FAIL: if the connection is not active.
        """
        if not connection_id:
            raise ValueError("`connection_id` *must* be provided.")

        try:
            uuid.UUID(connection_id)
        except (TypeError, ValueError):
            raise ValueError(
                "Parameter `connection_id` *must* be a valid UUID, "
                "i.e. 32 hex digits, optionally separated by hyphens."
            ) from None

        self.connection_id = connection_id

        # see https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com
        # /rapidoc-api-docs.html#overview
        self.airbyte_base_url = (
            f"http://{airbyte_server_host}:"
            f"{airbyte_server_port}/api/{airbyte_api_version}"
        )

        # The context manager closes the session even when a FAIL is raised.
        with requests.Session() as session:
            self._check_health_status(session)

            connection_status = self._get_connection_status(session)
            if connection_status == self.CONNECTION_STATUS_INACTIVE:
                message = f"Please enable the Connection {connection_id} in Airbyte Server."
                self.logger.error(message)
                raise FAIL(message)
            if connection_status == self.CONNECTION_STATUS_DEPRECATED:
                message = f"Connection {connection_id} is deprecated."
                self.logger.error(message)
                raise FAIL(message)
            if connection_status != self.CONNECTION_STATUS_ACTIVE:
                # Previously an unrecognised status silently returned None.
                message = (
                    f"Connection {connection_id} has unexpected status "
                    f"'{connection_status}'."
                )
                self.logger.error(message)
                raise FAIL(message)

            # Trigger manual sync on the Connection ...
            source, destination = self._get_connection_source_and_dest(session)
            self.logger.info(f"Triggering Airbyte sync '{source}' -> '{destination}'...")

            job_id, job_created_at = self._trigger_manual_sync_connection(session)
            job_status, job_created_at, job_updated_at = self._wait_for_job(
                session, job_id, display_airbyte_logs, poll_interval_s
            )

        return {
            "connection_id": connection_id,
            "status": connection_status,
            "job_status": job_status,
            "job_created_at": job_created_at,
            "job_updated_at": job_updated_at,
        }

In [20]:
from datetime import timedelta

from prefect import Flow, Parameter, task
from prefect.run_configs.docker import DockerRun
from prefect.storage.local import Local
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask
from prefect.tasks.dbt.dbt import DbtShellTask

# Docker run configuration: run the flow in the `viadot:latest` image on an
# agent carrying the "dev" label, with SOME_VAR set in the container env.
RUN_CONFIG = DockerRun(
    image="viadot:latest",
    env={"SOME_VAR": "value"},
    labels=["dev"],
)

# Airbyte sync task defined in the cell above, with Prefect retry settings
# (max_retries=3, retry_delay of 10 seconds).
sync_airbyte_connection = AirbyteConnectionTask2(
    max_retries=3,
    retry_delay=timedelta(seconds=10)
)

# NOTE(review): the flow name and the names `airbyte_strava_connection_id` /
# `airbyte_github_sync` look like leftovers from other examples — this flow
# syncs the connection passed as AIRBYTE_POKE_CONNECTION_ID (the run log shows
# 'workshop_poke' -> 'workshop_local_json'). Consider renaming.
with Flow("Determine common contributors flow", storage=Local(), run_config=RUN_CONFIG) as flow:
    # Airbyte connection ID (a UUID), supplied at run time via this Parameter
    airbyte_strava_connection_id = Parameter("AIRBYTE_POKE_CONNECTION_ID")

    # Trigger a manual sync of that connection on the Airbyte server at webapp:80
    airbyte_github_sync = sync_airbyte_connection(
        airbyte_server_host="webapp",
        airbyte_server_port=80,
        airbyte_api_version="v1",
        connection_id=airbyte_strava_connection_id,
    )
    
# Run the flow locally (in this kernel), binding the Parameter to the
# connection ID inspected earlier in the notebook.
flow.run(AIRBYTE_POKE_CONNECTION_ID="4a6d8e08-936a-4820-bfb6-3c676866adc4")

[2021-12-20 00:25:04+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'Determine common contributors flow'
[2021-12-20 00:25:04+0000] INFO - prefect.TaskRunner | Task 'AIRBYTE_POKE_CONNECTION_ID': Starting task run...
[2021-12-20 00:25:04+0000] INFO - prefect.TaskRunner | Task 'AIRBYTE_POKE_CONNECTION_ID': Finished task run for task with final state: 'Success'
[2021-12-20 00:25:04+0000] INFO - prefect.TaskRunner | Task 'AirbyteConnectionTask2': Starting task run...
[2021-12-20 00:25:04+0000] INFO - prefect.AirbyteConnectionTask2 | Triggering Airbyte sync 'workshop_poke' -> 'workshop_local_json'...
[2021-12-20 00:25:19+0000] INFO - prefect.AirbyteConnectionTask2 | Job 47 succeeded.
[2021-12-20 00:25:19+0000] INFO - prefect.TaskRunner | Task 'AirbyteConnectionTask2': Finished task run for task with final state: 'Success'
[2021-12-20 00:25:19+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded


<Success: "All reference tasks succeeded.">

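Run the command below in a terminal to check whether the file was actually reloaded (the `_airbyte_emitted_at` timestamps should reflect the latest sync). The file contains one JSON record per line, so `json.tool` needs `--json-lines` (Python 3.8+):  
`python -m json.tool --json-lines /tmp/airbyte_local/workshop/poke/_airbyte_raw_pokemon.jsonl | grep _airbyte_emitted_at`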

## Build the full Prefect flow

In [None]:
from datetime import timedelta

from prefect import Flow, Parameter, task
from prefect.run_configs.docker import DockerRun
from prefect.storage.local import Local
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask
from prefect.tasks.dbt.dbt import DbtShellTask


...