Skip to content

Commit

Permalink
Introduced class SharedMemory - base class for everything, using IPC.…
Browse files Browse the repository at this point in the history
… Related to CORE-3035.
  • Loading branch information
AlexPeshkoff committed Jun 25, 2010
1 parent eabfade commit 6cbefa1
Show file tree
Hide file tree
Showing 23 changed files with 1,007 additions and 1,104 deletions.
120 changes: 48 additions & 72 deletions src/jrd/DatabaseSnapshot.cpp
Expand Up @@ -80,14 +80,14 @@ DatabaseSnapshot::SharedData::SharedData(const Database* dbb)
name.printf(MONITOR_FILE, dbb->getUniqueFileId().c_str());

Arg::StatusVector statusVector;
base = (Header*) ISC_map_file(statusVector, name.c_str(), init, this, DEFAULT_SIZE, &handle);
if (!base)
mapFile(statusVector, name.c_str(), DEFAULT_SIZE);
if (!sh_mem_header)
{
iscLogStatus("Cannot initialize the shared memory region", statusVector.value());
status_exception::raise(statusVector);
}

fb_assert(base->version == MONITOR_VERSION);
fb_assert(sh_mem_header->mhb_version == MONITOR_VERSION);
}


Expand All @@ -97,27 +97,25 @@ DatabaseSnapshot::SharedData::~SharedData()
DumpGuard guard(this);
cleanup();

if (base->used == sizeof(Header))
ISC_remove_map_file(&handle);
if (sh_mem_header->used == sizeof(Header))
removeMapFile();
}

ISC_mutex_fini(mutex);

Arg::StatusVector statusVector;
ISC_unmap_file(statusVector, &handle);
unmapFile(statusVector);
}


void DatabaseSnapshot::SharedData::acquire()
{
checkMutex("lock", ISC_mutex_lock(mutex));
mutexLock();

if (base->allocated > handle.sh_mem_length_mapped)
if (sh_mem_header->allocated > sh_mem_length_mapped)
{
#if (defined HAVE_MMAP || defined WIN_NT)
#ifdef HAVE_OBJECT_MAP
Arg::StatusVector statusVector;
base = (Header*) ISC_remap_file(statusVector, &handle, base->allocated, false);
if (!base)
remapFile(statusVector, sh_mem_header->allocated, false);
if (!remapFile(statusVector, sh_mem_header->allocated, false))
{
status_exception::raise(statusVector);
}
Expand All @@ -130,7 +128,7 @@ void DatabaseSnapshot::SharedData::acquire()

void DatabaseSnapshot::SharedData::release()
{
checkMutex("unlock", ISC_mutex_unlock(mutex));
mutexUnlock();
}


Expand All @@ -145,9 +143,9 @@ UCHAR* DatabaseSnapshot::SharedData::read(MemoryPool& pool, ULONG& resultSize)
// and copy the data there, starting with our own dbb.

// First pass
for (ULONG offset = alignOffset(sizeof(Header)); offset < base->used;)
for (ULONG offset = alignOffset(sizeof(Header)); offset < sh_mem_header->used;)
{
UCHAR* const ptr = (UCHAR*) base + offset;
UCHAR* const ptr = (UCHAR*) sh_mem_header + offset;
const Element* const element = (Element*) ptr;
const ULONG length = alignOffset(sizeof(Element) + element->length);

Expand All @@ -163,9 +161,9 @@ UCHAR* DatabaseSnapshot::SharedData::read(MemoryPool& pool, ULONG& resultSize)
}
else
{
fb_assert(base->used >= offset + length);
memmove(ptr, ptr + length, base->used - offset - length);
base->used -= length;
fb_assert(sh_mem_header->used >= offset + length);
memmove(ptr, ptr + length, sh_mem_header->used - offset - length);
sh_mem_header->used -= length;
}
}

Expand All @@ -175,14 +173,14 @@ UCHAR* DatabaseSnapshot::SharedData::read(MemoryPool& pool, ULONG& resultSize)

fb_assert(self_dbb_offset);

UCHAR* const ptr = (UCHAR*) base + self_dbb_offset;
UCHAR* const ptr = (UCHAR*) sh_mem_header + self_dbb_offset;
const Element* const element = (Element*) ptr;
memcpy(bufferPtr, ptr + sizeof(Element), element->length);
bufferPtr += element->length;

for (ULONG offset = alignOffset(sizeof(Header)); offset < base->used;)
for (ULONG offset = alignOffset(sizeof(Header)); offset < sh_mem_header->used;)
{
UCHAR* const ptr = (UCHAR*) base + offset;
UCHAR* const ptr = (UCHAR*) sh_mem_header + offset;
const Element* const element = (Element*) ptr;
const ULONG length = alignOffset(sizeof(Element) + element->length);

Expand All @@ -205,13 +203,13 @@ ULONG DatabaseSnapshot::SharedData::setup()
ensureSpace(sizeof(Element));

// Put an up-to-date element at the tail
const ULONG offset = base->used;
UCHAR* const ptr = (UCHAR*) base + offset;
const ULONG offset = sh_mem_header->used;
UCHAR* const ptr = (UCHAR*) sh_mem_header + offset;
Element* const element = (Element*) ptr;
element->processId = process_id;
element->localId = local_id;
element->length = 0;
base->used += alignOffset(sizeof(Element));
sh_mem_header->used += alignOffset(sizeof(Element));
return offset;
}

Expand All @@ -221,30 +219,30 @@ void DatabaseSnapshot::SharedData::write(ULONG offset, ULONG length, const void*
ensureSpace(length);

// Put an up-to-date element at the tail
UCHAR* const ptr = (UCHAR*) base + offset;
UCHAR* const ptr = (UCHAR*) sh_mem_header + offset;
Element* const element = (Element*) ptr;
memcpy(ptr + sizeof(Element) + element->length, buffer, length);
ULONG previous = alignOffset(sizeof(Element) + element->length);
element->length += length;
ULONG current = alignOffset(sizeof(Element) + element->length);
base->used += (current - previous);
sh_mem_header->used += (current - previous);
}


void DatabaseSnapshot::SharedData::cleanup()
{
// Remove information about our dbb
for (ULONG offset = alignOffset(sizeof(Header)); offset < base->used;)
for (ULONG offset = alignOffset(sizeof(Header)); offset < sh_mem_header->used;)
{
UCHAR* const ptr = (UCHAR*) base + offset;
UCHAR* const ptr = (UCHAR*) sh_mem_header + offset;
const Element* const element = (Element*) ptr;
const ULONG length = alignOffset(sizeof(Element) + element->length);

if (element->processId == process_id && element->localId == local_id)
{
fb_assert(base->used >= offset + length);
memmove(ptr, ptr + length, base->used - offset - length);
base->used -= length;
fb_assert(sh_mem_header->used >= offset + length);
memmove(ptr, ptr + length, sh_mem_header->used - offset - length);
sh_mem_header->used -= length;
}
else
{
Expand All @@ -256,70 +254,48 @@ void DatabaseSnapshot::SharedData::cleanup()

void DatabaseSnapshot::SharedData::ensureSpace(ULONG length)
{
ULONG newSize = base->used + length;
ULONG newSize = sh_mem_header->used + length;

if (newSize > base->allocated)
if (newSize > sh_mem_header->allocated)
{
newSize = FB_ALIGN(newSize, DEFAULT_SIZE);

#if (defined HAVE_MMAP || defined WIN_NT)
#ifdef HAVE_OBJECT_MAP
Arg::StatusVector statusVector;
base = (Header*) ISC_remap_file(statusVector, &handle, newSize, true);
if (!base)
if (!remapFile(statusVector, newSize, true))
{
status_exception::raise(statusVector);
}
base->allocated = handle.sh_mem_length_mapped;
sh_mem_header->allocated = sh_mem_length_mapped;
#else
status_exception::raise(Arg::Gds(isc_montabexh));
#endif
}
}


void DatabaseSnapshot::SharedData::checkMutex(const TEXT* string, int state)
void DatabaseSnapshot::SharedData::mutexBug(int osErrorCode, const char* string)
{
if (state)
{
TEXT msg[BUFFER_TINY];
gds__log("MONITOR: mutex %s error, status = %d", string, osErrorCode);

sprintf(msg, "MONITOR: mutex %s error, status = %d", string, state);
gds__log(msg);

//fprintf(stderr, "%s\n", msg);
exit(FINI_ERROR);
}
//fprintf(stderr, "%s\n", msg);
exit(FINI_ERROR);
}


void DatabaseSnapshot::SharedData::init(void* arg, sh_mem* shmemData, bool initialize)
bool DatabaseSnapshot::SharedData::initialize(bool initialize)
{
SharedData* const shmem = (SharedData*) arg;
fb_assert(shmem);

#ifdef WIN_NT
checkMutex("init", ISC_mutex_init(&shmem->winMutex, shmemData->sh_mem_name));
shmem->mutex = &shmem->winMutex;
#endif

Header* const header = (Header*) shmemData->sh_mem_address;

if (!initialize)
if (initialize)
{
#ifndef WIN_NT
checkMutex("map", ISC_map_mutex(shmemData, &header->mutex, &shmem->mutex));
#endif
return;
}
// Initialize the shared data header
sh_mem_header->mhb_type = SRAM_DATABASE_SNAPSHOT;
sh_mem_header->mhb_version = MONITOR_VERSION;

// Initialize the shared data header
header->version = MONITOR_VERSION;
header->used = alignOffset(sizeof(Header));
header->allocated = shmemData->sh_mem_length_mapped;
sh_mem_header->used = alignOffset(sizeof(Header));
sh_mem_header->allocated = sh_mem_length_mapped;
}

#ifndef WIN_NT
checkMutex("init", ISC_mutex_init(shmemData, &header->mutex, &shmem->mutex));
#endif
return true;
}


Expand Down
33 changes: 13 additions & 20 deletions src/jrd/DatabaseSnapshot.h
Expand Up @@ -25,6 +25,7 @@

#include "../common/classes/array.h"
#include "../common/classes/init.h"
#include "../jrd/isc_s_proto.h"

namespace Jrd {

Expand Down Expand Up @@ -184,20 +185,19 @@ class DatabaseSnapshot
};

public:
class SharedData
class SnapshotHeader : public MemoryHeader
{
public:
ULONG used;
ULONG allocated;
};

class SharedData : public SharedMemory<SnapshotHeader>
{
static const ULONG MONITOR_VERSION = 3;
static const ULONG DEFAULT_SIZE = 1048576;

struct Header
{
ULONG version;
ULONG used;
ULONG allocated;
#ifndef WIN_NT
struct mtx mutex;
#endif
};
typedef SnapshotHeader Header;

struct Element
{
Expand All @@ -212,6 +212,9 @@ class DatabaseSnapshot
explicit SharedData(const Database*);
~SharedData();

bool initialize(bool);
void mutexBug(int osErrorCode, const char* text);

void acquire();
void release();

Expand All @@ -228,16 +231,6 @@ class DatabaseSnapshot

void ensureSpace(ULONG);

static void checkMutex(const TEXT*, int);
static void init(void*, sh_mem*, bool);

sh_mem handle;
#ifdef WIN_NT
struct mtx winMutex;
#endif
struct mtx* mutex;
Header* base;

const SLONG process_id;
const SLONG local_id;
};
Expand Down
1 change: 0 additions & 1 deletion src/jrd/cch.h
Expand Up @@ -35,7 +35,6 @@
#include "../jrd/que.h"
#include "../jrd/lls.h"
#include "../jrd/pag.h"
#include "../jrd/isc.h"

//#define CCH_DEBUG

Expand Down
1 change: 0 additions & 1 deletion src/jrd/dbt.cpp
Expand Up @@ -28,7 +28,6 @@
// here when the file was removed.
// Most probably only a few of the includes are needed
#include "../jrd/common.h"
#include "../jrd/isc.h"
#include "../jrd/jrd.h"
#include "../jrd/lck.h"
#include "../jrd/ods.h"
Expand Down

0 comments on commit 6cbefa1

Please sign in to comment.