# Nijmegen Open Data

Spark Notebook to help you analyze data sets provided by Nijmegen's city council.

## Nijmegen data

The city of Nijmegen provides a variety of resources as open data at
[opendata.nijmegen.nl](https://opendata.nijmegen.nl/).

Our goal is to analyze these together - to investigate if we can say anything about the relation between population statistics in areas of the city and the activities that are organized there.

Specifically, we will integrate the following two data sets:
- Streetnames and their quarters: _not available through portal anymore_, use the `wget` command instead
    * `wget https://raw.githubusercontent.com/rubigdata/course/gh-pages/data/BAG_ADRES.csv`
- Public artworks: https://www.nijmegen.nl/kos/opendata/
    * Due to a mistake in the lastest version of this data, the CSV in this link is corrupt. Use the file `kunstopstraat-kunstwerk.csv` in your repository instead.

You need to download the data yourself and copy the files into the container, e.g. using `docker cp`.
The notebook assumes you choose the CSV version of the datasets, and copy them to a directory `/data/bigdata` in the notebook,
that you may still have to create (e.g., `mkdir -p /data/bigdata` inside the container).

This notebook contains code to get you started in the analysis, so you can pick up Spark by example and refresh your SQL.

To fully understand everything, you will need the documentation: http://spark.apache.org/docs/latest/sql-programming-guide.html

Hints of interesting things to discuss in the blog for assignment 3 start with  _**Q:**_.

Next, more preamble, importing some necessary classes and creating a `SparkSession` object.

In [ ]:
import org.apache.spark.sql.types._

import java.text.NumberFormat
import java.util.Locale

val spark = SparkSession
   .builder()
   .appName("A3b-spark-df")
   .getOrCreate()

import org.apache.spark.sql.types._
import java.text.NumberFormat
import java.util.Locale
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2bb55b1c


### Addresses and Quarters

We start with the so-called BAG (_basisadministratie adressen en gebouwen_, i.e., records of addresses and buildings), a standardized resource for which the definition is managed at the national level.

Like most data, the BAG is distributed as CSV, Comma-Separated-Values. Just using `split` may work on very small datasets with regular data, but as soon as quoted values contain commas, we would get in trouble. Spark 2.0 comes with support to parse various data formats, including CSV and JSON, so let us use the standard interfaces.

In [ ]:
val bagdata = spark.read.format("csv").option("header", true).load("./notebooks/rubigdata/bag-adres.csv").cache()

bagdata: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ACTCODE: string, ADRES_ID: string ... 18 more fields]


In [ ]:
bagdata.describe()

res3: org.apache.spark.sql.DataFrame = [summary: string, ACTCODE: string ... 19 more fields]


#### Real-life data

DataFrames are very handy to get a quick impression of the data, e.g., using `show()`, `sample()` and `describe()`.

If you think a bit longer about the reported statistics, you realize that not all the data is equally well formatted - the data dump contains address records without x and/or y coordinates; which you can validate as follows:

In [ ]:
println("X coord nulls: " + bagdata.filter( $"X_COORD".isNull ).count)
println("Y coord nulls: " + bagdata.filter( $"Y_COORD".isNull ).count)

X coord nulls: 45
Y coord nulls: 45


How to proceed is a decision that the data scientist needs to take - that is, it will be up to your judgement. Can we proceed without these records? In this case, the inconsistent records concern only a very small fraction of the data (45 out of almost 100K records), and we might decide to simply ignore these without valid `(x,y)` coordinates.

In many real life situations, however, we would need to invest more effort and clean the data to overcome these deficiencies. If you look into examples of records without coordinates (_Q: can you write the small numbers of lines of code to inspect these records?_), you find that some of these records have a value "Niet authentiek" in field STATUS, whereas almost all records have "Naamgeving uitgegeven". We lack the domain knowledge to understand the details of this labelling, and may need to dig deeper, or even have a chat with the owners of the data to learn how these values are assigned.

Keep in mind that we only discovered this anomaly in the data when we attempted to define a proper schema, and discovered that the data did not satisfy that schema. Switching from transformations over RDDs to the more structured Data Frame representation can help avoid mistakes in the analysis.

**Q:** _Discuss decisions in data selection and data cleaning that you took to complete the assignment in your blog post._

In [ ]:
var bagdataFiltered = bagdata.filter(  $"X_COORD".isNotNull )

bagdataFiltered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ACTCODE: string, ADRES_ID: string ... 18 more fields]


In [ ]:
println("X coord nulls: " + bagdataFiltered.filter( $"X_COORD".isNull ).count)
println("Y coord nulls: " + bagdataFiltered.filter( $"Y_COORD".isNull ).count)

X coord nulls: 0
Y coord nulls: 0


In [ ]:
bagdataFiltered.describe()

res10: org.apache.spark.sql.DataFrame = [summary: string, ACTCODE: string ... 19 more fields]


#### Schema

The Data Frame API supports a more structured interface to data by defining a `case class` to represent the data.

In [ ]:
case class Addr(street:String, quarter:String, x:Float, y:Float)

defined class Addr


Project the desired columns onto the fields of the `case class` and convert to a so-called `dataset`.

In [ ]:
val addrDF = bagdataFiltered.select($"STRAAT" as "street",
                            $"X_COORD".cast(FloatType) as "x",
                            $"Y_COORD".cast(FloatType) as "y",
                            $"WIJK_OMS" as "quarter").as[Addr].cache()
addrDF.show(5)

+--------------------+----+----+----------+
|              street|   x|   y|   quarter|
+--------------------+----+----+----------+
|Dr. Claas Noordui...|null|null|Hunnerberg|
|Dr. Claas Noordui...|null|null|Hunnerberg|
|Dr. Claas Noordui...|null|null|Hunnerberg|
|Dr. Claas Noordui...|null|null|Hunnerberg|
|Dr. Claas Noordui...|null|null|Hunnerberg|
+--------------------+----+----+----------+
only showing top 5 rows

addrDF: org.apache.spark.sql.Dataset[Addr] = [street: string, x: float ... 2 more fields]


Actually, parsing the X and Y coordinates is a little messy: the data contains the X and Y coordinates in the Dutch locale, which means that they are written as 0,5 instead of 0.5.

When parsing fails, we end up with `null` values.

We define a function to convert values where possible.

In [ ]:
val nf = NumberFormat.getInstance(Locale.forLanguageTag("nl"));

def convToFloat(s: String): Option[Float] = {
  try {
    Some(nf.parse(s).floatValue)
  } catch {
    case e: Exception => None
  }
}

nf: java.text.NumberFormat = java.text.DecimalFormat@674dc
convToFloat: (s: String)Option[Float]


We have to register this function as a _user-defined function_ so it can be used in Spark SQL.

In [ ]:
val tfloat = udf((f: String) => convToFloat(f).getOrElse(0f))

val addrFloats = bagdataFiltered.select($"STRAAT" as "street",
                            tfloat($"X_COORD").cast(FloatType) as "x",
                            tfloat($"Y_COORD").cast(FloatType) as "y",
                            $"WIJK_OMS" as "quarter").as[Addr].cache()
addrFloats.show(5)

+--------------------+----+----+----------+
|              street|   x|   y|   quarter|
+--------------------+----+----+----------+
|Dr. Claas Noordui...|null|null|Hunnerberg|
|Dr. Claas Noordui...|null|null|Hunnerberg|
|Dr. Claas Noordui...|null|null|Hunnerberg|
|Dr. Claas Noordui...|null|null|Hunnerberg|
|Dr. Claas Noordui...|null|null|Hunnerberg|
+--------------------+----+----+----------+
only showing top 5 rows

tfloat: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,FloatType,Some(List(StringType)))
addrFloats: org.apache.spark.sql.Dataset[Addr] = [street: string, x: float ... 2 more fields]


Let's check that the missing values have been placed on the origin:

In [ ]:
printf(
  "%d points at origin, %d null values",
  addrFloats.filter("x = 0 or y = 0").count,
  addrFloats.filter('x.isNull or 'y.isNull).count
)

0 points at origin, 0 null values

Use `describe` to get an overview of the data:

In [ ]:
addrFloats.describe()

res20: org.apache.spark.sql.DataFrame = [summary: string, street: string ... 3 more fields]


#### Using Data Frame Operators

In [ ]:
val qc_1 = addrFloats.groupBy("quarter").count.cache()

qc_1.show(5)

+-----------+-----+
|    quarter|count|
+-----------+-----+
|    Malvert| 1817|
|     Hatert| 6385|
|Kwakkenberg|  929|
|   Aldenhof| 1504|
|  Meijhorst| 2058|
+-----------+-----+
only showing top 5 rows

qc_1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [quarter: string, count: bigint]


What are the top 10 largest quarters of Nijmegen?

In [ ]:
val qc_1_top = qc_1.orderBy(desc("count")).limit(10)

qc_1_top.show()

+--------------+-----+
|       quarter|count|
+--------------+-----+
|  Stadscentrum| 6468|
|        Hatert| 6385|
|        Biezen| 4945|
|Neerbosch-Oost| 4415|
|          Lent| 4332|
|     Hengstdal| 3868|
|      Heseveld| 3559|
|    Galgenveld| 3510|
|       Altrade| 3269|
|     Grootstal| 3079|
+--------------+-----+

qc_1_top: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [quarter: string, count: bigint]


Wondering what happens inside? The `explain` operator gives you the _physical_ query plan. If you add a boolean, you also see the logical plans, both the initial as parsed, and the plan that is the result of query optimization.

_It is okay if you do not fully understand the full query plan - but do try for a minute or so._

In [ ]:
println("Group by and count:")
println("===================")
qc_1.explain(true)

println("Order descending limit 10:")
println("==========================")
qc_1_top.explain(true)

Group by and count:
== Parsed Logical Plan ==
Aggregate [quarter#4059], [quarter#4059, count(1) AS count#4613L]
+- AnalysisBarrier
      +- Project [STRAAT#21 AS street#4056, cast(UDF(X_COORD#28) as float) AS x#4057, cast(UDF(Y_COORD#29) as float) AS y#4058, WIJK_OMS#26 AS quarter#4059]
         +- Filter isnotnull(X_COORD#28)
            +- Relation[ACTCODE#10,ADRES_ID#11,ADR_ADRESNR#12,HUISLETTER#13,HUISNUMMER#14,HUISNUMMERTOEVOEGING#15,OPENBARERUIMTE_ID#16,POSTK_A#17,POSTK_N#18,STADSDEEL#19,STATUS#20,STRAAT#21,STRAAT_OFF#22,STRAAT_OFF_D#23,VOLGNR#24,WIJKCODE#25,WIJK_OMS#26,WOONPLAATS#27,X_COORD#28,Y_COORD#29] csv

== Analyzed Logical Plan ==
quarter: string, count: bigint
Aggregate [quarter#4059], [quarter#4059, count(1) AS count#4613L]
+- Project [STRAAT#21 AS street#4056, cast(UDF(X_COORD#28) as float) AS x#4057, cast(UDF(Y_COORD#29) as float) AS y#4058, WIJK_OMS#26 AS quarter#4059]
   +- Filter isnotnull(X_COORD#28)
      +- Relation[ACTCODE#10,ADRES_ID#11,ADR_ADRESNR#12,HUISLETT

#### Using SQL

You are not restricted to building queryplans yourself by applying operators to Data Frames; instead, you can also use the SQL interface, and, mix and match SQL querying with follow-up operations using the Data Frame API, or even convert the data back to RDDs and continue to perform analyses directly working with RDDs.

Using SQL is most convenient when queries get larger and more complicated. Consider for example the query for the 10 largest quarters:

In [ ]:
addrFloats.createOrReplaceTempView("addresses")

In [ ]:
val qc_2_top = spark.sql("SELECT quarter, count(quarter) AS qc FROM addresses GROUP BY quarter ORDER BY qc DESC LIMIT 10")

qc_2_top: org.apache.spark.sql.DataFrame = [quarter: string, qc: bigint]


In [ ]:
qc_2_top.show

+--------------+----+
|       quarter|  qc|
+--------------+----+
|  Stadscentrum|6468|
|        Hatert|6385|
|        Biezen|4945|
|Neerbosch-Oost|4415|
|          Lent|4332|
|     Hengstdal|3868|
|      Heseveld|3559|
|    Galgenveld|3510|
|       Altrade|3269|
|     Grootstal|3079|
+--------------+----+



In [ ]:
qc_2_top.explain(true)

== Parsed Logical Plan ==
'GlobalLimit 10
+- 'LocalLimit 10
   +- 'Sort ['qc DESC NULLS LAST], true
      +- 'Aggregate ['quarter], ['quarter, 'count('quarter) AS qc#4710]
         +- 'UnresolvedRelation `addresses`

== Analyzed Logical Plan ==
quarter: string, qc: bigint
GlobalLimit 10
+- LocalLimit 10
   +- Sort [qc#4710L DESC NULLS LAST], true
      +- Aggregate [quarter#4059], [quarter#4059, count(quarter#4059) AS qc#4710L]
         +- SubqueryAlias addresses
            +- Project [STRAAT#21 AS street#4056, cast(UDF(X_COORD#28) as float) AS x#4057, cast(UDF(Y_COORD#29) as float) AS y#4058, WIJK_OMS#26 AS quarter#4059]
               +- Filter isnotnull(X_COORD#28)
                  +- Relation[ACTCODE#10,ADRES_ID#11,ADR_ADRESNR#12,HUISLETTER#13,HUISNUMMER#14,HUISNUMMERTOEVOEGING#15,OPENBARERUIMTE_ID#16,POSTK_A#17,POSTK_N#18,STADSDEEL#19,STATUS#20,STRAAT#21,STRAAT_OFF#22,STRAAT_OFF_D#23,VOLGNR#24,WIJKCODE#25,WIJK_OMS#26,WOONPLAATS#27,X_COORD#28,Y_COORD#29] csv

== Optimized Logical

### Artworks

Now that we have the address data prepared, move on to look into the data about the artworks.

Load the data, look at the schema and global statistics, and inspect a small sample:

In [ ]:
val kunst = spark.read
    .format("csv")
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .load("./notebooks/rubigdata/kunstopstraat-kunstwerk.csv").cache()


kunst: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [naam: string, bouwjaar: string ... 8 more fields]


In [ ]:
kunst.printSchema

root
 |-- naam: string (nullable = true)
 |-- bouwjaar: string (nullable = true)
 |-- kunstenaar: string (nullable = true)
 |-- locatie: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- omschrijving: string (nullable = true)
 |-- eigendom: string (nullable = true)
 |-- bron: string (nullable = true)
 |-- url: string (nullable = true)



In [ ]:
// Select all the public art created before the year 2000
val kunstwerken = kunst.select("naam", "locatie", "latitude", "longitude", "bouwjaar", "url").where("bouwjaar <= 1999")

kunstwerken: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [naam: string, locatie: string ... 4 more fields]


In [ ]:
kunstwerken.sample(true, 0.1).show()

+--------------------+--------------------+----------+----------+--------+--------------------+
|                naam|             locatie|  latitude| longitude|bouwjaar|                 url|
+--------------------+--------------------+----------+----------+--------+--------------------+
|Weesjongen en wee...|   Begijnenstraat 29|51.8485003|5.86054756|    1645|                null|
|               Leeuw|Groesbeekseweg (g...|51.8340911|5.87236642|    1903|                null|
|    C.A.P. Ivensbank|          Hunnerpark| 51.847941|  5.872782|    1936|                null|
|    Achttien hoofden|Burchtstraat (sta...|51.8470413|5.86588919|    1948|                null|
|Acht keizerbeelde...|Burchtstraat (gev...|51.8470240|5.86595084|    1950|                null|
|Zonnewijzer met z...|Groenestraat 310/336|51.8325252|5.84410544|    1953|http://www.nijmeg...|
|          Mariabeeld|Burchtstraat 20 (...|51.8470070|5.86601127|    1953|                null|
|         Zes reli?fs|      Burchtstraat

The coordinates are not correctly detected as floats - again, due to the Locale.
A quick hack (but not robust code) to cast the values to floats:

In [ ]:
val kunstxy = kunstwerken
  .withColumn("latitude", translate(kunstwerken.col("latitude"), ",", ".").cast("float"))
  .withColumn("longitude", translate(kunstwerken.col("longitude"), ",", ".").cast("float"))

kunstxy: org.apache.spark.sql.DataFrame = [naam: string, locatie: string ... 4 more fields]


Let's inspect some of the data, using global information and a sample.

In [ ]:
kunstxy.sample(true, 0.05).show()

+--------------------+--------------------+---------+---------+--------+--------------------+
|                naam|             locatie| latitude|longitude|bouwjaar|                 url|
+--------------------+--------------------+---------+---------+--------+--------------------+
|Weesjongen en wee...|   Begijnenstraat 29|  51.8485|5.8605475|    1645|                null|
|           Dominicus|Heyendaalseweg 12...|51.827034|5.8713107|    1880|                null|
|               Wapen|Smetiusstraat (Ko...| 51.84415| 5.858717|    1882|http://www.nijmeg...|
|               Leeuw|    Kronenburgerpark|51.845955| 5.857805|    1886|                null|
|Vincentius Ferrerius| Dominicanenstraat 6|51.841824|5.8734784|    1904|http://www.nijmeg...|
|             Madonna|  Heyendaalseweg 239|51.812054|5.8674207|    1927|                null|
|Zonnewijzer met z...|Groenestraat 310/336|51.832523|5.8441052|    1953|http://www.nijmeg...|
|         Ruiterbeeld|       Stationsplein| 51.84214| 5.8531

Artworks created during WWII:

In [ ]:
kunstxy.filter("bouwjaar >= 1940 and bouwjaar <= 1945").show()

+--------------------+--------------------+---------+---------+--------+--------------------+
|                naam|             locatie| latitude|longitude|bouwjaar|                 url|
+--------------------+--------------------+---------+---------+--------+--------------------+
|        Twee leeuwen|    Veerpoorttrappen|51.848396| 5.868978|    1941|http://www.nijmeg...|
|               Kopje|Nieuwstraat (stad...| 51.84608|5.8656554|    1943|http://www.nijmeg...|
|Gedenksteen voor ...|    Joris Ivensplein| 51.84864|5.8566613|    1945|                null|
|Gedenksteen Jan v...|Waalbrug, pijler ...| 51.85252| 5.870856|    1945|                null|
+--------------------+--------------------+---------+---------+--------+--------------------+



In [ ]:
kunstxy.describe().show()

+-------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|summary|                naam|             locatie|            latitude|           longitude|          bouwjaar|                 url|
+-------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|  count|                 300|                 300|                 300|                 300|               300|                 102|
|   mean|                null|                null|  51.838399492899576|    5.85550477027893|1948.7966666666666|                null|
| stddev|                null|                null|0.012622929581466448|0.022618858781211955| 68.08067702490506|                null|
|    min|'Vervult u met de...|Achter de Hoofdwacht|           51.800934|           5.7574415|              1554|   "de onafwendbare"|
|    max|     muurschildering|d'Almarasweg (Bot...|           

Many items have no URL information, given the difference in counts between `url` and `latitude`.

Let's inspect the problematic tuples (using SQL - _"niet omdat het moet, maar omdat het kan"_):

In [ ]:
kunstxy.createOrReplaceTempView("kunstxy")
spark.sql("SELECT * FROM kunstxy WHERE url IS NULL LIMIT 10").show()

+--------------------+--------------------+---------+---------+--------+----+
|                naam|             locatie| latitude|longitude|bouwjaar| url|
+--------------------+--------------------+---------+---------+--------+----+
|Ingangspoort van ...|     Burchtstraat 20| 51.84708| 5.865753|    1554|null|
|              Reli?f|   Begijnenstraat 29| 51.84851|5.8605824|    1618|null|
|Weesjongen en wee...|   Begijnenstraat 29|  51.8485|5.8605475|    1645|null|
|Poort Oud Burgere...|Professor Corneli...|51.833538| 5.865971|    1645|null|
|   Vier gevelbeelden|Postweg 80 (Villa...| 51.83032| 5.878629|    1930|null|
|Poort Roomsch Kat...|   Begijnenstraat 29|51.848743|  5.86041|    1860|null|
|Gevelsteen met Wa...|         Waalkade 65|51.849136| 5.867189|    1861|null|
|           Dominicus|Heyendaalseweg 12...|51.827034|5.8713107|    1880|null|
|Spoorwegmonument ...|Hoogstraat bij Va...|51.847115|5.8688755|    1884|null|
|Spoorwegmonument ...|Hoogstraat bij Va...|51.847115|5.8688755| 

If you would use the data in practice, you can simply ignore the `url` column, or define a new table that excludes the NULL values.

Let us however go back to the source data, in the Dataframe called `kunst`, and look into the dataset in more detail:

In [ ]:
kunst.createOrReplaceTempView("kunst")
kunst.describe().show()

+-------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+
|summary|                naam|         bouwjaar|          kunstenaar|             locatie|            latitude|           longitude|        omschrijving|   eigendom|                bron|                 url|
+-------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+
|  count|                1016|              812|                 607|                 589|                 488|                 416|                 376|        122|                  27|                 117|
|   mean|                null|2043.293948126801|                null|                null|   51.83873464913294|   5.854925002441861|                null|       null|   

Even the counts of tuples with longitude values does not equal the counts with latitude values... and a mean "bouwjaar" in the future is, well, suspicious!

Let's investigate in more detail, listing missing lat/lon and out-of-range year values in a few SQL queries.

__Q:__ _Try to understand the various data problems we encounter when working on this dataset. Which are fundamental, and which are just an artifact of our processing?_

In [ ]:
spark.sql("select * from kunst where (latitude is not null and longitude is null) or (latitude is null and longitude is not null)")
  .show(15)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+----+----+
|                naam|            bouwjaar|          kunstenaar|             locatie|            latitude|           longitude|        omschrijving|eigendom|bron| url|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+----+----+
|De gebeeldhouwde ...| in 1554 gemaakt ...| is grotendeels a...|   gemeente Nijmegen|                null|http://www.nijmeg...|                null|    null|null|null|
|De wegens faillis...| maar ook de natu...| afkomstig van de...| wordt ook meldin...|   gemeente Nijmegen|                null|http://www.nijmeg...|    null|null|null|
|Wie de vier besch...| omdat vaststaat ...|         particulier|                null|http://www.nijmeg...|                null|                null|    null|nul

In [ ]:
spark.sql("select * from kunst where bouwjaar > 2017")
  .show(10)

+-------------+--------+--------------------+--------------------+----------+----------+--------------------+-----------------+--------------------+--------------------+
|         naam|bouwjaar|          kunstenaar|             locatie|  latitude| longitude|        omschrijving|         eigendom|                bron|                 url|
+-------------+--------+--------------------+--------------------+----------+----------+--------------------+-----------------+--------------------+--------------------+
|       Object|    9999|            Onbekend|   Geert Grooteplein|51.8239293|5.86176819|Deze gespleten st...|             null|                null|                null|
|Glazen Paneel|    9999|Montse Hernandez ...|Eikenlaan (Lent-N...|51.8676363|5.87309422|Dit kunstwerk wer...|gemeente Nijmegen|www.montsehern?nd...|http://www.nijmeg...|
|    Sculptuur|    9999|    Gerard Walraeven|Wout Wagtmanspad ...| 51.810100|  5.782957|Dit kunstwerk is ...|             null|                null|  

In [ ]:
spark.sql("select * from kunst where bouwjaar is null")
  .show(10)

+--------------------+--------+----------+-------+--------+---------+------------+--------+----+----+
|                naam|bouwjaar|kunstenaar|locatie|latitude|longitude|omschrijving|eigendom|bron| url|
+--------------------+--------+----------+-------+--------+---------+------------+--------+----+----+
|Het reli?f hoort ...|    null|      null|   null|    null|     null|        null|    null|null|null|
|In het midden van...|    null|      null|   null|    null|     null|        null|    null|null|null|
|De volgende tekst...|    null|      null|   null|    null|     null|        null|    null|null|null|
|Aenschouwer van t...|    null|      null|   null|    null|     null|        null|    null|null|null|
|Wilt doch om Chri...|    null|      null|   null|    null|     null|        null|    null|null|null|
|Gedachtlich syn g...|    null|      null|   null|    null|     null|        null|    null|null|null|
|Want sulcx is aen...|    null|      null|   null|    null|     null|        null|

### Art on the map

To continue, let us first create an artworks dataset that is relatively clean, removing tuples that resulted from parsing errors and/or those with missing values.

Can we then find out which quarters have the most artworks?
Perhaps we can even identify the development of the city over time using the dates of these artworks?

To answer these questions, we need to join the dataset with addresses with the one with the artworks.
The best link between the two datasets seems to be the coordinates. Let's see how to exploit the location fields to answer our questions.

In [ ]:
val ks = spark.sql("select * from kunst where (latitude is not null and longitude is not null) and bouwjaar < 9999")

ks: org.apache.spark.sql.DataFrame = [naam: string, bouwjaar: string ... 8 more fields]


In [ ]:
ks.describe()

res61: org.apache.spark.sql.DataFrame = [summary: string, naam: string ... 9 more fields]


In [ ]:
val kos = ks
  .withColumn("latitude", translate(ks.col("latitude"), ",", ".").cast("float"))
  .withColumn("longitude", translate(ks.col("longitude"), ",", ".").cast("float"))
  .withColumn("bouwjaar", col("bouwjaar").cast("int"))
  .select("naam", "locatie", "latitude", "longitude", "bouwjaar", "url")

kos: org.apache.spark.sql.DataFrame = [naam: string, locatie: string ... 4 more fields]


In [ ]:
kos.createOrReplaceTempView("kos")
kos.printSchema

root
 |-- naam: string (nullable = true)
 |-- locatie: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- bouwjaar: integer (nullable = true)
 |-- url: string (nullable = true)



In [ ]:
kos.show(5, false)

+----------------------------------------------+-------------------------------------------+---------+---------+--------+----------------------------------------------+
|naam                                          |locatie                                    |latitude |longitude|bouwjaar|url                                           |
+----------------------------------------------+-------------------------------------------+---------+---------+--------+----------------------------------------------+
|Ingangspoort van het stadhuis                 |Burchtstraat 20                            |51.84708 |5.865753 |1554    |null                                          |
|Reli?f                                        |Begijnenstraat 29                          |51.84851 |5.8605824|1618    |null                                          |
|Poort van het Oude Weeshuis                   |Gebroeders van Limburgplein                |51.84726 |5.859645 |1640    |http://www.nijmegen.nl/kos/kunstwe

In [ ]:
// Metadata from the catalogue
spark.catalog.listDatabases.show(false)
spark.catalog.listTables.show(false)

+-------+----------------+--------------------------------+
|name   |description     |locationUri                     |
+-------+----------------+--------------------------------+
|default|default database|file:/opt/docker/spark-warehouse|
+-------+----------------+--------------------------------+

+---------+--------+-----------+---------+-----------+
|name     |database|description|tableType|isTemporary|
+---------+--------+-----------+---------+-----------+
|addresses|null    |null       |TEMPORARY|true       |
|kos      |null    |null       |TEMPORARY|true       |
|kunst    |null    |null       |TEMPORARY|true       |
|kunstxy  |null    |null       |TEMPORARY|true       |
+---------+--------+-----------+---------+-----------+



#### Coordinates, coordinates

Both datasets include (x,y) locations, but they are in different coordinate systems. BAG uses an official Dutch system known as RD New, from "Rijksdriehoeksco??rdinaten" (~ "national triangle coordinates").

The other dataset uses (lat,lon) coordinates to show artworks on the map, see e.g.:
["de pleinenroute"](http://www.nijmegen.nl/kos/kunstroute.aspx?id=1) (Route of the squares) 

Luckily, we are not the first who need to convert values between coordinate systems - I found the following "easy-to-use" (compared to geo-informatics alternatives) 
Java library:
[Coordinate Transformation Suite (CTS)](https://github.com/orbisgis/cts). 

_(We already loaded this library using the `:dp` directive at the top of the notebook.)_

The following snippet of Java/Scala code sets up the library to transform map coordinates to RD. To use an external library in Spark, all the objects must be serializable; as they are shipped to the worker nodes. Here, this is achieved by using the `@transient` directive to instruct Scala not to include this part of the
object in the serialization, in combination with checking for `null` values when using these variables.

More information, if you want to dig deeper:
* RD New, or "Amersfoort": [Wikipedia entry](https://nl.wikipedia.org/wiki/Rijksdriehoeksco%C3%B6rdinaten)
* The transformation: http://pdok-ngr.readthedocs.io/handleidingen.html#coord-trans
* Serialization and `object` vs. `class`: http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark
* Role of "annotation" `@transient`: http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/

_Understanding all the details of the `CT` class and its inner workings is not necessary to complete assignment 3._

In [ ]:
object CT extends Serializable {
  
  import org.cts.CRSFactory;
  import org.cts.crs.GeodeticCRS;
  import org.cts.registry.EPSGRegistry;
  import org.cts.op.CoordinateOperationFactory;
  import org.cts.op.CoordinateOperation;

  // global variables to keep state for transformations
  @transient private var xy2latlonOp : CoordinateOperation = null;
  @transient private var latlon2xyOp : CoordinateOperation = null;

  // Create the coordinate transformation functions to convert from RD New to WGS:84 and vice versa
  def initTransforms : (CoordinateOperation, CoordinateOperation) = {
    
    // Create a new CRSFactory, a necessary element to create a CRS without defining one by one all its components
    val cRSFactory = new CRSFactory();

    // Add the appropriate registry to the CRSFactory's registry manager. Here the EPSG registry is used.
    val registryManager = cRSFactory.getRegistryManager();
    registryManager.addRegistry(new EPSGRegistry());

    // CTS will read the EPSG registry seeking the 4326 code, when it finds it,
    // it will create a CoordinateReferenceSystem using the parameters found in the registry.
    val crs1 : GeodeticCRS = (cRSFactory.getCRS("EPSG:28992")).asInstanceOf[GeodeticCRS];
    val crs2 : GeodeticCRS = (cRSFactory.getCRS("EPSG:4326") ).asInstanceOf[GeodeticCRS];
    
    // Transformation (x,y) -> (lon,lat)
    val xy2latlonOps = CoordinateOperationFactory.createCoordinateOperations(crs1,crs2);
    val xy2latlon = xy2latlonOps.iterator().next(); //get(0);
    
    val latlon2xyOps = CoordinateOperationFactory.createCoordinateOperations(crs2,crs1);
    val latlon2xy = latlon2xyOps.iterator().next(); //get(0);
    
    (xy2latlon, latlon2xy)
  }

  // Encapsulate private transient variable (for serializability of the object)
  def getXYOp : CoordinateOperation = {
    if (xy2latlonOp == null){
      val ts = initTransforms
      xy2latlonOp = ts._1
      latlon2xyOp = ts._2
    }
    xy2latlonOp
  }

  // Encapsulate private transient variable (for serializability of the object)
  def getLatLonOp : CoordinateOperation = {
    if (latlon2xyOp == null){
      val ts = initTransforms
      xy2latlonOp = ts._1
      latlon2xyOp = ts._2
    }
    latlon2xyOp
  }
  
  // Use the library's transformation function to convert the coordinates
  def transformXY(x:Float, y:Float) : (Float, Float) = {   
    // Note: easily confused, (lat,lon) <-> (y,x)
    val lonlat = this.getXYOp.transform(Array(x.toDouble, y.toDouble));
    return ( lonlat(1).toFloat, lonlat(0).toFloat)
  }
  
  // Use the library's transformation function to convert the coordinates
  def transformLatLon(lat:Float, lon:Float) : (Float, Float) = {
    // Note: easily confused, (lat,lon) <-> (y,x)
    val xy = this.getLatLonOp.transform(Array(lon.toDouble, lat.toDouble));
    return ( xy(0).toFloat, xy(1).toFloat)
  }
}

defined object CT


Using the transformation from RD New to WGS:84 (the latitude/longitude pairs used in google maps and open streetmap), we can now plot our BAG data on a map.

Consider the following example, where we take a sample of points from the address data that corresponds to _Toernooiveld_ and put these on the map of Nijmegen, using the widgets provided by spark notebook.

In [ ]:
val txyudf = udf( CT.transformXY _ )

txyudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,StructType(StructField(_1,FloatType,false), StructField(_2,FloatType,false)),Some(List(FloatType, FloatType)))


In [ ]:
// Register the transformation function as UDF and apply it to the BAG dataframe
val ta = addrFloats.withColumn("latlon", txyudf($"x", $"y"))
ta.show(5)

+--------------------+---------+---------+----------+--------------------+
|              street|        x|        y|   quarter|              latlon|
+--------------------+---------+---------+----------+--------------------+
|Dr. Claas Noordui...|188670.94|428403.16|Hunnerberg|[51.843193, 5.875...|
|Dr. Claas Noordui...| 188679.6| 428375.1|Hunnerberg|[51.84294, 5.875935]|
|Dr. Claas Noordui...|188685.73|428373.06|Hunnerberg|[51.842922, 5.876...|
|Dr. Claas Noordui...|188692.36|428389.72|Hunnerberg|[51.84307, 5.876122]|
|Dr. Claas Noordui...|188699.69| 428403.5|Hunnerberg|[51.843193, 5.876...|
+--------------------+---------+---------+----------+--------------------+
only showing top 5 rows

ta: org.apache.spark.sql.DataFrame = [street: string, x: float ... 3 more fields]


In [ ]:
val ds = ta.filter('street === "Toernooiveld").select('street, 'x, 'y, 'latlon .getField("_1") as "lat", 'latlon .getField("_2") as "lon")
ds.show(10)

+------------+---------+---------+---------+---------+
|      street|        x|        y|      lat|      lon|
+------------+---------+---------+---------+---------+
|Toernooiveld| 188226.5| 426080.1|51.822342|5.8691397|
|Toernooiveld|188207.22|425981.78|51.821457|5.8688507|
|Toernooiveld| 188179.1|426072.34|51.822273| 5.868451|
|Toernooiveld|188190.48|426339.44|51.824673|5.8686423|
|Toernooiveld|188190.48|426339.44|51.824673|5.8686423|
|Toernooiveld|188207.19| 426401.2|51.825226|5.8688903|
|Toernooiveld|188204.88| 426401.3| 51.82523| 5.868857|
|Toernooiveld|188337.97|426131.72|51.822796|5.8707614|
|Toernooiveld|188504.58|426120.12| 51.82268|5.8731766|
|Toernooiveld|188429.06|426198.25| 51.82339| 5.872089|
+------------+---------+---------+---------+---------+
only showing top 10 rows

ds: org.apache.spark.sql.DataFrame = [street: string, x: float ... 3 more fields]


In [ ]:
ds.select('lat,'lon).show(5)

+---------+---------+
|      lat|      lon|
+---------+---------+
|51.822342|5.8691397|
|51.821457|5.8688507|
|51.822273| 5.868451|
|51.824673|5.8686423|
|51.824673|5.8686423|
+---------+---------+
only showing top 5 rows



In [ ]:
val w = GeoPointsChart(ds.select('lat, 'lon))
w

w: notebook.front.widgets.charts.GeoPointsChart[org.apache.spark.sql.DataFrame] = <GeoPointsChart widget>
res89: notebook.front.widgets.charts.GeoPointsChart[org.apache.spark.sql.DataFrame] = <GeoPointsChart widget>


In [ ]:
// Cell left empty on purpose, to encourage you to plot your own street data on the map!



After all this fun with the open address data, we'd almost forget that we set out to join the two datasets!

Let's switch back to SQL, and use the artworks as a starting point.

__Q:__ _Why would you apply the transformation on the artworks dataset to join the result against the addresses, instead of vice versa?_

In [ ]:
spark.udf.register("transformLatLon", (lat:Float, lon:Float) => CT.transformLatLon(lat,lon))

res91: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,StructType(StructField(_1,FloatType,false), StructField(_2,FloatType,false)),Some(List(FloatType, FloatType)))


In [ ]:
val kosxy = spark.sql("select naam, bouwjaar, transformLatLon(latitude, longitude) as XY from kos")
kosxy.createOrReplaceTempView("kosxy")

kosxy: org.apache.spark.sql.DataFrame = [naam: string, bouwjaar: int ... 1 more field]


In [ ]:
val kosquarter = spark.sql(
  "select distinct naam, quarter, min(bouwjaar) as jaar from kosxy, addresses " +
  "where abs(XY._1 - x) < 10.0 and abs(XY._2 - y) < 10.0 " +
  "group by naam, quarter "
).cache()

kosquarter: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [naam: string, quarter: string ... 1 more field]


In [ ]:
kosquarter.show(5, false)

+-------------------------------+------------+----+
|naam                           |quarter     |jaar|
+-------------------------------+------------+----+
|Maria, Middelares              |Galgenveld  |1955|
|Spoorwegmonument Nijmegen-Kleve|Benedenstad |1884|
|Petrus Canisius                |Stadscentrum|1927|
|Madonna                        |Hazenkamp   |1909|
|Objecten                       |Heijendaal  |1987|
+-------------------------------+------------+----+
only showing top 5 rows



Let us now answer the question we started out with initially: can we infer the growth of the city of Nijmegen through the years from the years in which the artworks were created?

In [ ]:
// A nice example where SQL is a good choice to answer the information need:
kosquarter.createOrReplaceTempView("kosquarter")
spark.sql("select distinct quarter, min(jaar) as jaar from kosquarter group by quarter order by jaar").show(20, false)

+--------------+----+
|quarter       |jaar|
+--------------+----+
|Stadscentrum  |1554|
|Benedenstad   |1618|
|Bottendaal    |1900|
|Altrade       |1904|
|Hazenkamp     |1909|
|Hees          |1922|
|Hengstdal     |1930|
|Nije Veld     |1955|
|Galgenveld    |1955|
|Heijendaal    |1959|
|Hunnerberg    |1960|
|Meijhorst     |1970|
|Neerbosch-Oost|1983|
|Wolfskuil     |1985|
|Hatert        |1985|
|Lent          |1990|
|Goffert       |1995|
|Biezen        |2015|
+--------------+----+



A few missing city quarters in this list. Do they really have no artworks, or did we not manage to find their corresponding quarter using the spatial join on coordinates?

Inspecting the artworks not matched up with their quarters is easy, and there are quite a few of those:

In [ ]:
// Artworks not found in the spatial join result:
spark.sql("select naam from kos where naam not in (select naam from kosquarter)").count

res102: Long = 189


__Q:__ _Can you produce the list of quarters that is missing because no artwork has been situated in the quarter?_

__Q:__ _Can you produce a longer list of quarters and their oldest artworks?_

__Q:__ _What are the years associated to artworks not yet matched up with the addresses database? What does this mean for our initial research question?_

In [ ]:
// Cell empty on purpose, try your approach here!
