[SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc file in DataFrameReader.orc #10307
Conversation
Test build #47725 has finished for PR 10307 at commit
@@ -322,11 +322,11 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
/**
* Loads an ORC file and returns the result as a [[DataFrame]].
The comments should be updated as well.
Thanks @bomeng, will also update the docs for the other formats.
Please build it again (if it works).
+1, I have been requesting this feature for a while.
val path2 = Utils.createTempDir()
makeOrcFile((1 to 10).map(Tuple1.apply), path1)
makeOrcFile((1 to 10).map(Tuple1.apply), path2)
assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count())
We need to remove the generated temporary files automatically; use withOrcFile or withTempDir.
It will be deleted automatically after the program exits.
/**
 * Create a temporary directory inside the given parent directory. The directory will be
 * automatically deleted when the VM shuts down.
 */
def createTempDir(
    root: String = System.getProperty("java.io.tmpdir"),
    namePrefix: String = "spark"): File = {
  val dir = createDirectory(root, namePrefix)
  ShutdownHookManager.registerShutdownDeleteDir(dir)
  dir
}
withOrcFile will get cleaned up faster (e.g. as soon as the test ends rather than at program exit).
@zjffdu would you want to update this against master so Jenkins can give it a run?
Test build #56216 has finished for PR 10307 at commit
Test build #56219 has finished for PR 10307 at commit
I am trying to apply this patch on the 1.6 branch, but the patch failed.
Test build #66591 has finished for PR 10307 at commit
Test build #66592 has finished for PR 10307 at commit
Test build #66773 has finished for PR 10307 at commit
ping @holdenk @JoshRosen @davies
Test build #71521 has finished for PR 10307 at commit
Sorry for the delay in getting to this. Do you have time to update this to the latest master branch? It would be a nice small fix/improvement to get in :)
Gentle ping @zjffdu
Thanks for your kind reply, and sorry for the delay in replying.
I have fixed this and it is working fine. :)
So this still doesn't merge with master; if you want to update it, it would be good to take a look :)
Gentle ping @zjffdu :)
Test build #73567 has finished for PR 10307 at commit
Test build #73569 has started for PR 10307 at commit
Test build #73577 has finished for PR 10307 at commit
python/pyspark/sql/readwriter.py
Outdated
@@ -388,16 +388,18 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))

@since(1.5)
def orc(self, path):
"""Loads an ORC file, returning the result as a :class:`DataFrame`.
def orc(self, paths):
So if someone's been calling orc with a named param of path, this could cause them problems when they upgrade. I might be being overly cautious, but it seems like we should avoid breaking that since we don't have to until the next major version change.
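To illustrate the concern, here is a toy sketch (not Spark code; the function and path names are made up): renaming the parameter from path to paths would break callers that pass it by keyword.

def orc_old(path):
    # signature before the proposed rename
    return [path]

def orc_new(paths):
    # signature after renaming path to paths
    return [paths]

print(orc_old(path="data/a.orc"))   # works today
try:
    orc_new(path="data/a.orc")      # existing keyword callers would break
except TypeError as e:
    print(e)                        # unexpected keyword argument 'path'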
Good catch, I should not break compatibility. BTW, I found that DataFrameReader.parquet
uses a variable-length argument, which is not consistent with other file formats such as text, json and orc that take a string or a list of strings. I can fix it in this PR or do it in another PR to make them consistent. What do you think?
@since(1.4)
def parquet(self, *paths):
    """Loads Parquet files, returning the result as a :class:`DataFrame`.
We might as well make it consistent in this PR if we can do it without breaking anything.
Test build #73647 has finished for PR 10307 at commit
Test build #74096 has finished for PR 10307 at commit
Test build #74097 has finished for PR 10307 at commit
python/pyspark/sql/readwriter.py
Outdated
@@ -282,6 +282,23 @@ def parquet(self, *paths):
"""
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

@since(2.2)
def parquet(self, path):
Having two functions with the same name and different args doesn't behave like in Scala (so this won't work). Please use kwargs or similar and add a test for paths and path.
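A toy demonstration of this point (not Spark code): in Python, a second def with the same name simply replaces the first, so Scala-style overloading does not carry over.

class Reader:
    def parquet(self, *paths):
        return "varargs version"

    def parquet(self, path):          # silently shadows the definition above
        return "single-path version"

r = Reader()
print(r.parquet("data/p1"))           # prints "single-path version"
# r.parquet("data/p1", "data/p2")     # would raise TypeError: only the last definition exists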
Thanks @holdenk, I learned a new thing about Python. I reverted the changes on parquet. It would be very weird to change it to def parquet(self, *paths, path=None), and def parquet(self, **kwargs) would break code that does not use keyword arguments, e.g. parquet("p_file").
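A rough sketch of the two alternatives mentioned above (function names are hypothetical, not proposed API): the hybrid signature offers two ways to pass the same thing, and a keyword-only version rejects existing positional callers.

def parquet_hybrid(*paths, path=None):
    # awkward: paths can arrive positionally or via the path keyword
    return list(paths) if paths else [path]

def parquet_kwargs(**kwargs):
    # keyword-only: positional calls are rejected
    return kwargs.get("path")

print(parquet_hybrid("p1", "p2"))     # ['p1', 'p2']
print(parquet_hybrid(path="p1"))      # ['p1']
try:
    parquet_kwargs("p_file")          # existing positional callers would break
except TypeError as e:
    print(e)                          # takes 0 positional arguments but 1 was given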
@@ -407,15 +424,17 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non

@since(1.5)
def orc(self, path):
"""Loads an ORC file, returning the result as a :class:`DataFrame`.
"""Loads ORC files, returning the result as a :class:`DataFrame`.
Maybe add a test for loading with a list of orc files.
It is in python/pyspark/sql/tests.py
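For reference, a sketch of what such a test might look like (hypothetical, not the actual test in python/pyspark/sql/tests.py; it assumes the reader accepts a list of paths as discussed in this PR).

import os
import shutil
import tempfile

def check_multiple_orc_paths(spark):
    # write the same 10 rows to two separate directories, then read both at once
    tmp = tempfile.mkdtemp()
    try:
        p1, p2 = os.path.join(tmp, "a"), os.path.join(tmp, "b")
        spark.range(10).write.orc(p1)
        spark.range(10).write.orc(p2)
        assert spark.read.orc([p1, p2]).count() == 20
    finally:
        shutil.rmtree(tmp)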
Test build #74198 has finished for PR 10307 at commit
…r orc file in DataFrameReader.orc
Test build #74265 has finished for PR 10307 at commit
So right now we've got a mix of path & paths as the name for the arguments to the different file loading methods - this is annoying to fix in Python, but we should maybe make a JIRA so we follow up on the reader/writer interfaces next time we have a major release. Can you do that @zjffdu? Also thank you for working on this for over a year, I'm so sorry it's taken so long to get to this.
Merged to master, thank you @zjffdu |
Besides the issue in the Spark API, this also fixes 2 minor issues in PySpark.