Skip to content

Add HivePartitionAsyncSensor#28874

Closed
phanikumv wants to merge 1 commit intoapache:mainfrom
astronomer:hive_partition
Closed

Add HivePartitionAsyncSensor#28874
phanikumv wants to merge 1 commit intoapache:mainfrom
astronomer:hive_partition

Conversation

@phanikumv
Copy link
Contributor

@phanikumv phanikumv commented Jan 12, 2023

This PR donates the HivePartitionAsyncSensor from astronomer-providers repo to Airflow


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@phanikumv phanikumv changed the title Add async hive sensor Add HivePartitionAsyncSensor Jan 12, 2023
# the sasl library anyway (and there sasl library version is not relevant)
- sasl>=0.3.1; python_version>="3.9"
- thrift>=0.9.2
- impyla
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

impyla doesn't make any asyncio calls in execute_async.

Copy link
Contributor Author

@phanikumv phanikumv Jan 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have written our own asyncio method as shown below, because impyla returns immediately after submitting the query.

Please check the link
https://github.com/cloudera/impyla/blob/v0.16a2/impala/hiveserver2.py#L334-L338

async def partition_exists(self, table: str, schema: str, partition: str, polling_interval: float) -> str:
        """
        Checks for the existence of a partition in the given hive table.

        :param table: table in hive where the partition exists.
        :param schema: database where the hive table exists
        :param partition: partition to check for in given hive database and hive table.
        :param polling_interval: polling interval in seconds to sleep between checks
        """
        client = self.get_hive_client()
        cursor = client.cursor()
        query = f"show partitions {schema}.{table} partition({partition})"
        cursor.execute_async(query)
        while cursor.is_executing():
            await asyncio.sleep(polling_interval)
        results = cursor.fetchall()
        if len(results) == 0:
            return "failure"
        return "success"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unfortunetly not transform sync code into asyncio. If this kind transformation would be so easy than we had to transform any sync method to asyncio implementation.

So offhand most of impyla methods makes blocking io requests:
when you call cursor.execute_async as well as cursor.is_executing() and also cursor.fetchall()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

most sync method wait for the operation to complete, which is not what impyla does here.

@kaxil any thoughts ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not every async implementation is asyncio-compatible.

asyncio stand for Asynchronous I/O however impyla provides asynchronous execution with block I/O.

Implementation of impyla would be perfect for regular Sensor rather than Trigger and defer Operators

Comment on lines +1056 to +1057
self.conn = self.get_connection(conn_id=metastore_conn_id)
self.auth_mechanism = self.conn.extra_dejson.get("authMechanism", "PLAIN")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block async thread

@phanikumv phanikumv marked this pull request as ready for review January 12, 2023 09:28
@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers stale Stale PRs per the .github/workflows/stale.yml policy file

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants

Comments