Fix S3 path for reverse edges, ensure we create updated_row_counts_metadata.json when only Parquet metadata is changed
thvasilo committed May 17, 2024
1 parent 8af868a commit 7a4e2f6
Showing 3 changed files with 20 additions and 23 deletions.
@@ -168,7 +168,6 @@ def _configure_spark_env(
# Improve memory utilization
# Avoid timeout errors due to connection pool starving
# Allow sending large results to driver
# TODO: Only set config when running on SageMaker, allow EMR/EMR-S defaults
spark_builder = (
spark_builder.config("spark.driver.memory", f"{driver_mem_mb}m")
.config("spark.driver.memoryOverhead", f"{driver_mem_overhead_mb}m")
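For context, the comment block in this hunk documents driver-side Spark settings that are applied unconditionally, and the TODO notes they should only be set when running on SageMaker so that EMR / EMR Serverless keep their platform defaults. A minimal sketch of that gating, assuming a hypothetical `execution_env` argument that is not part of the actual `_configure_spark_env` signature:

```python
from pyspark.sql import SparkSession


def configure_driver_memory(
    spark_builder: SparkSession.Builder,
    driver_mem_mb: int,
    driver_mem_overhead_mb: int,
    execution_env: str = "sagemaker",  # hypothetical flag, not in the original code
) -> SparkSession.Builder:
    """Sketch: only override driver memory when GSProcessing manages the cluster itself."""
    if execution_env == "sagemaker":
        spark_builder = (
            spark_builder.config("spark.driver.memory", f"{driver_mem_mb}m")
            .config("spark.driver.memoryOverhead", f"{driver_mem_overhead_mb}m")
        )
    # On EMR / EMR Serverless, leave the platform's tuned defaults in place.
    return spark_builder
```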
36 changes: 18 additions & 18 deletions graphstorm-processing/graphstorm_processing/distributed_executor.py
@@ -294,10 +294,9 @@ def run(self) -> None:
"All file row counts match, applying Parquet metadata modification on Spark leader."
)
modify_flat_array_metadata(graph_meta_dict, repartitioner)
logging.info(
"Data are now prepared for processing by the DistPart Partition pipeline, "
"use metadata.json as config file."
)
# modify_flat_array_metadata modifies file metadata in-place,
# so the original meta dict still applies
updated_metadata = graph_meta_dict
else:
if self.repartition_on_leader:
logging.info("Attempting to repartition graph data on Spark leader...")
@@ -306,20 +305,6 @@
if self.filesystem_type == FilesystemType.S3:
self._upload_output_files(loader, force=True)
updated_metadata = repartition_files(graph_meta_dict, repartitioner)
updated_meta_path = os.path.join(
loader.output_path, "updated_row_counts_metadata.json"
)
with open(
updated_meta_path,
"w",
encoding="utf-8",
) as f:
json.dump(updated_metadata, f, indent=4)
f.flush()
logging.info("Updated metadata written to %s", updated_meta_path)
logging.info(
"Data are now prepared for processing by the DistPart Partition pipeline."
)
except Exception as e: # pylint: disable=broad-exception-caught
# If an error happens during re-partition, we don't want to fail the entire
# gs-processing job, so we just post an error and continue
@@ -333,6 +318,21 @@
logging.warning("gs-repartition will need to run as a follow-up job on the data!")
timers_dict["repartition"] = time.perf_counter() - repartition_start

# If any of the metadata modification took place, write an updated metadata file
if updated_metadata:
updated_meta_path = os.path.join(loader.output_path, "updated_row_counts_metadata.json")
with open(
updated_meta_path,
"w",
encoding="utf-8",
) as f:
json.dump(updated_metadata, f, indent=4)
f.flush()
logging.info("Updated metadata written to %s", updated_meta_path)
logging.info(
"Data are now prepared for processing by the Distributed Partition pipeline."
)

with open(
os.path.join(self.local_output_path, "perf_counters.json"), "w", encoding="utf-8"
) as f:
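Taken together, the changes in this file move the `updated_row_counts_metadata.json` write out of the repartition branch: `updated_metadata` is now set in both branches, either directly from `graph_meta_dict` when only the Parquet metadata is modified in place, or from `repartition_files` when a full repartition runs, and a single block afterwards writes the file whenever it is set. A simplified sketch of the resulting control flow, with the repository helpers passed in as callables and their signatures reduced for illustration (the real ones also take a repartitioner argument):

```python
import json
import logging
import os
from typing import Callable, Optional


def finalize_metadata(
    graph_meta_dict: dict,
    row_counts_match: bool,
    output_path: str,
    modify_flat_array_metadata: Callable[[dict], None],
    repartition_files: Callable[[dict], dict],
) -> Optional[dict]:
    """Sketch of the control flow after this commit, not the actual run() method."""
    updated_metadata = None

    if row_counts_match:
        # Parquet file metadata is fixed up in place, so the original
        # metadata dict still describes the output correctly.
        modify_flat_array_metadata(graph_meta_dict)
        updated_metadata = graph_meta_dict
    else:
        try:
            # A full repartition produces a new metadata dict.
            updated_metadata = repartition_files(graph_meta_dict)
        except Exception:  # pylint: disable=broad-exception-caught
            logging.warning("gs-repartition will need to run as a follow-up job on the data!")

    # Previously this write only happened in the repartition branch; it now runs
    # whenever either branch produced usable metadata.
    if updated_metadata:
        updated_meta_path = os.path.join(output_path, "updated_row_counts_metadata.json")
        with open(updated_meta_path, "w", encoding="utf-8") as f:
            json.dump(updated_metadata, f, indent=4)
        logging.info("Updated metadata written to %s", updated_meta_path)
    return updated_metadata
```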
@@ -1205,12 +1205,10 @@ def write_edge_structure(
if self.add_reverse_edges:
reversed_edges = edge_df_with_only_int_ids.select("dst_int_id", "src_int_id")
reversed_edge_structure_path = os.path.join(
self.output_prefix, f"edges/{rev_edge_type}"
self.output_prefix, f"edges/{rev_edge_type.replace(':', '_')}"
)
logging.info("Writing edge structure for reverse edge type %s...", rev_edge_type)
reverse_path_list = self._write_df(
reversed_edges, reversed_edge_structure_path.replace(":", "_")
)
reverse_path_list = self._write_df(reversed_edges, reversed_edge_structure_path)
else:
reverse_path_list = []

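The one-line path change matters for S3 output: the old code applied `.replace(":", "_")` to the fully joined path, which also rewrites the colon in an `s3://` prefix, while the new code sanitizes only the reverse edge type before joining. A small illustration with made-up values (the prefix and edge type below are hypothetical, not taken from the repository):

```python
import os

output_prefix = "s3://my-bucket/gsprocessing/output"  # hypothetical prefix
rev_edge_type = "movie:rated-rev:user"                 # hypothetical reverse edge type

# Before: replacing on the joined path also mangles the S3 scheme.
before = os.path.join(output_prefix, f"edges/{rev_edge_type}").replace(":", "_")
# -> 's3_//my-bucket/gsprocessing/output/edges/movie_rated-rev_user'

# After: only the edge type is sanitized, the prefix is left untouched.
after = os.path.join(output_prefix, f"edges/{rev_edge_type.replace(':', '_')}")
# -> 's3://my-bucket/gsprocessing/output/edges/movie_rated-rev_user'

print(before)
print(after)
```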
