From b3de25d5f2d8f4082b332936cb2de9c407bc3de8 Mon Sep 17 00:00:00 2001 From: Sandipsinh Rathod Date: Wed, 4 Jun 2025 13:26:16 -0400 Subject: [PATCH 1/2] feat: add count function to JsonlReader for counting lines in JSONL streams --- crates/async_jsonl/README.md | 11 +- crates/async_jsonl/src/async_jsonl.rs | 3 + crates/async_jsonl/src/jsonl_reader.rs | 6 +- .../tests/integration_tests.rs | 111 ++++++++++++++++++ 4 files changed, 122 insertions(+), 9 deletions(-) diff --git a/crates/async_jsonl/README.md b/crates/async_jsonl/README.md index abe30d4..77ebd70 100644 --- a/crates/async_jsonl/README.md +++ b/crates/async_jsonl/README.md @@ -101,14 +101,9 @@ async fn main() -> anyhow::Result<()> { {"name": "Bob", "age": 25}"#; let reader = Cursor::new(data.as_bytes()); - // Option 1: Using the trait let jsonl = Jsonl::new(reader); let values = jsonl.deserialize_values(); - // Option 2: Using the convenience function - let reader2 = Cursor::new(data.as_bytes()); - let values = jsonl_values(reader2); - let results: Vec = values .collect::>() .await @@ -126,7 +121,7 @@ async fn main() -> anyhow::Result<()> { ### Reading from Memory ```rust -use async_jsonl::{Jsonl, JsonlDeserialize}; +use async_jsonl::Jsonl; use futures::StreamExt; use std::io::Cursor; @@ -149,13 +144,13 @@ async fn main() -> anyhow::Result<()> { ### Counting Lines ```rust -use async_jsonl::Jsonl; +use async_jsonl::{Jsonl,JsonlReader}; #[tokio::main] async fn main() -> anyhow::Result<()> { // Count lines efficiently without full deserialization let jsonl = Jsonl::from_path("large_file.jsonl").await?; - let count = jsonl.count_lines().await?; + let count = jsonl.count().await?; println!("File contains {} non-empty lines", count); diff --git a/crates/async_jsonl/src/async_jsonl.rs b/crates/async_jsonl/src/async_jsonl.rs index e60bc14..6e47ac3 100644 --- a/crates/async_jsonl/src/async_jsonl.rs +++ b/crates/async_jsonl/src/async_jsonl.rs @@ -159,6 +159,9 @@ pub trait JsonlReader: JsonlDeserialize + JsonlValueDeserialize + Stream + Send /// } /// ``` async fn last_n(self, n: usize) -> anyhow::Result; + + /// Count the total number of lines in the JSONL stream. + async fn count(self) -> usize; } /// Extension trait to add deserialization capabilities to JSONL readers. diff --git a/crates/async_jsonl/src/jsonl_reader.rs b/crates/async_jsonl/src/jsonl_reader.rs index d2eb702..f2c73f1 100644 --- a/crates/async_jsonl/src/jsonl_reader.rs +++ b/crates/async_jsonl/src/jsonl_reader.rs @@ -1,6 +1,6 @@ use crate::take_n::{TakeNLines, TakeNLinesReverse}; use crate::{Jsonl, JsonlReader}; -use futures::Stream; +use futures::{Stream, StreamExt}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::File; @@ -18,6 +18,10 @@ impl JsonlReader for Jsonl { async fn last_n(self, n: usize) -> anyhow::Result { self.get_rev_n(n).await } + + async fn count(self) -> usize { + StreamExt::count(self).await + } } impl Jsonl { diff --git a/crates/async_jsonl_tests/tests/integration_tests.rs b/crates/async_jsonl_tests/tests/integration_tests.rs index 82360a4..5066c5f 100644 --- a/crates/async_jsonl_tests/tests/integration_tests.rs +++ b/crates/async_jsonl_tests/tests/integration_tests.rs @@ -282,6 +282,117 @@ invalid_json_line assert_eq!(results[2].as_ref().unwrap()["also_valid"], false); } +#[tokio::test] +async fn test_count_empty_file() { + let data = ""; + + let reader = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(reader); + + let count = JsonlReader::count(jsonl).await; + assert_eq!(count, 0); +} + +#[tokio::test] +async fn test_count_single_line() { + let data = r#"{"value": 42}"#; + + let reader = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(reader); + + let count = JsonlReader::count(jsonl).await; + assert_eq!(count, 1); +} + +#[tokio::test] +async fn test_count_multiple_lines() { + let data = r#"{"id": 1, "name": "Alice", "active": true} +{"id": 2, "name": "Bob", "active": false} +{"id": 3, "name": "Charlie", "active": true} +{"id": 4, "name": "Diana", "active": false} +{"id": 5, "name": "Eve", "active": true} +"#; + + let reader = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(reader); + + let count = JsonlReader::count(jsonl).await; + assert_eq!(count, 5); +} + +#[tokio::test] +async fn test_count_with_empty_lines() { + let data = r#"{"value": 1} + +{"value": 2} + + +{"value": 3} + +"#; + + let reader = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(reader); + + let count = JsonlReader::count(jsonl).await; + // Empty lines should be filtered out, so only 3 valid lines + assert_eq!(count, 3); +} + +#[tokio::test] +async fn test_count_large_file() { + // Create a larger test file with 1000 lines + let mut data = String::new(); + for i in 0..1000 { + data.push_str(&format!("{{\"id\": {}, \"value\": \"item_{}\"}}\n", i, i)); + } + + let reader = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(reader); + + let count = JsonlReader::count(jsonl).await; + assert_eq!(count, 1000); +} + +#[tokio::test] +async fn test_count_with_malformed_lines() { + // Count should include all lines, even if they contain invalid JSON + // because count operates at the line level, not JSON parsing level + let data = r#"{"valid": 1} +invalid_json_line +{"valid": 2} +another_invalid_line +{"valid": 3} +"#; + + let reader = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(reader); + + let count = JsonlReader::count(jsonl).await; + assert_eq!(count, 5); +} + +#[tokio::test] +async fn test_count_consistency_with_streaming() { + let data = r#"{"id": 1, "name": "Alice"} +{"id": 2, "name": "Bob"} +{"id": 3, "name": "Charlie"} +"#; + + // Count using the count method + let reader1 = Cursor::new(data.as_bytes()); + let jsonl1 = Jsonl::new(reader1); + let count_method_result = JsonlReader::count(jsonl1).await; + + // Count by manually consuming the stream + let reader2 = Cursor::new(data.as_bytes()); + let jsonl2 = Jsonl::new(reader2); + let manual_count = jsonl2.collect::>().await.len(); + + assert_eq!(count_method_result, manual_count); + assert_eq!(count_method_result, 3); +} + #[tokio::test] async fn test_complex_nested_values() { let data = r#"{"nested": {"inner": [1, 2, 3]}, "top": "level"} From fd78474cf6f187a96114a1f612bc91efd92f1715 Mon Sep 17 00:00:00 2001 From: Sandipsinh Rathod Date: Wed, 4 Jun 2025 14:13:46 -0400 Subject: [PATCH 2/2] feat: add example for counting lines in JSONL files --- .github/workflows/ci.yml | 6 +- Cargo.lock | 9 +- crates/async_jsonl_ci/Cargo.toml | 2 +- .../examples/count_lines.rs | 179 ++++++++++++++++++ 4 files changed, 187 insertions(+), 9 deletions(-) create mode 100644 crates/async_jsonl_examples/examples/count_lines.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 30092a6..49846dc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -82,7 +82,7 @@ jobs: pull-requests: write packages: write env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_TOKEN: ${{ secrets.GH_TOKEN }} CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} steps: - name: Checkout Code @@ -91,6 +91,7 @@ jobs: uses: release-plz/action@v0.5 with: command: release + fetch-depth: '0' concurrency: group: release-${{github.ref}} cancel-in-progress: false @@ -106,7 +107,7 @@ jobs: pull-requests: write packages: write env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_TOKEN: ${{ secrets.GH_TOKEN }} CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} steps: - name: Checkout Code @@ -115,6 +116,7 @@ jobs: uses: release-plz/action@v0.5 with: command: release-pr + fetch-depth: '0' concurrency: group: release-${{github.ref}} cancel-in-progress: false diff --git a/Cargo.lock b/Cargo.lock index d89fa5d..a28b699 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -316,8 +316,7 @@ dependencies = [ [[package]] name = "gh-workflow" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3f151d05aed873a8e507af24255923c1c7a5233040e7757ea6bb0f3930893ee" +source = "git+https://github.com/ssddOnTop/gh-workflow?branch=fix%2Fchange-gh-token-name-from-env#ec809ad6620fc6a599152495eee28216f731afd1" dependencies = [ "async-trait", "derive_more", @@ -334,8 +333,7 @@ dependencies = [ [[package]] name = "gh-workflow-macros" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c700d433184ad5a1547ab7e619704a6aa15e2428bdb0dfe78e18aa893f17e2b9" +source = "git+https://github.com/ssddOnTop/gh-workflow?branch=fix%2Fchange-gh-token-name-from-env#ec809ad6620fc6a599152495eee28216f731afd1" dependencies = [ "heck", "quote", @@ -345,8 +343,7 @@ dependencies = [ [[package]] name = "gh-workflow-tailcall" version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b739e49a3a29d7f3638eeee8d3ee6c0db4cbbb02070579fd9a20430614ec95f9" +source = "git+https://github.com/ssddOnTop/gh-workflow?branch=fix%2Fchange-gh-token-name-from-env#ec809ad6620fc6a599152495eee28216f731afd1" dependencies = [ "derive_setters", "gh-workflow", diff --git a/crates/async_jsonl_ci/Cargo.toml b/crates/async_jsonl_ci/Cargo.toml index 94f2757..61d939b 100644 --- a/crates/async_jsonl_ci/Cargo.toml +++ b/crates/async_jsonl_ci/Cargo.toml @@ -4,4 +4,4 @@ version = "0.1.0" edition = "2021" [dependencies] -gh-workflow-tailcall = { version = "0.5.2"} +gh-workflow-tailcall = { version = "0.5.2", git = "https://github.com/ssddOnTop/gh-workflow", branch = "fix/change-gh-token-name-from-env", commit = "6b30b75e8a5d9c8b3c0aad69e4b1ba7bd3d82dbf" } diff --git a/crates/async_jsonl_examples/examples/count_lines.rs b/crates/async_jsonl_examples/examples/count_lines.rs new file mode 100644 index 0000000..eb3456e --- /dev/null +++ b/crates/async_jsonl_examples/examples/count_lines.rs @@ -0,0 +1,179 @@ +//! Example demonstrating how to count lines in JSONL files +//! +//! This example shows how to use the .count() method to efficiently count +//! the total number of lines in a JSONL file without loading all content into memory. + +use async_jsonl::{Jsonl, JsonlReader}; +use std::io::Cursor; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + println!("=== JSONL Line Counting Examples ==="); + + // Example 1: Count lines in memory data + println!("\n=== Example 1: Counting lines in memory ==="); + + let sample_data = r#"{"id": 1, "name": "Alice", "department": "Engineering"} +{"id": 2, "name": "Bob", "department": "Marketing"} +{"id": 3, "name": "Charlie", "department": "Sales"} +{"id": 4, "name": "Diana", "department": "HR"} +{"id": 5, "name": "Eve", "department": "Engineering"} +"#; + + let reader = Cursor::new(sample_data.as_bytes()); + let jsonl = Jsonl::new(reader); + + let line_count = JsonlReader::count(jsonl).await; + println!("Total lines in sample data: {}", line_count); + + // Example 2: Count lines in a file + println!("\n=== Example 2: Counting lines in a file ==="); + + let temp_file_path = "/tmp/employee_data.jsonl"; + + // Create a sample file with employee data + let mut file = File::create(temp_file_path).await?; + let file_data = r#"{"id": 1, "name": "Alice", "salary": 75000, "active": true} +{"id": 2, "name": "Bob", "salary": 68000, "active": true} +{"id": 3, "name": "Charlie", "salary": 82000, "active": false} +{"id": 4, "name": "Diana", "salary": 71000, "active": true} +{"id": 5, "name": "Eve", "salary": 79000, "active": true} +{"id": 6, "name": "Frank", "salary": 65000, "active": false} +{"id": 7, "name": "Grace", "salary": 88000, "active": true} +"#; + file.write_all(file_data.as_bytes()).await?; + file.shutdown().await?; + + // Count lines in the file + let jsonl_file = Jsonl::from_path(temp_file_path).await?; + let file_line_count = JsonlReader::count(jsonl_file).await; + println!( + "Total lines in file '{}': {}", + temp_file_path, file_line_count + ); + + // Example 3: Handling empty files and edge cases + println!("\n=== Example 3: Edge cases ==="); + + // Empty data + let empty_reader = Cursor::new(b""); + let empty_jsonl = Jsonl::new(empty_reader); + let empty_count = JsonlReader::count(empty_jsonl).await; + println!("Lines in empty data: {}", empty_count); + + // Single line + let single_line = r#"{"single": "record"}"#; + let single_reader = Cursor::new(single_line.as_bytes()); + let single_jsonl = Jsonl::new(single_reader); + let single_count = JsonlReader::count(single_jsonl).await; + println!("Lines in single-line data: {}", single_count); + + // Data with empty lines (they get filtered out) + let data_with_empty_lines = r#"{"line": 1} + +{"line": 2} + + +{"line": 3} + +"#; + let empty_lines_reader = Cursor::new(data_with_empty_lines.as_bytes()); + let empty_lines_jsonl = Jsonl::new(empty_lines_reader); + let filtered_count = JsonlReader::count(empty_lines_jsonl).await; + println!( + "Lines in data with empty lines (filtered): {}", + filtered_count + ); + + // Example 4: Performance demonstration with larger data + println!("\n=== Example 4: Performance with larger dataset ==="); + + // Generate a larger dataset + let mut large_data = String::new(); + for i in 0..10000 { + large_data.push_str(&format!( + "{{\"id\": {}, \"timestamp\": \"2024-01-01T{:02}:{:02}:00Z\", \"value\": {}}}\n", + i, + i % 24, // hours (0-23) + i % 60, // minutes (0-59) + i * 42 // some computed value + )); + } + + let large_reader = Cursor::new(large_data.as_bytes()); + let large_jsonl = Jsonl::new(large_reader); + + let start = std::time::Instant::now(); + let large_count = JsonlReader::count(large_jsonl).await; + let elapsed = start.elapsed(); + + println!("Counted {} lines in {:?}", large_count, elapsed); + println!( + "Performance: {:.0} lines/sec", + large_count as f64 / elapsed.as_secs_f64() + ); + + // Example 5: Practical use case - counting records by file extension + println!("\n=== Example 5: Practical use case ==="); + + let log_files = vec![ + ( + "/tmp/app_errors.jsonl", + r#"{"level": "ERROR", "message": "Database timeout"} +{"level": "ERROR", "message": "Authentication failed"} +{"level": "ERROR", "message": "Rate limit exceeded"} +"#, + ), + ( + "/tmp/app_info.jsonl", + r#"{"level": "INFO", "message": "Server started"} +{"level": "INFO", "message": "User logged in"} +{"level": "INFO", "message": "Request processed"} +{"level": "INFO", "message": "Cache refreshed"} +{"level": "INFO", "message": "Backup completed"} +"#, + ), + ( + "/tmp/app_debug.jsonl", + r#"{"level": "DEBUG", "message": "Processing request ID 123"} +{"level": "DEBUG", "message": "Query executed in 45ms"} +"#, + ), + ]; + + let mut total_log_entries = 0; + + for (file_path, content) in log_files { + // Create the log file + let mut log_file = File::create(file_path).await?; + log_file.write_all(content.as_bytes()).await?; + log_file.shutdown().await?; + + // Count entries in this log file + let log_jsonl = Jsonl::from_path(file_path).await?; + let log_count = JsonlReader::count(log_jsonl).await; + total_log_entries += log_count; + + println!("Log file '{}': {} entries", file_path, log_count); + + // Clean up + tokio::fs::remove_file(file_path).await.ok(); + } + + println!("Total log entries across all files: {}", total_log_entries); + + // Clean up the temp file + tokio::fs::remove_file(temp_file_path).await.ok(); + + println!("\n=== Summary ==="); + println!("The count() method provides an efficient way to:"); + println!("• Count total lines in JSONL files without loading all data"); + println!("• Handle large files with good performance"); + println!("• Filter out empty lines automatically"); + println!("• Work with both in-memory data and files"); + println!("• Process multiple files for aggregate statistics"); + + Ok(()) +}