## Load a DataFrame 
First, we **load a DataFrame from a data source** using Spark's **DataFrameReader** interface. It reads data into a DataFrame from many data sources and formats, such as JSON, CSV, Parquet, and plain text.

In particular, we want a distributed DataFrame of San Francisco Fire Department calls in memory. We then examine specific aspects of the SF Fire Department data.

In [0]:
## 1º) Create our schema
#Import library
from pyspark.sql.types import *

#Define our schema using DDL
fire_schema = "CallNumber INT, UnitID STRING, IncidentNumber Int, CallType STRING, CallDate STRING, WatchDate STRING, CallFinalDisposition STRING, AvailableDtTm STRING, Address STRING, City STRING, ZipCode INT, Battalion STRING, StationArea STRING, Box STRING, OriginalPriority STRING, Priority STRING, FinalPriority INT, ALSUnit BOOLEAN, CallTypeGroup STRING, NumAlarms INT, UnitType STRING, UnitSequenceInCallDispatch INT, FirePreventionDistrict STRING, SupervisorDistrict STRING, Neighborhood STRING, Location STRING, RowID STRING, Delay FLOAT "

## 2º) Load a dataframe from a csv file
# Define the location of the public dataset on the S3 bucket
sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"

# Use the DataFrameReader interface to load a file
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

The **spark.read.csv()** function reads in the CSV file and returns a DataFrame of rows and named columns with the types dictated in the schema.

In [0]:
## Inspect the data
display(fire_df.limit(5))

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,ZipCode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay
20110014,M29,2003234,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 01:58:43 AM,10TH ST/MARKET ST,SF,94103,B02,36,2338,1,1,2,True,,1,MEDIC,1,2,6,Tenderloin,"(37.7765408927183, -122.417501464907)",020110014-M29,5.233333
20110015,M08,2003233,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 02:10:17 AM,300 Block of 5TH ST,SF,94107,B03,8,2243,1,1,2,True,,1,MEDIC,1,3,6,South of Market,"(37.7792841462441, -122.402061300134)",020110015-M08,3.0833333
20110016,B02,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:47:00 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,False,,1,CHIEF,6,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-B02,3.05
20110016,B04,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:51:54 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,False,,1,CHIEF,3,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-B04,2.3166666
20110016,D2,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:47:00 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,False,,1,CHIEF,4,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-D2,3.0166667


## Write a DataFrame to an external data source

To write the DataFrame into an external data source in your format of choice, you can use the **DataFrameWriter** interface. Like DataFrameReader, it supports multiple
data sources.

In [0]:
## To Save a DataFrame as a Parquet File
parquet_path = "dbfs:/FileStore/shared_uploads/maria.puche@bosonit.com/EJEMPLO5"
fire_df.write.format("parquet").save(parquet_path)

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-11955300141319> in <module>
      1 ## To Save a DataFrame as a Parquet File
      2 parquet_path = "dbfs:/FileStore/shared_uploads/maria.puche@bosonit.com/EJEMPLO5"
----> 3 fire_df.write.format("parquet").save(parquet_path)

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    738             self._jwrite.save()

In [0]:
## To save a DataFrame as a SQL Table.
parquet_table = "tabla5"
fire_df.write.format("parquet").saveAsTable(parquet_table)


---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-11955300141322> in <module>
      1 ## To save a DataFrame as a SQL Table.
      2 parquet_table = "tabla5"
----> 3 fire_df.write.format("parquet").saveAsTable(parquet_table)

/databricks/spark/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy, **options)
    804         if format is not None:
    805

## Common operations to perform on DataFrames

In Spark, projections are done with the **select()** method, while filters can be expressed using the **filter()** or **where()** method. We can use this technique to examine specific aspects of our SF Fire Department data set:

**1) Count the number of rows in our dataset**

In [0]:
fire_df.count()

Out[5]: 4380660

**2) Filter out "Medical Incident" call types and select "IncidentNumber", "AvailableDtTm" and "CallType"**

In [0]:
few_fire_df = (fire_df
.select("IncidentNumber", "AvailableDtTm", "CallType")
.where(fire_df["CallType"] != "Medical Incident"))

few_fire_df.show(5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:47:00 AM|Structure Fire|
|2003235       |01/11/2002 01:51:54 AM|Structure Fire|
|2003235       |01/11/2002 01:47:00 AM|Structure Fire|
|2003235       |01/11/2002 01:47:00 AM|Structure Fire|
|2003235       |01/11/2002 01:51:17 AM|Structure Fire|
+--------------+----------------------+--------------+
only showing top 5 rows



**2) How many distinct types of calls were made to the Fire Department?**

There are 32 different types of calls.

In [0]:
from pyspark.sql.functions import *

(fire_df
    .select("CallType")
    .where(col("CallType").isNotNull())
    .distinct()
    .count())

Out[7]: 32

**3) What distinct types of calls were made to the Fire Department?**

These are all the distinct types of calls made to the San Francisco Fire Department.

In [0]:
(fire_df
  .select("CallType")
  .where(col("CallType").isNotNull())
  .distinct()
  .show(32,truncate=False))

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

**4) Find all response delays greater than 5 minutes**

In [0]:
# Rename the column Delay and return a new DataFrame (new_fire_df)
new_fire_df = fire_df.withColumnRenamed("Delay","ResponseDeLayedinMins")

# Find all calls where the response to the fire site was delayed by more than 5 minutes.
(new_fire_df
    .select("ResponseDeLayedinMins")
    .where(new_fire_df["ResponseDeLayedinMins"] > 5)
    .show(5, False))

+---------------------+
|ResponseDeLayedinMins|
+---------------------+
|5.233333             |
|6.9333334            |
|6.116667             |
|7.85                 |
|77.333336            |
+---------------------+
only showing top 5 rows



**5) Transform the string dates to the Spark Timestamp data type**

In [0]:
fire_ts_df =(new_fire_df
            .withColumn("IncidentDate", to_timestamp(new_fire_df["CallDate"], "MM/dd/yyyy"))
            .drop("CallDate")
            .withColumn("OnWatchDate", to_timestamp(new_fire_df["WatchDate"], "MM/dd/yyyy"))
            .drop("WatchDate")
            .withColumn("AvailableDtTS", to_timestamp(new_fire_df["AvailableDtTm"], "MM/dd/yyyy hh:mm:ss a"))
            .drop("AvailableDtTm")
            )


Now we check the transformed columns, which have the Spark Timestamp type.

In [0]:
(fire_ts_df
     .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
     .show(5, False))

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:58:43|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:10:17|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:47:00|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:54|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:47:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



**6) How many distinct years of data are in the CSV file?**

In all, we have fire calls from the years 2000 to 2018 (19 distinct years).

In [0]:
(fire_ts_df
     .select(year('IncidentDate'))
     .distinct()
     .orderBy(year('IncidentDate'))
     .show())

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



**7) What were the most common types of fire calls?**

Medical Incident is by far the most common call type.

In [0]:
(fire_ts_df
 .select("CallType")
 .where(col("CallType").isNotNull())
 .groupBy("CallType")
 .count()
 .orderBy("count", ascending=False)
 .show(n=10, truncate=False))


+-------------------------------+-------+
|CallType                       |count  |
+-------------------------------+-------+
|Medical Incident               |2843475|
|Structure Fire                 |578998 |
|Alarms                         |483518 |
|Traffic Collision              |175507 |
|Citizen Assist / Service Call  |65360  |
|Other                          |56961  |
|Outside Fire                   |51603  |
|Vehicle Fire                   |20939  |
|Water Rescue                   |20037  |
|Gas Leak (Natural and LP Gases)|17284  |
+-------------------------------+-------+
only showing top 10 rows



**8) What zip codes accounted for the most common calls?**

The most common calls are all Medical Incidents, and the two zip codes with the most of them are 94102 and 94103.

In [0]:
(fire_ts_df
 .select("CallType", "ZipCode")
 .where(col("CallType").isNotNull())
 .groupBy("CallType", "Zipcode")
 .count()
 .orderBy("count", ascending=False)
 .show(10, truncate=False))

+----------------+-------+------+
|CallType        |Zipcode|count |
+----------------+-------+------+
|Medical Incident|94102  |401457|
|Medical Incident|94103  |370215|
|Medical Incident|94110  |249279|
|Medical Incident|94109  |238087|
|Medical Incident|94124  |147564|
|Medical Incident|94112  |139565|
|Medical Incident|94115  |120087|
|Medical Incident|94122  |107602|
|Medical Incident|94107  |107439|
|Medical Incident|94133  |99050 |
+----------------+-------+------+
only showing top 10 rows



**9) What San Francisco neighborhoods are in the zip codes 94102 and 94103?**

The query below lists them (only the first 10 rows are shown).

In [0]:
(fire_ts_df
    .select("Neighborhood", "Zipcode")
    .where((col("Zipcode") == 94102) | (col("Zipcode") == 94103))
    .distinct()
    .show(10, truncate=False))

+------------------------------+-------+
|Neighborhood                  |Zipcode|
+------------------------------+-------+
|Western Addition              |94102  |
|Tenderloin                    |94102  |
|Nob Hill                      |94102  |
|Castro/Upper Market           |94103  |
|South of Market               |94102  |
|South of Market               |94103  |
|Financial District/South Beach|94102  |
|Tenderloin                    |94103  |
|Financial District/South Beach|94103  |
|Hayes Valley                  |94102  |
+------------------------------+-------+
only showing top 10 rows



**10) What were the total number of alarms and the average, minimum and maximum response times for calls?**

In [0]:
(fire_ts_df
    .select(sum("NumAlarms").alias("Suma"), avg("ResponseDelayedinMins").alias("Media"), min("ResponseDelayedinMins").alias("Mínimo"), max("ResponseDelayedinMins").alias("Máximo"))
    .show())

+-------+-----------------+-----------+---------+
|   Suma|            Media|     Mínimo|   Máximo|
+-------+-----------------+-----------+---------+
|4403441|3.902170335891614|0.016666668|1879.6167|
+-------+-----------------+-----------+---------+



**11) What were all the different types of fire calls in 2018?**

In [0]:
(fire_ts_df
  .filter(year("IncidentDate") == 2018)
  .select("CallType")
  .where(col("CallType").isNotNull())
  .distinct()
  .show(32,truncate=False))

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Explosion                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Police                               |
|Gas Leak (Natural and LP Gases)             |
|Water Rescue                                |
|Electrical H

**12) What months within the year 2018 saw the highest number of fire calls?**

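January had the most calls. November shows the fewest, but its count (5,138) is far below every other month and December is absent, which suggests the 2018 data ends partway through November.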

In [0]:
(fire_ts_df
  .filter(year("IncidentDate") == 2018)
  .groupBy(month("IncidentDate"))
  .count()
  .orderBy("count", ascending= False)
  .show())

+-------------------+-----+
|month(IncidentDate)|count|
+-------------------+-----+
|                  1|26148|
|                  3|25755|
|                 10|25606|
|                  5|25267|
|                  6|25228|
|                  7|25101|
|                  4|24659|
|                  8|24476|
|                  9|23760|
|                  2|23464|
|                 11| 5138|
+-------------------+-----+



**13) What week of the year in 2018 had the most fire calls?**

In 2018, the weeks with the most fire calls were week 1 (New Year's week) and week 25 (late June).

In [0]:
(fire_ts_df
     .filter(year("IncidentDate") == 2018)
     .groupBy(weekofyear("IncidentDate"))
     .count()
     .orderBy("count", ascending=False)
     .show())

+------------------------+-----+
|weekofyear(IncidentDate)|count|
+------------------------+-----+
|                       1| 6401|
|                      25| 6163|
|                      13| 6103|
|                      22| 6060|
|                      44| 6048|
|                      27| 6042|
|                      16| 6009|
|                      40| 6000|
|                      43| 5986|
|                       5| 5946|
|                       2| 5929|
|                      18| 5917|
|                       9| 5874|
|                       8| 5843|
|                       6| 5839|
|                      21| 5821|
|                      38| 5817|
|                      10| 5806|
|                      23| 5781|
|                      32| 5764|
+------------------------+-----+
only showing top 20 rows
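Spark's `weekofyear()` uses ISO 8601 week numbering: weeks start on Monday, and week 1 is the first week with more than three days in the year. To map a week number back to calendar dates, plain Python is enough. The helper `iso_week_range` below is a hypothetical name introduced for this sketch.

```python
from datetime import date, timedelta


def iso_week_range(year: int, week: int) -> tuple:
    """Return (Monday, Sunday) of the given ISO 8601 week."""
    monday = date.fromisocalendar(year, week, 1)
    return monday, monday + timedelta(days=6)


# Calendar dates covered by the two busiest weeks in the output above.
week_1 = iso_week_range(2018, 1)
week_25 = iso_week_range(2018, 25)

# ISO week number that contains the 4th of July 2018.
july_4_week = date(2018, 7, 4).isocalendar()[1]

print(week_1, week_25, july_4_week)
```

Here week 25 of 2018 runs from June 18 to June 24, and the week containing July 4 is week 27, which also appears in the output with 6,042 calls.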



**14) Which neighborhood in San Francisco generated the most fire calls in 2018?**

The Tenderloin is the San Francisco neighborhood that generated the most fire calls in 2018.

In [0]:
(fire_ts_df
    .filter(year("IncidentDate") == 2018)
    .select("Neighborhood")
    .groupBy("Neighborhood")
    .count()
    .orderBy("count", ascending=False)
    .show(10, truncate=False))

+------------------------------+-----+
|Neighborhood                  |count|
+------------------------------+-----+
|Tenderloin                    |35557|
|South of Market               |26642|
|Mission                       |22376|
|Financial District/South Beach|19690|
|Bayview Hunters Point         |12897|
|Sunset/Parkside               |8967 |
|Western Addition              |8635 |
|Nob Hill                      |8028 |
|Castro/Upper Market           |6655 |
|Hayes Valley                  |6385 |
+------------------------------+-----+
only showing top 10 rows



**15) What neighborhoods in San Francisco had the worst response time in 2018?**

The longest single response delay in 2018 was recorded in West of Twin Peaks: 754.08 minutes. Note that the query returns the one call with the largest delay, not the neighborhood with the worst average.

In [0]:
(fire_ts_df
    .filter(year("IncidentDate") == 2018)
    .select("Neighborhood", "ResponseDelayedinMins")
    .orderBy("ResponseDelayedinMins", ascending=False)
    .show(1, False))

+------------------+---------------------+
|Neighborhood      |ResponseDelayedinMins|
+------------------+---------------------+
|West of Twin Peaks|754.0833             |
+------------------+---------------------+
only showing top 1 row



**16) How can we use Parquet files to store data and read it back?**

In [0]:
fire_ts_df.write.format("parquet").mode("overwrite").save("/tmp/fireServiceParquet/")

In [0]:
file_parquet_df = spark.read.format("parquet").load("/tmp/fireServiceParquet/")

display(file_parquet_df.limit(10))

CallNumber,UnitID,IncidentNumber,CallType,CallFinalDisposition,Address,City,ZipCode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,ResponseDeLayedinMins,IncidentDate,OnWatchDate,AvailableDtTS
111050354,E14,11034920,Medical Incident,Other,500 Block of 21ST AVE,SF,94121,B07,14,7171,3,3,3,True,,1,ENGINE,1,7,1,Outer Richmond,"(37.7774255992901, -122.480311994328)",111050354-E14,4.7833333,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:27:08.000+0000
111050355,E03,11034921,Structure Fire,Other,HYDE ST/BUSH ST,SF,94109,B04,3,1561,3,3,3,True,,1,ENGINE,1,4,3,Nob Hill,"(37.7891101748937, -122.417016879226)",111050355-E03,1.9166666,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:10:54.000+0000
111050355,T03,11034921,Structure Fire,Other,HYDE ST/BUSH ST,SF,94109,B04,3,1561,3,3,3,False,,1,TRUCK,2,4,3,Nob Hill,"(37.7891101748937, -122.417016879226)",111050355-T03,2.4333334,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:10:54.000+0000
111050356,73,11034922,Structure Fire,Other,1000 Block of POTRERO AVE,SF,94110,B10,7,2553,3,3,3,True,,1,MEDIC,10,10,10,Potrero Hill,"(37.7565080013216, -122.40654101432)",111050356-73,2.0666666,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:24:56.000+0000
111050356,B06,11034922,Structure Fire,Other,1000 Block of POTRERO AVE,SF,94110,B10,7,2553,3,3,3,False,,1,CHIEF,6,10,10,Potrero Hill,"(37.7565080013216, -122.40654101432)",111050356-B06,2.6,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:22:46.000+0000
111050356,B10,11034922,Structure Fire,Other,1000 Block of POTRERO AVE,SF,94110,B10,7,2553,3,3,3,False,,1,CHIEF,4,10,10,Potrero Hill,"(37.7565080013216, -122.40654101432)",111050356-B10,3.25,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:25:00.000+0000
111050356,D3,11034922,Structure Fire,Other,1000 Block of POTRERO AVE,SF,94110,B10,7,2553,3,3,3,False,,1,CHIEF,7,10,10,Potrero Hill,"(37.7565080013216, -122.40654101432)",111050356-D3,3.5,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:23:01.000+0000
111050356,E29,11034922,Structure Fire,Other,1000 Block of POTRERO AVE,SF,94110,B10,7,2553,3,3,3,True,,1,ENGINE,8,10,10,Potrero Hill,"(37.7565080013216, -122.40654101432)",111050356-E29,2.6,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:22:50.000+0000
111050356,E37,11034922,Structure Fire,Other,1000 Block of POTRERO AVE,SF,94110,B10,7,2553,3,3,3,False,,1,ENGINE,2,10,10,Potrero Hill,"(37.7565080013216, -122.40654101432)",111050356-E37,2.6666667,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:25:10.000+0000
111050356,RS2,11034922,Structure Fire,Other,1000 Block of POTRERO AVE,SF,94110,B10,7,2553,3,3,3,False,,1,RESCUE SQUAD,5,10,10,Potrero Hill,"(37.7565080013216, -122.40654101432)",111050356-RS2,3.05,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:24:11.000+0000


**17) How can we use a Parquet-backed SQL table to store data and read it back?**

In [0]:
fire_ts_df.write.format("parquet").mode("overwrite").saveAsTable("FireServiceCalls1")

In [0]:
%sql
SELECT * FROM FireServiceCalls1 LIMIT 5

CallNumber,UnitID,IncidentNumber,CallType,CallFinalDisposition,Address,City,ZipCode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,ResponseDeLayedinMins,IncidentDate,OnWatchDate,AvailableDtTS
111050354,E14,11034920,Medical Incident,Other,500 Block of 21ST AVE,SF,94121,B07,14,7171,3,3,3,True,,1,ENGINE,1,7,1,Outer Richmond,"(37.7774255992901, -122.480311994328)",111050354-E14,4.7833333,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:27:08.000+0000
111050355,E03,11034921,Structure Fire,Other,HYDE ST/BUSH ST,SF,94109,B04,3,1561,3,3,3,True,,1,ENGINE,1,4,3,Nob Hill,"(37.7891101748937, -122.417016879226)",111050355-E03,1.9166666,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:10:54.000+0000
111050355,T03,11034921,Structure Fire,Other,HYDE ST/BUSH ST,SF,94109,B04,3,1561,3,3,3,False,,1,TRUCK,2,4,3,Nob Hill,"(37.7891101748937, -122.417016879226)",111050355-T03,2.4333334,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:10:54.000+0000
111050356,73,11034922,Structure Fire,Other,1000 Block of POTRERO AVE,SF,94110,B10,7,2553,3,3,3,True,,1,MEDIC,10,10,10,Potrero Hill,"(37.7565080013216, -122.40654101432)",111050356-73,2.0666666,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:24:56.000+0000
111050356,B06,11034922,Structure Fire,Other,1000 Block of POTRERO AVE,SF,94110,B10,7,2553,3,3,3,False,,1,CHIEF,6,10,10,Potrero Hill,"(37.7565080013216, -122.40654101432)",111050356-B06,2.6,2011-04-15T00:00:00.000+0000,2011-04-15T00:00:00.000+0000,2011-04-15T23:22:46.000+0000


**18) Write data to different formats (JSON, CSV and Avro)**

In [0]:
## JSON
location = "dbfs:/FileStore/shared_uploads/maria.puche@bosonit.com/EjemploJSON"

fire_ts_df.write.format("json").mode("overwrite").save(location)

In [0]:
## CSV
location1 = "dbfs:/FileStore/shared_uploads/maria.puche@bosonit.com/EjemploCSV"

fire_ts_df.write.format("csv").mode("overwrite").save(location1)

In [0]:
## AVRO
location2 = "dbfs:/FileStore/shared_uploads/maria.puche@bosonit.com/EjemploAVRO"

fire_ts_df.write.format("avro").mode("overwrite").save(location2)

**19) Read data from different data sources**

In [0]:
csv = spark.read.format("csv").load(location1)

In [0]:
json = spark.read.format("json").load(location)

In [0]:
avro = spark.read.format("avro").load(location2)

In [0]:
parqueto = spark.read.format("parquet").load(parquet_path)