Skip to content

Commit

Permalink
mv server's DocManager to flowy_collaboration crate
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy committed Dec 12, 2021
1 parent d3bfca1 commit 90e3ba1
Show file tree
Hide file tree
Showing 25 changed files with 578 additions and 473 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Expand Up @@ -13,5 +13,5 @@ Cargo.lock
**/target/
**/*.db
.idea/
**/flowy-test/**
**/temp/**
.ruby-version
4 changes: 4 additions & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 9 additions & 93 deletions backend/src/services/doc/editor.rs
Expand Up @@ -3,110 +3,26 @@ use crate::{
web_socket::{entities::Socket, WsMessageAdaptor, WsUser},
};
use actix_web::web::Data;
use backend_service::errors::{internal_error, ServerError};
use dashmap::DashMap;
use backend_service::errors::internal_error;

use flowy_collaboration::{
core::{
document::Document,
sync::{RevisionSynchronizer, RevisionUser, SyncResponse},
},
protobuf::{Doc, UpdateDocParams},
};
use lib_ot::{protobuf::Revision, rich_text::RichTextDelta};
use sqlx::PgPool;
use std::{
convert::TryInto,
sync::{
atomic::{AtomicI64, Ordering::SeqCst},
Arc,
},
core::sync::{RevisionUser, SyncResponse},
protobuf::UpdateDocParams,
};

#[rustfmt::skip]
// ┌──────────────────────┐ ┌────────────┐
// ┌───▶│ RevisionSynchronizer │────▶│ Document │
// │ └──────────────────────┘ └────────────┘
// ┌────────────────┐ │
// │ServerDocEditor │────┤ ┌───────────┐
// └────────────────┘ │ ┌───▶│ WsUser │
// │ │ └───────────┘
// │ ┌────────┐ ┌───────────┐ │ ┌───────────┐
// └───▶│ Users │◆──────│ DocUser ├───┼───▶│ Socket │
// └────────┘ └───────────┘ │ └───────────┘
// │ ┌───────────┐
// └───▶│ PgPool │
// └───────────┘
pub struct ServerDocEditor {
pub doc_id: String,
pub rev_id: AtomicI64,
synchronizer: Arc<RevisionSynchronizer>,
users: DashMap<String, DocUser>,
}

impl ServerDocEditor {
pub fn new(doc: Doc) -> Result<Self, ServerError> {
let delta = RichTextDelta::from_bytes(&doc.data).map_err(internal_error)?;
let users = DashMap::new();
let synchronizer = Arc::new(RevisionSynchronizer::new(
&doc.id,
doc.rev_id,
Document::from_delta(delta),
));

Ok(Self {
doc_id: doc.id.clone(),
rev_id: AtomicI64::new(doc.rev_id),
synchronizer,
users,
})
}

#[tracing::instrument(
level = "debug",
skip(self, user),
fields(
user_id = %user.id(),
rev_id = %rev_id,
)
)]
pub async fn new_doc_user(&self, user: DocUser, rev_id: i64) -> Result<(), ServerError> {
self.users.insert(user.id(), user.clone());
self.synchronizer.new_conn(user, rev_id);
Ok(())
}

#[tracing::instrument(
level = "debug",
skip(self, user, revision),
fields(
cur_rev_id = %self.rev_id.load(SeqCst),
base_rev_id = %revision.base_rev_id,
rev_id = %revision.rev_id,
),
err
)]
pub async fn apply_revision(&self, user: DocUser, mut revision: Revision) -> Result<(), ServerError> {
self.users.insert(user.id(), user.clone());
let revision = (&mut revision).try_into().map_err(internal_error)?;
self.synchronizer.apply_revision(user, revision).unwrap();
Ok(())
}

pub fn document_json(&self) -> String { self.synchronizer.doc_json() }
}
use sqlx::PgPool;
use std::sync::Arc;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct DocUser {
pub user: Arc<WsUser>,
pub(crate) socket: Socket,
pub pg_pool: Data<PgPool>,
}

impl DocUser {
pub fn id(&self) -> String { self.user.id().to_string() }
}

impl RevisionUser for DocUser {
fn user_id(&self) -> String { self.user.id().to_string() }

fn recv(&self, resp: SyncResponse) {
let result = match resp {
SyncResponse::Pull(data) => {
Expand Down
224 changes: 5 additions & 219 deletions backend/src/services/doc/manager.rs
@@ -1,29 +1,13 @@
use crate::{
services::doc::{
editor::{DocUser, ServerDocEditor},
read_doc,
ws_actor::{DocWsActor, DocWsMsg},
},
web_socket::{entities::Socket, WsBizHandler, WsClientData, WsUser},
services::doc::ws_actor::{DocWsActor, DocWsMsg},
web_socket::{WsBizHandler, WsClientData},
};
use actix_web::web::Data;
use async_stream::stream;
use backend_service::errors::{internal_error, Result as DocResult, ServerError};
use dashmap::DashMap;
use flowy_collaboration::protobuf::{Doc, DocIdentifier};
use futures::stream::StreamExt;
use lib_ot::protobuf::Revision;
use flowy_collaboration::core::sync::DocManager;
use sqlx::PgPool;
use std::sync::{atomic::Ordering::SeqCst, Arc};
use tokio::{
sync::{mpsc, oneshot},
task::spawn_blocking,
};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};

#[rustfmt::skip]
// ┌──────────────┐ ┌────────────┐ 1 n ┌───────────────┐
// │ DocumentCore │────▶│ DocManager │─────▶│ OpenDocHandle │
// └──────────────┘ └────────────┘ └───────────────┘
pub struct DocumentCore {
pub manager: Arc<DocManager>,
ws_sender: mpsc::Sender<DocWsMsg>,
Expand Down Expand Up @@ -67,201 +51,3 @@ impl WsBizHandler for DocumentCore {
});
}
}

#[rustfmt::skip]
// ┌────────────┐ 1 n ┌───────────────┐ ┌──────────────────┐ ┌────────────────┐
// │ DocManager │───────▶│ OpenDocHandle │────▶│ DocMessageQueue │───▶│ServerDocEditor │
// └────────────┘ └───────────────┘ └──────────────────┘ └────────────────┘
pub struct DocManager {
open_doc_map: DashMap<String, Arc<OpenDocHandle>>,
}

impl std::default::Default for DocManager {
fn default() -> Self {
Self {
open_doc_map: DashMap::new(),
}
}
}

impl DocManager {
pub fn new() -> Self { DocManager::default() }

pub async fn get(&self, doc_id: &str, pg_pool: Data<PgPool>) -> Result<Option<Arc<OpenDocHandle>>, ServerError> {
match self.open_doc_map.get(doc_id) {
None => {
let params = DocIdentifier {
doc_id: doc_id.to_string(),
..Default::default()
};
let doc = read_doc(pg_pool.get_ref(), params).await?;
let handle = spawn_blocking(|| OpenDocHandle::new(doc, pg_pool))
.await
.map_err(internal_error)?;
let handle = Arc::new(handle?);
self.open_doc_map.insert(doc_id.to_string(), handle.clone());
Ok(Some(handle))
},
Some(ctx) => Ok(Some(ctx.clone())),
}
}
}

pub struct OpenDocHandle {
pub sender: mpsc::Sender<DocMessage>,
}

impl OpenDocHandle {
pub fn new(doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
let (sender, receiver) = mpsc::channel(100);
let queue = DocMessageQueue::new(receiver, doc, pg_pool)?;
tokio::task::spawn(queue.run());
Ok(Self { sender })
}

pub async fn add_user(&self, user: Arc<WsUser>, rev_id: i64, socket: Socket) -> Result<(), ServerError> {
let (ret, rx) = oneshot::channel();
let msg = DocMessage::NewConnectedUser {
user,
socket,
rev_id,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
}

pub async fn apply_revision(
&self,
user: Arc<WsUser>,
socket: Socket,
revision: Revision,
) -> Result<(), ServerError> {
let (ret, rx) = oneshot::channel();
let msg = DocMessage::ReceiveRevision {
user,
socket,
revision,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
}

pub async fn document_json(&self) -> DocResult<String> {
let (ret, rx) = oneshot::channel();
let msg = DocMessage::GetDocJson { ret };
self.send(msg, rx).await?
}

pub async fn rev_id(&self) -> DocResult<i64> {
let (ret, rx) = oneshot::channel();
let msg = DocMessage::GetDocRevId { ret };
self.send(msg, rx).await?
}

pub(crate) async fn send<T>(&self, msg: DocMessage, rx: oneshot::Receiver<T>) -> DocResult<T> {
let _ = self.sender.send(msg).await.map_err(internal_error)?;
let result = rx.await?;
Ok(result)
}
}

#[derive(Debug)]
pub enum DocMessage {
NewConnectedUser {
user: Arc<WsUser>,
socket: Socket,
rev_id: i64,
ret: oneshot::Sender<DocResult<()>>,
},
ReceiveRevision {
user: Arc<WsUser>,
socket: Socket,
revision: Revision,
ret: oneshot::Sender<DocResult<()>>,
},
GetDocJson {
ret: oneshot::Sender<DocResult<String>>,
},
GetDocRevId {
ret: oneshot::Sender<DocResult<i64>>,
},
}

struct DocMessageQueue {
receiver: Option<mpsc::Receiver<DocMessage>>,
edit_doc: Arc<ServerDocEditor>,
pg_pool: Data<PgPool>,
}

impl DocMessageQueue {
fn new(receiver: mpsc::Receiver<DocMessage>, doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
let edit_doc = Arc::new(ServerDocEditor::new(doc)?);
Ok(Self {
receiver: Some(receiver),
edit_doc,
pg_pool,
})
}

async fn run(mut self) {
let mut receiver = self
.receiver
.take()
.expect("DocActor's receiver should only take one time");

let stream = stream! {
loop {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream.for_each(|msg| self.handle_message(msg)).await;
}

async fn handle_message(&self, msg: DocMessage) {
match msg {
DocMessage::NewConnectedUser {
user,
socket,
rev_id,
ret,
} => {
log::debug!("Receive new doc user: {:?}, rev_id: {}", user, rev_id);
let user = DocUser {
user: user.clone(),
socket: socket.clone(),
pg_pool: self.pg_pool.clone(),
};
let _ = ret.send(self.edit_doc.new_doc_user(user, rev_id).await);
},
DocMessage::ReceiveRevision {
user,
socket,
revision,
ret,
} => {
let user = DocUser {
user: user.clone(),
socket: socket.clone(),
pg_pool: self.pg_pool.clone(),
};
let _ = ret.send(self.edit_doc.apply_revision(user, revision).await);
},
DocMessage::GetDocJson { ret } => {
let edit_context = self.edit_doc.clone();
let json = spawn_blocking(move || edit_context.document_json())
.await
.map_err(internal_error);
let _ = ret.send(json);
},
DocMessage::GetDocRevId { ret } => {
let rev_id = self.edit_doc.rev_id.load(SeqCst);
let _ = ret.send(Ok(rev_id));
},
}
}
}

0 comments on commit 90e3ba1

Please sign in to comment.