fix makefile issue with check_black/check_isort (#6944)
smackesey committed Mar 4, 2022
1 parent 66ae1c0 commit 4145d10
Showing 95 changed files with 183 additions and 104 deletions.
8 changes: 4 additions & 4 deletions Makefile
@@ -25,10 +25,10 @@ black:
examples/docs_snippets

check_black:
-black --check --fast \
black --check --fast \
--extend-exclude="examples/docs_snippets|snapshots" \
examples integration_tests helm python_modules .buildkite
-black --check --fast \
black --check --fast \
examples/docs_snippets


@@ -45,11 +45,11 @@ isort:
`git ls-files 'examples/docs_snippets/*.py'`

check_isort:
-isort --check \
isort --check \
`git ls-files '.buildkite/*.py' 'examples/*.py' 'integration_tests/*.py' 'helm/*.py' 'python_modules/*.py' \
':!:examples/docs_snippets' \
':!:snapshots'`
-isort --check \
isort --check \
`git ls-files 'examples/docs_snippets/*.py'`

yamllint:
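The substantive fix is the removal of the leading `-` from each `black --check` and `isort --check` recipe line (the `-` shown at the start of the old lines above is part of the Makefile, not a diff marker). In a Makefile recipe, a `-` prefix tells make to ignore a non-zero exit status from that command, so `check_black` and `check_isort` would report success even when the underlying checks failed. A minimal sketch of the difference, using illustrative target names and paths rather than the repository's actual Makefile:

```make
# Illustrative only: with the "-" prefix, make discards black's exit status,
# so this target "succeeds" even when files are unformatted.
check_black_lenient:
	-black --check my_package

# Without the prefix, a non-zero exit from black fails the target, which is
# the behavior a CI check needs. (Recipe lines are tab-indented.)
check_black_strict:
	black --check my_package
```

With the prefix gone, failures from these targets now propagate to CI, which is presumably what surfaced the import-order violations fixed in the documentation snippets below.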
9 changes: 6 additions & 3 deletions docs/content/concepts/assets/software-defined-assets.mdx
@@ -177,9 +177,10 @@ By default, materializing an asset will pickle it to a local file named "my_asse
To apply an IO manager to a set of assets, you can include it with them in an <PyObject object="AssetGroup" />.

```python file=/concepts/assets/asset_io_manager.py startafter=start_marker endbefore=end_marker
from dagster import AssetGroup, asset
from dagster_aws.s3 import s3_pickle_asset_io_manager, s3_resource

from dagster import AssetGroup, asset


@asset
def upstream_asset():
@@ -204,9 +205,10 @@ When `upstream_asset` is materialized, the value `[1, 2, 3]` will be will be pic
Different assets can have different IO managers:

```python file=/concepts/assets/asset_different_io_managers.py startafter=start_marker endbefore=end_marker
from dagster import AssetGroup, asset, fs_asset_io_manager
from dagster_aws.s3 import s3_pickle_asset_io_manager, s3_resource

from dagster import AssetGroup, asset, fs_asset_io_manager


@asset(io_manager_key="s3_io_manager")
def upstream_asset():
@@ -233,9 +235,10 @@ When `upstream_asset` is materialized, the value `[1, 2, 3]` will be will be pic
The same assets can be bound to different resources and IO managers in different environments. For example, for local development, you might want to store assets on your local filesystem while, in production, you might want to store the assets in S3.

```python file=/concepts/assets/asset_io_manager_prod_local.py startafter=start_marker endbefore=end_marker
from dagster import AssetGroup, asset, fs_asset_io_manager
from dagster_aws.s3 import s3_pickle_asset_io_manager, s3_resource

from dagster import AssetGroup, asset, fs_asset_io_manager


@asset
def upstream_asset():
3 changes: 2 additions & 1 deletion docs/content/concepts/io-management/io-managers.mdx
@@ -90,9 +90,10 @@ Not all the outputs in a job should necessarily be stored the same way. Maybe so
To select the IOManager for a particular output, you can set an `io_manager_key` on <PyObject module="dagster" object="Out" />, and then refer to that `io_manager_key` when setting IO managers in your job. In this example, the output of `op_1` will go to `fs_io_manager` and the output of `op_2` will go to `s3_pickle_io_manager`.

```python file=/concepts/io_management/io_manager_per_output.py startafter=start_marker endbefore=end_marker
from dagster import Out, fs_io_manager, job, op
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import Out, fs_io_manager, job, op


@op(out=Out(io_manager_key="fs"))
def op_1():
3 changes: 2 additions & 1 deletion docs/content/deployment/guides/aws.mdx
@@ -141,10 +141,11 @@ To enable parallel computation (e.g., with the multiprocessing or Dagster celery
You'll first need to need to use <PyObject module="dagster_aws.s3" object="s3_pickle_io_manager"/> as your IO Manager or customize your own persistent io managers (see [example](/concepts/io-management/io-managers#defining-an-io-manager)).

```python file=/deploying/aws/io_manager.py
from dagster import Int, Out, job, op
from dagster_aws.s3.io_manager import s3_pickle_io_manager
from dagster_aws.s3.resources import s3_resource

from dagster import Int, Out, job, op


@op(out=Out(Int))
def my_op():
3 changes: 2 additions & 1 deletion docs/content/deployment/guides/celery.mdx
@@ -20,9 +20,10 @@ The dagster-celery executor compiles a job and its associated configuration into
Let's construct a very parallel toy job that uses the Celery executor.

```python file=/deploying/celery_job.py
from dagster import job, op
from dagster_celery import celery_executor

from dagster import job, op


@op
def not_much():
6 changes: 4 additions & 2 deletions docs/content/deployment/guides/dask.mdx
@@ -26,9 +26,10 @@ First, run `pip install dagster-dask`.
Then, create a job with the dask executor:

```python file=/deploying/dask_hello_world.py startafter=start_local_job_marker endbefore=end_local_job_marker
from dagster import job, op
from dagster_dask import dask_executor

from dagster import job, op


@op
def hello_world():
@@ -58,11 +59,12 @@ If you want to use a Dask cluster for distributed execution, you will first need
You'll also need an IO manager that uses persistent shared storage, which should be attached to the job along with any resources on which it depends. Here, we use the <PyObject module="dagster_aws.s3" object="s3_pickle_io_manager"/>:

```python file=/deploying/dask_hello_world_distributed.py startafter=start_distributed_job_marker endbefore=end_distributed_job_marker
from dagster import job, op
from dagster_aws.s3.io_manager import s3_pickle_io_manager
from dagster_aws.s3.resources import s3_resource
from dagster_dask import dask_executor

from dagster import job, op


@op
def hello_world():
3 changes: 2 additions & 1 deletion docs/content/deployment/guides/gcp.mdx
@@ -63,10 +63,11 @@ You'll probably also want to configure a GCS bucket to store op outputs via pers
You'll first need to need to create a job using <PyObject module="dagster_gcp.gcs" object="gcs_pickle_io_manager"/> as its IO Manager (or [define a custom IO Manager](/concepts/io-management/io-managers#defining-an-io-manager)):

```python file=/deploying/gcp/gcp_job.py
from dagster import job
from dagster_gcp.gcs.io_manager import gcs_pickle_io_manager
from dagster_gcp.gcs.resources import gcs_resource

from dagster import job


@job(
resource_defs={
3 changes: 3 additions & 0 deletions docs/content/guides/dagster/dagster_type_factories.mdx
@@ -68,6 +68,7 @@ The first thing we want to do is visualize the distribution of trip lengths. We
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from dagster import AssetMaterialization, job, op


@@ -162,6 +163,7 @@ Fortunately, it's easy to write a factory function that will wrap any Pandera sc
```python file=/guides/dagster/dagster_type_factories/factory.py
import pandas as pd
import pandera as pa

from dagster import DagsterType, TypeCheck


@@ -208,6 +210,7 @@ import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pandera as pa

from dagster import AssetMaterialization, In, Out, job, op
from dagster.config.field import Field

1 change: 0 additions & 1 deletion docs/content/guides/dagster/re-execution.mdx
@@ -82,7 +82,6 @@ Again, let's revist the job `unreliable_job`, which has a op named `unreliable`.

```python file=/guides/dagster/reexecution/reexecution_api.py endbefore=end_initial_execution_marker
from dagster import DagsterInstance, reexecute_pipeline

from docs_snippets.guides.dagster.reexecution.unreliable_job import unreliable_job

instance = DagsterInstance.ephemeral()
1 change: 1 addition & 0 deletions docs/content/integrations/airflow.mdx
@@ -22,6 +22,7 @@ You can compile Dagster jobs into DAGs that can be understood by Airflow. Each o
import csv

import requests

from dagster import job, op


5 changes: 3 additions & 2 deletions docs/content/integrations/dagstermill.mdx
@@ -40,6 +40,7 @@ We can simply turn a notebook into an op using <PyObject module="dagstermill" ob

```python file=/legacy/data_science/iris_classify.py
import dagstermill as dm

from dagster import job
from dagster.utils import script_relative_path

@@ -96,9 +97,9 @@ We'll illustrate this process by adding a non-notebook op to our job, which will

```python literalinclude showLines emphasize-lines=2,10,16 caption=iris_classify_2.py file=/legacy/data_science/iris_classify_2.py
import dagstermill as dm

from dagster import InputDefinition, job
from dagster.utils import script_relative_path

from docs_snippets.legacy.data_science.download_file import download_file

k_means_iris = dm.define_dagstermill_op(
@@ -223,9 +224,9 @@ For instance, suppose we want to make the number of clusters (the \_k\_ in k-mea

```python literalinclude showLines emphasize-lines=10-12 caption=iris_classify_3.py file=/legacy/data_science/iris_classify_3.py
import dagstermill as dm

from dagster import Field, InputDefinition, Int, job
from dagster.utils import script_relative_path

from docs_snippets.legacy.data_science.download_file import download_file

k_means_iris = dm.define_dagstermill_op(
39 changes: 26 additions & 13 deletions docs/content/integrations/dbt.mdx
@@ -87,9 +87,10 @@ Note that you can pass in any keyword to these functions that you wish, and they
One common way to use this integration is to have the a step in your job run all of the models in a dbt project. For this case, the easiest method is to configure the resource so it knows where your dbt project is, and import the `dbt_run_op` to use in your job.

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_run endbefore=end_marker_dbt_cli_run dedent=4
from dagster import job
from dagster_dbt import dbt_cli_resource, dbt_run_op

from dagster import job

my_dbt_resource = dbt_cli_resource.configured(
{"project_dir": "path/to/dbt/project"}
)
@@ -110,9 +111,10 @@ Note that in both cases, the `models` option takes in a list of strings. The str
If you know what models you want to select ahead of time, you might prefer specifying this while configuring your resource. Because you aren't specifying any specific arguments at runtime, you can use the prebuilt `dbt_run_op`, instead of writing your own.

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_run_specific_models_preconfig endbefore=end_marker_dbt_cli_run_specific_models_preconfig dedent=4
from dagster import job
from dagster_dbt import dbt_cli_resource, dbt_run_op

from dagster import job

my_dbt_resource = dbt_cli_resource.configured(
{"project_dir": "path/to/dbt/project", "models": ["tag:staging"]}
)
@@ -142,9 +144,10 @@ def run_models(context, some_condition: bool):
Dagster supports creating [multiple jobs from the same graph](/concepts/ops-jobs-graphs/jobs-graphs#from-a-graph). dbt has a similar concept, [profiles](https://docs.getdbt.com/dbt-cli/configure-your-profile). You might want to run a dev version of your graph that targets the development-specific dbt profile, and then have a prod version that runs using the prod dbt profile. This example shows how to accomplish this.

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_profile_modes endbefore=end_marker_dbt_cli_profile_modes dedent=4
from dagster import graph
from dagster_dbt import dbt_cli_resource, dbt_run_op

from dagster import graph

@graph
def my_dbt():
dbt_run_op()
@@ -173,9 +176,10 @@ Sometimes, you'll want to run multiple different dbt commands in the same job. T
One common use case would be to first run `dbt run` to update all of your models, and then run `dbt test` to check that they all are working as expected, seen below.

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_run_after_another_op endbefore=end_marker_dbt_cli_run_after_another_op dedent=4
from dagster import job
from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op

from dagster import job

my_dbt_resource = dbt_cli_resource.configured(
{"project_dir": "path/to/dbt/project"}
)
@@ -212,9 +216,10 @@ For convenience during local development, you may also use `dagster_dbt.local_db
All of the prebuilt dbt ops in this library are compatible with the `dbt_rpc_resource` (which sends requests and then exists immediately) and the `dbt_rpc_sync_resource` (which sends requests and waits for the task to complete). Therefore, we can use the same ops as in the CLI examples as long as we provide the correct resource to the job.

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_run endbefore=end_marker_dbt_rpc_run dedent=4
from dagster import job
from dagster_dbt import dbt_run_op

from dagster import job

@job(resource_defs={"dbt": my_remote_rpc})
def my_dbt_job():
dbt_run_op()
@@ -225,9 +230,10 @@ def my_dbt_job():
This is similar to having `"params": {"models": "tag:staging"}` in your dbt RPC request body.

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_run_specific_models endbefore=end_marker_dbt_rpc_run_specific_models dedent=4
from dagster import job, op
from dagster_dbt import dbt_rpc_resource

from dagster import job, op

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

@op(required_resource_keys={"dbt"})
@@ -244,9 +250,10 @@ Note that the job above will NOT wait until the dbt RPC server has finished exec
#### Running a dbt project and polling the RPC server until it has finished executing

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_run_and_wait endbefore=end_marker_dbt_rpc_run_and_wait dedent=4
from dagster import job, op
from dagster_dbt import dbt_rpc_sync_resource

from dagster import job, op

my_remote_sync_rpc = dbt_rpc_sync_resource.configured(
{"host": "80.80.80.80", "port": 8080}
)
@@ -269,9 +276,10 @@ For full documentation on all available config, [visit the API docs for dagster-
`dagster_dbt` is configured to automatically create [Asset Materializations](/concepts/assets/asset-materializations) for each of your dbt models when you run the `run` command (via either the CLI or over RPC). These materializations are populated with metadata that is automatically parsed from the dbt response. The available metadata differs between dbt versions. If there's anything you would like to add to these materializations, you can explicitly invoke the underlying <PyObject module='dagster_dbt.utils' object='generate_materializations'/> utility function, and modify each of the resulting materializations like so:

```python file=/integrations/dbt.py startafter=start_marker_dbt_asset_mats endbefore=end_marker_dbt_asset_mats dedent=4
from dagster import op
from dagster_dbt.utils import generate_materializations

from dagster import op

@op(required_resource_keys={"dbt"})
def dbt_run_with_custom_assets(context):
dbt_result = context.resources.dbt.run()
@@ -287,9 +295,10 @@ def dbt_run_with_custom_assets(context):
**dbt CLI: Set the dbt profile and target to load**

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_config_profile_and_target endbefore=end_marker_dbt_cli_config_profile_and_target dedent=4
from dagster import job
from dagster_dbt import dbt_cli_resource

from dagster import job

config = {"profile": PROFILE_NAME, "target": TARGET_NAME}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
@@ -300,9 +309,10 @@ def my_job():
**dbt CLI: Set the path to the dbt executable**

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_config_executable endbefore=end_marker_dbt_cli_config_executable dedent=4
from dagster import job
from dagster_dbt import dbt_cli_resource

from dagster import job

config = {"dbt_executable": "path/to/dbt/executable"}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
@@ -313,9 +323,10 @@ def my_job():
**dbt CLI: Select specific models to run**

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_config_select_models endbefore=end_marker_dbt_cli_config_select_models dedent=4
from dagster import job
from dagster_dbt import dbt_cli_resource

from dagster import job

config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
@@ -328,9 +339,10 @@ For more details, [visit the official documentation on dbt's node selection synt
**dbt CLI: Exclude specific models**

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_config_exclude_models endbefore=end_marker_dbt_cli_config_exclude_models dedent=4
from dagster import job
from dagster_dbt import dbt_cli_resource

from dagster import job

config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
@@ -343,9 +355,10 @@ For more details, [visit the official documentation on dbt's node selection synt
**dbt CLI: Set key-values for dbt vars**

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_config_vars endbefore=end_marker_dbt_cli_config_vars dedent=4
from dagster import job
from dagster_dbt import dbt_cli_resource

from dagster import job

config = {"vars": {"key": "value"}}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
3 changes: 2 additions & 1 deletion docs/content/integrations/pandera.mdx
@@ -23,10 +23,11 @@ import random

import pandas as pd
import pandera as pa
from dagster import Out, job, op
from dagster_pandera import pandera_schema_to_dagster_type
from pandera.typing import Series

from dagster import Out, job, op

APPLE_STOCK_PRICES = {
"name": ["AAPL", "AAPL", "AAPL", "AAPL", "AAPL"],
"date": ["2018-01-22", "2018-01-23", "2018-01-24", "2018-01-25", "2018-01-26"],
2 changes: 2 additions & 0 deletions docs/content/tutorial/advanced-tutorial/configuring-ops.mdx
@@ -37,6 +37,7 @@ Let's rebuild a job we've seen before, but this time using our newly parameteriz
import csv

import requests

from dagster import get_dagster_logger, job, op


@@ -141,6 +142,7 @@ Dagster allows specifying a "config schema" on any configurable object to help c
import csv

import requests

from dagster import op


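Aside from the Makefile, the changes captured above are documentation pages whose embedded snippets differ only in import grouping: with `check_isort` now able to fail, the snippets are re-sorted so that third-party imports come first and `from dagster ...` imports sit in their own block. A representative sketch of the before/after pattern, based on the dbt example earlier rather than any single file:

```python
# Before this commit, dagster was mixed into the first import block:
#
#     from dagster import job
#     from dagster_dbt import dbt_cli_resource, dbt_run_op
#
# After, third-party imports come first, followed by a separate dagster block:
from dagster_dbt import dbt_cli_resource, dbt_run_op

from dagster import job

# Point the dbt CLI resource at a dbt project (path is illustrative).
my_dbt_resource = dbt_cli_resource.configured({"project_dir": "path/to/dbt/project"})


@job(resource_defs={"dbt": my_dbt_resource})
def my_dbt_job():
    dbt_run_op()
```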
