# Exploration of geo data with Spark SQL

Create the Spark session:

In [ ]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark SQL basic example")
                .config("dfs.client.use.datanode.hostname", "true")
                .config("yarn.resourcemanager.address", "localhost").getOrCreate()

// This configuration is used to run on Docker. If you run on a cloud cluster or another server, you can change it to:
// val spark = SparkSession.builder().appName("Spark SQL basic example").getOrCreate()


Read a local data file containing geo data and create a DataFrame:

In [ ]:
val df = spark.read.option("header","true").csv("/opt/docker/notebooks/spark-sql-demo/example-database.csv").toDF

Show the data retrieved from the file:

In [ ]:
df.show

By default, show truncates values longer than 20 characters. To see the full content of each column, pass false:

In [ ]:
df.show(false)

When we read the file, we did not provide a schema. Since the inferSchema option is off by default, Spark SQL reads every CSV column as a string. Let's see what it got:

In [ ]:
df.printSchema

Note that latitude and longitude were read as strings, even though they hold numeric values. To fix this, let's define our own schema:

In [ ]:
import org.apache.spark.sql.types._

val schema = StructType(Array(
    StructField("ad_id",StringType,true),
    StructField("utc_timestamp",LongType,true),
    StructField("id_type",StringType,true),
    StructField("latitude",DoubleType,true),
    StructField("longitude",DoubleType,true),
    StructField("horizontal_accuracy",DoubleType,true),
    StructField("geo_hash",StringType,true)           
    )
)
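The following plain-Scala sketch (no Spark) shows what declaring numeric types buys you. It parses one comma-separated row into typed fields that follow the schema above. An unparseable number becomes None, which is roughly what Spark's default PERMISSIVE parse mode does when it leaves such a field null. The case class, the parser, and the naive comma split (no quoted fields) are illustrative assumptions, not part of the project's code.

```scala
import scala.util.Try

// Typed view of one CSV row; fields follow the schema defined above (camel-cased).
// Numeric fields are Options: None plays the role of Spark's null.
final case class GeoRecord(
    adId: String,
    utcTimestamp: Option[Long],
    idType: String,
    latitude: Option[Double],
    longitude: Option[Double],
    horizontalAccuracy: Option[Double],
    geoHash: String)

object GeoRecordParser {
  // Naive parser (assumption): splits on commas and does not handle quoted fields.
  def parse(line: String): GeoRecord = {
    val fields = line.split(",", -1).map(_.trim)
    def text(i: Int): String = if (i < fields.length) fields(i) else ""
    def asLong(i: Int): Option[Long] = Try(text(i).toLong).toOption
    def asDouble(i: Int): Option[Double] = Try(text(i).toDouble).toOption
    GeoRecord(text(0), asLong(1), text(2), asDouble(3), asDouble(4), asDouble(5), text(6))
  }
}
```

With typed fields, a bad value shows up as a missing number rather than as a string that silently breaks numeric comparisons later.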

And now let's read the file again, but using our schema this time:

In [ ]:
val records = spark.read.schema(schema).option("header","true").csv("/opt/docker/notebooks/spark-sql-demo/example-database.csv")

Let's check the schema now:

In [ ]:
records.printSchema

And see how the data looks:

In [ ]:
records.show()

We can read data from several other data sources. For example, let's read a file from HDFS:

In [ ]:
val fromHDFS = spark.read.schema(schema).option("header","true")
          .csv("hdfs://namenode:9000/spark-sql-demo/example-database-on-hdfs.csv").toDF

In [ ]:
fromHDFS.show()

We can also read data from Amazon S3, but in this case we need to use external libraries.

If you want to use external libraries with spark-shell, just start it this way:

spark-shell --jars /jar/aws-java-sdk-1.7.4.jar,/jar/hadoop-aws-2.7.1.jar

With notebooks, however, you have to edit the notebook metadata and add the external dependencies. If they are available from a Maven repository, the notebook downloads them automatically.

To edit the notebook metadata, go to Edit/Edit Notebook Metadata. Look for the customDeps section and you will see that we already have our dependencies there:

"customDeps": [
    "com.amazonaws:aws-java-sdk:1.7.4",
    "org.apache.hadoop:hadoop-aws:2.7.1"
  ],
  
To read files from a private S3 bucket, you need the S3 credentials. A good practice is to store them as environment variables, so they are not exposed in the notebook. Here is how to read environment variables:

val S3_KEY = sys.env("S3_KEY")
val S3_SECRET = sys.env("S3_SECRET")

If you are running our Docker demo, note that the notebook loads the file ~/.notebooks/variables.env at startup (see the docker/docker-compose.yml file). You can add your environment variables there and use them in the notebook.
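sys.env(name) throws a NoSuchElementException with a terse "key not found" message when the variable is missing. A small helper that fails with a clearer message can save some debugging. This is a hypothetical sketch. The requiredEnv name is illustrative, and so is the env parameter, which exists so the helper can be tested with a plain Map.

```scala
object Credentials {
  // Looks up a required variable and fails with a descriptive message if it is
  // missing or empty. The env parameter defaults to the process environment.
  def requiredEnv(name: String, env: Map[String, String] = sys.env): String =
    env.get(name).filter(_.nonEmpty).getOrElse(
      throw new IllegalStateException(
        s"Environment variable $name is not set; define it before starting the notebook"))
}

// Hypothetical usage in the notebook:
// val S3_KEY = Credentials.requiredEnv("S3_KEY")
// val S3_SECRET = Credentials.requiredEnv("S3_SECRET")
```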

Now, let's read the data from S3:

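// Configure the S3A filesystem directly on the Hadoop configuration
// (the "spark.hadoop." prefix is only needed when setting these keys through SparkConf)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", S3_KEY)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", S3_SECRET)

// The built-in CSV reader handles the gzip compression and the space delimiter
val csv = spark.read.option("delimiter", " ").csv("s3a://temp/geodata.log.gz")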

Note that we can read compressed files and also configure custom delimiters.
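Spark reads the .gz file through Hadoop's compression codecs, which are chosen from the file extension. Gzip files are not splittable, so each one is read by a single task. The sketch below is a stdlib-only illustration of what reading a gzip-compressed, space-delimited file involves. It writes and reads such a file with java.util.zip. The object and method names are hypothetical, and this is not how Spark implements it internally.

```scala
import java.io.{File, FileInputStream, FileOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import java.util.regex.Pattern
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.io.Source

object GzipDelimited {
  // Writes lines to a gzip-compressed text file (UTF-8, one record per line).
  def writeGzipLines(file: File, lines: Seq[String]): Unit = {
    val writer = new OutputStreamWriter(
      new GZIPOutputStream(new FileOutputStream(file)), StandardCharsets.UTF_8)
    try lines.foreach(line => writer.write(line + "\n"))
    finally writer.close()
  }

  // Decompresses the file and splits each line on a literal delimiter.
  // The -1 limit keeps trailing empty fields instead of dropping them.
  def readGzipDelimited(file: File, delimiter: String): List[Array[String]] = {
    val source = Source.fromInputStream(new GZIPInputStream(new FileInputStream(file)), "UTF-8")
    try source.getLines().map(_.split(Pattern.quote(delimiter), -1)).toList
    finally source.close()
  }
}
```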

Once we have the data, we can explore it. Let's start with a simple query that filters the records by ad_id:

In [ ]:
records.filter("ad_id = 'd8df62d20b0538dedc173fbab140bbfa3c075e73'").show(false)

We can also sort data:

In [ ]:
import org.apache.spark.sql.functions._

records.sort(asc("ad_id")).show()

With Spark Notebooks, we can use Java/Scala libraries and even our own Java/Scala code. 

In our git repo, you will find a custom library we created for testing. It was compiled and added to the ../data/notebook-repository folder, which our docker-compose.yml maps to the local .m2 repository used by Spark Notebook. Because this library is not published to Maven Central, the notebook resolves it from the local disk.

We added our custom library to the notebook metadata, so now we can call our custom code:

In [ ]:
com.tailtarget.sparksqlgeohash.Util.tellTime()

We can also register our own functions and use them in our queries. For example, in our sample code we have the following class:

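package com.tailtarget.sparksqlgeohash.udf;

import ch.hsr.geohash.GeoHash; // assumed: GeoHash from the geohash-java library
import org.apache.spark.sql.api.java.UDF3;

public class SparkGeoHash implements UDF3<Double, Double, Integer, String> {

    // Returns the geohash of (latitude, longitude) with the requested number of characters
    @Override
    public String call(Double latitude, Double longitude, Integer numberOfCharacters) {
        return GeoHash.withCharacterPrecision(latitude, longitude, numberOfCharacters).toBase32();
    }
}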

This code shows how to create a user-defined function (UDF). Our sample function receives a latitude and longitude pair and returns the corresponding geohash with the requested number of characters.
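The UDF delegates the encoding to a geohash library. The self-contained Scala sketch below shows what that encoding does using the standard geohash algorithm. It alternately bisects the longitude and latitude ranges, starting with longitude, and records one bit per bisection. Each group of 5 bits then maps to one character of the geohash Base32 alphabet. This illustrates the algorithm; it is not the library the project uses. Implementations can also differ on points that fall exactly on a bisection boundary; this sketch sends them to the upper half.

```scala
object GeoHashSketch {
  // Standard geohash Base32 alphabet (omits a, i, l, o).
  private val Base32 = "0123456789bcdefghjkmnpqrstuvwxyz"

  def encode(latitude: Double, longitude: Double, numberOfCharacters: Int): String = {
    require(numberOfCharacters > 0, "numberOfCharacters must be positive")
    var latMin = -90.0
    var latMax = 90.0
    var lonMin = -180.0
    var lonMax = 180.0
    var isLonBit = true // bits alternate, starting with longitude
    var bitsInChar = 0
    var charValue = 0
    val out = new StringBuilder

    while (out.length < numberOfCharacters) {
      if (isLonBit) {
        val mid = (lonMin + lonMax) / 2
        if (longitude >= mid) { charValue = (charValue << 1) | 1; lonMin = mid }
        else { charValue = charValue << 1; lonMax = mid }
      } else {
        val mid = (latMin + latMax) / 2
        if (latitude >= mid) { charValue = (charValue << 1) | 1; latMin = mid }
        else { charValue = charValue << 1; latMax = mid }
      }
      isLonBit = !isLonBit
      bitsInChar += 1
      // Every 5 bits become one Base32 character.
      if (bitsInChar == 5) {
        out.append(Base32.charAt(charValue))
        bitsInChar = 0
        charValue = 0
      }
    }
    out.toString
  }
}

println(GeoHashSketch.encode(42.6, -5.6, 5)) // ezs42
```

Each extra character only appends bits. For the same point, a shorter geohash is therefore always a prefix of a longer one, which is what makes grouping by a geohash prefix meaningful in the queries that follow.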

You can register a function like this:


In [ ]:
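import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes

// Instantiate the Java UDF and register it under the name used in queries
val geoHash = new com.tailtarget.sparksqlgeohash.udf.SparkGeoHash()
spark.udf.register("geoHash", geoHash, DataTypes.StringType)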

Now we can use our function in our queries:

In [ ]:
val recordsWithFunction = records
                       .withColumn("generatedGeoHash", callUDF("geoHash", col("latitude"), col("longitude"), lit(12)))
                       .select(col("ad_id"), col("latitude"),col("longitude"),col("geo_hash"),col("generatedGeoHash"))

recordsWithFunction.show()

You can also register your common functions in code, like this:

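package com.tailtarget.sparksqlgeohash;

import com.tailtarget.sparksqlgeohash.udf.SparkGeoHash;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

public class Util {
    // Registers the project's UDFs on the given session so queries can call them by name
    public static void registerUDFS(SparkSession context) {
        context.udf().register("geoHash", new SparkGeoHash(), DataTypes.StringType);
    }
}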

Then call the method:

In [ ]:
com.tailtarget.sparksqlgeohash.Util.registerUDFS(spark)

val recordsWithFunction = records
                       .withColumn("generatedGeoHash", 
                                   callUDF("geoHash", col("latitude"), col("longitude"), lit(12)))
                       .select(col("ad_id"), 
                               col("latitude"),
                               col("longitude"),
                               col("geo_hash"),
                               col("generatedGeoHash"))

recordsWithFunction.show()

Let's count the distinct devices in each geohash cell, using 2-character geohashes:

In [ ]:
val devicesByGeoHash = records
                       .withColumn("generatedGeoHash", 
                                   callUDF("geoHash", col("latitude"), col("longitude"), lit(2)))
                       .select(col("ad_id"), 
                               col("generatedGeoHash"))
                       .distinct().groupBy("generatedGeoHash").count().sort(desc("count"))
devicesByGeoHash.show()

+----------------+-----+
|generatedGeoHash|count|
+----------------+-----+
|              6g| 2503|
|              75|  907|
|              7h|  813|
|              7n|  763|
|              6u|  734|
|              6v|  515|
|              7p|  427|
|              6f|  422|
|              7j|  385|
|              6z|  223|
|              6x|  125|
|              6y|   91|
|              6w|   47|
|              6t|   34|
|              d8|   31|
|              db|   23|
|              6q|   21|
|              6s|    5|
|              6r|    4|
|              6d|    4|
+----------------+-----+
only showing top 20 rows

devicesByGeoHash: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [generatedGeoHash: string, count: bigint]
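The query applies distinct() to (ad_id, generatedGeoHash) pairs before grouping. Each device is therefore counted once per cell, no matter how many records it produced there. A 2-character geohash cell spans 5.625 degrees of latitude by 11.25 degrees of longitude.

Below is the same logic on plain Scala collections, using hypothetical sample data. It takes the cell as a prefix of an existing geohash string. That is equivalent to re-encoding at lower precision, assuming the stored geohash uses the same encoding.

```scala
object DevicesPerCell {
  // records: (adId, geoHash) pairs. Returns (cell, distinct device count),
  // sorted by count descending, then by cell for a stable order.
  def countDistinctDevicesByPrefix(records: Seq[(String, String)], prefixLength: Int): Seq[(String, Int)] =
    records
      .map { case (adId, geoHash) => (adId, geoHash.take(prefixLength)) } // like generatedGeoHash
      .distinct                                                          // like .distinct()
      .groupBy { case (_, cell) => cell }                                // like .groupBy("generatedGeoHash")
      .map { case (cell, pairs) => (cell, pairs.size) }                  // like .count()
      .toSeq
      .sortBy { case (cell, count) => (-count, cell) }                   // like .sort(desc("count"))
}
```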


Plotting the results as a bar chart:

In [ ]:
devicesByGeoHash.createOrReplaceTempView("devicesByGeoHash")

BarChart(devicesByGeoHash, fields = Some(("generatedGeoHash", "count")))

res91: notebook.front.widgets.charts.BarChart[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = <BarChart widget>


Plotting data as a map:

In [ ]:
records.createOrReplaceTempView("records")

// One representative point per 3-character geohash cell
val groupedLocation = spark.sql(
  "SELECT first(latitude) AS latitude, first(longitude) AS longitude, " +
  "first(geo_hash) AS geo_hash, first(substring(geo_hash, 1, 2)) AS geo_hash_prefix " +
  "FROM records GROUP BY substring(geo_hash, 1, 3)")

GeoPointsChart(groupedLocation, latLonFields = Some(("latitude", "longitude")))

groupedLocation: org.apache.spark.sql.DataFrame = [latitude: double, longitude: double ... 2 more fields]
res86: notebook.front.widgets.charts.GeoPointsChart[org.apache.spark.sql.DataFrame] = <GeoPointsChart widget>
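Grouping by the first three characters of geo_hash works because every point whose geohash shares that prefix lies in the same rectangular cell. That cell measures 1.40625 degrees of latitude by 1.40625 degrees of longitude. The sketch below decodes a geohash prefix back into its cell's bounding box. It is a plain-Scala illustration of the standard decoding, and the GeoBox and GeoHashDecode names are hypothetical. You could use it, for example, to check which area a bar or point in the charts above represents.

```scala
// Bounding box of a geohash cell, in degrees.
final case class GeoBox(latMin: Double, latMax: Double, lonMin: Double, lonMax: Double) {
  def contains(lat: Double, lon: Double): Boolean =
    lat >= latMin && lat <= latMax && lon >= lonMin && lon <= lonMax
  def center: (Double, Double) = ((latMin + latMax) / 2, (lonMin + lonMax) / 2)
}

object GeoHashDecode {
  private val Base32 = "0123456789bcdefghjkmnpqrstuvwxyz"

  // Replays the bisections encoded in each character (5 bits, most significant
  // first), alternating longitude and latitude, starting with longitude.
  def bounds(geoHash: String): GeoBox = {
    var latMin = -90.0
    var latMax = 90.0
    var lonMin = -180.0
    var lonMax = 180.0
    var isLonBit = true
    for (c <- geoHash) {
      val value = Base32.indexOf(c)
      require(value >= 0, s"invalid geohash character: $c")
      for (shift <- 4 to 0 by -1) {
        val bitSet = ((value >> shift) & 1) == 1
        if (isLonBit) {
          val mid = (lonMin + lonMax) / 2
          if (bitSet) lonMin = mid else lonMax = mid
        } else {
          val mid = (latMin + latMax) / 2
          if (bitSet) latMin = mid else latMax = mid
        }
        isLonBit = !isLonBit
      }
    }
    GeoBox(latMin, latMax, lonMin, lonMax)
  }
}

println(GeoHashDecode.bounds("u")) // GeoBox(45.0,90.0,0.0,45.0)
```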


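To improve performance, you can persist intermediate results. Each count() below is a separate action. Without persistence, Spark re-executes the whole devicesByGeoHash pipeline (the UDF, distinct, groupBy, and sort) for each of the three counts. Compare the time it takes to run the following two equivalent snippets: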

In [ ]:

val devicesByGeoHash = records
                       .withColumn("generatedGeoHash", 
                                   callUDF("geoHash", col("latitude"), col("longitude"), lit(2)))
                       .select(col("ad_id"), 
                               col("generatedGeoHash"))
                       .distinct().groupBy("generatedGeoHash").count().sort(desc("count"))

val moreThan1000 = devicesByGeoHash.filter($"count" > 1000).count();
val between500And1000 = devicesByGeoHash.filter($"count" >= 500 && $"count" <= 1000).count();
val lessThan500 = devicesByGeoHash.filter($"count" < 500).count();


devicesByGeoHash: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [generatedGeoHash: string, count: bigint]
moreThan1000: Long = 1
between500And1000: Long = 5
lessThan500: Long = 15


In [ ]:
val devicesByGeoHash = records
                       .withColumn("generatedGeoHash",
                                   callUDF("geoHash", col("latitude"), col("longitude"), lit(2)))
                       .select(col("ad_id"),
                               col("generatedGeoHash"))
                       .distinct().groupBy("generatedGeoHash").count().sort(desc("count"))

val persisted = devicesByGeoHash.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)

val moreThan1000 = persisted.filter($"count" > 1000).count();
val between500And1000 = persisted.filter($"count" >= 500 && $"count" <= 1000).count();
val lessThan500 = persisted.filter($"count" < 500).count();

persisted.unpersist()

devicesByGeoHash: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [generatedGeoHash: string, count: bigint]
persisted: devicesByGeoHash.type = [generatedGeoHash: string, count: bigint]
moreThan1000: Long = 1
between500And1000: Long = 5
lessThan500: Long = 15
res100: persisted.type = [generatedGeoHash: string, count: bigint]
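Persisting is one way to avoid recomputing devicesByGeoHash. Another is to compute all three buckets in a single pass. In Spark, that could be a single agg over sum(when(...).otherwise(0)) expressions. The plain-Scala sketch below shows the same single-pass idea with a fold over the per-cell counts.

The sketch treats 500 and 1000 as part of the middle bucket so that every count lands in exactly one bucket. That boundary choice is made for this sketch, and the names are hypothetical.

```scala
final case class BucketCounts(moreThan1000: Long, between500And1000: Long, lessThan500: Long)

object CountBuckets {
  // One pass over the per-cell device counts; 500 and 1000 go to the middle bucket.
  def bucket(counts: Iterable[Long]): BucketCounts =
    counts.foldLeft(BucketCounts(0L, 0L, 0L)) { (acc, c) =>
      if (c > 1000) acc.copy(moreThan1000 = acc.moreThan1000 + 1)
      else if (c >= 500) acc.copy(between500And1000 = acc.between500And1000 + 1)
      else acc.copy(lessThan500 = acc.lessThan500 + 1)
    }
}

// The top eight counts from the devicesByGeoHash output above
println(CountBuckets.bucket(Seq(2503L, 907L, 813L, 763L, 734L, 515L, 427L, 422L)))
// BucketCounts(1,5,2)
```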
