# Big Data - Final Work - Scala

## Initial configuration for Spark + JVM

In [2]:
%%init_spark
launcher.master = "local[*]"
launcher.driver_memory = '20g'
launcher.executor_memory = '20g'
launcher.verbose = 'true'

In [None]:
%%init_spark
launcher.conf.set("spark.app.name", "scalaXgbTest")
launcher.num_executors = 3
launcher.executor_cores = 7 //launcher.conf.spark.executor.cores = 8
launcher.conf.spark.task.cpus = 6
launcher.conf.set("spark.executor.heartbeatInterval", "6000s")
launcher.conf.set("spark.yarn.scheduler.heartbeat.interval-ms", "10000s")
launcher.conf.set("spark.network.timeout", "10000s")
launcher.conf.set("spark.yarn.executor.memoryOverhead", "8192")
launcher.conf.set("spark.sql.catalogImplementation", "hive")
launcher.jars = ["file://some/jar.jar", "xgboost-maven-0.82/xgboost4j-spark-0.82.jar", "xgboost-maven-0.82/xgboost4j-0.82.jar"]

In [3]:
println(sc.appName)
println(sc.master)

spylon-kernel
local[*]


## General imports

In [4]:
import org.apache.spark.sql.types._      // include the Spark Types to define our schema
import org.apache.spark.sql.functions._  // include the Spark helper functions
import spark.implicits._                 // For implicit conversions like converting RDDs to DataFrames
import org.apache.spark.sql.expressions._

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions._


## DB JSON schema

In [5]:
val location_schema =
    MapType(StringType,
        new StructType()
            .add("accuracy", DoubleType)
            .add("address", StringType)
            .add("altitude", DoubleType)
            .add("country", StringType)
            .add("latitude", DoubleType)
            .add("longitude", DoubleType)
            .add("provider", StringType)
            .add("timestamp", 
             new StructType()
                .add("date", LongType)
                .add("day", LongType)
                .add("hours", LongType)
                .add("minutes", LongType)
                .add("month", LongType)
                .add("nanos", LongType)
                .add("seconds", LongType)
                .add("time", LongType)
                .add("timezoneOffset", LongType)
                .add("year", LongType)
            )
            .add("uid", StringType)
        )

val schema = new StructType()
    .add("locations", location_schema)
    .add("user-locations",
        MapType(StringType, location_schema)
    )
    .add("users",
        MapType(StringType,
            new StructType()
                .add("email", StringType)
                .add("username", StringType)
        )
    )

location_schema: org.apache.spark.sql.types.MapType = MapType(StringType,StructType(StructField(accuracy,DoubleType,true), StructField(address,StringType,true), StructField(altitude,DoubleType,true), StructField(country,StringType,true), StructField(latitude,DoubleType,true), StructField(longitude,DoubleType,true), StructField(provider,StringType,true), StructField(timestamp,StructType(StructField(date,LongType,true), StructField(day,LongType,true), StructField(hours,LongType,true), StructField(minutes,LongType,true), StructField(month,LongType,true), StructField(nanos,LongType,true), StructField(seconds,LongType,true), StructField(time,LongType,true), StructField(timezoneOffset,LongType,true), StructField(year,LongType,true)),true), StructField(uid,StringType,true)),true)
schema: org.a...


## Import JSON DB

In [6]:
val df = spark.read.option("multiline", true).schema(schema).json("trackme-export.json")

df: org.apache.spark.sql.DataFrame = [locations: map<string,struct<accuracy:double,address:string,altitude:double,country:string,latitude:double,longitude:double,provider:string,timestamp:struct<date:bigint,day:bigint,hours:bigint,minutes:bigint,month:bigint,nanos:bigint,seconds:bigint,time:bigint,timezoneOffset:bigint,year:bigint>,uid:string>>, user-locations: map<string,map<string,struct<accuracy:double,address:string,altitude:double,country:string,latitude:double,longitude:double,provider:string,timestamp:struct<date:bigint,day:bigint,hours:bigint,minutes:bigint,month:bigint,nanos:bigint,seconds:bigint,time:bigint,timezoneOffset:bigint,year:bigint>,uid:string>>> ... 1 more field]


In [7]:
df.printSchema()

root
 |-- locations: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- accuracy: double (nullable = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- altitude: double (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- provider: string (nullable = true)
 |    |    |-- timestamp: struct (nullable = true)
 |    |    |    |-- date: long (nullable = true)
 |    |    |    |-- day: long (nullable = true)
 |    |    |    |-- hours: long (nullable = true)
 |    |    |    |-- minutes: long (nullable = true)
 |    |    |    |-- month: long (nullable = true)
 |    |    |    |-- nanos: long (nullable = true)
 |    |    |    |-- seconds: long (nullable = true)
 |    |    |    |-- time: long (nullable = true)
 |    |    |    |-- timezoneOffset: long (nullable = true)
 |    |    |    |-- year:

## Breakdown raw DB into contextual structures

In [8]:
val rawLocationsDF = df.select(explode($"locations") as Seq("timestamp_id", "value"))
val rawUserLocationsDF = df.select(explode($"user-locations") as Seq("uid", "timestamp"))
val rawUsersDF = df.select(explode($"users") as Seq("uid", "user_attr"))

rawLocationsDF: org.apache.spark.sql.DataFrame = [timestamp_id: string, value: struct<accuracy: double, address: string ... 7 more fields>]
rawUserLocationsDF: org.apache.spark.sql.DataFrame = [uid: string, timestamp: map<string,struct<accuracy:double,address:string,altitude:double,country:string,latitude:double,longitude:double,provider:string,timestamp:struct<date:bigint,day:bigint,hours:bigint,minutes:bigint,month:bigint,nanos:bigint,seconds:bigint,time:bigint,timezoneOffset:bigint,year:bigint>,uid:string>>]
rawUsersDF: org.apache.spark.sql.DataFrame = [uid: string, user_attr: struct<email: string, username: string>]


In [9]:
rawUsersDF.printSchema()

root
 |-- uid: string (nullable = false)
 |-- user_attr: struct (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- username: string (nullable = true)



## Function to flatten schema

In [10]:
import org.apache.spark.sql.Column

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}

import org.apache.spark.sql.Column
flattenSchema: (schema: org.apache.spark.sql.types.StructType, prefix: String)Array[org.apache.spark.sql.Column]


## Flat struct DataFrames

In [11]:
val locationsDF = rawLocationsDF.select(flattenSchema(rawLocationsDF.schema):_*)
val userLocationsDF = rawUserLocationsDF.select(flattenSchema(rawUserLocationsDF.schema):_*)
val usersDF = rawUsersDF.select(flattenSchema(rawUsersDF.schema):_*)

locationsDF: org.apache.spark.sql.DataFrame = [timestamp_id: string, accuracy: double ... 17 more fields]
userLocationsDF: org.apache.spark.sql.DataFrame = [uid: string, timestamp: map<string,struct<accuracy:double,address:string,altitude:double,country:string,latitude:double,longitude:double,provider:string,timestamp:struct<date:bigint,day:bigint,hours:bigint,minutes:bigint,month:bigint,nanos:bigint,seconds:bigint,time:bigint,timezoneOffset:bigint,year:bigint>,uid:string>>]
usersDF: org.apache.spark.sql.DataFrame = [uid: string, email: string ... 1 more field]


In [12]:
locationsDF.printSchema()

root
 |-- timestamp_id: string (nullable = false)
 |-- accuracy: double (nullable = true)
 |-- address: string (nullable = true)
 |-- altitude: double (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- provider: string (nullable = true)
 |-- date: long (nullable = true)
 |-- day: long (nullable = true)
 |-- hours: long (nullable = true)
 |-- minutes: long (nullable = true)
 |-- month: long (nullable = true)
 |-- nanos: long (nullable = true)
 |-- seconds: long (nullable = true)
 |-- time: long (nullable = true)
 |-- timezoneOffset: long (nullable = true)
 |-- year: long (nullable = true)
 |-- uid: string (nullable = true)



In [13]:
usersDF.take(2).foreach(println)

[5Jf44SGWhzZmxsZs7n6KLzrHark1,rodrigomesquita0@gmail.com,rodrigomesquita0]
[BHNpkg1LH2Sna0axjb8pFWDIycD2,vivian.lopesg@gmail.com,vivian.lopesg]


## Create timestamp formatted column

In [14]:
val locationsWithDateDF = locationsDF.withColumn("ts_date", (col("timestamp_id")/1000).cast(TimestampType))

locationsWithDateDF: org.apache.spark.sql.DataFrame = [timestamp_id: string, accuracy: double ... 18 more fields]


## Join with Users DB

In [15]:
val joinExpression = locationsWithDateDF.col("uid") === usersDF.col("uid")
var joinType = "inner"
val locWithDateJoinUserDF = locationsWithDateDF.join(usersDF, joinExpression, joinType).drop(usersDF.col("uid")).orderBy($"timestamp_id")

joinExpression: org.apache.spark.sql.Column = (uid = uid)
joinType: String = inner
locWithDateJoinUserDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [timestamp_id: string, accuracy: double ... 20 more fields]


In [16]:
locWithDateJoinUserDF.select("email", "latitude", "longitude", "ts_date").sample(false, 0.04).take(5).foreach(println)

[wallace.mendes.rj@gmail.com,-22.8381126,-43.2623022,2020-10-07 23:00:28.694]
[wallace.mendes.rj@gmail.com,-22.8381171,-43.2623166,2020-10-07 23:13:12.297]
[wallace.mendes.rj@gmail.com,-22.838115,-43.2623124,2020-10-07 23:39:32.518]
[viniciusmgaspar@gmail.com,-22.9143672,-43.2479284,2020-10-07 23:42:09.072]
[viniciusmgaspar@gmail.com,-22.9143672,-43.2479284,2020-10-07 23:43:09.149]


## Function for distance calculation based on lat/long

### Element version - calculate_distance_elem

In [17]:
import scala.math._

def calculate_distance_elem(lat1:Double, lon1:Double, lat2:Double, lon2:Double):Double = {   
    val earth_radius = 6371e3;           // meters
    val phi1 = lat1 * Pi/180;                  // radians
    val phi2 = lat2 * Pi/180;                  // radians
    val delta_phi = phi2 - phi1;               // radians

    val delta_lampda = (lon2 - lon1) * Pi/180; // radians

    val a = sin(delta_phi/2)*sin(delta_phi/2) + cos(phi1)*cos(phi2)*sin(delta_lampda/2)*sin(delta_lampda/2);
    val c = 2*atan2(sqrt(a), sqrt(1-a));

    val d = earth_radius*c; // meters
    
    return d
}

val calculate_distance_elem_sqlfunc = udf(calculate_distance_elem(_,_,_,_))

import scala.math._
calculate_distance_elem: (lat1: Double, lon1: Double, lat2: Double, lon2: Double)Double
calculate_distance_elem_sqlfunc: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3841/1820714411@38347476,DoubleType,List(Some(class[value[0]: double]), Some(class[value[0]: double]), Some(class[value[0]: double]), Some(class[value[0]: double])),None,false,true)


### Test - calculate_distance_elem

In [None]:
val lat1 = -22.9556473
val lon1 = -43.1881019

val lat2 = -23.9556473
val lon2 = -44.1881019

val dist = calculate_distance_elem(lat1, lon1, lat2, lon2)

assert (dist == 150894.75616346067)

### Column version

In [19]:
import scala.math.Pi
import org.apache.spark.sql.functions._

def calculate_distance_col(lat1:org.apache.spark.sql.Column, lon1:org.apache.spark.sql.Column, lat2:org.apache.spark.sql.Column, lon2:org.apache.spark.sql.Column):org.apache.spark.sql.Column = {   
    val earth_radius = 6371e3;           // meters
    val pi_over_180 = lit(Pi/180);
    val phi1 = lat1 * pi_over_180;                  // radians
    val phi2 = lat2 * pi_over_180;                  // radians
    val delta_phi = phi2 - phi1;               // radians

    val delta_lampda = (lon2 - lon1) * pi_over_180; // radians

    val a = sin(delta_phi/2)*sin(delta_phi/2) + cos(phi1)*cos(phi2)*sin(delta_lampda/2)*sin(delta_lampda/2);
    val c = lit(2)*atan2(sqrt(a), sqrt(lit(1)-a));

    val d = lit(earth_radius)*c; // meters
    
    return d;
}

// val calculate_distance_sqlfunc = udf(calculate_distance(_,_,_,_))

import scala.math.Pi
import org.apache.spark.sql.functions._
calculate_distance_col: (lat1: org.apache.spark.sql.Column, lon1: org.apache.spark.sql.Column, lat2: org.apache.spark.sql.Column, lon2: org.apache.spark.sql.Column)org.apache.spark.sql.Column


## Calculate distance

In [18]:
import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.expressions.Window


In [20]:
// val lat_col = Window.partitionBy("latitude").orderBy($"timestamp_id".asc)
// val lon_col = Window.partitionBy("longitude").orderBy($"timestamp_id".asc)
val lat_col = Window.orderBy($"timestamp_id".asc)
val lon_col = Window.orderBy($"timestamp_id".asc)

val lat2 = col("latitude")
val lat1 = lag("latitude", 1).over(lat_col)
// val lat1 = when((lag("latitude", 1).over(lat_col)).isNotNull, lag("latitude", 1).over(lat_col)).otherwise(0)

val lon2 = col("longitude")
val lon1 = lag("longitude", 1).over(lat_col)
// val lon1 = when((lag("longitude", 1).over(lon_col)).isNotNull, lag("longitude", 1).over(lon_col)).otherwise(0)

lat_col: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@65ebf5ba
lon_col: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7294fe7e
lat2: org.apache.spark.sql.Column = latitude
lat1: org.apache.spark.sql.Column = lag(latitude, 1, NULL) OVER (ORDER BY timestamp_id ASC NULLS FIRST unspecifiedframe$())
lon2: org.apache.spark.sql.Column = longitude
lon1: org.apache.spark.sql.Column = lag(longitude, 1, NULL) OVER (ORDER BY timestamp_id ASC NULLS FIRST unspecifiedframe$())


In [21]:
val emails = "viniciusmgaspar@gmail.com"

val joinExpression = locationsWithDateDF.col("uid") === usersDF.col("uid")
var joinType = "inner"
val locationsWithDatePerUserDF = locationsWithDateDF.join(usersDF, joinExpression, joinType).drop(usersDF.col("uid")).filter($"Email" === emails).orderBy($"timestamp_id")

val locDatePerUserDistDF = locationsWithDatePerUserDF.withColumn("distance", when(calculate_distance_col(lat1, lon1, lat2, lon2).isNotNull,calculate_distance_col(lat1, lon1, lat2, lon2)).otherwise(0.0))

emails: String = viniciusmgaspar@gmail.com
joinExpression: org.apache.spark.sql.Column = (uid = uid)
joinType: String = inner
locationsWithDatePerUserDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [timestamp_id: string, accuracy: double ... 20 more fields]
locDatePerUserDistDF: org.apache.spark.sql.DataFrame = [timestamp_id: string, accuracy: double ... 21 more fields]


# Show walked distance

In [22]:
locDatePerUserDistDF.orderBy($"ts_date".asc)
.select("username","ts_date","latitude","longitude","distance")
    .show(false)

+---------------+-----------------------+-----------+-----------+------------------+
|username       |ts_date                |latitude   |longitude  |distance          |
+---------------+-----------------------+-----------+-----------+------------------+
|viniciusmgaspar|2020-10-07 23:32:51.945|-22.9143424|-43.2479267|0.0               |
|viniciusmgaspar|2020-10-07 23:34:51.961|-22.9143476|-43.2479225|0.720675492840136 |
|viniciusmgaspar|2020-10-07 23:36:56.689|-22.9143736|-43.2479302|2.9967018357873862|
|viniciusmgaspar|2020-10-07 23:37:57.042|-22.9143736|-43.2479302|0.0               |
|viniciusmgaspar|2020-10-07 23:38:57.116|-22.9143736|-43.2479302|0.0               |
|viniciusmgaspar|2020-10-07 23:40:02.011|-22.914351 |-43.247925 |2.5688213633650445|
|viniciusmgaspar|2020-10-07 23:41:02.089|-22.914351 |-43.247925 |0.0               |
|viniciusmgaspar|2020-10-07 23:42:09.072|-22.9143672|-43.2479284|1.8347079899956038|
|viniciusmgaspar|2020-10-07 23:43:09.149|-22.9143672|-43.2479284|

## Walked distance per day

In [23]:
locDatePerUserDistDF.groupBy($"email",date_format(col("ts_date"),"dd-MM-yyyy").as("date")).sum("distance").orderBy($"email".asc, $"date".asc)
.show(false)

+-------------------------+----------+------------------+
|email                    |date      |sum(distance)     |
+-------------------------+----------+------------------+
|viniciusmgaspar@gmail.com|07-10-2020|27.519468577232455|
|viniciusmgaspar@gmail.com|08-10-2020|27236.48613099252 |
|viniciusmgaspar@gmail.com|09-10-2020|9797.745650086414 |
|viniciusmgaspar@gmail.com|11-10-2020|617.8502728067592 |
|viniciusmgaspar@gmail.com|12-10-2020|2232.8849177640095|
+-------------------------+----------+------------------+



## Usage of reduceByKey to sum up values

In [None]:
// val userReducedDistance = flatLocationsWithDistDF.filte().filter().reduceByKey((v1,v2) => v1 + v2)

## Distance Between Two People

In [None]:
val user1 = "henrique.mageste@gmail.com"
val user2 = "wallace.mendes.rj@gmail.com"
val time_interval_1 = lit("2020-10-07 20:20")
val time_interval_2 = lit("2020-10-07 20:20")

val user1_DF = locWithDateJoinUserDF.filter($"email" === user1).as("user1")
val user2_DF = locWithDateJoinUserDF.filter($"email" === user2).as("user2")

val joinExpression = ( col("user1.date") === user2_DF.col("user2.date") &&  col("user1.hours") === user2_DF.col("user2.hours") && col("user1.minutes") === user2_DF.col("user2.minutes"))
val joinType = "inner"
val c = user1_DF.join(user2_DF, joinExpression, joinType)

// mostrar o resultado do join
c.select(
    col("user1.ts_date")
    ,col("user2.ts_date")
    ,col("user1.hours")
    ,col("user2.hours")
    ,col("user1.minutes")
    ,col("user2.minutes")
    ,col("user1.email")
    ,col("user2.email")
    ,col("user1.longitude")
    ,col("user2.longitude")
    ,col("user1.latitude")
    ,col("user2.latitude")
    ,col("user1.address")
    ,col("user2.address")
    )
    .withColumn("distance_between", calculate_distance_col(col("user1.latitude"), col("user1.longitude"), col("user2.latitude"), col("user2.longitude")) )
    
val c2= c.select(
    col("user1.ts_date")
    ,col("user2.ts_date")
    ,col("user1.hours")
    ,col("user2.hours")
    ,col("user1.minutes")
    ,col("user2.minutes")
    ,col("user1.email")
    ,col("user2.email")
    ,col("user1.longitude")
    ,col("user2.longitude")
    ,col("user1.latitude")
    ,col("user2.latitude")
    ,col("user1.address")
    ,col("user2.address")
    )
    .withColumn("distance_between", calculate_distance_col(col("user1.latitude"), col("user1.longitude"), col("user2.latitude"), col("user2.longitude")) )

In [None]:
// AGREGADO POR DATA
c2.groupBy(col("user1.ts_date"),col("user2.ts_date")).agg(min("distance_between"),max("distance_between")).show()

// AGREGADO POR USUÁRIO    
c2.groupBy(col("user1.email"),col("user2.email")).agg(min("distance_between"),max("distance_between")).show()

In [None]:
val user1 = "henrique.mageste@gmail.com"
val user2 = "viniciusmgaspar@gmail.com"
val time_interval_1 = lit("2020-10-07 20:20")
val time_interval_2 = lit("2020-10-07 20:20")

var joinType = "inner"
val joinExpression = locationsWithDateDF.col("uid") === usersDF.col("uid")

val user1_DF = locationsWithDateDF.join(usersDF, joinExpression, joinType).drop(usersDF.col("uid")).filter($"Email" === user1).orderBy($"timestamp_id")
val user2_DF = locationsWithDateDF.join(usersDF, joinExpression, joinType).drop(usersDF.col("uid")).filter($"Email" === user2).orderBy($"timestamp_id")

val user1_time_interval = user1_DF.filter($"ts_date" >= time_interval_1).filter($"ts_date" <= time_interval_2)
val user2_time_interval = user2_DF.filter($"ts_date" >= time_interval_1).filter($"ts_date" <= time_interval_2)

user1_time_interval.show()
// val user1_lat = user1_time_interval.select("latitude").first()
// val user1_lon = user1_time_interval.select("longitude").first()

// val user2_lat = user2_time_interval.select("latitude").first()
// val user2_lon = user2_time_interval.select("longitude").first()

// val d = calculate_distance_elem(user1_lat.getDouble(0), user1_lon.getDouble(0), user2_lat.getDouble(0), user2_lon.getDouble(0))

// println("distance between user1 and user2: " + d)


## For writing SQL syntax

In [29]:
// Register the DataFrame as a SQL temporary view
locDatePerUserDistDF.createOrReplaceTempView("locationsSQL")

val sqlDF = spark.sql("SELECT * FROM locationsSQL")
sqlDF.show()

+-------------+------------------+--------------------+------------------+-------+-----------+-----------+--------+----+---+-----+-------+-----+---------+-------+-------------+--------------+----+--------------------+--------------------+--------------------+---------------+------------------+
| timestamp_id|          accuracy|             address|          altitude|country|   latitude|  longitude|provider|date|day|hours|minutes|month|    nanos|seconds|         time|timezoneOffset|year|                 uid|             ts_date|               email|       username|          distance|
+-------------+------------------+--------------------+------------------+-------+-----------+-----------+--------+----+---+-----+-------+-----+---------+-------+-------------+--------------+----+--------------------+--------------------+--------------------+---------------+------------------+
|1602124371945|14.199000358581543|R. Tôrres Homem, ...|19.200000762939453| Brasil|-22.9143424|-43.2479267|   fused|

sqlDF: org.apache.spark.sql.DataFrame = [timestamp_id: string, accuracy: double ... 21 more fields]


## DataFrame to Dataset

In [24]:
case class UserLocation(email: String, latitude: Double, longitude: Double, distance: Double)

val locWithDateJoinUserDS = locWithDateJoinUserDF.select("email", "latitude", "longitude", "distance").as[UserLocation]

<console>: 16: error: Unable to find encoder for type UserLocation. An implicit Encoder[UserLocation] is needed to store UserLocation instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.