In [1]:
from datetime import timedelta, datetime
import aircraftlib as aclib
from prefect import task, Flow, Parameter
from prefect.schedules import IntervalSchedule
from prefect.storage import S3
import prefect
from prefect.run_configs import KubernetesRun

In [11]:
# boto3/botocore are needed by the S3 flow storage used below; install once if missing:
# %pip install boto3 botocore

In [3]:
@task(max_retries=3, retry_delay=timedelta(seconds=1))
def extract_reference_data():
    """Fetch the reference data set through ``aclib``.

    Declared as a Prefect task with ``max_retries=3`` and a one-second
    ``retry_delay``.

    Returns:
        Whatever ``aclib.fetch_reference_data()`` returns. Other tasks in this
        notebook read its ``airports`` and ``airlines`` attributes.
    """
    logger = prefect.context.get("logger")
    logger.info("fetching reference data...")
    # The library is imported as ``aclib``; the previous ``a.`` prefix was an
    # undefined name and raised NameError on every attempt.
    return aclib.fetch_reference_data()

In [4]:

@task(max_retries=3, retry_delay=timedelta(seconds=1))
def extract_live_data(airport, radius, ref_data):
    """Fetch raw live aircraft data, optionally limited to an area around an airport.

    Args:
        airport: Key into ``ref_data.airports``. A falsy value skips the area
            restriction and ``area=None`` is passed to the fetch call.
        radius: Forwarded to ``aclib.bounding_box`` with the airport position.
        ref_data: Object exposing an ``airports`` mapping whose entries carry
            ``"latitude"`` and ``"longitude"`` values convertible to float.

    Returns:
        The result of ``aclib.fetch_live_aircraft_data``.
    """
    log = prefect.context.get("logger")

    if airport:
        entry = ref_data.airports[airport]
        latitude = float(entry["latitude"])
        longitude = float(entry["longitude"])
        center = aclib.Position(lat=latitude, long=longitude)
        search_area = aclib.bounding_box(center, radius)
    else:
        search_area = None

    log.info("fetching live aircraft data...")
    return aclib.fetch_live_aircraft_data(area=search_area)

In [5]:
@task
def transform(raw_aircraft_data, ref_data):
    """Clean each raw vector and attach airline information to the ones kept.

    Args:
        raw_aircraft_data: Iterable of raw vectors, each handed to
            ``aclib.clean_vector``.
        ref_data: Object exposing ``airlines``, which is passed to
            ``aclib.add_airline_info``.

    Returns:
        A list of the cleaned vectors that were truthy, in input order.
    """
    log = prefect.context.get("logger")
    log.info("cleaning & transform aircraft data...")

    cleaned = []
    for raw in raw_aircraft_data:
        record = aclib.clean_vector(raw)
        if not record:
            # A falsy cleaning result means the vector is dropped.
            continue
        aclib.add_airline_info(record, ref_data.airlines)
        cleaned.append(record)
    return cleaned

In [6]:
@task
def load_reference_data(ref_data):
    """Write ``ref_data`` to the database via ``aclib.Database.update_reference_data``.

    Args:
        ref_data: The reference data object to store.
    """
    prefect.context.get("logger").info("saving reference data...")
    aclib.Database().update_reference_data(ref_data)


@task
def load_live_data(live_aircraft_data):
    """Write live aircraft records via ``aclib.Database.add_live_aircraft_data``.

    Args:
        live_aircraft_data: The records to store (the list built by ``transform``
            in this notebook's flow).
    """
    prefect.context.get("logger").info("saving live aircraft data...")
    aclib.Database().add_live_aircraft_data(live_aircraft_data)

In [7]:
# Settings handed to the Flow constructor: a Kubernetes run config using the
# packaged image, and S3 storage for the flow itself.
custom_confs = dict(
    run_config=KubernetesRun(
        image="drtools/prefect:aircraft-etl-package",
        image_pull_secrets=["regcred"],
    ),
    storage=S3(bucket="dr-prefect"),
)

with Flow("Aircraft-ETL", **custom_confs) as flow:
    # Run-time parameters and their defaults.
    airport = Parameter("airport", default="IAD")
    radius = Parameter("radius", default=200)

    # Extract
    reference_data = extract_reference_data()
    live_data = extract_live_data(
        airport=airport, radius=radius, ref_data=reference_data
    )

    # Transform
    transformed_live_data = transform(
        raw_aircraft_data=live_data, ref_data=reference_data
    )

    # Load
    load_reference_data(ref_data=reference_data)
    load_live_data(live_aircraft_data=transformed_live_data)

In [9]:
# Prefect API client used by the cells below to create the project, register the
# flow and start a run.
# NOTE(review): the host is a `*.svc.cluster.local` name, so this presumably only
# resolves from inside the Kubernetes cluster — confirm before running elsewhere.
client = prefect.Client(api_server="http://prefect-server-apollo.default.svc.cluster.local:4200")

In [10]:
# Create the project that the flow is registered under; the returned string
# (shown as the cell output) is presumably the new project's id.
# NOTE(review): re-running this cell when the project already exists may raise —
# confirm against the server's behavior.
client.create_project("AIRCRAFT-ETL")

'9d8dbb4e-9c9e-485f-9c2a-54f47f3c4f50'

In [11]:
# Register the flow in the "AIRCRAFT-ETL" project and keep the returned id; it is
# passed to create_flow_run later in the notebook.
flow_id = client.register(flow, project_name="AIRCRAFT-ETL")
flow_id  # bare expression so the id is displayed as the cell output

Result check: OK
[2020-12-22 19:10:34+0100] INFO - prefect.S3 | Uploading aircraft-etl/2020-12-22t18-10-34-141798-00-00 to dr-prefect


  """Entry point for launching an IPython kernel.
  _healthcheck.result_check(self._flows.values())  # type: ignore


Flow URL: http://localhost:8080/default/flow/10ae4a5f-5d17-4d2c-a189-cf91bdd7b034
 └── ID: c2a76fe4-8bce-4b7e-96b4-edf95c1d5d4d
 └── Project: AIRCRAFT-ETL
 └── Labels: []


'c2a76fe4-8bce-4b7e-96b4-edf95c1d5d4d'

In [12]:
# Start a run of the flow registered above. No parameters are passed here.
# The run name previously read "Run Aircraf ETL package " (misspelled, with a
# trailing space); it is corrected so the run is labelled properly.
client.create_flow_run(flow_id=flow_id, run_name="Run Aircraft ETL package")

'46b85f95-c175-4f98-99b7-b32c19a2e0a1'