diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index f7e3c7c082..3c077cdfc1 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2254,6 +2254,7 @@ def data_file_statistics_from_parquet_metadata(
     parquet_metadata: pq.FileMetaData,
     stats_columns: Dict[int, StatisticsCollector],
     parquet_column_mapping: Dict[str, int],
+    check_schema: bool = True,
 ) -> DataFileStatistics:
     """
     Compute and return DataFileStatistics that includes the following.
@@ -2299,7 +2300,15 @@ def data_file_statistics_from_parquet_metadata(
 
         for pos in range(parquet_metadata.num_columns):
             column = row_group.column(pos)
-            field_id = parquet_column_mapping[column.path_in_schema]
+            try:
+                field_id = parquet_column_mapping[column.path_in_schema]
+            except KeyError:
+                if check_schema:
+                    raise
+                logger.warning(
+                    "Column %s is not present in the current table schema; skipping its statistics", column.path_in_schema
+                )
+                continue
 
             stats_col = stats_columns[field_id]
 
@@ -2464,7 +2473,13 @@ def _check_pyarrow_schema_compatible(
     _check_schema_compatible(requested_schema, provided_schema)
 
 
-def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
+def parquet_files_to_data_files(
+    io: FileIO,
+    table_metadata: TableMetadata,
+    file_paths: Iterator[str],
+    check_schema: bool = True,
+    partition_deductor: Optional[Callable[[str], Record]] = None,
+) -> Iterator[DataFile]:
     for file_path in file_paths:
         input_file = io.new_input(file_path)
         with input_file.open() as input_stream:
@@ -2475,18 +2490,25 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_
                 f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
             )
         schema = table_metadata.schema()
-        _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())
+        if check_schema:
+            _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())
 
         statistics = data_file_statistics_from_parquet_metadata(
             parquet_metadata=parquet_metadata,
             stats_columns=compute_statistics_plan(schema, table_metadata.properties),
             parquet_column_mapping=parquet_path_to_id_mapping(schema),
+            check_schema=check_schema,
         )
+        if partition_deductor is None:
+            partition = statistics.partition(table_metadata.spec(), table_metadata.schema())
+        else:
+            partition = partition_deductor(file_path)
+
         data_file = DataFile(
             content=DataFileContent.DATA,
             file_path=file_path,
             file_format=FileFormat.PARQUET,
-            partition=statistics.partition(table_metadata.spec(), table_metadata.schema()),
+            partition=partition,
             file_size_in_bytes=len(input_file),
             sort_order_id=None,
             spec_id=table_metadata.default_spec_id,
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index ee50a8b5bf..a80d8589be 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -686,7 +686,12 @@ def delete(
             warnings.warn("Delete operation did not match any records")
 
     def add_files(
-        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
+        self,
+        file_paths: List[str],
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+        check_duplicate_files: bool = True,
+        check_schema: bool = True,
+        partition_deductor: Optional[Callable[[str], Record]] = None,
     ) -> None:
         """
         Shorthand API for adding files as data files to the table transaction.
@@ -717,7 +722,11 @@ def add_files(
             )
         with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
             data_files = _parquet_files_to_data_files(
-                table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
+                table_metadata=self.table_metadata,
+                file_paths=file_paths,
+                io=self._table.io,
+                check_schema=check_schema,
+                partition_deductor=partition_deductor,
             )
             for data_file in data_files:
                 update_snapshot.append_data_file(data_file)
@@ -1283,7 +1292,12 @@ def delete(
             tx.delete(delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)
 
     def add_files(
-        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
+        self,
+        file_paths: List[str],
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+        check_duplicate_files: bool = True,
+        check_schema: bool = True,
+        partition_deductor: Optional[Callable[[str], Record]] = None,
     ) -> None:
         """
         Shorthand API for adding files as data files to the table.
@@ -1296,8 +1310,12 @@ def add_files(
         """
         with self.transaction() as tx:
             tx.add_files(
-                file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
+                file_paths=file_paths,
+                snapshot_properties=snapshot_properties,
+                check_duplicate_files=check_duplicate_files,
+                check_schema=check_schema,
+                partition_deductor=partition_deductor,
             )
 
     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
         return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)
@@ -1883,7 +1901,13 @@ class AddFileTask:
     partition_field_value: Record
 
 
-def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
+def _parquet_files_to_data_files(
+    table_metadata: TableMetadata,
+    file_paths: List[str],
+    io: FileIO,
+    check_schema: bool = True,
+    partition_deductor: Optional[Callable[[str], Record]] = None,
+) -> Iterable[DataFile]:
     """Convert a list files into DataFiles.
 
     Returns:
@@ -1891,4 +1915,10 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
     """
     from pyiceberg.io.pyarrow import parquet_files_to_data_files
 
-    yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))
+    yield from parquet_files_to_data_files(
+        io=io,
+        table_metadata=table_metadata,
+        file_paths=iter(file_paths),
+        check_schema=check_schema,
+        partition_deductor=partition_deductor,
+    )
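
A usage sketch of the two new `add_files` options (illustration only, not part of the patch): the catalog name, table identifier, file path, and `event_date=` directory layout are hypothetical, and the sketch assumes partition values are passed to `Record` positionally in partition-spec order.

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.typedef import Record

catalog = load_catalog("default")        # hypothetical catalog name
table = catalog.load_table("db.events")  # hypothetical table, partitioned by event_date


def partition_from_path(file_path: str) -> Record:
    # Derive the partition value from a Hive-style directory segment, e.g.
    # ".../event_date=2024-01-01/part-0.parquet" -> Record("2024-01-01").
    segment = next(part for part in file_path.split("/") if part.startswith("event_date="))
    return Record(segment.split("=", 1)[1])


table.add_files(
    file_paths=["s3://bucket/db/events/event_date=2024-01-01/part-0.parquet"],
    check_schema=False,                      # skip the PyArrow schema compatibility check
    partition_deductor=partition_from_path,  # derive partitions from paths instead of column stats
)
```

With the defaults (`check_schema=True`, `partition_deductor=None`) behaviour is unchanged: incompatible schemas still raise, and partition values are still derived from the Parquet column statistics.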