# __HoWDe__ 
### _a Home and Work location Detection algorithm for GPS data analytics_

This notebook is a brief tutorial on how to validate HoWDe's detected home and work locations against ground-truth data.
<!-- Ground truth data is available at: (link in paper)  -->

In [None]:
# Ask the inline plotting backend to emit figures in 'retina' format
%config InlineBackend.figure_format = 'retina'

In [None]:
# Import howde
from howde import *

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings("ignore")

# Set up Spark (single-machine session: master is local[n_workers])
# NOTE(review): temp_folder is a machine-specific absolute path used as Spark's
# scratch directory (spark.local.dir) -- adjust it to your environment.
temp_folder = '/data/work/user/sdsc'
mem="50g"
n_workers = 10
spark = (
    SparkSession.builder
    .config("spark.sql.files.ignoreCorruptFiles", "true")
    .config("spark.driver.memory", mem)
    # Set once (the original chain repeated this key twice with the same value).
    .config("spark.driver.maxResultSize", "40g")
    # Fixed key name: it was misspelled "spark.executer.memory", which is not a
    # Spark property, so the intended setting was never applied.
    .config("spark.executor.memory", "40g")
    .config("spark.local.dir", temp_folder)
    # Keep timestamp handling in UTC so unix-time columns convert consistently.
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.kryoserializer.buffer.max", "128m")
    # NOTE(review): presumably a legacy memory setting that recent Spark versions
    # ignore -- confirm against the Spark configuration docs before relying on it.
    .config("spark.storage.memoryFraction", "0.5")
    .config("spark.sql.broadcastTimeout", "7200")
    .master(f"local[{n_workers}]")
    .getOrCreate()
)



25/05/27 13:39:36 WARN Utils: Your hostname, bohr resolves to a loopback address: 127.0.1.1; using 130.225.68.124 instead (on interface eno1)
25/05/27 13:39:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/05/27 13:39:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/27 13:39:36 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
25/05/27 13:39:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/27 13:39:37 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [None]:
# Define folder with Data
# NOTE: the loading cells build their input paths as PATH+fname (plain string
# concatenation), so a non-empty PATH must end with a path separator.
PATH = ''

## Validation against D1

In [None]:
# ---------------------- IMPORT HOME/WORK GROUND TRUTH - WEEK LEVEL ----------------------
# One row per (user, year, week, location) carrying the annotated label.
fname = 'D1_truelabels_uwy'
truelabels_wy = (
    spark.read.format("parquet")
    .option("pathGlobFilter", "*.parquet")  # only pick up parquet part-files
    .load(PATH + fname)
)

# - Preview a few week-level true labels, then report the number of distinct users
truelabels_wy.orderBy("useruuid", "s_yy", "s_woy", "loc").show(4)
print(truelabels_wy.select('useruuid').distinct().count())

+--------+----+-----+---+------------------+
|useruuid|s_yy|s_woy|loc|true_location_type|
+--------+----+-----+---+------------------+
|       1|1970|   13| 17|                 W|
|       2|1970|    5|  0|                 W|
|       3|1970|   22|  2|                 W|
|       4|1971|    6|  1|                 H|
+--------+----+-----+---+------------------+
only showing top 4 rows

4847


In [6]:
# ---------------------- IMPORT STOPS ----------------------
# Stop sequences of dataset D1 (columns shown below: useruuid, loc, start, end, country).
fname = 'D1_stops'
stops_anonym_sample = (
    spark.read.format("parquet")
    .option("pathGlobFilter", "*.parquet")  # only pick up parquet part-files
    .load(PATH + fname)
)

# Preview the stops chronologically per user, then report the number of distinct users
stops_anonym_sample.orderBy('useruuid', 'start').show()
print(stops_anonym_sample.select('useruuid').distinct().count())

                                                                                

+--------+---+-------+-------+-------+
|useruuid|loc|  start|    end|country|
+--------+---+-------+-------+-------+
|       1|  0| 674400| 676200|   GL0B|
|       1|  1| 678000| 697200|   GL0B|
|       1|  1| 717600| 732600|   GL0B|
|       1|  1| 767400| 819600|   GL0B|
|       1|  2| 820200| 850800|   GL0B|
|       1|  1| 852000| 904200|   GL0B|
|       1|  0| 933000| 934800|   GL0B|
|       1|  1| 935400| 987000|   GL0B|
|       1|  1|1026000|1069800|   GL0B|
|       1|  2|1088400|1111200|   GL0B|
|       1|  1|1113600|1159200|   GL0B|
|       1|  6|1159800|1163400|   GL0B|
|       1| 79|1165200|1168200|   GL0B|
|       1|  2|1174800|1197600|   GL0B|
|       1|  1|1199400|1251000|   GL0B|
|       1|  1|1290600|1337400|   GL0B|
|       1|  4|1339800|1341000|   GL0B|
|       1| 36|1351200|1353600|   GL0B|
|       1| 37|1353600|1362000|   GL0B|
|       1|  5|1362600|1370400|   GL0B|
+--------+---+-------+-------+-------+
only showing top 20 rows

4847


In [None]:
# ---------------------- HOME/WORK DETECTION with HoWDe ----------------------
stops_c_hw2 = HoWDe_labelling(
    # inputs
    input_data=stops_anonym_sample,
    spark=spark,
    # sliding-window lengths for home and work detection
    range_window_home=28,
    range_window_work=42,
    # detection thresholds
    dhn=3,
    dn_H=0.7,
    dn_W=0.5,
    hf_H=0.7,
    hf_W=0.4,
    df_W=0.6,
    # output handling
    stops_output=True,
    verbose=True,
)
# Keep the labelled stops cached: they are reused by the cells that follow.
stops_c_hw2 = stops_c_hw2.cache()

HoWDe Labelling: computing LABs ...


  0%|          | 0/1 [00:00<?, ?it/s]

 >>> stops pre-processed
 >>> home/works detected


100%|██████████| 1/1 [00:02<00:00,  2.67s/it]

 >>> output formatted as stops: True
HoWDe Labelling: computations completed!





25/05/27 13:39:59 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [8]:
## Stops with detected home/work location
# Add human-readable timestamps next to the unix-time start/end columns for display only.
(
    stops_c_hw2
    .orderBy('useruuid', 'start')
    .withColumn("start_ts", F.to_timestamp("start"))
    .withColumn("end_ts", F.to_timestamp("end"))
    .show(10, truncate=False)
)



+--------+-------+---+-------------------+------+------+-------------+-------------+------------+------------+-------------------+-------------------+
|useruuid|country|loc|date               |start |end   |stop_duration|location_type|detect_H_loc|detect_W_loc|start_ts           |end_ts             |
+--------+-------+---+-------------------+------+------+-------------+-------------+------------+------------+-------------------+-------------------+
|1       |GL0B   |0  |1970-01-08 00:00:00|674400|676200|1800         |O            |1           |null        |1970-01-08 19:20:00|1970-01-08 19:50:00|
|1       |GL0B   |1  |1970-01-08 00:00:00|678000|691199|13199        |H            |1           |null        |1970-01-08 20:20:00|1970-01-08 23:59:59|
|1       |GL0B   |1  |1970-01-09 00:00:00|691200|697200|6000         |H            |1           |null        |1970-01-09 00:00:00|1970-01-09 01:40:00|
|1       |GL0B   |1  |1970-01-09 00:00:00|717600|732600|15000        |H            |1         

                                                                                

In [9]:
# ---------------------- ALIGN STOPS DATASET TO LABELLED DATA TIME-RESOLUTION ----------------------
# -- STEP 1: Assign week-year to detected labels -----------------------------
# Each stop is mapped to the calendar day of its start time; week-of-year and year
# are derived from that day, and the result is reduced to distinct rows.
detectlocs_wy = (
    stops_c_hw2
    .withColumn(
        "date",
        F.date_format(F.from_unixtime("start").cast("timestamp"), "yyyy-MM-dd"),
    )
    .withColumn("s_woy", F.weekofyear("date"))  # Week of year
    .withColumn("s_yy", F.year("date"))  # Year
    .select(
        "useruuid", "s_yy", "s_woy", "loc",
        "location_type", "detect_H_loc", "detect_W_loc",
    )
    .distinct()
)

# - Show sample of week-level detected labels
detectlocs_wy.orderBy("useruuid", "s_yy", "s_woy", "loc").show(4)

[Stage 63:>                                                       (0 + 10) / 11]

+--------+----+-----+---+-------------+------------+------------+
|useruuid|s_yy|s_woy|loc|location_type|detect_H_loc|detect_W_loc|
+--------+----+-----+---+-------------+------------+------------+
|       1|1970|    2|  0|            O|           1|        null|
|       1|1970|    2|  1|            H|           1|        null|
|       1|1970|    2|  2|            O|           1|        null|
|       1|1970|    3|  1|            H|           1|        null|
+--------+----+-----+---+-------------+------------+------------+
only showing top 4 rows



                                                                                

In [10]:
# ---------------------- GET HOME/WORK ACCURACY AND FRACTION OF NOT DETECTED AT user-week level ----------------------

def evaluate_weekly_detection_accuracy(detectlocs_wy, truelabels_wy, target_label: str)-> "DataFrame":
    """
    Compare weekly detected Home/Work locations with annotated ground truth.

    An annotated (user, week, location) row counts as a match when the location
    equals the location detected for the target label in that same user-week.

    Parameters:
    - detectlocs_wy: Spark DataFrame with columns
                     ['useruuid', 's_woy', 's_yy', 'loc', 'location_type',
                      'detect_<target_label>_loc'].
                     Week-level detected locations.
    - truelabels_wy: Spark DataFrame with columns
                     ['useruuid', 's_woy', 's_yy', 'loc', 'true_location_type'].
                     Week-level annotated ground truth.
    - target_label:  'H' for Home or 'W' for Work; used to build the column names.

    Returns:
    - Single-row Spark DataFrame (X = target_label) with columns:
        count_X:     annotated target-label rows that could be joined to the
                     detections on (user, week, location)
        match_sum_X: rows among those whose location equals the detected one
        detected_X:  rows among those whose user-week has a detected target location
        acc_X:       100 * match_sum_X / detected_X, rounded to 2 decimals
        none_X:      100 * (count_X - detected_X) / count_X, rounded to 2 decimals
    """
    # Column names and join keys shared by the steps below
    week_keys = ["useruuid", "s_woy", "s_yy"]
    loc_keys = week_keys + ["loc"]
    detect_col = f'detect_{target_label}_loc'
    flag_col = f"hasdetected_{target_label}_uw"
    match_col = f"match_{target_label}label_uw"
    count_col = f"count_{target_label}"
    sum_col = f"match_sum_{target_label}"
    detected_col = f"detected_{target_label}"

    # ---- STEP 1: per user-week, flag whether any row carries a detected target location
    has_detection = F.when(F.col(detect_col).isNotNull(), 1).otherwise(0)
    flagged = detectlocs_wy.withColumn(
        flag_col, F.max(has_detection).over(Window.partitionBy(*week_keys))
    )

    # ---- STEP 2: annotated rows of the target type, enriched with the user-week flag
    week_flags = flagged.select(*week_keys, flag_col).dropDuplicates()
    truth_target = truelabels_wy.filter(F.col("true_location_type") == target_label)
    truth_with_flag = truth_target.join(week_flags, on=week_keys)

    # ---- STEP 3: reduce detections to one row per user-week-location
    # A location may appear with several labels within a week; rows where the location
    # equals the detected target location are ranked first so they survive the filter.
    loc_is_target = F.when(F.col('loc') == F.col(detect_col), F.lit(1)).otherwise(F.lit(0))
    best_first = Window.partitionBy(*loc_keys).orderBy(F.desc("is_target_label_uwloc"))
    one_row_per_loc = (
        flagged
        .withColumn('is_target_label_uwloc', loc_is_target)
        .withColumn("sortby_bestlabel_uwloc", F.row_number().over(best_first))
        .filter(F.col("sortby_bestlabel_uwloc") == 1)
        .drop("is_target_label_uwloc", "sortby_bestlabel_uwloc")
    )

    # ---- STEP 4: join annotation and detection on user-week-location and mark matches
    is_match = F.when(F.col("loc") == F.col(detect_col), 1).otherwise(0)
    matched = truth_with_flag.join(
        one_row_per_loc.select(*loc_keys, "location_type", detect_col),
        on=loc_keys,
    ).withColumn(match_col, is_match)

    # ---- STEP 5: aggregate the counts and derive the summary percentages
    totals = matched.agg(
        F.count(match_col).alias(count_col),
        F.sum(match_col).alias(sum_col),
        F.sum(flag_col).alias(detected_col),
    )
    accuracy_pct = F.round(100.0 * F.col(sum_col) / F.col(detected_col), 2)
    undetected_pct = F.round(
        100.0 * (F.col(count_col) - F.col(detected_col)) / F.col(count_col), 2
    )

    return (
        totals
        .withColumn(f"acc_{target_label}", accuracy_pct)
        .withColumn(f"none_{target_label}", undetected_pct)
    )

In [11]:
# Weekly validation for Home labels
res_weekly_H = evaluate_weekly_detection_accuracy(
    detectlocs_wy=detectlocs_wy,
    truelabels_wy=truelabels_wy,
    target_label="H",
)
res_weekly_H.show()

# Weekly validation for Work labels
res_weekly_W = evaluate_weekly_detection_accuracy(
    detectlocs_wy=detectlocs_wy,
    truelabels_wy=truelabels_wy,
    target_label="W",
)
res_weekly_W.show()

                                                                                

+-------+-----------+----------+-----+------+
|count_H|match_sum_H|detected_H|acc_H|none_H|
+-------+-----------+----------+-----+------+
|   3196|       2593|      2734|94.84| 14.46|
+-------+-----------+----------+-----+------+



                                                                                

+-------+-----------+----------+-----+------+
|count_W|match_sum_W|detected_W|acc_W|none_W|
+-------+-----------+----------+-----+------+
|   2953|       1875|      2265|82.78|  23.3|
+-------+-----------+----------+-----+------+



                                                                                

## Validation against D2

In [None]:
# ---------------------- IMPORT HOME/WORK GROUND TRUTH - USER LEVEL ----------------------
fname = 'D2_truelabels_u'
truelabels_u = (
    spark.read.format("parquet")
    .option("pathGlobFilter", "*.parquet")  # only pick up parquet part-files
    .load(PATH + fname)
)

# - Show sample of user-level true labels
truelabels_u.orderBy("useruuid", "loc").show(4)

In [13]:
# ---------------------- IMPORT STOPS ----------------------
# Stop sequences of dataset D2; this rebinds stops_anonym_sample (previously D1).
fname = 'D2_stops'
stops_anonym_sample = (
    spark.read.format("parquet")
    .option("pathGlobFilter", "*.parquet")  # only pick up parquet part-files
    .load(PATH + fname)
)

# Preview the stops chronologically per user, then report the number of distinct users
stops_anonym_sample.orderBy('useruuid', 'start').show()
print(stops_anonym_sample.select('useruuid').distinct().count())

+--------+---+------+------+-------+
|useruuid|loc| start|   end|country|
+--------+---+------+------+-------+
|       1|  4|526200|528000|     ID|
|       1|  5|546600|556200|     ID|
|       1|  1|556800|562800|     ID|
|       1|  1|589800|592800|     ID|
|       1|  1|640200|643200|     ID|
|       1|  1|648000|649800|     ID|
|       1|  2|666000|666600|     ID|
|       1|  3|667800|671400|     ID|
|       1|  0|739800|741000|     ID|
|       1|  1|756000|757200|     ID|
|       1|  1|775200|775800|     ID|
|       1|  6|779400|780000|     ID|
|       1|  1|795000|801000|     ID|
|       1|  7|836400|838800|     ID|
|       1|  8|843000|844200|     ID|
|       1|  1|861600|864000|     ID|
|       1|  1|864000|865800|     ID|
|       1|  9|901800|910800|     ID|
|       1| 10|919800|925800|     ID|
|       1| 10|930000|930600|     ID|
+--------+---+------+------+-------+
only showing top 20 rows

500


In [20]:
# ---------------------- HOME/WORK DETECTION with HoWDe ----------------------
stops_c_hw2 = HoWDe_labelling(
    # inputs
    input_data=stops_anonym_sample,
    spark=spark,
    # >> ATTENTION: both windows are set to 730 days to reproduce the view of the
    #    annotators; this is not the intended use of the window.
    range_window_home=730,
    range_window_work=730,
    dhn=3,
    # >> Allowed fraction of missing days in the window is increased to compensate
    #    for the long windows.
    dn_H=0.8,
    dn_W=0.8,
    hf_H=0.7,
    hf_W=0.4,
    df_W=0.8,
    # output handling
    stops_output=True,
    verbose=True,
)
# Keep the labelled stops cached: they are reused by the cells that follow.
stops_c_hw2 = stops_c_hw2.cache()

25/05/27 13:57:37 WARN CacheManager: Asked to cache already cached data.
HoWDe Labelling: computing LABs ...


  0%|          | 0/1 [00:00<?, ?it/s]

 >>> stops pre-processed
 >>> home/works detected


100%|██████████| 1/1 [00:02<00:00,  2.06s/it]

 >>> output formatted as stops: True
HoWDe Labelling: computations completed!





In [21]:
## Stops with detected home/work location
# Add human-readable timestamps next to the unix-time start/end columns for display only.
(
    stops_c_hw2
    .orderBy('useruuid', 'start')
    .withColumn("start_ts", F.to_timestamp("start"))
    .withColumn("end_ts", F.to_timestamp("end"))
    .show(10, truncate=False)
)

                                                                                

+--------+-------+---+-------------------+------+------+-------------+-------------+------------+------------+-------------------+-------------------+
|useruuid|country|loc|date               |start |end   |stop_duration|location_type|detect_H_loc|detect_W_loc|start_ts           |end_ts             |
+--------+-------+---+-------------------+------+------+-------------+-------------+------------+------------+-------------------+-------------------+
|1       |ID     |4  |1970-01-07 00:00:00|526200|528000|1800         |O            |null        |null        |1970-01-07 02:10:00|1970-01-07 02:40:00|
|1       |ID     |5  |1970-01-07 00:00:00|546600|556200|9600         |O            |null        |null        |1970-01-07 07:50:00|1970-01-07 10:30:00|
|1       |ID     |1  |1970-01-07 00:00:00|556800|562800|6000         |O            |null        |null        |1970-01-07 10:40:00|1970-01-07 12:20:00|
|1       |ID     |1  |1970-01-07 00:00:00|589800|592800|3000         |O            |null      

                                                                                

In [None]:
# ---------------------- ALIGN STOPS DATASET TO LABELLED DATA TIME-RESOLUTION ----------------------
# -- STEP 1: Prepare user-location detected labels -----------------------------
# Collapse the labelled stops to the distinct (user, location, detected label) combinations.
detectlocs_u = stops_c_hw2.select("useruuid", "loc", "location_type").distinct()

# Preview only the locations that received a label other than 'O'
(
    detectlocs_u
    .where(F.col('location_type') != 'O')
    .orderBy("useruuid", "loc")
    .show()
)

+--------+---+-------------+
|useruuid|loc|location_type|
+--------+---+-------------+
|       2|  0|            H|
|       2|  5|            W|
|       3|  4|            H|
|       3|  5|            W|
|       4|  0|            H|
|       4|  4|            W|
|       5|  0|            H|
|       6|  0|            H|
|       6|  1|            W|
|       7|  5|            H|
|       8|  0|            H|
|      10|  0|            H|
|      10|  6|            W|
|      11|  4|            W|
|      11|  5|            H|
|      12|  0|            H|
|      13|  0|            H|
|      14|  0|            H|
|      14| 12|            W|
|      15|  0|            H|
+--------+---+-------------+
only showing top 20 rows



In [23]:
# ---------------------- GET HOME/WORK ACCURACY AND FRACTION OF NOT DETECTED AT user level ----------------------
def evaluate_userlevel_accuracy(truelabels_u, detectlocs_u, target_label: str,  it_cols: list = None) -> "DataFrame":
    """
    Evaluate user-level detection accuracy for Home or Work labels.
    A match is counted if a location annotated with the target label was detected
    with that same label for the same user at any time.

    Parameters:
    - truelabels_u: Spark DataFrame with columns ['useruuid', 'loc', 'true_location_type', ...]
                    Annotated true labels at user-location level.
    - detectlocs_u: Spark DataFrame with columns ['useruuid', 'loc', 'location_type', ...]
                    Detected locations at user-level.
    - target_label: 'H' for Home or 'W' for Work
    - it_cols:      Optional grouping columns for stratified results (e.g., ['iteration']).
                    Accepts None (default, no stratification), a single column name,
                    or any iterable of column names.

    Returns:
    - Spark DataFrame (one row per combination of it_cols) with, for X = target_label:
        count_X:     Number of users with at least one location annotated as X
        match_sum_X: Number of X-annotated (user, loc) rows also detected as X,
                     counted over users included in detected_X
        detected_X:  Number of annotated users for whom X was detected on at least
                     one of their annotated locations
        acc_X:       100 * match_sum_X / X-annotated rows of the users in detected_X
        none_X:      100 * (1 - detected_X / count_X), i.e. % of annotated users
                     without such a detection
    """
    # Normalise the grouping columns. A None default avoids sharing one mutable list
    # between calls; a lone string is wrapped so it is not unpacked character by character.
    if it_cols is None:
        it_cols = []
    elif isinstance(it_cols, str):
        it_cols = [it_cols]
    else:
        it_cols = list(it_cols)

    # 1) join & Boolean flags
    #    Left join: every annotated (user, loc) row is kept; locations never seen in
    #    the detections get a null det_f.
    joined = (
        truelabels_u.alias("t")
        .join(detectlocs_u.alias("d"), ["useruuid", "loc"], "left")
        .select( *it_cols, "useruuid", "loc",
                 (F.col("t.true_location_type") == target_label).cast("int").alias("true_f"),
                 (F.col("d.location_type")      == target_label).cast("int").alias("det_f") )
    )
    # 2) one row per (iter, user, loc) keeping any detection of the label
    uloc = (
        joined.groupBy( *it_cols, "useruuid", "loc")
              .agg( F.max("true_f").alias("true_f"),
                    F.max("det_f").alias("det_f") )
    )
    # 3) user-level roll-up
    per_user = (
        uloc.groupBy( *it_cols, "useruuid")
            .agg(
                F.max("true_f").alias("has_true"),         # user is annotated for the label
                F.max("det_f").alias("has_detect"),        # label detected on an annotated loc
                F.sum("true_f").alias("annot_rows"),       # number of locs annotated with the label
                F.sum( F.when( (F.col("true_f") == 1) & (F.col("det_f") == 1), 1)
                       .otherwise(0) ).alias("match_rows") )
            .filter("has_true = 1")                        # only annotated users
    )
    # 4) final metrics (rows of users without any detection are excluded from acc)
    agg = (
        per_user.groupBy( *it_cols )
            .agg(
                F.count("*").alias("count_u"),
                F.sum("has_detect").alias("wdetec_u"),
                F.sum( F.when( F.col("has_detect") == 1, F.col("annot_rows") )
                       .otherwise(0) ).alias("total_rows"),
                F.sum( F.when( F.col("has_detect") == 1, F.col("match_rows") )
                       .otherwise(0) ).alias("match_sum") )
            .withColumn("acc",   F.col("match_sum") / F.col("total_rows"))
            .withColumn("none",  1 - F.col("wdetec_u") / F.col("count_u"))
    )

    # Rename to label-specific output columns and express the ratios as percentages
    sel = it_cols + [
        f"count_u        as count_{target_label}",
        f"match_sum      as match_sum_{target_label}",
        f"wdetec_u       as detected_{target_label}",
        f"ROUND(acc*100, 2)  as acc_{target_label}",
        f"ROUND(none*100,2) as none_{target_label}",
    ]
    return agg.selectExpr(*sel)

In [24]:
# User-level validation for Home labels
res_u_H = evaluate_userlevel_accuracy(
    truelabels_u=truelabels_u,
    detectlocs_u=detectlocs_u,
    target_label="H",
)
res_u_H.show()

# User-level validation for Work labels
res_u_W = evaluate_userlevel_accuracy(
    truelabels_u=truelabels_u,
    detectlocs_u=detectlocs_u,
    target_label="W",
)
res_u_W.show()

+-------+-----------+----------+-----+------+
|count_H|match_sum_H|detected_H|acc_H|none_H|
+-------+-----------+----------+-----+------+
|    500|        393|       395| 75.0|  21.0|
+-------+-----------+----------+-----+------+

+-------+-----------+----------+-----+------+
|count_W|match_sum_W|detected_W|acc_W|none_W|
+-------+-----------+----------+-----+------+
|    287|        149|       168|73.76| 41.46|
+-------+-----------+----------+-----+------+

