# Capítulo 3 Learning Spark

## Alto nivel vs Bajo Nivel

In [205]:
spark

res128: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@3d671bd


In [206]:
sc

res129: org.apache.spark.SparkContext = org.apache.spark.SparkContext@1facc93c


In [207]:
// Importamos las funciones
import org.apache.spark.sql.functions._

import org.apache.spark.sql.functions._


### Creación de DataFrame por secuencias

In [208]:
val df = spark.createDataFrame(Seq(("Brooke",20),("Denny",31),("Jules",30),("TD",35),("Brooke",35))).toDF("Nombre","Edad")

df: org.apache.spark.sql.DataFrame = [Nombre: string, Edad: int]


In [209]:
df.show()

+------+----+
|Nombre|Edad|
+------+----+
|Brooke|  20|
| Denny|  31|
| Jules|  30|
|    TD|  35|
|Brooke|  35|
+------+----+



### Ejemplos de qué se puede hacer en Alto Nivel

In [210]:
val dfmedia = df.groupBy("Nombre").agg(avg("Edad").alias("Media"))

dfmedia: org.apache.spark.sql.DataFrame = [Nombre: string, Media: double]


In [211]:
dfmedia.show()

+------+-----+
|Nombre|Media|
+------+-----+
|Brooke| 27.5|
| Denny| 31.0|
| Jules| 30.0|
|    TD| 35.0|
+------+-----+



## Tipos de datos en SPARK de Scala

In [212]:
// importamos los tipos de datos
import org.apache.spark.sql.types._

import org.apache.spark.sql.types._


In [213]:
val nameTypes = StringType

nameTypes: org.apache.spark.sql.types.StringType.type = StringType


In [214]:
val firsName = nameTypes

firsName: org.apache.spark.sql.types.StringType.type = StringType


In [215]:
val lastName = nameTypes

lastName: org.apache.spark.sql.types.StringType.type = StringType


## Definimos un SCHEMA

In [216]:
// FORMA PROGRAMATIVA
val schema = StructType(
    Array(
    StructField("Autor",StringType,false), //Creo que el falso indica si puede o no ser nulo
    StructField("Titulo",StringType,false),
    StructField("Pagina",IntegerType,false)
    )
    )

schema: org.apache.spark.sql.types.StructType = StructType(StructField(Autor,StringType,false), StructField(Titulo,StringType,false), StructField(Pagina,IntegerType,false))


In [217]:
// FORMA FACIL (DDL)
val schema2 = "autor STRING, titulo STRING, paginas INT"

schema2: String = autor STRING, titulo STRING, paginas INT


In [218]:
val schema = StructType(Array(StructField("author", StringType, false),
    StructField("title", StringType, true),
    StructField("pages", IntegerType, false)))

schema: org.apache.spark.sql.types.StructType = StructType(StructField(author,StringType,false), StructField(title,StringType,true), StructField(pages,IntegerType,false))


## Ejemplo de uso de SCHEMA y lectura de JSON

### Sin SCHEMA

In [219]:
// cargamos el arvhivo JSON
val jsonfile = spark.read.format("json")
    .load("blogs.json")

jsonfile: org.apache.spark.sql.DataFrame = [Campaigns: array<string>, First: string ... 5 more fields]


In [220]:
jsonfile.show(5)

+--------------------+---------+-----+---+-------+---------+-----------------+
|           Campaigns|    First| Hits| Id|   Last|Published|              Url|
+--------------------+---------+-----+---+-------+---------+-----------------+
| [twitter, LinkedIn]|    Jules| 4535|  1|  Damji| 1/4/2016|https://tinyurl.1|
| [twitter, LinkedIn]|   Brooke| 8908|  2|  Wenig| 5/5/2018|https://tinyurl.2|
|[web, twitter, FB...|    Denny| 7659|  3|    Lee| 6/7/2019|https://tinyurl.3|
|       [twitter, FB]|Tathagata|10568|  4|    Das|5/12/2018|https://tinyurl.4|
|[web, twitter, FB...|    Matei|40578|  5|Zaharia|5/14/2014|https://tinyurl.5|
+--------------------+---------+-----+---+-------+---------+-----------------+
only showing top 5 rows



### Con SCHEMA

In [221]:
// CREO MI SCHEMA
val schema = StructType(
    Array(
    StructField("Id",IntegerType,false), //Creo que el falso indica si puede o no ser nulo
    StructField("First",StringType,false),
    StructField("Last",StringType,false),
    StructField("Url",StringType,false),
    StructField("Published",DateType,false),
    StructField("Hits",IntegerType,false),
    StructField("Campaigns",ArrayType(StringType),false)
    )
    )

schema: org.apache.spark.sql.types.StructType = StructType(StructField(Id,IntegerType,false), StructField(First,StringType,false), StructField(Last,StringType,false), StructField(Url,StringType,false), StructField(Published,DateType,false), StructField(Hits,IntegerType,false), StructField(Campaigns,ArrayType(StringType,true),false))


In [222]:
val dfBlog = spark.read.schema(schema).json("blogs.json")

dfBlog: org.apache.spark.sql.DataFrame = [Id: int, First: string ... 5 more fields]


In [223]:
dfBlog.show(truncate = false)

+---+---------+-------+-----------------+---------+-----+----------------------------+
|Id |First    |Last   |Url              |Published|Hits |Campaigns                   |
+---+---------+-------+-----------------+---------+-----+----------------------------+
|1  |Jules    |Damji  |https://tinyurl.1|null     |4535 |[twitter, LinkedIn]         |
|2  |Brooke   |Wenig  |https://tinyurl.2|null     |8908 |[twitter, LinkedIn]         |
|3  |Denny    |Lee    |https://tinyurl.3|null     |7659 |[web, twitter, FB, LinkedIn]|
|4  |Tathagata|Das    |https://tinyurl.4|null     |10568|[twitter, FB]               |
|5  |Matei    |Zaharia|https://tinyurl.5|null     |40578|[web, twitter, FB, LinkedIn]|
|6  |Reynold  |Xin    |https://tinyurl.6|null     |25568|[twitter, LinkedIn]         |
+---+---------+-------+-----------------+---------+-----+----------------------------+



In [224]:
dfBlog.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: date (nullable = true)
 |-- Hits: integer (nullable = true)
 |-- Campaigns: array (nullable = true)
 |    |-- element: string (containsNull = true)



## Columns & Expressions

In [225]:
// Vamos a ver las columnas del dataFrame anterior
dfBlog.columns

res135: Array[String] = Array(Id, First, Last, Url, Published, Hits, Campaigns)


In [226]:
dfBlog.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: date (nullable = true)
 |-- Hits: integer (nullable = true)
 |-- Campaigns: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [227]:
dfBlog.col("Id")

res137: org.apache.spark.sql.Column = Id


In [228]:
// Vamos a utilizar expr para multiplicar
dfBlog.select(expr("Hits*2")).show()

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
|     15318|
|     21136|
|     81156|
|     51136|
+----------+



In [229]:
//Equivalentemente
dfBlog.select($"Hits"*2).show()

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
|     15318|
|     21136|
|     81156|
|     51136|
+----------+



In [230]:
// USAR $"..." es lo mismo que col("...")
dfBlog.select(col("Hits")*2).show()

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
|     15318|
|     21136|
|     81156|
|     51136|
+----------+



### Intentamos multiplicar una String

In [231]:
dfBlog.select(col("Last")*2).show()
// devuelve nulo

+----------+
|(Last * 2)|
+----------+
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
+----------+



In [232]:
dfBlog.select(expr("Last*2")).show()

+----------+
|(Last * 2)|
+----------+
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
+----------+



## Renombrar columnas de un DataFrame

In [233]:
val df1 = dfBlog.withColumnRenamed("Last","Apellido")
df1.printSchema

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Apellido: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: date (nullable = true)
 |-- Hits: integer (nullable = true)
 |-- Campaigns: array (nullable = true)
 |    |-- element: string (containsNull = true)



df1: org.apache.spark.sql.DataFrame = [Id: int, First: string ... 5 more fields]


### Usar expresiones para condiciones

In [234]:
dfBlog.withColumn("NombreCol", expr("Hits > 10000")).show()

+---+---------+-------+-----------------+---------+-----+--------------------+---------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|NombreCol|
+---+---------+-------+-----------------+---------+-----+--------------------+---------+
|  1|    Jules|  Damji|https://tinyurl.1|     null| 4535| [twitter, LinkedIn]|    false|
|  2|   Brooke|  Wenig|https://tinyurl.2|     null| 8908| [twitter, LinkedIn]|    false|
|  3|    Denny|    Lee|https://tinyurl.3|     null| 7659|[web, twitter, FB...|    false|
|  4|Tathagata|    Das|https://tinyurl.4|     null|10568|       [twitter, FB]|     true|
|  5|    Matei|Zaharia|https://tinyurl.5|     null|40578|[web, twitter, FB...|     true|
|  6|  Reynold|    Xin|https://tinyurl.6|     null|25568| [twitter, LinkedIn]|     true|
+---+---------+-------+-----------------+---------+-----+--------------------+---------+



### Concatenar columnas

In [235]:
dfBlog.withColumn("Concatenacion",concat($"First",$"Last")).show()
dfBlog.withColumn("Concatenacion",concat(expr("First"),expr("Last"))).show()

+---+---------+-------+-----------------+---------+-----+--------------------+-------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|Concatenacion|
+---+---------+-------+-----------------+---------+-----+--------------------+-------------+
|  1|    Jules|  Damji|https://tinyurl.1|     null| 4535| [twitter, LinkedIn]|   JulesDamji|
|  2|   Brooke|  Wenig|https://tinyurl.2|     null| 8908| [twitter, LinkedIn]|  BrookeWenig|
|  3|    Denny|    Lee|https://tinyurl.3|     null| 7659|[web, twitter, FB...|     DennyLee|
|  4|Tathagata|    Das|https://tinyurl.4|     null|10568|       [twitter, FB]| TathagataDas|
|  5|    Matei|Zaharia|https://tinyurl.5|     null|40578|[web, twitter, FB...| MateiZaharia|
|  6|  Reynold|    Xin|https://tinyurl.6|     null|25568| [twitter, LinkedIn]|   ReynoldXin|
+---+---------+-------+-----------------+---------+-----+--------------------+-------------+

+---+---------+-------+-----------------+---------+-----+------------

### Similitud entre métodos

In [236]:
dfBlog.select("Hits").show(2)
dfBlog.select(col("Hits")).show(2)
dfBlog.select(expr("Hits")).show(2)
dfBlog.select($"Hits").show(2)

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows



## Cómo ordenar un DataFrame

In [237]:
dfBlog.sort($"Id".desc).show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  6|  Reynold|    Xin|https://tinyurl.6|     null|25568| [twitter, LinkedIn]|
|  5|    Matei|Zaharia|https://tinyurl.5|     null|40578|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|     null|10568|       [twitter, FB]|
|  3|    Denny|    Lee|https://tinyurl.3|     null| 7659|[web, twitter, FB...|
|  2|   Brooke|  Wenig|https://tinyurl.2|     null| 8908| [twitter, LinkedIn]|
|  1|    Jules|  Damji|https://tinyurl.1|     null| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



## OBJETO FILAS

In [238]:
// importamos el objeto Row
import org.apache.spark.sql.Row

import org.apache.spark.sql.Row


### Acceso por indice

In [239]:
// creamos una fila
val filablog = Row(6,"Reynold","Xin","https//tinyurl.6",255568,"3/2/2015",Array("twitter","LinkedIn"))

filablog: org.apache.spark.sql.Row = [6,Reynold,Xin,https//tinyurl.6,255568,3/2/2015,[Ljava.lang.String;@56bc2f2a]


In [240]:
// acedemos por indice 
filablog(1)

res148: Any = Reynold


In [241]:
val filablog2 = Row(5,"agatha","christie","https//youtube.com",777,"31/3/2012",Array("Instagram","LinkedIn"))

filablog2: org.apache.spark.sql.Row = [5,agatha,christie,https//youtube.com,777,31/3/2012,[Ljava.lang.String;@7d89517b]


In [242]:
val rows2 = Seq(filablog,filablog2)

rows2: Seq[org.apache.spark.sql.Row] = List([6,Reynold,Xin,https//tinyurl.6,255568,3/2/2015,[Ljava.lang.String;@56bc2f2a], [5,agatha,christie,https//youtube.com,777,31/3/2012,[Ljava.lang.String;@7d89517b])


In [243]:
// ESTA ME DA ERROR
//val df2 = spark.createDataFrameto(rows2).toDF("Id","Nombre","Apellido","url","Numero","Fecha","redes")

In [244]:
val rows = Seq(("Matei Zaharia", "CA"), ("Reynold Xin", "CA"))
val authorsDF = rows.toDF("Author", "State")
authorsDF.show()

+-------------+-----+
|       Author|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



rows: Seq[(String, String)] = List((Matei Zaharia,CA), (Reynold Xin,CA))
authorsDF: org.apache.spark.sql.DataFrame = [Author: string, State: string]


## DataFrame Operations

### Lectura de DataFrames (DataFrameReader)

In [245]:
// VAMOS A LEER UNOS POCOS DATOS PARA VER QuE ESTRucturA tienen
val sampleDF = spark.read
                    .option("samplingRatio",0.001)
                    .option("header",true)
                    .csv("sf-fire-calls.csv")

sampleDF: org.apache.spark.sql.DataFrame = [CallNumber: string, UnitID: string ... 26 more fields]


In [246]:
sampleDF.show(1)

+----------+------+--------------+--------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+---------------+--------------------+-------------+-----+
|CallNumber|UnitID|IncidentNumber|      CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|   Neighborhood|            Location|        RowID|Delay|
+----------+------+--------------+--------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+

In [247]:
sampleDF.columns

res152: Array[String] = Array(CallNumber, UnitID, IncidentNumber, CallType, CallDate, WatchDate, CallFinalDisposition, AvailableDtTm, Address, City, Zipcode, Battalion, StationArea, Box, OriginalPriority, Priority, FinalPriority, ALSUnit, CallTypeGroup, NumAlarms, UnitType, UnitSequenceInCallDispatch, FirePreventionDistrict, SupervisorDistrict, Neighborhood, Location, RowID, Delay)


### Creación del Schema para el dataFRame

In [248]:
val fire_schema = StructType(Array(
    StructField("CallNumber", IntegerType,true),
    StructField("UnitID", StringType, true),
    StructField("IncidentNumber", IntegerType, true),
    StructField("CallType", StringType, true),
    StructField("CallDate", StringType, true),
    StructField("WatchDate", StringType, true),
    StructField("CallFinalDisposition", StringType, true),
    StructField("AvailableDtTm", StringType, true),
    StructField("Address", StringType, true),
    StructField("City", StringType, true),
    StructField("Zipcode", IntegerType, true),
    StructField("Battalion", StringType, true),
    StructField("StationArea", IntegerType, true),
    StructField("Box", IntegerType, true),
    StructField("OriginalPriority", IntegerType, true),
    StructField("Priority", IntegerType, true),
    StructField("FinalPriority", IntegerType, true),
    StructField("ALSUnit", BooleanType, true),
    StructField("CallTypeGroup", StringType, true),
    StructField("NumAlarms", IntegerType, true),
    StructField("UnitType", StringType, true),
    StructField("UnitSequenceInCallDispatch", IntegerType, true),
    StructField("FirePreventionDistrict", IntegerType, true),
    StructField("SupervisorDistrict", IntegerType, true),
    StructField("Neighborhood", StringType, true),
    StructField("Location",StringType,true),
    StructField("RowID", StringType, true),
    StructField("Delay", FloatType, true)
    ))

fire_schema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,IntegerType,true), StructField(Box,IntegerType,true), StructField(OriginalPriority,IntegerType,true), StructField(Priority,IntegerType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,...


In [249]:
val fire_df = spark.read.schema(fire_schema)
                        .option("header",true)
                        .option("sep",",")
                        .csv("sf-fire-calls.csv")

fire_df: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [250]:
fire_df.show(2)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+-----+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+------

## Guardar un DataFrame formato Parquet

In [251]:
// PARA NO GUARDARLA OTRA VEZ SE HA COMENTADO
//fire_df.write.format("parquet").save("parquetEjemplo2forma2")

## Guardar DataFrame formato TABLA SQL

In [252]:
// PARA NO GUARDARLA AGAIN SE HA COMENTADO
//fire_df.write.format("parquet").saveAsTable("parquetTable2")

## Ejecutamos Transformaciones y Acciones del capitulo 2

In [253]:
fire_df.first()

res156: org.apache.spark.sql.Row = [20110016,T13,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:51:44 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,false,null,1,TRUCK,2,4,5,Pacific Heights,(37.7895840679362, -122.428071912459),020110016-T13,2.95]


In [254]:
fire_df.count()

res157: Long = 175296


In [255]:
val agreg = fire_df.groupBy("CallDate").count()

agreg: org.apache.spark.sql.DataFrame = [CallDate: string, count: bigint]


In [256]:
agreg.count()

res158: Long = 6783


## Proyecciones y filtros

In [257]:
val fewFireDF = fire_df.select("IncidentNumber","AvailableDtTm","CallType").where($"CallType" =!= "Vehicle Fire")
fewFireDF.show(5,truncate=false)

+--------------+----------------------+----------------+
|IncidentNumber|AvailableDtTm         |CallType        |
+--------------+----------------------+----------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire  |
|2003241       |01/11/2002 03:01:18 AM|Medical Incident|
|2003242       |01/11/2002 02:39:50 AM|Medical Incident|
|2003259       |01/11/2002 06:01:58 AM|Alarms          |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire  |
+--------------+----------------------+----------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


## Distinct en Spark SCALA

In [258]:
import org.apache.spark.sql.functions._

import org.apache.spark.sql.functions._


In [259]:
fire_df.select("CallType")
    .where($"CallType".isNotNull)
    .agg(countDistinct("CallType").as("DistintosTypos"))
    .show()

+--------------+
|DistintosTypos|
+--------------+
|            30|
+--------------+



In [260]:
fire_df.select("CallType")
    .where(col("CallType").isNotNull)
    .distinct()
    .show(10,false)

+-----------------------------+
|CallType                     |
+-----------------------------+
|Elevator / Escalator Rescue  |
|Marine Fire                  |
|Aircraft Emergency           |
|Administrative               |
|Alarms                       |
|Odor (Strange / Unknown)     |
|Citizen Assist / Service Call|
|HazMat                       |
|Watercraft in Distress       |
|Explosion                    |
+-----------------------------+
only showing top 10 rows



## Cambiar el nombre, añadir y eliminar columnas

In [261]:
val newdf = fire_df.withColumnRenamed("Delay","Tolay")

newdf: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [262]:
newdf.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: integer (nullable = true)
 |-- Box: integer (nullable = true)
 |-- OriginalPriority: integer (nullable = true)
 |-- Priority: integer (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: integer (nullable = tr

In [263]:
newdf.select("Tolay").where($"Tolay">5.0).show(10,false)

+---------+
|Tolay    |
+---------+
|5.35     |
|6.25     |
|5.2      |
|5.6      |
|7.25     |
|11.916667|
|5.116667 |
|8.633333 |
|95.28333 |
|5.45     |
+---------+
only showing top 10 rows



### Cambiar de String a Fecha

In [264]:
newdf.columns

res164: Array[String] = Array(CallNumber, UnitID, IncidentNumber, CallType, CallDate, WatchDate, CallFinalDisposition, AvailableDtTm, Address, City, Zipcode, Battalion, StationArea, Box, OriginalPriority, Priority, FinalPriority, ALSUnit, CallTypeGroup, NumAlarms, UnitType, UnitSequenceInCallDispatch, FirePreventionDistrict, SupervisorDistrict, Neighborhood, Location, RowID, Tolay)


In [265]:
newdf.select("CallDate","WatchDate","AvailableDtTm").show(5,truncate=false)

+----------+----------+----------------------+
|CallDate  |WatchDate |AvailableDtTm         |
+----------+----------+----------------------+
|01/11/2002|01/10/2002|01/11/2002 01:51:44 AM|
|01/11/2002|01/10/2002|01/11/2002 03:01:18 AM|
|01/11/2002|01/10/2002|01/11/2002 02:39:50 AM|
|01/11/2002|01/10/2002|01/11/2002 04:16:46 AM|
|01/11/2002|01/10/2002|01/11/2002 06:01:58 AM|
+----------+----------+----------------------+
only showing top 5 rows



In [266]:
val df3 = newdf.withColumn("FechaIncidente",to_timestamp($"CallDate","MM/dd/yyyy")).drop("CallDate")
               .withColumn("FechaVista",to_timestamp($"WatchDate","MM/dd/yyyy")).drop("WatchDate")
               .withColumn("Dateee",to_timestamp($"AvailableDtTm","MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

df3: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [267]:
df3.select("FechaIncidente","FechaVista","Dateee").show(2)

+-------------------+-------------------+-------------------+
|     FechaIncidente|         FechaVista|             Dateee|
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
+-------------------+-------------------+-------------------+
only showing top 2 rows



### Utilizar funciones Tiempo (día, mes o año)

In [268]:
import org.apache.spark.sql.functions._

import org.apache.spark.sql.functions._


In [269]:
df3.select(year($"FechaIncidente")).show(2);
df3.select(month($"FechaIncidente")).withColumnRenamed("month(FechaIncidente)","mes").show(2);
df3.select(dayofmonth($"FechaIncidente")).show(2);

+--------------------+
|year(FechaIncidente)|
+--------------------+
|                2002|
|                2002|
+--------------------+
only showing top 2 rows

+---+
|mes|
+---+
|  1|
|  1|
+---+
only showing top 2 rows

+--------------------------+
|dayofmonth(FechaIncidente)|
+--------------------------+
|                        11|
|                        11|
+--------------------------+
only showing top 2 rows



## Agregaciones y agrupaciones

In [270]:
df3.columns

res168: Array[String] = Array(CallNumber, UnitID, IncidentNumber, CallType, CallFinalDisposition, Address, City, Zipcode, Battalion, StationArea, Box, OriginalPriority, Priority, FinalPriority, ALSUnit, CallTypeGroup, NumAlarms, UnitType, UnitSequenceInCallDispatch, FirePreventionDistrict, SupervisorDistrict, Neighborhood, Location, RowID, Tolay, FechaIncidente, FechaVista, Dateee)


In [271]:
df3.select("CallType")
    .where($"CallType".isNotNull)
    .groupBy("CallType")
    .count()
    .orderBy($"count" desc)
    .show()

+--------------------+------+
|            CallType| count|
+--------------------+------+
|    Medical Incident|113794|
|      Structure Fire| 23319|
|              Alarms| 19406|
|   Traffic Collision|  7013|
|Citizen Assist / ...|  2524|
|               Other|  2166|
|        Outside Fire|  2094|
|        Vehicle Fire|   854|
|Gas Leak (Natural...|   764|
|        Water Rescue|   755|
|Odor (Strange / U...|   490|
|   Electrical Hazard|   482|
|Elevator / Escala...|   453|
|Smoke Investigati...|   391|
|          Fuel Spill|   193|
|              HazMat|   124|
|Industrial Accidents|    94|
|           Explosion|    89|
|Train / Rail Inci...|    57|
|  Aircraft Emergency|    36|
+--------------------+------+
only showing top 20 rows





In [272]:
df3.take(3)

res170: Array[org.apache.spark.sql.Row] = Array([20110016,T13,2003235,Structure Fire,Other,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,false,null,1,TRUCK,2,4,5,Pacific Heights,(37.7895840679362, -122.428071912459),020110016-T13,2.95,2002-01-11 00:00:00.0,2002-01-10 00:00:00.0,2002-01-11 01:51:44.0], [20110022,M17,2003241,Medical Incident,Other,0 Block of SILVERVIEW DR,SF,94124,B10,42,6495,3,3,3,true,null,1,MEDIC,1,10,10,Bayview Hunters Point,(37.7337623673897, -122.396113802632),020110022-M17,4.7,2002-01-11 00:00:00.0,2002-01-10 00:00:00.0,2002-01-11 03:01:18.0], [20110023,M41,2003242,Medical Incident,Other,MARKET ST/MCALLISTER ST,SF,94102,B03,1,1455,3,3,3,true,null,1,MEDIC,2,3,6,Tenderloin,(37.7811772186856, -122.411699931232),020110023-M41,2.4333334,2002-01-11 00:00:00.0,20...


In [273]:
// Vamos a importar las funciones de agregacion llamándolas como F para que no haya conflictos

In [274]:
import org.apache.spark.sql.{functions => F}

import org.apache.spark.sql.{functions=>F}


In [275]:
df3.select(F.sum($"NumAlarms").alias("Numero de alarmas"),
           F.avg($"Tolay").alias("Media de respuesta en mins"),
           F.min($"Tolay").alias("Respuesta + rapida en mins"),
           F.max($"Tolay").alias("Respuesta + lenta en mins"),
          ).show()

+-----------------+--------------------------+--------------------------+-------------------------+
|Numero de alarmas|Media de respuesta en mins|Respuesta + rapida en mins|Respuesta + lenta en mins|
+-----------------+--------------------------+--------------------------+-------------------------+
|           176170|         3.892364154521585|               0.016666668|                  1844.55|
+-----------------+--------------------------+--------------------------+-------------------------+



## Respuesta a las preguntas del libro

In [276]:
// usamos fire_df

In [277]:
fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: integer (nullable = true)
 |-- Box: integer (nullable = true)
 |-- OriginalPriority: integer (nullable = true)
 |-- Priority: integer (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: integer (nullable = tr

In [278]:
// Pasamos las fechas de string a formato fecha

In [279]:
val fire2 = fire_df.withColumn("CallDateF",to_timestamp($"CallDate","MM/dd/yyyy")).drop("CallDate")
               .withColumn("WatchDateF",to_timestamp($"WatchDate","MM/dd/yyyy")).drop("WatchDate")
               .withColumn("AvailableDtTmF",to_timestamp($"AvailableDtTm","MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fire2: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [280]:
val fire3 = fire2.withColumn("Anio",year($"CallDateF"))

fire3: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 27 more fields]


In [281]:
fire3.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: integer (nullable = true)
 |-- Box: integer (nullable = true)
 |-- OriginalPriority: integer (nullable = true)
 |-- Priority: integer (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: integer (nullable = true)
 |-- SupervisorDistrict: integer (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (null

In [282]:
fire3.select("CallType","Anio")
    .where($"Anio"===2008)
    .distinct()
    .show()

+--------------------+----+
|            CallType|Anio|
+--------------------+----+
|Odor (Strange / U...|2008|
|        Vehicle Fire|2008|
|        Water Rescue|2008|
|   Traffic Collision|2008|
|Gas Leak (Natural...|2008|
|           Explosion|2008|
|        Outside Fire|2008|
|          Fuel Spill|2008|
|               Other|2008|
|Train / Rail Inci...|2008|
|Extrication / Ent...|2008|
|   Electrical Hazard|2008|
|Industrial Accidents|2008|
|       Assist Police|2008|
|              HazMat|2008|
|Citizen Assist / ...|2008|
|      Structure Fire|2008|
|Elevator / Escala...|2008|
|    Medical Incident|2008|
|Watercraft in Dis...|2008|
+--------------------+----+
only showing top 20 rows



In [283]:
val fire4 = fire3.withColumn("Mes",month($"CallDateF"))

fire4: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 28 more fields]


In [284]:
fire4.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: integer (nullable = true)
 |-- Box: integer (nullable = true)
 |-- OriginalPriority: integer (nullable = true)
 |-- Priority: integer (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: integer (nullable = true)
 |-- SupervisorDistrict: integer (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (null

In [285]:
fire4.select("Mes","Anio")
    .where($"Anio"===2008)
    .groupBy("Mes")
    .count()
    .show(12)

+---+-----+
|Mes|count|
+---+-----+
| 12|  762|
|  9|  713|
|  8|  767|
|  7|  741|
| 10|  789|
| 11|  738|
|  1|  699|
|  3|  712|
|  5|  760|
|  4|  728|
|  2|  710|
|  6|  750|
+---+-----+



In [286]:
fire4.withColumn("week_day_full", date_format(col("CallDateF"), "EEEE"))
    .withColumn("week_of_month", date_format(col("CallDateF"), "E")).show(false)

+----------+------+--------------+----------------+--------------------+------------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+------------------------------+-------------------------------------+-------------+---------+-------------------+-------------------+-------------------+----+---+-------------+-------------+
|CallNumber|UnitID|IncidentNumber|CallType        |CallFinalDisposition|Address                       |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType      |UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood                  |Location                             |RowID        |Delay    |CallDateF          |WatchDateF         |AvailableDtTmF     |Anio|Mes|week_day_full|week_of_month|
+----------+------+-

In [287]:
fire4.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: integer (nullable = true)
 |-- Box: integer (nullable = true)
 |-- OriginalPriority: integer (nullable = true)
 |-- Priority: integer (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: integer (nullable = true)
 |-- SupervisorDistrict: integer (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (null

In [288]:
fire4.select("CallNumber","IncidentNumber")
    .describe().show()

+-------+--------------------+-----------------+
|summary|          CallNumber|   IncidentNumber|
+-------+--------------------+-----------------+
|  count|              175296|           175296|
|   mean|1.0023517638629518E8|9900785.635057274|
| stddev|5.3969092834126316E7|5407019.524900525|
|    min|             1030128|            30636|
|    max|           183104004|         18130302|
+-------+--------------------+-----------------+



In [289]:
import org.apache.spark.sql.functions._

import org.apache.spark.sql.functions._


## Typed and UnTyped

In [290]:
import org.apache.spark.sql.Row

import org.apache.spark.sql.Row


In [291]:
val row = Row(350,true,"fnrieofj3r",null)

row: org.apache.spark.sql.Row = [350,true,fnrieofj3r,null]


In [292]:
row.getInt(0)

res183: Int = 350


In [293]:
row.getBoolean(1)

res184: Boolean = true


In [294]:
row.getString(2)

res185: String = fnrieofj3r


In [295]:
row.get(3)

res186: Any = null


## Creando Datasets

In [296]:
case class DeviceIoTData (battery_level: Long, c02_level: Long, 
                          cca2: String, cca3: String, cn: String,
                          device_id: Long, device_name: String,
                          humidity: Long, ip: String, latitude: Double,
                          lcd: String, longitude: Double, scale:String,
                          temp: Long, timestamp: Long)

defined class DeviceIoTData


In [297]:
import spark.implicits._

import spark.implicits._


In [298]:
val ds = spark.read
              .json("iot_devices.json")
              .as[DeviceIoTData]

ds: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level: bigint, c02_level: bigint ... 13 more fields]


In [299]:
ds.show(5,false)

+-------------+---------+----+----+-------------+---------+---------------------+--------+-------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|cn           |device_id|device_name          |humidity|ip           |latitude|lcd   |longitude|scale  |temp|timestamp    |
+-------------+---------+----+----+-------------+---------+---------------------+--------+-------------+--------+------+---------+-------+----+-------------+
|8            |868      |US  |USA |United States|1        |meter-gauge-1xbYRYcj |51      |68.161.225.1 |38.0    |green |-97.0    |Celsius|34  |1458444054093|
|7            |1473     |NO  |NOR |Norway       |2        |sensor-pad-2n2Pea    |70      |213.161.254.1|62.47   |red   |6.15     |Celsius|11  |1458444054119|
|2            |1556     |IT  |ITA |Italy        |3        |device-mac-36TWSKiT  |44      |88.36.5.1    |42.83   |red   |12.83    |Celsius|19  |1458444054120|
|6            |1080     |US  |USA |United States|4  

In [300]:
import org.apache.spark.sql._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{asc,col,desc}
import org.apache.spark.sql.{functions => F}

import org.apache.spark.sql._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{asc, col, desc}
import org.apache.spark.sql.{functions=>F}


In [303]:
val ds2 = ds.filter(d=>{d.temp>30 && d.humidity>70})

ds2: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level: bigint, c02_level: bigint ... 13 more fields]


In [302]:
ds2.show(5,false)

org.apache.spark.SparkException:  Job aborted due to stage failure: Task 0 in stage 209.0 failed 1 times, most recent failure: Lost task 0.0 in stage 209.0 (TID 396) (5fcc63a3e7cf executor driver): java.lang.ClassCastException: class $iw cannot be cast to class $iw ($iw is in unnamed module of loader org.apache.spark.repl.ExecutorClassLoader @14192ec4; $iw is in unnamed module of loader scala.tools.nsc.interpreter.IMain$TranslatingClassLoader @1f2963a5)

### Otro ejemplo

In [97]:
case class DeviceTempByCountry(temp: Long, device_name: String, device_id: Long, cca3: String)

defined class DeviceTempByCountry


In [98]:
val dsTemp = ds.filter(d=>{d.temp>25})
               .map(d => (d.temp,d.device_name,d.device_id,d.cca3))
               .toDF("temp","device_name","device_id","cca3")
               .as[DeviceTempByCountry]

dsTemp: org.apache.spark.sql.Dataset[DeviceTempByCountry] = [temp: bigint, device_name: string ... 2 more fields]


In [99]:
dsTemp.show(2)

org.apache.spark.SparkException:  Job aborted due to stage failure: Task 0 in stage 66.0 failed 1 times, most recent failure: Lost task 0.0 in stage 66.0 (TID 126) (5fcc63a3e7cf executor driver): java.lang.ClassCastException: class $iw cannot be cast to class $iw ($iw is in unnamed module of loader org.apache.spark.repl.ExecutorClassLoader @3fb6911f; $iw is in unnamed module of loader scala.tools.nsc.interpreter.IMain$TranslatingClassLoader @341ad6ac)