### データセットの取得
https://github.com/databricks/LearningSparkV2/blob/master/chapter3/data/sf-fire-calls.csv  
から `sf-fire-calls.csv` をダウンロードし（リンク先はHTMLページなので「Raw」から保存する）、このノートブックと同じフォルダに格納する。  

In [None]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, to_timestamp, year
from pyspark.sql.functions import sum as fsum, avg as favg, min as fmin, max as fmax

In [None]:
# Explicit schema for sf-fire-calls.csv, listed in the CSV's column order.
# Every field is declared nullable (the third StructField argument is True).
# Date/time columns (CallDate, WatchDate, AvailableDtTm) are kept as strings
# here and converted to timestamps in a later cell.
fire_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('CallNumber', IntegerType()),
        ('UnitID', StringType()),
        ('IncidentNumber', IntegerType()),
        ('CallType', StringType()),
        ('CallDate', StringType()),
        ('WatchDate', StringType()),
        ('CallFinalDisposition', StringType()),
        ('AvailableDtTm', StringType()),
        ('Address', StringType()),
        ('City', StringType()),
        ('Zipcode', IntegerType()),
        ('Battalion', StringType()),
        ('StationArea', StringType()),
        ('Box', StringType()),
        ('OriginalPriority', StringType()),
        ('Priority', StringType()),
        ('FinalPriority', IntegerType()),
        ('ALSUnit', BooleanType()),
        ('CallTypeGroup', StringType()),
        ('NumAlarms', IntegerType()),
        ('UnitType', StringType()),
        ('UnitSequenceInCallDispatch', IntegerType()),
        ('FirePreventionDistrict', StringType()),
        ('SupervisorDistrict', StringType()),
        ('Neighborhood', StringType()),
        ('Location', StringType()),
        ('RowID', StringType()),
        ('Delay', FloatType()),
    )
])


In [None]:
sf_fire_file = 'sf-fire-calls.csv'  # relative path: the CSV is expected next to this notebook

In [None]:
# Obtain a SparkSession for this notebook, with application name 'Fire'.
spark = SparkSession.builder.appName('Fire').getOrCreate()

In [None]:
# Load the CSV, treating the first line as a header and applying fire_schema.
fire_df = (spark.read
    .format('csv')
    .option('header', True)
    .schema(fire_schema)
    .load(sf_fire_file))

In [None]:
# Pull only a small sample back to the driver for inspection. collect() would
# return every row of the DataFrame to the driver and print all of it here.
fire_df.take(5)

In [None]:
# Persist the raw calls as Parquet files under parquet_path.
# mode('overwrite') keeps the cell re-runnable: with the default save mode,
# a second run fails because the output path already exists.
parquet_path = 'fire_calls_parquet'
fire_df.write.format('parquet').mode('overwrite').save(parquet_path)

In [None]:
# Persist the raw calls as a Parquet-backed table named parquet_table.
# mode('overwrite') keeps the cell re-runnable: with the default save mode,
# a second run fails because the table already exists.
parquet_table = 'fire_calls_table'
fire_df.write.format('parquet').mode('overwrite').saveAsTable(parquet_table)

In [None]:
# Three columns only, excluding 'Medical Incident' calls. Rows with a null
# CallType are excluded as well, since the != comparison is null for them.
few_fire_df = (
    fire_df.select('IncidentNumber', 'AvailableDtTm', 'CallType')
           .filter(col('CallType') != 'Medical Incident')
)

In [None]:
# Preview the first 5 rows without truncating long cell values.
few_fire_df.show(n=5, truncate=False)

In [None]:
# Count how many distinct non-null call types occur (single-row result).
(fire_df
    .select('CallType')
    .filter(col('CallType').isNotNull())
    .select(countDistinct('CallType').alias('DistinctCallTypes'))
    .show())

In [None]:
# Show up to 10 of the distinct non-null call types, untruncated.
(fire_df
    .select('CallType')
    .filter(col('CallType').isNotNull())
    .dropDuplicates()
    .show(n=10, truncate=False))

In [None]:
# Give the Delay column a more descriptive name for the rest of the notebook,
# then preview rows whose delay is greater than 5.
new_fire_df = fire_df.withColumnRenamed('Delay', 'ResponseDelayedinMins')

(new_fire_df
    .select('ResponseDelayedinMins')
    .filter(col('ResponseDelayedinMins') > 5)
    .show(n=5, truncate=False))

In [None]:
# Replace the three string date/time columns with parsed timestamp columns.
# The new columns are appended in the order IncidentDate, OnWatchDate,
# AvailableDtTS; the original string columns are dropped afterwards.
fire_ts_df = (new_fire_df
    .withColumn('IncidentDate', to_timestamp(col('CallDate'), 'MM/dd/yyyy'))
    .withColumn('OnWatchDate', to_timestamp(col('WatchDate'), 'MM/dd/yyyy'))
    .withColumn('AvailableDtTS', to_timestamp(col('AvailableDtTm'), 'MM/dd/yyyy hh:mm:ss a'))
    .drop('CallDate', 'WatchDate', 'AvailableDtTm'))

In [None]:
# Preview the parsed timestamp columns.
fire_ts_df.select('IncidentDate', 'OnWatchDate', 'AvailableDtTS').show(n=5, truncate=False)

In [None]:
# List each distinct year found in IncidentDate, in ascending order.
(fire_ts_df
    .select(year('IncidentDate'))
    .dropDuplicates()
    .sort(year('IncidentDate').asc())
    .show())

In [None]:
# Top 10 non-null call types by number of occurrences, most frequent first.
(fire_ts_df
    .select('CallType')
    .filter(col('CallType').isNotNull())
    .groupBy('CallType')
    .count()
    .orderBy(col('count').desc())
    .show(10, False))

In [None]:
# Whole-table aggregates: total alarms plus mean/min/max response delay.
(fire_ts_df
    .agg(fsum('NumAlarms'),
         favg('ResponseDelayedinMins'),
         fmin('ResponseDelayedinMins'),
         fmax('ResponseDelayedinMins'))
    .show())