####Requirement:
We have collected fire-call data in the file sf-fire-calls.csv.
1. Read the data file
2. Load it into a table for analysis
3. Verify that all 175296 records are loaded correctly
4. The target table is predefined as shown below


In [0]:
%sql

CREATE TABLE IF NOT EXISTS dev.spark_db.sf_fire_calls (
  CallNumber INT, UnitID STRING, IncidentNumber INT, CallType STRING, CallDate DATE,
  WatchDate DATE, CallFinalDisposition STRING, AvailableDtTm TIMESTAMP, Address STRING,
  City STRING, Zipcode STRING, Battalion STRING, StationArea STRING, Box STRING,
  OriginalPriority STRING, Priority STRING, FinalPriority STRING, ALSUnit BOOLEAN,
  CallTypeGroup STRING, NumAlarms INT, UnitType STRING, UnitSequenceInCallDispatch INT,
  FirePreventionDistrict STRING, SupervisorDistrict STRING, Neighborhood STRING,
  Location STRING, RowID STRING, Delay DOUBLE);

####Solution approach


1. Check the data file structure.
2. Read the data file and create a dataframe

In [0]:
raw_fire_df = (
    spark.read.format('csv')
        .option('header', 'true')
        .option('inferSchema', 'true')
        .load("/Volumes/dev/spark_db/datasets/spark_programming/data/sf-fire-calls.csv")
)

3. Count the records (count)
4. Check the dataframe for potential problems (display/show)
5. Verify the dataframe schema against the target table (printSchema)
6. Transform the dataframe to match the target table structure (withColumns, to_date, to_timestamp, cast)

In [0]:
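# count() must be the last expression in the cell for its result to be shown;
# the requirement expects 175296 records
raw_fire_df.count()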

In [0]:
raw_fire_df.display()

In [0]:
raw_fire_df.printSchema()
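
Comparing the `printSchema` output with the table definition by eye is error-prone for 28 columns. The following is a minimal sketch of an automated comparison. The helper `schema_diff` and the sample column lists are hypothetical and only illustrate the idea; it works on plain lists of `(name, type)` pairs, which is the shape returned by the `dtypes` attribute of a Spark dataframe.

```python
from typing import Dict, List, Optional, Tuple

ColumnTypes = List[Tuple[str, str]]


def schema_diff(actual: ColumnTypes, expected: ColumnTypes) -> Dict[str, Tuple[Optional[str], Optional[str]]]:
    """Return {column: (actual_type, expected_type)} for every mismatch.

    A type of None means the column is missing on that side.
    Column names are compared case-sensitively.
    """
    actual_types = dict(actual)
    expected_types = dict(expected)
    # Walk the expected columns first, then any extra columns in the dataframe
    names = list(expected_types) + [n for n in actual_types if n not in expected_types]
    diff = {}
    for name in names:
        found = actual_types.get(name)
        wanted = expected_types.get(name)
        if found != wanted:
            diff[name] = (found, wanted)
    return diff


# Hypothetical sample values, shaped like the output of DataFrame.dtypes
actual = [("CallNumber", "int"), ("CallDate", "string"), ("Zipcode", "int")]
expected = [("CallNumber", "int"), ("CallDate", "date"), ("Zipcode", "string"), ("Delay", "double")]

mismatches = schema_diff(actual, expected)
print(mismatches)
```

In the notebook, the two arguments could be `raw_fire_df.dtypes` and `spark.table("dev.spark_db.sf_fire_calls").dtypes`, assuming the target table already exists. Every column the helper reports is a candidate for `to_date`, `to_timestamp` or `cast` in the transformation step.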

In [0]:
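from pyspark.sql.functions import to_timestamp, expr

# Align the dataframe column types with the target table definition
fire_df = raw_fire_df.withColumns({
    # Parse the string into a TIMESTAMP using a 12-hour clock pattern with AM/PM
    "AvailableDtTm": to_timestamp("AvailableDtTm", "MM/dd/yyyy hh:mm:ss a"),
    # The target table stores these two columns as STRING
    "Zipcode": expr("cast(Zipcode as string)"),
    "FinalPriority": expr("cast(FinalPriority as string)")
})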
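
The pattern `MM/dd/yyyy hh:mm:ss a` passed to `to_timestamp` uses `hh` for a 12-hour clock and `a` for the AM/PM marker. The sketch below shows how that pattern reads a value, using Python's standard `datetime` module instead of Spark so it can run anywhere. The sample strings are hypothetical, and the `strptime` format is only an approximate equivalent of the Spark pattern.

```python
from datetime import datetime

# Spark pattern used in the notebook: "MM/dd/yyyy hh:mm:ss a"
# Approximate Python strptime equivalent (assumption, for illustration only):
PYTHON_PATTERN = "%m/%d/%Y %I:%M:%S %p"

# Hypothetical sample values in the same layout
sample_am = "01/11/2002 01:51:44 AM"
sample_pm = "01/11/2002 01:51:44 PM"

parsed_am = datetime.strptime(sample_am, PYTHON_PATTERN)
parsed_pm = datetime.strptime(sample_pm, PYTHON_PATTERN)

print(parsed_am)  # 2002-01-11 01:51:44
print(parsed_pm)  # 2002-01-11 13:51:44
```

Without the AM/PM marker in the pattern, the two sample values could not be told apart, which is why the 12-hour `hh` is paired with `a`.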
