# Chapter 4

## Creamos una vista temporal

In [0]:
%scala
val schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING"

In [0]:
%scala
// In Scala
import org.apache.spark.sql.SparkSession 
val spark = SparkSession
 .builder
 .appName("SparkSQLExampleApp")
 .getOrCreate()
// Path to data set 
val csvFile= "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
// Read and create a temporary view
// Infer schema (note that for larger files you may want to specify the schema)
val df = spark.read.format("csv")
 .option("inferSchema", "true")
 .option("header", "true")
 .load(csvFile)
// Create a temporary view
df.createOrReplaceTempView("us_delay_flights_tbl")

In [0]:
%scala
df.columns

## Consultas SQL

Buscamos todos los vuelos cuya distancia sea mayor a 1,000 millas

In [0]:
%scala
spark.sql("""SELECT distance, origin, destination 
FROM us_delay_flights_tbl WHERE distance > 1000 
ORDER BY distance DESC""").show(10)

Todos los vuelos entre San Francisco (SFO) y Chicago (ORD) con al menos dos horas de retraso

In [0]:
%scala
spark.sql("""SELECT date, delay, origin, destination 
FROM us_delay_flights_tbl 
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' 
ORDER by delay DESC""").show(10)

Etiquetar todos los vuelos de EE. UU., independientemente de su origen y destino, con una indicación de los retrasos que experimentaron: retrasos muy largos (> 6 horas), retrasos largos (2 a 6 horas), etc. Agregaremos estas etiquetas legibles por humanos en una nueva columna llamada Flight_Delays

In [0]:
%scala
spark.sql("""SELECT delay, origin, destination,
 CASE
 WHEN delay > 360 THEN 'Very Long Delays'
 WHEN delay > 120 AND delay < 360 THEN 'Long Delays'
 WHEN delay > 60 AND delay < 120 THEN 'Short Delays'
 WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'
 WHEN delay = 0 THEN 'No Delays'
 ELSE 'Early'
 END AS Flight_Delays
 FROM us_delay_flights_tbl
 ORDER BY origin, delay DESC""").show(10)

**Ejercicio: Convertir la columna fecha a un formato legible**

In [0]:
%scala
import org.apache.spark.sql.functions._
val df_new = (df
         .withColumn("DateAndHour", to_timestamp(col("date"), "mm-dd hh:mm")))

In [0]:
%scala
def toDateFormatUDF(dStr:String) : String  = {
  return s"${dStr(0)}${dStr(1)}${'-'}${dStr(2)}${dStr(3)}${' '}${dStr(4)}${dStr(5)}${':'}${dStr(6)}${dStr(7)}"
}

// test  it
toDateFormatUDF("02190925")

In [0]:
%scala
df.selectExpr("toDateFormatUDF(date) as data_format").show(10, false)

In [0]:
%scala
spark.udf.register("toDateFormatUDF", toDateFormatUDF(_:String):String)

In [0]:
%scala
val df_new = df.selectExpr("toDateFormatUDF(date) as DateAndHour")

In [0]:
%scala
df_new.show()

Las consultas anteriores se pueden expresar como consultas API. Por ejemplo la primera:

In [0]:
%scala
(df.select("distance", "origin", "destination")
 .where(col("distance") > 1000)
 .orderBy(desc("distance"))).show(10)

La segunda

In [0]:
%scala
//spark.sql("""SELECT date, delay, origin, destination 
//FROM us_delay_flights_tbl 
//WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' 
//ORDER by delay DESC""").show(10)

df
       .select("date", "delay", "origin", "destination")
       .where(col("delay") > 120 && col("origin") === "SFO" && col("destination") === "ORD").show()

La tercera

In [0]:
%scala
//spark.sql("""SELECT delay, origin, destination,
//CASE
//WHEN delay > 360 THEN 'Very Long Delays'
//WHEN delay > 120 AND delay < 360 THEN 'Long Delays'
//WHEN delay > 60 AND delay < 120 THEN 'Short Delays'
//WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'
//WHEN delay = 0 THEN 'No Delays'
//ELSE 'Early'
//END AS Flight_Delays
//FROM us_delay_flights_tbl
//ORDER BY origin, delay DESC""").show(10)


val df_3 = df.withColumn("Flight_Delays",
                        when(col("delay") > 360,
                            lit("Very Long Delays"))
                        when(col("delay") > 120 && col("delay") < 360,
                            lit("Long Delays"))
                        when(col("delay") > 60 && col("delay") < 120,
                            lit("Short Delays"))
                        when(col("delay") > 0 && col("delay") < 60,
                            lit("Tolerable Delays"))
                        when(col("delay") === 0,
                            lit("No Delays"))
                        otherwise(lit("Early")))
.orderBy(desc("origin"))
.show(10)

## Creando Tablas y Databases de SQL

### Databases

In [0]:
%scala
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")

Ahora todas las tablas que se creen estarán dentro de esa base de datos

d
### Tables

#### Managed
Las tablas administradas administran tanto los metadatos como los datos. Si se ejecuta un DROP se eliminarían los datos reales y los metadatos.

In [0]:
%scala
spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING)").show()

Aunque cierre un cluster y abra otro, lo que he hecho en sesiones anteriores se guarda en el fichero de metadatos. Al abrir otra sesión, la tabla no aparece en el contenido de las tablas, pero no me deja volverla a crear porque ya existen los metadatos de la misma. Tendré que eliminar el registro

In [0]:
%scala
dbutils.fs.rm("dbfs:/user/hive/warehouse/learn_spark_db.db/managed_us_delay_flights_tbl",recurse=true)

#### Unmanaged
En las tablas no administradas, Spark solo administra los metadatos mientras que el propio usuario maneja los datos. Aquí el DROP solo eliminaría los metadatos.

In [0]:
%scala
spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT, 
 distance INT, origin STRING, destination STRING) 
 USING csv OPTIONS (PATH 
 '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')""")

### Creando vistas
Las vistas se crean sobre tablas existentes y pueden ser globales o session-scoped. Las globales son visibles en todas las Spark-Sessions de un cluster y la segunda solo es visible para una sola Spark-session (son temporales, desaparecen después de la finalización de la Spark Application.

#### Global
Son visibles en todos los SparkSession en un cluster dado

In [0]:
%scala
//A partir de una tabla existente: SQL
spark.sql("""CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS
 SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE
 origin = 'SFO'""")

In [0]:
%scala
// API
val df_sfo = spark.sql("""SELECT date, delay, origin, destination FROM 
 us_delay_flights_tbl WHERE origin = 'SFO'""")
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_sfo.show(5)

In [0]:
%scala
spark.sql("""SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view""").show()

**Eliminar vistas**

In [0]:
%scala
spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")

#### Temporal
Las vistas son visibles para un único SparkSessios y se eliminan después de finalizar la aplicación Spark

In [0]:
%scala
spark.sql("""CREATE OR REPLACE TEMP VIEW us_origin_airport_JFK_tmp_view AS
 SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE
 origin = 'JFK'""")

In [0]:
%scala
//API
val df_jfk = spark.sql("""SELECT date, delay, origin, destination FROM 
 us_delay_flights_tbl WHERE origin = 'JFK'""")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")
df_jfk.show(5)

In [0]:
%scala
spark.sql("""SELECT * FROM us_origin_airport_JFK_tmp_view""").show()

**Eliminar vistas**

In [0]:
%scala
spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")

## Metadatos
Spark administra los metadatos asociados a cada tabla, ya sea administrada o no administrada. Para administrar los metadatos se utiliza Catalog la cual es una herramienta de alto nivel de Spark SQL para almacenar metadatos.

Después de crear la vartiable de la SparkSession, se puede acceder al metadata almacenado de la siaguiente manera:

In [0]:
%scala
display(spark.catalog.listTables(dbName="global_temp"))

name,database,description,tableType,isTemporary
us_origin_airport_sfo_global_tmp_view,global_temp,,TEMPORARY,True
us_delay_flights_tbl,,,TEMPORARY,True
us_origin_airport_jfk_tmp_view,,,TEMPORARY,True


**Almacenamiento en caché de tablas SQL**

Se puede cache y uncahe tablas SQL y vistas. Además, si especificas la tabla como LAZY, se guardará en caché cuando se utiliza la tabla por primera vez y no inmediatamente cuando se crea.

In [0]:
%scala
//In SQL
spark.sql("CACHE [LAZY] TABLE <table-name>")
spark.sql("UNCACHE TABLE <table-name>")

## Leyendo tablas a DataFrames
Si ya hay una base de datos learn_spark_db y una tabla us_delay_flights_tbl preparadas para ser utilizadas. En vez de importar datos directamente del JSON file para tener un dataframe, podemos simplemente ejecutar una consulta SQL a la tabla y asignarle como resultado un DataFrame

In [0]:
%scala
//Dos formas
val usFlightsDF = spark.sql("SELECT * FROM us_delay_flights_tbl")
val usFlightsDF2 = spark.table("us_delay_flights_tbl")
usFlightsDF.show(5)

## Data Sources for DataFrames and SQL Tables
Spark SQL proporciona una gran variedad de data sources. Además de proporcionar un conjunto de métodos reales para leer y escribir desde estas fuentes utilizando la Data Sources API.

### DataFrameReader
Es la herramienta con la que se lee una fuente de datos en un DataFrame. Tiene un formato definido y un patrón recomendado de uso.

```DataFrameReader.format(args).option("key", "value").schema(args).load()```

**Opciones**

| Method | Arguments | Description |
| --- | --- | --- |
| format() | "parquet", "csv", "txt", "json","jdbc", "orc", "avro", etc. | If you don’t specify this method, then the default ismParquet or whatever is set in spark.sql.sources.default.|
| option() |("mode", {PERMISSIVE / FAILFAST / DROPMALFORMED } )("inferSchema", {true / false}) ("path", "path_file_data_source") | A series of key/value pairs and options. The Spark documentation shows some examples and explains the different modes and their actions. The default mode is PERMISSIVE. The "inferSchema" and "mode" options are specific to the JSON and CSV file formats.|
| schema() | DDL String or StructType, e.g., 'A INT, B STRING' orStructType(...) | For JSON or CSV format, you can specify to infer the schema in the option() method. Generally, providing a schema for any format makes loading faster and ensures your data conforms to the expected schema.|
| load() | "/path/to/data/source" | The path to the data source. This can be empty if specified in option("path", "...").|

**Ejemplos**

In [0]:
%scala
// Use Parquet 
val file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet"""
val df = spark.read.format("parquet").load(file)
// Use Parquet; you can omit format("parquet") if you wish as it's the default
//val df2 = spark.read.load(file)
// Use CSV
val df3 = spark.read.format("csv")
 .option("inferSchema", "true")
 .option("header", "true")
 .option("mode", "PERMISSIVE")
 .load("/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*")
// Use JSON
val df4 = spark.read.format("json")
 .load("/databricks-datasets/learning-spark-v2/flights/summary-data/json/*")

df.show(5)
df3.show(5)
df4.show(5)

En general, no se necesita ningún esquema cuando se lee desde una fuente de datos estática de Parquet; los metadatos de Parquet generalmente contienen el esquema,  por lo que se deduce.

### DataFrameWriter
Guarda o escribe datos en una fuente de datos integrada especificada y estos son sus patrones de uso:
```
DataFrameWriter.format(args)
 .option(args)
 .bucketBy(args)
 .partitionBy(args)
 .save(path)
```
o

```
DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table)
```

**Opciones**

| Method | Arguments | Description |
| --- | --- | --- |
| format() | "parquet", "csv", "txt", "json","jdbc", "orc", "avro", etc. | If you don’t specify this method, then the default is Parquet or whatever is set in spark.sql.sources.default.|
| option() | ("mode", {append / overwrite / ignore / error or errorifexists} ) ("mode", {SaveMode.Overwrite / SaveMode.Append, Save Mode.Ignore, SaveMode.ErrorIfExists}) ("path", "path_to_write_to") | A series of key/value pairs and options. The Spark documentation shows some examples. This is an overloaded method. The default mode options are error or error ifexists and SaveMode.ErrorIfExists; they throw an exception at runtime if the data already exists.|
| bucketBy() | (numBuckets, col, col..., coln) | The number of buckets and names of columns to bucket by. Uses Hive’s bucketing scheme on a filesystem.|
| save() | "/path/to/data/source" | The path to save to. This can be empty if specified in option("path", "..."). |
| saveAsTable() | "table_name" | The table to save to.|

**Ejemplo**

In [0]:
%scala
//Use JSON
//El nombre me lo pone Databricks. La location es el directorio en el que se guarda el fichero. Si le quier camciar el nombre tengo que hacer un rename
val location = """/learning-spark-v2/sf-fire/Ejemplo_save_json"""
df.write.format("json").mode("overwrite").save(location)

## Fuentes de datos

### Parquet
La fuente de datos predeterminada de Spark

#### Leer de archivos Parquet en un DataFrame

In [0]:
%scala
val file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"""
val df = spark.read.format("parquet").load(file)
df.show(5)

#### Leer de archivos Parquet en una tabla SQL

In [0]:
%scala
//Creamos la base de datos
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
 USING parquet
 OPTIONS (
 path "/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/" )
""")

In [0]:
%scala
//Leemos la tabla
spark.sql("SELECT * FROM us_delay_flights_tbl").show(5)

#### Escribir DataFrames a archivos Parquet

In [0]:
%scala
df.write.format("parquet")
 .mode("overwrite")
 .option("compression", "snappy")
 .save("/tmp/data/parquet/df_parquet")

#### Escribir DataFrames a tablas SQL

In [0]:
%scala
df.write
 .mode("overwrite")
 .saveAsTable("us_delay_flights_tbl")

### JSON
En Spark se soporta tanto el formato sigle-line mode como el multiline mode.

#### Leer de archivos JSON en un DataFrame

In [0]:
%scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
val df = spark.read.format("json").load(file)


#### Leer de archivos JSON en una tabla SQL

In [0]:
%scala
// Creamos la base de datos
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
 USING json
 OPTIONS (
 path "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
 )""")

In [0]:
%scala
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

d
#### Escribir DataFrames a archivos JSON

In [0]:
%scala
df.write.format("json")
 .mode("overwrite")
 .option("compression", "snappy")
 .save("/tmp/data/json/df_json")

**Opciones**

| Property name | Values | Meaning | Scope
| --- | --- | --- | --- |
| compression | none, uncompressed, bzip2, deflate, gzip, lz4, or snappy | Use this compression codec for writing. Note that read will only detect the compression or codec from the file extension.| Write |
| dateFormat | yyyy-MM-dd or DateTimeFormatter | Use this format or any format from Java’s DateTime Formatter. | Read/ write |
| multiLine | true, false | Use multiline mode. Default is false (single-line mode). | Read
| allowUnquoted FieldNames | true, false | Allow unquoted JSON field names. Default is false. | Read

### CSV

#### Leer de archivos CSV en un DataFrame

In [0]:
%scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
val schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
val df = spark.read.format("csv")
 .schema(schema)
 .option("header", "true")
 .option("mode", "FAILFAST") // Exit if any errors
 .option("nullValue", "") // Replace any null data with quotes
 .load(file)

#### Leer de archivos CSV en una tabla SQL

In [0]:
%scala
//Creamos la base de datos
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
 USING csv
 OPTIONS (
 path "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*",
 header "true",
 inferSchema "true",
 mode "FAILFAST"
 )""")

In [0]:
%scala
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10)


#### Escribir DataFrames a archivos CSV

In [0]:
%scala
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")

**Opciones**
## TABLA
Página 128

### AVRO

#### Leer de archivos AVRO en un DataFrame

In [0]:
%scala
val df = spark.read.format("avro")
.load("/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*")
df.show(false)

#### Leer de archivos AVRO en una tabla SQL

In [0]:
%scala
//Creamos la base de datos
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW episode_tbl
 USING avro
 OPTIONS (
 path "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
 )""")


In [0]:
%scala
spark.sql("SELECT * FROM episode_tbl").show(false)

#### Escribir DataFrames a archivos AVRO

In [0]:
%scala
df.write
 .format("avro")
 .mode("overwrite")
 .save("/tmp/data/avro/df_avro")

**Opciones**

##TABLA
Página 130

### ORC

#### Leer de archivos ORC en un DataFrame

In [0]:
%scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
val df = spark.read.format("orc").load(file)
df.show(10, false)

#### Leer de archivos ORC en una tabla SQL

In [0]:
%scala
//Creo la base de datos 
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
 USING orc
 OPTIONS (
 path "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
 )""")

In [0]:
%scala
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

#### Escribir DataFrames a archivos ORC

In [0]:
%scala
// In Scala
df.write.format("orc")
 .mode("overwrite")
 .option("compression", "snappy")
 .save("/tmp/data/orc/df_orc")


### Imágenes

#### Reading an image file into a DataFrame

In [0]:
%scala
import org.apache.spark.ml.source.image
val imageDir = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val imagesDF = spark.read.format("image").load(imageDir)
imagesDF.printSchema
imagesDF.select("image.height", "image.width", "image.nChannels", "image.mode",
 "label").show(5, false)

### Binary Files

#### Reading a binary file into a DataFrame
El siguiente código lee todos los archivos JPG del directorio de entrada con cualquier directorio particionado

In [0]:
%scala
val path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val binaryFilesDF = spark.read.format("binaryFile")
 .option("pathGlobFilter", "*.jpg")
 .load(path)
binaryFilesDF.show(5)

Para ignorar la detección de datos de partición en un directorio, puede configurar ```recursiveFile Lookup to "true"```:

In [0]:
%scala
val binaryFilesDF = spark.read.format("binaryFile")
 .option("pathGlobFilter", "*.jpg")
 .option("recursiveFileLookup", "true")
 .load(path)
binaryFilesDF.show(5)

# APUNTES
Con \%fs puedo ejecutar comandos linux en el Notebook de Databricks. Se puede hacer un ls, ver directorios, crearlos y eliminarlos. En la siguiente web ```//https://docs.databricks.com/_static/notebooks/dbutils.html``` se observan todos los comandos disponibles.

**Ejemplos**

In [0]:
%fs
ls

path,name,size,modificationTime
dbfs:/Ejemplo_save_json.json/,Ejemplo_save_json.json/,0,0
dbfs:/FileStore/,FileStore/,0,0
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-results/,databricks-results/,0,0
dbfs:/learning-spark-v2/,learning-spark-v2/,0,0
dbfs:/tmp/,tmp/,0,0
dbfs:/user/,user/,0,0


In [0]:
%fs
ls /learning-spark-v2/sf-fire/Ejemplo_save_json

path,name,size,modificationTime
dbfs:/learning-spark-v2/sf-fire/Ejemplo_save_json/_SUCCESS,_SUCCESS,0,1651142577000
dbfs:/learning-spark-v2/sf-fire/Ejemplo_save_json/_committed_2238006556770501276,_committed_2238006556770501276,114,1651142577000
dbfs:/learning-spark-v2/sf-fire/Ejemplo_save_json/_started_2238006556770501276,_started_2238006556770501276,0,1651142576000
dbfs:/learning-spark-v2/sf-fire/Ejemplo_save_json/part-00000-tid-2238006556770501276-4704d525-889b-423b-8d58-a96e6976647e-280-1-c000.json,part-00000-tid-2238006556770501276-4704d525-889b-423b-8d58-a96e6976647e-280-1-c000.json,21353,1651142576000


In [0]:
%fs
rm -r '/"'