Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run commands in imageSpec #1909

Merged
merged 6 commits into from
Dec 15, 2023
Merged

Run commands in imageSpec #1909

merged 6 commits into from
Dec 15, 2023

Conversation

pingsutw
Copy link
Member

@pingsutw pingsutw commented Oct 23, 2023

TL;DR

Add commands to ImageSpec, so people can download jars or any other packages.

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

Another example for the spark task

import datetime
import random
from operator import add

import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark import Spark
from flytekit.image_spec.image_spec import ImageSpec

spark_image = ImageSpec(registry="pingsutw", apt_packages=["wget"])
spark_image.with_commands(["wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.2/hadoop-aws-3.2.2.jar -P /opt/spark/jars",
                          "wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -P /opt/spark/jars",
                          "wget https://github.com/GoogleCloudDataproc/hadoop-connectors/releases/download/v2.2.17/util-hadoop-hadoop3-2.2.17.jar -P /opt/spark/jars"])

@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        },
        executor_path="/usr/bin/python3",
        applications_path="local:///usr/local/bin/entrypoint.py",
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
    container_image=spark_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Sparkfk wifth Partitions: {}".format(partitions))
    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(cache_version="1", container_image=spark_image)
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1


@workflow
def wf(triggered_date: datetime.datetime = datetime.datetime.now()) -> float:
    """
    Using the workflow is still as any other workflow. As image is a property of the task, the workflow does not care
    about how the image is configured.
    """
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi

Tracking Issue

https://github.com/flyteorg/flyte/issues/

Follow-up issue

NA

Signed-off-by: Kevin Su <pingsutw@apache.org>
@codecov
Copy link

codecov bot commented Oct 23, 2023

Codecov Report

Attention: 10 lines in your changes are missing coverage. Please review.

Comparison is base (744c167) 85.80% compared to head (c200da5) 85.69%.
Report is 43 commits behind head on master.

Files Patch % Lines
flytekit/image_spec/image_spec.py 64.28% 5 Missing and 5 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1909      +/-   ##
==========================================
- Coverage   85.80%   85.69%   -0.11%     
==========================================
  Files         313      297      -16     
  Lines       23278    22660     -618     
  Branches     3526     3476      -50     
==========================================
- Hits        19973    19419     -554     
+ Misses       2702     2642      -60     
+ Partials      603      599       -4     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@pingsutw pingsutw marked this pull request as ready for review November 21, 2023 19:16
Signed-off-by: Kevin Su <pingsutw@apache.org>
cosmicBboy
cosmicBboy previously approved these changes Nov 21, 2023
Signed-off-by: Kevin Su <pingsutw@apache.org>
@wild-endeavor
Copy link
Contributor

does this go down the road a little bit of recreating what a dockerfile is? won't one day people want to add additional commands and stuff? i thought imagespec supported a full dockerfile does it not?

eapolinario
eapolinario previously approved these changes Dec 7, 2023
@eapolinario
Copy link
Collaborator

does this go down the road a little bit of recreating what a dockerfile is? won't one day people want to add additional commands and stuff? i thought imagespec supported a full dockerfile does it not?

It recreates a portion of Dockerfiles with a lot of structure. For example, you can't have multi-stage builds, you have commands run in arbitrary places.


return new_image_spec

def with_apt_packages(self, apt_packages: Union[str, List[str]]) -> "ImageSpec":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additional apt packages

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

new_image_spec.apt_packages = []

if isinstance(apt_packages, List):
new_image_spec.apt_packages.extend(apt_packages)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this difference?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if apt_packages is a str (single package), we cannot use extend. we have to use append here.

@kumare3
Copy link
Contributor

kumare3 commented Dec 8, 2023

this is pretty cool, lets close on this

Signed-off-by: Kevin Su <pingsutw@apache.org>
@pingsutw pingsutw merged commit 68e98df into master Dec 15, 2023
72 of 74 checks passed
RRap0so pushed a commit to RRap0so/flytekit that referenced this pull request Dec 15, 2023
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Rafael Raposo <rafaelraposo@spotify.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants