Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,23 @@ class SparkSession private(
Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder))
}

/**
 * Creates a `DataFrame` from one or more CSV files, using the schema of the Product type `A`
 * (e.g. a case class) rather than a schema inferred from the data.
 *
 * All options related to the CSV data source can be used. Any `inferSchema` entry in
 * `options` is overridden with `"false"`, because the schema is always taken from `A`.
 *
 * @tparam A Product type whose `Encoders.product` schema is applied to the files
 * @param options options forwarded to the CSV reader
 * @param csvFilePath paths of the CSV files to load
 * @return a `DataFrame` read from `csvFilePath` with the schema of `A`
 */
def createDataFrame[A <: Product : TypeTag](options: Map[String, String], csvFilePath: String*)
  : DataFrame = {
  val structSchema = Encoders.product[A].schema
  // Reader option keys are documented as case-insensitive, so an exact-match test on
  // "inferSchema" would let spellings such as "InferSchema" through. Drop every spelling
  // and set the flag explicitly, which also covers the case where the key was absent.
  val inferredSchemaDisabled =
    options.filter { case (key, _) => !key.equalsIgnoreCase("inferSchema") } +
      ("inferSchema" -> "false")

  read
    .options(inferredSchemaDisabled)
    .schema(structSchema)
    .csv(csvFilePath: _*)
}

/**
* :: Experimental ::
* Creates a `DataFrame` from a local Seq of Product.
Expand Down Expand Up @@ -527,6 +544,42 @@ class SparkSession private(
createDataset(data.asScala)
}

/**
 * Creates a [[Dataset]] from one or more CSV files, using the schema of the Product type `A`
 * (e.g. a case class) rather than a schema inferred from the data.
 *
 * All options related to the CSV data source can be used. Any `inferSchema` entry in
 * `options` is overridden with `"false"`, because the schema is always taken from `A`.
 *
 * == Example ==
 *
 * {{{
 *
 *   import spark.implicits._
 *   case class Person(name: String, age: Long)
 *   val ds = spark.createDataset[Person](Map("header" -> "false"), csvFilePath)
 *
 *   ds.show()
 *   // +-------+---+
 *   // |   name|age|
 *   // +-------+---+
 *   // |Michael| 29|
 *   // |   Andy| 30|
 *   // | Justin| 19|
 *   // +-------+---+
 * }}}
 *
 * @tparam A Product type whose `Encoders.product` encoder supplies the schema and the
 *           element type of the result
 * @param options options forwarded to the CSV reader
 * @param csvFilePath paths of the CSV files to load
 * @return a [[Dataset]] of `A` read from `csvFilePath`
 */
def createDataset[A <: Product : TypeTag](options: Map[String, String], csvFilePath: String*)
  : Dataset[A] = {
  // Build the product encoder once and use it for both the schema and the typed view,
  // instead of importing all session implicits just to resolve it for `.as[A]`.
  val productEncoder = Encoders.product[A]
  // Reader option keys are documented as case-insensitive, so an exact-match test on
  // "inferSchema" would let spellings such as "InferSchema" through. Drop every spelling
  // and set the flag explicitly, which also covers the case where the key was absent.
  val inferredSchemaDisabled =
    options.filter { case (key, _) => !key.equalsIgnoreCase("inferSchema") } +
      ("inferSchema" -> "false")

  read
    .options(inferredSchemaDisabled)
    .schema(productEncoder.schema)
    .csv(csvFilePath: _*)
    .as[A](productEncoder)
}

/**
* :: Experimental ::
* Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
Expand Down