Skip to content

Commit

Permalink
Merge branch 'ia' into testing
Browse files: browse the repository at this point in the history
Conflicts:
	Parms.cpp
	Threads.cpp
  • Loading branch information
gigablast committed Oct 12, 2015
2 parents 08877b6 + d045138 commit c37ab26
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 22 deletions.
18 changes: 17 additions & 1 deletion PageInject.cpp
Expand Up @@ -702,7 +702,23 @@ void handleRequest7 ( UdpSlot *slot , int32_t netnice ) {
InjectionRequest *ir = (InjectionRequest *)slot->m_readBuf;

// now just supply the first guy's char ** and size ptr
deserializeMsg2 ( &ir->ptr_url, &ir->size_url );
if ( ! deserializeMsg2 ( &ir->ptr_url, &ir->size_url ) ) {
log("inject: error deserializing inject request from "
"host ip %s port %i",iptoa(slot->m_ip),(int)slot->m_port);
g_errno = EBADREQUEST;
g_udpServer.sendErrorReply(slot,g_errno);
//g_corruptCount++;
return;
}


if ( ! ir->ptr_url || strncmp(ir->ptr_url,"http",4) != 0 ) {
log("inject: trying to inject NULL or non http url.");
g_errno = EBADURL;
//g_corruptCount++;
g_udpServer.sendErrorReply(slot,g_errno);
return;
}

CollectionRec *cr = g_collectiondb.getRec ( ir->m_collnum );
if ( ! cr ) {
Expand Down
7 changes: 3 additions & 4 deletions XmlDoc.cpp
Expand Up @@ -3386,7 +3386,6 @@ void doneReadingArchiveFileWrapper ( void *state ) {
bool XmlDoc::indexWarcOrArc ( char ctype ) {

int8_t *hc = getHopCount();
char *warcDate = NULL;
if ( ! hc ) return true; // error?
if ( hc == (void *)-1 ) return false;

Expand Down Expand Up @@ -3593,7 +3592,7 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
char *warcLen = strstr(warcHeader,"Content-Length:");
char *warcUrl = strstr(warcHeader,"WARC-Target-URI:");
char *warcType = strstr(warcHeader,"WARC-Type:");
warcDate = strstr(warcHeader,"WARC-Date:");
char *warcDate = strstr(warcHeader,"WARC-Date:");
char *warcIp = strstr(warcHeader,"WARC-IP-Address:");
char *warcCon = strstr(warcHeader,"Content-Type:");

Expand Down Expand Up @@ -3915,14 +3914,14 @@ bool XmlDoc::indexWarcOrArc ( char ctype ) {
ir->size_metadata = newMetadata.length();

newMetadata.nullTerm();
log("injected capture date into metadata %s ", ir->ptr_metadata);
//log("injected capture date into metadata %s ", ir->ptr_metadata);

// set 'timestamp' for injection
//
ir->m_firstIndexed = recTime;
ir->m_lastSpidered = recTime;


log("build: warc record time was %s %"INT64, warcDate, recTime);
// set 'ip' for injection

ir->m_injectDocIp = 0;
Expand Down
5 changes: 3 additions & 2 deletions fctypes.cpp
Expand Up @@ -2515,7 +2515,7 @@ int32_t deserializeMsg ( int32_t baseSize ,
return baseSize + (p - stringBuf);//getStringBuf());
}

void deserializeMsg2 ( char **firstStrPtr , // ptr_url
bool deserializeMsg2 ( char **firstStrPtr , // ptr_url
int32_t *firstSizeParm ) { // size_url
int nptrs=((char *)firstSizeParm-(char *)firstStrPtr)/sizeof(char *);
// point to our string buffer
Expand All @@ -2531,7 +2531,7 @@ void deserializeMsg2 ( char **firstStrPtr , // ptr_url
// make it NULL if size is 0 though
if ( *sizePtr == 0 ) *strPtr = NULL;
// sanity check
if ( *sizePtr < 0 ) { char *xx = NULL; *xx =0; }
if ( *sizePtr < 0 ) return false;//{ char *xx = NULL; *xx =0; }
// advance our destination ptr
p += *sizePtr;
// advance both ptrs to next string
Expand All @@ -2540,6 +2540,7 @@ void deserializeMsg2 ( char **firstStrPtr , // ptr_url
}
// return how many bytes we processed
//return baseSize + (p - stringBuf);//getStringBuf());
return true;
}

// print it to stdout for debugging Dates.cpp
Expand Down
2 changes: 1 addition & 1 deletion fctypes.h
Expand Up @@ -627,6 +627,6 @@ int32_t deserializeMsg ( int32_t baseSize ,
char **firstStrPtr ,
char *stringBuf ) ;

void deserializeMsg2 ( char **firstStrPtr , int32_t *firstSizeParm );
bool deserializeMsg2 ( char **firstStrPtr , int32_t *firstSizeParm );

#endif
26 changes: 13 additions & 13 deletions main.cpp
Expand Up @@ -4943,7 +4943,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
// ensure directory is there, if
// not then make it
"ssh %s 'mkdir %s' ; "
"scp -r %s %s:%s"
"scp -p -r %s %s:%s"
, ipStr
, h2->m_dir

Expand Down Expand Up @@ -5029,7 +5029,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
if ( ! f.doesExist() ) target = "gb";

sprintf(tmp,
"scp " // blowfish is faster
"scp -p " // blowfish is faster
"%s%s "
"%s:%s/gb.installed%s",
dir,
Expand Down Expand Up @@ -5065,7 +5065,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
// don't copy to ourselves
//if ( h2->m_hostId == h->m_hostId ) continue;
sprintf(tmp,
"scp "
"scp -p "
"%sgb.new "
"%s:%s/tmpgb.installed &",
dir,
Expand All @@ -5078,7 +5078,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
// don't copy to ourselves
//if ( h2->m_hostId == h->m_hostId ) continue;
sprintf(tmp,
"scp %sgb.conf %shosts.conf %s:%s %s",
"scp -p %sgb.conf %shosts.conf %s:%s %s",
dir ,
dir ,
//h->m_hostId ,
Expand Down Expand Up @@ -5460,7 +5460,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
}
*/
sprintf(tmp,
"scp "
"scp -p "
"%scatdb/content.rdf.u8 "
"%s:%scatdb/content.rdf.u8",
dir,
Expand All @@ -5469,7 +5469,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"scp "
"scp -p "
"%scatdb/structure.rdf.u8 "
"%s:%scatdb/structure.rdf.u8",
dir,
Expand All @@ -5478,7 +5478,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"scp "
"scp -p "
"%scatdb/gbdmoz.structure.dat "
"%s:%scatdb/gbdmoz.structure.dat",
dir,
Expand All @@ -5487,7 +5487,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"scp "
"scp -p "
"%scatdb/gbdmoz.content.dat "
"%s:%scatdb/gbdmoz.content.dat",
dir,
Expand All @@ -5510,7 +5510,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
// don't copy to ourselves
if ( h2->m_hostId == 0 ) continue;
sprintf(tmp,
"scp "
"scp -p "
"%scatdb/content.rdf.u8.new "
"%s:%scatdb/content.rdf.u8.new",
dir,
Expand All @@ -5519,7 +5519,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"scp "
"scp -p "
"%scatdb/structure.rdf.u8.new "
"%s:%scatdb/structure.rdf.u8.new",
dir,
Expand All @@ -5528,7 +5528,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"scp "
"scp -p "
"%scatdb/gbdmoz.structure.dat.new "
"%s:%scatdb/gbdmoz.structure.dat.new",
dir,
Expand All @@ -5537,7 +5537,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"scp "
"scp -p "
"%scatdb/gbdmoz.content.dat.new "
"%s:%scatdb/gbdmoz.content.dat.new",
dir,
Expand All @@ -5546,7 +5546,7 @@ int install ( install_flag_konst_t installFlag , int32_t hostId , char *dir ,
log(LOG_INIT,"admin: %s", tmp);
system ( tmp );
sprintf(tmp,
"scp "
"scp -p "
"%scatdb/gbdmoz.content.dat.new.diff "
"%s:%scatdb/gbdmoz.content.dat.new.diff",
dir,
Expand Down
21 changes: 20 additions & 1 deletion script/inject/__main__.py
Expand Up @@ -324,13 +324,15 @@ def handler(signum, frame):
injectItem(itemName, db, 'production')
sys.exit(0)

if len(sys.argv) == 4:
if sys.argv[1] == 'forcefile':
global staleTime
staleTime = datetime.timedelta(0,0,0)
from multiprocessing.pool import ThreadPool
fileName = sys.argv[2]
items = filter(lambda x: x, open(fileName, 'r').read().split('\n'))
pool = ThreadPool(processes=len(items))
threads = int(sys.argv[3])
pool = ThreadPool(processes=threads)
#print zip(files, repeat(getDb(), len(files)), repeat('production', len(files)))
def injectItemTupleWrapper(itemName):
db = getDb()
Expand All @@ -341,6 +343,23 @@ def injectItemTupleWrapper(itemName):
answer = pool.map(injectItemTupleWrapper, items)
sys.exit(0)

if sys.argv[1] == 'injectitems':
from multiprocessing.pool import ThreadPool
fileName = sys.argv[2]
items = filter(lambda x: x, open(fileName, 'r').read().split('\n'))
threads = int(sys.argv[3])
pool = ThreadPool(processes=threads)
#print zip(files, repeat(getDb(), len(files)), repeat('production', len(files)))
def injectItemTupleWrapper(itemName):
db = getDb()
ret = injectItem(itemName, db, 'production')
db.close()
return ret

answer = pool.map(injectItemTupleWrapper, items)
sys.exit(0)



if sys.argv[1] == 'run':
threads = int(sys.argv[2])
Expand Down
Binary file modified script/warc-inject
Binary file not shown.

0 comments on commit c37ab26

Please sign in to comment.