aw-datastore: Clippy fix and refactoring of worker
johan-bjareholt committed Mar 28, 2020
1 parent 34d2dec commit 6f5200d
Showing 3 changed files with 218 additions and 225 deletions.
86 changes: 41 additions & 45 deletions aw-datastore/src/datastore.rs
@@ -14,8 +14,6 @@ use aw_models::BucketMetadata;
use aw_models::Event;
use aw_models::KeyValue;

use aw_transform;

use rusqlite::params;
use rusqlite::types::ToSql;

@@ -187,24 +185,20 @@ impl DatastoreInstance {
let mut first_init = false;
let db_version = _get_db_version(&conn);

match migrate_enabled {
true => first_init = _create_tables(&conn, db_version),
false => {
if db_version <= 0 {
return Err(DatastoreError::Uninitialized(
"Tried to open an uninitialized datastore with migration disabled"
.to_string(),
));
} else if db_version != NEWEST_DB_VERSION {
return Err(DatastoreError::OldDbVersion(format!(
"\
Tried to open an database with an incompatible database version!
Database has version {} while the supported version is {}",
db_version, NEWEST_DB_VERSION
)));
}
}
};
if migrate_enabled {
first_init = _create_tables(&conn, db_version);
} else if db_version < 0 {
return Err(DatastoreError::Uninitialized(
"Tried to open an uninitialized datastore with migration disabled".to_string(),
));
} else if db_version != NEWEST_DB_VERSION {
return Err(DatastoreError::OldDbVersion(format!(
"\
Tried to open an database with an incompatible database version!
Database has version {} while the supported version is {}",
db_version, NEWEST_DB_VERSION
)));
}

let mut ds = DatastoreInstance {
buckets_cache: HashMap::new(),
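The hunk above folds a `match` on the boolean `migrate_enabled` into an `if`/`else if` chain, the shape clippy's `match_bool` lint suggests; note that the uninitialized check's bound also changes from `db_version <= 0` to `db_version < 0`. A minimal sketch of the lint, with hypothetical names:

// Flagged by clippy::match_bool: matching on a bool obscures a plain branch.
fn label_match(enabled: bool) -> &'static str {
    match enabled {
        true => "enabled",
        false => "disabled",
    }
}

// Suggested form: if/else reads the same and composes with further
// `else if` checks, as in the db_version validation above.
fn label_if(enabled: bool) -> &'static str {
    if enabled { "enabled" } else { "disabled" }
}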
@@ -239,8 +233,8 @@ impl DatastoreInstance {
let opt_start_ns: Option<i64> = row.get(6)?;
let opt_start = match opt_start_ns {
Some(starttime_ns) => {
let seconds: i64 = (starttime_ns / 1000000000) as i64;
let subnanos: u32 = (starttime_ns % 1000000000) as u32;
let seconds: i64 = (starttime_ns / 1_000_000_000) as i64;
let subnanos: u32 = (starttime_ns % 1_000_000_000) as u32;
Some(DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(seconds, subnanos),
Utc,
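Both timestamp hunks swap `1000000000` for `1_000_000_000`, satisfying clippy's `unreadable_literal` lint; the compiler ignores the underscores, so the value is unchanged. A self-contained sketch of the same nanoseconds-to-`DateTime<Utc>` split, using the chrono calls this file already relies on (`datetime_from_ns` is a hypothetical helper, not part of the codebase):

use chrono::{DateTime, NaiveDateTime, Utc};

fn datetime_from_ns(timestamp_ns: i64) -> DateTime<Utc> {
    // Split whole seconds from the sub-second remainder.
    let seconds = timestamp_ns / 1_000_000_000;
    let subnanos = (timestamp_ns % 1_000_000_000) as u32;
    DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(seconds, subnanos), Utc)
}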
@@ -252,8 +246,8 @@ impl DatastoreInstance {
let opt_end_ns: Option<i64> = row.get(7)?;
let opt_end = match opt_end_ns {
Some(endtime_ns) => {
let seconds: i64 = (endtime_ns / 1000000000) as i64;
let subnanos: u32 = (endtime_ns % 1000000000) as u32;
let seconds: i64 = (endtime_ns / 1_000_000_000) as i64;
let subnanos: u32 = (endtime_ns % 1_000_000_000) as u32;
Some(DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(seconds, subnanos),
Utc,
@@ -317,20 +311,20 @@ impl DatastoreInstance {
pub fn ensure_legacy_import(&mut self, conn: &Connection) -> Result<bool, ()> {
use super::legacy_import::legacy_import;
if !self.first_init {
return Ok(false);
Ok(false)
} else {
self.first_init = false;
match legacy_import(self, &conn) {
Ok(_) => {
info!("Successfully imported legacy database");
self.get_stored_buckets(&conn).unwrap();
return Ok(true);
Ok(true)
}
Err(err) => {
warn!("Failed to import legacy database: {:?}", err);
return Err(());
Err(())
}
};
}
}
}
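Most of the remaining churn in this file is clippy's `needless_return` lint: the last expression in a function (or in a `match` arm that ends the function) is already its value, so a trailing `return` plus semicolon is redundant. A minimal illustration with hypothetical functions:

// Flagged by clippy::needless_return.
fn double_explicit(x: i32) -> i32 {
    return x * 2;
}

// Idiomatic: the tail expression, without a semicolon, is the return value.
fn double(x: i32) -> i32 {
    x * 2
}

The same rewrite also drops the stray semicolon after the `match` in `ensure_legacy_import`, since the `match` itself is now the block's value.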

@@ -418,16 +412,16 @@ impl DatastoreInstance {
match conn.execute("DELETE FROM buckets WHERE id = ?1", &[&bucket.bid]) {
Ok(_) => {
self.buckets_cache.remove(bucket_id);
return Ok(());
Ok(())
}
Err(err) => match err {
rusqlite::Error::SqliteFailure { 0: sqlerr, 1: _ } => match sqlerr.code {
rusqlite::ErrorCode::ConstraintViolation => {
Err(DatastoreError::BucketAlreadyExists)
}
_ => return Err(DatastoreError::InternalError(err.to_string())),
_ => Err(DatastoreError::InternalError(err.to_string())),
},
_ => return Err(DatastoreError::InternalError(err.to_string())),
_ => Err(DatastoreError::InternalError(err.to_string())),
},
}
}
@@ -441,7 +435,7 @@ impl DatastoreInstance {
}

pub fn get_buckets(&self) -> HashMap<String, Bucket> {
return self.buckets_cache.clone();
self.buckets_cache.clone()
}

pub fn insert_events(
@@ -543,12 +537,12 @@ impl DatastoreInstance {
/* Potentially update start */
match bucket.metadata.start {
None => {
bucket.metadata.start = Some(event.timestamp.clone());
bucket.metadata.start = Some(event.timestamp);
update = true;
}
Some(current_start) => {
if current_start > event.timestamp {
bucket.metadata.start = Some(event.timestamp.clone());
bucket.metadata.start = Some(event.timestamp);
update = true;
}
}
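Dropping `.clone()` from `event.timestamp` is clippy's `clone_on_copy` lint at work: chrono's `DateTime<Utc>` implements `Copy`, so assigning it already duplicates the value and the explicit clone is noise. A sketch with a hypothetical `Copy` wrapper:

#[derive(Clone, Copy)]
struct Stamp(i64);

// Keep the earliest of two stamps, mirroring the metadata.start update above.
fn earliest(current: Option<Stamp>, event: Stamp) -> Option<Stamp> {
    match current {
        // No event.clone() needed: Copy types duplicate on assignment.
        None => Some(event),
        Some(start) if start.0 > event.0 => Some(event),
        keep => keep,
    }
}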
@@ -648,7 +642,7 @@ impl DatastoreInstance {
None => {
// There was no last event, insert and return
self.insert_events(conn, &bucket_id, vec![heartbeat.clone()])?;
return Ok(heartbeat.clone());
return Ok(heartbeat);
}
}
}
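`Ok(heartbeat.clone())` becomes `Ok(heartbeat)` because this is the value's last use: `insert_events` already received its own clone on the line above, so the original can be moved out instead of cloned a second time. The pattern, sketched with a hypothetical store:

fn insert_and_echo(store: &mut Vec<String>, item: String) -> String {
    store.push(item.clone()); // the store keeps its own copy
    item                      // last use: move the original out, no second clone
}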
@@ -738,20 +732,20 @@ impl DatastoreInstance {
}
let duration_ns = endtime_ns - starttime_ns;

let time_seconds: i64 = (starttime_ns / 1000000000) as i64;
let time_subnanos: u32 = (starttime_ns % 1000000000) as u32;
let time_seconds: i64 = (starttime_ns / 1_000_000_000) as i64;
let time_subnanos: u32 = (starttime_ns % 1_000_000_000) as u32;
let data: serde_json::map::Map<String, Value> =
serde_json::from_str(&data_str).unwrap();

return Ok(Event {
Ok(Event {
id: Some(id),
timestamp: DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(time_seconds, time_subnanos),
Utc,
),
duration: Duration::nanoseconds(duration_ns),
data: data,
});
data,
})
},
) {
Ok(rows) => rows,
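The `data: data,` to `data,` change is clippy's `redundant_field_names` lint: when a local variable has the same name as the struct field it initializes, field-init shorthand applies. A self-contained sketch (this `Event` is a stand-in, not the real `aw_models::Event`):

struct Event {
    id: Option<i64>,
    data: String,
}

fn build_event(id: Option<i64>, data: String) -> Event {
    // `id` and `data` alone expand to `id: id` and `data: data`.
    Event { id, data }
}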
@@ -768,6 +762,7 @@ impl DatastoreInstance {
Err(err) => warn!("Corrupt event in bucket {}: {}", bucket_id, err),
};
}

Ok(list)
}

Expand Down Expand Up @@ -825,7 +820,7 @@ impl DatastoreInstance {
}
};

return Ok(count);
Ok(count)
}

pub fn insert_key_value(
Expand All @@ -848,15 +843,16 @@ impl DatastoreInstance {
}
};
let timestamp = Utc::now().timestamp();
#[allow(clippy::expect_fun_call)]
stmt.execute(params![key, data, &timestamp])
.expect(&format!("Failed to insert key-value pair: {}", key));
return Ok(());
Ok(())
}

pub fn delete_key_value(&self, conn: &Connection, key: &str) -> Result<(), DatastoreError> {
conn.execute("DELETE FROM key_value WHERE key = ?1", &[key])
.expect("Error deleting value from database");
return Ok(());
Ok(())
}
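The new `#[allow(clippy::expect_fun_call)]` acknowledges a lint rather than fixing it: `expect(&format!(...))` builds the panic message eagerly, allocating a `String` even when the call succeeds. Clippy's suggested alternative defers that work to the error path; a sketch of both forms, assuming a fallible closure in place of the real `stmt.execute`:

fn insert_or_panic(key: &str, run: impl FnOnce() -> Result<usize, String>) -> usize {
    // Eager (what clippy::expect_fun_call flags):
    //     run().expect(&format!("Failed to insert key-value pair: {}", key))
    // Lazy: the message is only formatted if run() actually fails.
    run().unwrap_or_else(|err| panic!("Failed to insert key-value pair {}: {}", key, err))
}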

pub fn get_key_value(&self, conn: &Connection, key: &str) -> Result<KeyValue, DatastoreError> {
Expand All @@ -873,7 +869,7 @@ impl DatastoreInstance {
}
};

return match stmt.query_row(&[key], |row| {
match stmt.query_row(&[key], |row| {
Ok(KeyValue {
key: row.get(0)?,
value: row.get(1)?,
Expand All @@ -891,7 +887,7 @@ impl DatastoreInstance {
key
))),
},
};
}
}

pub fn get_keys_starting(
12 changes: 6 additions & 6 deletions aw-datastore/src/legacy_import.rs
@@ -85,7 +85,7 @@ mod import {

fn get_legacy_events(
conn: &Connection,
bucket_id: &i64,
bucket_id: i64,
) -> Result<Vec<Event>, LegacyDatastoreImportError> {
let mut stmt = match conn.prepare(
"
@@ -121,12 +121,12 @@ mod import {
),
};

return Ok(Event {
Ok(Event {
id: None,
timestamp: timestamp,
timestamp,
duration: Duration::nanoseconds(duration_ns),
data: data,
});
data,
})
}) {
Ok(rows) => rows,
Err(err) => {
@@ -165,7 +165,7 @@ mod import {
Ok(_) => (),
Err(err) => panic!("Failed to create bucket '{}': {:?}", bucket.id, err),
};
let events = get_legacy_events(&legacy_conn, &bucket.bid.unwrap())?;
let events = get_legacy_events(&legacy_conn, bucket.bid.unwrap())?;
let num_events = events.len(); // Save len before lending events to insert_events
println!("Importing {} events for {}", num_events, bucket.id);
match new_ds.insert_events(new_conn, &bucket.id, events) {
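In legacy_import.rs, `bucket_id: &i64` becomes `bucket_id: i64`, and the call site drops its borrow to match: this is clippy's `trivially_copy_pass_by_ref` lint, which notes that a reference to an 8-byte `Copy` value is no cheaper than the value itself. A minimal sketch:

// Flagged: &i64 adds indirection without saving anything.
fn sum_by_ref(a: &i64, b: &i64) -> i64 {
    *a + *b
}

// Preferred: take small Copy types by value.
fn sum(a: i64, b: i64) -> i64 {
    a + b
}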
