# **Tutorial 5: Scaling up with PySpark**

In this section of the tutorial, we demonstrate how to process a much larger mobility dataset with nomad on a Spark cluster. Our target application is to produce mobility metrics aggregated at the neighborhood level.

## Configure your Spark cluster with SparkMagic

The EMR cluster for this demonstration has 1 master node (`m5.xlarge`, 4 vCPU, 16 GiB RAM) and 4 core nodes (`c5.4xlarge`, each with 16 vCPU and 32 GiB RAM). That gives us a total of 64 vCPUs and 128 GiB of RAM across the workers.

These resources are divided between a **driver** and multiple **executors**:
- This Jupyter notebook runs inside the **driver** (or master), which coordinates the jobs, helps shuffle data around, and collects results.

- The **executors** (or slaves) are distributed processes that perform actual computations on the worker nodes.
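
As a rough sanity check on how executors fit on the core nodes, the arithmetic can be sketched in plain Python. The node sizes (4 core nodes, 16 vCPUs and 32 GiB each) and the executor settings (4 cores, 4900 MiB) come from this tutorial. The overhead rule is Spark's default for `spark.executor.memoryOverhead` (10% of executor memory, at least 384 MiB). The sketch ignores the memory that YARN and the operating system reserve, so treat the result as an upper bound.

```python
# Back-of-the-envelope executor sizing for the cluster described above.
CORE_NODES = 4
VCPUS_PER_NODE = 16
MEM_PER_NODE_MIB = 32 * 1024

EXECUTOR_CORES = 4        # spark.executor.cores
EXECUTOR_MEM_MIB = 4900   # spark.executor.memory

# Default memory overhead: max(384 MiB, 10% of executor memory).
overhead_mib = max(384, EXECUTOR_MEM_MIB // 10)
container_mib = EXECUTOR_MEM_MIB + overhead_mib

# An executor needs both enough cores and enough memory on its node.
fit_by_cpu = VCPUS_PER_NODE // EXECUTOR_CORES
fit_by_mem = MEM_PER_NODE_MIB // container_mib
executors_per_node = min(fit_by_cpu, fit_by_mem)

total_vcpus = CORE_NODES * VCPUS_PER_NODE
max_executors = CORE_NODES * executors_per_node

print(f"container size: {container_mib} MiB")
print(f"executors per node: {executors_per_node} (cpu limit {fit_by_cpu}, memory limit {fit_by_mem})")
print(f"cluster-wide: {max_executors} executors on {total_vcpus} vCPUs")
```

With these settings the executors are limited by cores rather than by memory, which leaves room to raise `spark.executor.memory` if tasks run short.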

In [None]:
%%configure -f
{"conf":
     {"spark.pyspark.python":"/home/hadoop/nomad-venv/bin/python3",
      "spark.pyspark.virtualenv.bin.path":"/home/hadoop/nomad-venv/bin",
      "spark.driver.memory": "6g",
      "spark.driver.maxResultSize": "4g",
      "spark.executor.memory": "4900m",
      "spark.executor.cores": "4", 
      "spark.dynamicAllocation.enabled": "true",
      "spark.dynamicAllocation.minExecutors": "4",
      "maximizeResourceAllocation": "true",
      "spark.sql.execution.arrow.pyspark.enabled": "true"}
}

## A not-that-off-topic example: count intersecting time intervals

We have a large number of events, each a tuple `(event_id, start_datetime, end_datetime)`, and we want to find the pairs that intersect.
- Events are all at most 15 min long
- A simple global sort can be a bottleneck
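
For intuition, a single-machine baseline can be sketched in plain Python: sort by start time, then for each event scan forward only while later events start before it ends. This is a toy, assuming closed intervals (touching endpoints count as intersecting). `count_overlapping_pairs` is a hypothetical helper, not part of nomad or Spark.

```python
def count_overlapping_pairs(events):
    """Count pairs of (start, end) intervals that intersect."""
    ordered = sorted(events)  # sort by start time
    n = len(ordered)
    count = 0
    for i in range(n):
        end_i = ordered[i][1]
        for j in range(i + 1, n):
            # Later events start even later, so we can stop scanning here.
            if ordered[j][0] > end_i:
                break
            count += 1
    return count

toy = [(0, 10), (5, 12), (11, 20), (30, 35)]
n_pairs = count_overlapping_pairs(toy)
print(n_pairs)
```

The early `break` only works because the data is globally sorted. On a cluster, that global sort means shuffling the whole dataset, which is what the approach in the next cells tries to avoid.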

In [None]:
from pyspark.sql import functions as F
import datetime as dt

In [None]:
N        = 1_500_000  # records
parts    = 200  # initial partition of the data
DUR_MAX  = 900  # <= 15 min
totally_normal_day  = int(dt.datetime(2021, 1, 6).timestamp())
sec_in_5_weeks   = 5 * 7 * 24 * 3600  # 5 weeks

events = (spark.range(N, numPartitions=parts)
          .withColumn("offset", (F.rand() * sec_in_5_weeks).cast("int"))
          .withColumn("duration", (F.rand() * DUR_MAX + 1).cast("int"))
          .withColumn("start", F.from_unixtime(totally_normal_day + F.col("offset")).cast("timestamp"))
          .withColumn("end",   F.from_unixtime(totally_normal_day + F.col("offset") + F.col("duration")).cast("timestamp"))
          .select("id", "start", "end")
         )

In [None]:
events.show(5, truncate=False)
print(f"Initial partitions: {events.rdd.getNumPartitions()}")

print("Rows in first ten partitions:",
      events.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).take(10))

### How to parallelize counting overlaps? 

In [None]:
### [click to reveal]
bucketed = (events
            .withColumn("start_bucket", # 30 minute interval of the start time
                        F.floor(F.col("start").cast("int")/1800)) # 30 minute buckets
            .withColumn("bucket",
                        F.explode(
                            F.sequence(
                                F.col("start_bucket"),
                                F.floor(F.col("end").cast("int")/1800)
                            ))))


# No execution yet!

In [None]:
### [click to reveal]
bucketed.groupby("bucket").count().show(10) # triggers execution

### Pandas user-defined functions (pandas_udf)
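
Because an event can land in more than one 30-minute bucket, a pair of overlapping events may meet in two buckets. The rule used in the next cell counts a pair only in the bucket equal to the later of the two start buckets. The following plain-Python sketch checks that rule against a brute-force count on random toy data. All names are hypothetical, and the bucket width and maximum duration mirror the values used earlier.

```python
import random
from collections import defaultdict
from itertools import combinations

WIDTH = 1800    # bucket width in seconds (30 min)
DUR_MAX = 900   # events last at most 15 min

def overlaps(a, b):
    """Closed intervals (start, end) intersect."""
    return a[0] <= b[1] and b[0] <= a[1]

def brute_force(events):
    return sum(1 for a, b in combinations(events, 2) if overlaps(a, b))

def bucketed_count(events):
    # Copy each event into every bucket it touches.
    buckets = defaultdict(list)
    for start, end in events:
        for b in range(start // WIDTH, end // WIDTH + 1):
            buckets[b].append((start, end))
    total = 0
    for b, members in buckets.items():
        for e1, e2 in combinations(members, 2):
            # Count the pair only in its "canonical" bucket.
            canonical = b == max(e1[0] // WIDTH, e2[0] // WIDTH)
            if canonical and overlaps(e1, e2):
                total += 1
    return total

rng = random.Random(0)
events = []
for _ in range(500):
    start = rng.randrange(0, 20 * WIDTH)
    events.append((start, start + rng.randint(1, DUR_MAX)))

print(bucketed_count(events) == brute_force(events))
```

Each bucket can be processed independently, which is what makes the grouped computation parallelizable.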

In [None]:
from pyspark.sql import functions as F
import pandas as pd

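def count_overlaps(pdf: pd.DataFrame) -> pd.DataFrame:
    """Count overlapping event pairs within one bucket, each pair exactly once."""
    # Sorting by start lets the inner loop stop at the first non-overlapping event
    pdf = pdf.sort_values("start")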
    starts      = pdf["start"].values
    ends        = pdf["end"].values
    start_bs    = pdf["start_bucket"].values
    cur_bucket  = pdf["bucket"].iat[0]
    
    cnt = 0
    n   = len(starts)
    for i in range(n):
        for j in range(i + 1, n):
            if starts[j] > ends[i]:
                break
            # only count if this bucket is the "canonical" one
            if cur_bucket == max(start_bs[i], start_bs[j]):
                cnt += 1
    
    return pd.DataFrame({"cnt": [cnt]})

# applyInPandas and sum across buckets
overlap_counts = (
    bucketed
      .groupby("bucket")
      .applyInPandas(count_overlaps, schema="cnt long")
)


total = overlap_counts.select(F.sum("cnt")).collect()[0][0]

print("distinct overlap pairs =", total)

In [None]:
overlap_counts.toPandas() # Re-runs the whole pipeline, because nothing was cached after the previous action.

# Large-scale mobility dataset (Philadelphia, PA, USA)

In [None]:
from nomad.io.spark import table_columns

data_path = "s3://catalog-csslab/tutorial-large-data/"
table_columns(data_path, include_schema=False) # try True

In [None]:
data = spark.read.parquet(data_path)
data.show(5, truncate=False)

In [None]:
data.select(F.min(data["date"]), F.max(data["date"])).show()

## How many users and records are there in this dataset?
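
Both counts in this section are approximate on purpose: an exact answer requires a full pass (and, for distinct users, a shuffle), while an estimate only needs a small summary of the data. Spark's `approx_count_distinct` is based on HyperLogLog++. The sketch below swaps in a simpler k-minimum-values estimator, purely to illustrate the idea of trading accuracy for memory; it is not what Spark runs, and all names are hypothetical.

```python
import hashlib
import heapq

def unit_hash(value):
    """Map a value to a pseudo-random float in [0, 1)."""
    digest = hashlib.sha256(str(value).encode()).digest()
    return int.from_bytes(digest[:8], "big") / 2**64

def kmv_estimate(values, k=256):
    """Estimate the distinct count from the k smallest hash values seen."""
    heap = []     # negated hashes: heap[0] is minus the largest hash kept
    kept = set()  # the same hashes, used to skip duplicates
    for value in values:
        h = unit_hash(value)
        if h in kept:
            continue
        if len(heap) < k:
            heapq.heappush(heap, -h)
            kept.add(h)
        elif h < -heap[0]:
            evicted = -heapq.heappushpop(heap, -h)
            kept.discard(evicted)
            kept.add(h)
    if len(heap) < k:
        return len(heap)              # fewer than k distinct values: exact
    return round((k - 1) / -heap[0])  # the k-th smallest hash sets the scale

# 100,000 toy records from 20,000 distinct users.
true_distinct = 20_000
ids = (f"user_{i % true_distinct}" for i in range(100_000))
estimate = kmv_estimate(ids)
print(f"estimate: {estimate}, true: {true_distinct}")
```

The estimator never stores more than `k` hashes no matter how many records it sees. A larger `k` tightens the estimate, much as a smaller `rsd` does for `approx_count_distinct`.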

In [None]:
approx_total_records = data.rdd.countApprox(timeout=100,  # milliseconds
                                            confidence=0.80)
# returns a possibly partial estimate once the timeout expires, so it is faster than an exact data.count()
print(f"Approximate total records: {approx_total_records}")

In [None]:
approx_num_users = (
    data
    .agg(F.approx_count_distinct("user_id", rsd=0.15).alias("approx_num_users"))
    .collect()[0]["approx_num_users"]
)
# much cheaper than an exact count_distinct(), at the cost of some accuracy
print(f"Approximate unique devices (user_id): {approx_num_users}")

### We will focus on a smaller box
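
The bounding box in the next cell is given in longitude/latitude (EPSG:4326) and then reprojected to Web Mercator (EPSG:3857), so its bounds end up in projected meters. As a rough illustration of what that reprojection does, the spherical Mercator formulas can be written out by hand. In practice `to_crs` is the right tool; `lonlat_to_web_mercator` is a hypothetical helper for illustration only.

```python
import math

EARTH_RADIUS_M = 6378137.0  # sphere radius used by EPSG:3857

def lonlat_to_web_mercator(lon_deg, lat_deg):
    """Project longitude/latitude in degrees to Web Mercator (x, y) in meters."""
    x = EARTH_RADIUS_M * math.radians(lon_deg)
    y = EARTH_RADIUS_M * math.log(math.tan(math.pi / 4 + math.radians(lat_deg) / 2))
    return x, y

# Corners of the Center City box used in this tutorial.
sw_x, sw_y = lonlat_to_web_mercator(-75.1680, 39.9400)
ne_x, ne_y = lonlat_to_web_mercator(-75.1440, 39.9557)

print(f"x range: {sw_x:.1f} to {ne_x:.1f}")
print(f"y range: {sw_y:.1f} to {ne_y:.1f}")
print(f"projected width: {ne_x - sw_x:.1f} m")
```

Web Mercator stretches distances away from the equator, so the projected width overstates the ground distance at this latitude by roughly a factor of 1.3.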

In [None]:
import geopandas as gpd
import shapely as shp
from shapely.geometry import box, LineString, Point
import contextily as cx
import matplotlib.pyplot as plt

# Center City, Philadelphia
bbox = box(-75.1680, 39.9400, -75.1440, 39.9557)
old_city = gpd.GeoSeries([bbox], crs="EPSG:4326").to_crs("EPSG:3857").iloc[0]

cbgs = gpd.read_file("s3://ic2s2-emr-setup/tutorial-notebooks/Census_Block_Groups_2010.geojson").to_crs("EPSG:3857")
ax = cbgs.clip(old_city).plot(figsize=(4, 4), alpha=0.4, facecolor="none", linewidth=2)
ax.set_axis_off()
cx.add_basemap(ax, source=cx.providers.CartoDB.Positron)

plt.title("Center City, Philadelphia")
plt.show()

In [None]:
%matplot plt

In [None]:
from nomad.filters import completeness

@F.pandas_udf("double")
def completeness_udf(local_dt: pd.Series) -> float:
    # local_dt is all the local_datetime values for one user
    return float(completeness(
        data=pd.to_datetime(local_dt, utc=False),
        periods=1,
        freq='d',
        start="2020-02-01",
        end="2020-05-01"
    ))

In [None]:
min_x, min_y, max_x, max_y = old_city.bounds
dates = ("2020-02-01", "2020-05-01")  # study window, matching completeness_udf
date_from, date_to = [F.to_date(F.lit(s)) for s in dates]
filtered = (
    data
    .filter((F.col("x") >= min_x) & (F.col("x") <= max_x))
    .filter((F.col("y") >= min_y) & (F.col("y") <= max_y))
    .filter(F.col("date").between(date_from, date_to))
    .groupBy("user_id")
      .agg(completeness_udf("local_datetime").alias("completeness"))
)

In [None]:
daily_q = filtered.select(filtered.completeness).toPandas()

In [None]:
fig, ax1 = plt.subplots(figsize=(6, 4))

ax1.hist(daily_q["completeness"], bins=40)
ax1.set_title('Completeness (d) restricted to Center City')
ax1.set_xlabel('Completeness')
ax1.set_ylabel('Number of users')
plt.show()

In [None]:
%matplot plt

## Let's persist the final sample of data we will work with
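
The sample below keeps users whose completeness exceeds 0.1. As a reading aid, here is one plausible definition of daily completeness: the fraction of days in the study window on which a user has at least one ping. This definition is an assumption for illustration and not necessarily how `nomad.filters.completeness` is implemented; `daily_completeness` is a hypothetical helper.

```python
from datetime import date, datetime

def daily_completeness(timestamps, start, end):
    """Fraction of days in [start, end) with at least one timestamp."""
    n_days = (end - start).days
    covered = {ts.date() for ts in timestamps if start <= ts.date() < end}
    return len(covered) / n_days

pings = [
    datetime(2020, 2, 1, 8, 0),
    datetime(2020, 2, 1, 17, 30),  # same day as the first ping
    datetime(2020, 2, 3, 9, 15),
    datetime(2020, 5, 2, 12, 0),   # outside the window, ignored
]
q = daily_completeness(pings, date(2020, 2, 1), date(2020, 5, 1))
print(f"completeness: {q:.4f}")
```

Under this assumed definition, the window from 2020-02-01 to 2020-05-01 has 90 days, so a threshold of 0.1 would require pings on at least 10 distinct days.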

In [None]:
sample_users = (
    filtered
      .filter(F.col("completeness") > 0.1)
      .select("user_id")
)

sample_data = (
    data
      .filter(F.col("date").between("2020-02-01", "2020-05-01"))
      .join(sample_users, on="user_id", how="inner")
)

In [None]:
out_path = "/tmp/temp_data/"
(
    sample_data.write
      .option("hiveStylePartitioning", "true")
      .partitionBy("date")
      .mode("overwrite")
      .parquet(out_path)
)

## **Radius of gyration** based on stops for this small sample

In [None]:
data = spark.read.parquet(out_path)
data.count()

### Like previously, wrap the stop_detection function in a pandas_udf

In [None]:
from nomad.stop_detection.lachesis import lachesis

def _lachesis(pdf):
    pdf = pdf.sort_values(by=['unix_timestamp'])
    stops = lachesis(pdf, dt_max= 240, delta_roam=35,
        complete_output = True, keep_col_names=False, timestamp = 'unix_timestamp',
        x="x", y="y", user_id="user_id", passthrough_cols=['tz_offset', 'local_datetime'])    
    
    schema_cols = ["user_id","start_timestamp", "end_timestamp",
                   "x", "y","n_pings", "max_gap", "duration",
                   "cluster","diameter","local_datetime", "tz_offset"]
    if stops.empty:
        # applyInPandas expects a DataFrame back for every group, even an empty one
        return pd.DataFrame(columns=schema_cols, dtype=object)
    else:
        return stops

# For grouped map udfs, the syntax uses applyInPandas on a regular pandas function
schema = (
    f"user_id string, "
    "start_timestamp long, end_timestamp long, "
    "x double, y double, "
    "n_pings long, max_gap long, "
    "duration long, cluster long, "
    "diameter float, "
    "local_datetime string, tz_offset long"
)

stops_data = (
    data
    .groupBy('user_id')
    .applyInPandas(_lachesis, schema)
)

In [None]:
out_path = "/tmp/temp_stops/"
(
    stops_data.write
      .option("hiveStylePartitioning", "true")
      .mode("overwrite")
      .parquet(out_path)
)

### Next we compute the radius of gyration

At this point the data is much smaller, so we can simplify things: a single pandas_udf computes each user's radius of gyration (RoG) in parallel, and we then aggregate the results.
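
For reference, the radius of gyration is commonly defined as the root-mean-square distance of a user's visited locations from their center of mass. The sketch below weights each stop by its duration, which is one common choice and an assumption here rather than something taken from nomad. `radius_of_gyration` is a hypothetical helper, and coordinates are assumed to be projected, as in the stop table.

```python
import math

def radius_of_gyration(stops):
    """Duration-weighted RoG for an iterable of (x, y, duration) stops."""
    stops = list(stops)
    total_w = sum(w for _, _, w in stops)
    # Weighted center of mass of the stop locations.
    cx = sum(x * w for x, _, w in stops) / total_w
    cy = sum(y * w for _, y, w in stops) / total_w
    # Weighted mean squared distance to the center of mass.
    msd = sum(w * ((x - cx) ** 2 + (y - cy) ** 2) for x, y, w in stops) / total_w
    return math.sqrt(msd)

# Two stops of equal duration, 500 units apart.
toy_stops = [(0.0, 0.0, 60), (300.0, 400.0, 60)]
rog = radius_of_gyration(toy_stops)
print(f"RoG: {rog:.1f}")
```

In the Spark setting, a function along these lines would be applied once per user, for example through `groupBy("user_id")` followed by `applyInPandas`.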

In [None]:
import nomad.io.base as loader
traj_cols = {"datetime":"local_datetime", "user_id":"user_id", "timestamp":"unix_timestamp"}
# Keep the sampled stop table: the following cells refer to it as `stops`
stops = loader.sample_from_file(out_path, format="parquet", frac_users=0.2, traj_cols=traj_cols)

In [None]:
cbgs = gpd.read_file("s3://ic2s2-emr-setup/tutorial-notebooks/Census_Block_Groups_2010.geojson").to_crs("EPSG:3857")
cbgs = cbgs.rename(columns={"GEOID10":"cbg"})
cbgs = cbgs.set_index("cbg", drop=True)

In [None]:
# NOTE: `visits` must already be imported from nomad (the import is not shown in this tutorial)
stops["cbg"] = visits.point_in_polygon(
                         data=stops,
                         poi_table=cbgs,
                         max_distance=0,
                         x='x',
                         y='y',
                         method='centroid',
                         data_crs='EPSG:3857')

In [None]:
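from datetime import date

# NOTE: `homes` must already be imported from nomad (the import is not shown in this tutorial)
cand_homes = homes.compute_candidate_homes(stops,
                                           datetime="local_datetime",
                                           location_id="cbg",  # column created in the previous cell
                                           user_id="user_id")

last_date = date(year=2020, month=6, day=1)
home_table = homes.select_home(cand_homes, min_days=3, min_weeks=2, last_date=last_date, user_id='user_id')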

In [None]:
### Compute RoG per user, then join with home_table[["user_id", "cbg"]] on 'user_id'