diff --git a/datafusion/common/src/file_type.rs b/datafusion/common/src/file_type.rs deleted file mode 100644 index f8d4fc0a314e..000000000000 --- a/datafusion/common/src/file_type.rs +++ /dev/null @@ -1,410 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! File type abstraction - -use crate::error::{DataFusionError, Result}; -#[cfg(feature = "compression")] -use async_compression::tokio::bufread::{ - BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder, - GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder, - XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder, - ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder, -}; - -use crate::parsers::CompressionTypeVariant; -#[cfg(feature = "compression")] -use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder}; -use bytes::Bytes; -#[cfg(feature = "compression")] -use bzip2::read::MultiBzDecoder; -#[cfg(feature = "compression")] -use flate2::read::MultiGzDecoder; - -use core::fmt; -use futures::stream::BoxStream; -use futures::StreamExt; -#[cfg(feature = "compression")] -use futures::TryStreamExt; -use std::fmt::Display; -use std::str::FromStr; -use tokio::io::AsyncWrite; -#[cfg(feature = "compression")] -use tokio_util::io::{ReaderStream, StreamReader}; -#[cfg(feature = "compression")] -use xz2::read::XzDecoder; -#[cfg(feature = "compression")] -use zstd::Decoder as ZstdDecoder; -use CompressionTypeVariant::*; - -/// The default file extension of arrow files -pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow"; -/// The default file extension of avro files -pub const DEFAULT_AVRO_EXTENSION: &str = ".avro"; -/// The default file extension of csv files -pub const DEFAULT_CSV_EXTENSION: &str = ".csv"; -/// The default file extension of json files -pub const DEFAULT_JSON_EXTENSION: &str = ".json"; -/// The default file extension of parquet files -pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; - -/// Define each `FileType`/`FileCompressionType`'s extension -pub trait GetExt { - /// File extension getter - fn get_ext(&self) -> String; -} - -/// Readable file compression type -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct FileCompressionType { - variant: CompressionTypeVariant, -} - -impl GetExt for FileCompressionType { - fn get_ext(&self) -> String { - match self.variant { - GZIP => ".gz".to_owned(), - BZIP2 => ".bz2".to_owned(), - XZ => ".xz".to_owned(), - ZSTD => ".zst".to_owned(), - UNCOMPRESSED => "".to_owned(), - } - } -} - -impl From for FileCompressionType { - fn from(t: CompressionTypeVariant) -> Self { - Self { variant: t } - } -} - -impl FromStr for FileCompressionType { - type Err = DataFusionError; - - fn from_str(s: &str) -> Result { - let variant = CompressionTypeVariant::from_str(s).map_err(|_| { - DataFusionError::NotImplemented(format!("Unknown FileCompressionType: {s}")) - })?; - Ok(Self { variant }) - } -} - -/// `FileCompressionType` implementation -impl FileCompressionType { - /// Gzip-ed file - pub const GZIP: Self = Self { variant: GZIP }; - - /// Bzip2-ed file - pub const BZIP2: Self = Self { variant: BZIP2 }; - - /// Xz-ed file (liblzma) - pub const XZ: Self = Self { variant: XZ }; - - /// Zstd-ed file - pub const ZSTD: Self = Self { variant: ZSTD }; - - /// Uncompressed file - pub const UNCOMPRESSED: Self = Self { - variant: UNCOMPRESSED, - }; - - /// The file is compressed or not - pub const fn is_compressed(&self) -> bool { - self.variant.is_compressed() - } - - /// Given a `Stream`, create a `Stream` which data are compressed with `FileCompressionType`. - pub fn convert_to_compress_stream( - &self, - s: BoxStream<'static, Result>, - ) -> Result>> { - Ok(match self.variant { - #[cfg(feature = "compression")] - GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s))) - .map_err(DataFusionError::from) - .boxed(), - #[cfg(feature = "compression")] - BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s))) - .map_err(DataFusionError::from) - .boxed(), - #[cfg(feature = "compression")] - XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s))) - .map_err(DataFusionError::from) - .boxed(), - #[cfg(feature = "compression")] - ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s))) - .map_err(DataFusionError::from) - .boxed(), - #[cfg(not(feature = "compression"))] - GZIP | BZIP2 | XZ | ZSTD => { - return crate::error::_not_impl_err!("Compression feature is not enabled") - } - UNCOMPRESSED => s.boxed(), - }) - } - - /// Wrap the given `AsyncWrite` so that it performs compressed writes - /// according to this `FileCompressionType`. - pub fn convert_async_writer( - &self, - w: Box, - ) -> Result> { - Ok(match self.variant { - #[cfg(feature = "compression")] - GZIP => Box::new(GzipEncoder::new(w)), - #[cfg(feature = "compression")] - BZIP2 => Box::new(BzEncoder::new(w)), - #[cfg(feature = "compression")] - XZ => Box::new(XzEncoder::new(w)), - #[cfg(feature = "compression")] - ZSTD => Box::new(ZstdEncoder::new(w)), - #[cfg(not(feature = "compression"))] - GZIP | BZIP2 | XZ | ZSTD => { - return crate::error::_not_impl_err!("Compression feature is not enabled") - } - UNCOMPRESSED => w, - }) - } - - /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`. - pub fn convert_stream( - &self, - s: BoxStream<'static, Result>, - ) -> Result>> { - Ok(match self.variant { - #[cfg(feature = "compression")] - GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s))) - .map_err(DataFusionError::from) - .boxed(), - #[cfg(feature = "compression")] - BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s))) - .map_err(DataFusionError::from) - .boxed(), - #[cfg(feature = "compression")] - XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s))) - .map_err(DataFusionError::from) - .boxed(), - #[cfg(feature = "compression")] - ZSTD => ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s))) - .map_err(DataFusionError::from) - .boxed(), - #[cfg(not(feature = "compression"))] - GZIP | BZIP2 | XZ | ZSTD => { - return crate::error::_not_impl_err!("Compression feature is not enabled") - } - UNCOMPRESSED => s.boxed(), - }) - } - - /// Given a `Read`, create a `Read` which data are decompressed with `FileCompressionType`. - pub fn convert_read( - &self, - r: T, - ) -> Result> { - Ok(match self.variant { - #[cfg(feature = "compression")] - GZIP => Box::new(MultiGzDecoder::new(r)), - #[cfg(feature = "compression")] - BZIP2 => Box::new(MultiBzDecoder::new(r)), - #[cfg(feature = "compression")] - XZ => Box::new(XzDecoder::new_multi_decoder(r)), - #[cfg(feature = "compression")] - ZSTD => match ZstdDecoder::new(r) { - Ok(decoder) => Box::new(decoder), - Err(e) => return Err(DataFusionError::External(Box::new(e))), - }, - #[cfg(not(feature = "compression"))] - GZIP | BZIP2 | XZ | ZSTD => { - return crate::error::_not_impl_err!("Compression feature is not enabled") - } - UNCOMPRESSED => Box::new(r), - }) - } -} - -/// Readable file type -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum FileType { - /// Apache Arrow file - ARROW, - /// Apache Avro file - AVRO, - /// Apache Parquet file - PARQUET, - /// CSV file - CSV, - /// JSON file - JSON, -} - -impl GetExt for FileType { - fn get_ext(&self) -> String { - match self { - FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(), - FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(), - FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(), - FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(), - FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(), - } - } -} - -impl Display for FileType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let out = match self { - FileType::CSV => "csv", - FileType::JSON => "json", - FileType::PARQUET => "parquet", - FileType::AVRO => "avro", - FileType::ARROW => "arrow", - }; - write!(f, "{}", out) - } -} - -impl FromStr for FileType { - type Err = DataFusionError; - - fn from_str(s: &str) -> Result { - let s = s.to_uppercase(); - match s.as_str() { - "ARROW" => Ok(FileType::ARROW), - "AVRO" => Ok(FileType::AVRO), - "PARQUET" => Ok(FileType::PARQUET), - "CSV" => Ok(FileType::CSV), - "JSON" | "NDJSON" => Ok(FileType::JSON), - _ => crate::error::_not_impl_err!("Unknown FileType: {s}"), - } - } -} - -impl FileType { - /// Given a `FileCompressionType`, return the `FileType`'s extension with compression suffix - pub fn get_ext_with_compression(&self, c: FileCompressionType) -> Result { - let ext = self.get_ext(); - - match self { - FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())), - FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant { - UNCOMPRESSED => Ok(ext), - _ => crate::error::_internal_err!( - "FileCompressionType can be specified for CSV/JSON FileType." - ), - }, - } - } -} - -#[cfg(test)] -mod tests { - use crate::error::DataFusionError; - use crate::file_type::{FileCompressionType, FileType}; - use std::str::FromStr; - - #[test] - fn get_ext_with_compression() { - for (file_type, compression, extension) in [ - (FileType::CSV, FileCompressionType::UNCOMPRESSED, ".csv"), - (FileType::CSV, FileCompressionType::GZIP, ".csv.gz"), - (FileType::CSV, FileCompressionType::XZ, ".csv.xz"), - (FileType::CSV, FileCompressionType::BZIP2, ".csv.bz2"), - (FileType::CSV, FileCompressionType::ZSTD, ".csv.zst"), - (FileType::JSON, FileCompressionType::UNCOMPRESSED, ".json"), - (FileType::JSON, FileCompressionType::GZIP, ".json.gz"), - (FileType::JSON, FileCompressionType::XZ, ".json.xz"), - (FileType::JSON, FileCompressionType::BZIP2, ".json.bz2"), - (FileType::JSON, FileCompressionType::ZSTD, ".json.zst"), - ] { - assert_eq!( - file_type.get_ext_with_compression(compression).unwrap(), - extension - ); - } - - // Cannot specify compression for these file types - for (file_type, extension) in - [(FileType::AVRO, ".avro"), (FileType::PARQUET, ".parquet")] - { - assert_eq!( - file_type - .get_ext_with_compression(FileCompressionType::UNCOMPRESSED) - .unwrap(), - extension - ); - for compression in [ - FileCompressionType::GZIP, - FileCompressionType::XZ, - FileCompressionType::BZIP2, - FileCompressionType::ZSTD, - ] { - assert!(matches!( - file_type.get_ext_with_compression(compression), - Err(DataFusionError::Internal(_)) - )); - } - } - } - - #[test] - fn from_str() { - for (ext, file_type) in [ - ("csv", FileType::CSV), - ("CSV", FileType::CSV), - ("json", FileType::JSON), - ("JSON", FileType::JSON), - ("avro", FileType::AVRO), - ("AVRO", FileType::AVRO), - ("parquet", FileType::PARQUET), - ("PARQUET", FileType::PARQUET), - ] { - assert_eq!(FileType::from_str(ext).unwrap(), file_type); - } - - assert!(matches!( - FileType::from_str("Unknown"), - Err(DataFusionError::NotImplemented(_)) - )); - - for (ext, compression_type) in [ - ("gz", FileCompressionType::GZIP), - ("GZ", FileCompressionType::GZIP), - ("gzip", FileCompressionType::GZIP), - ("GZIP", FileCompressionType::GZIP), - ("xz", FileCompressionType::XZ), - ("XZ", FileCompressionType::XZ), - ("bz2", FileCompressionType::BZIP2), - ("BZ2", FileCompressionType::BZIP2), - ("bzip2", FileCompressionType::BZIP2), - ("BZIP2", FileCompressionType::BZIP2), - ("zst", FileCompressionType::ZSTD), - ("ZST", FileCompressionType::ZSTD), - ("zstd", FileCompressionType::ZSTD), - ("ZSTD", FileCompressionType::ZSTD), - ("", FileCompressionType::UNCOMPRESSED), - ] { - assert_eq!( - FileCompressionType::from_str(ext).unwrap(), - compression_type - ); - } - - assert!(matches!( - FileCompressionType::from_str("Unknown"), - Err(DataFusionError::NotImplemented(_)) - )); - } -}