Skip to content

Commit

Permalink
Clean up the retrieve and recent to provide serialized data; begin pe…
Browse files Browse the repository at this point in the history
…rsistence
  • Loading branch information
ahopkins committed Mar 5, 2019
1 parent 5ea001e commit 0e3cc24
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 49 deletions.
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ authors = ["Adam Hopkins <admhpkns@gmail.com>"]
edition = "2018"

[dependencies]
tokio = "0.1"
uuid = { version = "0.7", features = ["v5"] }
bincode = "1.1.2"
blob-uuid = "0.3.0"
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_derive = "1.0.27"
tokio = "0.1"
uuid = { version = "0.7", features = ["v5"] }
6 changes: 3 additions & 3 deletions src/lib/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use chrono::{DateTime, Utc};
use serde::Serialize;
use serde::{Serialize, Deserialize};

pub struct Channel {
pub index: Mutex<HashMap<String, usize>>,
Expand All @@ -12,9 +12,9 @@ pub struct Database {
pub channels: Arc<Mutex<HashMap<String, Channel>>>,
}

#[derive(Serialize, Debug, Clone)]
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct Message {
pub uuid: String,
pub uid: String,
pub created: DateTime<Utc>,
pub value: String,
// pub data: String,
Expand Down
59 changes: 34 additions & 25 deletions src/lib/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use crate::lib::state::Message;

pub enum Request {
Push { channel_id: String, value: String },
Retrieve { channel_id: String, uuid: String },
Retrieve { channel_id: String, uid: String },
// Update { channel_id: String, value: String },
Recent { channel_id: String, count: usize},
Recent { channel_id: String, count: usize, offset: usize },
Backup { channel_id: String },
}

pub enum Response {
Expand All @@ -17,18 +18,17 @@ pub enum Response {
Retrieve {
message: Message,
},
Foo {
message: String,
Done {
},
Error {
msg: String,
message: String,
},
}

impl Request {
pub fn parse(input: &str) -> Result<Request, String> {
println!("Incoming: {:?}", &input);
let mut parts = input.splitn(3, " ");
let mut parts = input.splitn(4, " ");
let channel_id = match parts.next() {
Some(channel_id) => channel_id,
None => return Err(format!("PUSH needs a channel_id")),
Expand All @@ -47,10 +47,14 @@ impl Request {
// })
// }
Some("PUSH") => {
let value = match parts.next() {
Some(value) => value,
let temp = match parts.next() {
Some(temp) => temp,
None => return Err(format!("PUSH needs a value")),
};
let value = match parts.next() {
Some(value) => format!("{} {}", temp, value),
None => format!("{}", temp),
};
Ok(Request::Push {
channel_id: channel_id.to_string(),
value: value.to_string(),
Expand All @@ -62,20 +66,30 @@ impl Request {
Some(count) => count,
_ => "5",
};
println!("count {:?}", count);
let offset = match parts.next() {
Some("") => "0",
Some(offset) => offset,
_ => "0",
};
Ok(Request::Recent {
channel_id: channel_id.to_string(),
count: count.parse::<usize>().unwrap()
count: count.parse::<usize>().unwrap(),
offset: offset.parse::<usize>().unwrap(),
})
}
Some("RETRIEVE") => {
let uuid = match parts.next() {
Some(uuid) => uuid,
None => return Err(format!("RETRIEVE needs a uuid")),
let uid = match parts.next() {
Some(uid) => uid,
None => return Err(format!("RETRIEVE needs a uid")),
};
Ok(Request::Retrieve {
channel_id: channel_id.to_string(),
uuid: uuid.to_string(),
uid: uid.to_string(),
})
}
Some("BACKUP") => {
Ok(Request::Backup {
channel_id: channel_id.to_string(),
})
}
Some(cmd) => Err(format!("unknown command: {}", cmd)),
Expand All @@ -87,11 +101,11 @@ impl Request {
impl Response {
pub fn serialize(&self) -> String {
match *self {
Response::Foo { ref message } => {
format!("foo {}", message)
},
// Response::Foo { ref message } => {
// format!("foo {}", message)
// },
Response::Push { ref message } => {
format!("OK {}", message.uuid)
format!("OK {}", message.uid)
},
Response::Recent { ref messages } => {
let serialized = serde_json::to_string(messages).unwrap();
Expand All @@ -101,13 +115,8 @@ impl Response {
let serialized = serde_json::to_string(message).unwrap();
format!("OK {}", serialized)
},
// Response::Value { ref key, ref value } => format!("{} = {}", key, value),
// Response::Set {
// ref key,
// ref value,
// ref previous,
// } => format!("set {} = `{}`, previous: {:?}", key, value, previous),
Response::Error { ref msg } => format!("error: {}", msg),
Response::Done { } => format!("OK Done"),
Response::Error { ref message } => format!("ER {}", message),
}
}
}
82 changes: 63 additions & 19 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,26 @@
extern crate tokio;
extern crate uuid;
extern crate chrono;
extern crate serde_derive;

mod lib;

use bincode::serialize_into;
// use bincode::SizeLimit;
use tokio::io::{lines, write_all};
use tokio::net::TcpListener;
use tokio::prelude::*;
use std::sync::{Mutex};
use std::io::BufReader;
use std::fs::{File, create_dir_all};
use std::io::{BufReader};
use std::env;
use std::net::SocketAddr;
use std::collections::HashMap;
use chrono::{Utc};
use lib::{state, types};
use uuid::Uuid;
use std::cmp;
use blob_uuid;

const MAXIMUM: usize = 10;

Expand All @@ -44,7 +49,7 @@ fn main() -> Result<(), Box<std::error::Error>> {
let responses = lines.map(move |line| {
let request = match types::Request::parse(&line) {
Ok(req) => req,
Err(e) => return types::Response::Error { msg: e },
Err(e) => return types::Response::Error { message: e },
};

match request {
Expand All @@ -64,20 +69,21 @@ fn main() -> Result<(), Box<std::error::Error>> {
let uuid = Uuid::new_v5(
&Uuid::NAMESPACE_DNS,
format!("{:?}-{:?}", channel_id, now.to_rfc3339()).as_bytes()
).to_string();
);
let uid = blob_uuid::to_blob(&uuid).to_string();
let message = state::Message {
uuid: uuid.clone(),
uid: uid.clone(),
created: now,
value,
};
let length = data.len();
data.push(message.clone());
index.insert(uuid.clone(), length);
index.insert(uid.clone(), length);
types::Response::Push {
message,
}
}
types::Request::Recent { channel_id, count } => {
types::Request::Recent { channel_id, count, offset } => {
let channels = db.channels.lock().unwrap();
let _channel = channels.get(&channel_id);
let channel = _channel.unwrap();
Expand All @@ -89,25 +95,63 @@ fn main() -> Result<(), Box<std::error::Error>> {
data.len() - cmp::min(count, MAXIMUM)
}
};
let end: usize = {
if offset > 0 {
if offset + count > data.len() {
return types::Response::Error {
message: "invalid offset".to_string(),
}
} else {
data.len() - offset
}
} else {
0
}
};

println!("index {:?}", index);
let messages = &data[index..];
let messages = match offset {
0 => &data[index..],
// _ => &data[index..],
_ => &data[(index - offset)..end],
};
types::Response::Recent {
messages: messages.to_vec(),
}
}
types::Request::Retrieve { channel_id, uuid } => {
// let channels = db.channels.lock().unwrap();
// let _channel = channels.get(&channel_id);
// let channel = _channel.unwrap();
// let data = channel.data.lock().unwrap();
// let index = channel.index.lock().unwrap();
// let message_index = &index.get(&uuid).unwrap();
// println!("index {:?}", message_index);
// let message = &data[message_index];
types::Response::Foo {
message: "bar".to_string(),
types::Request::Retrieve { channel_id, uid } => {
let channels = db.channels.lock().unwrap();
let _channel = channels.get(&channel_id);
let channel = _channel.unwrap();
let data = channel.data.lock().unwrap();
let index = channel.index.lock().unwrap();
let message = &index.get(&uid);
if let Some(_) = message {
let message_index = message.unwrap();
let message = &data[*message_index];
return types::Response::Retrieve {
message: message.clone(),
}
}
types::Response::Error {
message: "uid not found".to_string(),
}
}
types::Request::Backup { channel_id } => {
let channels = db.channels.lock().unwrap();
let _channel = channels.get(&channel_id);
let channel = _channel.unwrap();
let data = channel.data.lock().unwrap();
let message = &data[0];

let path = format!("/home/adam/Projects/merkava/.data/{}", channel_id);
match create_dir_all(path.clone()) {
Err(e) => return types::Response::Error { message: e.to_string() },
_ => ()
}
let file = format!("{}/{}.mrkv", path, message.uid);
let writer = File::create(file).unwrap();
serialize_into::<File, state::Message>(writer, &message).unwrap();
types::Response::Done {}
}
}
});
Expand Down

0 comments on commit 0e3cc24

Please sign in to comment.