Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pyflyte run spark task #2280

Merged
merged 2 commits into from Mar 25, 2024
Merged

pyflyte run spark task #2280

merged 2 commits into from Mar 25, 2024

Conversation

pingsutw
Copy link
Member

Tracking issue

NA

Why are the changes needed?

Since only the Spark driver downloads the workflow code, pyflyte run doesn't work with Spark tasks as the Spark executors also need the workflow code.

What changes were proposed in this pull request?

Always copy the code into the image if the task is a spark task.

How was this patch tested?

remote cluster

pyflyte run --remote flyte-example/test2.py spark_wf

Setup process

import random

from click.testing import CliRunner

from flytekit.clis.sdk_in_container import pyflyte
from flytekitplugins.spark import Spark

import flytekit
from flytekit import ImageSpec, task, Resources, workflow
from operator import add

custom_image = ImageSpec(
    name="flytekit",
    registry="pingsutw",  # your docker registry
)


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
            "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
        },
    ),
    limits=Resources(mem="2000M"),
    container_image=custom_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))

    n = 1 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )

    pi_val = 4.0 * count / n
    return pi_val


@workflow
def spark_wf(partitions: int = 10) -> float:
    return hello_spark(partitions=partitions)


if __name__ == '__main__':
    runner = CliRunner()
    result = runner.invoke(pyflyte.main,
                           ["register",
                            "--non-fast",
                            "--version",
                            "v3",
                            "/Users/kevin/git/flytekit/flyte-example/test2.py",
                            ])
    print(result.output)

Screenshots

image
  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

NA

Docs link

NA

Signed-off-by: Kevin Su <pingsutw@gmail.com>
Signed-off-by: Kevin Su <pingsutw@gmail.com>
@dosubot dosubot bot added the size:M This PR changes 30-99 lines, ignoring generated files. label Mar 18, 2024
Copy link

codecov bot commented Mar 18, 2024

Codecov Report

Attention: Patch coverage is 93.33333% with 1 lines in your changes are missing coverage. Please review.

Project coverage is 83.33%. Comparing base (3f45131) to head (e1ce17f).
Report is 4 commits behind head on master.

Files Patch % Lines
...ugins/flytekit-spark/flytekitplugins/spark/task.py 80.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2280      +/-   ##
==========================================
- Coverage   83.79%   83.33%   -0.46%     
==========================================
  Files         332      309      -23     
  Lines       25147    24068    -1079     
  Branches     3703     3494     -209     
==========================================
- Hits        21071    20058    -1013     
+ Misses       3450     3385      -65     
+ Partials      626      625       -1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Member

@Future-Outlier Future-Outlier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TEST

@dosubot dosubot bot added the lgtm This PR has been approved by maintainer label Mar 20, 2024
@pingsutw pingsutw merged commit ecc7835 into master Mar 25, 2024
46 of 47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
lgtm This PR has been approved by maintainer size:M This PR changes 30-99 lines, ignoring generated files.
Projects
None yet
3 participants