In [3]:
// In Scala
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.SparkSession
// Create a DataFrame using SparkSession
val spark = SparkSession
    .builder
    .appName("AuthorsAges")
    .getOrCreate()
// Create a DataFrame of names and ages
val dataDF = spark.createDataFrame(Seq(("Brooke", 20), ("Brooke", 25),
    ("Denny", 31), ("Jules", 30), ("TD", 35))).toDF("name", "age")
// Group the same names together, aggregate their ages, and compute an average
val avgDF = dataDF.groupBy("name").agg(avg("age"))
// Show the results of the final execution
avgDF.show

+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Jules|    30.0|
|    TD|    35.0|
| Denny|    31.0|
+------+--------+



import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@36492084
dataDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
avgDF: org.apache.spark.sql.DataFrame = [name: string, avg(age): double]


In [4]:
import org.apache.spark.sql.types._
val jsonFile = "./LearningSparkV2-master/chapter3/scala/data/blogs.json"

import org.apache.spark.sql.types._
jsonFile: String = ./LearningSparkV2-master/chapter3/scala/data/blogs.json


In [6]:
val schema = StructType(Array(StructField("Id", IntegerType, false),
    StructField("First", StringType, false),
    StructField("Last", StringType, false),
    StructField("Url", StringType, false),
    StructField("Published", StringType, false),
    StructField("Hits", IntegerType, false),
    StructField("Campaigns", ArrayType(StringType), false)))

schema: org.apache.spark.sql.types.StructType = StructType(StructField(Id,IntegerType,false), StructField(First,StringType,false), StructField(Last,StringType,false), StructField(Url,StringType,false), StructField(Published,StringType,false), StructField(Hits,IntegerType,false), StructField(Campaigns,ArrayType(StringType,true),false))


In [7]:
val blogsDF = spark.read.schema(schema).json(jsonFile)
blogsDF.show(false)

+---+---------+-------+-----------------+---------+-----+----------------------------+
|Id |First    |Last   |Url              |Published|Hits |Campaigns                   |
+---+---------+-------+-----------------+---------+-----+----------------------------+
|1  |Jules    |Damji  |https://tinyurl.1|1/4/2016 |4535 |[twitter, LinkedIn]         |
|2  |Brooke   |Wenig  |https://tinyurl.2|5/5/2018 |8908 |[twitter, LinkedIn]         |
|3  |Denny    |Lee    |https://tinyurl.3|6/7/2019 |7659 |[web, twitter, FB, LinkedIn]|
|4  |Tathagata|Das    |https://tinyurl.4|5/12/2018|10568|[twitter, FB]               |
|5  |Matei    |Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB, LinkedIn]|
|6  |Reynold  |Xin    |https://tinyurl.6|3/2/2015 |25568|[twitter, LinkedIn]         |
+---+---------+-------+-----------------+---------+-----+----------------------------+



blogsDF: org.apache.spark.sql.DataFrame = [Id: int, First: string ... 5 more fields]


In [8]:
println(blogsDF.printSchema)
println(blogsDF.schema)


root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (nullable = true)
 |-- Campaigns: array (nullable = true)
 |    |-- element: string (containsNull = true)

()
StructType(StructField(Id,IntegerType,true), StructField(First,StringType,true), StructField(Last,StringType,true), StructField(Url,StringType,true), StructField(Published,StringType,true), StructField(Hits,IntegerType,true), StructField(Campaigns,ArrayType(StringType,true),true))


In [9]:
import org.apache.spark.sql.functions._
blogsDF.columns

import org.apache.spark.sql.functions._
res5: Array[String] = Array(Id, First, Last, Url, Published, Hits, Campaigns)


In [11]:
blogsDF.col("Id")

res4: org.apache.spark.sql.Column = Id


In [12]:
blogsDF.select(expr("Hits * 2")).show(2)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows



In [13]:
blogsDF.withColumn("Big Hitters", (expr("Hits > 10000"))).show()


+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|Big Hitters|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|      false|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|      false|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|      false|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|       true|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|       true|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|       true|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+



In [14]:
blogsDF
 .withColumn("AuthorsId", (concat(expr("First"), expr("Last"), expr("Id"))))
 .select(col("AuthorsId"))
 .show(4)


+-------------+
|    AuthorsId|
+-------------+
|  JulesDamji1|
| BrookeWenig2|
|    DennyLee3|
|TathagataDas4|
+-------------+
only showing top 4 rows



In [15]:
blogsDF.select("Hits").show(2)


+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows



In [10]:
// In Scala
import org.apache.spark.sql.Row
// Create a Row
val blogRow = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",
 Array("twitter", "LinkedIn"))
// Access using index for individual items
blogRow(1)


import org.apache.spark.sql.Row
blogRow: org.apache.spark.sql.Row = [6,Reynold,Xin,https://tinyurl.6,255568,3/2/2015,[Ljava.lang.String;@7bc7836]
res6: Any = Reynold


In [17]:
val rows = Seq(("Matei Zaharia", "CA"), ("Reynold Xin", "CA"))
val authorsDF = rows.toDF("Author", "State")
authorsDF.show()


+-------------+-----+
|       Author|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



rows: Seq[(String, String)] = List((Matei Zaharia,CA), (Reynold Xin,CA))
authorsDF: org.apache.spark.sql.DataFrame = [Author: string, State: string]


In [11]:



val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
    StructField("UnitID", StringType, true),
    StructField("IncidentNumber", IntegerType, true),
    StructField("CallType", StringType, true),
    StructField("CallDate", StringType, true), 
    StructField("WatchDate", StringType, true),
    StructField("CallFinalDisposition", StringType, true),
    StructField("AvailableDtTm", StringType, true),
    StructField("Address", StringType, true), 
    StructField("City", StringType, true), 
    StructField("Zipcode", IntegerType, true), 
    StructField("Battalion", StringType, true), 
    StructField("StationArea", StringType, true), 
    StructField("Box", StringType, true), 
    StructField("OriginalPriority", StringType, true), 
    StructField("Priority", StringType, true), 
    StructField("FinalPriority", IntegerType, true), 
    StructField("ALSUnit", BooleanType, true), 
    StructField("CallTypeGroup", StringType, true),
    StructField("NumAlarms", IntegerType, true),
    StructField("UnitType", StringType, true),
    StructField("UnitSequenceInCallDispatch", IntegerType, true),
    StructField("FirePreventionDistrict", StringType, true),
    StructField("SupervisorDistrict", StringType, true),
    StructField("Neighborhood", StringType, true),
    StructField("Location", StringType, true),
    StructField("RowID", StringType, true),
    StructField("Delay", FloatType, true)))



fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [12]:
val sfFireFile="./LearningSparkV2-master/chapter3/data/sf-fire-calls.csv"

sfFireFile: String = ./LearningSparkV2-master/chapter3/data/sf-fire-calls.csv


In [13]:
val fireDF = spark.read.schema(fireSchema)
 .option("header", "true")
 .csv(sfFireFile)

fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [21]:
val fewFireDF = fireDF
 .select("IncidentNumber", "AvailableDtTm", "CallType")
 .where($"CallType" =!= "Medical Incident") 
fewFireDF.show(5, false)


+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


In [22]:
fireDF
 .select("CallType")
 .where(col("CallType").isNotNull)
 .agg(countDistinct('CallType) as 'DistinctCallTypes)
 .show()


+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



In [27]:
fireDF
 .select("CallType")
 .where($"CallType".isNotNull)
 .distinct()
 .show(10, false)

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows



In [28]:
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")
newFireDF
 .select("ResponseDelayedinMins")
 .where($"ResponseDelayedinMins" > 5)
 .show(5, false)


+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [29]:
val fireTsDF = newFireDF
 .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
 .drop("CallDate")
 .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
 .drop("WatchDate")
 .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"),
 "MM/dd/yyyy hh:mm:ss a"))
 .drop("AvailableDtTm")

fireTsDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [30]:
fireTsDF
 .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
 .show(5, false)


+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [31]:
fireTsDF
 .select(year($"IncidentDate"))
 .distinct()
 .orderBy(year($"IncidentDate"))
 .show()


+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



In [32]:
fireTsDF
 .select("CallType")
 .where(col("CallType").isNotNull)
 .groupBy("CallType")
 .count()
 .orderBy(desc("count"))
 .show(10, false)


+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [33]:
fireTsDF.count()

res20: Long = 175296


In [34]:
import org.apache.spark.sql.{functions => F}
fireTsDF
 .select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"),
 F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins"))
 .show()


+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



import org.apache.spark.sql.{functions=>F}


In [45]:
fireTsDF
 .select( month(col("IncidentDate")) as "Meses")
 .where(year(col("IncidentDate")) === 2018)
 .groupBy("Meses")
 .count()
 .orderBy(desc("count"))
 .show(13, false)

+-----+-----+
|Meses|count|
+-----+-----+
|10   |1068 |
|5    |1047 |
|3    |1029 |
|8    |1021 |
|1    |1007 |
|7    |974  |
|6    |974  |
|9    |951  |
|4    |947  |
|2    |919  |
|11   |199  |
+-----+-----+



In [47]:
fireTsDF.printSchema

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable =

In [63]:
fireTsDF
    .select("Zipcode")
    .where(col("Zipcode").isNotNull)
    .groupBy("Zipcode")
    .count()
    .orderBy(desc("count"))
    .show(1, false)

+-------+-----+
|Zipcode|count|
+-------+-----+
|94102  |21840|
+-------+-----+
only showing top 1 row



In [99]:
fireTsDF
    .select("Zipcode", "ResponseDelayedinMins")
    .where(col("Zipcode").isNotNull && year(col("IncidentDate")) === 2018)
    .groupBy("Zipcode")
    .avg("ResponseDelayedinMins")
    .orderBy(desc("avg(ResponseDelayedinMins)"))
    .show(50, false)

+-------+--------------------------+
|Zipcode|avg(ResponseDelayedinMins)|
+-------+--------------------------+
|94129  |5.981565614541371         |
|94130  |5.453703684111436         |
|94105  |5.363809512981347         |
|94133  |5.230131806626832         |
|94127  |4.818269235296891         |
|94124  |4.6581693111559535        |
|94111  |4.231922368050883         |
|94118  |4.0838112442830985        |
|94132  |3.98419654689937          |
|94107  |3.9793209974412567        |
|94109  |3.976974945812556         |
|94134  |3.9474026205993833        |
|94102  |3.9365134215903006        |
|94122  |3.8774509791065666        |
|94117  |3.8475649381032238        |
|94114  |3.7992349614373975        |
|94116  |3.7678315418381847        |
|94103  |3.7607596581404255        |
|94121  |3.6857414451842074        |
|94112  |3.6669192000320465        |
|94158  |3.650537653315452         |
|94131  |3.603311964525626         |
|94104  |3.598245608571329         |
|94115  |3.4985232011426852        |
|

In [100]:
import org.apache.spark.sql.Row
val row = Row(350, true, "Learning Spark 2E", null)

import org.apache.spark.sql.Row
row: org.apache.spark.sql.Row = [350,true,Learning Spark 2E,null]


In [101]:
row.getInt(0)

res86: Int = 350


In [102]:
case class DeviceIoTData (battery_level: Long, c02_level: Long,
cca2: String, cca3: String, cn: String, device_id: Long,
device_name: String, humidity: Long, ip: String, latitude: Double,
lcd: String, longitude: Double, scale:String, temp: Long,
timestamp: Long)


defined class DeviceIoTData


In [103]:
val ds = spark.read
.json("./LearningSparkV2-master/databricks-datasets/learning-spark-v2/iot-devices/iot_devices.json")
.as[DeviceIoTData]


ds: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level: bigint, c02_level: bigint ... 13 more fields]


In [104]:
ds.show(5, false)

+-------------+---------+----+----+-------------+---------+---------------------+--------+-------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|cn           |device_id|device_name          |humidity|ip           |latitude|lcd   |longitude|scale  |temp|timestamp    |
+-------------+---------+----+----+-------------+---------+---------------------+--------+-------------+--------+------+---------+-------+----+-------------+
|8            |868      |US  |USA |United States|1        |meter-gauge-1xbYRYcj |51      |68.161.225.1 |38.0    |green |-97.0    |Celsius|34  |1458444054093|
|7            |1473     |NO  |NOR |Norway       |2        |sensor-pad-2n2Pea    |70      |213.161.254.1|62.47   |red   |6.15     |Celsius|11  |1458444054119|
|2            |1556     |IT  |ITA |Italy        |3        |device-mac-36TWSKiT  |44      |88.36.5.1    |42.83   |red   |12.83    |Celsius|19  |1458444054120|
|6            |1080     |US  |USA |United States|4  

In [15]:
val parquetPath = "./LearningSparkV2-master/chapter3/data/prueba"
fireDF.write.format("parquet").save(parquetPath)


parquetPath: String = ./LearningSparkV2-master/chapter3/data/prueba


In [24]:
val jsonpath = "./LearningSparkV2-master/chapter3/data/pruebajson"
fireDF
    .repartition(1)
    .write.format("json")
    .option("header", "true")
    .save(jsonpath)

jsonpath: String = ./LearningSparkV2-master/chapter3/data/pruebajson


In [18]:
val csvpath = "./LearningSparkV2-master/chapter3/data/pruebacsv"
fireDF.write.format("csv").save(csvpath)

csvpath: String = ./LearningSparkV2-master/chapter3/data/pruebacsv


In [22]:
val csvpath = "./LearningSparkV2-master/chapter3/data/pruebacsv"
fireDF
   .repartition(1)
   .write.format("csv")
   .option("header", "true")
   .save(csvpath)

csvpath: String = ./LearningSparkV2-master/chapter3/data/pruebacsv


In [26]:
import org.apache.spark.sql.avro

val avropath = "./LearningSparkV2-master/chapter3/data/pruebaAvro"
fireDF
   .repartition(1)
   .write.format("avro")
   .option("header", "true")
   .save(avropath)

<console>: 35: error: object avro is not a member of package org.apache.spark.sql

In [28]:
fireDF.rdd.getNumPartitions

res21: Int = 4
