Windoze/211 maven submission #334

Merged: 4 commits, Jun 13, 2022
Changes from 2 commits
1 change: 1 addition & 0 deletions feathr_project/feathr/constants.py
@@ -24,3 +24,4 @@
TYPEDEF_ARRAY_DERIVED_FEATURE=f"array<feathr_derived_feature_{REGISTRY_TYPEDEF_VERSION}>"
TYPEDEF_ARRAY_ANCHOR_FEATURE=f"array<feathr_anchor_feature_{REGISTRY_TYPEDEF_VERSION}>"

FEATHR_MAVEN_ARTIFACT="com.linkedin.feathr:feathr_2.12:0.4.0"
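For reference, the new constant is a standard group:artifact:version Maven coordinate; a small illustrative snippet (not part of the change) showing how it decomposes:

# The three coordinate parts that Maven/Spark resolvers expect.
group_id, artifact_id, version = "com.linkedin.feathr:feathr_2.12:0.4.0".split(":")
# group_id    -> "com.linkedin.feathr"
# artifact_id -> "feathr_2.12"  (built for Scala 2.12)
# version     -> "0.4.0"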
1 change: 1 addition & 0 deletions feathr_project/feathr/spark_provider/.gitignore
@@ -0,0 +1 @@
!noop-1.0.jar
feathr_project/feathr/spark_provider/_databricks_submission.py
@@ -143,7 +143,11 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name:
submission_params['new_cluster']['spark_conf'] = configuration
submission_params['new_cluster']['custom_tags'] = job_tags
# the feathr main jar file is needed regardless of whether it's a pyspark or scala spark job
submission_params['libraries'][0]['jar'] = self.upload_or_get_cloud_path(main_jar_path)
if main_jar_path is None or main_jar_path=="":
logger.info("Main JAR file is not set, using default package from Maven")
submission_params['libraries'][0]['maven'] = { "coordinates": FEATHR_MAVEN_ARTIFACT }
else:
submission_params['libraries'][0]['jar'] = self.upload_or_get_cloud_path(main_jar_path)
# see here for the submission parameter definition https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs#--request-structure-6
if python_files:
# this is a pyspark job. definition here: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs#--sparkpythontask
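For clarity, a minimal sketch (values illustrative, not part of the diff) of the two shapes the first libraries entry can take in the Databricks runs/submit payload after this change:

# Sketch only: the dbfs path below is a placeholder, the Maven coordinate is the real constant.
libraries_with_jar = [{"jar": "dbfs:/feathr/feathr-assembly.jar"}]
libraries_with_maven = [{"maven": {"coordinates": "com.linkedin.feathr:feathr_2.12:0.4.0"}}]

Letting Databricks resolve the artifact from Maven avoids uploading a jar for every submission when the caller did not provide one.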
80 changes: 57 additions & 23 deletions feathr_project/feathr/spark_provider/_synapse_submission.py
@@ -1,4 +1,6 @@
from copy import deepcopy
import os
import pathlib
import re
import time
import urllib.request
@@ -43,7 +45,8 @@ class _FeathrSynapseJobLauncher(SparkJobLauncher):
"""
Submits spark jobs to a Synapse spark cluster.
"""
def __init__(self, synapse_dev_url: str, pool_name: str, datalake_dir: str, executor_size: str, executors: int, credential = None):

def __init__(self, synapse_dev_url: str, pool_name: str, datalake_dir: str, executor_size: str, executors: int, credential=None):
# use DeviceCodeCredential if EnvironmentCredential is not available
self.credential = credential
# use the same credential for authentication to avoid further login.
@@ -60,9 +63,11 @@ def upload_or_get_cloud_path(self, local_path_or_http_path: str):
Supports transferring a file from an http path to cloud working storage, or uploading directly from local storage.
"""
logger.info('Uploading {} to cloud..', local_path_or_http_path)
res_path = self._datalake.upload_file_to_workdir(local_path_or_http_path)
res_path = self._datalake.upload_file_to_workdir(
local_path_or_http_path)

logger.info('{} is uploaded to location: {}', local_path_or_http_path, res_path)
logger.info('{} is uploaded to location: {}',
local_path_or_http_path, res_path)
return res_path

def download_result(self, result_path: str, local_folder: str):
@@ -73,7 +78,7 @@ def download_result(self, result_path: str, local_folder: str):
return self._datalake.download_file(result_path, local_folder)

def submit_feathr_job(self, job_name: str, main_jar_path: str = None, main_class_name: str = None, arguments: List[str] = None,
python_files: List[str]= None, reference_files_path: List[str] = None, job_tags: Dict[str, str] = None,
python_files: List[str] = None, reference_files_path: List[str] = None, job_tags: Dict[str, str] = None,
configuration: Dict[str, str] = None):
"""
Submits the feathr job
@@ -92,21 +97,45 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str = None, main_clas
job_name (str): name of the job
main_jar_path (str): main file paths, usually your main jar file
main_class_name (str): name of your main class
arguments (str): all the arugments you want to pass into the spark job
job_tags (str): tags of the job, for exmaple you might want to put your user ID, or a tag with a certain information
arguments (str): all the arguments you want to pass into the spark job
job_tags (str): tags of the job, for example you might want to put your user ID, or a tag with a certain information
configuration (Dict[str, str]): Additional configs for the spark job
"""
assert main_jar_path, 'main_jar_path should not be none or empty but it is none or empty.'
if main_jar_path.startswith('abfs'):
main_jar_cloud_path = main_jar_path
logger.info(
'Cloud path {} is used for running the job: {}', main_jar_path, job_name)
else:
logger.info('Uploading jar from {} to cloud for running job: {}',
main_jar_path, job_name)
main_jar_cloud_path = self._datalake.upload_file_to_workdir(main_jar_path)
logger.info('{} is uploaded to {} for running job: {}',
main_jar_path, main_jar_cloud_path, job_name)

cfg = configuration.copy() # We don't want to mess up input parameters
if not main_jar_path:
# We don't have the main jar, use Maven
if not python_files:
# This is a JAR job
# Azure Synapse/Livy doesn't allow a JAR job to start directly from a Maven package; a jar file must be uploaded,
# so we use a dummy jar as the main file.
logger.info("Main JAR file is not set, using default package from Maven")
# Add Maven dependency to the job
if "spark.jars.packages" in cfg:
cfg["spark.jars.packages"] = ",".join(
[cfg["spark.jars.packages"], FEATHR_MAVEN_ARTIFACT])
else:
cfg["spark.jars.packages"] = FEATHR_MAVEN_ARTIFACT
# Use the no-op jar as the main file
# This is a dummy jar that contains only an `org.example.Noop` class with an empty `main` function that does nothing
current_dir = pathlib.Path(__file__).parent.resolve()
main_jar_path = os.path.join(current_dir, "noop-1.0.jar")
else:
# This is a PySpark job
pass
main_jar_cloud_path = None
if main_jar_path:
# Now we have a main jar, either feathr or noop
if main_jar_path.startswith('abfs'):
main_jar_cloud_path = main_jar_path
logger.info(
'Cloud path {} is used for running the job: {}', main_jar_path, job_name)
else:
logger.info('Uploading jar from {} to cloud for running job: {}',
main_jar_path, job_name)
main_jar_cloud_path = self._datalake.upload_file_to_workdir(main_jar_path)
logger.info('{} is uploaded to {} for running job: {}',
main_jar_path, main_jar_cloud_path, job_name)

reference_file_paths = []
for file_path in reference_files_path:
@@ -120,7 +149,7 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str = None, main_clas
arguments=arguments,
reference_files=reference_files_path,
tags=job_tags,
configuration=configuration)
configuration=cfg)
logger.info('See submitted job here: https://web.azuresynapse.net/en-us/monitoring/sparkapplication')
return self.current_job_info
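Stripped of diff context, the Synapse fallback boils down to: copy the caller's configuration, add the Feathr artifact to spark.jars.packages, and point the mandatory main file at the bundled no-op jar when the job is a Scala/JAR job. A condensed, self-contained sketch of that branch (the constant mirrors constants.py above; the helper name is illustrative):

import os
import pathlib

FEATHR_MAVEN_ARTIFACT = "com.linkedin.feathr:feathr_2.12:0.4.0"  # mirrors feathr/constants.py

def _maven_fallback(configuration: dict):
    """Condensed sketch of the no-main-jar, JAR-job branch shown in the diff above."""
    cfg = configuration.copy()  # work on a copy so the caller's dict is untouched
    # Ask Spark to resolve the Feathr package from Maven at job start.
    if "spark.jars.packages" in cfg:
        cfg["spark.jars.packages"] = ",".join([cfg["spark.jars.packages"], FEATHR_MAVEN_ARTIFACT])
    else:
        cfg["spark.jars.packages"] = FEATHR_MAVEN_ARTIFACT
    # Synapse/Livy still requires a concrete main file, so point it at the bundled no-op jar.
    main_jar_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "noop-1.0.jar")
    return main_jar_path, cfg

The returned cfg is what reaches create_spark_batch_job in place of the raw input, which is why the hunk just above swaps configuration=configuration for configuration=cfg.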

@@ -247,8 +276,13 @@ def create_spark_batch_job(self, job_name, main_file, class_name=None,
executor_cores = self.EXECUTOR_SIZE[self._executor_size]['Cores']
executor_memory = self.EXECUTOR_SIZE[self._executor_size]['Memory']

# need to put the jar in as dependencies for pyspark job
jars = jars + [main_file]
# If we have a main jar, it needs to be added as dependencies for pyspark job
# Otherwise it's a PySpark job with Feathr JAR from Maven
if main_file:
if not python_files:
# main_file and python_files should not both be empty
raise ValueError("Main JAR is not set for the Spark job")
jars = jars + [main_file]

# If file=main_file, then it's using only Scala Spark
# If file=python_files[0], then it's using Pyspark
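Restating the two comments above as a tiny self-contained helper (the helper name is hypothetical, not in the source):

def pick_entry_point(main_file, python_files):
    # PySpark job: the first python file is the entry point and the (real or no-op)
    # Feathr jar travels as a dependency in `jars`.
    # Scala job: the jar itself is the entry point.
    return python_files[0] if python_files else main_file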
@@ -319,7 +353,7 @@ def __init__(self, datalake_dir, credential=None):
self.dir_client = self.file_system_client.get_directory_client('/')

self.datalake_dir = datalake_dir + \
'/' if datalake_dir[-1] != '/' else datalake_dir
'/' if datalake_dir[-1] != '/' else datalake_dir

def upload_file_to_workdir(self, src_file_path: str) -> str:
"""
@@ -394,7 +428,7 @@ def download_file(self, target_adls_directory: str, local_dir_cache: str):
for folder in result_folders:
folder_name = basename(folder)
file_in_folder = [os.path.join(folder_name, basename(file_path.name)) for file_path in self.file_system_client.get_paths(
path=folder, recursive=False) if not file_path.is_directory]
path=folder, recursive=False) if not file_path.is_directory]
local_paths = [os.path.join(local_dir_cache, file_name)
for file_name in file_in_folder]
self._download_file_list(local_paths, file_in_folder, directory_client)
@@ -405,7 +439,7 @@ def download_file(self, target_adls_directory: str, local_dir_cache: str):
self._download_file_list(local_paths, result_paths, directory_client)

logger.info('Finish downloading files from {} to {}.',
target_adls_directory,local_dir_cache)
target_adls_directory, local_dir_cache)

def _download_file_list(self, local_paths: List[str], result_paths, directory_client):
'''
Binary file added feathr_project/feathr/spark_provider/noop-1.0.jar
Binary file not shown.
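The jar cannot be reviewed inline; a hedged way to sanity-check it locally, assuming the archive layout follows the `org.example.Noop` class mentioned above:

import zipfile

# Expect an entry like 'org/example/Noop.class' alongside the manifest.
with zipfile.ZipFile("feathr_project/feathr/spark_provider/noop-1.0.jar") as jar:
    entries = jar.namelist()
    print(entries)
    assert any(name.endswith("Noop.class") for name in entries)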