
Commit f500cec

Merge ca6cf11 into 7722b6e
PokhodenkoSA committed Oct 25, 2019
2 parents 7722b6e + ca6cf11
Showing 4 changed files with 30 additions and 32 deletions.
4 changes: 2 additions & 2 deletions README.rst
@@ -100,7 +100,7 @@ Building on Linux with setuptools
 ::
 
     PYVER=<3.6 or 3.7>
-    conda create -n HPAT -q -y -c numba -c conda-forge -c defaults numba mpich pyarrow=0.14.1 arrow-cpp=0.14.1 gcc_linux-64 gxx_linux-64 gfortran_linux-64 scipy pandas boost python=$PYVER
+    conda create -n HPAT -q -y -c numba -c conda-forge -c defaults numba mpich pyarrow=0.15.0 arrow-cpp=0.15.0 gcc_linux-64 gxx_linux-64 gfortran_linux-64 scipy pandas boost python=$PYVER
     source activate HPAT
     git clone https://github.com/IntelPython/hpat
     cd hpat
@@ -136,7 +136,7 @@ Building on Windows with setuptools
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 ::
 
-    conda create -n HPAT -c numba -c defaults -c intel python=<3.6 or 3.7> numba impi-devel pyarrow=0.14.1 arrow-cpp=0.14.1 scipy pandas boost
+    conda create -n HPAT -c numba -c defaults -c intel -c conda-forge python=<3.6 or 3.7> numba impi-devel pyarrow=0.15.0 arrow-cpp=0.15.0 scipy pandas boost
     conda activate HPAT
     git clone https://github.com/IntelPython/hpat.git
     cd hpat
8 changes: 4 additions & 4 deletions buildscripts/hpat-conda-recipe/meta.yaml
@@ -26,8 +26,8 @@ requirements:
     - numba ==0.46
     - numpy
     - pandas >=0.23
-    - pyarrow ==0.14.1
-    - arrow-cpp ==0.14.1
+    - pyarrow ==0.15.0
+    - arrow-cpp ==0.15.0
     - boost
     - hdf5
     - h5py
@@ -41,8 +41,8 @@ requirements:
     - python
     - {{ pin_compatible('numpy') }}
     - pandas >=0.23
-    - pyarrow ==0.14.1
-    - arrow-cpp ==0.14.1
+    - pyarrow ==0.15.0
+    - arrow-cpp ==0.15.0
     - boost
     - numba ==0.46
     - mpich # [not win]
8 changes: 4 additions & 4 deletions buildscripts/parquet-reader-conda-recipe/meta.yaml
@@ -14,11 +14,11 @@ requirements:
     - {{ compiler('cxx') }}
     - cmake >=3.2
     - python 3.6.*
-    - pyarrow ==0.14.1
-    - arrow-cpp ==0.14.1
+    - pyarrow ==0.15.0
+    - arrow-cpp ==0.15.0
   run:
-    - pyarrow ==0.14.1
-    - arrow-cpp ==0.14.1
+    - pyarrow ==0.15.0
+    - arrow-cpp ==0.15.0


about:
42 changes: 20 additions & 22 deletions parquet_reader/hpat_parquet_reader.cpp
@@ -86,11 +86,13 @@ int64_t pq_get_size_single_file(std::shared_ptr<FileReader> arrow_reader, int64_
 int64_t
 pq_read_single_file(std::shared_ptr<FileReader> arrow_reader, int64_t column_idx, uint8_t* out_data, int out_dtype)
 {
-    std::shared_ptr<::arrow::Array> arr;
-    arrow_reader->ReadColumn(column_idx, &arr);
-    if (arr == NULL)
+    std::shared_ptr<::arrow::ChunkedArray> chunked_array;
+    arrow_reader->ReadColumn(column_idx, &chunked_array);
+    if (chunked_array == NULL)
         return 0;
 
+    auto arr = chunked_array->chunk(0);
+
     int64_t num_values = arr->length();
     // std::cout << "arr: " << arr->ToString() << std::endl;
     std::shared_ptr<arrow::DataType> arrow_type = get_arrow_type(arrow_reader, column_idx);
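In Arrow 0.15.0, parquet::arrow::FileReader::ReadColumn fills a ChunkedArray instead of a flat Array, which is why the new code takes chunk(0) before using the values. A minimal sketch of the same call with the ::arrow::Status check the commit elides (the status handling is an illustration, not part of this diff)::

    // Sketch only, assuming the Arrow 0.15.0 signatures used in this diff.
    std::shared_ptr<::arrow::ChunkedArray> chunked_array;
    ::arrow::Status status = arrow_reader->ReadColumn(column_idx, &chunked_array);
    if (!status.ok() || chunked_array == NULL)
        return 0;
    // chunk(0) assumes the column came back as a single chunk; any values
    // in later chunks would be silently ignored.
    std::shared_ptr<::arrow::Array> arr = chunked_array->chunk(0);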
@@ -154,8 +156,7 @@ int pq_read_parallel_single_file(std::shared_ptr<FileReader> arrow_reader,
     /* -------- read row group ---------- */
     std::shared_ptr<::arrow::Table> table;
     arrow_reader->ReadRowGroup(row_group_index, column_indices, &table);
-    std::shared_ptr<::arrow::Column> column = table->column(0);
-    std::shared_ptr<::arrow::ChunkedArray> chunked_arr = column->data();
+    std::shared_ptr<::arrow::ChunkedArray> chunked_arr = table->column(0);
     // std::cout << chunked_arr->num_chunks() << std::endl;
     if (chunked_arr->num_chunks() != 1)
     {
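Arrow 0.15.0 removed the ::arrow::Column wrapper, so Table::column(i) now returns the ChunkedArray directly and the old column->data() step disappears. If the single-chunk requirement ever needed relaxing, a hypothetical generalization (not part of this commit) could walk the chunks instead::

    // Hypothetical sketch only: consume every chunk rather than erroring
    // when num_chunks() != 1.
    int64_t copied = 0;
    for (int i = 0; i < chunked_arr->num_chunks(); ++i)
    {
        std::shared_ptr<::arrow::Array> chunk = chunked_arr->chunk(i);
        // copy chunk->length() values into the output buffer at offset `copied`
        copied += chunk->length();
    }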
@@ -390,10 +391,11 @@ int64_t pq_read_string_single_file(std::shared_ptr<FileReader> arrow_reader,
 {
     // std::cout << "string read file" << '\n';
     //
-    std::shared_ptr<::arrow::Array> arr;
-    arrow_reader->ReadColumn(column_idx, &arr);
-    if (arr == NULL)
+    std::shared_ptr<::arrow::ChunkedArray> chunked_arr;
+    arrow_reader->ReadColumn(column_idx, &chunked_arr);
+    if (chunked_arr == NULL)
         return -1;
+    auto arr = chunked_arr->chunk(0);
     int64_t num_values = arr->length();
     // std::cout << arr->ToString() << std::endl;
     std::shared_ptr<arrow::DataType> arrow_type = get_arrow_type(arrow_reader, column_idx);
@@ -509,8 +511,7 @@ int pq_read_string_parallel_single_file(std::shared_ptr<FileReader> arrow_reader
     /* -------- read row group ---------- */
     std::shared_ptr<::arrow::Table> table;
     arrow_reader->ReadRowGroup(row_group_index, column_indices, &table);
-    std::shared_ptr<::arrow::Column> column = table->column(0);
-    std::shared_ptr<::arrow::ChunkedArray> chunked_arr = column->data();
+    std::shared_ptr<::arrow::ChunkedArray> chunked_arr = table->column(0);
     // std::cout << chunked_arr->num_chunks() << std::endl;
     if (chunked_arr->num_chunks() != 1)
     {
@@ -634,11 +635,15 @@ void pq_init_reader(const char* file_name, std::shared_ptr<FileReader>* a_reader
     ::arrow::io::HadoopFileSystem::Connect(&hfs_config, &fs);
     std::shared_ptr<::arrow::io::HdfsReadableFile> file;
     fs->OpenReadable(f_name, &file);
-    a_reader->reset(new FileReader(pool, ParquetFileReader::Open(file)));
+    std::unique_ptr<FileReader> arrow_reader;
+    FileReader::Make(pool, ParquetFileReader::Open(file), &arrow_reader);
+    *a_reader = std::move(arrow_reader);
 }
 else // regular file system
 {
-    a_reader->reset(new FileReader(pool, ParquetFileReader::OpenFile(f_name, false)));
+    std::unique_ptr<FileReader> arrow_reader;
+    FileReader::Make(pool, ParquetFileReader::OpenFile(f_name, false), &arrow_reader);
+    *a_reader = std::move(arrow_reader);
 }
 // printf("file open for arrow reader done\n");
 // fflush(stdout);
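Arrow 0.15.0 replaced the public FileReader constructor with the static FileReader::Make factory, which returns an ::arrow::Status. The commit ignores that status; a checked variant of the regular-file branch (the error path is an assumption added for illustration) would read::

    // Sketch only, using the same types this file already has in scope.
    std::unique_ptr<FileReader> arrow_reader;
    ::arrow::Status st =
        FileReader::Make(pool, ParquetFileReader::OpenFile(f_name, false), &arrow_reader);
    if (!st.ok())
    {
        // Hypothetical error path: report st.message() and leave *a_reader unset.
        return;
    }
    *a_reader = std::move(arrow_reader);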
@@ -649,17 +654,10 @@ void pq_init_reader(const char* file_name, std::shared_ptr<FileReader>* a_reader
 // TODO: handle more complex types
 std::shared_ptr<arrow::DataType> get_arrow_type(std::shared_ptr<FileReader> arrow_reader, int64_t column_idx)
 {
-    // TODO: error checking
-    std::vector<int> column_indices;
-    column_indices.push_back(column_idx);
-
+    // TODO: error checking (column_idx out of bounds)
     std::shared_ptr<::arrow::Schema> col_schema;
-    auto descr = arrow_reader->parquet_reader()->metadata()->schema();
-    auto parquet_key_value_metadata = arrow_reader->parquet_reader()->metadata()->key_value_metadata();
-    parquet::arrow::FromParquetSchema(descr, column_indices, parquet_key_value_metadata, &col_schema);
-    // std::cout<< col_schema->ToString() << std::endl;
-    std::shared_ptr<::arrow::DataType> arrow_dtype = col_schema->field(0)->type();
-    return arrow_dtype;
+    arrow_reader->GetSchema(&col_schema);
+    return col_schema->field(column_idx)->type();
 }
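The rewrite drops the manual FromParquetSchema round-trip in favor of FileReader::GetSchema, which yields the whole Arrow schema so the field can be indexed directly. A sketch that adds the bounds check the remaining TODO asks for (the nullptr error value is an assumption, not in the commit)::

    // Sketch only, assuming Arrow 0.15.0's FileReader::GetSchema as used above.
    std::shared_ptr<::arrow::Schema> col_schema;
    ::arrow::Status st = arrow_reader->GetSchema(&col_schema);
    if (!st.ok() || column_idx < 0 || column_idx >= col_schema->num_fields())
        return nullptr; // callers must check for a null result
    return col_schema->field(column_idx)->type();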

bool arrowPqTypesEqual(std::shared_ptr<arrow::DataType> arrow_type, ::parquet::Type::type pq_type)
