Skip to content

Commit

Permalink
Make job hashing order-agnostic (#130)
Browse files Browse the repository at this point in the history
* Make job hashing order agnostic

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* linting

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* linting

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* linting

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Fix jinja2 version

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
  • Loading branch information
khorshuheng and khorshuheng committed Mar 29, 2022
1 parent eb974bc commit 72e6e17
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 10 deletions.
25 changes: 16 additions & 9 deletions python/feast_spark/pyspark/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,13 +575,13 @@ def __init__(
feature_table: Dict,
source: Dict,
jar: str,
extra_jars: List[str],
redis_host: Optional[str],
redis_port: Optional[int],
redis_password: Optional[str],
redis_ssl: Optional[bool],
bigtable_project: Optional[str],
bigtable_instance: Optional[str],
extra_jars: List[str] = None,
redis_host: Optional[str] = None,
redis_port: Optional[int] = None,
redis_password: Optional[str] = None,
redis_ssl: Optional[bool] = None,
bigtable_project: Optional[str] = None,
bigtable_instance: Optional[str] = None,
cassandra_host: Optional[str] = None,
cassandra_port: Optional[int] = None,
statsd_host: Optional[str] = None,
Expand Down Expand Up @@ -621,7 +621,7 @@ def get_job_type(self) -> SparkJobType:
return SparkJobType.STREAM_INGESTION

def get_extra_jar_paths(self) -> List[str]:
    """Return the extra jar paths configured for this job.

    Returns an empty list when no extra jars were provided (``_extra_jars``
    is ``None`` or empty), so callers can iterate the result unconditionally.
    """
    # `or []` normalizes both None and an empty list to [] — the old code
    # returned self._extra_jars directly and could leak None to callers.
    return self._extra_jars or []

def get_arguments(self) -> List[str]:
args = super().get_arguments()
Expand All @@ -633,8 +633,15 @@ def get_arguments(self) -> List[str]:
return args

def get_job_hash(self) -> str:
    """Return a deterministic MD5 digest identifying this ingestion job.

    The feature table's ``entities`` and ``features`` lists are sorted by
    their ``name`` field before serialization, so two jobs whose feature
    tables differ only in listing order produce the same hash.

    Returns:
        Hex string (32 chars) of the MD5 of the canonical JSON payload.
    """
    # Shallow copy is sufficient: we only rebind the two list-valued keys
    # below with freshly sorted lists; the original dict is never mutated.
    sorted_feature_table = self._feature_table.copy()
    sorted_feature_table["entities"] = sorted(
        self._feature_table["entities"], key=lambda x: x["name"]
    )
    sorted_feature_table["features"] = sorted(
        self._feature_table["features"], key=lambda x: x["name"]
    )
    # sort_keys=True canonicalizes dict key order so the serialized form —
    # and therefore the hash — does not depend on insertion order.
    job_json = json.dumps(
        {"source": self._source, "feature_table": sorted_feature_table},
        sort_keys=True,
    )
    # MD5 is used as a cheap fingerprint here, not for security.
    return hashlib.md5(job_json.encode()).hexdigest()
Expand Down
2 changes: 1 addition & 1 deletion python/requirements-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ mypy-protobuf
avro==1.10.0
gcsfs
urllib3>=1.25.4
google-cloud-dataproc==2.0.2
pytest==6.0.0
pytest-lazy-fixture==0.6.3
pytest-timeout==1.4.2
Expand All @@ -24,3 +23,4 @@ PyYAML==5.3.1
great-expectations==0.13.2
adlfs==0.5.9
redis==4.1.*
Jinja2==3.0.3
46 changes: 46 additions & 0 deletions python/tests/test_launcher_abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from feast_spark.pyspark.abc import StreamIngestionJobParameters


def test_stream_ingestion_job_hash():
    """The job hash must be invariant to feature/entity listing order."""
    streaming_source = {
        "kafka": {
            "event_timestamp_column": "event_timestamp",
            "bootstrap_servers": "localhost:9092",
            "topic": "test",
            "format": {
                "class_path": "com.test.someprotos",
                "json_class": "ProtoFormat",
            },
        }
    }
    feature_table = {
        "features": [
            {"name": "feature_1", "type": "STRING"},
            {"name": "feature_2", "type": "STRING"},
        ],
        "entities": [
            {"name": "entity_1", "type": "STRING"},
            {"name": "entity_2", "type": "STRING"},
        ],
        "project": "someproject",
    }
    # Same table contents, but with features and entities listed in
    # reverse order — derived from feature_table rather than retyped.
    shuffled_table = {
        "features": list(reversed(feature_table["features"])),
        "entities": list(reversed(feature_table["entities"])),
        "project": "someproject",
    }
    params = StreamIngestionJobParameters(
        source=streaming_source, feature_table=feature_table, jar=""
    )
    shuffled_params = StreamIngestionJobParameters(
        source=streaming_source,
        feature_table=shuffled_table,
        jar="",
    )
    assert params.get_job_hash() == shuffled_params.get_job_hash()

0 comments on commit 72e6e17

Please sign in to comment.