In [13]:
import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.SparkSession


In [14]:
import org.apache.spark.sql.functions.{col, countDistinct, to_timestamp}

import org.apache.spark.sql.functions.{col, countDistinct, to_timestamp}


In [15]:
import org.apache.spark.sql.types.{BooleanType, FloatType, IntegerType, StringType, StructField, StructType}

import org.apache.spark.sql.types.{BooleanType, FloatType, IntegerType, StringType, StructField, StructType}


In [16]:
val sparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("curso")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6ce92ab4


In [19]:
val sampleDF = sparkSession
    .read
    .option("samplingRatio", 0.001)
    .option("header", true)
    .csv("sf-fire-calls.csv")

sampleDF: org.apache.spark.sql.DataFrame = [CallNumber: string, UnitID: string ... 26 more fields]


In [20]:
val fireSchema = StructType(Array(
    StructField("CallNumber", IntegerType, true),
    StructField("UnitID", StringType, true),
    StructField("IncidentNumber", IntegerType, true),
    StructField("CallType", StringType, true),
    StructField("CallDate", StringType, true),
    StructField("WatchDate", StringType, true),
    StructField("CallFinalDisposition", StringType, true),
    StructField("AvailableDtTm", StringType, true),
    StructField("Address", StringType, true),
    StructField("City", StringType, true),
    StructField("Zipcode", IntegerType, true),
    StructField("Battalion", StringType, true),
    StructField("StationArea", StringType, true),
    StructField("Box", StringType, true),
    StructField("OriginalPriority", StringType, true),
    StructField("Priority", StringType, true),
    StructField("FinalPriority", IntegerType, true),
    StructField("ALSUnit", BooleanType, true),
    StructField("CallTypeGroup", StringType, true),
    StructField("NumAlarms", IntegerType, true),
    StructField("UnitType", StringType, true),
    StructField("UnitSequenceInCallDispatch", IntegerType, true),
    StructField("FirePreventionDistrict", StringType, true),
    StructField("SupervisorDistrict", StringType, true),
    StructField("Neighborhood", StringType, true),
    StructField("Location", StringType, true),
    StructField("RowID", StringType, true),
    StructField("Delay", FloatType, true)
  ))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [22]:
val sfFireFile="sf-fire-calls.csv"
  val fireDF = sparkSession.read.schema(fireSchema)
    .option("header", "true")
    .csv(sfFireFile)

sfFireFile: String = sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [23]:
val fewFireDF = fireDF
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .where(col("CallType").notEqual("Medical Incident"))

fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


In [24]:
fewFireDF.show(5, false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [25]:
//Devuelve el número de tipos distintos de llamadas mediante countDistinct()

  fireDF
    .select("CallType")
    .where(col("CallType").isNotNull)
    .agg(countDistinct("CallType") as "DistinctCallTypes")
    .show()

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



In [26]:
//Podemos enumerar los distintos tipos de llamadas en el conjunto de datos
  fireDF
    .select("CallType")
    .where(col("CallType").isNotNull)
    .distinct()
    .show(10, false)

+-----------------------------+
|CallType                     |
+-----------------------------+
|Elevator / Escalator Rescue  |
|Marine Fire                  |
|Aircraft Emergency           |
|Administrative               |
|Alarms                       |
|Odor (Strange / Unknown)     |
|Citizen Assist / Service Call|
|HazMat                       |
|Watercraft in Distress       |
|Explosion                    |
+-----------------------------+
only showing top 10 rows



In [27]:
val fireTsDF = fireDF
    .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .drop("CallDate")
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
    .drop("WatchDate")
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"),
      "MM/dd/yyyy hh:mm:ss a"))
    .drop("AvailableDtTm")

fireTsDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [28]:
fireTsDF
    .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
    .show(5, false)

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [32]:
//Guardar en distintos formatos
  fireDF.write.format("csv")
    .mode("overwrite")
    .save("resources/salida/fire_calls_csv")

In [35]:
fireDF.write.format("json")
    .mode("overwrite")
    .save("resources/salida/fire_calls_json")

In [None]:
fireDF.write.format("avro")
    .mode("overwrite")
    .save("resources/salida/fire_calls_avro")