
Conversation

@zhengruifeng
Contributor

What changes were proposed in this pull request?

FPGrowth model save/load now supports the local filesystem.

Why are the changes needed?

To make FPGrowth work with the local filesystem.

Does this PR introduce any user-facing change?

Yes, FPGrowth will work when the local saving mode is on.

How was this patch tested?

Updated tests.

Was this patch authored or co-authored using generative AI tooling?

No.

Using.resource(
  new ObjectInputStream(new BufferedInputStream(new FileInputStream(path)))
) { ois =>
  val schema = ois.readObject().asInstanceOf[StructType]
Contributor

This uses the Java deserializer, which seems unsafe (risk of remote code execution).

Related commit: #50922
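For context, a minimal sketch of the JSON-based alternative that avoids readObject entirely. The writeSchema/readSchema helper names are hypothetical; StructType.json and DataType.fromJson are standard Spark APIs:

```scala
import java.io._
import scala.util.Using
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical helpers: persist the schema as JSON text rather than a
// Java-serialized object, so loading never goes through readObject.
def writeSchema(path: String, schema: StructType): Unit =
  Using.resource(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)))) {
    dos => dos.writeUTF(schema.json) // note: writeUTF caps at 64 KB; fine for a sketch
  }

def readSchema(path: String): StructType =
  Using.resource(new DataInputStream(new BufferedInputStream(new FileInputStream(path)))) {
    dis => DataType.fromJson(dis.readUTF()).asInstanceOf[StructType]
  }
```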

Contributor Author

Resorted to the Arrow format, as suggested by @cloud-fan.

@WeichenXu123 (Contributor) left a comment

We need to address the RCE issue :)

val spark = df.sparkSession
val schema = df.schema
val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
df.queryExecution.executedPlan.execute().mapPartitionsInternal { iter =>
Contributor

Dataset already has def toArrowBatchRdd; shall we reuse it?

Contributor Author

We can reuse it, with some changes.

val schema: StructType = df.schema
dos.writeUTF(schema.json)

val iter = DatasetUtils.toArrowBatchRDD(df, "UTC").toLocalIterator
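Roughly, the snippet above could slot into a complete local save helper like the following sketch. The saveDataFrame name, the length-prefix framing, and the -1 sentinel are assumptions of this sketch, not the PR's actual layout; DatasetUtils.toArrowBatchRDD is the Spark-internal helper used in the snippet (its import is omitted here):

```scala
import java.io.{BufferedOutputStream, DataOutputStream, FileOutputStream}
import scala.util.Using
import org.apache.spark.sql.DataFrame

// Sketch: schema as JSON text, then length-prefixed serialized Arrow batches.
def saveDataFrame(path: String, df: DataFrame): Unit =
  Using.resource(
    new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)))
  ) { dos =>
    dos.writeUTF(df.schema.json)
    // Each element is one serialized ArrowRecordBatch as Array[Byte].
    val iter = DatasetUtils.toArrowBatchRDD(df, "UTC").toLocalIterator
    iter.foreach { batch =>
      dos.writeInt(batch.length) // length prefix so the reader can frame batches
      dos.write(batch)
    }
    dos.writeInt(-1) // end-of-stream sentinel (assumption of this sketch)
  }
```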
Contributor

Does the Arrow library provide APIs to write to a local file?

@holdenk (Contributor) left a comment

Arrow isn't intended for long-term storage; it's intended as a wire protocol -- I don't love using it for persisting models. I'm -0.9 on this change for now. Parquet seems like a better choice, most likely.

@zhengruifeng
Contributor Author

> Arrow isn't intended for long-term storage; it's intended as a wire protocol -- I don't love using it for persisting models. I'm -0.9 on this change for now. Parquet seems like a better choice, most likely.

> Does the Arrow library provide APIs to write to a local file?

@holdenk @cloud-fan Arrow supports Random Access Files, and it provides APIs to write to a local file. But our Arrow utils mainly work with serialized ArrowRecordBatches as Array[Byte]; we would need to add new helper functions for ArrowRecordBatches if we want to use the Arrow file APIs.
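For reference, the Arrow Java library exposes the random-access file format through ArrowFileWriter. A minimal sketch; the example data and path are illustrative, and this is not what the PR ends up doing:

```scala
import java.io.FileOutputStream
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}
import org.apache.arrow.vector.ipc.ArrowFileWriter

val allocator = new RootAllocator()
val vector = new IntVector("freq", allocator) // illustrative column
vector.allocateNew(3)
(0 until 3).foreach(i => vector.setSafe(i, i + 1))
vector.setValueCount(3)

val root = VectorSchemaRoot.of(vector)
root.setRowCount(3)
val out = new FileOutputStream("/tmp/model.arrow")
val writer = new ArrowFileWriter(root, null, out.getChannel)
writer.start()
writer.writeBatch() // writes root's current contents as one record batch
writer.end()        // writes the footer that enables random access on read
writer.close()
out.close()
root.close()
allocator.close()
```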

@WeichenXu123
Contributor

> Arrow isn't intended for long-term storage; it's intended as a wire protocol -- I don't love using it for persisting models. I'm -0.9 on this change for now. Parquet seems like a better choice, most likely.

@holdenk

The Spark ML model "saveToLocal" is an internal API; it is only used on the Spark Connect server side to cache a model within one session, not for long-term storage. So it should be fine to use Arrow here.

@zhengruifeng
Contributor Author

The PR to apply the Arrow file format: #53232
