In [1]:
#r "nuget:Microsoft.Spark"

Installed package Microsoft.Spark version 0.12.1

In [2]:
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions; 
using Microsoft.Spark.Sql.Types;
using Microsoft.Spark;

In [None]:
//Since the data is large, lets set the worker local nodes, which will be basically threads, 
//to a higher value

#### Create a SparkConf

In [3]:
var conf = new SparkConf();
conf.SetAppName("ReaderWriter").SetMaster("local[7]");

[2020-09-06T17:04:46.9661832Z] [MININT-GLN1L3K] [Info] [ConfigurationService] 'DOTNETBACKEND_PORT' environment variable is not set.
[2020-09-06T17:04:46.9767981Z] [MININT-GLN1L3K] [Info] [ConfigurationService] Using port 5567 for connection.
[2020-09-06T17:04:46.9779745Z] [MININT-GLN1L3K] [Info] [JvmBridge] JvMBridge port is 5567


#### Create a SparkSession based on the SparkConf

In [4]:
var spark = SparkSession
            .Builder()
            .Config(conf)
            .GetOrCreate();

##### Specify Schema 

In [5]:
//CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,
//AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,
//Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,
//FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay

var schema = new StructType(new [] {
    new StructField("CallNumber", new IntegerType(), false),
    new StructField("UnitId", new StringType(), false),
    new StructField("IncidentNumber", new IntegerType(), false),
    new StructField("CallType", new StringType(), false),
    new StructField("CallDate", new StringType(), false),
    new StructField("WatchDate", new StringType(), false),
    new StructField("CallFinalDisposition", new StringType(), false),
    new StructField("AvailableDtTm", new StringType(), false),
    new StructField("Address", new StringType(), false),
    new StructField("City", new StringType(), false),
    new StructField("ZipCode", new StringType(), false),
    new StructField("Battalion", new StringType(), false),
    new StructField("StationArea", new StringType(), false),
    new StructField("Box", new StringType(), false),
    new StructField("OriginalPriority", new StringType(), false),
    new StructField("Priority", new StringType(), false),
    new StructField("FinalPriority", new StringType(), false),
    new StructField("ALSUnit", new BooleanType(), false),
    new StructField("CallTypeGroup", new StringType(), false),
    new StructField("NumAlarms", new IntegerType(), false),
    new StructField("UnitType", new StringType(), false),
    new StructField("UnitSequenceInCallDispatch", new IntegerType(), false),
    new StructField("FirePreventionDistrict", new StringType(), false),
    new StructField("SupervisorDistrict", new StringType(), false),
    new StructField("Neighborhood", new StringType(), false),
    new StructField("Location", new StringType(), false),
    new StructField("RowID", new StringType(), false),
    new StructField("Delay", new FloatType(), false)  
    }
);

In [6]:
var filePath = @"c:\users\bindeshv\databig\sf-fire-calls.csv";

var fireDataDf = spark.Read().Option("header",true).Schema(schema).Csv(filePath);

fireDataDf.Show();



+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitId|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|ZipCode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

##### Write the dataframe as parque

In [11]:
var saveFilePath = @"c:\users\bindeshv\databig\fire-data.parquet";
fireDataDf.Write().Format("parquet").Save(saveFilePath);

#### DataTransformations

In [8]:
var fewFireDf = fireDataDf.Select("IncidentNumber", "AvailableDtTm", "CallType")
                .Where(Col("CallType") != "Medical Incident");

fewFireDf.Show(5,0, false);

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [10]:
fewFireDf.Explain(true);

== Parsed Logical Plan ==
'Filter NOT ('CallType = Medical Incident)
+- Project [IncidentNumber#2, AvailableDtTm#7, CallType#3]
   +- Relation[CallNumber#0,UnitId#1,IncidentNumber#2,CallType#3,CallDate#4,WatchDate#5,CallFinalDisposition#6,AvailableDtTm#7,Address#8,City#9,ZipCode#10,Battalion#11,StationArea#12,Box#13,OriginalPriority#14,Priority#15,FinalPriority#16,ALSUnit#17,CallTypeGroup#18,NumAlarms#19,UnitType#20,UnitSequenceInCallDispatch#21,FirePreventionDistrict#22,SupervisorDistrict#23,... 4 more fields] csv

== Analyzed Logical Plan ==
IncidentNumber: int, AvailableDtTm: string, CallType: string
Filter NOT (CallType#3 = Medical Incident)
+- Project [IncidentNumber#2, AvailableDtTm#7, CallType#3]
   +- Relation[CallNumber#0,UnitId#1,IncidentNumber#2,CallType#3,CallDate#4,WatchDate#5,CallFinalDisposition#6,AvailableDtTm#7,Address#8,City#9,ZipCode#10,Battalion#11,StationArea#12,Box#13,OriginalPriority#14,Priority#15,FinalPriority#16,ALSUnit#17,CallTypeGroup#18,NumAlarms#19,UnitTyp

#### Count Distinct 'CallTypes'

In [11]:
var distinctCallTypes = fewFireDf
                        .Select("CallType")
                        .Where(Col("CallType").IsNotNull())
                        .Agg(CountDistinct("CallType").Alias("DistinctCallTypes"));

distinctCallTypes.Show();

+-----------------+
|DistinctCallTypes|
+-----------------+
|               29|
+-----------------+



In [12]:
fewFireDf
.Select("CallType")
.Where(Col("CallType").IsNotNull())
.Distinct()
.Show(10,0,false);

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows



>By specifying the desired column names in the schema with StructField, as we did,
we effectively changed all names in the resulting DataFrame.
Alternatively, you could selectively rename columns with the withColumnRenamed()
method. 

In [13]:
var newFireDf = fireDataDf.WithColumnRenamed("Delay", "ResponseDelayedInMins");

newFireDf.Select("ResponseDelayedInMins")
    .Where(Col("ResponseDelayedInMins") > 5)
    .Show(5,0,false);

+---------------------+
|ResponseDelayedInMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



#### Convert String to DateType

In [15]:
var fireDfTs = fireDataDf.WithColumn("IncidentDate", ToTimestamp(Col("CallDate"),"MM/dd/yyyy"))
            .Drop("CallDate")
            .WithColumn("OnWatchDate", ToTimestamp(Col("WatchDate"),"MM/dd/yyyy"))
            .Drop("WatchDate")
            .WithColumn("AvailableDtTS", ToTimestamp(Col("AvailableDtTm"),"MM/dd/yyyy hh:mm:sss"))
            .Drop("AvailableDtTm");

fireDfTs.Select("IncidentDate", "OnWatchDate", "AvailableDtTS")
        .Show(5,0,false);

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [18]:
fireDfTs.Select(Year(Col("IncidentDate")))
        .Distinct()
        .OrderBy(Year(Col("IncidentDate")))
        .Show();

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



#### Total Incidents Per Year

In [23]:
fireDfTs.WithColumn("YearOfIncident", Year(Col("IncidentDate")))
   .GroupBy(Col("YearOfIncident"))
    .Agg(Count(Col("YearOfIncident")).Alias("TotalIncidents"))
    .OrderBy(Desc("YearOfIncident"))
    .Show();
        

+--------------+--------------+
|YearOfIncident|TotalIncidents|
+--------------+--------------+
|          2018|         10136|
|          2017|         12135|
|          2016|         11609|
|          2015|         11458|
|          2014|         10775|
|          2013|         10020|
|          2012|          9674|
|          2011|          9735|
|          2010|          9341|
|          2009|          8789|
|          2008|          8869|
|          2007|          8255|
|          2006|          8174|
|          2005|          8282|
|          2004|          8283|
|          2003|          8499|
|          2002|          8090|
|          2001|          7713|
|          2000|          5459|
+--------------+--------------+



#### What were the most common types of fire calls?

In [27]:
fireDataDf.Select("CallType")
    .Where(Col("CallType").IsNotNull())
    .GroupBy("CallType")
    .Count()
    .OrderBy(Desc("count"))
    .Show(10,0,false);

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows

