# Spark notebook for initial processing of raw mobility data

This notebook performs a spatial join of administrative codes onto mobility data points falling within administrative bounds. Spatial joins are famously computationally expensive operations and we do this at a very high level of detail for massive amounts of points. Thus, this is an expensive to run notebook. The benefit is having paid that expense once, you save yourself money going forward: all your data is henceforth highly spatially accurate and analysis can be performed just by looking at the admin codes, without actually manipulating spatial objects.

In [None]:
# installing the python packages using dbutils
dbutils.library.installPyPI("pandas",'1.0.4')
dbutils.library.installPyPI("numpy",'1.19')
dbutils.library.installPyPI("geopandas")
dbutils.library.installPyPI("descartes")
dbutils.library.installPyPI("fiona")
dbutils.library.installPyPI("shapely")
dbutils.library.installPyPI("pyproj")
dbutils.library.installPyPI("matplotlib")
dbutils.library.installPyPI("pyspark")
dbutils.library.installPyPI("geospark")

# importing the required python packages
import time
import datetime

import pandas as pd
import geopandas as gp

from pyspark.sql import SparkSession

from geospark.register import upload_jars
from geospark.register import GeoSparkRegistrator

upload_jars() # necessary to load in GeoSpark libraries manually installed to the server

spark = SparkSession.builder.getOrCreate()

GeoSparkRegistrator.registerAll(spark)

In [None]:
# import a more extensive list of pySpark and GeoSpark packages
import warnings
warnings.filterwarnings("ignore")

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as fs
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.functions import count
from pyspark.sql.functions import col, countDistinct
from pyspark.sql.functions import broadcast
from pyspark import SparkContext

import pylab as plt
from pyspark.sql.functions import lag
from pyspark.sql.window import Window
from pyspark.sql.functions import acos, cos, sin, lit, toRadians

import geospark
from geospark.register import GeoSparkRegistrator
from geospark.utils import GeoSparkKryoRegistrator, KryoSerializer
from geospark.register import upload_jars
from geospark.core.formatMapper.shapefileParser import ShapefileReader
from geospark.core import SpatialRDD

In [None]:
# These help optimize the operations of Spark
# Reference for partitions: https://stackoverflow.com/questions/35800795/number-of-partitions-in-rdd-and-performance-in-spark and https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4
# Reference for memory / ResultSize limits: https://stackoverflow.com/questions/47996396/total-size-of-serialized-results-of-16-tasks-1048-5-mb-is-bigger-than-spark-dr

spark.conf.set("spark.sql.shuffle.partitions", 5000) # above 2000 it uses a different process. Setting to 500 so ( ( 56 / 8) * 120) / 5000) = 168 MB per partition
spark.conf.set("spark.network.timeout", 10000)
spark.conf.set("spark.driver.memory", 0)
spark.conf.set("spark.driver.maxResultSize", '16g')
spark.conf.set("spark.serializer", KryoSerializer.getName)
# spark/conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrator", GeoSparkKryoRegistrator.getName)
spark.conf.set("geospark.global.index","true") # test
spark.conf.set("geospark.join.gridtype","kdbtree")
 
spark.conf.set("spark.sql.execution.arrow.enabled", "false")


In [None]:
%scala
// import various GeoSpark libraries. 
// Full set of libraries can be tediously found here: https://github.com/DataSystemsLab/GeoSpark/tree/master/core/src/main/java/org/datasyslab/geospark

import com.vividsolutions.jts.geom.{Coordinate, Geometry, GeometryFactory}
import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileReader
import org.datasyslab.geospark.spatialRDD.SpatialRDD
import org.datasyslab.geospark.enums.{GridType,IndexType}
import org.datasyslab.geospark.spatialPartitioning
import org.datasyslab.geospark.spatialOperator.{JoinQuery}
import org.datasyslab.geosparksql.utils.{Adapter, GeoSparkSQLRegistrator}
GeoSparkSQLRegistrator.registerAll(sqlContext)

### One time -- load in administrative unit shapefiles being used to join codes

In [None]:
# # Only need to do this one time and save out the geopackage to a global table

# # read admin shapefiles from the GeoPackage
# admin1 = gp.read_file("/dbfs/mnt/CUBEIQ/esapv/India/India_Administrative_Boundaries.gpkg",layer="Admin1")
# admin2 = gp.read_file("/dbfs/mnt/CUBEIQ/esapv/India/India_Administrative_Boundaries.gpkg",layer="Admin2")
# # admin3 = gp.read_file("/dbfs/mnt/CUBEIQ/esapv/India/India_Administrative_Boundaries.gpkg",layer="Admin3")
# admin5 = gp.read_file("/dbfs/mnt/CUBEIQ/esapv/India/India_Administrative_Boundaries.gpkg",layer="Admin5_TownVillageWard_FullCodes")

# # Enable Arrow-based columnar data transfer for speeding up this process
# # Interestingly this operation can NOT make use of additional workers, which is why it takes over an hour to convert Admin3 and admin5 to dataframes. Geopandas is not at all optimized for distributed computing, basically.

# spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# # Creating spark dataframes from the geopandas GeoDataFrames

# admin1 = spark.createDataFrame(
#     admin1[['L1_CODE', 'geometry']]
#   )

# admin2 = spark.createDataFrame(
#     admin2[['L1_CODE', 'L2_CODE', 'geometry']]
#   )

# admin5 = spark.createDataFrame(
#     admin5[['L1_CODE', 'L2_CODE', 'L3_CODE','L4_CODE', 'geo_id', 'geometry']]
#   )

# admin1.write.saveAsTable("IN_adm1")
# admin2.write.saveAsTable("IN_adm2")
# admin5.write.saveAsTable("IN_adm5")

### Load and process data

In [None]:
# load the whole vendor's data into a spark dataframe -- throughout this notebook we are using Veraset as an example
sdf = sqlContext.sql("SELECT * FROM veraset_1 WHERE veraset_1.country = 'IN'")

# register sdf spark dataframe as sdf_ tempview
sdf.createOrReplaceTempView('sdf_')

# delta out path variable

out_delta_path = "/mnt/CUBEIQ/esapv/India/delta_veraset"
filter_range = -60

# converting the long and lat columns into Point(X, Y) column using geospark

points = spark.sql(f"SELECT device_id, \
                           timestamp, \
                           to_date(from_unixtime(timestamp,'yyyy-MM-dd')) as date, \
                           ST_Point(CAST(lon AS Decimal(24,20)),CAST(lat AS Decimal(24,20))) AS geometry \
                   FROM Veraset_1 \
                   WHERE Veraset_1.country = 'IN' AND timestamp > unix_timestamp(date_add(current_date(), {filter_range})) ") # loads within current date - filter range
 

In [None]:
# Load in the geospatial administrative files directly from a global table
admin5 = sqlContext.sql("SELECT * FROM IN_adm5")

In [None]:
# now register spark dataframes as tables
sqlContext.registerDataFrameAsTable(points, "points")
sqlContext.registerDataFrameAsTable(admin5, "admin5_tbl")

In [None]:
# here data can be filtered using some date range; this is optional. Cuts down on processing time and keeps it from crashing for truly massive (India scale) tasks
from_ = "01/01/2021"
to_ = "15/02/2021"

from_strp = time.mktime(datetime.datetime.strptime(from_, "%d/%m/%Y").timetuple())
to_strp = time.mktime(datetime.datetime.strptime(to_, "%d/%m/%Y").timetuple())

timestamps = (from_strp, to_strp)
points1 = points.where(col('timestamp').between(*timestamps))

# now register this as a table
sqlContext.registerDataFrameAsTable(points1, "pts_filter_tbl")

### Spatial join and export to delta

In [None]:
# spatial join for adm5
# This is a bit brute force and computationally expensive. A more refined approach would use H3. One for the future.

intersect_query_adm5 = """
  SELECT 
        s.L1_CODE as adm1_code, 
        s.L2_CODE as adm2_code, 
        s.L3_CODE as adm3_code, 
        s.L4_CODE as adm4_code,
        s.geo_id as adm5_code, 
        p.device_id, 
        p.timestamp, 
        p.date,
        p.geometry 
  FROM pts_filter_tbl AS p, admin5_tbl AS s 
  WHERE ST_Intersects(p.geometry, s.geometry)
"""

spatial_join_result_final = spark.sql(intersect_query_adm5)

In [None]:
# Exporting spatial join results as a delta table we can access at any time
# Partitioning by date because most queries are by date and file sizes are roughly the same, reducing skewness across dates when accessing the data. Less skewness = less processing time = less money spent.
# Based off here https://docs.databricks.com/delta/intro-notebooks.html

spatial_join_result_final.write.format("delta")\
                                .mode("overwrite")\
                                .option("replaceWhere", f"date >= {from_} AND date <= {to_}") \
                                .partitionBy("date")\
                                .save(out_delta_path)