In [0]:
%pyspark
import pyspark.sql.functions as f
# Location of the fire-calls CSV that the rest of the notebook reads from.
path = '/datasets/sf-fire-calls.csv'

# Read the CSV with a header row and let Spark infer the column types.
# The timestamp pattern uses `hh` (clock hour 1-12) because it is paired with
# the AM/PM marker `a`. `HH` (hour of day 0-23) combined with `a` either
# ignores the marker or fails to parse afternoon values, depending on the
# Spark version.
# NOTE(review): both patterns assume day-before-month ordering -- confirm this
# against the raw file.
df_calls = (
    spark.read
    .option("header", "true")
    .option("dateFormat", "dd/MM/yyyy")
    .option("timestampFormat", "dd/MM/yyyy hh:mm:ss a")
    .option("inferSchema", "true")
    .csv(path)
)

# Show the inferred schema, then a 20-row preview through the notebook's
# `z` display helper.
df_calls.printSchema()
z.show(df_calls.limit(20))

# what were all the different types of fire calls in 2018?

In [2]:
%pyspark
# List every call type seen in the given year, most frequent first.
# The year is matched as a substring of CallDate instead of parsing the value
# into a date.
# NOTE(review): assumes CallDate renders as text containing the four-digit
# year -- confirm against the printed schema.
year = '2018'
fire_types = (
    df_calls
    .filter(df_calls["CallDate"].contains(year))
    .groupBy("CallType")
    # Name the count explicitly rather than depending on the auto-generated
    # "count(1)" column name.
    .agg(f.count('*').alias('count(Calls)'))
    # Descending order; use .asc() for ascending instead.
    .sort(f.col('count(Calls)').desc())
    .collect()
)

# Only the first field of each collected row (the call type) is printed.
print("Different types of fire calls in {}:".format(year))
print(' -- ' + ',\n -- '.join([str(fire_type[0]) for fire_type in fire_types]))

# what month within the year 2018 saw the highest number of fire calls?

In [4]:
%pyspark
# Count the 2018 calls per calendar month, busiest month first.
# The year filter is applied before the projection so that it refers to a
# column that is still present, not one the select has already dropped.
# NOTE(review): calls are dated by AvailableDtTm here -- confirm that this is
# the intended timestamp for "when the call happened".
(
    df_calls
    .filter(f.year("AvailableDtTm") == 2018)
    .select(f.month("AvailableDtTm").alias('MONTH'))
    .groupBy('MONTH')
    .agg(f.count('*').alias('count(Calls)'))
    .sort(f.col('count(Calls)').desc())
    .show()
)

In [5]:
%pyspark
# Show only the month of `year` with the highest number of calls.
year = 2018
(
    df_calls
    # Filter first: the projection below keeps only the derived MONTH column.
    .filter(f.year("AvailableDtTm") == year)
    .select(f.month("AvailableDtTm").alias('MONTH'))
    .groupBy('MONTH')
    .agg(f.count('*').alias('count(Calls)'))
    .sort(f.col('count(Calls)').desc())
    # Keep just the top row of the descending ranking.
    .limit(1)
    .show()
)

# which neighborhood in San Francisco generated the most fire calls in 2018?

In [7]:
%pyspark
# Rank neighborhoods by the number of calls in `year`, highest first.
city = 'FRANCISCO'
year = 2018
(
    df_calls
    # Both filters run before the projection because they read columns
    # (AvailableDtTm, Address) that the select below does not keep.
    .filter(f.year("AvailableDtTm") == year)
    # NOTE(review): this is a substring match on the Address text, so it keeps
    # any row whose address mentions "FRANCISCO" -- confirm this is the right
    # way to restrict rows to the city.
    .filter(df_calls["Address"].contains(city))
    .select("Neighborhood")
    .groupBy('Neighborhood')
    .agg(f.count('*').alias('count(Calls)'))
    .sort(f.col('count(Calls)').desc())
    .show()
)

In [8]:
%pyspark
# Show only the neighborhood with the most calls in `year`.
# `city` and `year` are defined here so the cell does not depend on values
# left behind by an earlier cell.
city = 'FRANCISCO'
year = 2018
(
    df_calls
    # Filter before projecting: these conditions read columns that the select
    # below does not keep.
    .filter(f.year("AvailableDtTm") == year)
    # NOTE(review): substring match on the Address text -- confirm this is the
    # right way to restrict rows to the city.
    .filter(df_calls["Address"].contains(city))
    .select("Neighborhood")
    .groupBy('Neighborhood')
    .agg(f.count('*').alias('count(Calls)'))
    .sort(f.col('count(Calls)').desc())
    # Keep just the top row of the descending ranking.
    .limit(1)
    .show()
)

# which neighborhoods had the worst response times to fire calls in 2018?

In [10]:
%pyspark
# Rank neighborhoods by their mean Delay in `year`, largest mean first.
year = 2018
(
    df_calls
    # Filter first: AvailableDtTm is not among the columns selected below.
    .filter(f.year("AvailableDtTm") == year)
    .select("Neighborhood", "Delay")
    .groupBy('Neighborhood')
    .agg(f.mean('Delay').alias('mean(Delay)'))
    .sort(f.col('mean(Delay)').desc())
    .show()
)

In [11]:
%pyspark
# Show only the neighborhood with the largest mean Delay in `year`.
# `year` is defined here so the cell does not depend on a value left behind by
# an earlier cell.
year = 2018
(
    df_calls
    # Filter first: AvailableDtTm is not among the columns selected below.
    .filter(f.year("AvailableDtTm") == year)
    .select("Neighborhood", "Delay")
    .groupBy('Neighborhood')
    .agg(f.mean('Delay').alias('mean(Delay)'))
    .sort(f.col('mean(Delay)').desc())
    # Keep just the top row of the descending ranking.
    .limit(1)
    .show()
)

# which week of the year 2018 had the most fire calls?

In [13]:
%pyspark
# Count the calls of `year` per week-of-year number, busiest week first.
# NOTE: weekofyear uses ISO-8601 week numbering, so dates at the very start or
# end of a calendar year can be assigned to a week of the neighbouring year.
year = 2018
(
    df_calls
    .filter(f.year("AvailableDtTm") == year)
    # Group directly on the derived week number; the alias names the output
    # column.
    .groupBy(f.weekofyear("AvailableDtTm").alias('weekOfYear'))
    .agg(f.count('*').alias('count(weekOfYear)'))
    .orderBy(f.desc('count(weekOfYear)'))
    .show()
)

In [14]:
%pyspark
# Show only the week-of-year number with the most calls in `year`.
# NOTE: weekofyear uses ISO-8601 week numbering, so dates at the very start or
# end of a calendar year can be assigned to a week of the neighbouring year.
year = 2018
(
    df_calls
    .filter(f.year("AvailableDtTm") == year)
    # Group directly on the derived week number; the alias names the output
    # column.
    .groupBy(f.weekofyear("AvailableDtTm").alias('weekOfYear'))
    .agg(f.count('*').alias('count(weekOfYear)'))
    .orderBy(f.desc('count(weekOfYear)'))
    # Keep just the top row of the descending ranking.
    .limit(1)
    .show()
)
