From f93f93b00274e0fb1b3e40a121b510b184687231 Mon Sep 17 00:00:00 2001 From: Sandipsinh Rathod Date: Tue, 3 Jun 2025 13:37:34 -0400 Subject: [PATCH 1/3] feat: add methods to read first and last n lines from JSONL files --- crates/async_jsonl/src/async_jsonl.rs | 160 +++++++++++++- .../examples/reverse_reading.rs | 98 +++++++++ .../async_jsonl_tests/tests/reverse_tests.rs | 202 ++++++++++++++++++ 3 files changed, 459 insertions(+), 1 deletion(-) create mode 100644 crates/async_jsonl_examples/examples/reverse_reading.rs create mode 100644 crates/async_jsonl_tests/tests/reverse_tests.rs diff --git a/crates/async_jsonl/src/async_jsonl.rs b/crates/async_jsonl/src/async_jsonl.rs index e91e3b1..ce3d968 100644 --- a/crates/async_jsonl/src/async_jsonl.rs +++ b/crates/async_jsonl/src/async_jsonl.rs @@ -2,7 +2,7 @@ use futures::Stream; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::File; -use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader, Lines}; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncSeek, BufReader, Lines}; /// Iterator to read JSONL file as raw JSON strings pub struct Jsonl { @@ -59,3 +59,161 @@ impl Stream for Jsonl { } } } + +/// Stream that yields n lines from the beginning of a JSONL file +pub struct TakeNLines { + lines: Lines>, + remaining: usize, +} + +impl TakeNLines { + fn new(reader: R, n: usize) -> Self { + let buf_reader = BufReader::new(reader); + Self { + lines: buf_reader.lines(), + remaining: n, + } + } +} + +impl Stream for TakeNLines { + type Item = anyhow::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.remaining == 0 { + return Poll::Ready(None); + } + + match Pin::new(&mut self.lines).poll_next_line(cx) { + Poll::Ready(Ok(Some(line))) => { + let line = line.trim(); + if !line.is_empty() { + self.remaining -= 1; + Poll::Ready(Some(Ok(line.to_string()))) + } else { + // Skip empty lines and try again + self.poll_next(cx) + } + } + Poll::Ready(Ok(None)) => Poll::Ready(None), // EOF + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(anyhow::anyhow!("IO error: {}", e)))), + Poll::Pending => Poll::Pending, + } + } +} + +/// Stream that yields n lines from the end of a JSONL file +pub struct TakeNLinesReverse { + lines: std::vec::IntoIter, +} + +impl TakeNLinesReverse { + async fn new( + mut reader: R, + n: usize, + ) -> std::io::Result { + let mut lines_found = Vec::new(); + let mut buffer = Vec::new(); + let chunk_size = 8192; + + let file_size = + tokio::io::AsyncSeekExt::seek(&mut reader, std::io::SeekFrom::End(0)).await?; + + if file_size == 0 || n == 0 { + return Ok(Self { + lines: Vec::new().into_iter(), + }); + } + + let mut current_pos = file_size; + + // Read file backwards until we find n lines + while current_pos > 0 && lines_found.len() < n { + let read_size = std::cmp::min(chunk_size as u64, current_pos) as usize; + let new_pos = current_pos - read_size as u64; + + tokio::io::AsyncSeekExt::seek(&mut reader, std::io::SeekFrom::Start(new_pos)).await?; + + let mut chunk = vec![0u8; read_size]; + tokio::io::AsyncReadExt::read_exact(&mut reader, &mut chunk).await?; + + chunk.extend_from_slice(&buffer); + buffer = chunk; + current_pos = new_pos; + + let buffer_str = String::from_utf8_lossy(&buffer).into_owned(); + let lines: Vec<&str> = buffer_str.lines().collect(); + + let start_idx = if current_pos > 0 && !buffer.is_empty() && buffer[0] != b'\n' { + if lines.len() > 1 { + let incomplete_line = lines[0].to_string(); + buffer = incomplete_line.into_bytes(); + 1 + } else { + continue; + } + } else { + buffer.clear(); + 0 + }; + + for line in lines[start_idx..].iter().rev() { + let trimmed = line.trim(); + if !trimmed.is_empty() { + lines_found.insert(0, trimmed.to_string()); + if lines_found.len() >= n { + break; + } + } + } + } + + // Keep only the last n lines and reverse to get correct order (last line first) + if lines_found.len() > n { + let excess = lines_found.len() - n; + lines_found.drain(0..excess); + } + lines_found.reverse(); + + Ok(Self { + lines: lines_found.into_iter(), + }) + } +} + +impl Stream for TakeNLinesReverse { + type Item = anyhow::Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + match self.lines.next() { + Some(line) => Poll::Ready(Some(Ok(line))), + None => Poll::Ready(None), + } + } +} + +// Add get_n and get_rev_n methods +impl Jsonl { + /// Get the first n lines from the beginning of the file + pub fn get_n(self, n: usize) -> TakeNLines { + let reader = self.lines.into_inner().into_inner(); + TakeNLines::new(reader, n) + } +} + +impl Jsonl { + /// Get the last n lines from the end of the file (like tail) + pub async fn get_rev_n(self, n: usize) -> std::io::Result { + let reader = self.lines.into_inner().into_inner(); + TakeNLinesReverse::new(reader, n).await + } +} + +// Special implementations for File +impl Jsonl { + /// Get the first n lines from the beginning of the file + pub fn get_n_file(self, n: usize) -> TakeNLines { + let reader = self.lines.into_inner().into_inner(); + TakeNLines::new(reader, n) + } +} diff --git a/crates/async_jsonl_examples/examples/reverse_reading.rs b/crates/async_jsonl_examples/examples/reverse_reading.rs new file mode 100644 index 0000000..fb500d7 --- /dev/null +++ b/crates/async_jsonl_examples/examples/reverse_reading.rs @@ -0,0 +1,98 @@ +//! Example demonstrating reverse reading of JSONL files +//! +//! This example shows how to use the .get_rev_n() method to read JSONL files +//! from end to beginning, similar to the Unix `tail` command. + +use async_jsonl::Jsonl; +use futures::StreamExt; +use std::io::Cursor; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create sample JSONL data + let sample_data = r#"{"timestamp": "2024-01-01T10:00:00Z", "level": "INFO", "message": "Application started"} +{"timestamp": "2024-01-01T10:01:00Z", "level": "DEBUG", "message": "Processing request 1"} +{"timestamp": "2024-01-01T10:02:00Z", "level": "WARN", "message": "High memory usage detected"} +{"timestamp": "2024-01-01T10:03:00Z", "level": "ERROR", "message": "Database connection failed"} +{"timestamp": "2024-01-01T10:04:00Z", "level": "INFO", "message": "Retrying connection"} +"#; + + println!("=== Example 1: Reading from memory (Cursor) ==="); + + // Example 1: Reading from a Cursor (in-memory data) + let cursor = Cursor::new(sample_data.as_bytes()); + let jsonl = Jsonl::new(cursor); + + println!("Reading last 5 lines in reverse order:"); + let rev_jsonl = jsonl.get_rev_n(5).await?; + + // Collect results and iterate + let results: Vec<_> = rev_jsonl.collect().await; + for (i, result) in results.iter().enumerate() { + match result { + Ok(line) => println!("{}: {}", i + 1, line), + Err(e) => eprintln!("Error reading line: {}", e), + } + } + + println!(); + + // Example 2: Reading from a file + println!("=== Example 2: Reading from file ==="); + + let temp_file_path = "/tmp/example_log.jsonl"; + + // Create a temporary file + let mut file = File::create(temp_file_path).await?; + file.write_all(sample_data.as_bytes()).await?; + file.shutdown().await?; + + // Read the file in reverse + let jsonl_file = Jsonl::from_path(temp_file_path).await?; + let rev_jsonl_file = jsonl_file.get_rev_n(3).await?; + + println!("Reading file in reverse order (last 3 lines):"); + let results: Vec<_> = rev_jsonl_file.collect().await; + + for (i, result) in results.iter().enumerate() { + match result { + Ok(line) => println!("{}: {}", i + 1, line), + Err(e) => eprintln!("Error: {}", e), + } + } + + // Clean up + tokio::fs::remove_file(temp_file_path).await.ok(); + + println!(); + + // Example 3: Compare forward vs reverse reading + println!("=== Example 3: Forward vs Reverse comparison ==="); + + let cursor_forward = Cursor::new(sample_data.as_bytes()); + let jsonl_forward = Jsonl::new(cursor_forward); + + let cursor_reverse = Cursor::new(sample_data.as_bytes()); + let jsonl_reverse = Jsonl::new(cursor_reverse); + + println!("Forward reading:"); + let forward_results: Vec<_> = jsonl_forward.collect().await; + for (i, result) in forward_results.iter().enumerate() { + if let Ok(line) = result { + println!("{}: {}", i + 1, line); + } + } + + println!("\nReverse reading (last 5 lines):"); + let rev_jsonl_compare = jsonl_reverse.get_rev_n(5).await?; + let reverse_results: Vec<_> = rev_jsonl_compare.collect().await; + for (i, result) in reverse_results.iter().enumerate() { + if let Ok(line) = result { + println!("{}: {}", i + 1, line); + } + } + + Ok(()) +} diff --git a/crates/async_jsonl_tests/tests/reverse_tests.rs b/crates/async_jsonl_tests/tests/reverse_tests.rs new file mode 100644 index 0000000..8d5e7c3 --- /dev/null +++ b/crates/async_jsonl_tests/tests/reverse_tests.rs @@ -0,0 +1,202 @@ +use async_jsonl::Jsonl; +use futures::StreamExt; +use std::io::Cursor; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; + +#[tokio::test] +async fn test_get_rev_n_with_cursor() { + let data = r#"{"id": 1, "name": "Alice", "active": true} +{"id": 2, "name": "Bob", "active": false} +{"id": 3, "name": "Charlie", "active": true} +"#; + + let cursor = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(cursor); + + let rev_jsonl = jsonl + .get_rev_n(3) + .await + .expect("Failed to create reverse iterator"); + + let results: Vec<_> = rev_jsonl.collect().await; + + assert_eq!(results.len(), 3); + assert!(results.iter().all(|r| r.is_ok())); + + let lines: Vec = results.into_iter().map(|r| r.unwrap()).collect(); + + // Should be in reverse order (last lines first) + assert_eq!(lines[0], r#"{"id": 3, "name": "Charlie", "active": true}"#); + assert_eq!(lines[1], r#"{"id": 2, "name": "Bob", "active": false}"#); + assert_eq!(lines[2], r#"{"id": 1, "name": "Alice", "active": true}"#); +} + +#[tokio::test] +async fn test_get_rev_n_with_file() { + let temp_file_path = "/tmp/test_reverse_jsonl.jsonl"; + + // Create a test file + let mut file = File::create(temp_file_path) + .await + .expect("Failed to create temp file"); + let data = r#"{"line": 1} +{"line": 2} +{"line": 3} +{"line": 4} +{"line": 5} +"#; + file.write_all(data.as_bytes()) + .await + .expect("Failed to write to temp file"); + file.shutdown().await.expect("Failed to close file"); + + // Test getting last 3 lines + let jsonl = Jsonl::from_path(temp_file_path) + .await + .expect("Failed to open file"); + let rev_jsonl = jsonl + .get_rev_n(3) + .await + .expect("Failed to create reverse iterator"); + + let results: Vec<_> = rev_jsonl.collect().await; + + assert_eq!(results.len(), 3); + assert!(results.iter().all(|r| r.is_ok())); + + let lines: Vec = results.into_iter().map(|r| r.unwrap()).collect(); + + // Should be last 3 lines in reverse order + assert_eq!(lines[0], r#"{"line": 5}"#); + assert_eq!(lines[1], r#"{"line": 4}"#); + assert_eq!(lines[2], r#"{"line": 3}"#); + + // Clean up + tokio::fs::remove_file(temp_file_path).await.ok(); +} + +#[tokio::test] +async fn test_get_rev_n_empty_file() { + let cursor = Cursor::new(b""); + let jsonl = Jsonl::new(cursor); + + let rev_jsonl = jsonl + .get_rev_n(5) + .await + .expect("Failed to create reverse iterator"); + let results: Vec<_> = rev_jsonl.collect().await; + + assert_eq!(results.len(), 0); +} + +#[tokio::test] +async fn test_get_rev_n_single_line() { + let data = r#"{"single": "line"} +"#; + + let cursor = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(cursor); + + let rev_jsonl = jsonl + .get_rev_n(1) + .await + .expect("Failed to create reverse iterator"); + let results: Vec<_> = rev_jsonl.collect().await; + + assert_eq!(results.len(), 1); + assert!(results[0].is_ok()); + + let line = results.into_iter().next().unwrap().unwrap(); + assert_eq!(line, r#"{"single": "line"}"#); +} + +#[tokio::test] +async fn test_get_rev_n_with_empty_lines() { + let data = r#"{"id": 1} + +{"id": 2} + + +{"id": 3} + +"#; + + let cursor = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(cursor); + + let rev_jsonl = jsonl + .get_rev_n(3) + .await + .expect("Failed to create reverse iterator"); + let results: Vec<_> = rev_jsonl.collect().await; + + assert_eq!(results.len(), 3); + assert!(results.iter().all(|r| r.is_ok())); + + let lines: Vec = results.into_iter().map(|r| r.unwrap()).collect(); + + // Should be in reverse order, with empty lines filtered out + assert_eq!(lines[0], r#"{"id": 3}"#); + assert_eq!(lines[1], r#"{"id": 2}"#); + assert_eq!(lines[2], r#"{"id": 1}"#); +} + +#[tokio::test] +async fn test_get_n_lines() { + let data = r#"{"name": "Alice", "age": 30} +{"name": "Bob", "age": 25} +{"name": "Charlie", "age": 35} +{"name": "Dave", "age": 40} +{"name": "Eve", "age": 28} +"#; + + let cursor = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(cursor); + + let first_3 = jsonl.get_n(3); + let results: Vec<_> = first_3.collect().await; + + assert_eq!(results.len(), 3); + assert!(results.iter().all(|r| r.is_ok())); + + let lines: Vec = results.into_iter().map(|r| r.unwrap()).collect(); + + // Should be first 3 lines in order + assert_eq!(lines[0], r#"{"name": "Alice", "age": 30}"#); + assert_eq!(lines[1], r#"{"name": "Bob", "age": 25}"#); + assert_eq!(lines[2], r#"{"name": "Charlie", "age": 35}"#); +} + +#[tokio::test] +async fn test_compare_forward_and_reverse() { + let data = r#"{"test": 1} +{"test": 2} +{"test": 3} +"#; + + // Forward reading first 3 + let cursor_forward = Cursor::new(data.as_bytes()); + let jsonl_forward = Jsonl::new(cursor_forward); + let forward_stream = jsonl_forward.get_n(3); + let forward_results: Vec<_> = forward_stream.collect().await; + let forward_lines: Vec = forward_results.into_iter().map(|r| r.unwrap()).collect(); + + // Reverse reading last 3 + let cursor_reverse = Cursor::new(data.as_bytes()); + let jsonl_reverse = Jsonl::new(cursor_reverse); + let rev_jsonl = jsonl_reverse + .get_rev_n(3) + .await + .expect("Failed to create reverse iterator"); + let reverse_results: Vec<_> = rev_jsonl.collect().await; + let reverse_lines: Vec = reverse_results.into_iter().map(|r| r.unwrap()).collect(); + + // They should be exactly opposite + assert_eq!(forward_lines.len(), reverse_lines.len()); + + let mut expected_reverse = forward_lines.clone(); + expected_reverse.reverse(); + + assert_eq!(reverse_lines, expected_reverse); +} From 5c71d337ffd1cdb7f9ae02cddca78196f83814b2 Mon Sep 17 00:00:00 2001 From: Sandipsinh Rathod Date: Tue, 3 Jun 2025 15:06:44 -0400 Subject: [PATCH 2/3] feat: implement JsonlReader trait for reading first and last n lines from JSONL files --- Cargo.lock | 1 + crates/async_jsonl/Cargo.toml | 1 + crates/async_jsonl/src/async_jsonl.rs | 226 ++---------------- crates/async_jsonl/src/jsonl_reader.rs | 75 ++++++ crates/async_jsonl/src/lib.rs | 3 +- crates/async_jsonl/src/take_n.rs | 137 +++++++++++ crates/async_jsonl/src/value.rs | 20 +- crates/async_jsonl_examples/examples/count.rs | 69 ------ .../examples/reverse_reading.rs | 8 +- .../tests/integration_tests.rs | 53 ---- .../async_jsonl_tests/tests/reverse_tests.rs | 18 +- 11 files changed, 251 insertions(+), 360 deletions(-) create mode 100644 crates/async_jsonl/src/jsonl_reader.rs create mode 100644 crates/async_jsonl/src/take_n.rs delete mode 100644 crates/async_jsonl_examples/examples/count.rs diff --git a/Cargo.lock b/Cargo.lock index e612c31..0992143 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,6 +28,7 @@ name = "async-jsonl" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "futures", "serde", "serde_json", diff --git a/crates/async_jsonl/Cargo.toml b/crates/async_jsonl/Cargo.toml index 6899374..f98385a 100644 --- a/crates/async_jsonl/Cargo.toml +++ b/crates/async_jsonl/Cargo.toml @@ -12,5 +12,6 @@ futures = "0.3.31" anyhow = "1.0.98" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" +async-trait = "0.1.88" [dev-dependencies] diff --git a/crates/async_jsonl/src/async_jsonl.rs b/crates/async_jsonl/src/async_jsonl.rs index ce3d968..0ec3335 100644 --- a/crates/async_jsonl/src/async_jsonl.rs +++ b/crates/async_jsonl/src/async_jsonl.rs @@ -1,219 +1,31 @@ use futures::Stream; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::fs::File; -use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncSeek, BufReader, Lines}; +use serde::Deserialize; +use serde_json::Value; +use tokio::io::{BufReader, Lines}; /// Iterator to read JSONL file as raw JSON strings pub struct Jsonl { pub(crate) lines: Lines>, } -impl Jsonl { - pub fn new(file: R) -> Self { - let reader = BufReader::new(file); - Self { - lines: reader.lines(), - } - } - /// Count lines from any AsyncRead source - pub async fn count_lines(mut self) -> anyhow::Result { - let mut count = 0; - while let Some(line) = self.lines.next_line().await? { - let trimmed = line.trim(); - if !trimmed.is_empty() { - count += 1; - } - } - Ok(count) - } +#[async_trait::async_trait] +pub trait JsonlReader: JsonlDeserialize + JsonlValueDeserialize + Stream + Send + Sync { + type NLines: Stream>; + type NLinesRev: Stream>; + async fn first_n(self, n: usize) -> anyhow::Result; + async fn last_n(self, n: usize) -> anyhow::Result; } -impl Jsonl { - /// Create a new Jsonl reader from a file path - pub async fn from_path>(path: P) -> anyhow::Result { - let file = File::open(path) - .await - .map_err(|e| anyhow::anyhow!("Failed to open file: {}", e))?; - Ok(Self::new(file)) - } +/// Extension trait to add deserialization capabilities to JsonlIterator +pub trait JsonlDeserialize { + /// Deserialize JSON lines into the specified type + fn deserialize(self) -> impl Stream> + where + T: for<'a> Deserialize<'a>; } -impl Stream for Jsonl { - type Item = anyhow::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.lines).poll_next_line(cx) { - Poll::Ready(Ok(Some(line))) => { - let line = line.trim(); - if line.is_empty() { - // Skip empty lines and recursively poll for next - self.poll_next(cx) - } else { - Poll::Ready(Some(Ok(line.to_string()))) - } - } - Poll::Ready(Ok(None)) => Poll::Ready(None), // EOF - Poll::Ready(Err(e)) => Poll::Ready(Some(Err(anyhow::anyhow!("IO error: {}", e)))), - Poll::Pending => Poll::Pending, - } - } -} - -/// Stream that yields n lines from the beginning of a JSONL file -pub struct TakeNLines { - lines: Lines>, - remaining: usize, -} - -impl TakeNLines { - fn new(reader: R, n: usize) -> Self { - let buf_reader = BufReader::new(reader); - Self { - lines: buf_reader.lines(), - remaining: n, - } - } -} - -impl Stream for TakeNLines { - type Item = anyhow::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.remaining == 0 { - return Poll::Ready(None); - } - - match Pin::new(&mut self.lines).poll_next_line(cx) { - Poll::Ready(Ok(Some(line))) => { - let line = line.trim(); - if !line.is_empty() { - self.remaining -= 1; - Poll::Ready(Some(Ok(line.to_string()))) - } else { - // Skip empty lines and try again - self.poll_next(cx) - } - } - Poll::Ready(Ok(None)) => Poll::Ready(None), // EOF - Poll::Ready(Err(e)) => Poll::Ready(Some(Err(anyhow::anyhow!("IO error: {}", e)))), - Poll::Pending => Poll::Pending, - } - } -} - -/// Stream that yields n lines from the end of a JSONL file -pub struct TakeNLinesReverse { - lines: std::vec::IntoIter, -} - -impl TakeNLinesReverse { - async fn new( - mut reader: R, - n: usize, - ) -> std::io::Result { - let mut lines_found = Vec::new(); - let mut buffer = Vec::new(); - let chunk_size = 8192; - - let file_size = - tokio::io::AsyncSeekExt::seek(&mut reader, std::io::SeekFrom::End(0)).await?; - - if file_size == 0 || n == 0 { - return Ok(Self { - lines: Vec::new().into_iter(), - }); - } - - let mut current_pos = file_size; - - // Read file backwards until we find n lines - while current_pos > 0 && lines_found.len() < n { - let read_size = std::cmp::min(chunk_size as u64, current_pos) as usize; - let new_pos = current_pos - read_size as u64; - - tokio::io::AsyncSeekExt::seek(&mut reader, std::io::SeekFrom::Start(new_pos)).await?; - - let mut chunk = vec![0u8; read_size]; - tokio::io::AsyncReadExt::read_exact(&mut reader, &mut chunk).await?; - - chunk.extend_from_slice(&buffer); - buffer = chunk; - current_pos = new_pos; - - let buffer_str = String::from_utf8_lossy(&buffer).into_owned(); - let lines: Vec<&str> = buffer_str.lines().collect(); - - let start_idx = if current_pos > 0 && !buffer.is_empty() && buffer[0] != b'\n' { - if lines.len() > 1 { - let incomplete_line = lines[0].to_string(); - buffer = incomplete_line.into_bytes(); - 1 - } else { - continue; - } - } else { - buffer.clear(); - 0 - }; - - for line in lines[start_idx..].iter().rev() { - let trimmed = line.trim(); - if !trimmed.is_empty() { - lines_found.insert(0, trimmed.to_string()); - if lines_found.len() >= n { - break; - } - } - } - } - - // Keep only the last n lines and reverse to get correct order (last line first) - if lines_found.len() > n { - let excess = lines_found.len() - n; - lines_found.drain(0..excess); - } - lines_found.reverse(); - - Ok(Self { - lines: lines_found.into_iter(), - }) - } -} - -impl Stream for TakeNLinesReverse { - type Item = anyhow::Result; - - fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - match self.lines.next() { - Some(line) => Poll::Ready(Some(Ok(line))), - None => Poll::Ready(None), - } - } -} - -// Add get_n and get_rev_n methods -impl Jsonl { - /// Get the first n lines from the beginning of the file - pub fn get_n(self, n: usize) -> TakeNLines { - let reader = self.lines.into_inner().into_inner(); - TakeNLines::new(reader, n) - } -} - -impl Jsonl { - /// Get the last n lines from the end of the file (like tail) - pub async fn get_rev_n(self, n: usize) -> std::io::Result { - let reader = self.lines.into_inner().into_inner(); - TakeNLinesReverse::new(reader, n).await - } -} - -// Special implementations for File -impl Jsonl { - /// Get the first n lines from the beginning of the file - pub fn get_n_file(self, n: usize) -> TakeNLines { - let reader = self.lines.into_inner().into_inner(); - TakeNLines::new(reader, n) - } +/// Extension trait specifically for deserializing JSONL to serde_json::Value +pub trait JsonlValueDeserialize { + /// Deserialize JSON lines into serde_json::Value objects + fn deserialize_values(self) -> impl Stream>; } diff --git a/crates/async_jsonl/src/jsonl_reader.rs b/crates/async_jsonl/src/jsonl_reader.rs new file mode 100644 index 0000000..d2eb702 --- /dev/null +++ b/crates/async_jsonl/src/jsonl_reader.rs @@ -0,0 +1,75 @@ +use crate::take_n::{TakeNLines, TakeNLinesReverse}; +use crate::{Jsonl, JsonlReader}; +use futures::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::fs::File; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncSeek, BufReader}; + +#[async_trait::async_trait] +impl JsonlReader for Jsonl { + type NLines = TakeNLines; + type NLinesRev = TakeNLinesReverse; + + async fn first_n(self, n: usize) -> anyhow::Result { + Ok(self.get_n(n)) + } + + async fn last_n(self, n: usize) -> anyhow::Result { + self.get_rev_n(n).await + } +} + +impl Jsonl { + pub fn new(file: R) -> Self { + let reader = BufReader::new(file); + Self { + lines: reader.lines(), + } + } + + /// Get the first n lines from the beginning of the file + pub(crate) fn get_n(self, n: usize) -> TakeNLines { + let reader = self.lines.into_inner().into_inner(); + TakeNLines::new(reader, n) + } +} + +impl Jsonl { + /// Get the last n lines from the end of the file (like tail) + pub(crate) async fn get_rev_n(self, n: usize) -> anyhow::Result { + let reader = self.lines.into_inner().into_inner(); + TakeNLinesReverse::new(reader, n).await + } +} + +impl Jsonl { + /// Create a new Jsonl reader from a file path + pub async fn from_path>(path: P) -> anyhow::Result { + let file = File::open(path) + .await + .map_err(|e| anyhow::anyhow!("Failed to open file: {}", e))?; + Ok(Self::new(file)) + } +} + +impl Stream for Jsonl { + type Item = anyhow::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.lines).poll_next_line(cx) { + Poll::Ready(Ok(Some(line))) => { + let line = line.trim(); + if line.is_empty() { + // Skip empty lines and recursively poll for next + self.poll_next(cx) + } else { + Poll::Ready(Some(Ok(line.to_string()))) + } + } + Poll::Ready(Ok(None)) => Poll::Ready(None), // EOF + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(anyhow::anyhow!("IO error: {}", e)))), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/crates/async_jsonl/src/lib.rs b/crates/async_jsonl/src/lib.rs index 6d20622..e20f3c6 100644 --- a/crates/async_jsonl/src/lib.rs +++ b/crates/async_jsonl/src/lib.rs @@ -1,5 +1,6 @@ mod async_jsonl; +mod jsonl_reader; +mod take_n; mod value; pub use async_jsonl::*; -pub use value::*; diff --git a/crates/async_jsonl/src/take_n.rs b/crates/async_jsonl/src/take_n.rs new file mode 100644 index 0000000..0c8182c --- /dev/null +++ b/crates/async_jsonl/src/take_n.rs @@ -0,0 +1,137 @@ +use futures::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{ + AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, BufReader, Lines, +}; + +/// Stream that yields n lines from the beginning of a JSONL file +pub struct TakeNLines { + lines: Lines>, + remaining: usize, +} + +impl TakeNLines { + pub(crate) fn new(reader: R, n: usize) -> Self { + let buf_reader = BufReader::new(reader); + Self { + lines: buf_reader.lines(), + remaining: n, + } + } +} + +impl Stream for TakeNLines { + type Item = anyhow::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.remaining == 0 { + return Poll::Ready(None); + } + + match Pin::new(&mut self.lines).poll_next_line(cx) { + Poll::Ready(Ok(Some(line))) => { + let line = line.trim(); + if !line.is_empty() { + self.remaining -= 1; + Poll::Ready(Some(Ok(line.to_string()))) + } else { + // Skip empty lines and try again + self.poll_next(cx) + } + } + Poll::Ready(Ok(None)) => Poll::Ready(None), // EOF + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(anyhow::anyhow!("IO error: {}", e)))), + Poll::Pending => Poll::Pending, + } + } +} + +/// Stream that yields n lines from the end of a JSONL file +pub struct TakeNLinesReverse { + lines: std::vec::IntoIter, +} + +impl TakeNLinesReverse { + pub(crate) async fn new( + mut reader: R, + n: usize, + ) -> anyhow::Result { + let mut lines_found = Vec::new(); + let mut buffer = Vec::new(); + let chunk_size = 8192; + + let file_size = reader.seek(std::io::SeekFrom::End(0)).await?; + + if file_size == 0 || n == 0 { + return Ok(Self { + lines: Vec::new().into_iter(), + }); + } + + let mut current_pos = file_size; + + // Read file backwards until we find n lines + while current_pos > 0 && lines_found.len() < n { + let read_size = std::cmp::min(chunk_size as u64, current_pos) as usize; + let new_pos = current_pos - read_size as u64; + + reader.seek(std::io::SeekFrom::Start(new_pos)).await?; + + let mut chunk = vec![0u8; read_size]; + reader.read_exact(chunk.as_mut_slice()).await?; + + chunk.extend_from_slice(&buffer); + buffer = chunk; + current_pos = new_pos; + + let buffer_str = String::from_utf8_lossy(&buffer).into_owned(); + let lines: Vec<&str> = buffer_str.lines().collect(); + + let start_idx = if current_pos > 0 && !buffer.is_empty() && buffer[0] != b'\n' { + if lines.len() > 1 { + let incomplete_line = lines[0].to_string(); + buffer = incomplete_line.into_bytes(); + 1 + } else { + continue; + } + } else { + buffer.clear(); + 0 + }; + + for line in lines[start_idx..].iter().rev() { + let trimmed = line.trim(); + if !trimmed.is_empty() { + lines_found.insert(0, trimmed.to_string()); + if lines_found.len() >= n { + break; + } + } + } + } + + // Keep only the last n lines and reverse to get correct order (last line first) + if lines_found.len() > n { + let excess = lines_found.len() - n; + lines_found.drain(0..excess); + } + lines_found.reverse(); + + Ok(Self { + lines: lines_found.into_iter(), + }) + } +} + +impl Stream for TakeNLinesReverse { + type Item = anyhow::Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + match self.lines.next() { + Some(line) => Poll::Ready(Some(Ok(line))), + None => Poll::Ready(None), + } + } +} diff --git a/crates/async_jsonl/src/value.rs b/crates/async_jsonl/src/value.rs index 73667ba..516d072 100644 --- a/crates/async_jsonl/src/value.rs +++ b/crates/async_jsonl/src/value.rs @@ -1,18 +1,10 @@ -use crate::Jsonl; +use crate::{Jsonl, JsonlDeserialize, JsonlValueDeserialize}; use futures::{Stream, StreamExt}; use serde::Deserialize; use serde_json::Value; use tokio::io::AsyncRead; -/// Extension trait to add deserialization capabilities to JsonlIterator -pub trait JsonlDeserialize { - /// Deserialize JSON lines into the specified type - fn deserialize(self) -> impl Stream> - where - T: for<'a> Deserialize<'a>; -} - -impl JsonlDeserialize for Jsonl { +impl JsonlDeserialize for Jsonl { fn deserialize(self) -> impl Stream> where T: for<'a> Deserialize<'a>, @@ -26,13 +18,7 @@ impl JsonlDeserialize for Jsonl { } } -/// Extension trait specifically for deserializing JSONL to serde_json::Value -pub trait JsonlValueDeserialize { - /// Deserialize JSON lines into serde_json::Value objects - fn deserialize_values(self) -> impl Stream>; -} - -impl JsonlValueDeserialize for Jsonl { +impl JsonlValueDeserialize for Jsonl { fn deserialize_values(self) -> impl Stream> { self.deserialize::() } diff --git a/crates/async_jsonl_examples/examples/count.rs b/crates/async_jsonl_examples/examples/count.rs deleted file mode 100644 index 3dfce97..0000000 --- a/crates/async_jsonl_examples/examples/count.rs +++ /dev/null @@ -1,69 +0,0 @@ -//! Example demonstrating line counting capabilities - -use async_jsonl::{Jsonl, JsonlDeserialize}; -use futures::StreamExt; -use serde_json::Value; -use std::io::Cursor; -use tempfile::NamedTempFile; -use tokio::fs; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - // Example 1: Line counting from memory - println!("=== Example 1: Line counting from memory ==="); - let data = r#"{"id": 1, "name": "Alice"} -{"id": 2, "name": "Bob"} - -{"id": 3, "name": "Charlie"} - -{"id": 4, "name": "Diana"} -"#; - - let reader = Cursor::new(data.as_bytes()); - let jsonl = Jsonl::new(reader); - let count = jsonl.count_lines().await?; - println!("Total non-empty lines: {}", count); - - // Example 2: Reading from file and counting - println!("\n=== Example 2: File operations ==="); - let temp_file = NamedTempFile::new()?; - let temp_path = temp_file.path(); - - let file_data = r#"{"order": 1, "product": "Laptop", "price": 999.99} -{"order": 2, "product": "Mouse", "price": 29.99} -{"order": 3, "product": "Keyboard", "price": 79.99} -{"order": 4, "product": "Monitor", "price": 299.99} -{"order": 5, "product": "Webcam", "price": 89.99} -"#; - - fs::write(temp_path, file_data).await?; - - // Count lines in the file - let file_line_count = Jsonl::from_path(temp_path).await?.count_lines().await?; - println!("Lines in file: {}", file_line_count); - - // Example 3: Forward reading - println!("\n=== Example 3: Forward reading ==="); - let jsonl = Jsonl::from_path(temp_path).await?; - let mut forward_stream = jsonl.deserialize::(); - - println!("Forward order:"); - while let Some(result) = forward_stream.next().await { - let value = result?; - println!( - " Order {}: {} - ${}", - value["order"], value["product"], value["price"] - ); - } - - // Example 5: Comparing counts - println!("\n=== Example 5: Count comparison ==="); - let jsonl_for_count = Jsonl::from_path(temp_path).await?; - let manual_count = jsonl_for_count.collect::>().await.len(); - - println!("File line count: {}", file_line_count); - println!("Manual count: {}", manual_count); - println!("Counts match: {}", file_line_count == manual_count); - - Ok(()) -} diff --git a/crates/async_jsonl_examples/examples/reverse_reading.rs b/crates/async_jsonl_examples/examples/reverse_reading.rs index fb500d7..3e2e7e3 100644 --- a/crates/async_jsonl_examples/examples/reverse_reading.rs +++ b/crates/async_jsonl_examples/examples/reverse_reading.rs @@ -3,7 +3,7 @@ //! This example shows how to use the .get_rev_n() method to read JSONL files //! from end to beginning, similar to the Unix `tail` command. -use async_jsonl::Jsonl; +use async_jsonl::{Jsonl, JsonlReader}; use futures::StreamExt; use std::io::Cursor; use tokio::fs::File; @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box> { let jsonl = Jsonl::new(cursor); println!("Reading last 5 lines in reverse order:"); - let rev_jsonl = jsonl.get_rev_n(5).await?; + let rev_jsonl = jsonl.last_n(5).await?; // Collect results and iterate let results: Vec<_> = rev_jsonl.collect().await; @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box> { // Read the file in reverse let jsonl_file = Jsonl::from_path(temp_file_path).await?; - let rev_jsonl_file = jsonl_file.get_rev_n(3).await?; + let rev_jsonl_file = jsonl_file.last_n(3).await?; println!("Reading file in reverse order (last 3 lines):"); let results: Vec<_> = rev_jsonl_file.collect().await; @@ -86,7 +86,7 @@ async fn main() -> Result<(), Box> { } println!("\nReverse reading (last 5 lines):"); - let rev_jsonl_compare = jsonl_reverse.get_rev_n(5).await?; + let rev_jsonl_compare = jsonl_reverse.last_n(5).await?; let reverse_results: Vec<_> = rev_jsonl_compare.collect().await; for (i, result) in reverse_results.iter().enumerate() { if let Ok(line) = result { diff --git a/crates/async_jsonl_tests/tests/integration_tests.rs b/crates/async_jsonl_tests/tests/integration_tests.rs index 9ce7ba6..fa61af8 100644 --- a/crates/async_jsonl_tests/tests/integration_tests.rs +++ b/crates/async_jsonl_tests/tests/integration_tests.rs @@ -308,56 +308,3 @@ async fn test_complex_nested_values() { assert_eq!(values[1]["array"][2], "c"); assert!(values[1]["null_val"].is_null()); } - -#[tokio::test] -async fn test_line_counting() { - let data = r#"{"id": 1} -{"id": 2} - -{"id": 3} - - -{"id": 4} -"#; - - let reader = Cursor::new(data.as_bytes()); - let jsonl = Jsonl::new(reader); - let count = jsonl.count_lines().await.unwrap(); - assert_eq!(count, 4); // Should count only non-empty lines -} - -#[tokio::test] -async fn test_file_line_counting_vs_manual_count() { - use tempfile::NamedTempFile; - use tokio::fs; - - let temp_file = NamedTempFile::new().unwrap(); - let temp_path = temp_file.path(); - - let data = r#"{"line": 1} -{"line": 2} - -{"line": 3} -{"line": 4} - - -{"line": 5} -"#; - - fs::write(temp_path, data).await.unwrap(); - - // Count using the line counter - let counted_lines = Jsonl::from_path(temp_path) - .await - .unwrap() - .count_lines() - .await - .unwrap(); - - // Count manually by reading all lines - let jsonl = Jsonl::from_path(temp_path).await.unwrap(); - let manual_count = jsonl.collect::>().await.len(); - - assert_eq!(counted_lines, manual_count); - assert_eq!(counted_lines, 5); -} diff --git a/crates/async_jsonl_tests/tests/reverse_tests.rs b/crates/async_jsonl_tests/tests/reverse_tests.rs index 8d5e7c3..5011aff 100644 --- a/crates/async_jsonl_tests/tests/reverse_tests.rs +++ b/crates/async_jsonl_tests/tests/reverse_tests.rs @@ -1,4 +1,4 @@ -use async_jsonl::Jsonl; +use async_jsonl::{Jsonl, JsonlReader}; use futures::StreamExt; use std::io::Cursor; use tokio::fs::File; @@ -15,7 +15,7 @@ async fn test_get_rev_n_with_cursor() { let jsonl = Jsonl::new(cursor); let rev_jsonl = jsonl - .get_rev_n(3) + .last_n(3) .await .expect("Failed to create reverse iterator"); @@ -56,7 +56,7 @@ async fn test_get_rev_n_with_file() { .await .expect("Failed to open file"); let rev_jsonl = jsonl - .get_rev_n(3) + .last_n(3) .await .expect("Failed to create reverse iterator"); @@ -82,7 +82,7 @@ async fn test_get_rev_n_empty_file() { let jsonl = Jsonl::new(cursor); let rev_jsonl = jsonl - .get_rev_n(5) + .last_n(5) .await .expect("Failed to create reverse iterator"); let results: Vec<_> = rev_jsonl.collect().await; @@ -99,7 +99,7 @@ async fn test_get_rev_n_single_line() { let jsonl = Jsonl::new(cursor); let rev_jsonl = jsonl - .get_rev_n(1) + .last_n(1) .await .expect("Failed to create reverse iterator"); let results: Vec<_> = rev_jsonl.collect().await; @@ -126,7 +126,7 @@ async fn test_get_rev_n_with_empty_lines() { let jsonl = Jsonl::new(cursor); let rev_jsonl = jsonl - .get_rev_n(3) + .last_n(3) .await .expect("Failed to create reverse iterator"); let results: Vec<_> = rev_jsonl.collect().await; @@ -154,7 +154,7 @@ async fn test_get_n_lines() { let cursor = Cursor::new(data.as_bytes()); let jsonl = Jsonl::new(cursor); - let first_3 = jsonl.get_n(3); + let first_3 = jsonl.first_n(3).await.unwrap(); let results: Vec<_> = first_3.collect().await; assert_eq!(results.len(), 3); @@ -178,7 +178,7 @@ async fn test_compare_forward_and_reverse() { // Forward reading first 3 let cursor_forward = Cursor::new(data.as_bytes()); let jsonl_forward = Jsonl::new(cursor_forward); - let forward_stream = jsonl_forward.get_n(3); + let forward_stream = jsonl_forward.first_n(3).await.unwrap(); let forward_results: Vec<_> = forward_stream.collect().await; let forward_lines: Vec = forward_results.into_iter().map(|r| r.unwrap()).collect(); @@ -186,7 +186,7 @@ async fn test_compare_forward_and_reverse() { let cursor_reverse = Cursor::new(data.as_bytes()); let jsonl_reverse = Jsonl::new(cursor_reverse); let rev_jsonl = jsonl_reverse - .get_rev_n(3) + .last_n(3) .await .expect("Failed to create reverse iterator"); let reverse_results: Vec<_> = rev_jsonl.collect().await; From ea0904a0acb67bd348db073f01bdbe10be290285 Mon Sep 17 00:00:00 2001 From: Sandipsinh Rathod Date: Tue, 3 Jun 2025 15:35:25 -0400 Subject: [PATCH 3/3] impl `JsonlDeserialize` for `TakeNLines` and `TakeNLinesReverse` --- crates/async_jsonl/src/async_jsonl.rs | 381 +++++++++++++++++- crates/async_jsonl/src/value.rs | 43 ++ .../tests/integration_tests.rs | 90 ++++- 3 files changed, 510 insertions(+), 4 deletions(-) diff --git a/crates/async_jsonl/src/async_jsonl.rs b/crates/async_jsonl/src/async_jsonl.rs index 0ec3335..e60bc14 100644 --- a/crates/async_jsonl/src/async_jsonl.rs +++ b/crates/async_jsonl/src/async_jsonl.rs @@ -8,15 +8,243 @@ pub struct Jsonl { pub(crate) lines: Lines>, } +/// Main trait for reading JSONL (JSON Lines) files with async capabilities. +/// +/// This trait provides methods to read and process JSONL files asynchronously. +/// It combines streaming capabilities with deserialization and line selection methods. +/// The trait is implemented by `Jsonl` where `R` implements `AsyncRead + AsyncSeek`. +/// +/// # Examples +/// +/// ## Reading from a file and getting first n lines +/// +/// ```ignore +/// use async_jsonl::{Jsonl, JsonlReader, JsonlDeserialize}; +/// use futures::StreamExt; +/// use serde::Deserialize; +/// +/// #[derive(Deserialize, Debug)] +/// struct Person { +/// name: String, +/// age: u32, +/// } +/// +/// #[tokio::main] +/// async fn main() -> anyhow::Result<()> { +/// let reader = Jsonl::from_path("people.jsonl").await?; +/// +/// // Get first 5 lines and deserialize directly +/// let first_five = reader.first_n(5).await?; +/// let mut stream = first_five.deserialize::(); +/// +/// while let Some(result) = stream.next().await { +/// match result { +/// Ok(person) => println!("Found person: {:?}", person), +/// Err(e) => eprintln!("Error parsing line: {}", e), +/// } +/// } +/// +/// Ok(()) +/// } +/// ``` +/// +/// ## Reading last n lines (tail-like functionality) +/// +/// ```ignore +/// use async_jsonl::{Jsonl, JsonlReader}; +/// use futures::StreamExt; +/// +/// #[tokio::main] +/// async fn main() -> anyhow::Result<()> { +/// let reader = Jsonl::from_path("log.jsonl").await?; +/// +/// // Get last 10 lines (like tail) +/// let last_ten = reader.last_n(10).await?; +/// +/// let lines: Vec = last_ten +/// .collect::>() +/// .await +/// .into_iter() +/// .collect::, _>>()?; +/// +/// for line in lines { +/// println!("{}", line); +/// } +/// +/// Ok(()) +/// } +/// ``` #[async_trait::async_trait] pub trait JsonlReader: JsonlDeserialize + JsonlValueDeserialize + Stream + Send + Sync { + /// Stream type for the first n lines type NLines: Stream>; + /// Stream type for the last n lines (in reverse order) type NLinesRev: Stream>; + + /// Get the first `n` lines from the JSONL stream. + /// + /// # Arguments + /// + /// * `n` - The number of lines to retrieve from the beginning + /// + /// # Returns + /// + /// Returns a stream of the first `n` lines as `String`s, or an error if reading fails. + /// + /// # Examples + /// + /// ```ignore + /// use async_jsonl::{Jsonl, JsonlReader, JsonlDeserialize}; + /// use futures::StreamExt; + /// use serde::Deserialize; + /// + /// #[derive(Deserialize, Debug)] + /// struct LogEntry { + /// timestamp: String, + /// level: String, + /// message: String, + /// } + /// + /// #[tokio::main] + /// async fn main() -> anyhow::Result<()> { + /// let reader = Jsonl::from_path("data.jsonl").await?; + /// + /// // Get first 3 lines and deserialize them + /// let first_three = reader.first_n(3).await?; + /// let entries: Vec = first_three + /// .deserialize::() + /// .collect::>() + /// .await + /// .into_iter() + /// .collect::, _>>()?; + /// + /// println!("First 3 log entries: {:?}", entries); + /// Ok(()) + /// } + /// ``` async fn first_n(self, n: usize) -> anyhow::Result; + + /// Get the last `n` lines from the JSONL stream. + /// + /// # Arguments + /// + /// * `n` - The number of lines to retrieve from the end + /// + /// # Returns + /// + /// Returns a stream of the last `n` lines as `String`s in reverse order, + /// or an error if reading fails. + /// + /// # Examples + /// + /// ```ignore + /// use async_jsonl::{Jsonl, JsonlReader}; + /// use futures::StreamExt; + /// + /// #[tokio::main] + /// async fn main() -> anyhow::Result<()> { + /// let reader = Jsonl::from_path("data.jsonl").await?; + /// + /// let last_two = reader.last_n(2).await?; + /// let mut stream = last_two; + /// + /// while let Some(result) = stream.next().await { + /// match result { + /// Ok(line) => println!("Line: {}", line), + /// Err(e) => eprintln!("Error: {}", e), + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` async fn last_n(self, n: usize) -> anyhow::Result; } -/// Extension trait to add deserialization capabilities to JsonlIterator +/// Extension trait to add deserialization capabilities to JSONL readers. +/// +/// This trait provides methods to deserialize JSON lines into strongly-typed Rust structures. +/// It works with any type that implements `serde::Deserialize` and processes each line +/// of a JSONL file as a separate JSON object. +/// +/// # Examples +/// +/// ## Basic Usage with Custom Types +/// +/// ```ignore +/// use async_jsonl::{Jsonl, JsonlDeserialize}; +/// use futures::StreamExt; +/// use serde::Deserialize; +/// use std::io::Cursor; +/// +/// #[derive(Deserialize, Debug, PartialEq)] +/// struct User { +/// id: u64, +/// name: String, +/// email: String, +/// } +/// +/// #[tokio::main] +/// async fn main() -> anyhow::Result<()> { +/// let data = r#"{"id": 1, "name": "Alice", "email": "alice@example.com"} +/// {"id": 2, "name": "Bob", "email": "bob@example.com"}"#; +/// let reader = Jsonl::new(Cursor::new(data.as_bytes())); +/// +/// let mut user_stream = reader.deserialize::(); +/// +/// while let Some(result) = user_stream.next().await { +/// match result { +/// Ok(user) => println!("User: {} ({})", user.name, user.email), +/// Err(e) => eprintln!("Failed to parse user: {}", e), +/// } +/// } +/// +/// Ok(()) +/// } +/// ``` +/// +/// ## Error Handling and Filtering +/// +/// ```ignore +/// use async_jsonl::{Jsonl, JsonlDeserialize}; +/// use futures::StreamExt; +/// use serde::Deserialize; +/// use std::io::Cursor; +/// +/// #[derive(Deserialize, Debug)] +/// struct Product { +/// name: String, +/// price: f64, +/// #[serde(default)] +/// in_stock: bool, +/// } +/// +/// #[tokio::main] +/// async fn main() -> anyhow::Result<()> { +/// let data = r#"{"name": "Widget A", "price": 19.99, "in_stock": true} +/// {"name": "Widget B", "price": 29.99, "in_stock": false}"#; +/// let reader = Jsonl::new(Cursor::new(data.as_bytes())); +/// +/// // Filter only successful deserializations and in-stock products +/// let in_stock_products: Vec = reader +/// .deserialize::() +/// .filter_map(|result| async move { +/// match result { +/// Ok(product) if product.in_stock => Some(product), +/// Ok(_) => None, // Out of stock +/// Err(e) => { +/// eprintln!("Skipping invalid product: {}", e); +/// None +/// } +/// } +/// }) +/// .collect() +/// .await; +/// +/// println!("Found {} products in stock", in_stock_products.len()); +/// Ok(()) +/// } +/// ``` pub trait JsonlDeserialize { /// Deserialize JSON lines into the specified type fn deserialize(self) -> impl Stream> @@ -24,8 +252,155 @@ pub trait JsonlDeserialize { T: for<'a> Deserialize<'a>; } -/// Extension trait specifically for deserializing JSONL to serde_json::Value +/// Extension trait specifically for deserializing JSONL to `serde_json::Value` objects. +/// +/// This trait provides a convenient method to deserialize JSON lines into generic +/// `serde_json::Value` objects when you don't know the exact structure of the JSON +/// data ahead of time or when working with heterogeneous JSON objects. +/// +/// # Examples +/// +/// ## Basic Usage with Dynamic JSON +/// +/// ```ignore +/// use async_jsonl::{Jsonl, JsonlValueDeserialize}; +/// use futures::StreamExt; +/// use std::io::Cursor; +/// +/// #[tokio::main] +/// async fn main() -> anyhow::Result<()> { +/// let data = r#"{"user_id": 123, "action": "login", "timestamp": "2024-01-01T10:00:00Z"} +/// {"user_id": 456, "action": "logout", "timestamp": "2024-01-01T11:00:00Z"} +/// {"user_id": 789, "action": "purchase", "item": "widget", "price": 29.99}"#; +/// +/// let reader = Jsonl::new(Cursor::new(data.as_bytes())); +/// let mut value_stream = reader.deserialize_values(); +/// +/// while let Some(result) = value_stream.next().await { +/// match result { +/// Ok(value) => { +/// println!("Event: {}", value["action"]); +/// if let Some(price) = value.get("price") { +/// println!(" Purchase amount: {}", price); +/// } +/// } +/// Err(e) => eprintln!("Failed to parse JSON: {}", e), +/// } +/// } +/// +/// Ok(()) +/// } +/// ``` +/// +/// ## Processing Mixed JSON Structures +/// +/// ```ignore +/// use async_jsonl::{Jsonl, JsonlValueDeserialize}; +/// use futures::StreamExt; +/// use serde_json::Value; +/// use std::io::Cursor; +/// +/// #[tokio::main] +/// async fn main() -> anyhow::Result<()> { +/// let mixed_data = r#"{"type": "user", "name": "Alice", "age": 30} +/// {"type": "product", "name": "Widget", "price": 19.99, "categories": ["tools", "hardware"]} +/// {"type": "event", "name": "click", "target": "button", "metadata": {"page": "/home"}}"#; +/// +/// let reader = Jsonl::new(Cursor::new(mixed_data.as_bytes())); +/// let values: Vec = reader +/// .deserialize_values() +/// .collect::>() +/// .await +/// .into_iter() +/// .collect::, _>>()?; +/// +/// for value in values { +/// match value["type"].as_str() { +/// Some("user") => println!("User: {} (age {})", value["name"], value["age"]), +/// Some("product") => println!("Product: {} - ${}", value["name"], value["price"]), +/// Some("event") => println!("Event: {} on {}", value["name"], value["target"]), +/// _ => println!("Unknown type: {:?}", value), +/// } +/// } +/// +/// Ok(()) +/// } +/// ``` +/// +/// ## Error Handling with Invalid JSON +/// +/// ```ignore +/// use async_jsonl::{Jsonl, JsonlValueDeserialize}; +/// use futures::StreamExt; +/// use std::io::Cursor; +/// +/// #[tokio::main] +/// async fn main() -> anyhow::Result<()> { +/// let data_with_errors = r#"{"valid": "json"} +/// {invalid json line +/// {"another": "valid line"}"#; +/// +/// let reader = Jsonl::new(Cursor::new(data_with_errors.as_bytes())); +/// let mut value_stream = reader.deserialize_values(); +/// +/// let mut valid_count = 0; +/// let mut error_count = 0; +/// +/// while let Some(result) = value_stream.next().await { +/// match result { +/// Ok(_) => valid_count += 1, +/// Err(_) => error_count += 1, +/// } +/// } +/// +/// println!("Valid JSON lines: {}, Errors: {}", valid_count, error_count); +/// Ok(()) +/// } +/// ``` pub trait JsonlValueDeserialize { - /// Deserialize JSON lines into serde_json::Value objects + /// Deserialize JSON lines into `serde_json::Value` objects. + /// + /// This method transforms each line of a JSONL stream into `serde_json::Value` objects, + /// which can represent any valid JSON structure. This is useful when: + /// + /// - You don't know the exact structure of the JSON data ahead of time + /// - You're working with heterogeneous JSON objects in the same file + /// - You want to inspect or transform JSON data dynamically + /// - You need to handle mixed or evolving JSON schemas + /// + /// # Returns + /// + /// Returns a `Stream` of `anyhow::Result` where: + /// - `Ok(Value)` represents a successfully parsed JSON value + /// - `Err(anyhow::Error)` represents parsing errors for invalid JSON lines + /// + /// # Examples + /// + /// ```ignore + /// use async_jsonl::{Jsonl, JsonlValueDeserialize}; + /// use futures::StreamExt; + /// use std::io::Cursor; + /// + /// #[tokio::main] + /// async fn main() -> anyhow::Result<()> { + /// let data = r#"{"id": 1, "data": {"nested": [1, 2, 3]}} + /// {"id": 2, "data": {"different": "structure"}}"#; + /// + /// let reader = Jsonl::new(Cursor::new(data.as_bytes())); + /// let values: Vec<_> = reader + /// .deserialize_values() + /// .collect() + /// .await; + /// + /// for (i, result) in values.iter().enumerate() { + /// match result { + /// Ok(value) => println!("Object {}: ID = {}", i + 1, value["id"]), + /// Err(e) => eprintln!("Error parsing object {}: {}", i + 1, e), + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` fn deserialize_values(self) -> impl Stream>; } diff --git a/crates/async_jsonl/src/value.rs b/crates/async_jsonl/src/value.rs index 516d072..45293a9 100644 --- a/crates/async_jsonl/src/value.rs +++ b/crates/async_jsonl/src/value.rs @@ -1,3 +1,4 @@ +use crate::take_n::{TakeNLines, TakeNLinesReverse}; use crate::{Jsonl, JsonlDeserialize, JsonlValueDeserialize}; use futures::{Stream, StreamExt}; use serde::Deserialize; @@ -23,3 +24,45 @@ impl JsonlValueDeserialize for Jsonl { self.deserialize::() } } + +// Implementations for TakeNLines +impl JsonlDeserialize for TakeNLines { + fn deserialize(self) -> impl Stream> + where + T: for<'a> Deserialize<'a>, + { + self.map(|result| { + result.and_then(|line| { + serde_json::from_str::(&line) + .map_err(|e| anyhow::anyhow!("Failed to parse JSON line: {}", e)) + }) + }) + } +} + +impl JsonlValueDeserialize for TakeNLines { + fn deserialize_values(self) -> impl Stream> { + self.deserialize::() + } +} + +// Implementations for TakeNLinesReverse +impl JsonlDeserialize for TakeNLinesReverse { + fn deserialize(self) -> impl Stream> + where + T: for<'a> Deserialize<'a>, + { + self.map(|result| { + result.and_then(|line| { + serde_json::from_str::(&line) + .map_err(|e| anyhow::anyhow!("Failed to parse JSON line: {}", e)) + }) + }) + } +} + +impl JsonlValueDeserialize for TakeNLinesReverse { + fn deserialize_values(self) -> impl Stream> { + self.deserialize::() + } +} diff --git a/crates/async_jsonl_tests/tests/integration_tests.rs b/crates/async_jsonl_tests/tests/integration_tests.rs index fa61af8..82360a4 100644 --- a/crates/async_jsonl_tests/tests/integration_tests.rs +++ b/crates/async_jsonl_tests/tests/integration_tests.rs @@ -1,4 +1,4 @@ -use async_jsonl::{Jsonl, JsonlDeserialize, JsonlValueDeserialize}; +use async_jsonl::{Jsonl, JsonlDeserialize, JsonlReader, JsonlValueDeserialize}; use futures::StreamExt; use serde::Deserialize; use serde_json::Value; @@ -308,3 +308,91 @@ async fn test_complex_nested_values() { assert_eq!(values[1]["array"][2], "c"); assert!(values[1]["null_val"].is_null()); } + +#[tokio::test] +async fn test_take_n_lines_deserialize() { + let data = r#"{"id": 1, "name": "Alice", "active": true} +{"id": 2, "name": "Bob", "active": false} +{"id": 3, "name": "Charlie", "active": true} +{"id": 4, "name": "Diana", "active": false} +{"id": 5, "name": "Eve", "active": true} +"#; + + let reader = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(reader); + + // Test first_n with deserialize + let first_three = jsonl.first_n(3).await.unwrap(); + let records: Vec = first_three + .deserialize::() + .collect::>() + .await + .into_iter() + .collect::, _>>() + .unwrap(); + + assert_eq!(records.len(), 3); + assert_eq!(records[0].id, 1); + assert_eq!(records[0].name, "Alice"); + assert_eq!(records[1].id, 2); + assert_eq!(records[1].name, "Bob"); + assert_eq!(records[2].id, 3); + assert_eq!(records[2].name, "Charlie"); +} + +#[tokio::test] +async fn test_take_n_lines_reverse_deserialize() { + let data = r#"{"id": 1, "name": "Alice", "active": true} +{"id": 2, "name": "Bob", "active": false} +{"id": 3, "name": "Charlie", "active": true} +{"id": 4, "name": "Diana", "active": false} +{"id": 5, "name": "Eve", "active": true} +"#; + + let reader = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(reader); + + // Test last_n with deserialize + let last_two = jsonl.last_n(2).await.unwrap(); + let records: Vec = last_two + .deserialize::() + .collect::>() + .await + .into_iter() + .collect::, _>>() + .unwrap(); + + assert_eq!(records.len(), 2); + // last_n returns in reverse order (last line first) + assert_eq!(records[0].id, 5); + assert_eq!(records[0].name, "Eve"); + assert_eq!(records[1].id, 4); + assert_eq!(records[1].name, "Diana"); +} + +#[tokio::test] +async fn test_take_n_lines_deserialize_values() { + let data = r#"{"id": 1, "name": "Alice"} +{"id": 2, "name": "Bob"} +{"id": 3, "name": "Charlie"} +"#; + + let reader = Cursor::new(data.as_bytes()); + let jsonl = Jsonl::new(reader); + + // Test first_n with deserialize_values + let first_two = jsonl.first_n(2).await.unwrap(); + let values: Vec = first_two + .deserialize_values() + .collect::>() + .await + .into_iter() + .collect::, _>>() + .unwrap(); + + assert_eq!(values.len(), 2); + assert_eq!(values[0]["id"], 1); + assert_eq!(values[0]["name"], "Alice"); + assert_eq!(values[1]["id"], 2); + assert_eq!(values[1]["name"], "Bob"); +}