In [1]:
%fs ls /mnt/sf_open_data/fire_dept_calls_for_service/

In [2]:
# Display the SparkSession object (assumed to be pre-created by the notebook runtime — confirm).
spark

In [3]:
# Build a DataFrame from the CSV: the first line is the header and column types are inferred from the data.
myDF = spark.read.csv('/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv', header=True, inferSchema=True)

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType

In [5]:
# Explicit schema for the calls CSV. Column names are written without spaces
# so that referring to them later does not cause errors.
_CALL_FIELDS = [
    ('CallNumber', IntegerType()),
    ('UnitID', StringType()),
    ('IncidentNumber', IntegerType()),
    ('CallType', StringType()),
    ('CallDate', StringType()),
    ('WatchDate', StringType()),
    ('ReceivedDtTm', StringType()),
    ('EntryDtTm', StringType()),
    ('DispatchDtTm', StringType()),
    ('ResponseDtTm', StringType()),
    ('OnSceneDtTm', StringType()),
    ('TransportDtTm', StringType()),
    ('HospitalDtTm', StringType()),
    ('CallFinalDisposition', StringType()),
    ('AvailableDtTm', StringType()),
    ('Address', StringType()),
    ('City', StringType()),
    ('ZipcodeofIncident', IntegerType()),
    ('Battalion', StringType()),
    ('StationArea', StringType()),
    ('Box', StringType()),
    ('OriginalPriority', StringType()),
    ('Priority', StringType()),
    ('FinalPriority', IntegerType()),
    ('ALSUnit', BooleanType()),
    ('CallTypeGroup', StringType()),
    ('NumberofAlarms', IntegerType()),
    ('UnitType', StringType()),
    ('Unitsequenceincalldispatch', IntegerType()),
    ('FirePreventionDistrict', StringType()),
    ('SupervisorDistrict', StringType()),
    ('NeighborhoodDistrict', StringType()),
    ('Location', StringType()),
    ('RowID', StringType()),
]

# Every column is declared nullable (third StructField argument is True).
schema = StructType([StructField(field_name, field_type, True)
                     for field_name, field_type in _CALL_FIELDS])

In [6]:
# Re-read the CSV, this time applying the hand-written schema instead of inferring one.
myDF = spark.read.csv('/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv', header=True, schema=schema)

In [7]:
# Show the first 5 rows.
display(myDF.limit(5))

In [8]:
# List the column names.
myDF.columns

In [9]:
# Total number of records in the DataFrame.
myDF.count()

In [10]:
# Call type of the first 5 rows.
myDF.select('CallType').show(5)

In [11]:
# Add distinct() to list each call type only once (up to 35 values, not truncated).
myDF.select('CallType').distinct().show(35, False)

In [12]:
# Number of calls per call type; keep the 7 most frequent.
display(myDF.select('CallType').groupBy('CallType').count().orderBy("count", ascending=False).limit(7))

In [13]:
# Print the schema.
myDF.printSchema()

In [14]:
from pyspark.sql.functions import *

In [15]:
# Convert the date/time columns from strings to Spark timestamps.
# Each source column X is replaced by a new column XTS appended at the end.

from_pattern1 = 'MM/dd/yyyy'
# A single 'a' is the AM/PM marker. Spark 3's datetime parser rejects a repeated
# 'aa', while the single-letter form is also accepted by older releases.
from_pattern2 = 'MM/dd/yyyy hh:mm:ss a'

# (source column, parse pattern), in the order the *TS columns must be appended.
timestamp_sources = [
    ('CallDate', from_pattern1),
    ('WatchDate', from_pattern1),
    ('ReceivedDtTm', from_pattern2),
    ('EntryDtTm', from_pattern2),
    ('DispatchDtTm', from_pattern2),
    ('ResponseDtTm', from_pattern2),
    ('OnSceneDtTm', from_pattern2),
    ('TransportDtTm', from_pattern2),
    ('HospitalDtTm', from_pattern2),
    ('AvailableDtTm', from_pattern2),
]

for source_name, pattern in timestamp_sources:
    # Already converted on a previous run of this cell: skip, so re-running is safe.
    if source_name not in myDF.columns:
        continue
    myDF = myDF \
        .withColumn(source_name + 'TS', unix_timestamp(myDF[source_name], pattern).cast("timestamp")) \
        .drop(source_name)

In [16]:
# Print the schema again to check that the date columns are now timestamps instead of strings.
myDF.printSchema()

In [17]:
# Show the first 5 rows.
display(myDF.limit(5))

In [18]:
# List the distinct years present in the data, in ascending order.
myDF.select(year('CallDateTS')).distinct().orderBy('year(CallDateTS)').show()

In [19]:
# Mostramos los dias en los que más llamadas se han producido
myDF.filter(year('CallDateTS') == '2016').filter(dayofyear('CallDateTS') >= 0).groupBy(dayofyear('CallDateTS')).count().orderBy('count', ascending=False).show()

In [20]:
display(myDF.filter(year('CallDateTS') == '2016').filter(dayofyear('CallDateTS')<= 60).groupBy(dayofyear('CallDateTS')).count().orderBy('dayofyear(CallDateTS)'))

In [21]:
#Vamos a analizr el dia 37, 06-02-2016 
#Mostramos el tipo de llamas que se realizaron ordenadas de mayor a menor.
myDF.filter(year('CallDateTS') == '2016').filter(dayofyear('CallDateTS') == 37).groupBy('CallType').count().orderBy('count', ascending=False).show()

In [22]:
#Vemos de que tipo son las llamadas
myDF.filter(year('CallDateTS') == '2016').groupBy('CallType').count().orderBy('count', ascending=False).show()

In [23]:
#Mostramos en que distrito se realizaron más llamadas.

display(myDF.filter(year('CallDateTS') == '2016').filter(dayofyear('CallDateTS') == 37).groupBy('NeighborhoodDistrict').count().orderBy('count', ascending=False))

In [24]:
#Vemos los barrios con más llamadas durante 2016
myDF.filter(year('CallDateTS') == '2016').groupBy('NeighborhoodDistrict').count().orderBy('count', ascending=False).show(20,False)

In [25]:
#Mostramos un gráfico de la semana de antes y después del día 37
display(myDF.filter(year('CallDateTS') == '2016').filter(dayofyear('CallDateTS') >= 30 ).filter(dayofyear('CallDateTS') <= 44 ).groupBy(dayofyear('CallDateTS')).count().orderBy('dayofyear(CallDateTS)'))

In [26]:
# Check how many partitions the DataFrame currently has.
myDF.rdd.getNumPartitions()

In [27]:
# Repartition into 6 partitions and register the result as a temporary view.
# NOTE(review): the original note says "a multiple of 3" — presumably tied to the cluster's core count; confirm.
myDF.repartition(6).createOrReplaceTempView("fireServiceVIEW");

In [28]:
# Mark the view for in-memory caching so later queries run faster.
spark.catalog.cacheTable("fireServiceVIEW")

In [29]:
# Materialize the cache: caching is lazy, so nothing is stored until an action such as count() runs.
spark.table("fireServiceVIEW").count()

In [30]:
# Create a new DataFrame backed by the view registered above.
fireServiceDF = spark.table("fireServiceVIEW")

In [31]:
# Run count() again to see how long it takes now.

fireServiceDF.count()

In [32]:
# Check that the view is reported as cached.
spark.catalog.isCached("fireServiceVIEW")

In [33]:
%fs ls /tmp/

In [34]:
# Save the table in Parquet format, overwriting any previous output at this path.
fireServiceDF.write.format('parquet').mode("overwrite").save('/tmp/fireServiceParquetFinal/')

In [35]:
# Create a DataFrame by reading straight from that Parquet output.
tempDF = spark.read.parquet('/tmp/fireServiceParquetFinal/')

In [36]:
# Show two rows to check that the data was read back correctly.
display(tempDF.limit(2))

In [37]:

%sql SELECT count(*) FROM fireServiceVIEW;
-- Queries can also be written in SQL, not only through the DataFrame API.

In [38]:
%sql SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2016' GROUP BY `NeighborhoodDistrict` ORDER BY Neighborhood_Count DESC LIMIT 15;


In [39]:
%sql SELECT `CallTypeGroup`, count(`CallTypeGroup`) AS CallTypeGroup_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2015' GROUP BY `CallTypeGroup` ORDER BY CallTypeGroup_Count DESC;

In [40]:
%sql select count(*) from fireServiceVIEW where year(`CallDateTS`) == '2015'
-- 147470/297724 = 0.49: almost 50% of the calls are life-threatening.

In [41]:
# Read the current number of shuffle partitions.
spark.conf.get("spark.sql.shuffle.partitions")

In [42]:
# Set the number of shuffle partitions to 6.
spark.conf.set("spark.sql.shuffle.partitions", 6)

In [43]:
# Read the setting back to confirm it is now 6.
spark.conf.get("spark.sql.shuffle.partitions")

In [44]:
%sql DESC fireServiceVIEW;

In [45]:
%sql SELECT distinct hour(`ReceivedDtTmTS`) as hora, count( hour(`ReceivedDtTmTS`)) as cuenta FROM fireServiceVIEW WHERE year(`CallDateTS`)==2016 AND dayofyear(`CallDateTS`)=37 group by hour(`ReceivedDtTmTS`) order by hora ;
-- Calls received per hour on day 37 of 2016, ordered by hour.

In [46]:
%sql SELECT distinct hour(`ReceivedDtTmTS`) as hora, count( hour(`ReceivedDtTmTS`)) as cuenta FROM fireServiceVIEW WHERE year(`CallDateTS`)==2016 group by hour(`ReceivedDtTmTS`) order by hora;
-- Calls received per hour over the whole of 2016, to compare against day 37.

In [47]:
%sql SELECT distinct hour(`ReceivedDtTmTS`) as hora, count( hour(`ReceivedDtTmTS`)) as cuenta FROM fireServiceVIEW WHERE dayofyear(`CallDateTS`)=1 group by hour(`ReceivedDtTmTS`) order by hora;
-- Calls received per hour on day 1 of the year; there is no year or call-type filter, so this counts every call on that day across all years.

In [48]:
%sql SELECT NeighborhoodDistrict,count(`NeighborhoodDistrict`) FROM fireServiceVIEW WHERE dayofyear(`CallDateTS`)=1 AND CallType LIKE 'Traffic Collision' group by NeighborhoodDistrict order by count(`NeighborhoodDistrict`) desc ;
-- Neighborhood districts with the most 'Traffic Collision' calls on New Year's Day (day 1 of every year in the data).

In [49]:
%sql SELECT distinct hour(`ReceivedDtTmTS`) as hora, count( hour(`ReceivedDtTmTS`)) as cuenta FROM fireServiceVIEW WHERE year(`CallDateTS`)==2016 AND dayofyear(`CallDateTS`)=37 group by hour(`ReceivedDtTmTS`) order by cuenta DESC;
-- Same day-37 query, ordered by call count to show the busiest hours first.

In [50]:
%sql SELECT distinct hour(`ReceivedDtTmTS`) as hora, count( hour(`ReceivedDtTmTS`)) as cuenta FROM fireServiceVIEW WHERE year(`CallDateTS`)==2016 group by hour(`ReceivedDtTmTS`) order by cuenta DESC;
-- Same 2016 query, ordered by call count, to compare the busiest hours with day 37.

In [51]:
#Vemos los barrios con mas llamadas
spark.sql("SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2016' GROUP BY `NeighborhoodDistrict` ORDER BY Neighborhood_Count DESC LIMIT 15").show()

In [52]:
#Creamos un nuevo data frame renombrando algunas columnas
incidentsDF = spark.read.csv('/mnt/sf_open_data/fire_incidents/Fire_Incidents.csv', header=True, inferSchema=True).withColumnRenamed('Incident Number', 'IncidentNumber').withColumnRenamed('Neighborhood  District', 'NeighborhoodDistrictIncident').withColumnRenamed('Primary Situation','PrimarySituationIncident').cache()


In [53]:
# Show three rows to check that the columns were renamed.
display(incidentsDF.limit(3))

In [54]:
# Print the schema of the incidents DataFrame.
incidentsDF.printSchema()

In [55]:
# Materialize the cache by running an action.
incidentsDF.count()

In [56]:
joinedDF = myDF.join(incidentsDF, myDF.IncidentNumber == incidentsDF.IncidentNumber)

In [57]:
display(joinedDF.limit(3))

In [58]:
joinedDF.count()

In [59]:
joinedDF.filter(year('CallDateTS') == '2016').filter(col('NeighborhoodDistrict') == 'Tenderloin').count()

In [60]:
#Mostramos los tipos de llamada mas realizados en Tenderloin en 2016
display(joinedDF.filter(year('CallDateTS') == '2016').filter(col('NeighborhoodDistrict') == 'Tenderloin').groupBy('PrimarySituationIncident').count().orderBy(desc("count")).limit(100))

In [61]:
#Mostramos los barrios donde se producen más llamdas falsas
display(joinedDF.filter(year('CallDateTS') == '2016').filter(col('PrimarySituationIncident') == '700 false alarm or false call, other').groupBy('NeighborhoodDistrict').count().orderBy(desc("count")).limit(10))

In [62]:
display(joinedDF.select('CallType').filter(year('CallDateTS') == '2016').filter(col('PrimarySituationIncident') == '700 false alarm or false call, other').groupBy('CallType').count().orderBy(desc("count")))

In [63]:
#Mostramos de que tipo son las llamadas en Russian Hill limitado a 100
display(joinedDF.filter(year('CallDateTS') == '2016').filter(col('NeighborhoodDistrict') == 'Russian Hill').limit(100))

In [64]:
#Tipos de llamadas en el zip 94132.0 -por brotherhood, Ingleside Heights-
display(joinedDF.filter(year('CallDateTS') == '2016').filter(col('ZipcodeofIncident') == '94132.0').limit(100))

In [65]:
import pandas as pd

In [66]:
# Convert the 2016 rows of the joined DataFrame into a pandas DataFrame.
pandasDF = joinedDF.filter(year('CallDateTS')=='2016').toPandas()

In [67]:
from pyspark.sql.types import *


In [68]:
import numpy as np


In [69]:
# train_test_split lives in sklearn.model_selection; sklearn.cross_validation was
# removed from scikit-learn. Fall back to the old module only on very old installs.
try:
    from sklearn.model_selection import train_test_split
except ImportError:  # scikit-learn < 0.18
    from sklearn.cross_validation import train_test_split


In [70]:
# Column labels of the pandas copy.
pandasDF.columns

In [71]:
# Column names of the joined Spark DataFrame, for comparison.
joinedDF.columns

In [72]:
#Decidimos los features de nuestro modelo en este caso el tipo de llamada, el codigo postal, la prioridad original de la llamada, la prioridad final de la llamada y por último la zona
features = [3,7,11,13,21]



In [73]:
X = pandasDF.iloc[:,features]
#La Y a predecir será Call type Group
Y= pandasDF.iloc[:,15]


In [74]:
# Inspect the target column.
print(Y)

In [75]:
from sklearn import preprocessing
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression

In [76]:
# Extract the underlying NumPy arrays from the pandas objects.
X_values = X.values
Y_values = Y.values

In [77]:
# Encoder that maps the target labels to integer codes.
le = preprocessing.LabelEncoder()

In [78]:
# Encode the target labels as integers.
y_encoded =le.fit_transform(Y_values)

In [79]:
# Inspect the encoded target.
print(y_encoded)

In [80]:
# Inspect the raw feature values.
print(X_values)

In [81]:
#Codificamos a numero los inputs de la X
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
le = preprocessing.LabelEncoder()
X_encoded=X_values
for i in range(len(X_values)):
  X_encoded[i]=le.fit_transform(X_values[i])


In [82]:
print(X_encoded)

In [83]:
# Split the data into train and test sets, holding out 20% for testing.
try:
    from sklearn.model_selection import train_test_split
except ImportError:  # scikit-learn < 0.18 only ships the old module
    from sklearn.cross_validation import train_test_split
# A fixed random_state makes the split, and the accuracy reported later, reproducible.
X_train, X_test, y_train, y_test = train_test_split(X_encoded, y_encoded, test_size = .2, random_state=0)

In [84]:
from sklearn import tree
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

In [85]:
#Creamos el RandomForestClassifier
clf = RandomForestClassifier(n_jobs=4,random_state=0,n_estimators=100, verbose=0, max_features=5, max_depth=4)

In [86]:
#Entrenamos el modelo
clf.fit(X_train, y_train)

In [87]:
# Print arrays in full instead of summarising them with "...".
# NumPy needs a real number for `threshold`; NaN and the string 'nan' are
# rejected by current releases, so the largest integer stands in for "no limit".
import sys
np.set_printoptions(threshold=sys.maxsize)

In [88]:
# Predict the whole test split.
from sklearn.metrics import accuracy_score
preds = clf.predict(X_test)
# Print the model's accuracy on the test split.
test_accuracy = accuracy_score(y_test, preds)
print(test_accuracy)

In [89]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import roc_auc_score

In [90]:
# Predict a single sample.
# Example Alarm = 4,0,2,1,3
# Example Fire = 3,0,2,1,3
# Example Non life-threatening = 0,2,3,1,4
# Example Life-threatening = 0,2,3,1,2
# predict() expects a 2-D input of shape (n_samples, n_features), so the single
# sample is wrapped in an outer list.
pred1=clf.predict([[0,2,3,1,2]])
print(pred1)
# Display label for each encoded CallTypeGroup class.
# NOTE(review): assumes codes 1-4 map to these groups in the fitted target
# encoder — verify against its classes_.
etiquetas = {
    1: "Alarma",
    2: "Incendio",
    3: "No riesgo de muerte",
    4: "Situación con riesgo potencial de muerte",
}
# Any other class code keeps the original placeholder "a".
predd = etiquetas.get(int(pred1[0]), "a")
print(predd)