diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..aaae1d6 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,1236 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "anstream" +version = "0.6.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9" +dependencies = [ + "windows-sys 0.59.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.59.0", +] + +[[package]] +name = "async-trait" +version = "0.1.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + +[[package]] +name = "backtrace" +version = "0.3.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "bitflags" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" + +[[package]] +name = "bumpalo" +version = "3.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" + +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + +[[package]] +name = "cc" +version = "1.2.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1599538de2394445747c8cf7935946e3cc27e9625f889d979bfb2aaf569362" +dependencies = [ + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" + +[[package]] +name = "chrono" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-link", +] + +[[package]] +name = "ciphermq" +version = "1.0.0" +dependencies = [ + "async-trait", + "chrono", + "clap", + "dashmap", + "futures-util", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-rustls", + "toml", + "tracing", + "tracing-subscriber", + "uuid", +] + +[[package]] +name = "clap" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40b6887a1d8685cebccf115538db5c0efe625ccac9696ad45c409d96566e910f" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0c66c08ce9f0c698cbce5c0279d0bb6ac936d8674174fe48f736533b964f59e" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c7947ae4cc3d851207c1adb5b5e260ff0cca11446b1d6d1423788e442257ce" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" + +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-macro", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.11.1+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + +[[package]] +name = "hashbrown" +version = "0.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "iana-time-zone" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "indexmap" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" +dependencies = [ + "equivalent", + "hashbrown 0.15.4", +] + +[[package]] +name = "io-uring" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + +[[package]] +name = "js-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "libc" +version = "0.2.174" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" + +[[package]] +name = "lock_api" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" + +[[package]] +name = "memchr" +version = "2.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" + +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" +dependencies = [ + "libc", + "wasi 0.11.1+wasi-snapshot-preview1", + "windows-sys 0.59.0", +] + +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + +[[package]] +name = "once_cell_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" + +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + +[[package]] +name = "parking_lot" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + +[[package]] +name = "redox_syscall" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d04b7d0ee6b4a0207a0a7adb104d23ecb0b47d6beae7152d0fa34b692b29fd6" +dependencies = [ + "bitflags", +] + +[[package]] +name = "ring" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.16", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" + +[[package]] +name = "rustls" +version = "0.23.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7160e3e10bf4535308537f3c4e1641468cd0e485175d6163087c0393c7d46643" +dependencies = [ + "log", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" +dependencies = [ + "zeroize", +] + +[[package]] +name = "rustls-webpki" +version = "0.103.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + +[[package]] +name = "rustversion" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d" + +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "serde" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.140" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "serde_spanned" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3" +dependencies = [ + "serde", +] + +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + +[[package]] +name = "syn" +version = "2.0.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "tokio" +version = "1.46.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17" +dependencies = [ + "backtrace", + "bytes", + "io-uring", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "slab", + "socket2", + "tokio-macros", + "windows-sys 0.52.0", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-rustls" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" +dependencies = [ + "rustls", + "tokio", +] + +[[package]] +name = "toml" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "toml_write", + "winnow", +] + +[[package]] +name = "toml_write" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" + +[[package]] +name = "tracing" +version = "0.1.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "uuid" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d" +dependencies = [ + "getrandom 0.3.3", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" +dependencies = [ + "bumpalo", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-link" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" + +[[package]] +name = "windows-result" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "winnow" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74c7b26e3480b707944fc872477815d29a8e429d2f93a1ce000f5fa84a15cbcd" +dependencies = [ + "memchr", +] + +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags", +] + +[[package]] +name = "zeroize" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" diff --git a/Cargo.toml b/Cargo.toml index 4aacb5b..cbd47e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,16 +8,19 @@ tokio = { version = "1.40.0", features = ["full"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" futures-util = "0.3.31" -tracing = "0.1.41" -tracing-subscriber = "0.3.19" dashmap = "6.1.0" uuid = { version = "1.10", features = ["v4"] } thiserror = "2.0.12" -chrono = "0.4" -tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring", "logging"] } +tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"] } rustls = { version = "0.23", default-features = false, features = ["ring", "tls12"] } rustls-pemfile = "2.2.0" -clap = { version = "4.5", features = ["derive"] } async-trait = "0.1" toml = "0.8.19" - +rusqlite = { version = "0.37.0", features = ["bundled"] } +aes-gcm = "0.10.3" +base64 = "0.22.1" +rand = "0.8.5" +x509-parser = "0.16" +chrono = "0.4.38" +tracing = "0.1.40" +tracing-subscriber = "0.3.18" \ No newline at end of file diff --git a/README.md b/README.md index 880d04c..af970b1 100644 --- a/README.md +++ b/README.md @@ -9,11 +9,11 @@ ![GitHub License](https://img.shields.io/badge/license-MIT-blue.svg) ![Rust](https://img.shields.io/badge/Rust-1.75%2B-orange.svg) ![Python](https://img.shields.io/badge/Python-3.8%2B-blue.svg) -**CipherMQ** is a secure, high-performance message broker designed for encrypted message transmission between senders and receivers using a push-based architecture. It leverages **hybrid encryption** (RSA + AES-GCM) for message confidentiality and authenticity, combined with **Mutual TLS (mTLS)** for secure client-server communication. The system ensures **zero message loss** and **exactly-once delivery** through robust acknowledgment mechanisms, with messages temporarily held in memory and routed via exchanges and queues. +**CipherMQ** is a secure, high-performance message broker designed for encrypted message transmission between senders and receivers using a push-based architecture. It leverages **hybrid encryption** (x25519 + AES-GCM-256) for message confidentiality and authenticity, combined with **Mutual TLS (mTLS)** for secure client-server communication. The system ensures **zero message loss** and **exactly-once delivery** through robust acknowledgment mechanisms, with messages temporarily held in memory and routed via exchanges and queues. Public keys are securely stored in an SQLite database with AES-GCM encryption, and receivers register their public keys with the server for secure distribution to senders. -This version introduces **TLS support**, enhancing security for client-server connections. -You can see some benchmarks regarding CipherMQ at [benchmark_report](benchmarks/benchmark_report.md). Initial architecture of CipherMQ is as follows: + +Initial architecture of CipherMQ is as follows: @@ -23,6 +23,8 @@ You can see some benchmarks regarding CipherMQ at [benchmark_report](benchmarks/ + + ## Table of Contents 1. [Features](#features) 2. [Prerequisites](#prerequisites) @@ -39,8 +41,9 @@ You can see some benchmarks regarding CipherMQ at [benchmark_report](benchmarks/ ## Features -- **Mutual TLS (mTLS)**: Secure client-server communication with two-way authentication using X.509 certificates. -- **Hybrid Encryption**: RSA encrypts session keys, and AES-GCM ensures message encryption and authentication. +- **Mutual TLS (mTLS)**: Ensures secure client-server communication with two-way authentication using ECDSA P-384 certificates. +- **Hybrid Encryption**: Utilizes x25519 for session key encryption and AES-GCM-256 for message encryption and authentication. +- **Public Key Registration**: Receivers register their public keys with the server using the `register_key` command, which are securely stored and retrievable by senders via the `get_key` command. - **Zero Message Loss**: Sender retries until server acknowledgment (`ACK `), and server retries delivery until receiver acknowledgment (`ack `). - **Exactly-Once Delivery**: Receiver deduplicates messages using `message_id` to prevent reprocessing. - **Batch Processing**: Sender collects and sends messages in batches, ensuring all queued messages are delivered. @@ -48,7 +51,7 @@ You can see some benchmarks regarding CipherMQ at [benchmark_report](benchmarks/ - **Push-Based Messaging**: Messages are delivered to connected consumers. - **Thread-Safe Data Structures**: Uses `DashMap` for safe multi-threaded operations. - **Flexible Routing**: Supports exchanges and queues with routing keys for efficient message delivery. -- **Clear Acknowledgment Logging**: Both sender and receiver log ACKs for visibility (e.g., `✅ [SENDER] Server ACK received` and `✅ [RECEIVER] Server confirmed ACK`). +- **Secure Key Storage**: Public keys are encrypted with AES-GCM and stored in an SQLite database. @@ -57,7 +60,8 @@ You can see some benchmarks regarding CipherMQ at [benchmark_report](benchmarks/ To run CipherMQ with TLS, you need: - [Rust](https://www.rust-lang.org/): Version 1.56 or higher (for the server). - [Python](https://www.python.org/): Version 3.8 or higher (for Sender and Receiver). -- [Key Generation](https://slproweb.com/products/Win32OpenSSL.html): Use OpenSSL or the provided `RSA.py` script to generate keys. +- Certificates Generation: Use the provided `generate_certs.py` script to generate mTLS certificates. +- Key Generation: Use the provided `key_maker.py` script to generate x25519 key pairs. @@ -74,22 +78,13 @@ cd CipherMQ cargo build --release ``` ### 3. Generate mTLS Certificates -Generate a CA certificate, server certificate, and client certificate for mTLS: +Run the provided `generate_certs.py` script to generate CA certificates, server certificate, and client certificate for mTLS: -> **Note**: Press Enter to skip each question ```bash -mkdir -p certs src/client/certs -cd certs -openssl genrsa -out ca.key 2048 -openssl req -x509 -new -key ca.key -out ca.crt -days 3650 -openssl genrsa -out server.key 2048 -openssl req -new -key server.key -out server.csr -openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 365 -openssl genrsa -out client.key 2048 -openssl req -new -key client.key -out client.csr -openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 365 -rm server.csr client.csr ca.srl -cp ca.crt client.crt client.key ../src/client/certs/ +cd root +pip install cryptography +generate_certs.py + ``` This creates: @@ -101,21 +96,17 @@ This creates: > **Note**: Store `ca.key` securely and do not distribute it. It is only used for certificate generation. +### 4. Generate x25519 Keys - - - -### 4. Generate RSA Keys -Run the provided `RSA.py` script to generate RSA key pairs for hybrid encryption: +Run the provided `key_maker.py` script to generate x25519 key pairs for hybrid encryption for the receiver: ```bash cd src/client -pip install pycryptodome -python RSA.py +python key_maker.py ``` -This creates: -- `receiver_private.pem`: Receiver's private key for decryption. -- `receiver_public.pem`: Public key for sender encryption. +This creates: +- `receiver/certs/receiver_private.key`: Receiver's private key for decryption. +- `receiver/certs/receiver_public.key`: Public key for sender encryption. @@ -124,41 +115,40 @@ This creates: ### Server Configuration Create a `config.toml` file in the `CipherMQ` root directory: ```toml +[server] connection_type = "tls" address = "127.0.0.1:5672" + +[tls] cert_path = "certs/server.crt" key_path = "certs/server.key" ca_cert_path = "certs/ca.crt" + +[database] +dbname = "public_keys.db" + +[encryption] +aes_key = "YOUR_BASE64_ENCODED_32_BYTE_AES_KEY" ``` -### Client Configuration -Create a config.json file in the src/client directory for Sender and Receiver, ensuring the following values are set: -```json -{ - "exchange_name": "ciphermq_exchange", - "queue_name": "ciphermq_queue", - "routing_key": "ciphermq_key", - "server_address": "127.0.0.1", - "server_port": 5672, - "tls": { - "certificate_path": "certs/ca.crt", - "client_cert_path": "certs/client.crt", - "client_key_path": "certs/client.key", - "protocol": "PROTOCOL_TLS_CLIENT", - "verify_mode": "CERT_REQUIRED", - "check_hostname": false - } -} +> **Note**: Replace `YOUR_BASE64_ENCODED_32_BYTE_AES_KEY` with a 32-byte key encoded in base64. Generate it using: +```bash +openssl rand -base64 32 ``` +### Client Configuration + `config.json` file in both `sender/` and `receiver/` directories:` + > **Note**: Ensure `exchange_name`, `queue_name`, and `routing_key` match across Sender, Receiver, and server for proper message routing. -> **Security Note**: Restrict access to `server.key`, `client.key`, and `receiver_private.pem` (e.g., `chmod 600`). +> **Security Note**: Restrict access to `server.key`, `client.key`, and `receiver_private.key (e.g., `chmod 600`). + ## Usage + ### 1. Run the Server -Start the server with mTLS support: +Start the server with TLS support: ```bash cd root cargo run --release @@ -167,29 +157,29 @@ cargo run --release ### 2. Run the Receiver Start the receiver to subscribe to messages: ```bash -cd src/client +cd src/client/receiver python Receiver.py ``` -The receiver connects to the server via TLS, declares and binds to `ciphermq_queue`, decrypts messages, and saves them to `received_messages.jsonl`. +The receiver connects to the server via TLS, registers its public key using the `register_key` command, declares and binds to `my_queue`, decrypts messages, and saves them to `data/received_messages.jsonl`. ### 3. Run the Sender -Send encrypted messages in batches: + ```bash -cd src/client +cd src/client/sender python Sender.py ``` -The sender encrypts messages using hybrid encryption, sends them in batches via `ciphermq_exchange` and `ciphermq_key`, and retries until acknowledgment. - +The sender fetches the receiver's public key using the `get_key` command, encrypts messages using hybrid encryption, sends them in batches via `my_exchange` and `my_key`, and retries until acknowledgment. ## Architecture CipherMQ is a message broker system with the following components: -- **Server** (`main.rs`, `server.rs`, `connection.rs`, `state.rs`, `config.rs`, `auth.rs`): A Rust-based broker that handles mTLS connections, message routing, and delivery using exchanges and queues. -- **Sender** (`Sender.py`): Encrypts messages with hybrid encryption (RSA + AES-GCM), sends them in batches, and ensures delivery with retries. -- **Receiver** (`Receiver.py`): Receives, decrypts, deduplicates, and stores messages in JSONL format, with acknowledgment retries. +- **Server** (`main.rs`, `server.rs`, `connection.rs`, `state.rs`, `config.rs`, `auth.rs`, `storage.rs`): A Rust-based broker that handles mTLS connections, message routing, and delivery using exchanges and queues. Public keys are encrypted with AES-GCM and stored in an SQLite database. The server supports the `register_key` command to store receiver public keys and the `get_key` command to provide them to senders. +- **Sender** (`sender.py`): Fetches receiver public keys using `get_key`, encrypts messages with hybrid encryption (x25519 + AES-GCM-256), sends them in batches, and ensures delivery with retries. +- **Receiver** (`receiver.py`): Registers its public key with the server using `register_key`, receives, decrypts, deduplicates, and stores messages in JSONL format, with acknowledgment retries. - **mTLS Integration** (`auth.rs`, `connection.rs`): Supports secure two-way authentication using `tokio-rustls` and `WebPkiClientVerifier`. -- **Hybrid Encryption**: Combines RSA for session key encryption and AES-GCM for message encryption and authentication. +- **Hybrid Encryption**: Combines x25519 for session key encryption and AES-GCM-256 for message encryption and authentication. +- **Key Storage** (`storage.rs`): Public keys are encrypted with AES-GCM and stored in an SQLite database, accessible via `register_key` and `get_key` commands. For a detailed architecture overview, see [CipherMQ Project Architecture](docs/Project_Architecture.md). @@ -197,16 +187,19 @@ For a detailed architecture overview, see [CipherMQ Project Architecture](docs/P ## Diagrams The following diagrams, located in `docs/diagrams`, illustrate CipherMQ's architecture and mTLS flow: -- **[Sequence Diagram](docs/diagrams/Sequence_diagram.png)**: Shows the end-to-end message flow, including mTLS handshakes. -- **[Activity Diagram](docs/diagrams/Activity_Diagram.png)**: Details the operational flow, including mTLS connection setup. +- **[Sequence Diagram](docs/diagrams/Sequence_diagram.png)**: Shows the end-to-end message flow, including mTLS handshakes, public key registration, and hybrid encryption. +- **[Activity Diagram](docs/diagrams/Activity_Diagram.png)**: Details the operational flow, including mTLS connection setup, key registration, and message processing. ## Future Improvements -- Support standard protocols like AMQP or MQTT. -- Enable distributed server scaling. -- Replacing OpenSSL commands with specialized Rust libraries for generating keys and certificates. -- For generating a key to migrate from 2048-bit RSA to ECDSA(P-384) +- Support standard protocols like AMQP or MQTT for broader compatibility. +- Enable distributed server scaling for high availability. +- Replacing Python Script with specialized Rust libraries for generating keys and certificates. - Implement certificate rotation and CRL/OCSP for enhanced security. - Add support for Hardware Security Modules (HSM) for key management. +- Add persistent message storage to handle server restarts. +- Enhance monitoring with structured logging (optional reintroduction). + + ## Contributing Contributions are welcome! Please: diff --git a/config.toml b/config.toml index 7ec3229..c6efddc 100644 --- a/config.toml +++ b/config.toml @@ -1,8 +1,15 @@ -connection_type = "tls" +[server] address = "127.0.0.1:5672" +connection_type = "tls" - -cert_path = "certs/server.crt" -key_path = "certs/server.key" +[tls] +cert_path = "certs/server.crt" +key_path = "certs/server.key" ca_cert_path = "certs/ca.crt" +[database] +dbname = "public_keys.db" + +[encryption] +algorithm = "x25519_chacha20_poly1305" +aes_key = "YOUR_BASE64_ENCODED_32_BYTE_AES_KEY" \ No newline at end of file diff --git a/docs/Project_Architecture.md b/docs/Project_Architecture.md index af50808..724a9b2 100644 --- a/docs/Project_Architecture.md +++ b/docs/Project_Architecture.md @@ -1,34 +1,47 @@ # CipherMQ Project Architecture: A Secure Message Broker with mTLS and Guaranteed Delivery + + ## 1. Introduction -**CipherMQ** is a high-performance, secure message broker designed to transmit encrypted messages between senders and receivers using a push-based architecture. It ensures **zero message loss** and **exactly-once delivery** through robust acknowledgment mechanisms, with messages temporarily held in memory (except for logs and receiver output). The system leverages **hybrid encryption** (RSA + AES-GCM) for message confidentiality and authenticity and **Mutual Transport Layer Security (mTLS)** for secure client-server communication with two-way authentication. -This version introduces **mTLS support**, enhancing transport-layer security with client and server certificate verification, as implemented in `auth.rs`, `connection.rs`, and `main.rs`, with configuration via `config.toml` and `config.json`. The project consists of: +**CipherMQ** is a high-performance, secure message broker designed to transmit encrypted messages between senders and receivers using a push-based architecture. It ensures **zero message loss** and **exactly-once delivery** through robust acknowledgment mechanisms, with messages temporarily held in memory (except for receiver output). The system leverages **hybrid encryption** (x25519 + AES-GCM-256) for message confidentiality and authenticity and **Mutual Transport Layer Security (mTLS)** for secure client-server communication with two-way authentication. Public keys are securely stored in an SQLite database with AES-GCM encryption, and receivers register their public keys with the server for secure distribution to senders. + +This version introduces **mTLS support** and **timestamped console logs**, enhancing transport-layer security and monitoring. Logging is simplified to console output with timestamps (e.g., `[2025-07-14 10:05:00] [SENDER] Message sent`). The project consists of: + - **Server** (`main.rs`): A Rust-based message broker for routing and delivering messages over mTLS. -- **Sender** (`Sender.py`): A Python script that encrypts and sends messages in batches with retry logic. -- **Receiver** (`Receiver.py`): A Python script that receives, decrypts, deduplicates, and stores messages. +- **Sender** (`sender.py`): A Python script that fetches receiver public keys, encrypts, and sends messages in batches with retry logic. +- **Receiver** (`receiver.py`): A Python script that registers its public key, receives, decrypts, deduplicates, and stores messages. + +This document provides a comprehensive overview of the architecture, covering server, clients, mTLS, encryption, key distribution, and acknowledgment mechanisms. + -This document provides a comprehensive overview of the architecture, covering server, clients, mTLS, encryption, and acknowledgment mechanisms. ## 2. Architecture Overview + CipherMQ operates as a message broker with **queues** and **exchanges**, supporting mTLS connections via a text-based protocol. Key features include: + - **Mutual TLS (mTLS)**: Secures client-server communication with two-way authentication using `tokio-rustls` (server) and Python’s `ssl` module (clients), configurable via `config.toml` and `config.json`. -- **Hybrid Encryption**: Combines RSA for session key encryption and AES-GCM for message encryption and authentication. +- **Hybrid Encryption**: Combines x25519 for session key encryption and AES-GCM-256 for message encryption and authentication. +- **Public Key Distribution**: Receivers register public keys using `register_key`, and senders retrieve them using `get_key`, with keys stored securely in SQLite. - **Zero Message Loss**: Sender and server retries with acknowledgments ensure reliable delivery. - **Exactly-Once Delivery**: Receiver deduplicates messages using `message_id`. - **Asynchronous Processing**: Built with Tokio for concurrent, high-performance connection handling. - **Thread-Safe Data Structures**: Uses `DashMap` for multi-threaded operations. - **Flexible Routing**: Supports exchanges, queues, and routing keys. -- **Clear Logging**: Detailed logs for ACKs (e.g., `✅ [SENDER] Server ACK received`, `✅ [RECEIVER] Server confirmed ACK`). +- **Timestamped Console Logs**: Console output with timestamps for monitoring (e.g., `[2025-07-14 10:05:00] [RECEIVER] Decrypted message`). The architecture is illustrated in the [Sequence Diagram](diagrams/Sequence_diagram.png) and [Activity Diagram](diagrams/Activity_Diagram.png) in `docs/diagrams`. + + ## 3. Architectural Components -### 3.1. Server (`main.rs`, `server.rs`, `connection.rs`, `state.rs`, `config.rs`, `auth.rs`) -The server is the core of CipherMQ, managing message routing, delivery, and secure connections with mTLS. +### 3.1. Server (`main.rs`, `server.rs`, `connection.rs`, `state.rs`, `config.rs`, `auth.rs`, `storage.rs`) + +The server is the core of CipherMQ, managing message routing, delivery, secure connections with mTLS, and public key storage. #### 3.1.1. Data Structures (`state.rs`) + - **ServerState**: - `queues: DashMap>`: Stores messages for each queue. - `bindings: DashMap>`: Maps exchanges to queues. @@ -39,21 +52,25 @@ The server is the core of CipherMQ, managing message routing, delivery, and secu - `connected_clients: Arc>`: Tracks active client count. - **EncryptedInputData**: - `message_id: String`: Unique UUID for deduplication. - - `enc_session_key: String`: RSA-encrypted session key (base64-encoded). - - `nonce: String`: AES-GCM nonce (base64-encoded). - - `tag: String`: AES-GCM authentication tag (base64-encoded). + - `receiver_client_id: String`: Target receiver identifier. + - `enc_session_key: String`: x25519-encrypted session key (base64-encoded). + - `nonce: String`: AES-GCM nonce (12 bytes, base64-encoded). + - `tag: String`: AES-GCM authentication tag (16 bytes, base64-encoded). - `ciphertext: String`: Encrypted message content (base64-encoded). - **MessageStatus**: - `sent_time: Option`: Time message was queued. - `delivered_time: Option`: Time message was delivered. - `acknowledged_time: Option`: Time message was acknowledged. -#### 3.1.2. Key Server Functions (`server.rs`, `state.rs`) +#### 3.1.2. Key Server Functions (`server.rs`, `state.rs`, `storage.rs`) + - **declare_queue**: Creates a new queue in `queues`. - **declare_exchange**: Creates a new exchange in `exchanges`. - **bind_queue**: Binds a queue to an exchange with a routing key, stored in `bindings`. - **publish**: Queues messages, sends `ACK ` to sender, and delivers to consumers via `consumers`. Checks for duplicate `message_id`s. - **publish_batch**: Processes a batch of messages, sending individual `ACK ` responses. +- **register_key**: Stores a receiver’s public key in SQLite with AES-GCM encryption, identified by `client_id`. +- **get_key**: Retrieves a receiver’s public key from SQLite for a sender. - **register_consumer**: Registers a consumer channel for push-based delivery. - **consume**: Retrieves messages manually (pull mode). - **acknowledge**: Marks message as `Acknowledged`, removes it from `queues`, and sends `ACK_CONFIRMED ` to receiver. @@ -61,7 +78,8 @@ The server is the core of CipherMQ, managing message routing, delivery, and secu - **reset_stats**: Clears `message_status` and `request_times`. #### 3.1.3. Connection Management (`connection.rs`, `main.rs`, `auth.rs`) -- **Protocol**: Text-based protocol over mTLS (e.g., `publish `). + +- **Protocol**: Text-based protocol over mTLS (e.g., `publish_batch `). - **mTLS Support**: - Uses `tokio-rustls` with `WebPkiClientVerifier` for two-way authentication (`auth.rs`). - Loads server certificates (`server.crt`, `server.key`) and CA certificate (`ca.crt`) for client verification via `config.toml`. @@ -70,113 +88,171 @@ The server is the core of CipherMQ, managing message routing, delivery, and secu - **Acknowledgment**: - Sends `ACK ` to sender after queuing. - Sends `ACK_CONFIRMED ` to receiver after acknowledgment. -- **Error Handling**: Logs errors via `tracing` (e.g., `TLS handshake failed`) and sends error messages to clients. +- **Error Handling**: Logs errors via `tracing` (e.g., `[2025-07-14 10:05:00] TLS handshake failed`) and sends error messages to clients. -### 3.2. Sender (`Sender.py`) -The sender encrypts and sends messages in batches with retry logic. +### 3.2. Sender (`sender.py`) + +The sender fetches receiver public keys, encrypts, and sends messages in batches with retry logic. #### 3.2.1. Key Components -- **encrypt_message_hybrid**: - - Generates a 16-byte session key, encrypts it with RSA (`PKCS1_OAEP`, 2048-bit key from `receiver_public.pem`), and encrypts the message with AES-GCM. - - Outputs JSON with `message_id` (UUID), `enc_session_key`, `nonce`, `tag`, and `ciphertext` (all base64-encoded). + +- **get_public_key**: + - Sends `get_key ` to server to retrieve the receiver’s public key, stored locally as `certs/_public.key`. +- **encrypt_message**: + - Generates a 32-byte session key, encrypts it with x25519 (`SealedBox` from `pynacl`), and encrypts the message with AES-GCM-256. + - Outputs JSON with `message_id` (UUID), `receiver_client_id`, `enc_session_key`, `nonce`, `tag`, and `ciphertext` (all base64-encoded). - **send_message_batch**: - - Sends a batch of messages, retries up to 3 times (5-second timeout) until `ACK ` is received for each message. + - Sends a batch of messages, retries up to 3 times (10-second timeout) until `ACK ` is received for each message. - Stores messages in `pending_messages` until acknowledged. - - Logs: `✅ [SENDER] Server ACK received for message `. -- **collect_and_send_messages**: - - Collects messages into batches (up to 10 messages or 1-second timeout). - - Ensures all queued messages in `message_queue` are sent. + - Logs: `[2025-07-14 10:05:00] [SENDER] Server ACK received for message `. +- **encrypt_message_batch**: + - Collects messages into batches (up to 2 messages by default). + - Ensures all queued messages in `message_queue` are encrypted and sent. - **mTLS Support**: - Uses `ssl.SSLContext` with `PROTOCOL_TLS_CLIENT`, loads `ca.crt` for server verification, and `client.crt`, `client.key` for client authentication. - Disables hostname verification for self-signed certificates (`check_hostname=False`). - **Configuration**: - - Reads `config.json` for `exchange_name`, `routing_key`, `server_address`, `server_port`, `certificate_path`, `client_cert_path`, and `client_key_path`. + - Reads `sender_config.json` for `receiver_client_ids`, `exchange_name`, `routing_key`, `server_address`, `server_port`, `certificate_path`, `client_cert_path`, and `client_key_path`. #### 3.2.2. Architectural Features + - **Reliability**: Retries and batching ensure no message loss. - **Security**: mTLS secures transport; hybrid encryption protects messages. -- **Logging**: Logs encryption, sending, batching, and ACK receipt. +- **Logging**: Console logs with timestamps for encryption, sending, batching, and ACK receipt. - **Deduplication**: Uses UUID to prevent server-side duplicates. -### 3.3. Receiver (`Receiver.py`) -The receiver retrieves, decrypts, and stores messages. +### 3.3. Receiver (`receiver.py`) + +The receiver registers its public key, retrieves, decrypts, and stores messages. #### 3.3.1. Key Components + +- **register_public_key**: + - Sends `register_key receiver_1 ` to server to store the public key in SQLite with AES-GCM encryption. - **decrypt_message_hybrid**: - - Decrypts session key with RSA private key (`PKCS1_OAEP`, `receiver_private.pem`) and message with AES-GCM. + - Decrypts session key with x25519 private key (`SealedBox` from `pynacl`) and message with AES-GCM-256. - Verifies integrity using `tag`. - **receive_messages**: - Subscribes to a queue, receives messages, and deduplicates using `processed_messages` set. - - Sends `ack ` with retries (3 attempts, 5-second timeout) until `ACK_CONFIRMED ` is received. - - Logs: `✅ [RECEIVER] Server confirmed ACK for message `. + - Sends `ack ` with retries (3 attempts, 10-second timeout) until `ACK_CONFIRMED ` is received. + - Logs: `[2025-07-14 10:05:00] [RECEIVER] Server confirmed ACK for message `. - **process_messages**: - - Stores decrypted messages in `received_messages.jsonl` with `message_id` and timestamp. + - Stores decrypted messages in `data/received_messages.jsonl` with `message_id` and timestamp. - Batches writes (up to 10 messages) for efficiency. - **signal_handler**: Ensures graceful shutdown on SIGINT. - **mTLS Support**: - Uses `ssl.SSLContext` to load `ca.crt` for server verification and `client.crt`, `client.key` for client authentication. - - Logs cipher (e.g., `Cipher: TLS_AES_256_GCM_SHA384`). + - Logs cipher (e.g., `[2025-07-14 10:05:00] [RECEIVER] Cipher: TLS_AES_256_GCM_SHA384`). - **Configuration**: - - Reads `config.json` for `queue_name`, `exchange_name`, `routing_key`, `server_address`, `server_port`, `certificate_path`, `client_cert_path`, and `client_key_path`. + - Reads `receiver_config.json` for `queue_name`, `exchange_name`, `routing_key`, `server_address`, `server_port`, `certificate_path`, `client_cert_path`, and `client_key_path`. #### 3.3.2. Architectural Features + - **Deduplication**: Prevents reprocessing using `processed_messages`. - **Reliability**: Retries ACKs to ensure server cleanup. - **Security**: mTLS and hybrid encryption ensure secure transport and message integrity. -- **Logging**: Logs receipt, decryption, ACKs, and file writes. +- **Logging**: Console logs with timestamps for receipt, decryption, ACKs, and file writes. ### 3.4. Hybrid Encryption -- **RSA**: Encrypts a 16-byte session key with `PKCS1_OAEP` (2048-bit key). -- **AES-GCM**: Encrypts message, generates `nonce` (12 bytes) and `tag` (16 bytes) for authentication. + +- **x25519**: Encrypts a 32-byte session key with `SealedBox` (using `receiver_public.key`). +- **AES-GCM-256**: Encrypts message, generates `nonce` (12 bytes) and `tag` (16 bytes) for authentication. - **Process**: - 1. Sender generates session key, encrypts it with receiver’s public key (`receiver_public.pem`). - 2. Message is encrypted with AES-GCM, producing `ciphertext`, `nonce`, and `tag`. - 3. Receiver decrypts session key with private key (`receiver_private.pem`) and message with AES-GCM, verifying `tag`. + 1. Sender retrieves receiver’s public key using `get_key `. + 2. Sender generates session key, encrypts it with x25519, and encrypts message with AES-GCM-256, producing `ciphertext`, `nonce`, and `tag`. + 3. Receiver decrypts session key with private key (`receiver_private.key`) and message with AES-GCM-256, verifying `tag`. + +### 3.5. Public Key Distribution + +- **Receiver**: + + - Generates x25519 key pair (`receiver_private.key`, `receiver_public.key`). + - Sends `register_key receiver_1 ` to server upon connection. + - Server stores public key in SQLite with AES-GCM encryption. + +- **Sender**: + + - Sends `get_key receiver_1` to retrieve receiver’s public key. + - Stores public key locally as `certs/receiver_1_public.key` for encryption. + +- **Security**: + + - Public keys are encrypted in SQLite to prevent unauthorized access. + + - mTLS ensures only authenticated clients can register or retrieve keys. + + ## 4. Component Interactions + 1. **Key and Certificate Generation**: + - OpenSSL generates `ca.crt`, `server.crt`, `server.key`, `client.crt`, and `client.key` for mTLS. - - OpenSSL generates `receiver_public.pem` and `receiver_private.pem` for hybrid encryption. -2. **Sender to Server**: + - Python script generates `receiver_public.key` and `receiver_private.key` for hybrid encryption. + +2. **Receiver to Server**: + + - Receiver authenticates with `client.crt` and `client.key`, verifies server with `ca.crt`. + - Receiver sends `register_key receiver_1 ` to store its public key. + - Receiver declares `my_queue`, binds to `my_exchange` with `my_key`, and subscribes with `consume`. + +3. **Sender to Server**: + - Sender authenticates with `client.crt` and `client.key`, verifies server with `ca.crt`. - - Sender encrypts messages, sends batches to `ciphermq_exchange` with `ciphermq_key`. + - Sender sends `get_key receiver_1` to retrieve receiver’s public key. + - Sender encrypts messages, sends batches to `my_exchange` with `my_key`. - Server queues messages, sends `ACK ` for each. - - Sender logs: `✅ [SENDER] Server ACK received for message `. -3. **Server to Receiver**: - - Receiver authenticates with `client.crt` and `client.key`, verifies server with `ca.crt`. - - Server routes messages to `ciphermq_queue` and pushes to consumers. - - Receiver deduplicates, decrypts, and stores messages in `received_messages.jsonl`. - - Receiver sends `ack `, logs: `📤 [RECEIVER] Sending ACK for message `. - - Server sends `ACK_CONFIRMED `, receiver logs: `✅ [RECEIVER] Server confirmed ACK`. -4. **Message Removal**: + - Sender logs: `[2025-07-14 10:05:00] [SENDER] Server ACK received for message `. + +4. **Server to Receiver**: + + - Server routes messages to `my_queue` and pushes to consumers. + - Receiver deduplicates, decrypts, and stores messages in `data/received_messages.jsonl`. + - Receiver sends `ack `, logs: `[2025-07-14 10:05:00] [RECEIVER] Sending ACK for message `. + - Server sends `ACK_CONFIRMED `, receiver logs: `[2025-07-14 10:05:00] [RECEIVER] Server confirmed ACK`. + +5. **Message Removal**: + - Server removes message from `queues` after `ack `. - - Logs: `Message acknowledged by client, removed from queues and status`. + + - Logs: `[2025-07-14 10:05:00] Message acknowledged by client, removed from queues and status`. + + ## 5. Acknowledgment Mechanism + - **Sender-Server ACK**: + - Server sends `ACK ` after queuing message. - - Sender retries up to 3 times (5-second timeout) until ACK is received. + - Sender retries up to 3 times (10-second timeout) until ACK is received. - Sender removes message from `pending_messages` upon ACK. - - Logged: `✅ [SENDER] Server ACK received`. + - Logged: `[2025-07-14 10:05:00] [SENDER] Server ACK received`. + - **Receiver-Server ACK**: + - Receiver sends `ack ` after processing message. - Server sends `ACK_CONFIRMED ` and removes message from `queues` and `message_status`. - - Receiver retries up to 3 times (5-second timeout). - - Logged: `✅ [RECEIVER] Server confirmed ACK`. + - Receiver retries up to 3 times (10-second timeout). + - Logged: `[2025-07-14 10:05:00] [RECEIVER] Server confirmed ACK`. + - **Independence**: Sender and receiver ACKs are decoupled for modularity. + - **Retry Logic**: Server does not retry delivery; clients handle retries. + + ## 6. Logging and Verification + - **Sender**: - - Logs encryption, batching, sending, ACK receipt, and removal from `pending_messages`. - - Example: `📤 [SENDER] Message added to pending messages`. + - Logs encryption, public key retrieval, batching, sending, ACK receipt, and removal from `pending_messages` with timestamps. + - Example: `[2025-07-14 10:05:00] [SENDER] Encrypted message for receiver_1`. - **Receiver**: - - Logs message receipt, decryption, ACK sending, confirmation, and storage in `received_messages.jsonl`. - - Example: `✅ [RECEIVER] Decrypted message : `. + - Logs public key registration, message receipt, decryption, ACK sending, confirmation, and storage in `data/received_messages.jsonl` with timestamps. + - Example: `[2025-07-14 10:05:00] [RECEIVER] Decrypted message : `. - **Server**: - - Logs via `tracing` for connections, message statuses, and errors. - - Example: `Published message to queue 'ciphermq_key' via exchange 'ciphermq_exchange'`. + - Logs via `tracing` for connections, key registration, message statuses, and errors with timestamps. + - Example: `[2025-07-14 10:05:00] Published message to queue 'my_key' via exchange 'my_exchange'`. - **Verification**: - Check `pending_messages` (sender) for unacknowledged messages. - - Check `received_messages.jsonl` (receiver) for processed messages. - - Check server logs for `Acknowledged` status or errors (e.g., `TLS handshake failed`). \ No newline at end of file + - Check `data/received_messages.jsonl` (receiver) for processed messages. + - Check server logs for `Acknowledged` status or errors (e.g., `[2025-07-14 10:05:00] TLS handshake failed`). \ No newline at end of file diff --git a/docs/diagrams/Activity_Diagram.png b/docs/diagrams/Activity_Diagram.png index 81b0113..ff1cb0e 100644 Binary files a/docs/diagrams/Activity_Diagram.png and b/docs/diagrams/Activity_Diagram.png differ diff --git a/docs/diagrams/Diagram.png b/docs/diagrams/Diagram.png index 8aa07e4..65e6078 100644 Binary files a/docs/diagrams/Diagram.png and b/docs/diagrams/Diagram.png differ diff --git a/docs/diagrams/Sequence_diagram.png b/docs/diagrams/Sequence_diagram.png index e101402..07bd8ae 100644 Binary files a/docs/diagrams/Sequence_diagram.png and b/docs/diagrams/Sequence_diagram.png differ diff --git a/generate_certs.py b/generate_certs.py new file mode 100644 index 0000000..7a0c32e --- /dev/null +++ b/generate_certs.py @@ -0,0 +1,115 @@ +import os +from cryptography import x509 +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.hashes import SHA384 +from cryptography.x509.oid import NameOID +from datetime import datetime, timedelta +from pathlib import Path + +def generate_key_pair(): + """Generate an ECDSA P-384 key pair.""" + private_key = ec.generate_private_key(curve=ec.SECP384R1()) + public_key = private_key.public_key() + return private_key, public_key + +def create_ca_cert(): + """Create CA certificate with CN=CipherMQ-CA.""" + private_key, public_key = generate_key_pair() + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, "CipherMQ-CA") + ]) + cert_builder = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.utcnow()) + .not_valid_after(datetime.utcnow() + timedelta(days=3650)) + .add_extension( + x509.BasicConstraints(ca=True, path_length=None), critical=True + ) + ) + ca_cert = cert_builder.sign(private_key, SHA384()) + return private_key, ca_cert + +def create_cert(subject_cn, ca_cert, ca_key, is_server=False): + """Create a server or client certificate signed by the CA.""" + private_key, public_key = generate_key_pair() + subject = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, subject_cn) + ]) + cert_builder = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(ca_cert.subject) + .public_key(public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.utcnow()) + .not_valid_after(datetime.utcnow() + timedelta(days=365)) + ) + if is_server: + cert_builder = cert_builder.add_extension( + x509.SubjectAlternativeName([x509.DNSName("localhost")]), + critical=False + ) + cert = cert_builder.sign(ca_key, SHA384()) + return private_key, cert + +def save_pem_file(path, data, is_private_key=False): + """Save data to a PEM file with appropriate permissions.""" + mode = 0o600 if is_private_key else 0o644 + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "wb") as f: + f.write(data) + os.chmod(path, mode) + +def main(): + # Define directories + certs_dir = Path("certs") + sender_certs_dir = Path("src/client/sender/certs") + receiver_certs_dir = Path("src/client/receiver/certs") + + # Create directories + certs_dir.mkdir(exist_ok=True) + sender_certs_dir.mkdir(parents=True, exist_ok=True) + receiver_certs_dir.mkdir(parents=True, exist_ok=True) + + # Generate CA certificate + ca_key, ca_cert = create_ca_cert() + save_pem_file(certs_dir / "ca.key", ca_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ), is_private_key=True) + save_pem_file(certs_dir / "ca.crt", ca_cert.public_bytes(serialization.Encoding.PEM)) + + # Generate server certificate + server_key, server_cert = create_cert("CipherMQ-Server", ca_cert, ca_key, is_server=True) + save_pem_file(certs_dir / "server.key", server_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ), is_private_key=True) + save_pem_file(certs_dir / "server.crt", server_cert.public_bytes(serialization.Encoding.PEM)) + + # Generate client certificates for Sender and Receiver + client_cns = [ + ("CipherMQ-Sender", sender_certs_dir), + ("CipherMQ-Receiver", receiver_certs_dir) + ] + for cn, dest_dir in client_cns: + client_key, client_cert = create_cert(cn, ca_cert, ca_key) + save_pem_file(dest_dir / "client.key", client_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ), is_private_key=True) + save_pem_file(dest_dir / "client.crt", client_cert.public_bytes(serialization.Encoding.PEM)) + save_pem_file(dest_dir / "ca.crt", ca_cert.public_bytes(serialization.Encoding.PEM)) + + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] Client certificate created with CN = '{cn}' in {dest_dir}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/client/key_maker.py b/src/client/key_maker.py new file mode 100644 index 0000000..b077121 --- /dev/null +++ b/src/client/key_maker.py @@ -0,0 +1,18 @@ +from nacl.public import PrivateKey +from base64 import b64encode +import os + +certs_dir = "receiver/certs" + +os.makedirs(certs_dir, exist_ok=True) + +private_key = PrivateKey.generate() +public_key = private_key.public_key + +with open(os.path.join(certs_dir, "receiver_private.key"), "w") as f: + f.write(b64encode(bytes(private_key)).decode("utf-8")) + +with open(os.path.join(certs_dir, "receiver_public.key"), "w") as f: + f.write(b64encode(bytes(public_key)).decode("utf-8")) + +print("x25519 key pair generated in certs/") diff --git a/src/client/receiver/Receiver.py b/src/client/receiver/Receiver.py new file mode 100644 index 0000000..759b22d --- /dev/null +++ b/src/client/receiver/Receiver.py @@ -0,0 +1,323 @@ +import asyncio +import json +import signal +import ssl +import sys +import os +import time +from datetime import datetime +from base64 import b64decode, b64encode +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from cryptography.hazmat.primitives import serialization +from nacl.public import PrivateKey, SealedBox + +# Extract client_id from client certificate +def extract_client_id(tls_config): + try: + with open(tls_config["client_cert_path"], "rb") as cert_file: + cert_data = cert_file.read() + cert = x509.load_pem_x509_certificate(cert_data, default_backend()) + cn = cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME) + if not cn: + raise ValueError("No Common Name found in client certificate") + return cn[0].value + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] ❌ [RECEIVER] Error extracting client_id from certificate: {e}") + sys.exit(1) + +# Load configuration +try: + os.makedirs("certs", exist_ok=True) + with open("config.json", "r") as config_file: + config = json.load(config_file) + QUEUE_NAME = config["queue_name"] + EXCHANGE_NAME = config["exchange_name"] + ROUTING_KEY = config["routing_key"] + SERVER_ADDRESS = config["server_address"] + SERVER_PORT = config["server_port"] + TLS_CONFIG = config["tls"] + CLIENT_ID = extract_client_id(TLS_CONFIG) + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Extracted client_id from certificate: {CLIENT_ID}") +except FileNotFoundError: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] ❌ [RECEIVER] Configuration file 'config - receiver.json' not found.") + sys.exit(1) +except KeyError as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] ❌ [RECEIVER] Missing key in configuration file: {e}") + sys.exit(1) + +# Thread-safe queues +message_queue = asyncio.Queue() +temp_message_queue = asyncio.Queue() +running = True +processed_messages = set() + +# Load private and public keys +try: + with open("certs/receiver_private.key", "r") as key_file: + private_key_bytes = b64decode(key_file.read()) + PRIVATE_KEY = PrivateKey(private_key_bytes) + with open("certs/receiver_public.key", "r") as key_file: + PUBLIC_KEY = PrivateKey(private_key_bytes).public_key +except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] ❌ [RECEIVER] Error loading keys: {e}") + sys.exit(1) + +# Configure SSL context for mTLS +ssl_context = ssl.SSLContext(getattr(ssl, TLS_CONFIG["protocol"])) +ssl_context.load_verify_locations(TLS_CONFIG["certificate_path"]) +ssl_context.load_cert_chain( + certfile=TLS_CONFIG["client_cert_path"], + keyfile=TLS_CONFIG["client_key_path"] +) +ssl_context.verify_mode = getattr(ssl, TLS_CONFIG["verify_mode"]) +ssl_context.check_hostname = TLS_CONFIG["check_hostname"] + +# Register public key with server +async def register_public_key(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + public_key_b64 = b64encode(PUBLIC_KEY._public_key).decode('utf-8') + command = f"register_key {CLIENT_ID} {public_key_b64}\n" + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Sending command: {command.strip()}") + writer.write(command.encode('utf-8')) + await writer.drain() + response = (await reader.readline()).decode('utf-8').strip() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Server response for public key registration: {response}") + if response != "Public key registered": + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Failed to register public key") + return False + return True + +# Decrypt message using private key with hybrid encryption +async def decrypt_message_hybrid(data: dict) -> str: + try: + required_keys = {"enc_session_key", "nonce", "tag", "ciphertext", "message_id"} + if not required_keys.issubset(data.keys()): + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Incomplete message for hybrid decryption: {data}") + return None + enc_session_key = b64decode(data['enc_session_key']) + nonce = b64decode(data['nonce']) + tag = b64decode(data['tag']) + ciphertext = b64decode(data['ciphertext']) + sealed_box = SealedBox(PRIVATE_KEY) + session_key = sealed_box.decrypt(enc_session_key) + cipher = AESGCM(session_key) + encrypted_bytes = ciphertext + tag + plaintext = cipher.decrypt(nonce, encrypted_bytes, None) + return plaintext.decode('utf-8') + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Error in hybrid decryption: {e}") + return None + +# Sends acknowledgment with retries +async def send_ack_with_retry(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, message_id: str): + max_retries = 3 + timeout = 10 + for attempt in range(max_retries): + try: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Sending ACK for message {message_id} (Attempt {attempt + 1}/{max_retries})") + writer.write(f"ack {message_id}\n".encode('utf-8')) + await writer.drain() + start_time = time.time() + while time.time() - start_time < timeout: + try: + response = (await asyncio.wait_for(reader.readline(), timeout=2.0)).decode('utf-8').strip() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] ACK response: {response}") + if response.startswith(f"ACK_CONFIRMED {message_id}"): + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Server confirmed ACK for message {message_id}") + return True + elif response.startswith("Message:"): + parts = response.split(" ", 2) + if len(parts) >= 2 and parts[1] not in processed_messages: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Received new message in ACK response, queuing: {response}") + await temp_message_queue.put(response) + elif response.startswith("ACK_CONFIRMED"): + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Received ACK for another message: {response}") + else: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Unexpected server response: {response}") + except asyncio.TimeoutError: + continue + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] No correct ACK received, retrying ({attempt + 1}/{max_retries})") + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Error sending ACK for message {message_id}: {e}") + await asyncio.sleep(2 ** attempt) + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Failed to acknowledge message {message_id}") + return False + +# Processes a single message +async def process_message(message: str, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + if message.startswith("Message:"): + try: + parts = message.split(" ", 2) + if len(parts) < 3: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Invalid message format") + return + message_id = parts[1] + if message_id in processed_messages: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Duplicate message {message_id}, sending ACK") + await send_ack_with_retry(reader, writer, message_id) + return + message_data = json.loads(parts[2]) + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Processing message {message_id}") + decrypted = await decrypt_message_hybrid(message_data) + if decrypted: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Decrypted message {message_id}: {decrypted}") + await message_queue.put((message_id, decrypted)) + processed_messages.add(message_id) + await send_ack_with_retry(reader, writer, message_id) + else: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Decryption failed for message {message_id}") + except json.JSONDecodeError as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] JSON error in message parsing: {e}") + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Error processing message: {e}") + +# Receives and processes messages +async def receive_messages(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + global running + try: + if not await register_public_key(reader, writer): + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Public key registration failed. Exiting") + return + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Declaring queue {QUEUE_NAME}") + writer.write(f"declare_queue {QUEUE_NAME}\n".encode('utf-8')) + await writer.drain() + response = (await reader.readline()).decode('utf-8').strip() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Server response for queue declaration: {response}") + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Declaring exchange {EXCHANGE_NAME}") + writer.write(f"declare_exchange {EXCHANGE_NAME}\n".encode('utf-8')) + await writer.drain() + response = (await reader.readline()).decode('utf-8').strip() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Server response for exchange declaration: {response}") + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Binding queue {QUEUE_NAME} to exchange {EXCHANGE_NAME}") + writer.write(f"bind {QUEUE_NAME} {EXCHANGE_NAME} {ROUTING_KEY}\n".encode('utf-8')) + await writer.drain() + response = (await reader.readline()).decode('utf-8').strip() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Server response for binding: {response}") + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Subscribing to queue {QUEUE_NAME}") + writer.write(f"consume {QUEUE_NAME}\n".encode('utf-8')) + await writer.drain() + response = (await reader.readline()).decode('utf-8').strip() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Server response for subscription: {response}") + while running: + try: + # Process messages from temp_message_queue first + try: + message = await asyncio.wait_for(temp_message_queue.get(), timeout=0.1) + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Processing queued message: {message}") + if message.startswith("Message:"): + parts = message.split(" ", 2) + if len(parts) >= 2 and parts[1] in processed_messages: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Skipping duplicate queued message {parts[1]}") + temp_message_queue.task_done() + continue + await process_message(message, reader, writer) + temp_message_queue.task_done() + continue + except asyncio.TimeoutError: + pass + # Read new messages from server + line = await asyncio.wait_for(reader.readline(), timeout=1.0) + if not line: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Connection closed") + break + message = line.decode('utf-8').strip() + if not message: + continue + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Raw message received: {message}") + if message.startswith("Message:"): + parts = message.split(" ", 2) + if len(parts) >= 2 and parts[1] in processed_messages: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Duplicate message {parts[1]} from server, sending ACK") + await send_ack_with_retry(reader, writer, parts[1]) + continue + await process_message(message, reader, writer) + elif message.startswith("ACK_CONFIRMED"): + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Received ACK confirmation: {message}") + else: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Unhandled message: {message}") + except asyncio.TimeoutError: + continue + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Error receiving messages: {e}") + break + finally: + writer.close() + await writer.wait_closed() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Connection closed") + +# Processes decrypted messages with batching in JSONL format +async def process_messages(): + global running + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Starting message processing") + batch_size = 10 + batch = [] + output_file = "data/received_messages.jsonl" + try: + os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True) + with open(output_file, "a", encoding='utf-8') as f: + while running: + try: + message_id, message = await asyncio.wait_for(message_queue.get(), timeout=1.0) + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Message read from queue: {message_id}") + batch.append({"message_id": message_id, "message": message, "timestamp": time.time()}) + if len(batch) >= batch_size: + for item in batch: + json.dump(item, f, ensure_ascii=False) + f.write("\n") + f.flush() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Batch of {len(batch)} messages saved to {output_file}") + batch.clear() + message_queue.task_done() + except asyncio.TimeoutError: + if batch: + for item in batch: + json.dump(item, f, ensure_ascii=False) + f.write("\n") + f.flush() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Batch of {len(batch)} messages saved to {output_file}") + batch.clear() + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Error saving message to {output_file}: {e}") + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Error opening/writing file {output_file}: {e}") + finally: + if batch: + try: + with open(output_file, "a", encoding='utf-8') as f: + for item in batch: + json.dump(item, f, ensure_ascii=False) + f.write("\n") + f.flush() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Final batch of {len(batch)} messages saved to {output_file}") + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Error saving final batch to {output_file}: {e}") + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Message processing stopped") + +# Handles SIGINT for graceful shutdown +def signal_handler(loop): + global running + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Received SIGINT, shutting down") + running = False + loop.stop() + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() + sys.exit(0) + +# Main function with reconnection logic +async def main(): + loop = asyncio.get_running_loop() + signal.signal(signal.SIGINT, lambda s, f: signal_handler(loop)) + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Starting message processing task") + asyncio.create_task(process_messages()) + while running: + try: + reader, writer = await asyncio.open_connection(SERVER_ADDRESS, SERVER_PORT, ssl=ssl_context, server_hostname="localhost") + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] TLS connection established. Cipher: {writer.get_extra_info('cipher')}") + await receive_messages(reader, writer) + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [RECEIVER] Connection failed: {e}. Retrying in 1 second") + await asyncio.sleep(1) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/client/receiver/config.json b/src/client/receiver/config.json new file mode 100644 index 0000000..a6040b6 --- /dev/null +++ b/src/client/receiver/config.json @@ -0,0 +1,15 @@ +{ + "queue_name": "CipherMQ_queue", + "exchange_name": "CipherMQ_exchange", + "routing_key": "CipherMQ_key", + "server_address": "127.0.0.1", + "server_port": 5672, + "tls": { + "protocol": "PROTOCOL_TLS_CLIENT", + "certificate_path": "certs/ca.crt", + "client_cert_path": "certs/client.crt", + "client_key_path": "certs/client.key", + "verify_mode": "CERT_REQUIRED", + "check_hostname": false + } +} \ No newline at end of file diff --git a/src/client/sender/Sender.py b/src/client/sender/Sender.py new file mode 100644 index 0000000..8cfc1ef --- /dev/null +++ b/src/client/sender/Sender.py @@ -0,0 +1,261 @@ +import asyncio +import json +import ssl +import sys +import os +import time +import uuid +import random +from datetime import datetime +from base64 import b64decode, b64encode +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from nacl.public import PrivateKey, PublicKey, SealedBox + +# Extract client_id from client certificate +def extract_client_id(tls_config): + try: + with open(tls_config["client_cert_path"], "rb") as cert_file: + cert_data = cert_file.read() + cert = x509.load_pem_x509_certificate(cert_data, default_backend()) + cn = cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME) + if not cn: + raise ValueError("No Common Name found in client certificate") + return cn[0].value + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] ❌ [SENDER] Error extracting client_id from certificate: {e}") + sys.exit(1) + +# Load configuration +try: + os.makedirs("certs", exist_ok=True) + with open("config.json", "r") as config_file: + config = json.load(config_file) + EXCHANGE_NAME = config["exchange_name"] + ROUTING_KEY = config["routing_key"] + SERVER_ADDRESS = config["server_address"] + SERVER_PORT = config["server_port"] + TLS_CONFIG = config["tls"] + RECEIVER_CLIENT_IDS = config.get("receiver_client_ids") + if isinstance(RECEIVER_CLIENT_IDS, str): + RECEIVER_CLIENT_IDS = [RECEIVER_CLIENT_IDS] + CLIENT_ID = extract_client_id(TLS_CONFIG) + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Extracted client_id from certificate: {CLIENT_ID}") +except FileNotFoundError: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] ❌ [SENDER] Configuration file 'config - sender.json' not found.") + sys.exit(1) +except KeyError as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] ❌ [SENDER] Missing key in configuration file: {e}") + sys.exit(1) + +# Stores pending messages until acknowledged +pending_messages = {} + +# Configure SSL context for mTLS +ssl_context = ssl.SSLContext(getattr(ssl, TLS_CONFIG["protocol"])) +ssl_context.load_verify_locations(TLS_CONFIG["certificate_path"]) +ssl_context.load_cert_chain( + certfile=TLS_CONFIG["client_cert_path"], + keyfile=TLS_CONFIG["client_key_path"] +) +ssl_context.verify_mode = getattr(ssl, TLS_CONFIG["verify_mode"]) +ssl_context.check_hostname = TLS_CONFIG["check_hostname"] + +# Get public key for a receiver from server +async def get_public_key(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, receiver_client_id: str) -> str: + command = f"get_key {receiver_client_id}\n" + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Sending command: {command.strip()}") + try: + writer.write(command.encode('utf-8')) + await writer.drain() + response = (await asyncio.wait_for(reader.readline(), timeout=5)).decode('utf-8').strip() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Server response: {response}") + if response.startswith("Public key:"): + public_key = response.split(" ", 2)[2] + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Received public key for {receiver_client_id}") + os.makedirs("receiver_keys", exist_ok=True) + with open(f"receiver_keys/ {receiver_client_id}_public.key", "w") as f: + f.write(public_key) + return public_key + else: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Failed to get public key for {receiver_client_id}: {response}") + return None + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Error fetching public key for {receiver_client_id}: {e}") + return None + +# Encrypt message using receiver's public key with hybrid encryption +def encrypt_message(message: str, public_key_b64: str, receiver_client_id: str) -> dict: + try: + public_key_bytes = b64decode(public_key_b64) + public_key = PublicKey(public_key_bytes) + session_key = os.urandom(32) + sealed_box = SealedBox(public_key) + enc_session_key = sealed_box.encrypt(session_key) + cipher = AESGCM(session_key) + nonce = os.urandom(12) + message_bytes = message.encode('utf-8') + ciphertext = cipher.encrypt(nonce, message_bytes, None) + tag = ciphertext[-16:] + ciphertext = ciphertext[:-16] + return { + "message_id": str(uuid.uuid4()), + "receiver_client_id": receiver_client_id, + "enc_session_key": b64encode(enc_session_key).decode('utf-8'), + "nonce": b64encode(nonce).decode('utf-8'), + "tag": b64encode(tag).decode('utf-8'), + "ciphertext": b64encode(ciphertext).decode('utf-8') + } + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Encryption failed: {e}") + return None + +# Generate a batch of messages with counter +def generate_message_batch(count: int = 20) -> list: + message_templates = [ + "Secure communication established with CipherMQ.", + "Data transfer completed successfully via mTLS.", + "Hybrid encryption ensured message security.", + "Message dispatched through secure channel.", + "End-to-end encryption verified for this transmission." + ] + messages = [] + for i in range(1, count + 1): + message_content = random.choice(message_templates) + message = f"Count: {i} - {message_content}" + messages.append({"message": message}) + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Generated {len(messages)} messages with counter") + return messages + +# Encrypt a batch of messages for receivers +async def encrypt_message_batch(messages: list) -> list: + encrypted_messages = [] + message_ids = set() + for message in messages: + target_receiver_ids = [message.get('receiver_client_id')] if message.get('receiver_client_id') else RECEIVER_CLIENT_IDS + for receiver_client_id in target_receiver_ids: + public_key_path = f"receiver_keys/ {receiver_client_id}_public.key" + if not os.path.exists(public_key_path): + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Public key for {receiver_client_id} not found") + continue + with open(public_key_path, "r") as f: + public_key_b64 = f.read().strip() + encrypted_message = encrypt_message(message['message'], public_key_b64, receiver_client_id) + if encrypted_message and encrypted_message['message_id'] not in message_ids: + encrypted_messages.append(encrypted_message) + message_ids.add(encrypted_message['message_id']) + pending_messages[encrypted_message['message_id']] = encrypted_message + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Encrypted message {encrypted_message['message_id']} for {receiver_client_id}") + return encrypted_messages + +# Sends a batch of messages with retries using publish_batch +async def send_message_batch(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, messages: list): + if not messages: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] No messages to send") + return + max_retries = 3 + timeout = 10 + message_ids = {msg['message_id']: msg for msg in messages} + failed_messages = [] + for attempt in range(max_retries): + try: + messages_str = json.dumps(messages, ensure_ascii=False) + command = f"publish_batch {EXCHANGE_NAME} {ROUTING_KEY} {messages_str}\n" + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Sending command: {command[:100]}...") + writer.write(command.encode('utf-8')) + await writer.drain() + received_acks = set() + start_time = time.time() + while len(received_acks) < len(messages) and time.time() - start_time < timeout: + response = (await asyncio.wait_for(reader.readline(), timeout=1)).decode('utf-8').strip() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Server response: {response}") + if response.startswith("ACK "): + message_id = response.split(" ", 2)[1] + if message_id in message_ids: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Server ACK received for message {message_id}") + received_acks.add(message_id) + pending_messages.pop(message_id, None) + else: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Unexpected ACK for message {message_id}") + elif response.startswith("Error:"): + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Server error: {response}") + if "for message" in response: + message_id = response.split("for message ", 1)[1].strip() + if message_id in message_ids: + failed_messages.append(message_ids[message_id]) + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Message {message_id} failed and will be retried") + elif response.startswith("Invalid batch format"): + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Invalid batch format: {response}") + failed_messages = messages + break + else: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Unexpected server response: {response}") + if len(received_acks) == len(messages): + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] All messages in batch acknowledged: {list(message_ids.keys())}") + return + else: + failed_messages = [message_ids[mid] for mid in message_ids if mid not in received_acks] + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Missing ACKs for messages: {[mid for mid in message_ids if mid not in received_acks]}, retrying ({attempt + 1}/{max_retries})") + except asyncio.TimeoutError: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Timeout waiting for server response, retrying ({attempt + 1}/{max_retries})") + failed_messages = messages + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Error sending batch: {e}") + failed_messages = messages + messages = failed_messages + failed_messages = [] + await asyncio.sleep(2 ** attempt) + if messages: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Failed to send messages after {max_retries} attempts: {[msg['message_id'] for msg in messages]}") + +# Fetch public keys for all receivers +async def fetch_all_public_keys(): + public_keys = {} + try: + reader, writer = await asyncio.open_connection(SERVER_ADDRESS, SERVER_PORT, ssl=ssl_context, server_hostname="localhost") + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] TLS connection established for fetching public keys. Cipher: {writer.get_extra_info('cipher')}") + for receiver_client_id in RECEIVER_CLIENT_IDS: + public_key = await get_public_key(reader, writer, receiver_client_id) + if public_key: + public_keys[receiver_client_id] = public_key + else: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Skipping {receiver_client_id} due to missing public key") + writer.close() + await writer.wait_closed() + except Exception as e: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Failed to fetch public keys: {e}") + if not public_keys: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] No valid public keys fetched. Exiting") + return {} + return public_keys + +# Sends generated messages +async def send_generated_messages(count: int = 20, send_batch_size: int = 10): + public_keys = await fetch_all_public_keys() + if not public_keys: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] No valid public keys available. Exiting") + return + global RECEIVER_CLIENT_IDS + RECEIVER_CLIENT_IDS = list(public_keys.keys()) + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Updated RECEIVER_CLIENT_IDS: {RECEIVER_CLIENT_IDS}") + messages = generate_message_batch(count) + encrypted_messages = await encrypt_message_batch(messages) + reader, writer = await asyncio.open_connection(SERVER_ADDRESS, SERVER_PORT, ssl=ssl_context, server_hostname="localhost") + try: + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] TLS connection established for sending messages. Cipher: {writer.get_extra_info('cipher')}") + for i in range(0, len(encrypted_messages), send_batch_size): + batch = encrypted_messages[i:i + send_batch_size] + await send_message_batch(reader, writer, batch) + await asyncio.sleep(0.1) # Small delay to allow receiver to process ACKs + finally: + writer.close() + await writer.wait_closed() + print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [SENDER] Connection closed") + +async def main(): + message_count = 10 + await send_generated_messages(message_count) + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/src/client/sender/config.json b/src/client/sender/config.json new file mode 100644 index 0000000..2136ad0 --- /dev/null +++ b/src/client/sender/config.json @@ -0,0 +1,16 @@ +{ + "receiver_client_ids": ["CipherMQ-Receiver"], + "queue_name": "CipherMQ_queue", + "exchange_name": "CipherMQ_exchange", + "routing_key": "CipherMQ_key", + "server_address": "127.0.0.1", + "server_port": 5672, + "tls": { + "protocol": "PROTOCOL_TLS_CLIENT", + "certificate_path": "certs/ca.crt", + "client_cert_path": "certs/client.crt", + "client_key_path": "certs/client.key", + "verify_mode": "CERT_REQUIRED", + "check_hostname": false + } +} \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index a3ef609..f8984c1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,19 +1,89 @@ use serde::Deserialize; use std::fs; +use toml; +use thiserror::Error; -#[derive(Deserialize)] -pub struct Config { - pub connection_type: String, +#[derive(Error, Debug)] +pub enum ConfigError { + #[error("Failed to read config file: {0}")] + ReadError(#[from] std::io::Error), + #[error("Failed to parse config file: {0}")] + ParseError(#[from] toml::de::Error), + #[error("Missing required field: {0}")] + MissingField(String), +} + +#[derive(Deserialize, Debug, Clone)] +pub struct ServerConfig { pub address: String, + pub connection_type: String, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct TlsConfig { pub cert_path: Option, pub key_path: Option, pub ca_cert_path: Option, } +#[derive(Deserialize, Debug, Clone)] +pub struct DatabaseConfig { + pub dbname: String, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct EncryptionConfig { + pub aes_key: String, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct Config { + pub server: ServerConfig, + pub tls: TlsConfig, + pub database: DatabaseConfig, + pub encryption: EncryptionConfig, +} + impl Config { - pub fn load(path: &str) -> Result> { + pub fn load(path: &str) -> Result { + tracing::info!("Loading configuration from {}", path); let config_content = fs::read_to_string(path)?; let config: Config = toml::from_str(&config_content)?; + + if config.server.connection_type == "tls" { + if config.tls.cert_path.is_none() { + tracing::error!("Missing cert_path for TLS configuration"); + return Err(ConfigError::MissingField("tls.cert_path".to_string())); + } + if config.tls.key_path.is_none() { + tracing::error!("Missing key_path for TLS configuration"); + return Err(ConfigError::MissingField("tls.key_path".to_string())); + } + if config.tls.ca_cert_path.is_none() { + tracing::error!("Missing ca_cert_path for TLS configuration"); + return Err(ConfigError::MissingField("tls.ca_cert_path".to_string())); + } + } + + let config = Config { + server: ServerConfig { + address: config.server.address, + connection_type: config.server.connection_type, + }, + tls: TlsConfig { + cert_path: config.tls.cert_path, + key_path: config.tls.key_path, + ca_cert_path: config.tls.ca_cert_path, + }, + database: DatabaseConfig { + dbname: if config.database.dbname.is_empty() { "public_keys.db".to_string() } else { config.database.dbname }, + }, + encryption: EncryptionConfig { + aes_key: config.encryption.aes_key, + }, + }; + + tracing::info!("Configuration loaded successfully"); Ok(config) } -} +} \ No newline at end of file diff --git a/src/connection.rs b/src/connection.rs index 20f7de4..301195a 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -5,7 +5,6 @@ use tokio_rustls::TlsAcceptor; use tokio_rustls::server::TlsStream; use std::io::{self}; - #[async_trait] pub trait Connection: AsyncRead + AsyncWrite + Send + Sync + Unpin { async fn read_buf(&mut self, buf: &mut Vec) -> io::Result; diff --git a/src/main.rs b/src/main.rs index e2012a1..da0ba20 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,9 @@ use crate::auth::{AuthHandler, MTlsAuth}; use crate::config::Config; -use crate::connection::{create_listener}; +use crate::connection::create_listener; use crate::server::handle_client; use crate::state::ServerState; +use crate::storage::Storage; use std::sync::Arc; use tracing::{error, info}; @@ -11,21 +12,23 @@ mod server; mod connection; mod config; mod auth; +mod storage; #[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); let config = Config::load("config.toml")?; - let state = Arc::new(tokio::sync::RwLock::new(ServerState::new())); - let listener = create_listener(&config.address).await?; - info!("Server listening on {}", config.address); + let storage = Arc::new(Storage::new(&config.database).await?); + let state = Arc::new(tokio::sync::RwLock::new(ServerState::new(storage.clone()))); + let listener = create_listener(&config.server.address).await?; + info!("Server listening on {}", config.server.address); - match config.connection_type.as_str() { + match config.server.connection_type.as_str() { "tls" => { - let cert_path = config.cert_path.ok_or("Missing cert_path for TLS")?; - let key_path = config.key_path.ok_or("Missing key_path for TLS")?; - let ca_cert_path = config.ca_cert_path.ok_or("Missing ca_cert_path for TLS")?; + let cert_path = config.tls.cert_path.ok_or("Missing cert_path for TLS")?; + let key_path = config.tls.key_path.ok_or("Missing key_path for TLS")?; + let ca_cert_path = config.tls.ca_cert_path.ok_or("Missing ca_cert_path for TLS")?; let auth_handler = MTlsAuth::new(&cert_path, &key_path, &ca_cert_path)?; loop { diff --git a/src/server.rs b/src/server.rs index 94af4bc..f8bcdda 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,9 +5,11 @@ use tokio::sync::{mpsc, RwLock}; use tokio::time::{timeout, Duration}; use tracing::{error, info, warn, instrument}; use crate::connection::Connection; +use uuid::Uuid; #[instrument(skip(stream, state))] pub async fn handle_client(mut stream: impl Connection, state: Arc>) { + let client_id = Uuid::new_v4().to_string(); state.write().await.increment_client_count().await; let mut buffer = Vec::with_capacity(4096); let (tx, mut rx) = mpsc::unbounded_channel::<(String, EncryptedInputData)>(); @@ -17,14 +19,14 @@ pub async fn handle_client(mut stream: impl Connection, state: Arc { match result { Ok(Ok(n)) if n == 0 => { - info!("Client disconnected"); + info!("Client {} disconnected", client_id); state.write().await.decrement_client_count().await; return; } Ok(Ok(_)) => { let start_time = Instant::now(); let request = String::from_utf8_lossy(&buffer).trim().to_string(); - info!("Received request: {}", request); + info!("Received request from client {}: {}", client_id, request); let parts: Vec<&str> = request.splitn(2, ' ').collect(); let command = parts.get(0).unwrap_or(&""); @@ -44,7 +46,7 @@ pub async fn handle_client(mut stream: impl Connection, state: Arc { @@ -62,7 +64,7 @@ pub async fn handle_client(mut stream: impl Connection, state: Arc(message_str) { Ok(message) => { - match state.read().await.publish(args[0], args[1], message.clone(), start_time) { + match state.read().await.publish(args[0], args[1], message.clone(), client_id.clone()).await { Ok(()) => { stream .write_all(format!("ACK {}\n", message.message_id).as_bytes()) @@ -99,7 +101,7 @@ pub async fn handle_client(mut stream: impl Connection, state: Arc { let mut acks = Vec::new(); for message in messages { - match state.read().await.publish(exchange, routing_key, message.clone(), start_time) { + match state.read().await.publish(exchange, routing_key, message.clone(), client_id.clone()).await { Ok(()) => { acks.push(format!("ACK {}", message.message_id)); } @@ -132,7 +134,7 @@ pub async fn handle_client(mut stream: impl Connection, state: Arc { if !args_str.is_empty() { - if let Some((message_id, message)) = state.read().await.consume(args_str) { + if let Some((message_id, message)) = state.read().await.consume(args_str).await { let message_str = serde_json::to_string(&message).unwrap(); stream .write_all(format!("Message: {} {}\n", message_id, message_str).as_bytes()) @@ -148,7 +150,7 @@ pub async fn handle_client(mut stream: impl Connection, state: Arc { if !args_str.is_empty() { info!("Processing ACK for message {}", args_str); - state.read().await.acknowledge(args_str); + state.read().await.acknowledge(args_str).await; stream .write_all(format!("ACK_CONFIRMED {}\n", args_str).as_bytes()) .await @@ -166,9 +168,52 @@ pub async fn handle_client(mut stream: impl Connection, state: Arc { - state.read().await.reset_stats(); + state.write().await.reset_stats(); stream.write_all(b"Stats reset\n").await.unwrap(); } + "register_key" => { + let args: Vec<&str> = args_str.splitn(2, ' ').collect(); + if args.len() == 2 { + let client_id = args[0]; + let public_key = args[1]; + match state.read().await.save_public_key(client_id, public_key).await { + Ok(()) => { + stream.write_all(b"Public key registered\n").await.unwrap(); + } + Err(e) => { + stream + .write_all(format!("Error registering key: {}\n", e).as_bytes()) + .await + .unwrap(); + } + } + } else { + stream.write_all(b"Missing client ID or public key\n").await.unwrap(); + } + } + "get_key" => { + if !args_str.is_empty() { + match state.read().await.get_public_key(args_str).await { + Ok(Some(key)) => { + stream + .write_all(format!("Public key: {}\n", key).as_bytes()) + .await + .unwrap(); + } + Ok(None) => { + stream.write_all(b"No public key found\n").await.unwrap(); + } + Err(e) => { + stream + .write_all(format!("Error retrieving key: {}\n", e).as_bytes()) + .await + .unwrap(); + } + } + } else { + stream.write_all(b"Missing client ID\n").await.unwrap(); + } + } _ => { stream.write_all(b"Unknown command\n").await.unwrap(); } @@ -178,7 +223,7 @@ pub async fn handle_client(mut stream: impl Connection, state: Arc { - error!("Error reading from stream: {}", e); + error!("Error reading from stream for client {}: {}", client_id, e); state.write().await.decrement_client_count().await; return; } @@ -195,7 +240,7 @@ pub async fn handle_client(mut stream: impl Connection, state: Arc, pub request_times: DashMap>, pub connected_clients: Arc>, + pub storage: Arc, } impl ServerState { - pub fn new() -> Self { + pub fn new(storage: Arc) -> Self { ServerState { queues: DashMap::new(), bindings: DashMap::new(), @@ -60,6 +63,7 @@ impl ServerState { message_status: DashMap::new(), request_times: DashMap::new(), connected_clients: Arc::new(RwLock::new(0)), + storage, } } @@ -93,12 +97,37 @@ impl ServerState { } } - pub fn publish( + pub fn register_consumer(&self, queue_name: &str, sender: mpsc::UnboundedSender<(String, EncryptedInputData)>) { + if self.queues.contains_key(queue_name) { + let mut consumers = self.consumers.entry(queue_name.to_string()).or_insert_with(Vec::new); + consumers.push(sender.clone()); + info!("Consumer registered for queue '{}'", queue_name); + // Send any existing messages in the queue to the new consumer + if let Some(queue) = self.queues.get(queue_name) { + for (message_id, message) in queue.iter() { + if !self.message_status.contains_key(message_id) { + continue; // Skip if message is already acknowledged + } + if sender.send((message_id.clone(), message.clone())).is_err() { + warn!("Failed to send message {} to new consumer for queue '{}'", message_id, queue_name); + } else { + if let Some(mut status) = self.message_status.get_mut(message_id) { + status.delivered(Instant::now()); + } + } + } + } + } else { + warn!("Cannot register consumer: Queue '{}' does not exist", queue_name); + } + } + + pub async fn publish( &self, exchange_name: &str, routing_key: &str, message: EncryptedInputData, - received_time: Instant, + _client_id: String, ) -> Result<(), String> { let message_id = message.message_id.clone(); if self.message_status.contains_key(&message_id) { @@ -122,7 +151,7 @@ impl ServerState { } } self.message_status - .insert(message_id.clone(), MessageStatus::sent(received_time)); + .insert(message_id.clone(), MessageStatus::sent(Instant::now())); info!( "Published message {} to queue '{}' via exchange '{}'", message_id, routing_key, exchange_name @@ -133,20 +162,7 @@ impl ServerState { } } - pub fn register_consumer( - &self, - queue_name: &str, - sender: mpsc::UnboundedSender<(String, EncryptedInputData)>, - ) { - self.declare_queue(queue_name); - self.consumers - .entry(queue_name.to_string()) - .or_insert_with(Vec::new) - .push(sender); - info!("Consumer registered for queue '{}'", queue_name); - } - - pub fn consume(&self, queue_name: &str) -> Option<(String, EncryptedInputData)> { + pub async fn consume(&self, queue_name: &str) -> Option<(String, EncryptedInputData)> { if let Some(mut queue) = self.queues.get_mut(queue_name) { if !queue.is_empty() { let (message_id, message) = queue.remove(0); @@ -159,7 +175,7 @@ impl ServerState { None } - pub fn acknowledge(&self, message_id: &str) { + pub async fn acknowledge(&self, message_id: &str) { if !self.message_status.contains_key(message_id) { warn!("Ignoring duplicate ACK for message {}", message_id); return; @@ -170,6 +186,9 @@ impl ServerState { for mut queue in self.queues.iter_mut() { queue.retain(|(id, _)| id != message_id); } + for mut consumers in self.consumers.iter_mut() { + consumers.retain(|sender| !sender.is_closed()); + } self.message_status.remove(message_id); info!("Message {} acknowledged by client, removed from queues and status", message_id); } @@ -180,6 +199,14 @@ impl ServerState { info!("Server stats reset"); } + pub fn record_request_time(&self, command: &str, duration: f64) { + self.request_times + .entry(command.to_string()) + .or_insert_with(Vec::new) + .push(duration); + info!("Recorded request time for '{}': {}s", command, duration); + } + pub async fn increment_client_count(&self) { let mut count = self.connected_clients.write().await; *count += 1; @@ -190,11 +217,24 @@ impl ServerState { *count -= 1; } - pub fn record_request_time(&self, command: &str, duration: f64) { - self.request_times - .entry(command.to_string()) - .or_insert_with(Vec::new) - .push(duration); + pub async fn save_public_key(&self, client_id: &str, public_key: &str) -> Result<(), String> { + let key_data = PublicKeyData { + client_id: client_id.to_string(), + public_key: public_key.to_string(), + }; + self.storage + .save_public_key(&key_data) + .await + .map_err(|e| format!("Failed to save public key: {}", e))?; + info!("Public key saved for client {}", client_id); + Ok(()) + } + + pub async fn get_public_key(&self, client_id: &str) -> Result, String> { + self.storage + .get_public_key(client_id) + .await + .map_err(|e| format!("Failed to get public key: {}", e)) } pub async fn get_stats(&self) -> serde_json::Value { diff --git a/src/storage.rs b/src/storage.rs new file mode 100644 index 0000000..0509142 --- /dev/null +++ b/src/storage.rs @@ -0,0 +1,181 @@ +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::{self, Sender}; +use std::sync::Arc; +use rusqlite::{Connection, params, OptionalExtension}; +use aes_gcm::{Aes256Gcm, Key, Nonce}; +use aes_gcm::aead::Aead; +use aes_gcm::KeyInit; +use base64::engine::general_purpose::STANDARD as BASE64; +use base64::Engine; +use rand::Rng; +use crate::config::Config; +use thiserror::Error; +use tokio::sync::Mutex; +use tracing::{info, error}; + +#[derive(Error, Debug)] +pub enum StorageError { + #[error("SQLite error: {0}")] + SQLite(#[from] rusqlite::Error), + #[error("Channel send error: {0}")] + ChannelSend(String), + #[error("Channel receive error")] + ChannelReceive, + #[error("Encryption error: {0}")] + EncryptionError(String), +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct PublicKeyData { + pub client_id: String, + pub public_key: String, +} + +pub struct Storage { + sender: Sender, +} + +enum StorageCommand { + SavePublicKey(PublicKeyData, Sender>), + GetPublicKey(String, Sender, StorageError>>), +} + +impl Storage { + pub async fn new(config: &crate::config::DatabaseConfig) -> Result { + info!("Attempting to connect to SQLite database: {}", config.dbname); + + let conn = Connection::open(&config.dbname) + .map_err(StorageError::SQLite)?; + + info!("Creating public_keys table if not exists"); + conn.execute( + "CREATE TABLE IF NOT EXISTS public_keys ( + client_id TEXT PRIMARY KEY, + public_key_ciphertext TEXT NOT NULL, + nonce TEXT NOT NULL, + tag TEXT NOT NULL + )", + [] + ).map_err(StorageError::SQLite)?; + info!("Table creation completed"); + + let conn = Arc::new(Mutex::new(conn)); + let (sender, mut receiver) = mpsc::channel::(100); + info!("Database worker task started with channel capacity 100"); + + let conn_clone = conn.clone(); + tokio::spawn(async move { + let config = Config::load("config.toml").expect("Failed to load config"); + let aes_key = BASE64.decode(&config.encryption.aes_key) + .expect("Invalid AES key"); + let key = Key::::from_slice(&aes_key); + let cipher = Aes256Gcm::new(key); + + loop { + match receiver.recv().await { + Some(command) => { + match command { + StorageCommand::SavePublicKey(key_data, reply) => { + info!("Received SavePublicKey command for client {}", key_data.client_id); + let result = async { + let nonce_bytes: [u8; 12] = rand::thread_rng().gen(); + let nonce = Nonce::from_slice(&nonce_bytes); + let public_key_bytes = BASE64.decode(&key_data.public_key) + .map_err(|e| StorageError::EncryptionError(format!("Invalid public key: {}", e)))?; + let ciphertext = cipher.encrypt(nonce, &public_key_bytes[..]) + .map_err(|e| StorageError::EncryptionError(format!("Encryption failed: {}", e)))?; + let tag = ciphertext[ciphertext.len() - 16..].to_vec(); + let ciphertext = ciphertext[..ciphertext.len() - 16].to_vec(); + let nonce_b64 = BASE64.encode(nonce_bytes); + let tag_b64 = BASE64.encode(&tag); + let ciphertext_b64 = BASE64.encode(&ciphertext); + + let conn = conn_clone.lock().await; + conn.execute( + "INSERT OR REPLACE INTO public_keys (client_id, public_key_ciphertext, nonce, tag) + VALUES (?, ?, ?, ?)", + params![key_data.client_id, ciphertext_b64, nonce_b64, tag_b64] + ).map_err(StorageError::SQLite)?; + Ok(()) + }.await; + + if let Err(e) = &result { + error!("Failed to save public key for client {}: {}", key_data.client_id, e); + } + reply.send(result).await.unwrap_or_else(|e| { + error!("Failed to send SavePublicKey response: {:?}", e); + }); + } + StorageCommand::GetPublicKey(client_id, reply) => { + info!("Received GetPublicKey command for client {}", client_id); + let result = async { + let conn = conn_clone.lock().await; + let public_key = conn.query_row( + "SELECT public_key_ciphertext, nonce, tag FROM public_keys WHERE client_id = ?", + params![client_id], + |row| { + let ciphertext_b64: String = row.get(0)?; + let nonce_b64: String = row.get(1)?; + let tag_b64: String = row.get(2)?; + Ok((ciphertext_b64, nonce_b64, tag_b64)) + } + ).optional().map_err(StorageError::SQLite)?; + + let public_key = match public_key { + Some((ciphertext_b64, nonce_b64, tag_b64)) => { + let ciphertext = BASE64.decode(&ciphertext_b64) + .map_err(|e| StorageError::EncryptionError(format!("Invalid ciphertext: {}", e)))?; + let nonce_bytes = BASE64.decode(&nonce_b64) + .map_err(|e| StorageError::EncryptionError(format!("Invalid nonce: {}", e)))?; + let tag = BASE64.decode(&tag_b64) + .map_err(|e| StorageError::EncryptionError(format!("Invalid tag: {}", e)))?; + let mut ciphertext_with_tag = ciphertext; + ciphertext_with_tag.extend_from_slice(&tag); + let nonce = Nonce::from_slice(&nonce_bytes); + let plaintext = cipher.decrypt(nonce, &ciphertext_with_tag[..]) + .map_err(|e| StorageError::EncryptionError(format!("Decryption failed: {}", e)))?; + Some(BASE64.encode(plaintext)) + } + None => None, + }; + Ok(public_key) + }.await; + + if let Err(e) = &result { + error!("Failed to get public key for client {}: {}", client_id, e); + } + reply.send(result).await.unwrap_or_else(|e| { + error!("Failed to send GetPublicKey response: {:?}", e); + }); + } + } + } + None => { + info!("Database worker task terminated: channel closed"); + break; + } + } + } + }); + + Ok(Storage { sender }) + } + + pub async fn save_public_key(&self, key_data: &PublicKeyData) -> Result<(), StorageError> { + let (reply_tx, mut reply_rx) = mpsc::channel(1); + self.sender + .send(StorageCommand::SavePublicKey(key_data.clone(), reply_tx)) + .await + .map_err(|e| StorageError::ChannelSend(format!("Failed to send SavePublicKey command: {}", e)))?; + reply_rx.recv().await.ok_or(StorageError::ChannelReceive)? + } + + pub async fn get_public_key(&self, client_id: &str) -> Result, StorageError> { + let (reply_tx, mut reply_rx) = mpsc::channel(1); + self.sender + .send(StorageCommand::GetPublicKey(client_id.to_string(), reply_tx)) + .await + .map_err(|e| StorageError::ChannelSend(format!("Failed to send GetPublicKey command: {}", e)))?; + reply_rx.recv().await.ok_or(StorageError::ChannelReceive)? + } +} \ No newline at end of file