Skip to content

Commit

Permalink
Merge branch 'diffbot-testing' of github.com:gigablast/open-source-se…
Browse files Browse the repository at this point in the history
…arch-engine into diffbot-testing

Conflicts:
	Errno.cpp
	Errno.h
  • Loading branch information
gigablast committed Jan 11, 2016
2 parents 33a4048 + 422ffae commit 032f597
Show file tree
Hide file tree
Showing 91 changed files with 7,648 additions and 832 deletions.
118 changes: 115 additions & 3 deletions BigFile.cpp
Expand Up @@ -33,7 +33,7 @@ BigFile::~BigFile () {
//#define O_DIRECT 040000

BigFile::BigFile () {
m_permissions = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH ;
//m_permissions = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH ;
m_flags = O_RDWR ; // | O_DIRECT;
m_usePartFiles = true;
// NULLify all ptrs to files
Expand Down Expand Up @@ -289,7 +289,7 @@ bool BigFile::open ( int flags ,

m_flags = flags;
//m_pc = pc;
m_permissions = permissions;
//m_permissions = permissions;
m_isClosing = false;
// this is true except when parsing big warc files
m_usePartFiles = true;//usePartFiles;
Expand Down Expand Up @@ -363,7 +363,7 @@ int BigFile::getfd ( int32_t n , bool forReading ) { // , int64_t *vfd ) {
}
// open it if not opened
if ( ! f->calledOpen() ) {
if ( ! f->open ( m_flags , m_permissions ) ) {
if ( ! f->open ( m_flags , getFileCreationFlags() ) ) {
log("disk: Failed to open file part #%"INT32".",n);
return -1;
}
Expand Down Expand Up @@ -1481,6 +1481,15 @@ bool BigFile::chopHead ( int32_t part ,
return unlinkRename ( NULL, part, true, callback, state );
}

// State snapshot for unlink/rename operations performed in a thread.
// The filenames and fd are copied up front so the thread never has to
// dereference the File or RdbBase objects, which can be deleted
// mid-operation if the collection gets nuked while the thread runs.
class UnlinkRenameState {
public:
	// filename snapshots made with makeFilename_r() before spawning
	char m_oldFilename [ 1024 ];
	char m_newFilename [ 1024 ];
	// fd the thread must close via close1ByFd_r() so the rename/unlink
	// actually takes effect on the underlying file
	int m_fd;
	// the owning File. NOTE(review): may dangle if the collection is
	// deleted while the thread runs -- re-validate the collection via
	// getRdbBase(m_collnum) before touching it (see done*Wrapper).
	File *m_file;
	// collection this file belongs to, used to detect nuked collections
	collnum_t m_collnum;
	// set true once the thread has closed m_fd. this member was
	// referenced by the FIXBUG code paths (urs->m_closedIt) but was
	// missing from the class declaration, which broke compilation
	// whenever FIXBUG was defined.
	bool m_closedIt;
};

static void *renameWrapper_r ( void *state , ThreadEntry *t ) ;
static void *unlinkWrapper_r ( void *state , ThreadEntry *t ) ;
static void doneRenameWrapper ( void *state , ThreadEntry *t ) ;
Expand Down Expand Up @@ -1604,6 +1613,38 @@ bool BigFile::unlinkRename ( // non-NULL for renames, NULL for unlinks
// save callback for when all parts are unlinked or renamed
m_callback = callback;
m_state = state;

#ifdef FIXBUG
// now use a special state in case RdbBase gets nuked
// because the collection gets deleted in the middle of this
UnlinkRenameState stackUr;
char *st =(char *)mmalloc( sizeof(UnlinkRenameState),"ulrnst");
UnlinkRenameState *urs = (UnlinkRenameState *)st;
if ( ! ur ) {
log("disk: failed to alloc unlinkrename state. "
"skipping thread.");
ur = stackUr;
}
urs->m_fd = m_fd;
urs->m_collnum = collnum; // can we supply this now?
urs->m_file = this;
urs->m_closedIt = false;
makeFilename_r ( m_baseFilename.getBufStart() ,
NULL ,
i ,
urs->m_oldFilename ,
1024 );
// rename also takes the new name
if ( ! m_isUnlink )
makeFilename_r ( m_newBaseFilename.getBufStart() ,
m_newBaseFilenameDir.getBufStart(),
i ,
urs->m_newFilename ,
1024 );
if ( ur == stackUr )
goto skipThread;
#endif

// . we spawn the thread here now
// . returns true on successful spawning
// . we can't make a disk thread cuz Threads.cpp checks its
Expand Down Expand Up @@ -1668,6 +1709,30 @@ bool BigFile::unlinkRename ( // non-NULL for renames, NULL for unlinks
}

void *renameWrapper_r ( void *state , ThreadEntry *t ) {

#ifdef FIXBUG
UnlinkRenameState *urs = (UnlinkRenameState *)state;
if ( ::rename ( urs->m_oldFilename , urs->m_newFilename ) ) {
// reset errno and return true if file does not exist
if ( errno == ENOENT ) {
log("disk: file %s does not exist.",oldFilename);
errno = 0;
}
// otherwise, it's a more serious error i guess
else log("disk: rename %s to %s: %s",
oldFilename,newFilename,mstrerror(errno));
return NULL;
}
// we must close the file descriptor in the thread otherwise the
// file will not actually be renamed in this thread
//f->close1_r();
// we can't call f->close1_r() because f might have been deleted
// because the collection was deleted.
if ( close1ByFd_r( urs->m_fd) )
urs->m_closedIt = true;
return;
#endif

// extract our class
File *f = (File *)state;
// . by getting the inode in the cache space the call to f->close()
Expand Down Expand Up @@ -1721,6 +1786,16 @@ void *renameWrapper_r ( void *state , ThreadEntry *t ) {
}

void *unlinkWrapper_r ( void *state , ThreadEntry *t ) {
#ifdef FIXBUG
UnlinkRenameState *urs = (UnlinkRenameState *)state;
::unlink ( urs->m_oldFilename );
// we can't call f->close1_r() because f might have been deleted
// because the collection was deleted.
if ( close1ByFd_r( urs->m_fd) )
urs->m_closedIt = true;
return;
#endif

// get ourselves
File *f = (File *)state;
// . by getting the inode in the cache space the call to delete(f)
Expand All @@ -1742,6 +1817,25 @@ void *unlinkWrapper_r ( void *state , ThreadEntry *t ) {
}

void doneRenameWrapper ( void *state , ThreadEntry *t ) {

#ifdef FIXBUG
// if collection got nuked, then file will be invalid
// so when we nuke a collection we scan all threads for unlink/rename
// operations that reference files from the collection being nuked and
// set their m_collectionGotNuked flag to true
UnlinkRenameState *urs = (UnlinkRenameState *)state;
File *f = urs->m_file;
collnum_t cn = urs->m_collnum;
RdbBase *base = getRdbBase ( cn );
mfree ( urs , sizeof(UrlRenameState), "urnst" );
if ( ! base ) { // urs->m_collectionGotNuked ) {
log("bigfile: captured rename on nuked collection %i",(int)cn);
g_unlinkRenameThreads--;
return;
}

#endif

// extract our class
File *f = (File *)state;
// . finish the close
Expand Down Expand Up @@ -1795,6 +1889,24 @@ void doneRenameWrapper ( void *state , ThreadEntry *t ) {
}

void doneUnlinkWrapper ( void *state , ThreadEntry *t ) {

#ifdef FIXBUG
// if collection got nuked, then file will be invalid
// so when we nuke a collection we scan all threads for unlink/rename
// operations that reference files from the collection being nuked and
// set their m_collectionGotNuked flag to true
UnlinkRenameState *urs = (UnlinkRenameState *)state;
File *f = urs->m_file;
collnum_t cn = urs->m_collnum;
RdbBase *base = getRdbBase ( cn );
mfree ( urs , sizeof(UrlRenameState), "urnst" );
if ( ! base ) { // urs->m_collectionGotNuked ) {
log("bigfile: captured unlink on nuked collection %i",(int)cn);
g_unlinkRenameThreads--;
return;
}
#endif

// extract our class
File *f = (File *)state;
// finish the close
Expand Down
2 changes: 1 addition & 1 deletion BigFile.h
Expand Up @@ -353,7 +353,7 @@ class BigFile {
SafeBuf m_newBaseFilenameDir ;//[256];


int32_t m_permissions;
//int32_t m_permissions;
int32_t m_flags;

// determined in open() override
Expand Down
68 changes: 57 additions & 11 deletions Collectiondb.cpp
Expand Up @@ -333,6 +333,9 @@ bool Collectiondb::addExistingColl ( char *coll, collnum_t collnum ) {
if ( cr->m_isCustomCrawl ) {
cr->m_getLinkInfo = false;
cr->m_computeSiteNumInlinks = false;
// limit each shard to 5 spiders per collection to prevent
// ppl from spidering the web and hogging up resources
cr->m_maxNumSpiders = 5;
}

// we need to compile the regular expressions or update the url
Expand Down Expand Up @@ -633,10 +636,11 @@ bool Collectiondb::addNewColl ( char *coll ,

// MDW: create the new directory
retry22:
if ( ::mkdir ( dname ,
S_IRUSR | S_IWUSR | S_IXUSR |
S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IXOTH ) ) {
if ( ::mkdir ( dname ,
getDirCreationFlags() ) ) {
// S_IRUSR | S_IWUSR | S_IXUSR |
// S_IRGRP | S_IWGRP | S_IXGRP |
// S_IROTH | S_IXOTH ) ) {
// valgrind?
if ( errno == EINTR ) goto retry22;
g_errno = errno;
Expand Down Expand Up @@ -1401,10 +1405,11 @@ bool Collectiondb::resetColl2( collnum_t oldCollnum,
log("admin: Trying to create collection %s but "
"directory %s already exists on disk.",cr->m_coll,dname);
}
if ( ::mkdir ( dname ,
S_IRUSR | S_IWUSR | S_IXUSR |
S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IXOTH ) ) {
if ( ::mkdir ( dname ,
getDirCreationFlags() ) ) {
// S_IRUSR | S_IWUSR | S_IXUSR |
// S_IRGRP | S_IWGRP | S_IXGRP |
// S_IROTH | S_IXOTH ) ) {
// valgrind?
//if ( errno == EINTR ) goto retry22;
//g_errno = errno;
Expand Down Expand Up @@ -1971,6 +1976,29 @@ bool CollectionRec::load ( char *coll , int32_t i ) {
// it is binary now
gbmemcpy ( &m_localCrawlInfo , sb.getBufStart(),sb.length() );

// if it had corrupted data from saving corrupted mem zero it out
CrawlInfo *stats = &m_localCrawlInfo;
// point to the stats for that host
int64_t *ss = (int64_t *)stats;
// are stats crazy?
bool crazy = false;
for ( int32_t j = 0 ; j < NUMCRAWLSTATS ; j++ ) {
// crazy stat?
if ( *ss > 1000000000LL ||
*ss < -1000000000LL ) {
crazy = true;
break;
}
ss++;
}
if ( m_localCrawlInfo.m_collnum != m_collnum )
crazy = true;
if ( crazy ) {
log("coll: had crazy spider stats for coll %s. zeroing out.",
m_coll);
m_localCrawlInfo.reset();
}


if ( ! g_conf.m_doingCommandLine && ! g_collectiondb.m_initializing )
log("coll: Loaded %s (%"INT32") local hasurlsready=%"INT32"",
Expand Down Expand Up @@ -3787,12 +3815,30 @@ bool CollectionRec::rebuildUrlFiltersDiffbot() {
i++;
}

// don't bother re-spidering old pages if hopcount == maxhopcount
// and only process new urls is true. because we don't need to
// harvest outlinks from them.
if ( m_diffbotOnlyProcessIfNewUrl && m_diffbotMaxHops > 0 &&
// only crawls, not bulk jobs
m_isCustomCrawl == 1 ) {
m_regExs[i].purge();
m_regExs[i].safePrintf("isindexed && hopcount==%"INT32,
m_diffbotMaxHops );
m_spiderPriorities [i] = 14;
m_spiderFreqs [i] = 0.0;
m_maxSpidersPerRule [i] = 0; // turn off spiders
m_harvestLinks [i] = false;
i++;
}

// diffbot needs to retry even on 500 or 404 errors since sometimes
// a seed url gets a 500 error mistakenly and it haults the crawl.
// so take out "!hastmperror".

m_regExs[i].set("errorcount>=1 && !hastmperror");
m_spiderPriorities [i] = 15;
m_spiderFreqs [i] = 0.0;
m_maxSpidersPerRule [i] = 0; // turn off spiders if not tmp error
m_spiderPriorities [i] = 14;
m_spiderFreqs [i] = 0.0416; // every hour
//m_maxSpidersPerRule [i] = 0; // turn off spiders if not tmp error
i++;

// and for docs that have errors respider once every 5 hours
Expand Down
1 change: 1 addition & 0 deletions Collectiondb.h
Expand Up @@ -494,6 +494,7 @@ class CollectionRec {
char m_useSimplifiedRedirects ;
char m_useIfModifiedSince ;
char m_useTimeAxis ;
char m_indexWarcs;
char m_buildVecFromCont ;
int32_t m_maxPercentSimilarPublishDate;
char m_useSimilarityPublishDate;
Expand Down
19 changes: 19 additions & 0 deletions Conf.cpp
Expand Up @@ -9,6 +9,25 @@

Conf g_conf;

// true once we have zeroed the process umask (fixed: stray ";;")
static bool s_setUmask = false;

// Returns the mode bits to pass to open()/creat() for regular data
// files: read/write for owner and group, read-only for others.
// The first call clears the process umask so these bits are applied
// verbatim by the kernel instead of being masked by whatever umask
// the process inherited from the shell.
mode_t getFileCreationFlags() {
	if ( ! s_setUmask ) {
		s_setUmask = true;
		umask ( 0 );
	}
	return S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH ;
}

// Returns the mode bits to pass to mkdir() for gb's data directories.
// Identical to the file-creation mask plus execute (search) permission
// for owner and group. The first caller zeroes the process umask so
// the requested bits are honored exactly.
mode_t getDirCreationFlags() {
	if ( ! s_setUmask ) { umask ( 0 ); s_setUmask = true; }
	const mode_t rw = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
	return rw | S_IXUSR | S_IXGRP;
}

Conf::Conf ( ) {
m_save = true;
m_doingCommandLine = false;
Expand Down
7 changes: 6 additions & 1 deletion Conf.h
Expand Up @@ -43,6 +43,9 @@

#define MAX_GEOCODERS 4

mode_t getFileCreationFlags();
mode_t getDirCreationFlags ();

class Conf {

public:
Expand Down Expand Up @@ -180,7 +183,9 @@ class Conf {
//bool m_tagdbUseSeals;
//int32_t m_tagdbMinFilesToMerge;
//bool m_tagdbSaveCache;


//bool m_makeAllFilesGroupWritable;

// catdb parameters
int32_t m_catdbMaxTreeMem;
//int32_t m_catdbMaxDiskPageCacheMem;
Expand Down
4 changes: 3 additions & 1 deletion Dns.cpp
Expand Up @@ -2470,7 +2470,8 @@ Host *Dns::getResponsibleHost ( key_t key ) {
// get the hostNum that should handle this
int32_t hostId = key.n1 % hostdb->getNumHosts();
// return it if it is alive
if ( ! hostdb->isDead ( hostId ) ) return hostdb->getHost ( hostId );
Host* h = hostdb->getHost ( hostId );
if ( h->m_spiderEnabled && ! hostdb->isDead ( hostId ) ) return h;
// how many are up?
int32_t numAlive = hostdb->getNumHostsAlive();
// NULL if none
Expand All @@ -2482,6 +2483,7 @@ Host *Dns::getResponsibleHost ( key_t key ) {
for ( int32_t i = 0 ; i < hostdb->m_numHosts ; i++ ) {
// get the ith host
Host *host = &hostdb->m_hosts[i];
if ( !host->m_spiderEnabled ) continue;
// skip him if he is dead
if ( hostdb->isDead ( host ) ) continue;
// count it if alive, continue if not our number
Expand Down
1 change: 1 addition & 0 deletions Errno.cpp
Expand Up @@ -196,6 +196,7 @@ case EDNSERROR : return "DNS lookup error";
case ETHREADSDISABLED:return "Threads Disabled";
case EMALFORMEDQUERY: return "Malformed query";
case ESHARDDOWN: return "One or more shards are down";
case EDOCWARC: return "Doc is WARC or ARC and support is disabled";
case EDIFFBOTREQUESTTIMEDOUTTHIRDPARTY: return "Diffbot request of third-party content timed out";
}
// if the remote error bit is clear it must be a regulare errno
Expand Down
2 changes: 2 additions & 0 deletions Errno.h
Expand Up @@ -201,6 +201,8 @@ enum {
ETHREADSDISABLED,
EMALFORMEDQUERY,
ESHARDDOWN,
EDOCWARC,
EWRONGSHARD,
EDIFFBOTREQUESTTIMEDOUTTHIRDPARTY
};
#endif

0 comments on commit 032f597

Please sign in to comment.