In [1]:
import org.apache.spark.sql.types.{BooleanType, FloatType, IntegerType, StringType, StructField, StructType}
import spark.implicits._

// Explicit schema for the San Francisco fire-calls CSV.
// Built with the StructType builder: each two-argument add(...) appends a
// nullable field with no metadata, in the order the columns are listed here.
val schema = new StructType()
  .add("CallNumber", IntegerType)
  .add("UnitID", StringType)
  .add("IncidentNumber", IntegerType)
  .add("CallType", StringType)
  .add("CallDate", StringType)
  .add("WatchDate", StringType)
  .add("CallFinalDisposition", StringType)
  .add("AvailableDtTm", StringType)
  .add("Address", StringType)
  .add("City", StringType)
  .add("Zipcode", IntegerType)
  .add("Battalion", StringType)
  .add("StationArea", StringType)
  .add("Box", StringType)
  .add("OriginalPriority", StringType)
  .add("Priority", StringType)
  .add("FinalPriority", IntegerType)
  .add("ALSUnit", BooleanType)
  .add("CallTypeGroup", StringType)
  .add("NumAlarms", IntegerType)
  .add("UnitType", StringType)
  .add("UnitSequenceInCallDispatch", IntegerType)
  .add("FirePreventionDistrict", StringType)
  .add("SupervisorDistrict", StringType)
  .add("Neighborhood", StringType)
  .add("Location", StringType)
  .add("RowID", StringType)
  .add("Delay", FloatType)

// Load the fire-calls CSV using the explicit schema instead of schema inference.
// header=true tells Spark to treat the first line of the file as column names
// and skip it; without it that line is loaded as a data row (nulls in the typed
// columns, and a spurious "CallType" value in the string column).
// NOTE(review): assumes the CSV starts with a column-name line - confirm against the file.
val fireCallsDf = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("../../resources/chapter3/sf-fire-calls.csv")

Initializing Scala interpreter ...

Spark Web UI available at http://725a4c8e5c14:4040
SparkContext available as 'sc' (version = 3.2.1, master = local[*], app id = local-1648483399783)
SparkSession available as 'spark'


import org.apache.spark.sql.types.{BooleanType, FloatType, IntegerType, StringType, StructField, StructType}
import spark.implicits._
schema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringTyp...


In [2]:
// Rank call types by how often they occur (most frequent first).
// Rows without a CallType are dropped before counting.
val commonFireCallsDf = fireCallsDf
  .filter($"CallType".isNotNull)
  .select("CallType")
  .groupBy("CallType")
  .count()
  .orderBy($"count".desc)
  .withColumnRenamed("count", "NumberOfCalls")

// Print the top rows without truncating long call-type names.
commonFireCallsDf.show(truncate = false)

+-------------------------------+-------------+
|CallType                       |NumberOfCalls|
+-------------------------------+-------------+
|Medical Incident               |113794       |
|Structure Fire                 |23319        |
|Alarms                         |19406        |
|Traffic Collision              |7013         |
|Citizen Assist / Service Call  |2524         |
|Other                          |2166         |
|Outside Fire                   |2094         |
|Vehicle Fire                   |854          |
|Gas Leak (Natural and LP Gases)|764          |
|Water Rescue                   |755          |
|Odor (Strange / Unknown)       |490          |
|Electrical Hazard              |482          |
|Elevator / Escalator Rescue    |453          |
|Smoke Investigation (Outside)  |391          |
|Fuel Spill                     |193          |
|HazMat                         |124          |
|Industrial Accidents           |94           |
|Explosion                      |89     

commonFireCallsDf: org.apache.spark.sql.DataFrame = [CallType: string, NumberOfCalls: bigint]


In [3]:
// Convert the string date columns to timestamps, keep calls made between 2005
// and 2010 (inclusive), and list the most recent calls first.
//
// The patterns use a four-digit year ("yyyy"). The previous two-digit "yy"
// pattern only worked by switching the whole session to the lenient LEGACY
// time parser; with the correct pattern that session-wide override is no
// longer needed and the default Spark 3 parser is used.
// NOTE(review): assumes dates look like 12/31/2010 and 12/31/2010 10:42:35 AM - confirm against the CSV.
val dateFormatDf = fireCallsDf.select($"CallDate", $"WatchDate", $"AvailableDtTm")
  .withColumn("CallDate", to_timestamp($"CallDate", "MM/dd/yyyy"))
  .withColumn("WatchDate", to_timestamp($"WatchDate", "MM/dd/yyyy"))
  // "hh" + "a" = 12-hour clock with an AM/PM marker.
  .withColumn("AvailableDtTm", to_timestamp($"AvailableDtTm", "MM/dd/yyyy hh:mm:ss a"))
  .filter(year($"CallDate").between(2005, 2010))
  .orderBy(desc("CallDate"))

dateFormatDf.show(false)

+-------------------+-------------------+-------------------+
|CallDate           |WatchDate          |AvailableDtTm      |
+-------------------+-------------------+-------------------+
|2010-12-31 00:00:00|2010-12-31 00:00:00|2010-12-31 10:42:35|
|2010-12-31 00:00:00|2010-12-31 00:00:00|2010-12-31 12:03:58|
|2010-12-31 00:00:00|2010-12-31 00:00:00|2010-12-31 15:00:26|
|2010-12-31 00:00:00|2010-12-31 00:00:00|2010-12-31 13:19:54|
|2010-12-31 00:00:00|2010-12-31 00:00:00|2010-12-31 14:09:05|
|2010-12-31 00:00:00|2010-12-31 00:00:00|2010-12-31 14:30:57|
|2010-12-31 00:00:00|2010-12-30 00:00:00|2010-12-31 00:42:41|
|2010-12-31 00:00:00|2010-12-30 00:00:00|2010-12-31 03:40:30|
|2010-12-31 00:00:00|2010-12-31 00:00:00|2010-12-31 13:18:39|
|2010-12-31 00:00:00|2010-12-31 00:00:00|2010-12-31 13:04:53|
|2010-12-31 00:00:00|2010-12-30 00:00:00|null               |
|2010-12-31 00:00:00|2010-12-31 00:00:00|2010-12-31 13:23:55|
|2010-12-31 00:00:00|2010-12-31 00:00:00|2010-12-31 11:16:28|
|2010-12

dateFormatDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallDate: timestamp, WatchDate: timestamp ... 1 more field]


In [4]:
// Export the full fire-calls dataset as CSV with a header row.
// coalesce(1) collapses the data to a single partition before writing;
// "overwrite" replaces whatever is already in the target directory.
fireCallsDf
  .coalesce(1)
  .write
  .format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save("../../output/scala/chapter3/fireCalls/")

// Export the parsed-date subset as JSON from a single partition,
// replacing any previous output in the target directory.
dateFormatDf
  .coalesce(1)
  .write
  .format("json")
  .mode("overwrite")
  .save("../../output/scala/chapter3/fireCallsDates/")

// Export the per-call-type counts as Parquet from a single partition,
// replacing any previous output in the target directory.
// No "sep" option here: a field separator is a CSV/text setting and has no
// meaning for the binary, columnar Parquet format.
commonFireCallsDf.coalesce(1)
        .write
        .mode("overwrite")
        .parquet("../../output/scala/chapter3/commonCallsDates/")

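/* Avro export (disabled).
   NOTE: spark.jars.packages is resolved when the Spark application is launched, so it
   cannot be applied to an already-running session with spark.conf.set. The Databricks
   Scala 2.10 artifacts below also do not match the Spark 3.2.1 session used in this
   notebook. To enable this cell, start the session with the built-in Avro module on the
   classpath instead, e.g. --packages org.apache.spark:spark-avro_2.12:3.2.1
   (use the Scala suffix that matches your Spark build).
spark.conf.set("spark.jars.packages","com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.10:1.5.0")
fireCallsDf.coalesce(1)
        .write
        .mode("overwrite")
        .format("avro")
        .save("../../output/scala/chapter3/fireCallsAVRO/")*/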