
Add hadoop aws/gcp jar to the spark default image #1908

Merged: 5 commits into master on Nov 6, 2023

Conversation

pingsutw (Member)

TL;DR

Add the Hadoop S3 and GCS dependencies to the default Spark image. Spark needs these jars to read data from S3 / GCS.
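For context, this is the kind of access that breaks without the connectors on the classpath. A minimal sketch (the bucket path is a placeholder, and the task config mirrors the example below):

import flytekit
from flytekit import task
from flytekitplugins.spark import Spark


@task(task_config=Spark())
def count_rows() -> int:
    sess = flytekit.current_context().spark_session
    # Without hadoop-aws (or the GCS connector) on the classpath, reading
    # an s3a:// (or gs://) path fails with
    # java.lang.ClassNotFoundException: org.apache.hadoop.fs.s3a.S3AFileSystem
    df = sess.read.parquet("s3a://my-bucket/some/data")  # placeholder path
    return df.count()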

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

Spark example:

import datetime
import random
from operator import add

import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark import Spark
from flytekit.image_spec.image_spec import ImageSpec

spark_image = ImageSpec(base_image="pingsutw/spark-v2", registry="pingsutw")


@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        },
        executor_path="/usr/bin/python3",
        applications_path="local:///usr/local/bin/entrypoint.py",
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
    container_image=spark_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Sparkfk wifth Partitions: {}".format(partitions))
    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(cache_version="1", container_image=spark_image)
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1


@workflow
def wf(triggered_date: datetime.datetime = datetime.datetime.now()) -> float:
    """
    Using the workflow is still as any other workflow. As image is a property of the task, the workflow does not care
    about how the image is configured.
    """
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi


if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(
        f"Running wf(triggered_date=datetime.datetime.now()): {wf(triggered_date=datetime.datetime.now())}"
    )
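The example runs like any other Flyte workflow; with the jars baked into the base image, the same code can read from S3 or GCS without extra Spark configuration (registration and remote execution, e.g. via pyflyte run --remote, are unchanged).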

Tracking Issue

NA

Follow-up issue

NA

Signed-off-by: Kevin Su <pingsutw@apache.org>

codecov bot commented Oct 22, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison: base (63e6632) 62.81% vs. head (17ed538) 62.81%.

Additional details and impacted files
@@           Coverage Diff           @@
##           master    #1908   +/-   ##
=======================================
  Coverage   62.81%   62.81%           
=======================================
  Files         307      307           
  Lines       22984    22984           
  Branches     3490     3490           
=======================================
  Hits        14438    14438           
  Misses       8124     8124           
  Partials      422      422           


@iaroslav-ciupin (Contributor) left a comment

Thanks for fixing! Appreciate it!

kumare3 (Contributor) commented Oct 22, 2023

This is a band-aid. How will this work on gcp? It should be at the user level imo
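For reference, a user-level approach would mean each task pulling the connectors itself, e.g. through spark.jars.packages; a hypothetical sketch, with illustrative version coordinates that would have to match the image's Hadoop version:

import flytekit
from flytekit import task
from flytekitplugins.spark import Spark


@task(
    task_config=Spark(
        spark_conf={
            # Illustrative coordinates only; versions must match the
            # Hadoop/Spark build in the container image.
            "spark.jars.packages": (
                "org.apache.hadoop:hadoop-aws:3.3.4,"
                "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.17"
            ),
        },
    ),
)
def read_data() -> int:
    sess = flytekit.current_context().spark_session
    return sess.read.parquet("s3a://my-bucket/some/data").count()  # placeholder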

pingsutw changed the title from "Add hadoop-aws jar to the spark default image" to "Add hadoop aws/gcp jar to the spark default image" on Oct 23, 2023
pingsutw merged commit 145fd77 into master on Nov 6, 2023
71 checks passed
ringohoffman pushed a commit to ringohoffman/flytekit that referenced this pull request Nov 24, 2023
* Add hadoop-aws jar to the spark default image
* no-cache-dir

Signed-off-by: Kevin Su <pingsutw@apache.org>
RRap0so pushed a commit to RRap0so/flytekit that referenced this pull request Dec 15, 2023
Signed-off-by: Rafael Raposo <rafaelraposo@spotify.com>