
Run databricks task locally #1951

Merged: 22 commits merged into master on Dec 8, 2023
Conversation

pingsutw (Member) commented Nov 10, 2023

TL;DR

This PR makes it possible to submit a Databricks job from a local machine and save intermediate data in a blob store. It simplifies testing and developing Databricks tasks locally.

There are two ways to run the Databricks job locally:

  1. pyflyte run databricks.py wf — runs the Spark task in the local process.
  2. pyflyte run --raw-output-data-prefix s3://databricks-agent/demo databricks.py wf — submits the task to the Databricks platform, falling back to 1 (local execution) if the agent raises an exception (see the sketch below).

Note: To submit a job from a local machine, you need AWS credentials and a Databricks access token in environment variables:

export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_SESSION_TOKEN=...
export DATABRICKS_TOKEN=...
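
A minimal sketch of the fallback behavior described above; the names below are hypothetical stand-ins, not the actual flytekit internals:

from typing import Any, Callable


def execute_with_fallback(
    submit_to_agent: Callable[..., Any],  # e.g. submits the job to Databricks
    run_locally: Callable[..., Any],      # e.g. runs the Spark task in-process
    **inputs: Any,
) -> Any:
    try:
        # First try the remote agent; intermediate data lands under the
        # configured --raw-output-data-prefix (e.g. s3://databricks-agent/demo).
        return submit_to_agent(**inputs)
    except Exception as exc:
        # If the agent raises, fall back to local execution (option 1).
        print(f"Agent submission failed ({exc!r}); falling back to local run.")
        return run_locally(**inputs)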

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

Example workflow (databricks_wf.py):

import datetime
import os
import random
from operator import add

from click.testing import CliRunner

import flytekit
from flytekit import Resources, Secret, task, workflow, ImageSpec
from flytekit.clis.sdk_in_container import pyflyte
from flytekitplugins.spark import Databricks

SECRET_GROUP = "token-info"
SECRET_NAME = "token_secret"

image = ImageSpec(base_image="pingsutw/databricks:v4", registry="pingsutw")

@task(
    task_config=Databricks(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "600M",
            "spark.executor.memory": "600M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "1",
            "spark.driver.cores": "1",
        },
        executor_path="/databricks/python3/bin/python",
        applications_path="dbfs:///FileStore/tables/entrypoint.py",
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "m6i.large",  # TODO: test m6i.large, i3.xlarge
                "num_workers": 3,
                "aws_attributes": {
                    "availability": "ON_DEMAND",
                    "instance_profile_arn": "arn:aws:iam::xxxxxx:instance-profile/databricks-agent",
                    "ebs_volume_type": "GENERAL_PURPOSE_SSD",
                    "ebs_volume_count": 1,
                    "ebs_volume_size": 100,
                },
            },
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
        databricks_instance="xxxxxxx.cloud.databricks.com",
    ),
    limits=Resources(mem="2000M"),
    # container_image=image,
    container_image="pingsutw/databricks:v7"
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))

    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(cache_version="1")
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1


@workflow
def wf(
    triggered_date: datetime.datetime = datetime.datetime.now(),
) -> float:
    """
    Using the workflow is still as any other workflow. As image is a property of the task, the workflow does not care
    about how the image is configured.
    """
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi


if __name__ == '__main__':
    runner = CliRunner()
    result = runner.invoke(pyflyte.main,
                           ["run",
                            "--raw-output-data-prefix",
                            "s3://flyte-batch/spark/",
                            "/Users/kevin/git/flytekit/flyte-example/databricks_wf",
                            "wf"])
    print(result.output)
Dockerfile:

FROM databricksruntime/standard:13.3-LTS
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytesnacks

ENV PYTHONPATH /databricks/driver
ENV PATH="/databricks/python3/bin:$PATH"
USER 0

RUN sudo apt-get update && sudo apt-get install -y make build-essential libssl-dev git
RUN /databricks/python3/bin/pip install git+https://github.com/Future-Outlier/flytekit.git@master#subdirectory=plugins/flytekit-spark
RUN /databricks/python3/bin/pip install markupsafe==2.0.0

COPY flyte-example/databricks_wf.py /databricks/driver/
WORKDIR /databricks/driver
ENV PYTHONPATH /databricks/driver

Tracking Issue

flyteorg/flyte#3936

Follow-up issue

NA

codecov bot commented Nov 10, 2023

Codecov Report

Attention: 18 lines in your changes are missing coverage. Please review.

Comparison is base (9c8481e) 85.91% compared to head (9d23a1d) 85.90%.
Report is 1 commit behind head on master.

Files                                                   Patch %   Lines
flytekit/extend/backend/base_agent.py                   66.66%    12 Missing and 3 partials ⚠️
...ugins/flytekit-spark/flytekitplugins/spark/task.py   86.66%    2 Missing ⚠️
...gins/flytekit-spark/flytekitplugins/spark/agent.py   94.11%    1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1951      +/-   ##
==========================================
- Coverage   85.91%   85.90%   -0.02%     
==========================================
  Files         306      306              
  Lines       22818    22867      +49     
  Branches     3466     3470       +4     
==========================================
+ Hits        19605    19644      +39     
- Misses       2622     2629       +7     
- Partials      591      594       +3     


Future-Outlier (Member) commented Nov 10, 2023

Here's how I tried to specify the path.

if __name__ == '__main__':
    runner = CliRunner()
    result = runner.invoke(pyflyte.main,
                           ["run",
                            "--raw-output-data-prefix",
                            "s3://flyte-batch/spark/",
                            "/mnt/c/code/dev/example/plugins/databricks_wf",
                            "wf"])
    print(result.output)

Can you explain how to set --raw-output-data-prefix? I tested it in my local environment.

Environment variable:

# for flyte s3 minio
export FLYTE_AWS_ENDPOINT="http://localhost:30080/"
export FLYTE_AWS_ACCESS_KEY_ID="minio"
export FLYTE_AWS_SECRET_ACCESS_KEY="miniostorage"

My S3 storage: (screenshot omitted)

Error Message:

(dev) root@googler:/mnt/c/code/dev/example/plugins# python databricks_wf.py
Running Execution on local.
Failed with Exception Code: USER:AssertionError
Underlying Exception: Not Found
Failed to put data from /tmp/tmpbfgzko5e/script_mode.tar.gz to s3://flyte-batch/spark/025c7d20ac403c3c26629b35c0bca000/script_mode.tar.gz (recursive=False).

Original exception: Not Found

Future-Outlier (Member) commented Nov 10, 2023

How to Setup

Let's say you have a Python file called databricks_wf.py and you want to run it on the Databricks platform.

0. Databricks setup

(0) Set up your workspace
https://docs.flyte.org/en/latest/deployment/plugins/webapi/databricks.html#deployment-plugin-setup-webapi-databricks

(1) Enable BYOC (bring your own container)

curl -X PATCH -n \
  -H "Authorization: Bearer <your-personal-access-token>" \
  https://<databricks-instance>/api/2.0/workspace-conf \
  -d '{
    "enableDcs": "true"
    }'

Note: remember to use the token; this isn't mentioned in the docs.
Reference: https://docs.databricks.com/en/clusters/custom-containers.html
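
If you want to confirm the flag took effect, here is a small sketch (assuming the workspace-conf GET endpoint and the requests library; the instance URL is a placeholder):

import os

import requests

DATABRICKS_INSTANCE = "xxxxxxx.cloud.databricks.com"  # placeholder

# Read back the workspace setting that the PATCH above changed.
resp = requests.get(
    f"https://{DATABRICKS_INSTANCE}/api/2.0/workspace-conf",
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    params={"keys": "enableDcs"},
)
resp.raise_for_status()
print(resp.json())  # expected: {"enableDcs": "true"}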
(2) Upload your entrypoint.py to DBFS (Databricks File System)
Copy the Python file from (0) and upload it to DBFS.

You can browse DBFS in Catalog to check that the file exists. One way to do the upload from Python is sketched below.
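
A hedged sketch of the upload, assuming the DBFS put REST endpoint (JSON body with base64-encoded contents, roughly 1 MB limit); the Databricks CLI would work just as well:

import base64
import os

import requests

DATABRICKS_INSTANCE = "xxxxxxx.cloud.databricks.com"  # placeholder

# Base64-encode the entrypoint script, as required by the JSON API.
with open("entrypoint.py", "rb") as f:
    contents = base64.b64encode(f.read()).decode()

resp = requests.post(
    f"https://{DATABRICKS_INSTANCE}/api/2.0/dbfs/put",
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    json={
        "path": "/FileStore/tables/entrypoint.py",  # matches applications_path in the task config
        "contents": contents,
        "overwrite": True,
    },
)
resp.raise_for_status()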

(3) Instance profile
(Kevin, please elaborate more; details are important, or please give an example.)
Reference: https://docs.databricks.com/en/aws/iam/instance-profile-tutorial.html

1. Build your Dockerfile (ImageSpec will be supported in the future)

FROM databricksruntime/standard:13.3-LTS
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytesnacks

ENV PYTHONPATH /databricks/driver
ENV PATH="/databricks/python3/bin:$PATH"
USER 0

RUN sudo apt-get update && sudo apt-get install -y make build-essential libssl-dev git
RUN /databricks/python3/bin/pip install git+https://github.com/Future-Outlier/flytekit.git@master#subdirectory=plugins/flytekit-spark
RUN /databricks/python3/bin/pip install markupsafe==2.0.0

COPY flyte-example/databricks_wf.py /databricks/driver/
WORKDIR /databricks/driver
ENV PYTHONPATH /databricks/driver

docker build -t pingsutw/databricks:v7 .

Note: you have to put your Python file on your PYTHONPATH.

2. Run the code

Locally

(Wait for Kevin's reply)

Remotely

You can use pyflyte register or pyflyte register --non-fast.
The second one skips zipping and uploading the package
(which means you don't need to download the input from S3, making the workflow faster).

pyflyte register databricks_wf.py --version DB-FIRST
pyflyte register --non-fast databricks_wf.py --version DB-SECOND

Now, you can run it!

Future-Outlier (Member) commented Nov 10, 2023

I ran it successfully in remote development. Since I don't have the AWS S3 secret, I can't put the data, but all functions work well!

Future-Outlier (Member) commented:

Is applications_path="local:///usr/local/bin/entrypoint.py" in the config necessary?

pingsutw (Member, Author) commented:

This is the command to run it locally.

pyflyte --verbose run --raw-output-data-prefix s3://flyte-batch/spark/ flyte-example/databricks_wf.py wf

kumare3 (Contributor) commented Nov 11, 2023

I thought about it some more; after deliberation I think this is confusing. Here is what I think makes more sense:

  1. By default, all local executions of tasks with an agent will automatically try to invoke remote services. For agents with PythonFunctionTask or AgentExecutorMixin, we should check whether raw-output-prefix is set. If not, we raise an error:
class AgentFunctionTaskExecutor:
    ...

    def execute(self):
        if ctx.raw_output_prefix is local:  # pseudocode: no remote prefix configured
            raise AssertionError(
                f"Using agent {self.name} locally needs to have a way to pass the data/code from local "
                "to remote. This needs the configuration of a common shared blob store like S3, GCS, etc. "
                "This can be achieved using `--raw-output-prefix` in `pyflyte run`. If you want to run the "
                "task code locally without invoking the remote service (e.g. testing), use the "
                "`--local-agent-emulation` flag in `pyflyte run`."
            )
        # ... continue to execution
  2. Thus a user has to specify pyflyte run --raw-output-prefix or pyflyte run --local-agent-emulation to run it correctly.
  3. This is almost self-documenting.

Review comment on this code:

def execute(self, **kwargs) -> Any:
    if isinstance(self.task_config, Databricks):
        # Since we only have databricks agent
        return AsyncAgentExecutorMixin.execute(self, **kwargs)
Contributor:

would this also automatically invoke the local method?

pingsutw marked this pull request as ready for review on November 24, 2023 at 11:19.
kumare3 (Contributor) left a review:

LGTM. Also, can I try and test this?

pingsutw merged commit 5a45657 into master on Dec 8, 2023. 72 of 74 checks passed.
Future-Outlier pushed a commit to Future-Outlier/flytekit that referenced this pull request Dec 12, 2023
RRap0so pushed a commit to RRap0so/flytekit that referenced this pull request Dec 15, 2023