Skip to content

Commit

Permalink
Threading cleanup in MainServer. See also [64d5136].
Browse files Browse the repository at this point in the history
There were a couple problems here, first the threads were converted from detached to attached but no provision was made to join the threads. The other problem was that at some point before the port we started queueing deletes but again made no provision for actually deleting files if someone decided to shut down the backend before the possibly long list of deletes was processed (we sleep for some time before processing each delete.)
  • Loading branch information
daniel-kristjansson committed Mar 27, 2011
1 parent d144c22 commit fd7a077
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 70 deletions.
195 changes: 145 additions & 50 deletions mythtv/programs/mythbackend/mainserver.cpp
Expand Up @@ -117,7 +117,92 @@ int delete_file_immediately(const QString &filename,

};

DeleteStruct::~DeleteStruct()
{
if (fd >= 0)
{
::close(fd);
fd = -1;
}
if (pg)
{
delete pg;
pg = NULL;
}
ms = NULL;
}

void DeleteStruct::run(void)
{
ms->DoDeleteThread(this);
}

TruncateThread::~TruncateThread()
{
{
QMutexLocker locker(&MainServer::run_truncate_lock);
MainServer::run_truncate = false;
MainServer::run_truncate_wait.wakeAll();
}
m_run = false;
{
QMutexLocker locker(&m_lock);
m_wait.wakeAll();
}
wait();
QMutexLocker locker(&m_lock);
while (!m_list.empty())
{
DeleteStruct *ds = m_list.front();
m_list.pop_front();
delete ds;
}
m_parent = NULL;
}

void TruncateThread::AddDelete(DeleteStruct *ds)
{
if (ds)
{
QMutexLocker locker(&m_lock);
m_list.push_back(ds);
m_wait.wakeAll();
}
}

void TruncateThread::run(void)
{
QMutexLocker locker(&m_lock);
while (m_run)
{
if (!m_list.empty())
{
DeleteStruct *ds = m_list.front();
m_list.pop_front();
locker.unlock();
if (gCoreContext->GetNumSetting("TruncateDeletesSlowly", 0))
{
MainServer::TruncateAndClose(
ds->pg, ds->fd, ds->filename, ds->size);
}
else
{
close(ds->fd);
}
ds->fd = -1;
delete ds;
locker.relock();
}
if (m_run && m_list.empty())
m_wait.wait(locker.mutex());
}
}

QMutex MainServer::truncate_and_close_lock;
QMutex MainServer::run_truncate_lock;
bool MainServer::run_truncate = true;
QWaitCondition MainServer::run_truncate_wait;

const uint MainServer::kMasterServerReconnectTimeout = 1000; //ms

class ProcessRequestThread : public QThread
Expand Down Expand Up @@ -179,10 +264,14 @@ MainServer::MainServer(bool master, int port,
QMap<int, EncoderLink *> *tvList,
Scheduler *sched, AutoExpire *expirer) :
encoderList(tvList), mythserver(NULL), masterServerReconnect(NULL),
masterServer(NULL), ismaster(master), masterBackendOverride(false),
masterServer(NULL), ismaster(master), doDeleteSleeps(true),
deleteThreadPool(new QThreadPool()),
truncateThread(new TruncateThread(this)), masterBackendOverride(false),
m_sched(sched), m_expirer(expirer), deferredDeleteTimer(NULL),
autoexpireUpdateTimer(NULL), m_exitCode(GENERIC_EXIT_OK)
{
deleteThreadPool->setMaxThreadCount(1);

PreviewGeneratorQueue::CreatePreviewGeneratorQueue(
PreviewGenerator::kLocalAndRemote, ~0, 0);
PreviewGeneratorQueue::AddListener(this);
Expand Down Expand Up @@ -242,6 +331,38 @@ MainServer::MainServer(bool master, int port,

MainServer::~MainServer()
{
if (mythserver)
mythserver->disconnect();

// tell truncate runs not to waste time truncating,
// we'll just delete in the dtor
{
QMutexLocker locker(&MainServer::run_truncate_lock);
MainServer::run_truncate = false;
MainServer::run_truncate_wait.wakeAll();
}

// tell delete runs not to waste time sleeping...
{
QMutexLocker locker(&deletelock);
doDeleteSleeps = false;
deleteWait.wakeAll();
}

if (deleteThreadPool)
{
// wait for delete runs to finish...
deleteThreadPool->waitForDone();
delete deleteThreadPool;
deleteThreadPool = NULL;
}

if (truncateThread)
{
delete truncateThread;
truncateThread = NULL;
}

PreviewGeneratorQueue::RemoveListener(this);
PreviewGeneratorQueue::TeardownPreviewGeneratorQueue();

Expand Down Expand Up @@ -1756,26 +1877,13 @@ void MainServer::HandleFillProgramInfo(QStringList &slist, PlaybackSock *pbs)
SendResponse(pbssock, strlist);
}

void DeleteThread::run(void)
{
if (!m_parent)
return;

MainServer *ms = m_parent->ms;
ms->DoDeleteThread(m_parent);

delete m_parent;
this->deleteLater();
}

void MainServer::DoDeleteThread(const DeleteStruct *ds)
{
// sleep a little to let frontends reload the recordings list
// after deleting a recording, then we can hammer the DB and filesystem
sleep(3);
usleep(rand()%2000);

deletelock.lock();
if (doDeleteSleeps)
deleteWait.wait(&deletelock, 2000 + rand()%2000);

QString logInfo = QString("chanid %1 at %2")
.arg(ds->chanid).arg(ds->recstartts.toString());
Expand Down Expand Up @@ -1866,7 +1974,8 @@ void MainServer::DoDeleteThread(const DeleteStruct *ds)
else
{
delete_file_immediately(ds->filename, followLinks, false);
sleep(2);
if (doDeleteSleeps)
deleteWait.wait(&deletelock, 2000);
if (checkFile.exists())
errmsg = true;
}
Expand Down Expand Up @@ -1914,7 +2023,15 @@ void MainServer::DoDeleteThread(const DeleteStruct *ds)
deletelock.unlock();

if (slowDeletes && fd >= 0)
TruncateAndClose(&pginfo, fd, ds->filename, size);
{
DeleteStruct *nds = new DeleteStruct;
nds->ms = ds->ms;
nds->pg = new ProgramInfo(pginfo);
nds->filename = ds->filename;
nds->fd = fd;
nds->size = ds->size;
truncateThread->AddDelete(nds);
}
}

void MainServer::DeleteRecordedFiles(const DeleteStruct *ds)
Expand Down Expand Up @@ -2151,6 +2268,10 @@ bool MainServer::TruncateAndClose(ProgramInfo *pginfo, int fd,
{
QMutexLocker locker(&truncate_and_close_lock);

QMutexLocker locker2(&run_truncate_lock);
if (!run_truncate)
return 0 == close(fd);

if (pginfo)
{
pginfo->SetPathname(filename);
Expand Down Expand Up @@ -2178,8 +2299,9 @@ bool MainServer::TruncateAndClose(ProgramInfo *pginfo, int fd,
.arg(sleep_time));

int count = 0;
while (fsize > 0)
while (run_truncate && (fsize > 0))
{
locker2.unlock();
//VERBOSE(VB_FILE, QString("Truncating '%1' to %2 MB")
// .arg(filename).arg(fsize / (1024.0 * 1024.0), 0, 'f', 2));

Expand All @@ -2200,7 +2322,8 @@ bool MainServer::TruncateAndClose(ProgramInfo *pginfo, int fd,

count++;

usleep(sleep_time * 1000);
locker2.relock();
run_truncate_wait.wait(locker2.mutex(), sleep_time);
}

bool ok = (0 == close(fd));
Expand Down Expand Up @@ -2523,9 +2646,7 @@ void MainServer::DoHandleDeleteRecording(

recinfo.SaveDeletePendingFlag(true);

DeleteThread *deleteThread = new DeleteThread;
deleteThread->SetParent(ds);
deleteThread->start();
deleteThreadPool->start(ds);
}
else
{
Expand Down Expand Up @@ -4482,29 +4603,6 @@ void MainServer::GetFilesystemInfos(vector <FileSystemInfo> &fsInfos)
}
}

void TruncateThread::run(void)
{
if (!m_parent)
return;

MainServer *ms = m_parent->ms;
ms->DoTruncateThread(m_parent);

delete m_parent;
this->deleteLater();
}

void MainServer::DoTruncateThread(const DeleteStruct *ds)
{
if (gCoreContext->GetNumSetting("TruncateDeletesSlowly", 0))
TruncateAndClose(NULL, ds->fd, ds->filename, ds->size);
else
{
QMutexLocker dl(&deletelock);
close(ds->fd);
}
}

bool MainServer::HandleDeleteFile(QStringList &slist, PlaybackSock *pbs)
{
return HandleDeleteFile(slist[1], slist[2], pbs);
Expand Down Expand Up @@ -4582,10 +4680,7 @@ bool MainServer::HandleDeleteFile(QString filename, QString storagegroup,
ds->filename = fullfile;
ds->fd = fd;
ds->size = size;

TruncateThread *truncateThread = new TruncateThread;
truncateThread->SetParent(ds);
truncateThread->run();
truncateThread->AddDelete(ds);
}

return true;
Expand Down
51 changes: 31 additions & 20 deletions mythtv/programs/mythbackend/mainserver.h
Expand Up @@ -2,6 +2,8 @@
#define MAINSERVER_H_

#include <QReadWriteLock>
#include <QThreadPool>
#include <QRunnable>
#include <QEvent>
#include <QMutex>
#include <QHash>
Expand Down Expand Up @@ -31,9 +33,18 @@ class MythServer;
class VideoScanner;
class QTimer;

typedef struct deletestruct
class DeleteStruct : public QRunnable
{
public:
DeleteStruct() :
ms(NULL), pg(NULL),
chanid(0), fd(-1), size(0), forceMetadataDelete(true) { }
virtual ~DeleteStruct();

void run(void);

MainServer *ms;
ProgramInfo *pg;
uint chanid;
QDateTime recstartts;
QDateTime recendts;
Expand All @@ -42,35 +53,29 @@ typedef struct deletestruct
off_t size;
QString title;
bool forceMetadataDelete;
} DeleteStruct;

class DeleteThread : public QThread
{
Q_OBJECT
public:
DeleteThread() : m_parent(NULL) {}
void SetParent(DeleteStruct *parent) { m_parent = parent; }
void run(void);
private:
DeleteStruct *m_parent;
};

class TruncateThread : public QThread
{
Q_OBJECT
public:
TruncateThread() : m_parent(NULL) {}
void SetParent(DeleteStruct *parent) { m_parent = parent; }
void run(void);
TruncateThread(MainServer *p) : m_parent(p) { start(); }
virtual ~TruncateThread();
void AddDelete(DeleteStruct*);
virtual void run(void);
private:
DeleteStruct *m_parent;
MainServer *m_parent;
volatile bool m_run;
QMutex m_lock;
QWaitCondition m_wait;
QList<DeleteStruct*> m_list;
};

class MainServer : public QObject, public MythSocketCBs
{
Q_OBJECT

friend class DeleteThread;
friend class DeleteStruct;
friend class TruncateThread;
public:
MainServer(bool master, int port,
Expand Down Expand Up @@ -210,9 +215,7 @@ class MainServer : public QObject, public MythSocketCBs

int GetfsID(vector<FileSystemInfo>::iterator fsInfo);

static void *SpawnTruncateThread(void *param);
void DoTruncateThread(const DeleteStruct *ds);
static void *SpawnDeleteThread(void *param);
void DoDeleteThread(const DeleteStruct *ds);
void DeleteRecordedFiles(const DeleteStruct *ds);
void DoDeleteInDB(const DeleteStruct *ds);
Expand Down Expand Up @@ -249,7 +252,12 @@ class MainServer : public QObject, public MythSocketCBs

bool ismaster;

QMutex deletelock;
QMutex deletelock;
bool doDeleteSleeps;
QWaitCondition deleteWait;
QThreadPool *deleteThreadPool;
TruncateThread *truncateThread;

QMutex threadPoolLock;
QWaitCondition threadPoolCond;
MythDeque<ProcessRequestThread *> threadPool;
Expand All @@ -271,6 +279,9 @@ class MainServer : public QObject, public MythSocketCBs

QTimer *autoexpireUpdateTimer; // audited ref #5318
static QMutex truncate_and_close_lock;
static QMutex run_truncate_lock;
static bool run_truncate;
static QWaitCondition run_truncate_wait;

QMap<QString, int> fsIDcache;
QMutex fsIDcacheLock;
Expand Down

0 comments on commit fd7a077

Please sign in to comment.