In [2]:
""" Filter India data

aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,s3a://your-source-bucket/code/pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE
"""

from collections import namedtuple
import logging
import sys

from geopy.distance import great_circle
import pandas as pd
import geopandas as gpd
import numpy as np

from datetime import timedelta, date, datetime
from statistics import *


from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType,
    LongType,
    StructField,
    IntegerType,
    StringType,
    DoubleType,
    TimestampType,
    ArrayType,
    BinaryType
)
import pyspark.sql.functions as F
from math import *
import time
import json
import boto3

from sedona.register import SedonaRegistrator  
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from pyspark.sql.functions import udf
from sedona.utils.adapter import Adapter
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.SpatialRDD import PointRDD, SpatialRDD, CircleRDD
from sedona.sql.types import GeometryType
from sedona.core.enums import GridType, IndexType
from sedona.core.spatialOperator import JoinQueryRaw
from sedona.core.spatialOperator import JoinQuery
from sedona.core.enums import IndexType
from sedona.core.formatMapper.disc_utils import load_spatial_rdd_from_disc, GeoType
from sedona.core.formatMapper import WktReader, GeoJsonReader
# from sedona.spark import *
#sedona = SedonaContext.create(config)


from shapely.wkt import loads as wkt_loads
from shapely.geometry import Point, Polygon, shape
from shapely.ops import transform
import shapely

import s3fs

s3 = s3fs.S3FileSystem(anon=False)

spark = (SparkSession.builder.appName("sedona")
                 .config("spark.serializer", KryoSerializer.getName)          
        .config("spark.kryo.registrator",     
                  SedonaKryoRegistrator.getName)    
         .config("spark.driver.maxResultSize", "3g")
    .getOrCreate() 
        )

# Register Sedona UDTs and UDFs
SedonaRegistrator.registerAll(spark)
spark.sparkContext.addPyFile("s3://ipsos-dvd/scripts/utils.py")

bsdir = "s3://ipsos-dvd/fdd/"
data_dir = bsdir + "data/"
fn = data_dir +  "pings_IN_2021-09-01_2021-09-30/"

#-------
# parameters
crs = "epsg:3857"



def spatialIntersection(pings, poly, build_on_spatial_partitioned_rdd = True, using_index = True, origin_crs="epsg:4326", crs="epsg:4326", transform=True): 
    
    poly_rdd = Adapter.toSpatialRdd(poly, "geometry")

    pings.createOrReplaceTempView("pings")

    # Read Hive table
    if transform:
        pings = spark.sql(
              f"""SELECT ST_Transform(ST_FlipCoordinates(ST_Point(pings.longitude, 
              pings.latitude)), "{origin_crs}", "{crs}") AS point, 
              *
              FROM pings;
              """
        )
    else:
        pings = spark.sql(
              f"""SELECT ST_FlipCoordinates(ST_Transform(ST_FlipCoordinates(ST_Point(pings.longitude, 
              pings.latitude)), "{origin_crs}", "{crs}")) AS point, 
              *
              FROM pings;
              """
        )
    num_partitions = 1000
    pings = pings.repartition(num_partitions)
    pings = pings.cache()
    
    grid_type = GridType.QUADTREE # this shit works so much better for skewed data

    points_rdd = Adapter.toSpatialRdd(pings, "point")
    points_rdd.analyze()
    points_rdd.spatialPartitioning(grid_type)
    
    poly_rdd.analyze()
    poly_rdd.spatialPartitioning(points_rdd.getPartitioner())
    
     ## Set to TRUE only if run join query
    points_rdd.buildIndex(IndexType.QUADTREE, build_on_spatial_partitioned_rdd)
    
    result = JoinQueryRaw.SpatialJoinQueryFlat(points_rdd, poly_rdd, using_index, True)

    return Adapter.toDf(result, poly_rdd.fieldNames, points_rdd.fieldNames, spark)




VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
india = spark.read.parquet(fn)


VBox()

Starting Spark application


In [None]:
# filter on bounding box
minx, maxx = 68.716667, 80.983333 # Longitude
miny, maxy = 22.166667, 30.916667 # Latitude

# filter on rajastan
india = india.filter(F.col('longitude').between(minx, maxx) & F.col('latitude').between(miny, maxy))

# filter on internet blackout dates
india = india.withColumn("date", F.from_utc_timestamp(F.col("utc_timestamp").cast(TimestampType()), tz = "IST"))
# india = india.filter(F.to_date(F.col("date")).isin(["2021-09-05", "2021-09-12", "2021-09-19", "2021-09-26"]))
india = india.filter(F.to_date(F.col("date")).isin(["2021-09-06", "2021-09-13", "2021-09-20", "2021-09-27"]))


india.write.mode("overwrite").parquet("s3://ipsos-dvd/fdd/data/rajasatan_monday_sep_2021")

## Intersect with shops and grid for subsampling

In [None]:
### First intersect shops

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# read data
pings = spark.read.parquet("s3://ipsos-dvd/fdd/data/rajasatan_sundays_sep_2021") # .sample(0.05)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# downloading geojson from s3 is much quicker
s3 = boto3.client('s3')
s3.download_file('ipsos-dvd', 'fdd/data/shops.geojson', 'shops_s3.geojson')
poly_raw = gpd.read_file("shops_s3.geojson")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
crs = "epsg:3857"
poly_raw = poly_raw.to_crs(crs)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
poly_raw.drop(columns = "tags", inplace=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
poly = spark.createDataFrame(poly_raw).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
pings.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

126010338

In [None]:
pings = pings.withColumn("pings_id", F.monotonically_increasing_id())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
joined = spatialIntersection(pings, poly, crs=crs)
# I think I found the issue: a lot of pings are from the same lat-long but I think this spatial join only retains the unique geometries
# maybe not... doesn't seem to be the case

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('3.0', '2.12', '1.4.0')]

In [None]:
# joined.agg(F.countDistinct("pings_id")).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# combine with original points data and filter duplicates
unmatched_pings_df = pings.join(joined, on = pings.columns, how = "left_anti")

# Concatenate the matched rows and unmatched rows
joined = joined.unionByName(unmatched_pings_df,  allowMissingColumns=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
test = (joined.filter(F.hour(F.col("date")).between(6,18))
            .withColumns({"post" : (F.to_date(F.col("date") == "2021-09-26")).cast(IntegerType()), "shop" : F.col("id").isNotNull().cast(IntegerType())})
            .groupBy("caid", "latitude", "longitude", "utc_timestamp", "date", "post", "pings_id").agg(F.max("shop").alias("shop"))
        ).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Now intersect with grid

In [None]:
temp = test.agg(F.min("longitude"),F.min('latitude'), F.max("longitude"), F.max("latitude"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
test=test.cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
def create_grid(min_x, max_x, min_y, max_y, step):
    grid = []
    for x in np.arange(min_x, max_x, step):
        for y in np.arange(min_y, max_y, step):
            grid.append(Polygon([(x, y), (x+step, y), (x+step, y+step), (x, y+step)]))
    return grid

# create the grid
min_x, min_y, max_x, max_y =[7649981.160830013, 2531547.754779711, 9015022.947024144, 3621930.062059432]
grid = create_grid(min_x, max_x, min_y, max_y, step=40000) # 5km grid

# create a GeoDataFrame from the grid
grid_gdf = gpd.GeoDataFrame(geometry=grid)
grid_gdf['x'] = grid_gdf.geometry.centroid.x
grid_gdf['y'] = grid_gdf.geometry.centroid.y

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
grid_gdf.crs = crs
grid_poly = spark.createDataFrame(grid_gdf).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
grid_poly.write.mode("overwrite").parquet(data_dir + 'grid_poly')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
test.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

93785780

In [None]:
joined = spatialIntersection(test, grid_poly, crs=crs)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
joined = joined.cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
joined = joined.withColumn("date", F.to_date("date"))
(joined.drop("rightgeometry", "leftgeometry", "caid", "latitude", "longitude", "utc_timestamp").write.mode("overwrite")
        .option("header","true").csv(data_dir + "joined_subsampling", compression="gzip"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
joined = joined.withColumn("post", (F.col("date") == "2021-09-26").cast(IntegerType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
(joined.drop("rightgeometry", "leftgeometry", "caid", "latitude", "longitude", "utc_timestamp")
    .write.mode("overwrite").option("header","true").csv(data_dir + "joined_subsampling", compression="gzip")
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### And finally add GADM for day of aggregation

In [3]:
joined = spark.read.csv(data_dir + "joined_subsampling", header=True)
grid_poly = spark.read.parquet(data_dir + "grid_poly")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Initialize an empty DataFrame for the result
grid_full = grid_poly.withColumn("post", F.lit(0)).union(grid_poly.withColumn("post", F.lit(1)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# merge missing grid cells back in 
grid_full = (joined.drop("rightgeometry", "leftgeometry", "caid", "latitude", "longitude", "utc_timestamp")
                    .join(grid_full.drop("geometry"), how = "outer", on = ["x", "y", "post"])
            )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
gidname = "gadm41_IND_3.json"
gadm = gpd.read_file(data_dir + gidname).to_crs("epsg:3857")
# create the grid
min_x, min_y, max_x, max_y =[7649981.160830013, 2531547.754779711, 9015022.947024144, 3621930.062059432]
gadm = gadm.cx[min_x:max_x, min_y:max_y]
gadm = spark.createDataFrame(gadm).cache().select("NAME_3", "geometry")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
grid_full = grid_full.withColumns({'longitude' : F.col('x').cast(DoubleType()), 'latitude' : F.col('y').cast(DoubleType())})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
joined = spatialIntersection(grid_full, gadm, origin_crs=crs, crs=crs, transform=False)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('3.0', '2.12', '1.4.0')]

In [9]:
joined.drop("rightgeometry", "leftgeometry").repartition(1).write.mode("overwrite").option("header","true").csv(data_dir + "grid_subsampling", compression="gzip")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
joined = spark.read.csv(data_dir + "grid_subsampling", header=True)
joined.agg(F.mean("shop")).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+
|         avg(shop)|
+------------------+
|0.1793590379033833|
+------------------+