Identifier Exponential Lag (spacedriveapp#269)
A small refactor of the identifier job to remove a memory leak and improve performance.
jamiepine committed Jun 21, 2022
1 parent 39bf670 commit 7cffba2
Showing 1 changed file with 119 additions and 82 deletions.
201 changes: 119 additions & 82 deletions core/src/file/cas/identifier.rs
@@ -1,16 +1,16 @@
use std::collections::HashMap;
-use std::{fs, io};
use std::path::Path;
+use std::{fs, io};

use crate::job::JobReportUpdate;
-use crate::prisma::file;
use crate::sys::get_location;
use crate::{
file::FileError,
job::{Job, WorkerContext},
-prisma::file_path,
+prisma::{file, file_path},
CoreContext,
};
+use chrono::{DateTime, FixedOffset, Utc};
use futures::executor::block_on;
use prisma_client_rust::prisma_models::PrismaValue;
use prisma_client_rust::raw::Raw;
@@ -30,6 +30,9 @@ pub struct FileIdentifierJob {
pub path: String,
}

+// we break this job into chunks of 100 to improve performance
+static CHUNK_SIZE: usize = 100;

#[async_trait::async_trait]
impl Job for FileIdentifierJob {
fn name(&self) -> &'static str {
@@ -41,123 +44,143 @@ impl Job for FileIdentifierJob {
let location_path = location.path.unwrap_or("".to_string());

let total_count = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;

println!("Found {} orphan file paths", total_count);

-let task_count = (total_count as f64 / 100f64).ceil() as usize;
+let task_count = (total_count as f64 / CHUNK_SIZE as f64).ceil() as usize;
println!("Will process {} tasks", task_count);

// update job with total task count based on orphan file_paths count
ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]);

let db = ctx.core_ctx.database.clone();

-let ctx = tokio::task::spawn_blocking(move || {
+let _ctx = tokio::task::spawn_blocking(move || {
let mut completed: usize = 0;
let mut cursor: i32 = 1;
-// map cas_id to file_path ids
-let mut cas_id_lookup: HashMap<i32, String> = HashMap::new();

while completed < task_count {
-let file_paths = block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)).unwrap();
+// link file_path ids to a CreateFile struct containing unique file data
+let mut chunk: HashMap<i32, CreateFile> = HashMap::new();
+let mut cas_lookup: HashMap<String, i32> = HashMap::new();
+
+// get chunk of orphans to process
+let file_paths = match block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)) {
+Ok(file_paths) => file_paths,
+Err(e) => {
+println!("Error getting orphan file paths: {}", e);
+continue;
+}
+};
println!(
"Processing {:?} orphan files. ({} completed of {})",
file_paths.len(),
completed,
task_count
);

-// raw values to be inserted into the database
-let mut values: Vec<PrismaValue> = Vec::new();
-
-// only rows that have a valid cas_id to be inserted
+// analyze each file_path
for file_path in file_paths.iter() {
-match prepare_file_values(&location_path, file_path) {
-Ok((cas_id, data)) => {
-cas_id_lookup.insert(file_path.id, cas_id);
-values.extend(data);
+// get the cas_id and extract metadata
+match prepare_file(&location_path, file_path) {
+Ok(file) => {
+let cas_id = file.cas_id.clone();
+// create entry into chunks for created file data
+chunk.insert(file_path.id, file);
+cas_lookup.insert(cas_id, file_path.id);
}
Err(e) => {
println!("Error processing file: {}", e);
continue;
}
};
}
-if values.len() == 0 {
-println!("No orphan files to process, finishing...");
-break;

+// find all existing files by cas id
+let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect();
+let existing_files: Vec<file::Data> = block_on(
+db.file()
+.find_many(vec![file::cas_id::in_vec(generated_cas_ids)])
+.exec(),
+)
+.unwrap();
+println!("Found {} existing files", existing_files.len());
+
+// link existing files to their file_paths
+for file in existing_files.iter() {
+let file_path_id = cas_lookup.get(&file.cas_id).unwrap();
+block_on(
+db.file_path()
+.find_unique(file_path::id::equals(file_path_id.clone()))
+.update(vec![file_path::file_id::set(Some(file.id.clone()))])
+.exec(),
+)
+.unwrap();
+}
+
+// extract files that don't already exist in the database
+let new_files: Vec<&CreateFile> = chunk
+.iter()
+.map(|(_, c)| c)
+.filter(|c| !existing_files.iter().any(|d| d.cas_id == c.cas_id))
+.collect();
+
+// assemble prisma values
+let mut values: Vec<PrismaValue> = Vec::new();
+for file in new_files.iter() {
+values.extend([
+PrismaValue::String(file.cas_id.clone()),
+PrismaValue::Int(file.size_in_bytes.clone()),
+PrismaValue::DateTime(file.date_created.clone()),
+]);
+}
+
+println!("Inserting {} unique file records ({} values)", new_files.len(), values.len());

-let files: Vec<FileCreated> = block_on(db._query_raw(Raw::new(
-&format!(
-"INSERT INTO files (cas_id, size_in_bytes) VALUES {} ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id",
-vec!["({}, {})"; file_paths.len()].join(",")
-),
-values
-))).unwrap_or_else(|e| {
+// create new files
+let created_files: Vec<FileCreated> = block_on(db._query_raw(Raw::new(
+&format!(
+"INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {}
+ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id",
+vec!["({}, {}, {})"; new_files.len()].join(",")
+),
+values,
+)))
+.unwrap_or_else(|e| {
println!("Error inserting files: {}", e);
Vec::new()
});

println!("Unique files: {:?}" , files);

// assign unique file to file path
println!("Assigning {} unique file ids to origin file_paths", files.len());
for (file_path_id, cas_id) in cas_id_lookup.iter() {
// get the cas id from the lookup table
let file = files.iter().find(|f| &f.cas_id == cas_id);
let file_id: i32;
if let Some(file) = file {
file_id = file.id;
} else {
let unique_file = match block_on(db.file().find_unique(file::cas_id::equals(cas_id.clone())).exec()) {
Ok(f) => match f {
Some(f) => f,
None => {
println!("Unique file does not exist, this shouldn't happen: {}", cas_id);
continue;
}
},
Err(e) => {
println!("Error finding unique file: {}", e);
continue;
}
};
file_id = unique_file.id;
}

block_on(
db.file_path()
.find_unique(file_path::id::equals(file_path_id.clone()))
.update(vec![
file_path::file_id::set(Some(file_id))
])
.exec()
).unwrap();
// associate newly created files with their respective file_paths
for file in created_files.iter() {
// TODO: This is bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk.
// Maybe an insert many could work? not sure.
let file_path_id = cas_lookup.get(&file.cas_id).unwrap();
block_on(
db.file_path()
.find_unique(file_path::id::equals(file_path_id.clone()))
.update(vec![file_path::file_id::set(Some(file.id.clone()))])
.exec(),
)
.unwrap();
}

// handle loop end
let last_row = file_paths.last().unwrap();

cursor = last_row.id;
completed += 1;

ctx.progress(vec![
-JobReportUpdate::CompletedTaskCount(completed),
-JobReportUpdate::Message(format!(
-"Processed {} of {} orphan files",
-completed,
-task_count
-)),
+JobReportUpdate::CompletedTaskCount(completed),
+JobReportUpdate::Message(format!(
+"Processed {} of {} orphan files",
+completed * CHUNK_SIZE,
+total_count
+)),
]);
}
ctx
})
.await?;

-let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;
+// let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;
Ok(())
}
}
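The TODO above asks whether the per-file_path linking loop can be batched. Below is a minimal sketch of one option, not part of this commit: collapse each chunk's ~100 UPDATEs into a single raw statement, reusing the raw-query style already used for the INSERT. The `file_paths` table name, the `{}` placeholder syntax, and `_execute_raw` are assumptions about this codebase rather than verified API.

```rust
// Hypothetical batched alternative to the per-row linking loop (a sketch,
// assuming a `file_paths` table and an `_execute_raw` escape hatch):
// build one `UPDATE ... SET file_id = CASE id WHEN ... THEN ... END`
// statement per chunk instead of one query per file_path.
let mut sql = String::from("UPDATE file_paths SET file_id = CASE id ");
let mut params: Vec<PrismaValue> = Vec::new();
let mut path_ids: Vec<String> = Vec::new();

for file in created_files.iter() {
    let file_path_id = cas_lookup.get(&file.cas_id).unwrap();
    sql.push_str("WHEN {} THEN {} ");
    params.push(PrismaValue::Int(*file_path_id as i64));
    params.push(PrismaValue::Int(file.id as i64));
    path_ids.push(file_path_id.to_string());
}
sql.push_str(&format!("END WHERE id IN ({})", path_ids.join(",")));

// one round trip links the whole chunk; skip entirely when nothing was created
if !params.is_empty() {
    block_on(db._execute_raw(Raw::new(&sql, params))).unwrap_or_else(|e| {
        println!("Error batch-linking file_paths: {}", e);
        0
    });
}
```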
@@ -195,30 +218,44 @@ pub async fn get_orphan_file_paths(
])
.order_by(file_path::id::order(Direction::Asc))
.cursor(file_path::id::cursor(cursor))
-.take(100)
+.take(CHUNK_SIZE as i64)
.exec()
.await?;
Ok(files)
}
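For readers skimming the query above: orphan fetching uses keyset (cursor) pagination, ordering by id, taking one chunk, and resuming from the last id seen, rather than OFFSET, which degrades as it scans past skipped rows. Here is a self-contained illustration of that shape, with hypothetical `Row` and `fetch_chunk` names that are not part of this commit (this sketch resumes strictly after the cursor; the exact inclusivity of `.cursor(...)` is the ORM's choice).

```rust
// Keyset-pagination sketch (hypothetical names, in-memory stand-in for the DB).
#[derive(Debug)]
struct Row {
    id: i32,
}

// "SELECT ... WHERE id > after ORDER BY id LIMIT take" in miniature.
fn fetch_chunk(rows: &[Row], after: i32, take: usize) -> Vec<&Row> {
    rows.iter().filter(|r| r.id > after).take(take).collect()
}

fn main() {
    let rows: Vec<Row> = (1..=7).map(|id| Row { id }).collect();
    let mut cursor = 0;
    loop {
        let chunk = fetch_chunk(&rows, cursor, 3);
        if chunk.is_empty() {
            break;
        }
        cursor = chunk.last().unwrap().id; // resume after the last row seen
        println!("processed ids up to {}", cursor);
    }
}
```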

-pub fn prepare_file_values(
+#[derive(Deserialize, Serialize, Debug)]
+pub struct CreateFile {
+pub cas_id: String,
+pub size_in_bytes: i64,
+pub date_created: DateTime<FixedOffset>,
+}
+
+pub fn prepare_file(
location_path: &str,
file_path: &file_path::Data,
-) -> Result<(String, [PrismaValue; 2]), io::Error> {
+) -> Result<CreateFile, io::Error> {
let path = Path::new(&location_path).join(Path::new(file_path.materialized_path.as_str()));
// println!("Processing file: {:?}", path);

let metadata = fs::metadata(&path)?;

+let date_created: DateTime<Utc> = metadata.created().unwrap().into();
+
+let size = metadata.len();
+
let cas_id = {
if !file_path.is_dir {
-let mut ret = generate_cas_id(path.clone(), metadata.len()).unwrap();
+let mut ret = generate_cas_id(path.clone(), size.clone()).unwrap();
ret.truncate(16);
ret
} else {
"".to_string()
}
};

println!("cas id for path {:?} is {:?}", path, cas_id);

-Ok((cas_id.clone(), [PrismaValue::String(cas_id), PrismaValue::Int(0)]))
+Ok(CreateFile {
+cas_id,
+size_in_bytes: size as i64,
+date_created: date_created.into(),
+})
}
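Two small details of the chunk accounting in this commit, restated as a standalone sketch with hypothetical helper names: the task count's f64 round-trip can be done in integer arithmetic, and the progress message's `completed * CHUNK_SIZE` overshoots `total_count` on a short final chunk unless capped.

```rust
// Chunk-accounting sketch (hypothetical helpers, not part of this commit).

// Ceiling division without the f64 round-trip used in the job.
fn task_count(total_count: usize, chunk_size: usize) -> usize {
    (total_count + chunk_size - 1) / chunk_size
}

// Cap the reported progress so the last, possibly short chunk
// cannot claim more files than actually exist.
fn processed_so_far(completed: usize, chunk_size: usize, total_count: usize) -> usize {
    (completed * chunk_size).min(total_count)
}

fn main() {
    assert_eq!(task_count(250, 100), 3);
    assert_eq!(processed_so_far(3, 100, 250), 250); // not 300
    println!("ok");
}
```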
