4 changes: 4 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
@@ -263,6 +263,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
int num_fields()
c_string ToString()

CStatus AddField(int i, const shared_ptr[CField]& field,
shared_ptr[CSchema]* out)
CStatus RemoveField(int i, shared_ptr[CSchema]* out)

# Removed const in Cython so we don't have to cast to get the code to generate
shared_ptr[CSchema] AddMetadata(
const shared_ptr[CKeyValueMetadata]& metadata)
42 changes: 28 additions & 14 deletions python/pyarrow/parquet.py
@@ -581,6 +581,10 @@ def __init__(self, dirpath, filesystem=None, pathsep='/',

self._visit_level(0, self.dirpath, [])

if self.common_metadata_path is None:
# _common_metadata is a subset of _metadata
self.common_metadata_path = self.metadata_path

def _visit_level(self, level, base_path, part_keys):
fs = self.filesystem

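The fallback above means a dataset that ships only a _metadata sidecar still gets a usable common_metadata_path, since _common_metadata carries a subset of the same information (essentially just the file schema). A minimal sketch of how this surfaces through ParquetDataset; the directory path, its layout, and the exact contents of the two sidecar files are assumptions for illustration, not part of this patch:

    import pyarrow.parquet as pq

    # Hypothetical dataset directory that may contain either sidecar file:
    #   /data/events/_common_metadata   (typically schema only)
    #   /data/events/_metadata          (schema plus per-file metadata)
    dataset = pq.ParquetDataset('/data/events')

    # Points at _common_metadata when present; with this patch it falls back
    # to _metadata when only that file exists.
    print(dataset.common_metadata_path)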
@@ -695,10 +699,10 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
self.paths = path_or_paths

(self.pieces, self.partitions,
self.metadata_path) = _make_manifest(path_or_paths, self.fs)
self.common_metadata_path) = _make_manifest(path_or_paths, self.fs)

if self.metadata_path is not None:
with self.fs.open(self.metadata_path) as f:
if self.common_metadata_path is not None:
with self.fs.open(self.common_metadata_path) as f:
self.common_metadata = ParquetFile(f).metadata
else:
self.common_metadata = None
@@ -718,21 +722,31 @@ def validate_schemas(self):
open_file = self._get_open_file_func()

if self.metadata is None and self.schema is None:
if self.metadata_path is not None:
self.schema = open_file(self.metadata_path).schema
if self.common_metadata_path is not None:
self.schema = open_file(self.common_metadata_path).schema
else:
self.schema = self.pieces[0].get_metadata(open_file).schema
elif self.schema is None:
self.schema = self.metadata.schema

# Verify schemas are all equal
# Verify schemas are all compatible
dataset_schema = self.schema.to_arrow_schema()
# Exclude the partition columns from the schema, they are provided
# by the path, not the DatasetPiece
if self.partitions is not None:
for partition_name in self.partitions.partition_names:
if dataset_schema.get_field_index(partition_name) != -1:
field_idx = dataset_schema.get_field_index(partition_name)
dataset_schema = dataset_schema.remove(field_idx)

for piece in self.pieces:
file_metadata = piece.get_metadata(open_file)
if not self.schema.equals(file_metadata.schema):
raise ValueError('Schema in {0!s} was different. '
'{1!s} vs {2!s}'
.format(piece, file_metadata.schema,
self.schema))
file_schema = file_metadata.schema.to_arrow_schema()
if not dataset_schema.equals(file_schema):
raise ValueError('Schema in {0!s} was different. \n'
'{1!s}\n\nvs\n\n{2!s}'
.format(piece, file_schema,
dataset_schema))

def read(self, columns=None, nthreads=1, use_pandas_metadata=False):
"""
@@ -831,7 +845,7 @@ def _ensure_filesystem(fs):

def _make_manifest(path_or_paths, fs, pathsep='/'):
partitions = None
metadata_path = None
common_metadata_path = None

if len(path_or_paths) == 1:
# Dask passes a directory as a list of length 1
@@ -840,7 +854,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
if is_string(path_or_paths) and fs.isdir(path_or_paths):
manifest = ParquetManifest(path_or_paths, filesystem=fs,
pathsep=fs.pathsep)
metadata_path = manifest.metadata_path
common_metadata_path = manifest.common_metadata_path
pieces = manifest.pieces
partitions = manifest.partitions
else:
@@ -859,7 +873,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
piece = ParquetDatasetPiece(path)
pieces.append(piece)

return pieces, partitions, metadata_path
return pieces, partitions, common_metadata_path


def read_table(source, columns=None, nthreads=1, metadata=None,
13 changes: 10 additions & 3 deletions python/pyarrow/tests/test_parquet.py
@@ -1099,12 +1099,12 @@ def _test_read_common_metadata_files(fs, base_path):
with fs.open(data_path, 'wb') as f:
_write_table(table, f)

metadata_path = pjoin(base_path, '_metadata')
metadata_path = pjoin(base_path, '_common_metadata')
with fs.open(metadata_path, 'wb') as f:
pq.write_metadata(table.schema, f)

dataset = pq.ParquetDataset(base_path, filesystem=fs)
assert dataset.metadata_path == metadata_path
assert dataset.common_metadata_path == metadata_path

with fs.open(data_path) as f:
common_schema = pq.read_metadata(f).schema
@@ -1417,7 +1417,14 @@ def _test_write_to_dataset_with_partitions(base_path, filesystem=None):
output_table = pa.Table.from_pandas(output_df)
pq.write_to_dataset(output_table, base_path, partition_by,
filesystem=filesystem)
input_table = pq.ParquetDataset(base_path, filesystem=filesystem).read()
pq.write_metadata(output_table.schema,
os.path.join(base_path, '_common_metadata'))
dataset = pq.ParquetDataset(base_path, filesystem=filesystem)
# ARROW-2209: Ensure the dataset schema also includes the partition columns
dataset_cols = set(dataset.schema.to_arrow_schema().names)
assert dataset_cols == set(output_table.schema.names)

input_table = dataset.read()
input_df = input_table.to_pandas()

# Read data back in and compare with original DataFrame
58 changes: 58 additions & 0 deletions python/pyarrow/types.pxi
@@ -529,6 +529,64 @@ cdef class Schema:
def get_field_index(self, name):
return self.schema.GetFieldIndex(tobytes(name))

def append(self, Field field):
"""
Append a field at the end of the schema.

Parameters
----------

field: Field

Returns
-------
schema: Schema
"""
return self.insert(self.schema.num_fields(), field)

def insert(self, int i, Field field):
"""
Add a field at position i to the schema.

Parameters
----------
i: int
field: Field

Returns
-------
schema: Schema
"""
cdef:
shared_ptr[CSchema] new_schema
shared_ptr[CField] c_field

c_field = field.sp_field

with nogil:
check_status(self.schema.AddField(i, c_field, &new_schema))

return pyarrow_wrap_schema(new_schema)

def remove(self, int i):
"""
Remove the field at index i from the schema.

Parameters
----------
i: int

Returns
-------
schema: Schema
"""
cdef shared_ptr[CSchema] new_schema

with nogil:
check_status(self.schema.RemoveField(i, &new_schema))

return pyarrow_wrap_schema(new_schema)

def add_metadata(self, dict metadata):
"""
Add metadata as dict of string keys and values to Schema
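Taken together, append, insert, and remove give Schema a small non-mutating update API: each call returns a new Schema and leaves the original untouched. A short usage sketch; the field names and types are made up for illustration:

    import pyarrow as pa

    schema = pa.schema([pa.field('id', pa.int64()),
                        pa.field('value', pa.float64())])

    # Insert at a position or append at the end; 'schema' itself is unchanged.
    schema2 = schema.insert(1, pa.field('name', pa.string()))
    schema3 = schema2.append(pa.field('flag', pa.bool_()))

    # Remove by index, here dropping the 'value' field.
    schema4 = schema3.remove(schema3.get_field_index('value'))
    print(schema4)  # remaining fields: id, name, flag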