# Crear DataFrames en Spark

En Spark, los métodos `createDataFrame()` y `toDF()` se utilizan para crear un DataFrame manualmente, utilizando estos métodos se puede crear un DataFrame de Spark a partir de objetos de datos RDD, DataFrame, Dataset, List, Seq ya existentes, aquí voy a examinar estos con ejemplos de Scala.

También puedes crear un DataFrame a partir de diferentes fuentes como texto, CSV, JSON, XML, Parquet, Avro, ORC, archivos binarios, tablas RDBMS, Hive, HBase, y muchos más.

DataFrame es una colección distribuida de datos organizada en columnas con nombre. Es conceptualmente equivalente a una tabla en una base de datos relacional o a un marco de datos en R/Python, pero con optimizaciones más ricas bajo el capó. Los DataFrames pueden construirse a partir de una amplia gama de fuentes como: archivos de datos estructurados, tablas en Hive, bases de datos externas o RDDs existentes.

En primer lugar, vamos a importar los implícitos de Spark que necesitemos para nuestros ejemplos (por ejemplo, cuando queramos utilizar la función `.toDF()`) y crear los datos de muestra.

In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
                        .master("local[1]")
                        .appName("SparkByExamples.com")
                        .getOrCreate()

import spark.implicits._
val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))

Intitializing Scala interpreter ...

Spark Web UI available at http://ALC-1NJW5D3.usersad.everis.int:4040
SparkContext available as 'sc' (version = 3.3.0, master = local[*], app id = local-1656667285616)
SparkSession available as 'spark'


22/07/01 11:21:32 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@318c2d9d
import spark.implicits._
columns: Seq[String] = List(language, users_count)
data: Seq[(String, String)] = List((Java,20000), (Python,100000), (Scala,3000))


## 1. Spark crea un DataFrame a partir de un RDD
Una forma fácil de crear manualmente un Spark DataFrame es a partir de un RDD existente. Primero, vamos a crear un RDD a partir de una colección Seq llamando a parallelize().

Utilizaré este objeto RDD para todos nuestros ejemplos a continuación.

In [2]:
val rdd = spark.sparkContext.parallelize(data)

rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:29


### 1.1 Uso de la función toDF()
Una vez que tenemos un RDD, vamos a utilizar `toDF()` para crear DataFrame en Spark. Por defecto, crea los nombres de las columnas como "_1" y "_2" ya que tenemos dos columnas para cada fila.

In [3]:
val dfFromRDD1 = rdd.toDF()

dfFromRDD1.printSchema()
dfFromRDD1.show()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)

+------+------+
|    _1|    _2|
+------+------+
|  Java| 20000|
|Python|100000|
| Scala|  3000|
+------+------+



dfFromRDD1: org.apache.spark.sql.DataFrame = [_1: string, _2: string]


Dado que el RDD no tiene esquema, sin nombres de columna y tipo de datos, la conversión de RDD a DataFrame le da nombres de columna por defecto como _1, _2 y así sucesivamente y el tipo de datos como String. Utiliza DataFrame `printSchema()` para imprimir el esquema en la consola.

`toDF()` tiene otra firma para asignar un nombre de columna, esta toma un número variable de argumentos para los nombres de columna como se muestra a continuación.

Produce la siguiente salida. Recuerde que aquí acabamos de asignar los nombres de las columnas, pero se toman todos los tipos de datos como cadenas.

In [4]:
val dfFromRDD1 = rdd.toDF("language","users_count")

dfFromRDD1.printSchema()
dfFromRDD1.show()

22/07/01 11:21:36 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



dfFromRDD1: org.apache.spark.sql.DataFrame = [language: string, users_count: string]


Por defecto, el tipo de datos de estas columnas se asigna a String. Podemos cambiar este comportamiento suministrando el esquema, donde podemos especificar un nombre de columna, tipo de datos y nullable para cada campo/columna.

### 1.2 Usando Spark createDataFrame() desde SparkSession
Usar `createDataFrame()` desde SparkSession es otra forma de crear y toma el objeto rdd como argumento y lo encadena con `toDF()` para especificar nombres a las columnas.

In [5]:
val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)

dfFromRDD2.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



dfFromRDD2: org.apache.spark.sql.DataFrame = [language: string, users_count: string]


### 1.3 Utilización de createDataFrame() con el tipo Row
`createDataFrame()` tiene otra firma que toma el tipo `RDD[Row]` y el esquema para los nombres de las columnas como argumentos. Para usar esto primero necesitamos convertir nuestro objeto "rdd" de `RDD[T]` a `RDD[Row]` y definir un esquema usando StructType & StructField.

In [6]:
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row
val schema = StructType( Array(
                 StructField("language", StringType,true),
                 StructField("users", StringType,true)
             ))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD,schema)

dfFromRDD3.show()

+--------+------+
|language| users|
+--------+------+
|    Java| 20000|
|  Python|100000|
|   Scala|  3000|
+--------+------+



import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row
schema: org.apache.spark.sql.types.StructType = StructType(StructField(language,StringType,true),StructField(users,StringType,true))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[10] at map at <console>:35
dfFromRDD3: org.apache.spark.sql.DataFrame = [language: string, users: string]


## 2. Crear un Spark DataFrame a partir de una colección List y Seq
En esta sección, veremos varias aproximaciones para crear Spark DataFrame a partir de la colección `Seq[T]` o `List[T]`. Estos ejemplos serían similares a lo que hemos visto en la sección anterior con RDD, pero utilizamos el objeto "data" en lugar del objeto "rdd".

### 2.1 Utilizar toDF() sobre una colección List o Seq
`toDF()` sobre una colección (Seq, List) crea un DataFrame. asegúrate de `import spark.implicits._` para usar `toDF()`

In [7]:
import spark.implicits._
val dfFromData1 = data.toDF() 

dfFromData1.show()

+------+------+
|    _1|    _2|
+------+------+
|  Java| 20000|
|Python|100000|
| Scala|  3000|
+------+------+



import spark.implicits._
dfFromData1: org.apache.spark.sql.DataFrame = [_1: string, _2: string]


### 2.2 Uso de createDataFrame() desde SparkSession
Llamando a `createDataFrame()` desde `SparkSession` es otra forma de crear y toma como argumento un objeto de colección (Seq o List) y lo encadena con `toDF()` para especificar nombres a las columnas.

In [8]:
//From Data (USING createDataFrame)
var dfFromData2 = spark.createDataFrame(data).toDF(columns:_*)

dfFromData2.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



dfFromData2: org.apache.spark.sql.DataFrame = [language: string, users_count: string]


### 2.3 Uso de createDataFrame() con el tipo Row
`createDataFrame()` tiene otra firma en Spark que toma el `util.List` de tipo Row y el esquema para los nombres de las columnas como argumentos. Para usar esto primero necesitamos `importar scala.collection.JavaConversions._`

In [9]:
import scala.collection.JavaConversions._
//From Data (USING createDataFrame and Adding schema using StructType)
val rowData= Seq(Row("Java", "20000"), 
               Row("Python", "100000"), 
               Row("Scala", "3000"))
var dfFromData3 = spark.createDataFrame(rowData,schema)

dfFromData3.show()

+--------+------+
|language| users|
+--------+------+
|    Java| 20000|
|  Python|100000|
|   Scala|  3000|
+--------+------+



import scala.collection.JavaConversions._
rowData: Seq[org.apache.spark.sql.Row] = List([Java,20000], [Python,100000], [Scala,3000])
dfFromData3: org.apache.spark.sql.DataFrame = [language: string, users: string]


## 3. Crear un Spark DataFrame a partir de CSV
En todos los ejemplos anteriores, has aprendido a Spark a crear DataFrame a partir de RDD y objetos de recolección de datos. En tiempo real estos son menos utilizados, en esta y en las siguientes secciones, aprenderás a crear DataFrame a partir de fuentes de datos como CSV, texto, JSON, Avro, etc.

Spark proporciona por defecto una API para leer archivos con delimitadores como comas, tuberías, tabuladores y también proporciona varias opciones de manejo con cabecera, sin cabecera, comillas dobles, tipos de datos, etc.

Para un ejemplo detallado, consulte crear un DataFrame a partir de un archivo CSV.

In [10]:
val df2 = spark.read.option("header", "true").csv("data/iris.csv")

df2.show()

+------------+-----------+------------+-----------+-------+
|sepal.length|sepal.width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|         .2| Setosa|
|         4.9|          3|         1.4|         .2| Setosa|
|         4.7|        3.2|         1.3|         .2| Setosa|
|         4.6|        3.1|         1.5|         .2| Setosa|
|           5|        3.6|         1.4|         .2| Setosa|
|         5.4|        3.9|         1.7|         .4| Setosa|
|         4.6|        3.4|         1.4|         .3| Setosa|
|           5|        3.4|         1.5|         .2| Setosa|
|         4.4|        2.9|         1.4|         .2| Setosa|
|         4.9|        3.1|         1.5|         .1| Setosa|
|         5.4|        3.7|         1.5|         .2| Setosa|
|         4.8|        3.4|         1.6|         .2| Setosa|
|         4.8|          3|         1.4|         .1| Setosa|
|         4.3|          3|         1.1| 

df2: org.apache.spark.sql.DataFrame = [sepal.length: string, sepal.width: string ... 3 more fields]


## 4. Creación a partir de un archivo de texto (TXT)
Aquí veremos cómo crear a partir de un archivo TXT.

In [None]:
val df2 = spark.read
.text("/src/resources/file.txt")

## 5. Creando desde un archivo JSON
Aquí veremos cómo crear a partir de un archivo JSON.

In [None]:
val df2 = spark.read
.json("/src/resources/file.json")

## 6. Creando desde un archivo XML
Para crear DataFrame parseando XML, debemos utilizar el DataSource "com.databricks.spark.xml" spark-xml api de Databricks.

In [None]:
val df = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "person")
      .xml("src/main/resources/persons.xml")

## 7. Creación a partir de Hive

In [None]:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
val hiveDF = hiveContext.sql(“select * from emp”)

## 8. Spark crea un DataFrame a partir de una base de datos RDBMS
### 8.a) Desde una tabla Mysql
Asegúrate de que tienes la librería MySQL como dependencia en tu archivo pom.xml o los jars de MySQL en tu classpath.

In [None]:
val df_mysql = spark.read.format(“jdbc”)
   .option(“url”, “jdbc:mysql://localhost:port/db”)
   .option(“driver”, “com.mysql.jdbc.Driver”)
   .option(“dbtable”, “tablename”) 
   .option(“user”, “user”) 
   .option(“password”, “password”) 
   .load()

### 8.1 Desde la tabla DB2
Asegúrese de que tiene la biblioteca DB2 como dependencia en su archivo pom.xml o los jars DB2 en su classpath.

In [None]:
val df_db2 = spark.read.format(“jdbc”)
   .option(“url”, “jdbc:db2://localhost:50000/dbname”)
   .option(“driver”, “com.ibm.db2.jcc.DB2Driver”)
   .option(“dbtable”, “tablename”) 
   .option(“user”, “user”) 
   .option(“password”, “password”) 
   .load()

Del mismo modo, podemos crear DataFrame en Spark a partir de la mayoría de las bases de datos relacionales que no he cubierto aquí y que dejaré que exploren.

## 9. Crear DataFrame a partir de una tabla HBase
Para crear un DataFrame en Spark a partir de una tabla HBase, debemos utilizar el DataSource definido en los conectores HBase de Spark. Por ejemplo, utilizar el DataSource "`org.apache.spark.sql.execution.datasources.hbase`" de Hortonworks o utilizar "`org.apache.hadoop.hbase.spark`" del conector HBase de Spark.

In [None]:
val hbaseDF = sparkSession.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

## 10. Otras fuentes (Avro, Parquet, Kafka)
También podemos crear DataFrame a partir de Avro, Parquet, HBase y leer datos de Kafka