Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions airflow/api_connexion/endpoints/connection_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
# specific language governing permissions and limitations
# under the License.

# TODO(mik-laj): We have to implement it.
# Do you want to help? Please look at: https://github.com/apache/airflow/issues/8127
from flask import request

from airflow.api_connexion import parameters
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.schemas.connection_schema import (
connection_collection_item_schema, connection_collection_schema,
)
from airflow.models import Connection
from airflow.utils.session import provide_session


def delete_connection():
Expand All @@ -26,18 +33,32 @@ def delete_connection():
raise NotImplementedError("Not implemented yet.")


def get_connection():
@provide_session
def get_connection(connection_id, session):
"""
Get a connection entry
"""
raise NotImplementedError("Not implemented yet.")
query = session.query(Connection)
query = query.filter(Connection.conn_id == connection_id)
connection = query.one_or_none()
if connection is None:
raise NotFound("Connection not found")
return connection_collection_item_schema.dump(connection)


def get_connections():
@provide_session
def get_connections(session):
"""
Get all connection entries
"""
raise NotImplementedError("Not implemented yet.")
offset = request.args.get(parameters.page_offset, 0)
limit = min(int(request.args.get(parameters.page_limit, 100)), 100)

query = session.query(Connection)
query = query.offset(offset).limit(limit)

connections = query.all()
return connection_collection_schema.dump(connections)


def patch_connection():
Expand Down
16 changes: 14 additions & 2 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
# TODO(mik-laj): We have to implement it.
# Do you want to help? Please look at: https://github.com/apache/airflow/issues/8129

from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.schemas.dagrun_schema import dagrun_schema
from airflow.models import DagRun
from airflow.utils.session import provide_session


def delete_dag_run():
"""
Expand All @@ -26,11 +31,18 @@ def delete_dag_run():
raise NotImplementedError("Not implemented yet.")


def get_dag_run():
@provide_session
def get_dag_run(dag_id, dag_run_id, session):
"""
Get a DAG Run.
"""
raise NotImplementedError("Not implemented yet.")
query = session.query(DagRun)
query = query.filter(DagRun.dag_id == dag_id)
query = query.filter(DagRun.run_id == dag_run_id)
dag_run = query.one_or_none()
if dag_run is None:
raise NotFound("DAGRun not found")
return dagrun_schema.dump(dag_run)


def get_dag_runs():
Expand Down
24 changes: 24 additions & 0 deletions airflow/api_connexion/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from connexion import ProblemException


class NotFound(ProblemException):
"""Raise when the object cannot be found"""
def __init__(self, title='Object not found', detail=None):
super().__init__(status=404, title=title, detail=detail)
2 changes: 1 addition & 1 deletion airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2106,7 +2106,7 @@ components:
in: path
name: connection_id
schema:
type: integer
type: string
required: true
description: The Connection ID.

Expand Down
26 changes: 26 additions & 0 deletions airflow/api_connexion/parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Pagination parameters
page_offset = "offset"
page_limit = "limit"
filter_execution_date_gte = 'filter_execution_date_gte'
filter_execution_date_lte = 'filter_execution_date_lte'
filter_start_date_gte = "filter_start_date_gte"
filter_start_date_lte = "filter_start_date_lte"
filter_end_date_gte = 'filter_end_date_gte'
filter_end_date_lte = "filter_end_date_lte"
16 changes: 16 additions & 0 deletions airflow/api_connexion/schemas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
79 changes: 79 additions & 0 deletions airflow/api_connexion/schemas/base_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List, Optional

from marshmallow import post_dump
from marshmallow_sqlalchemy import SQLAlchemySchema

from airflow.exceptions import AirflowException


class BaseSchema(SQLAlchemySchema):

""" Base Schema for sqlalchemy models

:param COLLECTION_NAME: A name to use to return serialized data if the data is a list
:type COLLECTION_NAME: str

:param FIELDS_FROM_NONE_TO_EMPTY_STRING: A list of fields to convert to empty string if value is None
after serialization
:type FIELDS_FROM_NONE_TO_EMPTY_STRING: List[str]

:param FIELDS_FROM_NONE_TO_ZERO: A list of fields to convert to integer zero if value is None
after serialization
:type FIELDS_FROM_NONE_TO_ZERO: List[str]
"""

COLLECTION_NAME: Optional[str] = None
FIELDS_FROM_NONE_TO_EMPTY_STRING: List[str] = []
FIELDS_FROM_NONE_TO_ZERO: List[str] = []

def check_collection_name(self):
"""
Method to check that COLLLECTION_NAME attribute is not None
"""
if not self.COLLECTION_NAME:
raise AirflowException("The COLLECTION_NAME attribute is missing in the schema")

@post_dump(pass_many=True)
def wrap_with_envelope(self, data, many, **kwargs):
"""
Checks if data is a list and use the envelope key to return it together
with total_entries meta
:param data: The deserialized data
:param many: Whether the data is a collection
"""
self.check_collection_name()
if many:
data = self._process_list_data(data)
return {self.COLLECTION_NAME: data, 'total_entries': len(data)}
data = self._process_data(data)
return data

def _process_list_data(self, data):
return [self._process_data(item) for item in data]

def _process_data(self, data):
d_data = {}
for k, v in data.items():
if v is None:
if k in self.FIELDS_FROM_NONE_TO_EMPTY_STRING:
v = ''
elif k in self.FIELDS_FROM_NONE_TO_ZERO:
v = 0
d_data[k] = v
return d_data
57 changes: 57 additions & 0 deletions airflow/api_connexion/schemas/connection_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from marshmallow_sqlalchemy import auto_field

from airflow.api_connexion.schemas.base_schema import BaseSchema
from airflow.models.connection import Connection


class ConnectionCollectionItemSchema(BaseSchema):
"""
Schema for a connection item
"""
class Meta:
""" Meta """
model = Connection

COLLECTION_NAME = 'connections'
FIELDS_FROM_NONE_TO_EMPTY_STRING = ['connection_id', 'conn_type', 'host', 'login', 'schema']
FIELDS_FROM_NONE_TO_ZERO = ['port']

conn_id = auto_field(dump_to='connection_id', load_from='connection_id')
conn_type = auto_field()
host = auto_field()
login = auto_field()
schema = auto_field()
port = auto_field()


class ConnectionSchema(ConnectionCollectionItemSchema): # pylint: disable=too-many-ancestors
"""
Connection schema
"""
FIELDS_FROM_NONE_TO_EMPTY_STRING = ConnectionCollectionItemSchema.\
FIELDS_FROM_NONE_TO_EMPTY_STRING + ['extra']
password = auto_field(load_only=True)
extra = auto_field()


connection_schema = ConnectionSchema()
connection_collection_item_schema = ConnectionCollectionItemSchema()
connection_collection_schema = ConnectionCollectionItemSchema(many=True)
57 changes: 57 additions & 0 deletions airflow/api_connexion/schemas/dagrun_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from marshmallow import fields
from marshmallow_sqlalchemy import auto_field

from airflow.api_connexion.schemas.base_schema import BaseSchema
from airflow.api_connexion.schemas.enum_schemas import DagState
from airflow.models.dagrun import DagRun


class DAGRunSchema(BaseSchema):
"""
Schema for DAGRun
"""
COLLECTION_NAME = 'dag_runs'
FIELDS_FROM_NONE_TO_EMPTY_STRING = ['dag_run_id', 'dag_id', 'execution_date',
'start_date', 'end_date']

class Meta:
""" Meta """
model = DagRun

run_id = auto_field(dump_to='dag_run_id', load_from='dag_run_id')
dag_id = auto_field(dump_only=True)
execution_date = auto_field()
start_date = auto_field(dump_only=True)
end_date = auto_field(dump_only=True)
state = fields.Str(DagState())
external_trigger = auto_field(default=True, dump_only=True)
conf = fields.Method('get_conf')

@staticmethod
def get_conf(obj: DagRun):
"""Convert conf to object if None after serialization"""

if obj.conf is None:
return {}
return obj.conf


dagrun_schema = DAGRunSchema()
dagrun_collection_schema = DAGRunSchema(many=True)
24 changes: 24 additions & 0 deletions airflow/api_connexion/schemas/enum_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from marshmallow import fields, validate
from marshmallow.schema import Schema


class DagState(Schema):
"""DagState schema"""
state = fields.Str(validate=validate.OneOf(["success", "running", "failed"]))
Loading