Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Databricks WebAPI 2.1 version and Support existing_cluster_id and new_cluster options to create a Job #4361

Merged
merged 2 commits into from
Nov 5, 2023

Conversation

Future-Outlier
Copy link
Member

@Future-Outlier Future-Outlier commented Nov 4, 2023

TL;DR

  • Fix Delete function, post request data format error
  • Add Message in TaskInfo when Pending State
  • Fix the bug in lifeCycleState. (SKIPPED doesn't have resultState, but TERMINATING does)
    image

https://docs.databricks.com/en/workflows/jobs/jobs-2.0-api.html#runresultstate

  • Currently, we only support Databricks API 2.0 and only the new_cluster key in the Databricks config.
    However, some users want to use Databricks API 2.1 and want to use existing_cluster_id in the databricks config.
    Here's the screenshot from the official documentation.
    image
    you can find the difference between existing_cluster_id and new_cluster

https://docs.databricks.com/en/workflows/jobs/jobs-2.0-api.html#request-structure

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

Config yaml in dev mode

tasks:
  task-plugins:
    enabled-plugins:
      - container
      - sidecar
      - K8S-ARRAY
      - databricks
    default-for-task-types:
      spark: databricks
      custom_task: agent-service
      container: container
      container_array: K8S-ARRAY

plugins:
  databricks:
    entrypointFile: dbfs:///FileStore/tables/entrypoint.py
    databricksInstance: <YOUR WORKSPACE ID>.gcp.databricks.com```

Example Code

import datetime
import os
import random

import flytekit
from flytekit import Secret, task, workflow
from flytekitplugins.spark import Databricks, Spark

@task(
    task_config=Databricks(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        },
        executor_path="/databricks/python3/bin/python",
        applications_path="dbfs:/entrypoint.py",
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "new_cluster": {
                "spark_version": "12.2.x-scala2.12",
                "node_type_id": "n2-highmem-4",
                "num_workers": 1,
                "docker_image": {"url": "<docker-image-url>"},
                "spark_conf": {
                    "spark.driver.memory": "1000M",
                    "spark.executor.memory": "1000M",
                    "spark.executor.cores": "1",
                    "spark.executor.instances": "2",
                    "spark.driver.cores": "1",
                },
            },
        },
        databricks_instance=os.environ["DATABRICKS_HOST"],
    ),
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))

    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(cache_version="1")
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1


@workflow
def my_databricks_job(
    triggered_date: datetime.datetime = datetime.datetime.now(),
) -> float:
    """
    Using the workflow is still as any other workflow. As image is a property of the task, the workflow does not care
    about how the image is configured.
    """
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi


if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(
        f"Running To run a Spark job on Databricks platform(triggered_date=datetime.datetime.now()){my_databricks_job(triggered_date=datetime.datetime.now())}"
    )

Test it

pyflyte register spark_example.py --version API-V2

Screenshots

Delete Function correct

Note: I terminated it after triggering it for 24 seconds.
The job terminated at 27 seconds.
image
image

Add Message in TaskInfo when Pending State

image

Create a job.

Note: I print the log in databricks/plugin.go

image

image

Tracking Issue

#3936
#4362

Related PRs

flyteorg/flytekit#1935

…_id and new_cluster options to create a Job

Signed-off-by: Future Outlier <eric901201@gmai.com>
@Future-Outlier Future-Outlier changed the title Support Databricks WebAPI 2.1 version and Support existing_cluster_id and new_cluster options to create a Job Support Databricks WebAPI 2.1 version and Support existing_cluster_id and new_cluster options to create a Job Nov 5, 2023
Copy link

codecov bot commented Nov 5, 2023

Codecov Report

Attention: 2 lines in your changes are missing coverage. Please review.

Comparison is base (617e481) 59.10% compared to head (273be77) 59.33%.
Report is 4 commits behind head on master.

❗ Current head 273be77 differs from pull request most recent head 1596536. Consider uploading reports for the commit 1596536 to get more accurate results

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4361      +/-   ##
==========================================
+ Coverage   59.10%   59.33%   +0.23%     
==========================================
  Files         614      544      -70     
  Lines       52065    38980   -13085     
==========================================
- Hits        30771    23130    -7641     
+ Misses      18844    13570    -5274     
+ Partials     2450     2280     -170     
Flag Coverage Δ
unittests ?

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...ugins/go/tasks/plugins/webapi/databricks/plugin.go 65.34% <77.77%> (+3.84%) ⬆️

... and 551 files with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: Future Outlier <eric901201@gmai.com>
@pingsutw pingsutw merged commit 9fe34db into flyteorg:master Nov 5, 2023
40 of 41 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants