Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions tests/providers/apache/cassandra/sensors/test_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest

import pytest

from airflow import DAG
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
from airflow.utils import timezone

DEFAULT_DATE = timezone.datetime(2017, 1, 1)


@pytest.mark.integration("cassandra")
class TestCassandraRecordSensor(unittest.TestCase):
    """Integration tests for ``CassandraRecordSensor.poke``.

    Runs against a live Cassandra instance (selected by the
    ``integration("cassandra")`` marker): creates table ``s.t`` with one
    row, checks that the sensor detects the existing record and rejects a
    non-matching one, and drops the table again so the run leaves no state
    behind.
    """

    def setUp(self) -> None:
        # Minimal DAG the sensors can be attached to; no scheduling happens.
        args = {
            'owner': 'airflow',
            'start_date': DEFAULT_DATE
        }

        self.dag = DAG('test_dag_id', default_args=args)

    def test_poke(self):
        hook = CassandraHook(cassandra_conn_id="cassandra_default")
        session = hook.get_conn()
        try:
            cqls = [
                "DROP TABLE IF EXISTS s.t",
                "CREATE TABLE s.t (pk1 text, pk2 text, c text, PRIMARY KEY (pk1, pk2))",
                "INSERT INTO s.t (pk1, pk2, c) VALUES ('foo', 'bar', 'baz')",
            ]
            for cql in cqls:
                session.execute(cql)

            # Record ('foo', 'bar') was just inserted -> poke must succeed.
            true_sensor = CassandraRecordSensor(
                dag=self.dag,
                task_id="test_task",
                cassandra_conn_id="cassandra_default",
                table="s.t",
                keys={"pk1": "foo", "pk2": "bar"}
            )
            self.assertTrue(true_sensor.poke({}))

            # Table s.u does not exist -> poke must report False.
            # Unique task_id: re-using "test_task" in the same DAG raises
            # DuplicateTaskIdFound on current Airflow.
            false_sensor = CassandraRecordSensor(
                dag=self.dag,
                task_id="test_task_2",
                cassandra_conn_id="cassandra_default",
                table="s.u",
                keys={"pk1": "foo", "pk2": "baz"}
            )
            self.assertFalse(false_sensor.poke({}))
        finally:
            # Tear down even if an assertion fails: drop the test table so
            # the integration run has no side effects (flagged in review),
            # then release the session and cluster resources.
            session.execute("DROP TABLE IF EXISTS s.t")
            session.shutdown()
            hook.shutdown_cluster()
68 changes: 68 additions & 0 deletions tests/providers/apache/cassandra/sensors/test_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest

import pytest

from airflow import DAG
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor
from airflow.utils import timezone

DEFAULT_DATE = timezone.datetime(2017, 1, 1)


@pytest.mark.integration("cassandra")
class TestCassandraHook(unittest.TestCase):
    """Integration tests for ``CassandraTableSensor.poke``.

    Runs against a live Cassandra instance (selected by the
    ``integration("cassandra")`` marker): creates table ``s.t``, checks
    that the sensor detects it and rejects a missing table, and drops the
    table again so the run leaves no state behind.
    """

    # NOTE(review): the class name looks copy-pasted from the hook tests —
    # it actually exercises CassandraTableSensor. Kept as-is to avoid a
    # rename in this change; consider TestCassandraTableSensor.

    def setUp(self) -> None:
        # Minimal DAG the sensors can be attached to; no scheduling happens.
        args = {
            'owner': 'airflow',
            'start_date': DEFAULT_DATE
        }

        self.dag = DAG('test_dag_id', default_args=args)

    def test_poke(self):
        hook = CassandraHook("cassandra_default_with_schema")
        session = hook.get_conn()
        try:
            cqls = [
                "DROP TABLE IF EXISTS s.t",
                "CREATE TABLE s.t (pk1 text PRIMARY KEY)",
            ]
            for cql in cqls:
                session.execute(cql)

            # Table s.t was just created -> poke must succeed.
            true_sensor = CassandraTableSensor(
                dag=self.dag,
                task_id="test_task",
                cassandra_conn_id="cassandra_default",
                table="s.t"
            )
            self.assertTrue(true_sensor.poke({}))

            # Renamed from the original's second `true_sensor`: this one
            # targets table s.u, which does not exist -> poke must fail.
            # Unique task_id: re-using "test_task" in the same DAG raises
            # DuplicateTaskIdFound on current Airflow.
            false_sensor = CassandraTableSensor(
                dag=self.dag,
                task_id="test_task_2",
                cassandra_conn_id="cassandra_default",
                table="s.u"
            )
            self.assertFalse(false_sensor.poke({}))
        finally:
            # Tear down even if an assertion fails: drop the test table so
            # the integration run has no side effects, then release the
            # session and cluster resources.
            session.execute("DROP TABLE IF EXISTS s.t")
            session.shutdown()
            hook.shutdown_cluster()
2 changes: 0 additions & 2 deletions tests/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@

MISSING_TEST_FILES = {
'tests/providers/amazon/aws/hooks/test_athena.py',
'tests/providers/apache/cassandra/sensors/test_record.py',
'tests/providers/apache/cassandra/sensors/test_table.py',
'tests/providers/apache/hdfs/sensors/test_web_hdfs.py',
'tests/providers/apache/hive/operators/test_vertica_to_hive.py',
'tests/providers/apache/pig/operators/test_pig.py',
Expand Down