Add Databricks sensor check partition and delta table events #23567

Closed

thcidale0808 wants to merge 5 commits into apache:main from thcidale0808:add_databricks_sensor_check_partition

Conversation

@thcidale0808 (Author)

This PR proposes the creation of two new sensors within the Databricks provider package:

  • Table Partition Sensor: senses whether a given partition is present in a Databricks Delta table.
  • Delta Table Event Sensor: senses whether there are new events for a given table; it takes a timestamp and checks whether the table events are newer than that timestamp.

Both sensors leverage the existing DatabricksSqlHook to query the information from Databricks; a rough sketch of that approach follows below.
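
For illustration only, here is a minimal sketch of what the partition sensor could look like on top of DatabricksSqlHook. The parameter names mirror those visible in the review comments further down, but the class name, the SHOW PARTITIONS query, and the get_records call are assumptions of this sketch, not the PR's actual implementation:

from typing import Any

from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context


class DatabricksTablePartitionSensorSketch(BaseSensorOperator):
    """Sketch only: poke until the wanted partition shows up in SHOW PARTITIONS output."""

    def __init__(
        self,
        *,
        databricks_conn_id: str,
        table_name: str,
        partition_name: str,
        database_name: str = "default",
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.databricks_conn_id = databricks_conn_id
        self.table_name = table_name
        self.partition_name = partition_name
        self.database_name = database_name

    def poke(self, context: Context) -> bool:
        hook = DatabricksSqlHook(databricks_conn_id=self.databricks_conn_id)
        # SHOW PARTITIONS returns one row per partition of the Delta table.
        records = hook.get_records(f"SHOW PARTITIONS {self.database_name}.{self.table_name}")
        # Succeed as soon as the expected partition value appears in any returned row.
        return any(self.partition_name in row for row in records)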


@ferruzzi (Contributor) commented May 9, 2022

The static checks are failing; please run the pre-commit checks and fix those.

@thcidale0808 (Author)

@ferruzzi, your comments make sense. I've pushed the fixes.

@ferruzzi (Contributor)

Nice. You'll need a committer's approval to merge it, but looks good to me. 👍

@alexott (Contributor) left a review:

You also need to update provider.yaml to register the sensors, plus add documentation.

databricks_conn_id: str,
table_name: str,
partition_name: str,
database_name: Optional[str] = 'default',

@alexott: In the Base Sensor you use catalog & schema, and here you use only database (schema). Please unify that.
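
As a small sketch of the unification asked for here, both sensors could take catalog and schema (instead of one of them taking database_name) and build the fully qualified table name the same way; the names and example values below are assumptions:

def fully_qualified_table(catalog: str, schema: str, table_name: str) -> str:
    # Sketch: one shared way to qualify tables, e.g.
    # fully_qualified_table("hive_metastore", "default", "events") -> "hive_metastore.default.events"
    return f"{catalog}.{schema}.{table_name}"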

self.timestamp = timestamp
self.database_name = 'default' if not database_name else database_name

def poke(self, context: Context) -> bool:

@alexott: I would generalize this into a DatabricksSqlSensor and just pass different conditions.
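
A rough sketch of that generalization, assuming a single SQL-driven sensor that succeeds once its query returns any rows (the class name and constructor below are illustrative, not the provider's actual API):

from typing import Any

from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context


class DatabricksSqlConditionSensorSketch(BaseSensorOperator):
    """Sketch: generic sensor that pokes until the given SQL returns at least one row."""

    template_fields = ("sql",)

    def __init__(self, *, databricks_conn_id: str, sql: str, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.databricks_conn_id = databricks_conn_id
        self.sql = sql

    def poke(self, context: Context) -> bool:
        hook = DatabricksSqlHook(databricks_conn_id=self.databricks_conn_id)
        return bool(hook.get_records(self.sql))

The partition check then reduces to passing a condition query such as "SELECT 1 FROM default.events WHERE date = '2022-05-09' LIMIT 1" (table and partition values here are placeholders).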

return self.partition_name in record


class DatabricksDeltaTableChangeSensor(DatabricksBaseSensor):

@alexott: Also, it would be useful to keep the last "known" version and compare it with the current one, like I did for Delta Sharing: https://github.com/apache/airflow/pull/22692/files#diff-219de7033c7aa02bc63b2bb0b8f6441f2d076bc48f52af129ed70d2e132b6944R98
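
The version-comparison idea could look roughly like this, assuming DESCRIBE HISTORY ... LIMIT 1 is used to read the current Delta table version (how the last known version is persisted, e.g. XCom or a Variable, is left open):

from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook


def table_changed_since(hook: DatabricksSqlHook, table: str, last_known_version: int) -> bool:
    """Sketch: compare the table's current Delta version with a previously recorded one."""
    # DESCRIBE HISTORY lists the newest commit first; its first column is the version number.
    rows = hook.get_records(f"DESCRIBE HISTORY {table} LIMIT 1")
    current_version = int(rows[0][0])
    return current_version > last_known_version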

github-actions bot commented Jul 6, 2022

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions bot added the "stale" label (Stale PRs per the .github/workflows/stale.yml policy file) on Jul 6, 2022
github-actions bot closed this on Jul 12, 2022

Labels

area:providers, stale (Stale PRs per the .github/workflows/stale.yml policy file)

3 participants