Remove azure references in feast spark (#142)
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
khorshuheng committed May 12, 2022
1 parent ea7bd05 commit 899574a
Showing 6 changed files with 2 additions and 142 deletions.
76 changes: 0 additions & 76 deletions infra/scripts/azure-runner.sh

This file was deleted.

12 changes: 0 additions & 12 deletions python/feast_spark/constants.py
@@ -25,21 +25,9 @@ class ConfigOptions(metaclass=ConfigMeta):
#: Port for which Prometheus metric server will be running on
JOB_SERVICE_PROMETHEUS_METRIC_PORT: int = 8080

#: Default timeout when running batch ingestion
BATCH_INGESTION_PRODUCTION_TIMEOUT: str = "120"

#: Time to wait for historical feature requests before timing out.
BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS: str = "600"

#: Endpoint URL for S3 storage_client
S3_ENDPOINT_URL: Optional[str] = None

#: Account name for Azure blob storage_client
AZURE_BLOB_ACCOUNT_NAME: Optional[str] = None

#: Account access key for Azure blob storage_client
AZURE_BLOB_ACCOUNT_ACCESS_KEY: Optional[str] = None

#: Spark Job launcher. The choice of storage is connected to the choice of SPARK_LAUNCHER.
#:
#: Options: "standalone", "dataproc", "emr"
3 changes: 0 additions & 3 deletions python/feast_spark/pyspark/launcher.py
@@ -87,9 +87,6 @@ def _k8s_launcher(config: Config) -> JobLauncher:
staging_location=staging_location,
incluster=config.getboolean(opt.SPARK_K8S_USE_INCLUSTER_CONFIG),
staging_client=get_staging_client(staging_uri.scheme, config),
# azure-related arguments are None if not using Azure blob storage
azure_account_name=config.get(opt.AZURE_BLOB_ACCOUNT_NAME, None),
azure_account_key=config.get(opt.AZURE_BLOB_ACCOUNT_ACCESS_KEY, None),
)


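For context, the k8s launcher picks its staging client from the scheme of the configured staging location (`get_staging_client(staging_uri.scheme, config)` above); after this commit a `wasbs://` staging URI no longer gets Azure credentials wired in automatically. A minimal, self-contained illustration of that scheme dispatch (the URIs are placeholders, not values from this repository):

```python
from urllib.parse import urlparse

# Illustrative only: the launcher keys its staging client off the URI scheme.
# "s3" and "file" staging locations are unaffected by this commit; "wasbs"
# (Azure Blob) no longer receives account credentials from the launcher.
staging_locations = [
    "s3://my-bucket/artifacts",
    "file:///tmp/artifacts",
    "wasbs://container@myaccount.blob.core.windows.net/artifacts",
]
for uri in staging_locations:
    print(urlparse(uri).scheme)  # -> s3, file, wasbs
```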
22 changes: 0 additions & 22 deletions python/feast_spark/pyspark/launchers/k8s/k8s.py
@@ -197,15 +197,11 @@ def __init__(
stream_ingestion_resource_template_path: Optional[str],
historical_retrieval_resource_template_path: Optional[str],
staging_client: AbstractStagingClient,
azure_account_name: str,
azure_account_key: str,
):
self._namespace = namespace
self._api = _get_api(incluster=incluster)
self._staging_location = staging_location
self._staging_client = staging_client
self._azure_account_name = azure_account_name
self._azure_account_key = azure_account_key

generic_template = _load_resource_template(
generic_resource_template_path
@@ -260,20 +256,6 @@ def _job_from_job_info(self, job_info: JobInfo) -> SparkJob:
# We should never get here
raise ValueError(f"Unknown job type {job_info.job_type}")

def _get_azure_credentials(self):
uri = urlparse(self._staging_location)
if uri.scheme != "wasbs":
return {}
account_name = self._azure_account_name
account_key = self._azure_account_key
if account_name is None or account_key is None:
raise Exception(
"Using Azure blob storage requires Azure blob account name and access key to be set in config"
)
return {
f"spark.hadoop.fs.azure.account.key.{account_name}.blob.core.windows.net": f"{account_key}"
}

def historical_feature_retrieval(
self, job_params: RetrievalJobParameters
) -> RetrievalJob:
@@ -311,7 +293,6 @@ def historical_feature_retrieval(
packages=[],
jars=[],
extra_metadata={METADATA_OUTPUT_URI: job_params.get_destination_path()},
azure_credentials=self._get_azure_credentials(),
arguments=job_params.get_arguments(),
namespace=self._namespace,
extra_labels={LABEL_PROJECT: job_params.get_project()},
@@ -372,7 +353,6 @@ def offline_to_online_ingestion(
packages=[],
jars=[],
extra_metadata={},
azure_credentials=self._get_azure_credentials(),
arguments=ingestion_job_params.get_arguments(),
namespace=self._namespace,
extra_labels={
@@ -424,7 +404,6 @@ def schedule_offline_to_online_ingestion(
packages=[],
jars=[],
extra_metadata={},
azure_credentials=self._get_azure_credentials(),
arguments=ingestion_job_params.get_arguments(),
namespace=self._namespace,
extra_labels={
@@ -485,7 +464,6 @@ def start_stream_to_online_ingestion(
packages=[],
jars=extra_jar_paths,
extra_metadata={METADATA_JOBHASH: job_hash},
azure_credentials=self._get_azure_credentials(),
arguments=ingestion_job_params.get_arguments(),
namespace=self._namespace,
extra_labels={
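The deleted `_get_azure_credentials` helper only did something for `wasbs://` staging locations: it mapped the configured account name and key to a single Hadoop property on the job's `sparkConf`, and raised if either value was missing. A hypothetical sketch of the equivalent entry for deployments that still stage on Azure Blob and now have to supply it through their own resource template (account name and key are placeholders):

```python
# Hypothetical, not part of the codebase after this commit: reproduces the
# sparkConf entry that the removed _get_azure_credentials() used to generate.
account_name = "mystorageaccount"              # placeholder storage account
account_key = "<storage-account-access-key>"   # placeholder secret; never hard-code real keys

azure_spark_conf = {
    f"spark.hadoop.fs.azure.account.key.{account_name}.blob.core.windows.net": account_key
}
# e.g. merged into the SparkApplication manifest under spec.sparkConf
```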
4 changes: 0 additions & 4 deletions python/feast_spark/pyspark/launchers/k8s/k8s_utils.py
@@ -122,7 +122,6 @@ def _prepare_job_resource(
packages: List[str],
jars: List[str],
extra_metadata: Dict[str, str],
azure_credentials: Dict[str, str],
arguments: List[str],
namespace: str,
extra_labels: Dict[str, str] = None,
@@ -145,7 +144,6 @@ def _prepare_job_resource(
_add_keys(job, ("spec",), dict(arguments=arguments))

_add_keys(job, ("spec", "sparkConf"), extra_metadata)
_add_keys(job, ("spec", "sparkConf"), azure_credentials)

if len(packages) > 1:
_append_items(job, ("spec", "deps", "packages"), packages)
@@ -165,7 +163,6 @@ def _prepare_scheduled_job_resource(
packages: List[str],
jars: List[str],
extra_metadata: Dict[str, str],
azure_credentials: Dict[str, str],
arguments: List[str],
namespace: str,
extra_labels: Dict[str, str] = None,
@@ -192,7 +189,6 @@ def _prepare_scheduled_job_resource(
packages=packages,
jars=jars,
extra_metadata=extra_metadata,
azure_credentials=azure_credentials,
arguments=arguments,
namespace=namespace,
extra_labels=extra_labels,
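Both `_prepare_job_resource` and `_prepare_scheduled_job_resource` fold these dictionaries into the SparkApplication manifest via `_add_keys`. Its implementation is not part of this diff, so the following is only an illustrative stand-in showing how `extra_metadata` (and, before this commit, `azure_credentials`) would end up under `spec.sparkConf`:

```python
from typing import Any, Dict, Tuple


def _add_keys_sketch(resource: Dict[str, Any], path: Tuple[str, ...], items: Dict[str, Any]) -> None:
    """Illustrative stand-in for _add_keys: merge `items` into the nested dict at `path`."""
    node = resource
    for key in path:
        node = node.setdefault(key, {})
    node.update(items)


job: Dict[str, Any] = {"spec": {}}
_add_keys_sketch(job, ("spec", "sparkConf"), {"spark.hadoop.fs.s3a.endpoint": "http://minio:9000"})
assert job == {"spec": {"sparkConf": {"spark.hadoop.fs.s3a.endpoint": "http://minio:9000"}}}
```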
27 changes: 2 additions & 25 deletions tests/e2e/test_historical_features.py
@@ -11,15 +11,14 @@
from pyarrow import parquet

from feast import Client, Entity, Feature, FeatureTable, ValueType
from feast.constants import ConfigOptions as opt
from feast.data_source import BigQuerySource, FileSource
from feast_spark import Client as SparkClient
from feast_spark.pyspark.abc import SparkJobStatus

np.random.seed(0)


def read_parquet(uri, azure_account_name=None, azure_account_key=None):
def read_parquet(uri):
parsed_uri = urlparse(uri)
if parsed_uri.scheme == "file":
return pd.read_parquet(parsed_uri.path)
@@ -44,16 +43,6 @@ def read_parquet(uri, azure_account_name=None, azure_account_key=None):
files = ["s3://" + path for path in fs.glob(s3uri + "/part-*")]
ds = parquet.ParquetDataset(files, filesystem=fs)
return ds.read().to_pandas()
elif parsed_uri.scheme == "wasbs":
import adlfs

fs = adlfs.AzureBlobFileSystem(
account_name=azure_account_name, account_key=azure_account_key
)
uripath = parsed_uri.username + parsed_uri.path
files = fs.glob(uripath + "/part-*")
ds = parquet.ParquetDataset(files, filesystem=fs)
return ds.read().to_pandas()
else:
raise ValueError(f"Unsupported URL scheme {uri}")

@@ -87,13 +76,6 @@ def generate_data():
return transactions_df, customer_df


def _get_azure_creds(feast_client: Client):
return (
feast_client._config.get(opt.AZURE_BLOB_ACCOUNT_NAME, None),
feast_client._config.get(opt.AZURE_BLOB_ACCOUNT_ACCESS_KEY, None),
)


def test_historical_features(
feast_client: Client,
feast_spark_client: SparkClient,
@@ -134,12 +116,7 @@ def test_historical_features(

output_dir = job.get_output_file_uri()

# will both be None if not using Azure blob storage
account_name, account_key = _get_azure_creds(feast_client)

joined_df = read_parquet(
output_dir, azure_account_name=account_name, azure_account_key=account_key
)
joined_df = read_parquet(output_dir)

expected_joined_df = pd.DataFrame(
{
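With the `wasbs` branch gone, `read_parquet` dispatches only on `file://` and `s3://` URIs, which is why the test no longer looks up Azure credentials before reading the job output. A hypothetical call using the simplified signature (paths are placeholders):

```python
# Hypothetical usage of the simplified helper defined in this test module.
joined_df = read_parquet("file:///tmp/historical_feature_output")
# S3 outputs still go through the s3fs branch, e.g.:
# joined_df = read_parquet("s3://my-bucket/historical_feature_output")
```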
