
User friendly dataset, dataframe generation for csv datasources without explicit StructType definitions. #24724

Closed
swapnilushinde wants to merge 2 commits into apache:master from swapnilushinde:csv-pr

Conversation

@swapnilushinde

What changes were proposed in this pull request?

Many users frequently load structured data from CSV datasources. With the current APIs it is very common to load CSV data as a DataFrame whose schema must be defined as a StructType object; many users then convert the DataFrame to a Dataset of a Product type (case class).
Loading CSV files therefore becomes relatively complex, which can easily be simplified. This change would make working with CSV files more user friendly.

Input -

CSV file with five columns: {id: Int, name: String, subject: String, marks: Int, result: Boolean}

Current approach -

val schema = StructType(Seq(
  StructField("id", IntegerType, false),
  StructField("name", StringType, false),
  StructField("subject", StringType, false),
  StructField("marks", IntegerType, false),
  StructField("result", BooleanType, false)))

val df = spark.read.schema(schema).csv(<file_path>)
case class A(id: Int, name: String, subject: String, marks: Int, result: Boolean) 
val ds = df.as[A]

Proposed change -

case class A (id: Int, name: String, subject: String, marks: Int, result: Boolean) 
val df = spark.createDataFrame[A](optionsMap, <file_paths>)
val ds = spark.createDataset[A](optionsMap, <file_paths>)
  • No explicit schema definition with StructType is needed, as the schema can be resolved from the Product class (a rough implementation sketch follows this list).
  • Redundant, verbose StructType definitions scattered across application codebases can be avoided with this change.
  • The proposed APIs are similar to the current APIs, so they are easy to use, and all current and future CSV options can be used as-is with no changes needed (exception: inferSchema is internally disabled, as it is useless/confusing with this API).
  • Like the existing createDataset/createDataFrame APIs, it would make loading CSV files for debugging purposes more convenient.
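
Below is a rough sketch, not part of this PR's diff, of how such an overload could derive the schema from the Product encoder; the helper name csvDatasetOf and its placement in a utility object are hypothetical.

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

object CsvHelpers {
  // Hypothetical helper: derives the CSV schema from the case class encoder,
  // so no explicit StructType or DDL string is needed at the call site.
  def csvDatasetOf[T <: Product : Encoder](
      spark: SparkSession,
      options: Map[String, String],
      paths: String*): Dataset[T] = {
    val schema = implicitly[Encoder[T]].schema
    spark.read
      .options(options)   // all existing CSV options still apply
      .schema(schema)     // inferSchema becomes unnecessary here
      .csv(paths: _*)
      .as[T]
  }
}

// Usage sketch, with the case class A from the example above
// (assumes import spark.implicits._ for the Encoder[A]):
//   val ds = CsvHelpers.csvDatasetOf[A](spark, Map("header" -> "true"), "/tmp/csv")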

How was this patch tested?

This change was manually tested. I didn't see similar createDataset/createDataFrame unit test cases; please let me know the best place to add unit tests for this and the existing similar APIs.

swapnilushinde and others added 2 commits May 25, 2019 12:38
Updating the fork with latest
…t specifying explicit schema using structType.
@AmplabJenkins

Can one of the admins verify this patch?

@dongjoon-hyun
Member

Hi, @swapnilushinde. Thank you for making a PR, but did you know about the following? It's a one-liner.

scala> spark.version
res0: String = 2.4.3

scala> spark.read.schema("id int, name string, subject string, marks int, result boolean").load("/tmp/csv").printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- marks: integer (nullable = true)
 |-- result: boolean (nullable = true)

@swapnilushinde
Author

swapnilushinde commented May 28, 2019

Hi @dongjoon-hyun, thanks for the reply. Yes, I use this API sometimes as well. Passing the schema as a DDL string is a one-liner, but a case class still has to be defined for Dataset creation anyway. So creating a Dataset requires defining the schema both as a DDL string and as a case class. For instance,

case class A(id: Int, name: String, subject: String, marks: Int, result: Boolean)
val df = spark.read.schema("id int, name string, subject string, marks int, result boolean").load("/tmp/csv")
val ds = df.as[A]

The proposed change would define the schema just once, with a Product class, and Datasets/DataFrames could then be created easily.
Furthermore, this API is in line with all the other similar APIs for creating Datasets/DataFrames.

@dongjoon-hyun
Member

dongjoon-hyun commented May 28, 2019

First of all, the following are the most frequent use cases (and the recommended way):

  1. HEADER and INFERSCHEMA
scala> spark.read.option("header", true).option("inferSchema", true).csv("/tmp/csv").as[Person]
res0: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
  2. USER-DEFINED SCHEMA or Hive MetaStore
scala> case class Person(name: String, age: Long)
scala> spark.read.schema("name string, age long").csv("/tmp/csv").as[Person]
res0: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

I believe the above two are more natural.

Anyway, cc @HyukjinKwon and @MaxGekk

@HyukjinKwon
Member

The API itself is two lines. Whether it's a one-liner or a two-liner, the workaround is easy. I don't think we need this, and I would like to avoid introducing other variants like this.

@HyukjinKwon
Member

There's virtually no diff:

case class Person(name: String, age: Long)
val df = spark.createDataFrame[Person]("/tmp/csv")

vs

case class Person(name: String, age: Long)
spark.read.schema("name string, age long").csv("/tmp/csv").as[Person]

and it's super confusing that createDataFrame takes CSV. How about JSON and other formats?

@swapnilushinde
Author

Hello @HyukjinKwon @MaxGekk -
The simple example above was just for illustration. A DDL-format schema is not a good choice for applications where the CSV data contains many fields, yet case classes still have to be defined in those applications anyway for type safety. Maintaining the schema definition in both a case class and a DDL string is not ideal.
What is the recommended way to load tab-delimited data with 10+ fields into a Spark Dataset? The only options I know of are to define a StructType for all those fields or a long DDL statement (see the sketch at the end of this comment). For type safety, a case class needs to be defined anyway, so the schema for a single dataset ends up defined twice in a given application. Wouldn't it be beneficial to have a simpler API that only needs the case class to load a CSV file?

The proposed API gives a single way to define the schema, using a case class, and load CSV data without StructType or DDL definitions.

Parquet and JSON formats: it is already easy to load these formats with a schema, so there is no need for (or confusion from) an equivalent API like the one proposed.
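
For context, a hedged sketch of what loading tab-delimited data looks like today with an explicit DDL schema; the case class Record, its field names, and the path are hypothetical.

// Current options referred to above: a DDL string (or an explicit StructType)
// maintained alongside the case class used for type safety.
case class Record(id: Int, name: String, subject: String, marks: Int, result: Boolean)

import spark.implicits._  // provides the Encoder[Record] for .as[Record]
val ds = spark.read
  .option("sep", "\t")    // tab-delimited input
  .schema("id int, name string, subject string, marks int, result boolean")
  .csv("/tmp/tsv")
  .as[Record]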

@MaxGekk
Member

MaxGekk commented May 28, 2019

The idea looks interesting, especially getting the schema from a case class. How about a new schema() method for passing the schema as a Product, like:

case class Person(name: String, age: Long)
spark.read.schema[Person].csv("/tmp/csv").as[Person]

@HyukjinKwon
Member

HyukjinKwon commented May 28, 2019

Why don't we just call

import org.apache.spark.sql.Encoders
val schema = Encoders.product[Person].schema
spark.read.schema(schema).csv("/tmp/csv").as[Person]

?

Once we allow this, we have to consider allowing it everywhere: createDataFrame, from_json, DataFrame[Stream]Reader.schema, UDFs, etc. Is this really worth it?
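
For example, the same derived schema can already be reused with from_json today; a sketch, assuming a DataFrame df with a JSON string column named payload (both hypothetical here).

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{col, from_json}

case class Person(name: String, age: Long)
val personSchema = Encoders.product[Person].schema

// Parse a JSON string column using the schema derived from the case class.
val parsed = df.withColumn("person", from_json(col("payload"), personSchema))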

@MaxGekk
Member

MaxGekk commented May 28, 2019

Why don't we just call

I just think users are not aware of it. It would be nice to add an example to http://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html

@HyukjinKwon
Member

Yes, then I would leave a note about the schema = Encoders.product[Person].schema usage somewhere else, if there are appropriate places to note it. It'd be useful for all the other cases I mentioned.

@dongjoon-hyun
Member

+1 for @HyukjinKwon's comment about adding a note instead of this code.

@MaxGekk
Member

MaxGekk commented May 28, 2019

@swapnilushinde Will you open a PR for the comments? Please let us know if you don't have time for that; I will do it.

@swapnilushinde
Author

@swapnilushinde Will you open a PR for the comments? Please let us know if you don't have time for that; I will do it.

Hello @MaxGekk, I am not able to see an option to reopen the PR for comments. Could you please do it from your end and let me know how to proceed?
Thanks.

@swapnilushinde
Author


@MaxGekk - Following up; please let me know how to proceed on this.

@MaxGekk
Member

MaxGekk commented May 30, 2019

@MaxGekk - Following up; please let me know how to proceed on this.

@swapnilushinde Open a new PR with updated comments; add an example of using Encoders.product[Person].schema as a parameter for schema(), and cover the cases mentioned in the comment #24724 (comment)

