Skip to content

Commit 4010927

Browse files
Vexatospayonel
authored andcommitted
Offloading state saving into a separate thread.
(cherry picked from commit 0b256e0)
1 parent a8a421e commit 4010927

File tree

5 files changed

+151
-140
lines changed

5 files changed

+151
-140
lines changed

src/main/scala/li/cil/oc/OpenComputers.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import cpw.mods.fml.common.network.FMLEventChannel
99
import li.cil.oc.common.IMC
1010
import li.cil.oc.common.Proxy
1111
import li.cil.oc.server.command.CommandHandler
12-
import li.cil.oc.server.fs.BufferedFileSaveHandler
12+
import li.cil.oc.util.ThreadPoolFactory
1313
import org.apache.logging.log4j.LogManager
1414
import org.apache.logging.log4j.Logger
1515

@@ -57,12 +57,12 @@ object OpenComputers {
5757
@EventHandler
5858
def serverStart(e: FMLServerStartingEvent): Unit = {
5959
CommandHandler.register(e)
60-
BufferedFileSaveHandler.newThreadPool()
60+
ThreadPoolFactory.safePools.foreach(_.newThreadPool())
6161
}
6262

6363
@EventHandler
6464
def serverStop(e: FMLServerStoppedEvent): Unit = {
65-
BufferedFileSaveHandler.waitForSaving()
65+
ThreadPoolFactory.safePools.foreach(_.waitForCompletion())
6666
}
6767

6868
@EventHandler

src/main/scala/li/cil/oc/common/SaveHandler.scala

Lines changed: 74 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ import java.io
44
import java.io._
55
import java.nio.file._
66
import java.nio.file.attribute.BasicFileAttributes
7+
import java.util.concurrent.CancellationException
8+
import java.util.concurrent.ConcurrentLinkedDeque
9+
import java.util.concurrent.Future
10+
import java.util.concurrent.TimeUnit
11+
import java.util.concurrent.TimeoutException
712

813
import cpw.mods.fml.common.eventhandler.EventPriority
914
import cpw.mods.fml.common.eventhandler.SubscribeEvent
@@ -12,12 +17,13 @@ import li.cil.oc.Settings
1217
import li.cil.oc.api.machine.MachineHost
1318
import li.cil.oc.api.network.EnvironmentHost
1419
import li.cil.oc.util.BlockPosition
20+
import li.cil.oc.util.SafeThreadPool
21+
import li.cil.oc.util.ThreadPoolFactory
1522
import net.minecraft.nbt.CompressedStreamTools
1623
import net.minecraft.nbt.NBTTagCompound
1724
import net.minecraft.world.ChunkCoordIntPair
1825
import net.minecraft.world.World
1926
import net.minecraftforge.common.DimensionManager
20-
import net.minecraftforge.event.world.ChunkDataEvent
2127
import net.minecraftforge.event.world.WorldEvent
2228
import org.apache.commons.lang3.JavaVersion
2329
import org.apache.commons.lang3.SystemUtils
@@ -42,7 +48,32 @@ object SaveHandler {
4248
// which takes a lot of time and is completely unnecessary in those cases.
4349
var savingForClients = false
4450

45-
val saveData = mutable.Map.empty[Int, mutable.Map[ChunkCoordIntPair, mutable.Map[String, Array[Byte]]]]
51+
class SaveDataEntry(val data: Array[Byte], val pos: ChunkCoordIntPair, val name: String, val dimension: Int) extends Runnable {
52+
override def run(): Unit = {
53+
val path = statePath
54+
val dimPath = new io.File(path, dimension.toString)
55+
val chunkPath = new io.File(dimPath, s"${this.pos.chunkXPos}.${this.pos.chunkZPos}")
56+
chunkDirs.add(chunkPath)
57+
if (!chunkPath.exists()) {
58+
chunkPath.mkdirs()
59+
}
60+
val file = new io.File(chunkPath, this.name)
61+
try {
62+
// val fos = new GZIPOutputStream(new io.FileOutputStream(file))
63+
val fos = new io.BufferedOutputStream(new io.FileOutputStream(file))
64+
fos.write(this.data)
65+
fos.close()
66+
}
67+
catch {
68+
case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e)
69+
}
70+
}
71+
}
72+
73+
val stateSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("SaveHandler", 1)
74+
75+
val chunkDirs = new ConcurrentLinkedDeque[io.File]()
76+
val saving = mutable.HashMap.empty[String, Future[_]]
4677

4778
def savePath = new io.File(DimensionManager.getCurrentSaveRootDirectory, Settings.savePath)
4879

@@ -113,37 +144,34 @@ object SaveHandler {
113144
val dimension = nbt.getInteger("dimension")
114145
val chunk = new ChunkCoordIntPair(nbt.getInteger("chunkX"), nbt.getInteger("chunkZ"))
115146

147+
// Wait for the latest save task for the requested file to complete.
148+
// This prevents the chance of loading an outdated version
149+
// of this file.
150+
saving.get(name).foreach(f => try {
151+
f.get(120L, TimeUnit.SECONDS)
152+
} catch {
153+
case e: TimeoutException => OpenComputers.log.warn("Waiting for state data to save took two minutes! Aborting.")
154+
case e: CancellationException => // NO-OP
155+
})
156+
saving.remove(name)
157+
116158
load(dimension, chunk, name)
117159
}
118160

119-
def scheduleSave(dimension: Int, chunk: ChunkCoordIntPair, name: String, data: Array[Byte]) = saveData.synchronized {
161+
def scheduleSave(dimension: Int, chunk: ChunkCoordIntPair, name: String, data: Array[Byte]): Unit = {
120162
if (chunk == null) throw new IllegalArgumentException("chunk is null")
121163
else {
122-
// Make sure we get rid of old versions (e.g. left over by other mods
123-
// triggering a save - this is mostly used for RiM compatibility). We
124-
// need to do this for *each* dimension, in case computers are teleported
125-
// across dimensions.
126-
for (chunks <- saveData.values) chunks.values.foreach(_ -= name)
127-
val chunks = saveData.getOrElseUpdate(dimension, mutable.Map.empty)
128-
chunks.getOrElseUpdate(chunk, mutable.Map.empty) += name -> data
164+
// Disregarding whether or not there already was a
165+
// save submitted for the requested file
166+
// allows for better concurrency at the cost of
167+
// doing more writing operations.
168+
stateSaveHandler.withPool(_.submit(new SaveDataEntry(data, chunk, name, dimension))).foreach(saving.put(name, _))
129169
}
130170
}
131171

132172
def load(dimension: Int, chunk: ChunkCoordIntPair, name: String): Array[Byte] = {
133173
if (chunk == null) throw new IllegalArgumentException("chunk is null")
134-
// Use data from 'cache' if possible. This avoids weird things happening
135-
// when writeToNBT+readFromNBT is called by other mods (i.e. this is not
136-
// used to actually save the data to disk).
137-
saveData.get(dimension) match {
138-
case Some(chunks) => chunks.get(chunk) match {
139-
case Some(map) => map.get(name) match {
140-
case Some(data) => return data
141-
case _ =>
142-
}
143-
case _ =>
144-
}
145-
case _ =>
146-
}
174+
147175
val path = statePath
148176
val dimPath = new io.File(path, dimension.toString)
149177
val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}")
@@ -171,35 +199,28 @@ object SaveHandler {
171199
}
172200
}
173201

174-
@SubscribeEvent
175-
def onChunkSave(e: ChunkDataEvent.Save) = saveData.synchronized {
176-
val path = statePath
177-
val dimension = e.world.provider.dimensionId
178-
val chunk = e.getChunk.getChunkCoordIntPair
179-
val dimPath = new io.File(path, dimension.toString)
180-
val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}")
181-
if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) {
182-
for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete()
202+
def cleanSaveData(): Unit = {
203+
while (!chunkDirs.isEmpty) {
204+
val chunkPath = chunkDirs.poll()
205+
if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) {
206+
for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete()
207+
}
183208
}
184-
saveData.get(dimension) match {
185-
case Some(chunks) => chunks.get(chunk) match {
186-
case Some(entries) =>
187-
chunkPath.mkdirs()
188-
for ((name, data) <- entries) {
189-
val file = new io.File(chunkPath, name)
190-
try {
191-
// val fos = new GZIPOutputStream(new io.FileOutputStream(file))
192-
val fos = new io.BufferedOutputStream(new io.FileOutputStream(file))
193-
fos.write(data)
194-
fos.close()
195-
}
196-
catch {
197-
case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e)
198-
}
199-
}
200-
case _ => chunkPath.delete()
209+
210+
// Delete empty folders to keep the state folder clean.
211+
val emptyDirs = savePath.listFiles(new FileFilter {
212+
override def accept(file: File) = file.isDirectory &&
213+
// Make sure we only consider file system folders (UUID).
214+
file.getName.matches(uuidRegex) &&
215+
// We set the modified time in the save() method of unbuffered file
216+
// systems, to avoid deleting in-use folders here.
217+
System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && {
218+
val list = file.list()
219+
list == null || list.isEmpty
201220
}
202-
case _ =>
221+
})
222+
if (emptyDirs != null) {
223+
emptyDirs.filter(_ != null).foreach(_.delete())
203224
}
204225
}
205226

@@ -224,28 +245,9 @@ object SaveHandler {
224245

225246
@SubscribeEvent(priority = EventPriority.LOWEST)
226247
def onWorldSave(e: WorldEvent.Save) {
227-
saveData.synchronized {
228-
saveData.get(e.world.provider.dimensionId) match {
229-
case Some(chunks) => chunks.clear()
230-
case _ =>
231-
}
232-
}
233-
234-
// Delete empty folders to keep the state folder clean.
235-
val emptyDirs = savePath.listFiles(new FileFilter {
236-
override def accept(file: File) = file.isDirectory &&
237-
// Make sure we only consider file system folders (UUID).
238-
file.getName.matches(uuidRegex) &&
239-
// We set the modified time in the save() method of unbuffered file
240-
// systems, to avoid deleting in-use folders here.
241-
System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && {
242-
val list = file.list()
243-
list == null || list.isEmpty
244-
}
245-
})
246-
if (emptyDirs != null) {
247-
emptyDirs.filter(_ != null).foreach(_.delete())
248-
}
248+
stateSaveHandler.withPool(_.submit(new Runnable {
249+
override def run(): Unit = cleanSaveData()
250+
}))
249251
}
250252
}
251253

src/main/scala/li/cil/oc/server/fs/Buffered.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,17 @@ import java.util.concurrent.TimeoutException
99

1010
import li.cil.oc.OpenComputers
1111
import li.cil.oc.api.fs.Mode
12+
import li.cil.oc.util.ThreadPoolFactory
13+
import li.cil.oc.util.SafeThreadPool
1214
import net.minecraft.nbt.NBTTagCompound
1315
import org.apache.commons.io.FileUtils
1416

1517
import scala.collection.mutable
1618

19+
object Buffered {
20+
val fileSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("FileSystem", 1)
21+
}
22+
1723
trait Buffered extends OutputStreamFileSystem {
1824
protected def fileRoot: io.File
1925

@@ -58,7 +64,7 @@ trait Buffered extends OutputStreamFileSystem {
5864
for (child <- directory.listFiles() if FileSystem.isValidFilename(child.getName)) {
5965
val childPath = path + child.getName
6066
val childFile = new io.File(directory, child.getName)
61-
if (child.exists() && child .isDirectory && child.list() != null) {
67+
if (child.exists() && child.isDirectory && child.list() != null) {
6268
recurse(childPath + "/", childFile)
6369
}
6470
else if (!exists(childPath) || !isDirectory(childPath)) {
@@ -96,7 +102,9 @@ trait Buffered extends OutputStreamFileSystem {
96102

97103
override def save(nbt: NBTTagCompound) = {
98104
super.save(nbt)
99-
saving = BufferedFileSaveHandler.scheduleSave(this)
105+
saving = Buffered.fileSaveHandler.withPool(_.submit(new Runnable {
106+
override def run(): Unit = saveFiles()
107+
}))
100108
}
101109

102110
def saveFiles(): Unit = this.synchronized {

src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala

Lines changed: 0 additions & 63 deletions
This file was deleted.

0 commit comments

Comments
 (0)