Skip to content
This repository has been archived by the owner on May 23, 2019. It is now read-only.

Commit

Permalink
bugfixes to migrate-data
Browse files Browse the repository at this point in the history
  • Loading branch information
wesyoung committed Mar 28, 2016
1 parent 03c2f6d commit 04cf466
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions v1migration/bin/migrate-data.pl
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ sub cleanup {
my $storage = CIF::StorageFactory->new_plugin({
plugin => 'elasticsearch',
nodes => [ $es_host . ':9200' ],

});

$Logger->info('checking journal: ' . $journal);
Expand Down Expand Up @@ -245,7 +246,7 @@ sub pager_routine {
} else {
$archive->load_page_info();
}

$sql .= ' ORDER BY id ASC LIMIT '.$archive->{'limit'}.' OFFSET ?';

my $total = $archive->{'total'};
Expand Down Expand Up @@ -341,13 +342,15 @@ sub _writer_routine {
my $sent = 0;
my $buckets = {};
do {
nanosleep NSECS_PER_MSEC;
#nanosleep NSECS_PER_MSEC;
if($writer->has_pollin){
$msg = $writer->recv();
if($msg ne '-1'){
$msg = JSON::XS::decode_json($msg);
if($msg->{'group'}){
my $b = DateTime::Format::DateParse->parse_datetime($msg->{'reporttime'})->ymd();
my $b = DateTime::Format::DateParse->parse_datetime($msg->{'reporttime'});
my $ts = $b->ymd();
$b = $b->strftime('%Y-%m');
$buckets->{$b} = [] unless(exists($buckets->{$b}));
push(@{$buckets->{$b}}, $msg);

Expand All @@ -357,7 +360,7 @@ sub _writer_routine {
foreach my $k (keys %$buckets){
$ret = $storage->_submission({
Observables => $buckets->{$k},
timestamp => DateTime::Format::DateParse->parse_datetime($k),
timestamp => DateTime::Format::DateParse->parse_datetime($ts),
user => {
groups => \@user_groups
},
Expand Down Expand Up @@ -443,6 +446,10 @@ sub _process_message {
if($data->{'address'}){
$data->{'address'} =~ s/hxxp\:\/\///g;
}

if($data->{'address'} =~ /^http/ && length($data->{'address'}) < 9){
return '-1';
}

if($data->{'rir'}){
$data->{'rir'} = lc($data->{'rir'});
Expand All @@ -451,6 +458,7 @@ sub _process_message {
$data = {
'observable' => $data->{'address'},
'asn' => $data->{'asn'},
'asn_desc' => $data->{'asn_desc'},
'firsttime' => $data->{'detecttime'},
'lasttime' => $data->{'detecttime'},
'reporttime' => $data->{'reporttime'},
Expand All @@ -466,8 +474,14 @@ sub _process_message {
'rir' => $data->{'rir'},
'cc' => $data->{'cc'},
};
$data->{'otype'} = observable_type($data->{'observable'});

try {
$data->{'otype'} = observable_type($data->{'observable'});
} catch {
$err = shift;
$Logger->debug($err);
$Logger->debug(Dumper($data));
return '-1';
};
$data = JSON::XS::encode_json($data);

return $data;
Expand Down

0 comments on commit 04cf466

Please sign in to comment.