Skip to content

Commit

Permalink
Implement loader
Browse files Browse the repository at this point in the history
  • Loading branch information
chr4 committed Jul 24, 2017
1 parent de45436 commit 4696736
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 100 deletions.
181 changes: 181 additions & 0 deletions src/loader.rs
@@ -0,0 +1,181 @@
use std::io::prelude::*;
use std::io;
use std::path::Path;
use std::fs::File;
use std::sync::{Arc, Mutex};

use futures::Future;
use futures::stream::Stream;

use hyper::{Uri, Client, Request, Method};
use hyper::header::{UserAgent, Cookie};
use hyper_tls::HttpsConnector;

use tokio_core::reactor::Core;


#[derive(Debug)]
pub enum CacheStatus {
Hit,
Miss,
Bypass,
Unset,
Unknown,
}

#[derive(Debug)]
pub struct CacheResource {
uri: Uri,
cache_status: CacheStatus,
captcha: bool,
}

#[derive(Debug)]
pub struct Loader {
uris: Mutex<Vec<CacheResource>>,
user_agent: UserAgent,
cookie: Cookie,
captcha_string: String,
}

// Make custom X-Cache-Status header known
header! { (XCacheStatus, "X-Cache-Status") => [String] }


impl Loader {
// TODO: I'd like to have self.bypass() and self.user_agent(ua: &str),
// but this is borrow checker hell
pub fn new(
uri_file: &str,
base_uri: &str,
ua_string: &str,
captcha_string: &str,
bypass: bool,
) -> Arc<Loader> {
let mut uris = Vec::new();
println!("Loading from {}", uri_file);
let lines = lines_from_file(uri_file).unwrap();

for l in lines {
// Skip erroneous lines
let line = match l {
Ok(s) => s,
Err(err) => {
println!("WARN: Error reading line from file: {}", err);
continue;
}
};

let uri: Uri = match format!("{}{}", base_uri, line).parse() {
Ok(s) => s,
Err(err) => {
println!("WARN: Error parsing URI: {}", err);
continue;
}
};

uris.push(CacheResource {
uri: uri,
cache_status: CacheStatus::Unknown,
captcha: false,
});
}

let user_agent = UserAgent::new(ua_string.to_string());

// Set cacheupdate=true cookie, to bypass (and update) existing cached sites
let mut cookie = Cookie::new();
if bypass {
cookie.append("cacheupdate", "true");
}

Arc::new(Loader {
uris: Mutex::new(uris),
user_agent: user_agent,
cookie: cookie,
captcha_string: captcha_string.to_string(),
})
}

pub fn length(&self) -> usize {
let uris = self.uris.lock().unwrap();
uris.len()
}

pub fn pop(&self) -> Option<CacheResource> {
let mut uris = self.uris.lock().unwrap();
uris.pop()
}

pub fn spawn(&self, verbose: bool) {
let mut core = Core::new().unwrap();
let handle = core.handle();

let client = Client::configure()
.keep_alive(true)
.connector(HttpsConnector::new(4, &handle).unwrap())
.build(&handle);

loop {
let mut cache_resource = match self.pop() {
Some(uri) => uri,
None => break, // Break when no URLs are left
};

let uri = cache_resource.uri.clone();
let mut req: Request = Request::new(Method::Get, uri);
req.headers_mut().set(self.user_agent.clone());
req.headers_mut().set(self.cookie.clone());

let work = client.request(req).and_then(|res| {
cache_resource.cache_status = match res.headers().get::<XCacheStatus>() {
Some(s) => lookup_cache_status(s),
None => CacheStatus::Unset,
};

if verbose {
println!(
"\n\t{:?}: {} {:?}",
cache_resource.uri,
res.status(),
cache_resource.cache_status,
);
}

res.body().concat2().and_then(move |body| {
// body is a &[8], so from_utf8_lossy() is required here
let html = String::from_utf8_lossy(body.as_ref());

if self.captcha_string.len() > 0 && html.contains(&self.captcha_string) {
cache_resource.captcha = true;
println!(
"Found '{}' in response body. Stopping thread.",
self.captcha_string
);
}

Ok(())
})
});

core.run(work).unwrap();
}
}
}

fn lookup_cache_status(status: &str) -> CacheStatus {
match status {
"MISS" => CacheStatus::Miss,
"HIT" => CacheStatus::Hit,
"BYPASS" => CacheStatus::Bypass,
_ => CacheStatus::Unknown,
}
}

fn lines_from_file<P>(filename: P) -> Result<io::Lines<io::BufReader<File>>, io::Error>
where
P: AsRef<Path>,
{
let file = File::open(filename)?;
Ok(io::BufReader::new(file).lines())
}
44 changes: 19 additions & 25 deletions src/main.rs
Expand Up @@ -10,8 +10,8 @@ extern crate tokio_core;
extern crate pbr;

mod cli;
mod loader;
mod file;
mod worker;

use std::thread;
use std::time::Duration;
Expand All @@ -21,52 +21,46 @@ use hyper::header::UserAgent;

fn main() {
let args = cli::get_args();
let user_agent = UserAgent::new(args.user_agent.to_string());

let (uris, len) = file::read_uris(&args.base_uri, &args.uri_file);
let mut loader = loader::Loader::new(
&args.uri_file,
&args.base_uri,
&args.user_agent,
&args.captcha_string,
args.bypass,
);

println!(
"Spawning {} threads to warm cache with {} URIs",
args.threads,
len
loader.length(),
);

let clone = uris.clone();
let status_loader = loader.clone();
let status = thread::spawn(move || {
let mut len = len;
let mut pb = ProgressBar::new(len);
let count = status_loader.length() as u64;
let mut pb = ProgressBar::new(count);

loop {
let new_len: u64 = {
let uris = clone.lock().unwrap();
uris.len() as u64
};

pb.add(len - new_len);
len = new_len;
let len = status_loader.length() as u64;
pb.add(count - len);
thread::sleep(Duration::from_secs(1));

// Break once all work is done
if len == 0 {
pb.finish();
break;
}

// TODO: Break when captcha was found?
}
});

// Create threads and safe handles
let mut workers: Vec<_> = vec![];
for _ in 0..args.threads {
// Clone values before move
let uris = uris.clone();
let user_agent = user_agent.clone();
let captcha_string = args.captcha_string.clone();
let mut loader = loader.clone();
let verbose = args.verbose;
let bypass = args.bypass;

workers.push(thread::spawn(move || {
worker::spawn(uris, user_agent, &captcha_string, verbose, bypass);
}));
workers.push(thread::spawn(move || { loader.spawn(verbose); }));
}

// Block until all work is done
Expand All @@ -75,5 +69,5 @@ fn main() {
}

status.join().unwrap();
println!("Done. Warmed up {} URLs.", len);
// TODO: Print status
}
75 changes: 0 additions & 75 deletions src/worker.rs

This file was deleted.

0 comments on commit 4696736

Please sign in to comment.