Skip to content

Commit

Permalink
update s3 sensor docs to use context.cursor instead of context.last_r…
Browse files Browse the repository at this point in the history
…un_key (#7748)

* update s3 sensor docs to use context.cursor instead of context.last_run_key

* update

* return a list of run requests

* add test
  • Loading branch information
prha committed May 9, 2022
1 parent 64cf6a2 commit 5f1e9fb
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
12 changes: 7 additions & 5 deletions docs/content/concepts/partitions-schedules-sensors/sensors.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -574,12 +574,14 @@ from dagster_aws.s3.sensor import get_s3_keys

@sensor(job=my_job)
def my_s3_sensor(context):
new_s3_keys = get_s3_keys("my_s3_bucket", since_key=context.last_run_key)
since_key = context.cursor or None
new_s3_keys = get_s3_keys("my_s3_bucket", since_key=since_key)
if not new_s3_keys:
yield SkipReason("No new s3 files found for bucket my_s3_bucket.")
return
for s3_key in new_s3_keys:
yield RunRequest(run_key=s3_key, run_config={})
return SkipReason("No new s3 files found for bucket my_s3_bucket.")
last_key = new_s3_keys[-1]
run_requests = [RunRequest(run_key=s3_key, run_config={}) for s3_key in new_s3_keys]
context.update_cursor(last_key)
return run_requests
```

### Using resources in sensors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,14 @@ def multi_asset_sensor(context):

@sensor(job=my_job)
def my_s3_sensor(context):
new_s3_keys = get_s3_keys("my_s3_bucket", since_key=context.last_run_key)
since_key = context.cursor or None
new_s3_keys = get_s3_keys("my_s3_bucket", since_key=since_key)
if not new_s3_keys:
yield SkipReason("No new s3 files found for bucket my_s3_bucket.")
return
for s3_key in new_s3_keys:
yield RunRequest(run_key=s3_key, run_config={})
return SkipReason("No new s3 files found for bucket my_s3_bucket.")
last_key = new_s3_keys[-1]
run_requests = [RunRequest(run_key=s3_key, run_config={}) for s3_key in new_s3_keys]
context.update_cursor(last_key)
return run_requests


# end_s3_sensors_marker
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from dagster import job, op, repository
from unittest import mock

from dagster import build_sensor_context, job, op, repository
from docs_snippets.concepts.partitions_schedules_sensors.sensors.sensor_alert import (
email_on_run_failure,
my_slack_on_run_failure,
Expand All @@ -8,6 +10,7 @@
from docs_snippets.concepts.partitions_schedules_sensors.sensors.sensors import (
log_file_job,
my_directory_sensor,
my_s3_sensor,
sensor_A,
sensor_B,
test_my_directory_sensor_cursor,
Expand Down Expand Up @@ -68,3 +71,15 @@ def test_sensor_testing_example():

def test_resource_sensor_example():
uses_db_connection()


def test_s3_sensor():
with mock.patch(
"docs_snippets.concepts.partitions_schedules_sensors.sensors.sensors.get_s3_keys"
) as mock_s3_keys:
mock_s3_keys.return_value = ["a", "b", "c", "d", "e"]
context = build_sensor_context()
assert context.cursor is None
run_requests = my_s3_sensor(context)
assert len(list(run_requests)) == 5
assert context.cursor == "e"

1 comment on commit 5f1e9fb

@vercel
Copy link

@vercel vercel bot commented on 5f1e9fb May 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.