# We need some libraries and initial parameters...

In [110]:
library(SparkR)
library(leaflet)
library(sp)
options(digits=15)

epsilon = 100
source("pbfe.R")

# Let load some data...

In [64]:
data = read.csv("sample_small.csv")
names(data) = c("ID","lat","lng")
head(data)

ID,lat,lng
1,39.590583,116.271945
2,39.633484,116.72824
3,39.6558133,116.6641916
4,39.659678,116.671149
5,39.6654583,116.3098399
6,39.6728416,116.3086649


# and visualize them in a map...

In [65]:
map = leaflet() %>%
  addTiles() %>%
  addCircleMarkers(lng=data$lng, lat=data$lat,weight=2,fillOpacity=1,color="blue",radius=2)

file = 'map1.html'
htmlwidgets::saveWidget(map, file = file, selfcontained = F)
IRdisplay::display_html(paste("<iframe width=100% height=400 src=' ", file, " ' ","/>"))

https://ec2-35-162-64-98.us-west-2.compute.amazonaws.com:8888/home/ubuntu/notebooks/files/map1.html

# Connecting with Simba and getting a SQLContext...

In [66]:
sc <- sparkR.init("local[*]", "SparkR")
sqlContext <- sparkRSQL.init(sc)

Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context


# Let read the data into Simba and apply a transformation...

In [67]:
dataRDD = SparkR:::textFile(sc,"sample_small.csv")
dataRDD = SparkR:::map(dataRDD, transformCoords)

# Now We create a DataFrame and cache the data...

In [68]:
schema <- structType(structField("id", "double"), structField("lng", "double"), structField("lat", "double"))
points <- createDataFrame(sqlContext, dataRDD, schema = schema)
cache(points)

DataFrame[id:double, lng:double, lat:double]

# Let's have a look...

In [69]:
head(points)
count(points)

id,lng,lat
0,-300629.930290752,4419848.7012437
1,-336418.933794024,4429641.88724925
2,-296625.402561355,4430225.49965138
3,-301876.5474525,4433286.74767265
4,-301232.904159629,4433654.10554999
5,-332247.212576558,4437621.06989079


# Now, We need register a pair of temporal tables...

In [70]:
registerTempTable(points, "p1")
registerTempTable(points, "p2")

# It is time to execute the SQL statement...

```SQL
SELECT
    * 
FROM 
    p1 
DISTANCE JOIN 
    p2
ON 
    POINT(p2.lng, p2.lat) IN CIRCLERANGE(POINT(p1.lng, p1.lat), epsilon)
WHERE 
    p2.id < p1.id
```

In [71]:
sql = paste0("SELECT * FROM p1 DISTANCE JOIN p2 ON POINT(p2.lng, p2.lat) IN CIRCLERANGE(POINT(p1.lng, p1.lat), ",epsilon,") WHERE p2.id < p1.id")
pairs = sql(sqlContext,sql)
head(pairs)
nrow(pairs)

id,lng,lat,id.1,lng.1,lat.1
26,-332354.697252151,4456150.25607556,24,-332269.409834687,4456108.23475632
46,-334242.745346436,4459715.58979402,45,-334252.427014354,4459698.61791281
121,-353770.269435442,4466969.59768473,120,-353766.652493862,4466963.17659202
130,-333308.00077632,4464833.44031216,128,-333297.180249493,4464823.99102774
304,-329311.693033604,4470875.30350448,303,-329325.492285997,4470858.56913779
315,-327600.494406201,4470962.75742948,314,-327656.535980798,4470964.20013813


# Now We need to calculate the disk locations for each pair...

In [73]:
centers <- SparkR:::map(pairs, calculateDisk)
schema <- structType(structField("id1", "double"), structField("id2", "double"), structField("lng1", "double"), structField("lat1", "double"), structField("lng2", "double"), structField("lat2", "double"))
d <- createDataFrame(sqlContext, centers, schema = schema)
head(d)
count(d)

id1,id2,lng1,lat1,lng2,lat2
26,24,-332305.205643173,4456143.14406876,-332318.901443665,4456115.34676312
46,45,-334204.992900705,4459682.80637075,-334290.179460085,4459731.40133608
121,120,-353725.015378929,4466990.85963123,-353811.906550376,4466941.91464552
130,128,-333270.043247426,4464865.98606458,-333335.137778387,4464791.44527532
526,525,-333095.941684751,4473214.22184632,-333093.90595447,4473233.68774871
304,303,-329280.934882795,4470835.88350754,-329356.250436806,4470897.98913473


# Let's collect the data back to LatLong coordinates...

In [74]:
centers_lnglat <- SparkR:::map(centers, transformCenters)
disks <- as.data.frame(createDataFrame(sqlContext,centers_lnglat))
names(disks) = c("id1","id2","lng1","lat1","lng2","lat2")
head(disks)
nrow(disks)

id1,id2,lng1,lat1,lng2,lat2
26,24,116.285885554993,39.8298282965222,116.28576296379,39.829568212028
46,45,116.259531520747,39.8593958326468,116.258488803823,39.8597439691014
121,120,116.025337096455,39.9049404914115,116.024399992141,39.9044193990716
130,128,116.263731245981,39.9063158237837,116.263075584504,39.9055905801566
526,525,116.255128264593,39.9805850682347,116.255126967427,39.9807598381811
304,303,116.302161916191,39.9632009910608,116.301214396641,39.9636789126113


# Let's have a look at the results...

In [75]:
p = sort(unique(c(disks$id1,disks$id2)))
data2 = data[p,]
map = leaflet() %>% setView(lat = 39.990010, lng = 116.317406, zoom = 15) %>% addTiles() %>% 
        addCircles(lng=disks$lng1, lat=disks$lat1, weight=2, fillOpacity=0.25, color="red", radius = epsilon/2) %>%
        addCircles(lng=disks$lng2, lat=disks$lat2, weight=2, fillOpacity=0.25, color="red", radius = epsilon/2) %>%
        addCircleMarkers(lng=data$lng, lat=data$lat, weight=2, fillOpacity=1,radius = 2) %>%
        addCircleMarkers(lng=data2$lng, lat=data2$lat, weight=2, fillOpacity=1, color="purple", radius = 2) %>% 
        addProviderTiles("Esri.WorldImagery", group = "ESRI") %>% 
        addLayersControl(baseGroup = c("OSM(default)", "ESRI"))

file = 'map2.html'
htmlwidgets::saveWidget(map, file = file, selfcontained = F)
IRdisplay::display_html(paste("<iframe width=100% height=400 src=' ", file, " ' ","/>"))

https://ec2-35-162-64-98.us-west-2.compute.amazonaws.com:8888/home/ubuntu/notebooks/files/map2.html

In [76]:
registerTempTable(d, "d")
registerTempTable(points, "p")

In [125]:
sql = paste0("SELECT d.lng1 AS lng, d.lat1 AS lat, id AS id_member FROM d DISTANCE JOIN p ON POINT(p.lng, p.lat) IN CIRCLERANGE(POINT(d.lng1, d.lat1), ",(epsilon/2)+0.01,")")
mdisks = sql(sqlContext,sql)
registerTempTable(mdisks, "m")
sql = "SELECT lng, lat FROM m GROUP BY lng, lat HAVING count(id_member) >= 3"
mdisks1 = sql(sqlContext,sql)

In [126]:
sql = paste0("SELECT d.lng2 AS lng, d.lat2 AS lat, id AS id_member FROM d DISTANCE JOIN p ON POINT(p.lng, p.lat) IN CIRCLERANGE(POINT(d.lng2, d.lat2), ",(epsilon/2)+0.01,")")
mdisks = sql(sqlContext,sql)
registerTempTable(mdisks, "m")
sql = "SELECT lng, lat FROM m GROUP BY lng, lat HAVING count(id_member) >= 3"
mdisks2 = sql(sqlContext,sql)

In [127]:
mdisks = as.data.frame(rbind(mdisks1, mdisks2))
id = seq(1,nrow(mdisks))
mdisks$id = id

coordinates(mdisks) = ~lng+lat
proj4string(mdisks) = mercator
mdisks = spTransform(mdisks, wgs84)
mdisks$lng1 = coordinates(mdisks)[,1]
mdisks$lat1 = coordinates(mdisks)[,2]

In [129]:
map = leaflet() %>% setView(lat = 39.990010, lng = 116.317406, zoom = 15) %>% addTiles() %>% 
        addCircles(lng=mdisks$lng1, lat=mdisks$lat1, weight=2, fillOpacity=0.25, color="blue", radius = epsilon/2) %>%
        addCircleMarkers(lng=data$lng, lat=data$lat, weight=2, fillOpacity=1,radius = 2) %>%
        addCircleMarkers(lng=data2$lng, lat=data2$lat, weight=2, fillOpacity=1, color="purple", radius = 2) %>% 
        addProviderTiles("Esri.WorldImagery", group = "ESRI") %>% 
        addLayersControl(baseGroup = c("OSM(default)", "ESRI"))

file = 'map3.html'
htmlwidgets::saveWidget(map, file = file, selfcontained = F)
IRdisplay::display_html(paste("<iframe width=100% height=400 src=' ", file, " ' ","/>"))

In [137]:
v = c(0,1,2,3,4,5,6,7,8,9)

k = c(7,5,2,9)

prod(is.element(k, v) )

k = c(7,5,20,9)

prod(is.element(k, v) )

In [172]:
m <- as.data.frame(rbind(mdisks1, mdisks2))
m$id = seq(1,nrow(m))
m = createDataFrame(sqlContext, m)
head(m)
count(m)
registerTempTable(m, "m")

head(points)
count(points)

sql = paste0("SELECT m.id AS mid, p.id AS pid FROM m DISTANCE JOIN p ON POINT(p.lng, p.lat) IN CIRCLERANGE (POINT(m.lng, m.lat), ",(epsilon/2)+0.01,")")
t = sql(sqlContext,sql)
head(t, 20)
count(t)

lng,lat,id
-326785.031291999,4474106.72293867,1
-326642.410067071,4471946.67646511,2
-326719.658916002,4471883.24867805,3
-326725.183218763,4471877.3279332,4
-326438.602588559,4471946.88995418,5
-327206.286667153,4475653.52300156,6


id,lng,lat
0,-300629.930290752,4419848.7012437
1,-336418.933794024,4429641.88724925
2,-296625.402561355,4430225.49965138
3,-301876.5474525,4433286.74767265
4,-301232.904159629,4433654.10554999
5,-332247.212576558,4437621.06989079


mid,pid
10,638
10,641
10,645
27,435
35,623
35,627
35,632
45,435
45,455
50,572


In [170]:
library(sqldf)

Loading required package: gsubfn
Loading required package: proto
Loading required package: RSQLite
Loading required package: DBI


In [177]:
t = as.data.frame(t)
g = sqldf("SELECT mid, group_concat(pid) AS pids FROM t GROUP BY mid")
head(g)
nrow(g)

mid,pids
1,"689.0,693.0,691.0"
2,"444.0,451.0,458.0,463.0"
3,"438.0,444.0,443.0,447.0"
4,"438.0,443.0,447.0"
5,"448.0,449.0,460.0"
6,"763.0,764.0,762.0"


In [178]:
j = sqldf("SELECT * FROM g AS g1 JOIN g AS g2 WHERE g1.mid < g2.mid")
head(j)
nrow(j)

mid,pids,mid.1,pids.1
1,"689.0,693.0,691.0",2,"444.0,451.0,458.0,463.0"
1,"689.0,693.0,691.0",3,"438.0,444.0,443.0,447.0"
1,"689.0,693.0,691.0",4,"438.0,443.0,447.0"
1,"689.0,693.0,691.0",5,"448.0,449.0,460.0"
1,"689.0,693.0,691.0",6,"763.0,764.0,762.0"
1,"689.0,693.0,691.0",7,"194.0,195.0,196.0,197.0"
