Skip to content

Commit

Permalink
Merge pull request #8 from adhamsalama/refactor-threadpool
Browse files Browse the repository at this point in the history
Refactor threadpool and some other minor enhancements and changes
  • Loading branch information
adhamsalama committed Feb 10, 2023
2 parents ee95397 + f01dc81 commit b3a2572
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 141 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "blitzkrieg"
description = "An HTTP Web Server."
version = "0.1.6"
version = "0.1.7"
edition = "2021"
license = "MIT"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
5 changes: 1 addition & 4 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,13 @@ pub struct Request {
pub method: HTTPMethod,
pub path: String,
pub headers: HashMap<String, String>,
// queries: HashMap<String, String>,
// cookies: HashMap<String, String>,
pub body: Option<BodyType>,
}

pub struct Response {
pub status_code: u16,
pub headers: Option<HashMap<String, String>>,
pub cookies: Option<HashMap<String, String>>,
pub body: Option<String>,
pub body: Option<Vec<u8>>,
}

pub fn parse_tcp_stream(stream: &mut TcpStream) -> (String, Vec<u8>) {
Expand Down
69 changes: 10 additions & 59 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,12 @@
//! # Blitzkrieg
//!
//! An HTTP Web Server written from scratch in Rust.
//!
//! This is written for educational purposes and is not meant to be used in production.

/// A module for parsing HTTP.
pub mod http;
/// A module for implementing a Server struct.
pub mod server;
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let (sender, receiver) = mpsc::channel();

let receiver = Arc::new(Mutex::new(receiver));

let mut workers = Vec::with_capacity(size);

for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}

ThreadPool { workers, sender }
}

pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}

pub struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}

impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});

Worker { id, thread }
}
}
/// A module for implementing a threadpool for the server.
pub mod threadpool;
42 changes: 0 additions & 42 deletions src/main.rs

This file was deleted.

44 changes: 10 additions & 34 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
use crate::{
http::parse_http_string, http::parse_tcp_stream, http::Request, http::Response,
threadpool::ThreadPool,
};
use std::{
io::prelude::*,
net::{TcpListener, TcpStream},
sync::Arc,
};

use crate::{
http::parse_http_string,
http::parse_tcp_stream,
http::BodyType,
http::Request,
http::{parse_formdata, Response},
ThreadPool,
};

pub struct Server {
pub threadpool: ThreadPool,
pub listener: TcpListener,
Expand All @@ -35,8 +30,8 @@ impl Server {
self.threadpool.execute(move || {
let (mut stream, request) = build_http_request(stream.unwrap());
let response = handler(request);
let response_string = build_http_response_string(response);
stream.write_all(response_string.as_bytes()).unwrap();
let response_bytes = build_http_response_string(response);
stream.write_all(&response_bytes).unwrap();
});
}
}
Expand All @@ -48,36 +43,17 @@ pub fn build_http_request(mut stream: TcpStream) -> (TcpStream, Request) {
return (stream, request);
}

pub fn build_http_response_string(response: Response) -> String {
pub fn build_http_response_string(response: Response) -> Vec<u8> {
let mut res = String::from("HTTP/1.1 ");
let status_code = response.status_code.to_string();
res = format!("{}{}", res, status_code);
res.push_str("\r\n");
for (key, value) in response.headers.unwrap_or_default() {
res = format!("{}{}: {}\r\n", res, key, value);
}
res.push_str("Server: Blitzkrieg\r\n");
res.push_str("\r\n");
res.push_str(&response.body.unwrap_or_default());
let mut res = res.as_bytes().to_owned();
res.append(&mut response.body.unwrap_or_default());
res
}
pub fn print_http_request(request: Request) {
println!("Request path {}", request.path);
println!("Request headers {:?}", request.headers);
println!(
"Request content type {:?}",
request.headers.get("Content-Type").unwrap()
);
match request.body.unwrap() {
BodyType::FormdataBody(body) => {
let formdatafields = body.fields;
let formdatafiles = body.files;
for field in formdatafields.unwrap_or_else(|| vec![]) {
println!("Field name {}, value {}", field.name, field.value);
}
// for field in formdatafiles.unwrap_or_else(|| vec![]) {
// println!("File name {}, content {}", field.name, field.content);
// }
}
BodyType::Text(text) => println!("Raw text {:?}", text), // BodyType::Text(text) => println!("Body is text {text}"),
}
}
58 changes: 58 additions & 0 deletions src/threadpool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
pub workers: Vec<Worker>,
pub sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let (sender, receiver) = mpsc::channel();

let receiver = Arc::new(Mutex::new(receiver));

let mut workers = Vec::with_capacity(size);

for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}

ThreadPool { workers, sender }
}

pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}

pub struct Worker {
pub id: usize,
pub thread: thread::JoinHandle<()>,
}

impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
job();
});

Worker { id, thread }
}
}

0 comments on commit b3a2572

Please sign in to comment.