Permalink
Browse files

implemented "datanest.organizations.batch_size" which will make

OrganizationsDatanestHarvester push data into back-end(s) in smaller
batches thus avoiding OutOfMemoryError
note: This forced out calls like 'clear()' or 'deleteByQuery("*:*")'
which served as substitute for missing update procedure. Thus, as of
now, records which "vanish" from the source will simply accumulate in
ODN. Updated records should get updated properly also in ODN. On the
other hand, it's sort of an archiving feature, insurance against data
vanishing both from the source and the ODN copy. But anyway, a proper
update mechanism is needed.
  • Loading branch information...
1 parent 4ad887d commit 7de3508b217726e628fe21445ca4eec7dde8f376 @hanecak hanecak committed Mar 14, 2012
@@ -56,6 +56,7 @@
public final static String KEY_DATANEST_ORGANIZATIONS_URL = "datanest.organizations.url";
public final static String KEY_DATANEST_ORGANIZATIONS_SEZAME_REPO_NAME = "datanest.organizations.sesame_repo_name";
+ public final static String KEY_DATANEST_ORGANIZATIONS_BATCH_SIZE = "datanest.organizations.batch_size";
private final static int ATTR_INDEX_ID = 0;
protected final static int ATTR_INDEX_NAME = 1;
@@ -134,6 +135,8 @@ public void update() throws OdnHarvesterException,
// read the rows
String[] row;
+ int batchSize = Integer.valueOf(datanestProperties.getProperty(KEY_DATANEST_ORGANIZATIONS_BATCH_SIZE));
+ int itemCount = 0;
int debugProcessOnlyNItems = Integer.valueOf(datanestProperties.getProperty(KEY_DEBUG_PROCESS_ONLY_N_ITEMS));
while ((row = csvReader.readNext()) != null) {
try {
@@ -144,9 +147,14 @@ public void update() throws OdnHarvesterException,
logger.warn("skipping following record: "
+ Arrays.deepToString(row));
}
-
- if (debugProcessOnlyNItems > 0 &&
- records.size() > debugProcessOnlyNItems)
+
+ if (records.size() >= batchSize) {
+ store(records);
+ records.clear();
+ }
+
+ if (debugProcessOnlyNItems > 0 &&
+ ++itemCount > debugProcessOnlyNItems)
break;
}
@@ -195,18 +195,18 @@ public void store(RdfData records)
connection = repo.getConnection();
- // As of now, the "update" consist of fresh "whole at once" copy of
- // the new data loaded into the repository. Thus, we need to remove
- // existing data from the repository before loading the new data so
- // as to prevent old, stale data to be left in the repository (like
- // items which were valid yesterday, but then deemed "bad" or
- // whatever and deleted).
- // Note: Yes, that is costly and we want to fix that later on.
// FIXME: Implement proper "update" procedure.
+ // As of now, we're not clearing old records, only replacing old
+ // copies with fresh copies (assuming "ID" was not changed). If we
+ // want a clean-up, we need to manually clean the back-end and rerun
+ // harvesting.
+ // Note of caution: 'store()' can be called for "a batch" (at least
+ // for 'OrganizationsDatanestHarvester' it is) which means that
+ // simple "DELETE all" here wont have a desired effect as it removed
+ // all the "new" items from previous batch and leave the back-end
+ // only with content from last batch.
if (contexts != null && contexts.length > 0) {
- connection.clear(convertedContexts);
-
- // why we duplicate the 'clear()' and 'add()' statements:
+ // why we duplicate the 'add()' statements:
// 'getStatements(null, null, null, true);' is not the same as
// 'getStatements(null, null, null, true, (Resource)null);' -
// see
@@ -215,17 +215,6 @@ public void store(RdfData records)
records.getRdfBaseURI(), RDFFormat.RDFXML,
convertedContexts);
} else {
- // CRUDE HACK, FIXME: If we use contexts for the "all"
- // repository to distinguish statements in terms of where they
- // came from so that we can do a proper clean-up before
- // "update", I'm then not able yet to make a proper query on top
- // of statements from different contexts. Thus for now I'm not
- // using contexts and for "all" repository I'm not doing the
- // automatic clean-up, which means that "Clean" needs to be done
- // on the repo manually!!!
- if (!repoName.equals("all"))
- connection.clear();
-
connection.add(new StringReader(records.getRdfData()),
records.getRdfBaseURI(), RDFFormat.RDFXML);
}
@@ -127,17 +127,13 @@ public void store(List<SolrItem> records)
OdnRepositoryException odnRepoException = null;
try {
- // As of now, the "update" consist of fresh "whole at once" copy of
- // the new data loaded into the repository. Thus, we need to remove
- // existing data from the repository before loading the new data so
- // as to prevent old, stale data to be left in the repository (like
- // items which were valid yesterday, but then deemed "bad" or
- // whatever and deleted).
- // Note: Yes, that is costly and we want to fix that later on.
// FIXME: Implement proper "update" procedure. For now disabled as
// we're pushing multiple data sets into one index meaning that if
// we left this here, insertion of 2nd data set will mean deletion
- // of 1st etc. Workaround: Clean the index manualy if necessary.
+ // of 1st etc. Plus, 'store()' might be called for multiple batches
+ // and we do not want to be left with only last batch in th
+ // back-end.
+ // Workaround: Clean the index manualy if necessary.
//solrServer.deleteByQuery("*:*"); // CAUTION: deletes everything!
solrServer.addBeans(records);
@@ -4,8 +4,11 @@ datanest.api.key = xxx
# code, I'm going to use local copy of the dump:
datanest.organizations.url = file:///tmp/organisations-dump.csv
datanest.organizations.sesame_repo_name=organizations
+datanest.organizations.batch_size = 100000
+
datanest.procurements.url = file:///tmp/procurements-dump.csv
datanest.procurements.sesame_repo_name=procurements
+
datanest.political_party_donors.url = file:///tmp/sponzori_stran-dump.csv
datanest.political_party_donors.sesame_repo_name=political_party_donors

0 comments on commit 7de3508

Please sign in to comment.