Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ license = "MIT"
repository = "https://github.com/RAprogramm/entity-derive"

[workspace.dependencies]
entity-core = { path = "crates/entity-core", version = "0.5" }
entity-derive = { path = "crates/entity-derive", version = "0.7" }
entity-derive-impl = { path = "crates/entity-derive-impl", version = "0.5" }
entity-core = { path = "crates/entity-core", version = "0.6" }
entity-derive = { path = "crates/entity-derive", version = "0.8" }
entity-derive-impl = { path = "crates/entity-derive-impl", version = "0.6" }
syn = { version = "2", features = ["full", "extra-traits", "parsing"] }
quote = "1"
proc-macro2 = "1"
Expand Down
4 changes: 3 additions & 1 deletion crates/entity-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[package]
name = "entity-core"
version = "0.5.4"
version = "0.6.0"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand All @@ -21,6 +21,7 @@ postgres = ["sqlx"]
clickhouse = []
mongodb = []
streams = ["serde", "serde_json", "futures"]
tracing = ["dep:tracing"]

[dependencies]
async-trait = "0.1"
Expand All @@ -30,6 +31,7 @@ sqlx = { version = "0.8", optional = true, default-features = false, features =
serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true }
futures = { version = "0.3", optional = true }
tracing = { version = "0.1", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt"] }
Expand Down
12 changes: 10 additions & 2 deletions crates/entity-core/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,14 @@ impl Transaction<'_, sqlx::PgPool> {
/// # Errors
///
/// Propagates any error from the closure, from `begin`, or from `commit`.
#[cfg_attr(
feature = "tracing",
::tracing::instrument(skip_all, fields(op = "tx.run"), err(Debug))
)]
pub async fn run<F, T, E>(self, f: F) -> Result<T, E>
where
F: AsyncFnOnce(&mut TransactionContext) -> Result<T, E>,
E: From<sqlx::Error>
E: From<sqlx::Error> + core::fmt::Debug
{
let tx = self.pool.begin().await.map_err(E::from)?;
let mut ctx = TransactionContext::new(tx);
Expand Down Expand Up @@ -377,11 +381,15 @@ impl Transaction<'_, sqlx::PgPool> {
/// # Errors
///
/// Propagates any error from the closure or database transaction.
#[cfg_attr(
feature = "tracing",
::tracing::instrument(skip_all, fields(op = "tx.run_with_commit"), err(Debug))
)]
pub async fn run_with_commit<F, Fut, T, E>(self, f: F) -> Result<T, E>
where
F: FnOnce(TransactionContext) -> Fut + Send,
Fut: Future<Output = Result<T, E>> + Send,
E: From<sqlx::Error>
E: From<sqlx::Error> + core::fmt::Debug
{
let tx = self.pool.begin().await.map_err(E::from)?;
let ctx = TransactionContext::new(tx);
Expand Down
2 changes: 1 addition & 1 deletion crates/entity-derive-impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[package]
name = "entity-derive-impl"
version = "0.5.2"
version = "0.6.0"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand Down
23 changes: 22 additions & 1 deletion crates/entity-derive-impl/src/entity/sql/postgres/crud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use super::{
context::Context,
helpers::{insert_bindings, update_bindings}
};
use crate::entity::parse::ReturningMode;
use crate::{entity::parse::ReturningMode, utils::tracing::instrument};

impl Context<'_> {
/// Generate the `create` method implementation.
Expand Down Expand Up @@ -67,10 +67,13 @@ impl Context<'_> {
} = self;
let bindings = insert_bindings(entity.all_fields());

let span = instrument(&entity_name.to_string(), "create");

match returning {
ReturningMode::Full => {
let notify = self.notify_created();
quote! {
#span
async fn create(&self, dto: #create_dto) -> Result<#entity_name, Self::Error> {
let entity = #entity_name::from(dto);
let insertable = #insertable_name::from(&entity);
Expand All @@ -89,6 +92,7 @@ impl Context<'_> {
let id_name = self.id_name;
let notify = self.notify_created();
quote! {
#span
async fn create(&self, dto: #create_dto) -> Result<#entity_name, Self::Error> {
let entity = #entity_name::from(dto);
let insertable = #insertable_name::from(&entity);
Expand All @@ -103,6 +107,7 @@ impl Context<'_> {
ReturningMode::None => {
let notify = self.notify_created();
quote! {
#span
async fn create(&self, dto: #create_dto) -> Result<#entity_name, Self::Error> {
let entity = #entity_name::from(dto);
let insertable = #insertable_name::from(&entity);
Expand All @@ -118,6 +123,7 @@ impl Context<'_> {
let returning_cols = columns.join(", ");
let notify = self.notify_created();
quote! {
#span
async fn create(&self, dto: #create_dto) -> Result<#entity_name, Self::Error> {
let entity = #entity_name::from(dto);
let insertable = #insertable_name::from(&entity);
Expand Down Expand Up @@ -160,7 +166,10 @@ impl Context<'_> {
""
};

let span = instrument(&entity_name.to_string(), "find_by_id");

quote! {
#span
async fn find_by_id(&self, id: #id_type) -> Result<Option<#entity_name>, Self::Error> {
let row: Option<#row_name> = sqlx::query_as(
&format!("SELECT {} FROM {} WHERE {} = {}{}", #columns_str, #table, stringify!(#id_name), #placeholder, #deleted_filter)
Expand Down Expand Up @@ -211,10 +220,12 @@ impl Context<'_> {

let fetch_old = self.fetch_old_for_update();
let notify = self.notify_updated();
let span = instrument(&entity_name.to_string(), "update");

match returning {
ReturningMode::Full => {
quote! {
#span
async fn update(&self, id: #id_type, dto: #update_dto) -> Result<#entity_name, Self::Error> {
#fetch_old
let row: #row_name = sqlx::query_as(
Expand All @@ -231,6 +242,7 @@ impl Context<'_> {
}
ReturningMode::Id | ReturningMode::None => {
quote! {
#span
async fn update(&self, id: #id_type, dto: #update_dto) -> Result<#entity_name, Self::Error> {
#fetch_old
sqlx::query(&format!("UPDATE {} SET {} WHERE {} = {}", #table, #set_clause, stringify!(#id_name), #where_placeholder))
Expand All @@ -246,6 +258,7 @@ impl Context<'_> {
ReturningMode::Custom(columns) => {
let returning_cols = columns.join(", ");
quote! {
#span
async fn update(&self, id: #id_type, dto: #update_dto) -> Result<#entity_name, Self::Error> {
#fetch_old
sqlx::query(&format!("UPDATE {} SET {} WHERE {} = {} RETURNING {}", #table, #set_clause, stringify!(#id_name), #where_placeholder, #returning_cols))
Expand Down Expand Up @@ -277,6 +290,7 @@ impl Context<'_> {
/// ```
pub fn delete_method(&self) -> TokenStream {
let Self {
entity_name,
table,
id_name,
id_type,
Expand All @@ -288,7 +302,9 @@ impl Context<'_> {

if *soft_delete {
let notify = self.notify_soft_deleted();
let span = instrument(&entity_name.to_string(), "soft_delete");
quote! {
#span
async fn delete(&self, id: #id_type) -> Result<bool, Self::Error> {
let result = sqlx::query(&format!(
"UPDATE {} SET deleted_at = NOW() WHERE {} = {} AND deleted_at IS NULL",
Expand All @@ -303,7 +319,9 @@ impl Context<'_> {
}
} else {
let notify = self.notify_hard_deleted();
let span = instrument(&entity_name.to_string(), "delete");
quote! {
#span
async fn delete(&self, id: #id_type) -> Result<bool, Self::Error> {
let result = sqlx::query(&format!("DELETE FROM {} WHERE {} = {}", #table, stringify!(#id_name), #placeholder))
.bind(&id).execute(self).await?;
Expand Down Expand Up @@ -346,7 +364,10 @@ impl Context<'_> {
""
};

let span = instrument(&entity_name.to_string(), "list");

quote! {
#span
async fn list(&self, limit: i64, offset: i64) -> Result<Vec<#entity_name>, Self::Error> {
let rows: Vec<#row_name> = sqlx::query_as(
&format!("SELECT {} FROM {} {}ORDER BY {} DESC LIMIT {} OFFSET {}",
Expand Down
12 changes: 11 additions & 1 deletion crates/entity-derive-impl/src/entity/sql/postgres/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ use proc_macro2::TokenStream;
use quote::{format_ident, quote};

use super::context::Context;
use crate::entity::parse::{FieldDef, SqlLevel};
use crate::{
entity::parse::{FieldDef, SqlLevel},
utils::tracing::instrument
};

impl Context<'_> {
/// Generate all lookup method implementations.
Expand Down Expand Up @@ -116,8 +119,11 @@ impl Context<'_> {
let field_type = field.ty();
let method_name = format_ident!("find_by_{}", field_name_str);
let placeholder = dialect.placeholder(1);
let op = format!("find_by_{field_name_str}");
let span = instrument(&entity_name.to_string(), &op);

quote! {
#span
async fn #method_name(&self, #field_name: #field_type) -> Result<Option<#entity_name>, Self::Error> {
let row: Option<#row_name> = sqlx::query_as(
&format!("SELECT * FROM {} WHERE {} = {}", #table, stringify!(#field_name), #placeholder)
Expand All @@ -136,6 +142,7 @@ impl Context<'_> {
/// ```
fn exists_by_method(&self, field: &FieldDef) -> TokenStream {
let Self {
entity_name,
table,
dialect,
..
Expand All @@ -146,8 +153,11 @@ impl Context<'_> {
let field_type = field.ty();
let method_name = format_ident!("exists_by_{}", field_name_str);
let placeholder = dialect.placeholder(1);
let op = format!("exists_by_{field_name_str}");
let span = instrument(&entity_name.to_string(), &op);

quote! {
#span
async fn #method_name(&self, #field_name: #field_type) -> Result<bool, Self::Error> {
let exists: bool = sqlx::query_scalar(
&format!("SELECT EXISTS(SELECT 1 FROM {} WHERE {} = {})", #table, stringify!(#field_name), #placeholder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use proc_macro2::TokenStream;
use quote::{format_ident, quote};

use super::context::Context;
use crate::entity::parse::ProjectionDef;
use crate::{entity::parse::ProjectionDef, utils::tracing::instrument};

impl Context<'_> {
/// Generate all projection methods.
Expand Down Expand Up @@ -67,7 +67,11 @@ impl Context<'_> {
.collect::<Vec<_>>()
.join(", ");

let op = format!("find_by_id_{proj_snake}");
let span = instrument(&entity_name.to_string(), &op);

quote! {
#span
async fn #method_name(&self, id: #id_type) -> Result<Option<#proj_type>, Self::Error> {
let row = sqlx::query_as::<_, #proj_type>(
&format!("SELECT {} FROM {} WHERE {} = {}", #columns_str, #table, stringify!(#id_name), #placeholder)
Expand Down
5 changes: 4 additions & 1 deletion crates/entity-derive-impl/src/entity/sql/postgres/save.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use proc_macro2::TokenStream;
use quote::quote;

use super::context::Context;
use crate::entity::parse::SqlLevel;
use crate::{entity::parse::SqlLevel, utils::tracing::instrument};

impl Context<'_> {
/// Generate the `save` method implementation for aggregate roots.
Expand Down Expand Up @@ -73,7 +73,10 @@ impl Context<'_> {
let bindings = super::helpers::insert_bindings(self.entity.all_fields());
let error_type = self.entity.error_type();

let span = instrument(&entity_name.to_string(), "save");

quote! {
#span
async fn save(&self, new: #new_name) -> Result<#entity_name, #error_type> {
let mut tx = self.begin().await?;

Expand Down
12 changes: 11 additions & 1 deletion crates/entity-derive-impl/src/entity/streams/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,22 @@
use proc_macro2::TokenStream;
use quote::{format_ident, quote};

use crate::{entity::parse::EntityDef, utils::marker};
use crate::{
entity::parse::EntityDef,
utils::{marker, tracing::instrument}
};

/// Generate the subscriber struct and implementation.
pub fn generate(entity: &EntityDef) -> TokenStream {
let vis = &entity.vis;
let entity_name = entity.name();
let entity_name_str = entity_name.to_string();
let subscriber_name = format_ident!("{}Subscriber", entity_name);
let event_name = format_ident!("{}Event", entity_name);
let marker = marker::generated();
let new_span = instrument(&entity_name_str, "stream.subscribe");
let recv_span = instrument(&entity_name_str, "stream.recv");
let try_recv_span = instrument(&entity_name_str, "stream.try_recv");

let doc = format!(
"Subscriber for real-time [`{entity_name}`] change events.\n\n\
Expand All @@ -34,6 +41,7 @@ pub fn generate(entity: &EntityDef) -> TokenStream {
/// Create a new subscriber connected to the database pool.
///
/// Automatically subscribes to the entity's notification channel.
#new_span
pub async fn new(pool: &::sqlx::PgPool) -> Result<Self, ::sqlx::Error> {
let mut listener = ::sqlx::postgres::PgListener::connect_with(pool).await?;
listener.listen(#entity_name::CHANNEL).await?;
Expand All @@ -43,6 +51,7 @@ pub fn generate(entity: &EntityDef) -> TokenStream {
/// Receive the next event.
///
/// Blocks until an event is available.
#recv_span
pub async fn recv(
&mut self,
) -> Result<#event_name, ::entity_core::stream::StreamError<::sqlx::Error>> {
Expand All @@ -59,6 +68,7 @@ pub fn generate(entity: &EntityDef) -> TokenStream {
/// Try to receive an event without blocking.
///
/// Returns `None` if no event is immediately available.
#try_recv_span
pub async fn try_recv(
&mut self,
) -> Result<Option<#event_name>, ::entity_core::stream::StreamError<::sqlx::Error>> {
Expand Down
Loading
Loading