[BUG] Missing Partition Filters on renamed Columns after Join #2942

Open
markus-raster opened this issue Apr 22, 2024 · 3 comments
Labels: bug (Something isn't working)

Comments

@markus-raster

Bug

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Describe the problem

When you load two Delta tables in PySpark, each partitioned by a column with a different name, and the partition column has decimal type in one of the DataFrames, a partition filter applied after joining the two DataFrames only reaches the table whose column name was not changed. In contrast, when the same tables are read directly in parquet format, the partition filter is applied to both tables.

Steps to reproduce

from pyspark.sql import SparkSession
from decimal import Decimal
from pyspark.sql.types import StructType, StructField, StringType, DecimalType


# Create a Spark session
spark = SparkSession.builder \
    .appName("Write DF to S3 with Partitioning") \
    .getOrCreate()

# Sample data
data = [("Alice", 1), ("Bob", 1), ("Cathy", 2), ("David", 2), ("Eve", 3)]
columns = ["Name", "Category"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Specify S3 bucket and path (make sure you have the correct access rights and the bucket exists)
output_path = "s3a://test/"

# Write DataFrame to S3 with partitioning by the 'Category' column
df.write.mode("overwrite").format("delta").partitionBy("Category").save(output_path)



# Create new Data with Decimal Type
data = [("Alice", Decimal('1')), ("Bob", Decimal('1')), ("Cathy", Decimal('2')), ("David", Decimal('2')), ("Eve", Decimal('3'))]
columns = ["Name", "Category_renamed"]


# Define the schema
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Category_renamed", DecimalType(10, 0), True)  # Decimal column with precision 10 and scale 0
])
# Create DataFrame
df_2 = spark.createDataFrame(data, schema=schema)

# Specify S3 bucket and path (make sure you have the correct access rights and the bucket exists)
output_path_2 = "s3a://test_rename/"

# Write DataFrame to S3 with partitioning by the 'Category_renamed' column
df_2.write.mode("overwrite").format("delta").partitionBy("Category_renamed").save(output_path_2)
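
# Optional sanity check (an addition, not part of the original report):
# confirm the partition key types differ as described; expect
# Category: bigint in the first table and Category_renamed: decimal(10,0)
# in the second.
spark.read.format("delta").load(output_path).printSchema()
spark.read.format("delta").load(output_path_2).printSchema()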


# Read the Delta tables back and explain the join plan
read_1 = spark.read.format("delta").load(output_path)
read_2 = spark.read.format("delta").load(output_path_2).withColumnRenamed("Category_renamed", "Category")


df_join = read_1.join(read_2, on="Category").filter("Category=3")

df_join.explain()
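
# Side check (an addition, not in the original report): filtering each Delta
# table directly, without the join, should show the partition filter pushed
# to each individual scan, isolating the join as the trigger for the miss.
read_1.filter("Category = 3").explain()
read_2.filter("Category = 3").explain()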

# Comparison: read the same paths directly as parquet
read_1 = spark.read.format("parquet").load(output_path)
read_2 = spark.read.format("parquet").load(output_path_2).withColumnRenamed("Category_renamed", "Category")


df_join = read_1.join(read_2, on="Category").filter("Category=3")

df_join.explain()

Observed results

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Category#15107L, Name#15106, Name#15110]
   +- BroadcastHashJoin [cast(Category#15107L as decimal(20,0))], [cast(Category#15114 as decimal(20,0))], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, bigint, true] as decimal(20,0))),false), [plan_id=2772]
      :  +- FileScan parquet [Name#15106,Category#15107L] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[s3a://test], PartitionFilters: [isnotnull(Category#15107L), (Category#15107L = 3)], PushedFilters: [], ReadSchema: struct<Name:string>
      +- Project [Name#15110, Category_renamed#15111 AS Category#15114]
         +- FileScan parquet [Name#15110,Category_renamed#15111] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[s3a://test_rename], PartitionFilters: [isnotnull(Category_renamed#15111)], PushedFilters: [], ReadSchema: struct<Name:string>

Expected results

The partition filter is pushed to both scans, i.e. the second FileScan should also carry (Category_renamed#15111 = 3) in its PartitionFilters, just as the first one carries (Category#15107L = 3).
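
A possible interim workaround, offered here only as an untested sketch (not part of the original report), assuming read_1 and read_2 are the Delta DataFrames from the repro above (before the parquet comparison reassigns them): either filter each side before the join so the predicate lands directly on each scan, or align the key types explicitly so the join does not wrap the keys in casts.

from pyspark.sql.functions import col

# (a) Hypothetical: filter both sides before joining, so each partition
#     predicate is applied directly to its own FileScan.
df_join_a = read_1.filter("Category = 3").join(read_2.filter("Category = 3"), on="Category")
df_join_a.explain()

# (b) Hypothetical: cast the bigint key to decimal(10,0) up front, so the
#     equi-join needs no implicit cast(... as decimal(20,0)) around the keys.
read_1_dec = read_1.withColumn("Category", col("Category").cast("decimal(10,0)"))
df_join_b = read_1_dec.join(read_2, on="Category").filter("Category = 3")
df_join_b.explain()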

Further details

Environment information

  • Delta Lake version: 3.1.0 and 2.4.0
  • Spark version: 3.5.0 and 3.4.1

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
markus-raster added the bug label on Apr 22, 2024
@NuthanReddy

Can someone assign this to me? I can work on it.

@NuthanReddy

NuthanReddy commented May 1, 2024

@markus-raster

This issue is observed on
Delta Lake version: 3.1.0
Spark version: 3.5.0

and

Delta Lake version: 2.4.0
Spark version: 3.4.1

right?

@NuthanReddy

As an initial check, I ran the same code locally using Spark 3.5.1 and Delta 3.1.0 and observed predicate pushdown for the renamed column as well.
[Screenshot: explain plan showing the partition filter pushed to the renamed column]
