In [1]:
# In Python, define a schema
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [2]:
spark = SparkSession.builder.appName("Authors").getOrCreate()

your 131072x1 screen size is bogus. expect trouble
23/04/16 19:37:24 WARN Utils: Your hostname, pc-branco resolves to a loopback address: 127.0.1.1; using 172.20.88.244 instead (on interface eth0)
23/04/16 19:37:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/16 19:37:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Programmatic way to define a schema
fire_schema = StructType(
    [
        StructField("CallNumber", IntegerType(), True),
        StructField("UnitID", StringType(), True),
        StructField("IncidentNumber", IntegerType(), True),
        StructField("CallType", StringType(), True),
        StructField("CallDate", StringType(), True),
        StructField("WatchDate", StringType(), True),
        StructField("CallFinalDisposition", StringType(), True),
        StructField("AvailableDtTm", StringType(), True),
        StructField("Address", StringType(), True),
        StructField("City", StringType(), True),
        StructField("Zipcode", IntegerType(), True),
        StructField("Battalion", StringType(), True),
        StructField("StationArea", StringType(), True),
        StructField("Box", StringType(), True),
        StructField("OriginalPriority", StringType(), True),
        StructField("Priority", StringType(), True),
        StructField("FinalPriority", IntegerType(), True),
        StructField("ALSUnit", BooleanType(), True),
        StructField("CallTypeGroup", StringType(), True),
        StructField("NumAlarms", IntegerType(), True),
        StructField("UnitType", StringType(), True),
        StructField("UnitSequenceInCallDispatch", IntegerType(), True),
        StructField("FirePreventionDistrict", StringType(), True),
        StructField("SupervisorDistrict", StringType(), True),
        StructField("Neighborhood", StringType(), True),
        StructField("Location", StringType(), True),
        StructField("RowID", StringType(), True),
        StructField("Delay", FloatType(), True),
    ]
)


In [4]:
# Use the DataFrameReader interface to read a CSV file
sf_fire_file = "/home/dgeison/projetos/LearningSparkV2/chapter3/data/sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

fire_df.show(100)


23/04/16 19:37:30 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+------+--------------+--------------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+----------+
|CallNumber|UnitID|IncidentNumber|            CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|     Delay|
+----------+------+--------------+--------------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+------------

In [5]:
fire_df.printSchema()


root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [6]:
# In Python
few_fire_df = fire_df.select("IncidentNumber", "AvailableDtTm", "CallType").where(
    col("CallType") != "Medical Incident"
)
few_fire_df.show(5, truncate=False)


+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [7]:
# In Python, return number of distinct types of calls using countDistinct()
from pyspark.sql.functions import *
(fire_df
 .select("CallType")
 .where(col("CallType").isNotNull())
 .agg(countDistinct("CallType").alias("DistinctCallTypes"))
 .show())

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



In [8]:
# In Python, filter for only distinct non-null CallTypes from all the rows
(fire_df
 .select("CallType")
 .where(col("CallType").isNotNull())
 .distinct()
 .show(10, False))

+-----------------------------+
|CallType                     |
+-----------------------------+
|Elevator / Escalator Rescue  |
|Marine Fire                  |
|Aircraft Emergency           |
|Administrative               |
|Alarms                       |
|Odor (Strange / Unknown)     |
|Citizen Assist / Service Call|
|HazMat                       |
|Watercraft in Distress       |
|Explosion                    |
+-----------------------------+
only showing top 10 rows



In [9]:
# In Python
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
(
    new_fire_df.select("ResponseDelayedinMins")
    .where(col("ResponseDelayedinMins") > 5)
    .show(5, False)
)

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



In [10]:
# In Python
fire_ts_df = (
    new_fire_df.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .drop("CallDate")
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
    .drop("WatchDate")
    .withColumn(
        "AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")
    )
    .drop("AvailableDtTm")
)

# Select the converted columns
(fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, False))

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [11]:
fire_ts_df.show(5, False)

+----------+------+--------------+----------------+--------------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+---------------------+-------------------------------------+-------------+---------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|CallType        |CallFinalDisposition|Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood         |Location                             |RowID        |ResponseDelayedinMins|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+----------+------+--------------+----------------+--------------------+---------------------------+----+-

In [12]:
# In Python
(fire_ts_df
 .select(year('IncidentDate'))
 .distinct()
 .orderBy(year('IncidentDate'))
 .show())

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



In [13]:
# In Python
(fire_ts_df
 .select("CallType")
 .where(col("CallType").isNotNull())
 .groupBy("CallType")
 .count()
 .orderBy("count", ascending=False)
 .show(n=10, truncate=False))

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [14]:
# In Python
import pyspark.sql.functions as F
(fire_ts_df
 .select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"),
         F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins"))
 .show())

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



Quais foram todos os diferentes tipos de chamadas de incêndio em 2018?

Que meses dentro do ano de 2018 viu o maior número de chamadas de incêndio?

Qual bairro de São Francisco gerou mais chamadas de incêndio em 2018?

Quais bairros tiveram os piores tempos de resposta a chamadas de incêndio em 2018?

Qual semana do ano de 2018 teve mais chamadas de incêndio?

Existe uma correlação entre vizinhança, CEP e número de chamadas de incêndio?

Como podemos usar arquivos Parquet ou tabelas SQL para armazenar esses dados e lê-los de volta?

In [15]:
# In Python
from pyspark.sql import Row
row = Row(350, True, "Learning Spark 2E", None)

In [25]:
row[1]

True

In [24]:
row[2]

'Learning Spark 2E'

In [26]:
# In Python
count_mnm_df = (mnm_df
    .select("State", "Color", "Count")
    .groupBy("State", "Color")
    .agg(sum("Count")
    .alias("Total"))
    .orderBy("Total", ascending=False))

NameError: name 'mnm_df' is not defined