From 1b90b0dd61c22ffba6d578f73cf5aca88629b1be Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Mon, 12 Sep 2016 03:41:32 +0800
Subject: [PATCH 1/2] Make the DiskBytesSpilled metric in PySpark shuffle right

The original code increases the DiskBytesSpilled metric by the file size
during each spill in ExternalMerger and ExternalGroupBy, but only the last
size is needed.

No extra tests, because this only updates the metric.

Author: Li Hu
---
 python/pyspark/shuffle.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index e974cda9fc3e1..65983542cf7f4 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -322,13 +322,15 @@ def _spill(self):
 
             self.pdata.extend([{} for i in range(self.partitions)])
         else:
+            diskBytesSpilled = 0
             for i in range(self.partitions):
                 p = os.path.join(path, str(i))
                 with open(p, "wb") as f:
                     # dump items in batch
                     self.serializer.dump_stream(iter(self.pdata[i].items()), f)
                 self.pdata[i].clear()
-                DiskBytesSpilled += os.path.getsize(p)
+                diskBytesSpilled += os.path.getsize(p)
+            DiskBytesSpilled = diskBytesSpilled
 
         self.spills += 1
         gc.collect()  # release the memory as much as possible
@@ -746,6 +748,7 @@ def _spill(self):
 
             self.pdata.extend([{} for i in range(self.partitions)])
         else:
+            diskBytesSpilled = 0
             for i in range(self.partitions):
                 p = os.path.join(path, str(i))
                 with open(p, "wb") as f:
@@ -757,7 +760,8 @@ def _spill(self):
                     else:
                         self.serializer.dump_stream(self.pdata[i].items(), f)
                 self.pdata[i].clear()
-                DiskBytesSpilled += os.path.getsize(p)
+                diskBytesSpilled += os.path.getsize(p)
+            DiskBytesSpilled = diskBytesSpilled
 
         self.spills += 1
         gc.collect()  # release the memory as much as possible

From 71d9560744051a7d87152c678c358e4d69c38fa1 Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Mon, 12 Sep 2016 16:13:29 +0800
Subject: [PATCH 2/2] Correct the DiskBytesSpilled metric

The previous commit did not take into account that DiskBytesSpilled can
also be increased by other methods, so overwriting it with this spill's
total drops those bytes. Instead, increase it by how much each partition
file grows during the write.
---
 python/pyspark/shuffle.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 65983542cf7f4..ab1c615ab2a77 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -322,15 +322,16 @@ def _spill(self):
 
             self.pdata.extend([{} for i in range(self.partitions)])
         else:
-            diskBytesSpilled = 0
             for i in range(self.partitions):
                 p = os.path.join(path, str(i))
+                beforeWriteSize = os.path.getsize(p)
                 with open(p, "wb") as f:
                     # dump items in batch
                     self.serializer.dump_stream(iter(self.pdata[i].items()), f)
                 self.pdata[i].clear()
-                diskBytesSpilled += os.path.getsize(p)
-            DiskBytesSpilled = diskBytesSpilled
+                afterWriteSize = os.path.getsize(p)
+                DiskBytesSpilled += (afterWriteSize - beforeWriteSize)
+
 
         self.spills += 1
         gc.collect()  # release the memory as much as possible
@@ -748,9 +749,9 @@ def _spill(self):
 
             self.pdata.extend([{} for i in range(self.partitions)])
         else:
-            diskBytesSpilled = 0
             for i in range(self.partitions):
                 p = os.path.join(path, str(i))
+                beforeWriteSize = os.path.getsize(p)
                 with open(p, "wb") as f:
                     # dump items in batch
                     if self._sorted:
@@ -760,8 +761,8 @@ def _spill(self):
                     else:
                         self.serializer.dump_stream(self.pdata[i].items(), f)
                 self.pdata[i].clear()
-                diskBytesSpilled += os.path.getsize(p)
-            DiskBytesSpilled = diskBytesSpilled
+                afterWriteSize = os.path.getsize(p)
+                DiskBytesSpilled += (afterWriteSize - beforeWriteSize)
 
         self.spills += 1
         gc.collect()  # release the memory as much as possible
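
Note: the accounting that PATCH 2/2 ends up with can be illustrated with a
minimal, self-contained sketch. This is not PySpark code: the helper
spill_once, the PARTITIONS constant, and the pickle-based writer are invented
for the example, and the os.path.exists guard is a simplification of what
_spill actually does. The point is only that DiskBytesSpilled grows by how
much each partition file grew during the write, so rewriting a file on a
later spill does not double-count its bytes.

    # Illustrative sketch only -- not pyspark/shuffle.py.
    import os
    import pickle
    import tempfile

    PARTITIONS = 4
    DiskBytesSpilled = 0  # module-level byte counter, as in pyspark.shuffle


    def spill_once(path, pdata):
        """Dump one round of partitioned data, crediting DiskBytesSpilled
        with how much each partition file grew (delta-based accounting)."""
        global DiskBytesSpilled
        for i in range(PARTITIONS):
            p = os.path.join(path, str(i))
            # Size before the write; 0 if the file does not exist yet
            # (a simplification for this standalone example).
            before = os.path.getsize(p) if os.path.exists(p) else 0
            with open(p, "wb") as f:
                pickle.dump(pdata[i], f)
            DiskBytesSpilled += os.path.getsize(p) - before


    if __name__ == "__main__":
        with tempfile.TemporaryDirectory() as d:
            data = [[("key-%d" % i, i)] * 100 for i in range(PARTITIONS)]
            spill_once(d, data)
            spill_once(d, data)  # second spill rewrites the same files
            on_disk = sum(os.path.getsize(os.path.join(d, str(i)))
                          for i in range(PARTITIONS))
            # Delta accounting keeps the counter equal to the bytes actually
            # on disk; adding getsize(p) on every spill would have roughly
            # doubled it in this run.
            print("DiskBytesSpilled =", DiskBytesSpilled, "on disk =", on_disk)

Running the sketch prints equal values for the counter and the bytes on disk
after two spills, which is the behaviour the commit message above asks for.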