In [1]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, LongType, StructType, TimestampType}
import org.apache.spark.sql.expressions.Window
import org.apache.parquet.format.IntType

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.0.11:4041
SparkContext available as 'sc' (version = 3.1.2, master = local[*], app id = local-1634625573258)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, LongType, StructType, TimestampType}
import org.apache.spark.sql.expressions.Window
import org.apache.parquet.format.IntType


In [2]:
// Session shared by every cell below. `getOrCreate()` hands back an already running
// session when there is one instead of starting a second one.
val spark = {
  val sessionBuilder = SparkSession.builder()
  sessionBuilder
    .master("local[*]")
    .appName("Spark SQL KeepCoding Base")
    .getOrCreate()
}

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@3aeaae24


## API Dataframe

In [4]:
/** Prints the average temperature and humidity per sensor and 1-minute window.
  *
  * The `timestamp` column of every JSON file under `Data/exercise5_sparkcore_data`
  * is cast to a timestamp and replaced by the 1-minute window struct it falls into;
  * both measures are then averaged per (window, sensor) and rounded to two decimals.
  * Rows are shown sorted by sensor and window, without truncating cell contents.
  *
  * @param spark session used to read the input files
  */
def dataframeAPI_1(spark: SparkSession): Unit = {
  val readings = spark
    .read
    .format("json")
    .load("Data/exercise5_sparkcore_data/*.json")

  // Alternative kept from an earlier attempt: floor the raw value to the minute
  // with ($"timestamp" / 60).cast(IntegerType) * 60 instead of using window().
  val minuteWindow = window($"timestamp".cast(TimestampType), "1 minute")
  val bucketed = readings.withColumn("timestamp", minuteWindow)

  val averages = bucketed
    .groupBy($"timestamp", $"sensor_id")
    .agg(
      round(avg($"temperature"), 2).as("temperature"),
      round(avg($"humidity"), 2).as("humidity")
    )

  averages
    .sort("sensor_id", "timestamp")
    .show(truncate = false)
}

dataframeAPI_1(spark)

+------------------------------------------+---------+-----------+--------+
|timestamp                                 |sensor_id|temperature|humidity|
+------------------------------------------+---------+-----------+--------+
|{2020-09-10 19:22:00, 2020-09-10 19:23:00}|1        |28.67      |63.67   |
|{2020-09-10 19:23:00, 2020-09-10 19:24:00}|1        |27.9       |67.7    |
|{2020-09-10 19:24:00, 2020-09-10 19:25:00}|1        |28.2       |62.0    |
|{2020-09-10 19:22:00, 2020-09-10 19:23:00}|2        |29.13      |62.13   |
|{2020-09-10 19:23:00, 2020-09-10 19:24:00}|2        |28.2       |68.5    |
|{2020-09-10 19:24:00, 2020-09-10 19:25:00}|2        |28.11      |69.33   |
|{2020-09-10 19:22:00, 2020-09-10 19:23:00}|3        |27.78      |69.89   |
|{2020-09-10 19:23:00, 2020-09-10 19:24:00}|3        |29.55      |65.45   |
|{2020-09-10 19:24:00, 2020-09-10 19:25:00}|3        |28.64      |68.14   |
+------------------------------------------+---------+-----------+--------+



dataframeAPI_1: (spark: org.apache.spark.sql.SparkSession)Unit


In [6]:
/** Same report as `dataframeAPI_1`, but with a flat `timestamp` column.
  *
  * Groups directly on the 1-minute window expression, averages temperature and
  * humidity (rounded to two decimals) and then replaces the generated `window`
  * struct by its start instant.
  *
  * @param spark session used to read the input files
  */
def dataframeAPI_2(spark: SparkSession): Unit = {
  val readings = spark
    .read
    .format("json")
    .load("Data/exercise5_sparkcore_data/*.json")

  val minuteWindow = window($"timestamp".cast(TimestampType), "1 minute")

  val averages = readings
    .groupBy($"sensor_id", minuteWindow)
    .agg(
      round(avg($"temperature"), 2).as("temperature"),
      round(avg($"humidity"), 2).as("humidity")
    )

  // Use $"window.start".cast(LongType) here instead when the result has to be
  // written out as JSON.
  val flattened = averages
    .withColumn("timestamp", $"window.start")
    .drop($"window")

  flattened
    .sort("sensor_id", "timestamp")
    .show(truncate = false)
}

dataframeAPI_2(spark)

+---------+-----------+--------+-------------------+
|sensor_id|temperature|humidity|timestamp          |
+---------+-----------+--------+-------------------+
|1        |28.67      |63.67   |2020-09-10 19:22:00|
|1        |27.9       |67.7    |2020-09-10 19:23:00|
|1        |28.2       |62.0    |2020-09-10 19:24:00|
|2        |29.13      |62.13   |2020-09-10 19:22:00|
|2        |28.2       |68.5    |2020-09-10 19:23:00|
|2        |28.11      |69.33   |2020-09-10 19:24:00|
|3        |27.78      |69.89   |2020-09-10 19:22:00|
|3        |29.55      |65.45   |2020-09-10 19:23:00|
|3        |28.64      |68.14   |2020-09-10 19:24:00|
+---------+-----------+--------+-------------------+



dataframeAPI_2: (spark: org.apache.spark.sql.SparkSession)Unit


## API SQL

In [7]:
/** Prints the per-sensor, per-minute averages using a Spark SQL query.
  *
  * Registers the JSON readings as the temporary view `sensor_data_view`, then
  * groups by sensor and 1-minute window, rounding both averages to two decimals
  * and exposing the window start cast to INT as `timestamp`.
  *
  * @param spark session used to read the files, hold the view and run the query
  */
def sqlAPI(spark: SparkSession) = {
    val perMinuteAverages =
      """
        |SELECT sensor_id,
        |       ROUND(AVG(humidity), 2) AS humidity,
        |       ROUND(AVG(temperature), 2) AS temperature,
        |       CAST(window.start AS INT) AS timestamp
        |FROM sensor_data_view
        |GROUP BY sensor_id, window(CAST(timestamp AS TIMESTAMP), "1 minute")
        |ORDER BY sensor_id, timestamp ASC
      """.stripMargin

    val readings = spark
      .read
      .format("json")
      .load("Data/exercise5_sparkcore_data/*.json")

    // The query above refers to the data through this view name.
    readings.createOrReplaceTempView("sensor_data_view")

    spark.sql(perMinuteAverages).show()
}

sqlAPI(spark)

+---------+--------+-----------+----------+
|sensor_id|humidity|temperature| timestamp|
+---------+--------+-----------+----------+
|        1|   63.67|      28.67|1599758520|
|        1|    67.7|       27.9|1599758580|
|        1|    62.0|       28.2|1599758640|
|        2|   62.13|      29.13|1599758520|
|        2|    68.5|       28.2|1599758580|
|        2|   69.33|      28.11|1599758640|
|        3|   69.89|      27.78|1599758520|
|        3|   65.45|      29.55|1599758580|
|        3|   68.14|      28.64|1599758640|
+---------+--------+-----------+----------+



sqlAPI: (spark: org.apache.spark.sql.SparkSession)Unit


## API Dataset

In [8]:
/** One sensor reading; `datasetAPI` uses it both as the JSON read schema and as
  * the element type of the typed Dataset.
  *
  * @param sensor_id   identifier of the reporting sensor
  * @param temperature temperature reading (unit not stated in this notebook)
  * @param humidity    humidity reading (unit not stated in this notebook)
  * @param timestamp   instant of the reading; presumably epoch seconds, since it
  *                    is floored to the minute by dividing by 60 -- TODO confirm
  */
sealed case class SensorData(
  sensor_id: Int,
  temperature: Int,
  humidity: Int,
  timestamp: Long
)

  /** Prints the per-sensor, per-minute average temperature and humidity using the
    * typed Dataset API.
    *
    * Readings are loaded with the schema derived from [[SensorData]], keyed by
    * (sensor, timestamp floored to the minute), summed together with a reading
    * count, and finally divided to obtain the averages. Averages are computed as
    * `Double` and rounded to two decimals so the figures agree with the DataFrame
    * and SQL variants of this exercise (integer division would drop the decimals).
    *
    * NOTE(review): the recorded run of the previous version of this cell ended in
    * a `java.lang.NullPointerException` whose cause is not visible in the notebook;
    * re-run this cell to confirm whether it still occurs.
    *
    * @param spark session used to read the input file
    */
  def datasetAPI(spark: SparkSession): Unit = {
    import spark.implicits._
    import org.apache.spark.sql.Encoders

    // Public API for deriving the schema of a case class; avoids the internal
    // catalyst reflection helper and the unchecked cast it required.
    val schema = Encoders.product[SensorData].schema

    spark
      .read
      .schema(schema)
      .format("json")
      .load("Data/exercise5_sparkcore_data/data1.json")
      .as[SensorData]
      // Floor to the minute entirely in Long arithmetic: narrowing to Int before
      // multiplying by 60 overflows for timestamps beyond 2^31.
      .groupByKey(reading => (reading.sensor_id, reading.timestamp / 60 * 60))
      // Carry (temperature sum, humidity sum, reading count) per key.
      .mapValues(reading => (reading.temperature.toLong, reading.humidity.toLong, 1L))
      .reduceGroups { (left, right) =>
        (left._1 + right._1, left._2 + right._2, left._3 + right._3)
      }
      .map { case ((sensorId, minute), (temperatureSum, humiditySum, readings)) =>
        (
          sensorId,
          math.round(temperatureSum.toDouble / readings * 100) / 100.0,
          math.round(humiditySum.toDouble / readings * 100) / 100.0,
          minute
        )
      }
      .toDF("sensor_id", "temperature", "humidity", "timestamp")
      .sort("sensor_id", "timestamp")
      .show()
  }

datasetAPI(spark)

java.lang.NullPointerException: 

In [9]:
// 1. Compute the average number of messages reported across all sensors per minute.
/** Prints the mean of the message counts obtained per sensor and 1-minute window.
  *
  * @param spark session used to read the input files
  */
def extra1(spark: SparkSession): Unit = {
  val readings = spark
    .read
    .format("json")
    .load("Data/exercise5_sparkcore_data/*.json")

  val minuteWindow = window($"timestamp".cast(TimestampType), "1 minute")

  // One row per (sensor, minute) holding how many messages that sensor sent.
  val messagesPerSensorMinute = readings
    .groupBy($"sensor_id", minuteWindow)
    .agg(count("*").as("total_messages"))

  // Global aggregate: a single row with the mean of those counts.
  messagesPerSensorMinute
    .agg(avg("total_messages"))
    .show()
}

extra1(spark)

+-------------------+
|avg(total_messages)|
+-------------------+
|  11.11111111111111|
+-------------------+



extra1: (spark: org.apache.spark.sql.SparkSession)Unit


In [10]:
// 2. Compute the average temperature and humidity per sensor for each hour.
/** Prints the unrounded average temperature and humidity per sensor and 1-hour
  * window, labelling each row with the start of its window.
  *
  * @param spark session used to read the input files
  */
def extra2(spark: SparkSession): Unit = {
  val readings = spark
    .read
    .format("json")
    .load("Data/exercise5_sparkcore_data/*.json")

  val hourWindow = window($"timestamp".cast(TimestampType), "1 hour")

  val hourlyAverages = readings
    .groupBy($"sensor_id", hourWindow)
    .agg(
      avg($"temperature").as("temperature"),
      avg($"humidity").as("humidity")
    )

  // Keep the window start as `timestamp` and leave the window struct behind.
  hourlyAverages
    .select(
      $"sensor_id",
      $"temperature",
      $"humidity",
      $"window.start".as("timestamp")
    )
    .sort("sensor_id", "timestamp")
    .show()
}

extra2(spark)

+---------+------------------+-----------------+-------------------+
|sensor_id|       temperature|         humidity|          timestamp|
+---------+------------------+-----------------+-------------------+
|        1|          28.28125|         64.40625|2020-09-10 19:00:00|
|        2| 28.58823529411765|65.91176470588235|2020-09-10 19:00:00|
|        3|28.705882352941178|67.73529411764706|2020-09-10 19:00:00|
+---------+------------------+-----------------+-------------------+



extra2: (spark: org.apache.spark.sql.SparkSession)Unit


In [11]:
// 3. Compute the average temperature and humidity per minute across all sensors.
/** Prints the unrounded average temperature and humidity of each 1-minute
  * window, regardless of sensor, with the window start cast to a long.
  *
  * @param spark session used to read the input files
  */
def extra3(spark: SparkSession): Unit = {
  val readings = spark
    .read
    .format("json")
    .load("Data/exercise5_sparkcore_data/*.json")

  val minuteWindow = window($"timestamp".cast(TimestampType), "1 minute")

  val minuteAverages = readings
    .groupBy(minuteWindow)
    .agg(
      avg($"temperature").as("temperature"),
      avg($"humidity").as("humidity")
    )

  // Keep the window start (as a long) under the name `timestamp` and leave the
  // window struct behind.
  minuteAverages
    .select(
      $"temperature",
      $"humidity",
      $"window.start".cast(LongType).as("timestamp")
    )
    .sort("timestamp")
    .show()
}

extra3(spark)

+------------------+-----------------+----------+
|       temperature|         humidity| timestamp|
+------------------+-----------------+----------+
| 28.63888888888889|64.58333333333333|1599758520|
|28.580645161290324|67.16129032258064|1599758580|
|28.363636363636363|66.60606060606061|1599758640|
+------------------+-----------------+----------+



extra3: (spark: org.apache.spark.sql.SparkSession)Unit


In [12]:
// 4. Find the sensor with the lowest temperature in each minute.
/** Prints, for every 1-minute window, the sensor whose average temperature in
  * that window is the lowest.
  *
  * Averages are computed per (sensor, minute); the rows of each minute are then
  * numbered by ascending average and only the first one is kept.
  *
  * @param spark session used to read the input files
  */
def extra4(spark: SparkSession): Unit = {
  val readings = spark
    .read
    .format("json")
    .load("Data/exercise5_sparkcore_data/*.json")
    .select($"sensor_id", $"timestamp", $"temperature")

  val minuteWindow = window($"timestamp".cast(TimestampType), "1 minute")

  val perSensorMinute = readings
    .groupBy($"sensor_id", minuteWindow)
    .agg(avg($"temperature").as("temperature"))
    .withColumn("timestamp", $"window.start")
    .drop($"window")

  // Within each minute, rank sensors from coldest to warmest.
  val coldestFirst = Window.partitionBy("timestamp").orderBy($"temperature".asc)

  perSensorMinute
    .withColumn("row", row_number().over(coldestFirst))
    .filter($"row" === 1)
    .drop($"row")
    .sort($"timestamp".asc)
    .show()
}

extra4(spark)

+---------+-----------------+-------------------+
|sensor_id|      temperature|          timestamp|
+---------+-----------------+-------------------+
|        3|27.77777777777778|2020-09-10 19:22:00|
|        1|             27.9|2020-09-10 19:23:00|
|        2|28.11111111111111|2020-09-10 19:24:00|
+---------+-----------------+-------------------+



extra4: (spark: org.apache.spark.sql.SparkSession)Unit
