Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,13 @@ impl ServerCommandHandler for GetConsumerGroup {
numeric_topic_id as u32,
);

shard
.streams2
.with_consumer_group_by_id_async(
&self.stream_id,
&self.topic_id,
&self.group_id,
async |(root, members)| {
let consumer_group = mapper::map_consumer_group(root, members);
sender.send_ok_response(&consumer_group).await
},
)
.await?;
let consumer_group = shard.streams2.with_consumer_group_by_id(
&self.stream_id,
&self.topic_id,
&self.group_id,
|(root, members)| mapper::map_consumer_group(root, members),
);
sender.send_ok_response(&consumer_group).await?;
Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,16 @@ impl ServerCommandHandler for GetConsumerGroups {
numeric_topic_id as u32,
);

shard
.streams2
.with_consumer_groups_async(&self.stream_id, &self.topic_id, async |cgs| {
cgs.with_components_async(async |cgs| {
let (roots, members) = cgs.into_components();
let consumer_groups = mapper::map_consumer_groups(roots, members);
sender.send_ok_response(&consumer_groups).await
})
.await
})
.await?;
let consumer_groups =
shard
.streams2
.with_consumer_groups(&self.stream_id, &self.topic_id, |cgs| {
cgs.with_components(|cgs| {
let (roots, members) = cgs.into_components();
mapper::map_consumer_groups(roots, members)
})
});
sender.send_ok_response(&consumer_groups).await?;
Ok(())
}
}
Expand Down
14 changes: 7 additions & 7 deletions core/server/src/binary/handlers/topics/get_topic_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ impl ServerCommandHandler for GetTopic {
self.topic_id.get_u32_value().unwrap_or(0),
)?;

shard
.streams2
.with_topic_by_id_async(&self.stream_id, &self.topic_id, async |(root, _, stats)| {
let response = mapper::map_topic(&root, &stats);
sender.send_ok_response(&response).await
})
.await?;
let response =
shard
.streams2
.with_topic_by_id(&self.stream_id, &self.topic_id, |(root, _, stats)| {
mapper::map_topic(&root, &stats)
});
sender.send_ok_response(&response).await?;
Ok(())
}
}
Expand Down
17 changes: 6 additions & 11 deletions core/server/src/binary/handlers/topics/get_topics_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,13 @@ impl ServerCommandHandler for GetTopics {
.borrow()
.get_topics(session.get_user_id(), numeric_stream_id as u32);

shard
.streams2
.with_topics_async(&self.stream_id, async |topics| {
topics
.with_components_async(async |topics| {
let (roots, _, stats) = topics.into_components();
let response = mapper::map_topics(&roots, &stats);
sender.send_ok_response(&response).await
})
.await
let response = shard.streams2.with_topics(&self.stream_id, |topics| {
topics.with_components(|topics| {
let (roots, _, stats) = topics.into_components();
mapper::map_topics(&roots, &stats)
})
.await?;
});
sender.send_ok_response(&response).await?;
Ok(())
}
}
Expand Down
66 changes: 22 additions & 44 deletions core/server/src/http/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,12 @@ async fn get_consumer_group(
numeric_topic_id as u32,
)?;

let consumer_group = {
let future = SendWrapper::new(
state
.shard
.shard()
.streams2
.with_consumer_group_by_id_async(
&identifier_stream_id,
&identifier_topic_id,
&identifier_group_id,
async |(root, members)| mapper::map_consumer_group(root, members),
),
);
future.await
};
let consumer_group = state.shard.shard().streams2.with_consumer_group_by_id(
&identifier_stream_id,
&identifier_topic_id,
&identifier_group_id,
|(root, members)| mapper::map_consumer_group(root, members),
);

Ok(Json(consumer_group))
}
Expand Down Expand Up @@ -150,20 +141,16 @@ async fn get_consumer_groups(
numeric_topic_id as u32,
)?;

let consumer_groups = {
let future = SendWrapper::new(state.shard.shard().streams2.with_consumer_groups_async(
&identifier_stream_id,
&identifier_topic_id,
async |cgs| {
cgs.with_components_async(async |cgs| {
let (roots, members) = cgs.into_components();
mapper::map_consumer_groups(roots, members)
})
.await
},
));
future.await
};
let consumer_groups = state.shard.shard().streams2.with_consumer_groups(
&identifier_stream_id,
&identifier_topic_id,
|cgs| {
cgs.with_components(|cgs| {
let (roots, members) = cgs.into_components();
mapper::map_consumer_groups(roots, members)
})
},
);

Ok(Json(consumer_groups))
}
Expand Down Expand Up @@ -213,21 +200,12 @@ async fn create_consumer_group(

// Get the created consumer group details
let group_id_identifier = Identifier::numeric(group_id as u32).unwrap();
let consumer_group_details = {
let future = SendWrapper::new(
state
.shard
.shard()
.streams2
.with_consumer_group_by_id_async(
&command.stream_id,
&command.topic_id,
&group_id_identifier,
async |(root, members)| mapper::map_consumer_group(root, members),
),
);
future.await
};
let consumer_group_details = state.shard.shard().streams2.with_consumer_group_by_id(
&command.stream_id,
&command.topic_id,
&group_id_identifier,
|(root, members)| mapper::map_consumer_group(root, members),
);

// Apply state change
let entry_command = EntryCommand::CreateConsumerGroup(CreateConsumerGroupWithId {
Expand Down
81 changes: 42 additions & 39 deletions core/server/src/shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,55 +572,58 @@ impl IggyShard {
);
match consumer {
PollingConsumer::Consumer(consumer_id, _) => {
self.streams2.with_partition_by_id(
let (offset_value, path) = self.streams2.with_partition_by_id(
&stream_id,
&topic_id,
partition_id,
partitions::helpers::store_consumer_offset(
consumer_id,
numeric_stream_id,
numeric_topic_id,
partition_id,
offset,
&self.config.system,
),
);
self.streams2
.with_partition_by_id_async(
&stream_id,
&topic_id,
partition_id,
partitions::helpers::persist_consumer_offset_to_disk(
self.id,
|(.., offsets, _, _)| {
let hdl = offsets.pin();
let item = hdl.get_or_insert(
consumer_id,
),
)
.await?;
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer(
consumer_id as u32,
&self.config.system.get_consumer_offsets_path(numeric_stream_id, numeric_topic_id, partition_id),
),
);
item.offset.store(offset, std::sync::atomic::Ordering::Relaxed);
let offset_value = item.offset.load(std::sync::atomic::Ordering::Relaxed);
let path = item.path.clone();
(offset_value, path)
},
);
crate::streaming::partitions::storage2::persist_offset(
self.id,
&path,
offset_value,
)
.await?;
}
PollingConsumer::ConsumerGroup(cg_id, _) => {
self.streams2.with_partition_by_id(
let (offset_value, path) = self.streams2.with_partition_by_id(
&stream_id,
&topic_id,
partition_id,
partitions::helpers::store_consumer_group_member_offset(
cg_id,
numeric_stream_id,
numeric_topic_id,
partition_id,
offset,
&self.config.system,
),
);
self.streams2.with_partition_by_id_async(
&stream_id,
&topic_id,
partition_id,
partitions::helpers::persist_consumer_group_member_offset_to_disk(
self.id,
|(.., offsets, _)| {
let hdl = offsets.pin();
let item = hdl.get_or_insert(
cg_id,
),
)
.await?;
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer_group(
cg_id as u32,
&self.config.system.get_consumer_group_offsets_path(numeric_stream_id, numeric_topic_id, partition_id),
),
);
item.offset.store(offset, std::sync::atomic::Ordering::Relaxed);
let offset_value = item.offset.load(std::sync::atomic::Ordering::Relaxed);
let path = item.path.clone();
(offset_value, path)
},
);
crate::streaming::partitions::storage2::persist_offset(
self.id,
&path,
offset_value,
)
.await?;
}
}
}
Expand Down
48 changes: 30 additions & 18 deletions core/server/src/shard/system/consumer_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,26 +258,38 @@ impl IggyShard {
) -> Result<(), IggyError> {
match polling_consumer {
PollingConsumer::Consumer(id, _) => {
self.streams2
.with_partition_by_id_async(
stream_id,
topic_id,
partition_id,
partitions::helpers::persist_consumer_offset_to_disk(self.id, *id),
)
.await
let (offset_value, path) = self.streams2.with_partition_by_id(
stream_id,
topic_id,
partition_id,
|(.., offsets, _, _)| {
let hdl = offsets.pin();
let item = hdl
.get(id)
.expect("persist_consumer_offset_to_disk: offset not found");
let offset = item.offset.load(std::sync::atomic::Ordering::Relaxed);
let path = item.path.clone();
(offset, path)
},
);
partitions::storage2::persist_offset(self.id, &path, offset_value).await
}
PollingConsumer::ConsumerGroup(_, id) => {
self.streams2
.with_partition_by_id_async(
stream_id,
topic_id,
partition_id,
partitions::helpers::persist_consumer_group_member_offset_to_disk(
self.id, *id,
),
)
.await
let (offset_value, path) = self.streams2.with_partition_by_id(
stream_id,
topic_id,
partition_id,
|(.., offsets, _)| {
let hdl = offsets.pin();
let item = hdl.get(id).expect(
"persist_consumer_group_member_offset_to_disk: offset not found",
);
let offset = item.offset.load(std::sync::atomic::Ordering::Relaxed);
let path = item.path.clone();
(offset, path)
},
);
partitions::storage2::persist_offset(self.id, &path, offset_value).await
}
}
}
Expand Down
Loading