In [1]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,harsha2010:magellan:1.0.5-s_2.11,com.esri.geometry:esri-geometry-api:1.2.1,commons-io:commons-io:2.6,org.apache.spark:spark-streaming_2.11:2.2.0,org.apache.spark:spark-sql_2.11:2.2.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11",
        "spark.dynamicAllocation.enabled": false
    }
}

In [2]:
/**
 * @Description: a spatial join based on Filter-refine approach for NYC taxicab data
 * @author: Isam Al Jawarneh
 * @date: 02/02/2019
 *last update: 14/04/2021
 */

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
18,application_1618739841902_0022,spark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [3]:
sc.version

res3: String = 2.2.0.2.6.3.84-1

In [4]:
import util.control.Breaks._
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.util.random.XORShiftRandom
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLImplicits
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.ForeachWriter
import magellan._
import magellan.index.ZOrderCurve
import magellan.{Point, Polygon}

import org.apache.spark.sql.magellan.dsl.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{
  DoubleType,
  StringType,
  StructField,
  StructType
}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{collect_list, collect_set}
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}
import scala.collection.mutable
import scala.concurrent.duration.Duration
import java.io.{BufferedWriter, FileWriter}
import org.apache.commons.io.FileUtils
import java.io.File
import scala.collection.mutable.ListBuffer
import java.time.Instant
import org.apache.spark.util.CollectionAccumulator
import org.apache.spark.sql.DataFrame

import org.apache.spark.sql.DataFrame

In [5]:
/////////////////////////////
/// Definition of schemas ///
/////////////////////////////

In [6]:
val parametersRegistrySchema = StructType(Array(
    StructField("IdParametro", StringType, false),
    StructField("PARAMETRO", StringType, false),
    StructField("UM", StringType, false),
    StructField("Tmed (min)", DoubleType, false),
    StructField("NOTE", StringType, false)))

parametersRegistrySchema: org.apache.spark.sql.types.StructType = StructType(StructField(IdParametro,StringType,false), StructField(PARAMETRO,StringType,false), StructField(UM,StringType,false), StructField(Tmed (min),DoubleType,false), StructField(NOTE,StringType,false))

In [7]:
val stationsRegistrySchema = StructType(Array(
    StructField("Stazione", StringType, false),
    StructField("Cod_staz", StringType, false),
    StructField("COMUNE", StringType, false),
    StructField("INDIRIZZO", StringType, false),
    StructField("PROVINCIA", StringType, false),
    StructField("Altezza", StringType, false),
    StructField("Id_Param", StringType, false),
    StructField("PARAMETRO", StringType, false),
    StructField("UM", StringType, false),
    StructField("Coord_X", DoubleType, false),
    StructField("Coord_Y", DoubleType, false),
    StructField("SR", StringType, false),
    StructField("LON_GEO", DoubleType, false),
    StructField("LAT_GEO", DoubleType, false),
    StructField("SR_GEO", StringType, false)))

stationsRegistrySchema: org.apache.spark.sql.types.StructType = StructType(StructField(Stazione,StringType,false), StructField(Cod_staz,StringType,false), StructField(COMUNE,StringType,false), StructField(INDIRIZZO,StringType,false), StructField(PROVINCIA,StringType,false), StructField(Altezza,StringType,false), StructField(Id_Param,StringType,false), StructField(PARAMETRO,StringType,false), StructField(UM,StringType,false), StructField(Coord_X,DoubleType,false), StructField(Coord_Y,DoubleType,false), StructField(SR,StringType,false), StructField(LON_GEO,DoubleType,false), StructField(LAT_GEO,DoubleType,false), StructField(SR_GEO,StringType,false))

In [8]:
val airDataSchema = StructType(Array(
    StructField("COD_STAZ", StringType, false),
    StructField("ID_PARAM", StringType, false),
    StructField("DATA_INIZIO", StringType, false),
    StructField("DATA_FINE", StringType, false),
    StructField("VALORE", DoubleType, false),
    StructField("UM", StringType, false)))

airDataSchema: org.apache.spark.sql.types.StructType = StructType(StructField(COD_STAZ,StringType,false), StructField(ID_PARAM,StringType,false), StructField(DATA_INIZIO,StringType,false), StructField(DATA_FINE,StringType,false), StructField(VALORE,DoubleType,false), StructField(UM,StringType,false))

In [9]:
/////////////////////////////
///// Import Dataframes /////
/////////////////////////////

In [10]:
//"wasb[s]://<BlobStorageContainerName>@<StorageAccountName>.blob.core.windows.net/<path>"
val parametersRegistry = (spark.read.format("csv")
                          .option("header", "true")
                          .schema(parametersRegistrySchema)
                          .csv("wasbs://sspark-2021-04-17t10-30-16-344z@ssparkhdistorage.blob.core.windows.net/registries/Parametri.csv")
                          .select($"IdParametro".as("Parameter_Id"),
                                  $"PARAMETRO".as("Parameter_Name"),
                                  $"UM".as("Unit_Of_Measurement"),
                                  $"Tmed (min)"))

parametersRegistry: org.apache.spark.sql.DataFrame = [Parameter_Id: string, Parameter_Name: string ... 2 more fields]

In [11]:
//"wasb[s]://<BlobStorageContainerName>@<StorageAccountName>.blob.core.windows.net/<path>"
val stationsRegistry = (spark.read.format("csv")
                        .option("header", "true")
                        .schema(stationsRegistrySchema)
                        .csv("wasbs://sspark-2021-04-17t10-30-16-344z@ssparkhdistorage.blob.core.windows.net/registries/Stazioni Aria.csv")
                        .select($"Stazione".as("Station"), $"Cod_staz", $"COMUNE".as("City"), $"INDIRIZZO".as("Address"), $"PROVINCIA".as("Province"), $"LON_GEO", $"LAT_GEO")
                        .withColumn("Station_Code", regexp_replace($"Cod_staz", "\\.", ""))
                        .withColumn("Point", point($"LON_GEO",$"LAT_GEO"))
                        .drop("Cod_Staz", "LON_GEO", "LAT_GEO"))

stationsRegistry: org.apache.spark.sql.DataFrame = [Station: string, City: string ... 4 more fields]

In [12]:
val airData = (spark.read.format("csv")
               .option("header", "true")
               .schema(airDataSchema)
               .load("wasbs://sspark-2021-04-17t10-30-16-344z@ssparkhdistorage.blob.core.windows.net/datasets_air_quality/")
               .select($"COD_STAZ".as("Station_Code"), $"ID_PARAM".as("Parameter_Id"), $"DATA_INIZIO", $"DATA_FINE", $"VALORE".as("Value"))
               .withColumn("Start_Date", to_timestamp($"DATA_INIZIO", "dd/MM/yyyy HH"))
               .withColumn("End_Date", to_timestamp($"DATA_FINE", "dd/MM/yyyy HH"))
               .drop("DATA_INIZIO", "DATA_FINE")
              )

airData: org.apache.spark.sql.DataFrame = [Station_Code: string, Parameter_Id: string ... 3 more fields]

In [13]:
//////////////////
/// Geohashing ///
//////////////////

In [14]:
// a user defined function to get geohash from long/lat point 
val geohashUDF = udf{(curve: Seq[ZOrderCurve]) => curve.map(_.toBase32())}

geohashUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StringType,true),Some(List(ArrayType(org.apache.spark.sql.types.ZOrderCurveUDT@17ef6849,true))))

In [15]:
val precision = 30

precision: Int = 30

In [16]:
//getting plain data from CSV file (file with point Data Structure) and use UDF to get geohashes
val geohashedStations = (stationsRegistry
                         .withColumn("index", $"point" index  precision)
                         .withColumn("geohashArray1", geohashUDF($"index.curve")))
val explodedGeohashedStations = (geohashedStations
                                 .explode("geohashArray1", "geohash")
                                 { a: mutable.WrappedArray[String] => a })

explodedGeohashedStations: org.apache.spark.sql.DataFrame = [Station: string, City: string ... 7 more fields]

In [17]:
explodedGeohashedStations.select("Station", "Point", "index", "geohashArray1", "geohash").show(2,false)

+-------------------+-------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+-------+
|Station            |Point                                      |index                                                                                                                                                 |geohashArray1|geohash|
+-------------------+-------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+-------+
|GIARDINI MARGHERITA|Point(11.35406170088398, 44.48267113876953)|[[ZOrderCurve(11.348876953125, 44.4781494140625, 11.35986328125, 44.483642578125, 30, -4191424035449470976, 110001011101010100010010000011),Contains]]|[srbj43]     |srbj43 |
|GIARDINI MARGHERITA|Point(11.35406170088398

In [18]:
val rawEmiliaRomagna= (spark.read.format("magellan")
                  .option("type", "geojson")
                  .load("wasbs://sspark-2021-04-17t10-30-16-344z@ssparkhdistorage.blob.core.windows.net/bologna/")
                  .select($"polygon", $"metadata"("NOME_COM").as("City_Name")).cache()
                  )
val emiliaRomagna = (rawEmiliaRomagna
               .withColumn("index", $"polygon" index  precision)
               .select($"polygon", $"index", $"City_Name")
               .cache())
val zorderIndexedEmiliaRomagna = (emiliaRomagna
                            .withColumn("index", explode($"index"))
                            .select("polygon", "index.curve", "index.relation","City_Name")
                          )
val geohashedEmiliaRomagna = emiliaRomagna.withColumn("geohashArray", geohashUDF($"index.curve"))
val explodedGeohashedEmiliaRomagna = geohashedEmiliaRomagna.explode("geohashArray", "geohash") { a: mutable.WrappedArray[String] => a }
explodedGeohashedEmiliaRomagna.count()

res22: Long = 54674

In [19]:
//joining geohashed trips with exploded geohashed neighborhood using filter-and-refine approach (.where($"point" within $"polygon") is refine --> using the brute force method ray casting for edge cases or false positives)
val stationsInEmiliaRomagna = (explodedGeohashedEmiliaRomagna
                         .join(explodedGeohashedStations,
                               explodedGeohashedEmiliaRomagna("geohash") === explodedGeohashedStations("geohash"))
                         .where($"point" within $"polygon")
                        )
stationsInEmiliaRomagna.show(3)

+--------------------+--------------------+--------------------+--------------------+-------+---------+--------------------+-----------+--------+------------+--------------------+--------------------+-------------+-------+
|             polygon|               index|           City_Name|        geohashArray|geohash|  Station|                City|    Address|Province|Station_Code|               Point|               index|geohashArray1|geohash|
+--------------------+--------------------+--------------------+--------------------+-------+---------+--------------------+-----------+--------+------------+--------------------+--------------------+-------------+-------+
|magellan.Polygon@...|[[ZOrderCurve(9.6...|Lugagnano Val d'Arda|[spyysw, spyysx, ...| spyyzx|LUGAGNANO|LUGAGNANO VAL D'ARDA|VIA FERMI 9|      PC|     5000007|Point(9.829360661...|[[ZOrderCurve(9.8...|     [spyyzx]| spyyzx|
|magellan.Polygon@...|[[ZOrderCurve(9.6...|Lugagnano Val d'Arda|[spyysw, spyysx, ...| spyyzx|LUGAGNANO|LUGAG

In [20]:
stationsInEmiliaRomagna.columns

res25: Array[String] = Array(polygon, index, City_Name, geohashArray, geohash, Station, City, Address, Province, Station_Code, Point, index, geohashArray1, geohash)

In [21]:
val airDataWithParameters = (airData
                             .join(
                                 parametersRegistry,
                                 airData("Parameter_Id") === parametersRegistry("Parameter_Id")))
airDataWithParameters.show(2)

+------------+------------+-----+-------------------+-------------------+------------+--------------------+-------------------+----------+
|Station_Code|Parameter_Id|Value|         Start_Date|           End_Date|Parameter_Id|      Parameter_Name|Unit_Of_Measurement|Tmed (min)|
+------------+------------+-----+-------------------+-------------------+------------+--------------------+-------------------+----------+
|     5000033|          10|  1.2|2020-01-01 00:00:00|2020-01-01 01:00:00|          10|CO (Monossido di ...|              mg/m3|      60.0|
|     5000033|          10|  1.0|2020-01-01 01:00:00|2020-01-01 02:00:00|          10|CO (Monossido di ...|              mg/m3|      60.0|
+------------+------------+-----+-------------------+-------------------+------------+--------------------+-------------------+----------+
only showing top 2 rows

In [22]:
val finalTable = (airDataWithParameters
                  .join(stationsInEmiliaRomagna,
                        airDataWithParameters("Station_Code") === stationsInEmiliaRomagna("Station_Code")))
finalTable.show(3)

+------------+------------+-----+-------------------+-------------------+------------+--------------------+-------------------+----------+--------------------+--------------------+---------+--------------------+-------+---------+-----+---------------+--------+------------+--------------------+--------------------+-------------+-------+
|Station_Code|Parameter_Id|Value|         Start_Date|           End_Date|Parameter_Id|      Parameter_Name|Unit_Of_Measurement|Tmed (min)|             polygon|               index|City_Name|        geohashArray|geohash|  Station| City|        Address|Province|Station_Code|               Point|               index|geohashArray1|geohash|
+------------+------------+-----+-------------------+-------------------+------------+--------------------+-------------------+----------+--------------------+--------------------+---------+--------------------+-------+---------+-----+---------------+--------+------------+--------------------+--------------------+---------

In [23]:
finalTable.printSchema()

root
 |-- Station_Code: string (nullable = true)
 |-- Parameter_Id: string (nullable = true)
 |-- Value: double (nullable = true)
 |-- Start_Date: timestamp (nullable = true)
 |-- End_Date: timestamp (nullable = true)
 |-- Parameter_Id: string (nullable = true)
 |-- Parameter_Name: string (nullable = true)
 |-- Unit_Of_Measurement: string (nullable = true)
 |-- Tmed (min): double (nullable = true)
 |-- polygon: polygon (nullable = true)
 |-- index: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- curve: zordercurve (nullable = false)
 |    |    |-- relation: string (nullable = false)
 |-- City_Name: string (nullable = true)
 |-- geohashArray: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- geohash: string (nullable = true)
 |-- Station: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Station_Code: string (nullable = true)
 