Permalink
Browse files

o Initial implementation of pre-buffering thread.

  • Loading branch information...
hzeller committed Oct 7, 2012
1 parent 5f9848b commit fea3d5571dc7f87ef19a874893c59f2180e913a1
Showing with 247 additions and 46 deletions.
  1. +49 −12 conversion-buffer.cc
  2. +19 −3 conversion-buffer.h
  3. +11 −3 file-handler.h
  4. +70 −13 folve-filesystem.cc
  5. +5 −0 folve-filesystem.h
  6. +19 −0 folve-main.cc
  7. +28 −11 status-server.cc
  8. +3 −1 status-server.h
  9. +21 −0 util.cc
  10. +22 −3 util.h
View
@@ -40,7 +40,7 @@ static char *TempNameAllocated(const char *pattern) {
ConversionBuffer::ConversionBuffer(SoundSource *source, const SF_INFO &info)
: source_(source), out_filedes_(-1), snd_writing_enabled_(true),
- total_written_(0), header_end_(0) {
+ total_written_(0), max_accessed_(0), header_end_(0), file_complete_(false) {
char *filename = TempNameAllocated("folve-XXXXXX");
out_filedes_ = mkstemp(filename);
if (out_filedes_ < 0) {
@@ -58,7 +58,9 @@ ConversionBuffer::~ConversionBuffer() {
}
sf_count_t ConversionBuffer::SndTell(void *userdata) {
- return reinterpret_cast<ConversionBuffer*>(userdata)->FileSize();
+ // This will be called within writing, when our mutex is locked. So only
+ // call the version that assumed locked by mutex.
+ return reinterpret_cast<ConversionBuffer*>(userdata)->FileSize_Locked();
}
sf_count_t ConversionBuffer::SndWrite(const void *ptr, sf_count_t count,
void *userdata) {
@@ -122,6 +124,41 @@ ssize_t ConversionBuffer::SndAppend(const void *data, size_t count) {
void ConversionBuffer::HeaderFinished() { header_end_ = FileSize(); }
+// Mmh, looks like we're calling ourself while in FillUntil() loop. Investigate.
+// Until then: don't lock mutex for simple access.
+
+off_t ConversionBuffer::FileSize() const {
+ //folve::MutexLock l(&mutex_);
+ return FileSize_Locked();
+}
+
+off_t ConversionBuffer::FileSize_Locked() const {
+ return total_written_;
+}
+
+off_t ConversionBuffer::MaxAccessed() const {
+ //folve::MutexLock l(&mutex_);
+ return max_accessed_;
+}
+
+bool ConversionBuffer::IsFileComplete() const {
+ //folve::MutexLock l(&mutex_);
+ return file_complete_;
+}
+
+void ConversionBuffer::FillUntil(off_t requested_min_written) {
+ // As soon as someone tries to read beyond of what we already have, we call
+ // the callback that fills more of it.
+ // We are shared between potentially several open files. Serialize threads.
+ folve::MutexLock l(&mutex_);
+ while (!file_complete_ && total_written_ < requested_min_written) {
+ if (!source_->AddMoreSoundData()) {
+ file_complete_ = true;
+ break;
+ }
+ }
+}
+
ssize_t ConversionBuffer::Read(char *buf, size_t size, off_t offset) {
// As long as we're reading only within the header area, allow 'short' reads,
// i.e. reads that return less bytes than requested (but up to the headers'
@@ -138,15 +175,15 @@ ssize_t ConversionBuffer::Read(char *buf, size_t size, off_t offset) {
// required_min_written = offset + size; // all requested bytes.
const off_t required_min_written = offset + (offset >= header_end_ ? size : 1);
- // As soon as someone tries to read beyond of what we already have, we call
- // the callback that fills more of it.
- // We are shared between potentially several open files. Serialize threads.
- mutex_.Lock();
- while (total_written_ < required_min_written) {
- if (!source_->AddMoreSoundData())
- break;
- }
- mutex_.Unlock();
+ FillUntil(required_min_written);
- return pread(out_filedes_, buf, size, offset);
+ const ssize_t read_result = pread(out_filedes_, buf, size, offset);
+ if (read_result >= 0) {
+ const off_t new_max_accessed = offset + read_result;
+ if (new_max_accessed > max_accessed_) {
+ folve::MutexLock l(&mutex_);
+ max_accessed_ = new_max_accessed;
+ }
+ }
+ return read_result;
}
View
@@ -71,6 +71,9 @@ class ConversionBuffer {
// used for chirurgical header editing...
void WriteCharAt(unsigned char c, off_t offset);
+ // Fill read file until we have the required bytes available.
+ void FillUntil(off_t requested_min_written);
+
// Enable writing through the SNDFILE.
// If set to 'false', writes via the SNDFILE are ignored.
// To be used to suppress writing of the header or
@@ -83,8 +86,16 @@ class ConversionBuffer {
// (Long story, see Read() for details).
void HeaderFinished();
- // Current max file position.
- off_t FileSize() const { return total_written_; }
+ // Returns if we've completed this file.
+ bool IsFileComplete() const;
+
+ // Current max file position. Must not be called within AddMoreSoundData().
+ off_t FileSize() const;
+
+ // Maximum position accessed. This might be different from FileSize in case
+ // we have a pre-buffering thread running.
+ // Must not be called within AddMoreSoundData()
+ off_t MaxAccessed() const;
private:
static sf_count_t SndTell(void *userdata);
@@ -97,12 +108,17 @@ class ConversionBuffer {
// Can be NULL on error.
SNDFILE *CreateOutputSoundfile(const SF_INFO &info);
+ // Like FileSize(), but assuming that the mutex is already locked.
+ off_t FileSize_Locked() const;
+
SoundSource *const source_;
int out_filedes_;
bool snd_writing_enabled_;
off_t total_written_;
+ off_t max_accessed_;
off_t header_end_;
- folve::Mutex mutex_;
+ bool file_complete_;
+ mutable folve::Mutex mutex_;
};
#endif // FOLVE_CONVERSION_BUFFER_H
View
@@ -27,14 +27,16 @@
// gone, to show 'retired' elements in the status server.
struct HandlerStats {
HandlerStats()
- : duration_seconds(-1), progress(-1), status(OPEN), last_access(0),
+ : duration_seconds(-1), access_progress(-1), buffer_progress(-1),
+ status(OPEN), last_access(0),
max_output_value(0), in_gapless(false), out_gapless(false) {}
std::string filename; // filesystem name.
std::string format; // File format info if recognized.
std::string message; // Per file (error) message if any.
int duration_seconds; // Audio file length if known; -1 otherwise.
- float progress; // File access rogress if known; -1 otherwise.
+ float access_progress; // File access progress if known; -1 otherwise.
+ float buffer_progress; // File buffering progress if known.
enum Status { OPEN, IDLE, RETIRED };
Status status; // Status of this file handler.
@@ -67,7 +69,13 @@ class FileHandler {
// Accept processor passed on from the previous file. Can return false
// if this FileHandler cannot use it (e.g. it alrady started convolving).
- virtual bool AcceptProcessor(SoundProcessor *s) { return false; }
+ // The Receiver must not use this processor until
+ // NotifyPassedProcessorUnreferenced() is called.
+ virtual bool PassoverProcessor(SoundProcessor *s) { return false; }
+
+ // If we got a processor passed over, the caller will notify us once it
+ // is fully done with it.
+ virtual void NotifyPassedProcessorUnreferenced() { }
private:
const std::string filter_dir_;
View
@@ -75,7 +75,8 @@ class PassThroughHandler : public FileHandler {
virtual void GetHandlerStatus(HandlerStats *stats) {
*stats = info_stats_;
if (file_size_ > 0) {
- stats->progress = 1.0 * max_accessed_ / file_size_;
+ stats->access_progress = 1.0 * max_accessed_ / file_size_;
+ stats->buffer_progress = stats->access_progress;
}
}
@@ -86,6 +87,45 @@ class PassThroughHandler : public FileHandler {
HandlerStats info_stats_;
};
+// A thread, that attempts to pre-buffer data. There are some misbehaving
+// media clients out there, that buffer a whole chunk but don't start reading
+// again when they're low on buffer - in particular on small machines that
+// use considerable CPU for folve, this is not desirable.
+class PreBufferThread : public folve::Thread {
+public:
+ // Buffer thread around conversion buffer. Does not take over ownership
+ // of buffer.
+ PreBufferThread(ConversionBuffer *buffer, int buffer_ahead)
+ : buffer_(buffer), buffer_ahead_(buffer_ahead), do_run_(true) {}
+
+ void StopRunning() {
+ folve::MutexLock l(&run_mutex_);
+ do_run_ = false;
+ }
+
+ virtual void Run() {
+ const int kBufferChunk = (16 << 10); // 16k
+ while (ShouldRun() && !buffer_->IsFileComplete()) {
+ const off_t goal = buffer_->MaxAccessed() + buffer_ahead_;
+ while (!buffer_->IsFileComplete() && buffer_->FileSize() < goal) {
+ buffer_->FillUntil(buffer_->FileSize() + kBufferChunk);
+ }
+ // TODO: make condition variables not crude sleep.
+ usleep(500000);
+ }
+ }
+
+private:
+ bool ShouldRun() {
+ folve::MutexLock l(&run_mutex_);
+ return do_run_;
+ }
+ ConversionBuffer *const buffer_;
+ const int buffer_ahead_;
+ folve::Mutex run_mutex_;
+ bool do_run_;
+};
+
class SndFileHandler :
public FileHandler,
public ConversionBuffer::SoundSource {
@@ -138,6 +178,7 @@ class SndFileHandler :
virtual ~SndFileHandler() {
Close();
+ delete buffer_thread_;
delete output_buffer_;
}
@@ -176,10 +217,14 @@ class SndFileHandler :
}
*stats = base_stats_;
const int frames_done = in_info_.frames - input_frames_left_;
- if (frames_done == 0 || in_info_.frames == 0)
- stats->progress = 0.0;
- else
- stats->progress = 1.0 * frames_done / in_info_.frames;
+ if (frames_done == 0 || in_info_.frames == 0) {
+ stats->buffer_progress = 0.0;
+ stats->access_progress = 0.0;
+ } else {
+ stats->buffer_progress = 1.0 * frames_done / in_info_.frames;
+ stats->access_progress = stats->buffer_progress
+ * output_buffer_->MaxAccessed() / output_buffer_->FileSize();
+ }
if (base_stats_.max_output_value > 1.0) {
// TODO: the status server could inspect this value and make better
@@ -224,7 +269,7 @@ class SndFileHandler :
: FileHandler(filter_dir), fs_(fs),
filedes_(filedes), snd_in_(snd_in), in_info_(in_info),
base_stats_(file_info),
- error_(false), output_buffer_(NULL),
+ error_(false), output_buffer_(NULL), buffer_thread_(NULL),
snd_out_(NULL), processor_(processor),
input_frames_left_(in_info.frames) {
@@ -258,6 +303,10 @@ class SndFileHandler :
}
output_buffer_ = new ConversionBuffer(this, out_info);
+ if (fs_->pre_buffer_size() > 0) {
+ buffer_thread_ = new PreBufferThread(output_buffer_,
+ fs->pre_buffer_size());
+ }
}
virtual void SetOutputSoundfile(ConversionBuffer *out_buffer,
@@ -312,7 +361,7 @@ class SndFileHandler :
}
bool HasStarted() { return in_info_.frames != input_frames_left_; }
- virtual bool AcceptProcessor(SoundProcessor *passover_processor) {
+ virtual bool PassoverProcessor(SoundProcessor *passover_processor) {
if (HasStarted()) {
DLogf("Gapless attempt: Cannot bridge gap to already open file %s",
base_stats_.filename.c_str());
@@ -337,6 +386,12 @@ class SndFileHandler :
return true;
}
+ virtual void NotifyPassedProcessorUnreferenced() {
+ if (buffer_thread_ && !buffer_thread_->started()) {
+ buffer_thread_->Start();
+ }
+ }
+
static bool ExtractDirAndSuffix(const std::string &filename,
std::string *dir, std::string *suffix) {
const std::string::size_type slash_pos = filename.find_last_of('/');
@@ -381,29 +436,29 @@ class SndFileHandler :
&& fs_->ListDirectory(fs_dir, file_suffix, &dirset)
&& (found = dirset.upper_bound(base_stats_.filename)) != dirset.end()
&& (next_file = fs_->GetOrCreateHandler(found->c_str()))
- && next_file->AcceptProcessor(processor_));
+ && next_file->PassoverProcessor(processor_));
if (passed_processor) {
DLogf("Processor %p: Gapless pass-on from "
"'%s' to alphabetically next '%s'", processor_,
base_stats_.filename.c_str(), found->c_str());
}
- stats_mutex_.Lock();
processor_->WriteProcessed(snd_out_, r);
- stats_mutex_.Unlock();
if (passed_processor) {
base_stats_.out_gapless = true;
SaveOutputValues();
processor_ = NULL; // we handed over ownership.
+ next_file->NotifyPassedProcessorUnreferenced();
}
if (next_file) fs_->Close(found->c_str(), next_file);
} else {
- stats_mutex_.Lock();
processor_->WriteProcessed(snd_out_, r);
- stats_mutex_.Unlock();
}
if (input_frames_left_ == 0) {
Close();
}
+ if (buffer_thread_ && !buffer_thread_->started()) {
+ buffer_thread_->Start();
+ }
return input_frames_left_;
}
@@ -501,6 +556,7 @@ class SndFileHandler :
if (snd_in_) sf_close(snd_in_);
if (snd_out_) sf_close(snd_out_);
snd_out_ = NULL;
+ if (buffer_thread_) buffer_thread_->StopRunning();
close(filedes_);
}
@@ -529,6 +585,7 @@ class SndFileHandler :
bool error_;
bool copy_flac_header_verbatim_;
ConversionBuffer *output_buffer_;
+ PreBufferThread *buffer_thread_;
SNDFILE *snd_out_;
// Used in conversion.
@@ -538,7 +595,7 @@ class SndFileHandler :
} // namespace
FolveFilesystem::FolveFilesystem()
- : gapless_processing_(false),
+ : gapless_processing_(false), pre_buffer_size_(-1),
open_file_cache_(4), processor_pool_(3),
total_file_openings_(0), total_file_reopen_(0) {
}
View
@@ -91,6 +91,10 @@ class FolveFilesystem {
void set_gapless_processing(bool b) { gapless_processing_ = b; }
bool gapless_processing() const { return gapless_processing_; }
+ // Should we attempt to pre-buffer files ?
+ void set_pre_buffer_size(int b) { pre_buffer_size_ = b; }
+ int pre_buffer_size() const { return pre_buffer_size_; }
+
// Some stats.
int total_file_openings() { return total_file_openings_; }
int total_file_reopen() { return total_file_reopen_; }
@@ -119,6 +123,7 @@ class FolveFilesystem {
std::string current_config_subdir_;
bool gapless_processing_;
+ int pre_buffer_size_;
FileHandlerCache open_file_cache_;
ProcessorPool processor_pool_;
int total_file_openings_;
Oops, something went wrong.

0 comments on commit fea3d55

Please sign in to comment.