# Investigate MobilityClassification

James Harrison, 2021-12-02

While addressing FlowKit issue #4615, I'll use this notebook to try out some possible implementations of MobilityClassification and see which is most efficient.

Findings from this smallish synthetic dataset may, of course, not carry over to a real CDR dataset.

In [1]:
import flowmachine
from flowmachine.core import make_spatial_unit
from flowmachine.core.custom_query import CustomQuery
from flowmachine.features.subscriber.majority_location import MajorityLocation
from flowmachine.features.subscriber.location_visits import LocationVisits
from flowmachine.features.subscriber.day_trajectories import DayTrajectories
from flowmachine.features.subscriber.modal_location import ModalLocation
from flowmachine.features.subscriber.daily_location import daily_location
import os
import numpy as np
import pandas as pd

In [2]:
flowmachine.connect()

FlowMachine version: 0+unknown
Flowdb running on: flowdb:5432/flowdb (connecting user: flowmachine)




In [3]:
flowmachine.core.cache.resync_redis_with_cache(
    flowmachine.core.context.get_db(),
    flowmachine.core.context.get_redis(),
)

## Define and store sub-queries

In [4]:
flowmachine.core.context.get_db().available_dates

defaultdict(list,
            {'calls': [datetime.date(2016, 1, 1),
              datetime.date(2016, 1, 2),
              datetime.date(2016, 1, 3),
              datetime.date(2016, 1, 4),
              datetime.date(2016, 1, 5),
              datetime.date(2016, 1, 6),
              datetime.date(2016, 1, 7),
              datetime.date(2016, 1, 8),
              datetime.date(2016, 1, 9),
              datetime.date(2016, 1, 10),
              datetime.date(2016, 1, 11),
              datetime.date(2016, 1, 12),
              datetime.date(2016, 1, 13),
              datetime.date(2016, 1, 14),
              datetime.date(2016, 1, 15),
              datetime.date(2016, 1, 16),
              datetime.date(2016, 1, 17),
              datetime.date(2016, 1, 18),
              datetime.date(2016, 1, 19),
              datetime.date(2016, 1, 20),
              datetime.date(2016, 1, 21),
              datetime.date(2016, 1, 22),
              datetime.date(2016, 1, 23),
              da

In [5]:
periods = [
    pd.date_range("2016-01-01", "2016-01-07"),
    pd.date_range("2016-01-08", "2016-01-14"),
    pd.date_range("2016-01-15", "2016-01-21"),
    pd.date_range("2016-01-22", "2016-01-28"),
    pd.date_range("2016-01-29", "2016-02-04"),
    pd.date_range("2016-02-05", "2016-02-11"),
]
modal_window_length = 19
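Each period is seven days long, and every day in a period starts its own modal-location window of `modal_window_length` days, so the daily locations read for a period extend well past the period's last day. A quick pandas check of the span each period actually reads, reusing only the `periods` and `modal_window_length` values from the cell above (the helper `window_span` is hypothetical, added for illustration):

```python
import pandas as pd

# Same definitions as in the cell above
periods = [
    pd.date_range("2016-01-01", "2016-01-07"),
    pd.date_range("2016-01-08", "2016-01-14"),
    pd.date_range("2016-01-15", "2016-01-21"),
    pd.date_range("2016-01-22", "2016-01-28"),
    pd.date_range("2016-01-29", "2016-02-04"),
    pd.date_range("2016-02-05", "2016-02-11"),
]
modal_window_length = 19


def window_span(period, window_length):
    """Return the first and last daily_location dates read for one period."""
    # The last window starts on the period's last day and runs
    # window_length - 1 days beyond it
    last_window = pd.date_range(period[-1], periods=window_length)
    return period[0], last_window[-1]


for period in periods:
    start, end = window_span(period, modal_window_length)
    print(f"{start.date()} .. {end.date()}")

# Every distinct date for which a daily_location is needed across all periods
all_dates = {
    day
    for period in periods
    for modal_start_date in period
    for day in pd.date_range(modal_start_date, periods=modal_window_length)
}
print(len(all_dates), "distinct days")
```

Together the six periods need daily locations from 2016-01-01 to 2016-02-29. The `available_dates` output above is truncated, so it does not show whether the data covers that whole span.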

In [6]:
majority_locations = [
    MajorityLocation(
        subscriber_location_weights=LocationVisits(
            day_trajectories=DayTrajectories(
                *(
                    ModalLocation(
                        *(
                            daily_location(
                                daily_date.date().isoformat(),
                                spatial_unit=make_spatial_unit("admin", level=2),
                                method="last",
                                table=["events.calls", "events.sms", "events.mds"]
                            )
                            for daily_date in pd.date_range(modal_start_date, periods=modal_window_length)
                        )
                    )
                    for modal_start_date in period
                )
            )
        ),
        weight_column="value",
        include_unlocatable=True,
    )
    for period in periods
]
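For each period, the nested query above builds a sequence of seven 19-day modal locations (one per start date), weights each admin2 region by how many of those seven it appears in (`LocationVisits`), and keeps the region that holds a majority of the weight (`MajorityLocation`). The pandas sketch below mimics only that last step on toy data. It assumes that MajorityLocation assigns a location only when that location's weight is strictly more than half of the subscriber's total weight, and that `include_unlocatable=True` keeps other subscribers with a null location; it illustrates that assumed rule and is not flowmachine's implementation. With seven windows, a majority means at least four of them.

```python
import pandas as pd


def majority_location(weights: pd.DataFrame, include_unlocatable: bool = True) -> pd.DataFrame:
    """Toy majority rule on (subscriber, pcod, value) rows (hypothetical helper).

    Assumed semantics: a location is assigned only if its weight is strictly
    greater than half of the subscriber's total weight, so each subscriber
    gets at most one location.
    """
    total = weights.groupby("subscriber")["value"].transform("sum")
    winners = weights.loc[weights["value"] > total / 2, ["subscriber", "pcod"]]
    if not include_unlocatable:
        return winners.reset_index(drop=True)
    # Keep every subscriber; those without a majority get a missing pcod
    subscribers = pd.DataFrame({"subscriber": weights["subscriber"].unique()})
    return subscribers.merge(winners, on="subscriber", how="left")


# Assumed weights: number of the seven modal windows spent at each location
weights = pd.DataFrame(
    {
        "subscriber": ["a", "a", "b", "b", "b"],
        "pcod": ["X", "Y", "X", "Y", "Z"],
        "value": [4, 3, 3, 2, 2],
    }
)
print(majority_location(weights))
```

Subscriber `a` spends 4 of 7 windows at `X` and is assigned it; subscriber `b` has no location above 3.5 and ends up with a null location, which is what makes them "unlocatable" for that period.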

In [7]:
for ml in majority_locations:
    print(f"Storing {ml.subscriber_location_weights.day_trajectories.start}")
    ml.store(store_dependencies=True)

Storing 2016-01-01 00:00:00
Storing 2016-01-08 00:00:00
Storing 2016-01-15 00:00:00
Storing 2016-01-22 00:00:00
Storing 2016-01-29 00:00:00
Storing 2016-02-05 00:00:00


{"submodule":"flowmachine.core.query","event":"Error executing SQL: 'CREATE INDEX ON cache.xe201ebcfff28eea05775a393baa6cb09 (pcod)'. Error was (psycopg2.errors.DiskFull) could not extend file \"base/16384/28059\": wrote only 4096 of 8192 bytes at block 45\nHINT:  Check free disk space.\n\n[SQL: CREATE INDEX ON cache.xe201ebcfff28eea05775a393baa6cb09 (pcod)]\n(Background on this error at: http://sqlalche.me/e/13/e3q8)","logger":"flowmachine.debug","level":"error","timestamp":"2021-12-03T13:58:57.561866Z"}
{"submodule":"flowmachine.core.cache","event":"Error executing SQL. Error was (psycopg2.errors.DiskFull) could not extend file \"base/16384/28059\": wrote only 4096 of 8192 bytes at block 45\nHINT:  Check free disk space.\n\n[SQL: CREATE INDEX ON cache.xe201ebcfff28eea05775a393baa6cb09 (pcod)]\n(Background on this error at: http://sqlalche.me/e/13/e3q8)","logger":"flowmachine.debug","level":"error","timestamp":"2021-12-03T13:58:57.569859Z"}
{"submodule":"flowmachine.core.query","event

In [24]:
for qid in [
    'e201ebcfff28eea05775a393baa6cb09',
    '903bf742e3ffb8c1e9b414dbf204caa6',
    'fab676b50942f7c3da402bae53e44a41',
    '5ccb0ba0c6f7434935a4ed1f71a658ae',
    '89cd71490689c5d15dd81dec27e62aa3',
]:
    qsm = flowmachine.core.query_state.QueryStateMachine(
        flowmachine.core.context.get_redis(),
        qid,
        flowmachine.core.context.get_db().conn_id,
    )
    qsm.reset()
    qsm.finish_resetting()

In [25]:
majority_locations[5].store(store_dependencies=True)

<Future at 0x7f388d6f68e0 state=pending>

In [32]:
flowmachine.core.context.get_db().fetch(f"SELECT count(*), count(pcod) FROM ({majority_locations[5].get_query()}) _")

[(99985, 98899)]
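Here `count(*)` counts every row, while `count(pcod)` skips rows where `pcod` is NULL, so the difference (99985 - 98899 = 1086) is the number of subscribers in the final period without a majority location. A minimal illustration of the same SQL semantics on a hypothetical toy table, using Python's built-in `sqlite3` in place of FlowDB (both follow standard SQL here):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE majority_location (subscriber TEXT, pcod TEXT)")
conn.executemany(
    "INSERT INTO majority_location VALUES (?, ?)",
    [("a", "X"), ("b", None), ("c", "Y")],  # "b" has no majority location
)

# count(*) counts all rows; count(pcod) ignores rows where pcod is NULL
total, locatable = conn.execute(
    "SELECT count(*), count(pcod) FROM majority_location"
).fetchone()
print(total, locatable, total - locatable)
```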

## Try out some possible implementations of MobilityClassification

### Experiment with options for classifying subscribers as long-term locatable/unlocatable and long-term active/inactive

In [33]:
loc_cols_string = ", ".join(majority_locations[0].spatial_unit.location_id_columns)
locations_union = " UNION ALL ".join(
    f"SELECT subscriber, {loc_cols_string}, {i} AS ordinal FROM ({loc.get_query()}) _"
    for i, loc in enumerate(majority_locations)
)

long_term_activity_1 = CustomQuery(
    f"""
    SELECT
        subscriber,
        count(*) < {len(majority_locations)} AS sometimes_inactive,
        count(coalesce({loc_cols_string})) < {len(majority_locations)} AS sometimes_unlocatable
    FROM ({locations_union}) locations_union
    GROUP BY subscriber
    """,
    column_names=["subscriber", "sometimes_inactive", "sometimes_unlocatable"],
)

long_term_activity_2 = {}
for intersect_type in ["INTERSECT", "INTERSECT ALL"]:
    active_subq = f" {intersect_type} ".join(
        f"SELECT subscriber FROM ({loc.get_query()}) _" for loc in majority_locations
    )
    locatable_subq = f" {intersect_type} ".join(
        f"SELECT subscriber FROM ({loc.get_query()}) _ WHERE coalesce({loc_cols_string}) IS NOT NULL"
        for loc in majority_locations
    )
    long_term_activity_2[intersect_type] = CustomQuery(
        f"""
        SELECT subscriber, TRUE AS always_active, always_locatable
        FROM ({active_subq}) long_term_active
        LEFT JOIN (
            SELECT subscriber, TRUE AS always_locatable
            FROM ({locatable_subq}) _
        ) long_term_locatable
        USING (subscriber)
        """,
        column_names=["subscriber", "always_active", "always_locatable"],
    )

active_subq_join = f"""
SELECT subscriber
FROM ({majority_locations[0].get_query()}) loc0
""" + "\n".join(
    f"INNER JOIN ({loc.get_query()}) loc{i+1} USING (subscriber)"
    for i, loc in enumerate(majority_locations[1:])
)
locatable_subq_join = f"""
SELECT subscriber
FROM (SELECT * FROM ({majority_locations[0].get_query()}) _ WHERE coalesce({loc_cols_string}) IS NOT NULL) loc0
""" + "\n".join(
    f"INNER JOIN (SELECT * FROM ({loc.get_query()}) _ WHERE coalesce({loc_cols_string}) IS NOT NULL) loc{i+1} USING (subscriber)"
    for i, loc in enumerate(majority_locations[1:])
)
long_term_activity_2["INNER JOIN"] = CustomQuery(
    f"""
    SELECT subscriber, TRUE AS always_active, always_locatable
    FROM ({active_subq_join}) long_term_active
    LEFT JOIN (
        SELECT subscriber, TRUE AS always_locatable
        FROM ({locatable_subq_join}) _
    ) long_term_locatable
    USING (subscriber)
    """,
    column_names=["subscriber", "always_active", "always_locatable"],
)
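Both formulations should classify subscribers identically: a subscriber is "always active" if they appear in every period's result, and "always locatable" if they also have a non-NULL location in every period. A toy check of that logical equivalence, using Python's built-in `sqlite3` with three hand-made tables (`loc0` to `loc2`, hypothetical stand-ins for the stored MajorityLocation tables) rather than FlowDB. It says nothing about performance, only that the two query shapes agree:

```python
import sqlite3

# Hypothetical toy stand-ins for three periods of MajorityLocation output;
# None (NULL) means the subscriber had no majority location in that period
period_rows = [
    [("a", "X"), ("b", "Y"), ("c", None), ("d", "X")],
    [("a", "X"), ("b", None), ("c", "X")],  # "d" is inactive in this period
    [("a", "Y"), ("b", "Y"), ("c", "X"), ("d", "X")],
]
n = len(period_rows)

conn = sqlite3.connect(":memory:")
for i, rows in enumerate(period_rows):
    conn.execute(f"CREATE TABLE loc{i} (subscriber TEXT, pcod TEXT)")
    conn.executemany(f"INSERT INTO loc{i} VALUES (?, ?)", rows)

# Approach 1: UNION ALL the periods, then GROUP BY subscriber and count rows
union = " UNION ALL ".join(
    f"SELECT subscriber, pcod, {i} AS ordinal FROM loc{i}" for i in range(n)
)
approach_1 = {
    subscriber: (bool(inactive), bool(unlocatable))
    for subscriber, inactive, unlocatable in conn.execute(
        f"SELECT subscriber, count(*) < {n}, count(pcod) < {n} "
        f"FROM ({union}) u GROUP BY subscriber"
    )
}

# Approach 2: chains of INNER JOINs keep only subscribers present
# (or present and locatable) in every period
active = "SELECT subscriber FROM loc0 " + " ".join(
    f"INNER JOIN loc{i} USING (subscriber)" for i in range(1, n)
)
locatable = "SELECT subscriber FROM (SELECT * FROM loc0 WHERE pcod IS NOT NULL) l0 " + " ".join(
    f"INNER JOIN (SELECT * FROM loc{i} WHERE pcod IS NOT NULL) l{i} USING (subscriber)"
    for i in range(1, n)
)
approach_2 = {
    subscriber: always_locatable is not None
    for subscriber, always_locatable in conn.execute(
        f"SELECT subscriber, always_locatable FROM ({active}) long_term_active "
        f"LEFT JOIN (SELECT subscriber, 1 AS always_locatable FROM ({locatable}) t) "
        f"long_term_locatable USING (subscriber)"
    )
}

print(approach_1)  # subscriber -> (sometimes_inactive, sometimes_unlocatable)
print(approach_2)  # always-active subscriber -> always_locatable
```

Approach 2 only returns always-active subscribers, and its `always_locatable` flag is the negation of approach 1's `sometimes_unlocatable` for them. For inactive subscribers (like `d`) approach 1 also reports `sometimes_unlocatable`, which is harmless because the classification checks `sometimes_inactive` first.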

In [34]:
explain_1 = long_term_activity_1.explain(analyse=True)

In [35]:
explain_2 = {
    merge_type: query.explain(analyse=True)
    for merge_type, query in long_term_activity_2.items()
}

In [36]:
print(explain_1)

Finalize GroupAggregate  (cost=13267.05..13319.72 rows=200 width=35) (actual time=8861.009..17355.753 rows=100000 loops=1)
  Group Key: xb06ca27f33181417112b609b3ff26546.subscriber
  ->  Gather Merge  (cost=13267.05..13313.72 rows=400 width=49) (actual time=8860.909..13543.694 rows=299990 loops=1)
        Workers Planned: 2
        Workers Launched: 2
        ->  Sort  (cost=12267.03..12267.53 rows=200 width=49) (actual time=8711.160..9594.104 rows=99997 loops=3)
              Sort Key: xb06ca27f33181417112b609b3ff26546.subscriber
              Sort Method: quicksort  Memory: 17134kB
              Worker 0:  Sort Method: quicksort  Memory: 17135kB
              Worker 1:  Sort Method: quicksort  Memory: 17134kB
              ->  Partial HashAggregate  (cost=12257.38..12259.38 rows=200 width=49) (actual time=6761.314..7603.160 rows=99997 loops=3)
                    Group Key: xb06ca27f33181417112b609b3ff26546.subscriber
                    ->  Parallel Append  (cost=0.00..10382.67 rows

In [37]:
print(explain_2["INTERSECT"])

Hash Left Join  (cost=31250.91..61618.82 rows=49731049 width=34) (actual time=114176.458..118732.061 rows=99928 loops=1)
  Hash Cond: (long_term_active.subscriber = _.subscriber)
  ->  Subquery Scan on long_term_active  (cost=0.00..30099.90 rows=99981 width=32) (actual time=54643.218..57311.459 rows=99928 loops=1)
        ->  HashSetOp Intersect  (cost=0.00..29100.08 rows=99981 width=36) (actual time=54643.193..55533.774 rows=99928 loops=1)
              ->  Append  (cost=0.00..28600.17 rows=199966 width=36) (actual time=43751.685..52666.479 rows=199923 loops=1)
                    ->  Result  (cost=0.00..24666.64 rows=99981 width=36) (actual time=43751.661..46403.664 rows=99938 loops=1)
                          ->  HashSetOp Intersect  (cost=0.00..23666.83 rows=99981 width=36) (actual time=43751.625..44630.806 rows=99938 loops=1)
                                ->  Append  (cost=0.00..23166.90 rows=199970 width=36) (actual time=33025.001..41809.201 rows=199935 loops=1)
              

In [38]:
print(explain_2["INTERSECT ALL"])

Merge Right Join  (cost=76668.38..823131.52 rows=49731049 width=34) (actual time=123111.252..127797.375 rows=99928 loops=1)
  Merge Cond: (_.subscriber = long_term_active.subscriber)
  ->  Sort  (cost=38265.38..38514.08 rows=99481 width=33) (actual time=59730.623..60608.886 rows=93740 loops=1)
        Sort Key: _.subscriber
        Sort Method: quicksort  Memory: 10396kB
        ->  Subquery Scan on _  (cost=0.00..30007.40 rows=99481 width=33) (actual time=55974.240..58568.961 rows=93740 loops=1)
              ->  HashSetOp Intersect All  (cost=0.00..29012.58 rows=99481 width=36) (actual time=55974.191..56838.332 rows=93740 loops=1)
                    ->  Append  (cost=0.00..28515.17 rows=198966 width=36) (actual time=44926.318..53967.316 rows=193681 loops=1)
                          ->  Result  (cost=0.00..24591.64 rows=99481 width=36) (actual time=44926.155..47579.391 rows=94782 loops=1)
                                ->  HashSetOp Intersect All  (cost=0.00..23596.83 rows=99481 wi

In [39]:
print(explain_2["INNER JOIN"])

Hash Left Join  (cost=41698.26..51850.90 rows=99980 width=35) (actual time=30121.776..42725.604 rows=99928 loops=1)
  Hash Cond: (x5573b404d543101dd839fb3af497bf61.subscriber = x5573b404d543101dd839fb3af497bf61_1.subscriber)
  ->  Hash Join  (cost=15918.13..24725.71 rows=99980 width=33) (actual time=9688.366..20358.521 rows=99928 loops=1)
        Hash Cond: (x5573b404d543101dd839fb3af497bf61.subscriber = xd42cc81f0d4bdcecfcba23513c65dd6f.subscriber)
        ->  Hash Join  (cost=12734.47..20167.32 rows=99980 width=165) (actual time=7653.065..16396.939 rows=99938 loops=1)
              Hash Cond: (x5573b404d543101dd839fb3af497bf61.subscriber = xa77df7638f260964ecd150ea4bf2dda4.subscriber)
              ->  Hash Join  (cost=9550.78..15608.91 rows=99980 width=132) (actual time=5717.214..12532.227 rows=99944 loops=1)
                    Hash Cond: (x5573b404d543101dd839fb3af497bf61.subscriber = x22e1a6e2dd32fddb57fa47642c3460ff.subscriber)
                    ->  Hash Join  (cost=6367.19..1

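With 100000 subscribers (as with 15000 subscribers), the first approach (group by subscriber, then count) is the most efficient: its root node finishes after about 17 s, compared with about 43 s for the second approach using "INNER JOIN". For the second approach, "INNER JOIN" is more efficient than "INTERSECT" (about 119 s) or "INTERSECT ALL" (about 128 s). These figures come from a single EXPLAIN ANALYZE run of each query.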
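One way to compare the plans above without reading them by eye is to pull out the `actual time` of each plan's root node; in PostgreSQL's EXPLAIN ANALYZE text output, the second number is the time in milliseconds at which that node finished. A small helper for that, as a sketch: the name `root_total_time_ms` is hypothetical, and it assumes the default text format, where the root node is the first `actual time=` entry.

```python
import re

# Matches "actual time=<startup>..<total>" in EXPLAIN ANALYZE text output
ACTUAL_TIME = re.compile(r"actual time=(\d+\.\d+)\.\.(\d+\.\d+)")


def root_total_time_ms(plan_text: str) -> float:
    """Return the total actual time (ms) of the first (root) node in a text plan."""
    match = ACTUAL_TIME.search(plan_text)
    if match is None:
        raise ValueError("no 'actual time' found; was the plan run with ANALYZE?")
    return float(match.group(2))


# First lines of two of the plans printed above
plans = {
    "group by + count": "Finalize GroupAggregate  (cost=13267.05..13319.72 rows=200 width=35) (actual time=8861.009..17355.753 rows=100000 loops=1)",
    "INNER JOIN": "Hash Left Join  (cost=41698.26..51850.90 rows=99980 width=35) (actual time=30121.776..42725.604 rows=99928 loops=1)",
}
for name, plan in plans.items():
    print(f"{name}: {root_total_time_ms(plan) / 1000:.1f} s")
```

Assuming `explain(analyse=True)` returns the plan as a string (as the `print` output suggests), the helper could be applied directly to `explain_1` and the values of `explain_2`.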

### Experiment with options for the full MobilityClassification query

In [40]:
stay_length_threshold = 3

long_term_mobility = f"""
SELECT subscriber, max(stay_length) AS longest_stay
FROM (
    SELECT subscriber, count(*) AS stay_length
    FROM (
        SELECT
            subscriber,
            {loc_cols_string},
            ordinal - dense_rank() OVER (
                PARTITION BY subscriber, {loc_cols_string}
                ORDER BY ordinal
            ) AS stay_id
        FROM ({locations_union}) locations_union
    ) locations_with_stay_id
    GROUP BY subscriber, {loc_cols_string}, stay_id
) stay_lengths
GROUP BY subscriber
"""

long_term_activity_1_using_cte = f"""
SELECT
    subscriber,
    count(*) < {len(majority_locations)} AS sometimes_inactive,
    count(coalesce({loc_cols_string})) < {len(majority_locations)} AS sometimes_unlocatable
FROM locations_union
GROUP BY subscriber
"""

long_term_mobility_using_cte = f"""
SELECT subscriber, max(stay_length) AS longest_stay
FROM (
    SELECT subscriber, count(*) AS stay_length
    FROM (
        SELECT
            subscriber,
            {loc_cols_string},
            ordinal - dense_rank() OVER (
                PARTITION BY subscriber, {loc_cols_string}
                ORDER BY ordinal
            ) AS stay_id
        FROM locations_union
    ) locations_with_stay_id
    GROUP BY subscriber, {loc_cols_string}, stay_id
) stay_lengths
GROUP BY subscriber
"""

mobility_classification_1 = CustomQuery(
    f"""
    SELECT
        subscriber,
        CASE
            WHEN coalesce({loc_cols_string}) IS NULL THEN 'unlocatable'
            WHEN sometimes_inactive THEN 'sometimes_inactive'
            WHEN sometimes_unlocatable THEN 'sometimes_unlocatable'
            WHEN longest_stay < {stay_length_threshold} THEN 'highly_mobile'
            ELSE 'stable'
        END AS value
    FROM ({majority_locations[-1].get_query()}) AS most_recent_period
    LEFT JOIN ({long_term_activity_1.get_query()}) AS long_term_activity
    USING (subscriber)
    LEFT JOIN ({long_term_mobility}) AS long_term_mobility
    USING (subscriber)
    """,
    column_names=["subscriber", "value"],
)

mobility_classification_1_cte = CustomQuery(
    f"""
    WITH locations_union AS ({locations_union})
    SELECT
        subscriber,
        CASE
            WHEN coalesce({loc_cols_string}) IS NULL THEN 'unlocatable'
            WHEN sometimes_inactive THEN 'sometimes_inactive'
            WHEN sometimes_unlocatable THEN 'sometimes_unlocatable'
            WHEN longest_stay < {stay_length_threshold} THEN 'highly_mobile'
            ELSE 'stable'
        END AS value
    FROM ({majority_locations[-1].get_query()}) AS most_recent_period
    LEFT JOIN ({long_term_activity_1_using_cte}) AS long_term_activity
    USING (subscriber)
    LEFT JOIN ({long_term_mobility_using_cte}) AS long_term_mobility
    USING (subscriber)
    """,
    column_names=["subscriber", "value"],
)
    
mobility_classification_2 = CustomQuery(
    f"""
    SELECT
        subscriber,
        CASE
            WHEN coalesce({loc_cols_string}) IS NULL THEN 'unlocatable'
            WHEN NOT coalesce(always_active, FALSE) THEN 'sometimes_inactive'
            WHEN NOT coalesce(always_locatable, FALSE) THEN 'sometimes_unlocatable'
            WHEN longest_stay < {stay_length_threshold} THEN 'highly_mobile'
            ELSE 'stable'
        END AS value
    FROM ({majority_locations[-1].get_query()}) AS most_recent_period
    LEFT JOIN ({long_term_activity_2['INNER JOIN'].get_query()}) AS alternative_long_term_activity
    USING (subscriber)
    LEFT JOIN ({long_term_mobility}) AS long_term_mobility
    USING (subscriber)
    """,
    column_names=["subscriber", "value"],
)
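`long_term_mobility` relies on the "gaps and islands" pattern: within each (subscriber, location) partition, `ordinal - dense_rank()` stays constant across a run of consecutive periods at that location and changes whenever the run is broken, so grouping by it gives the length of each stay. The same computation in pandas on hypothetical toy data, as a sketch for checking the logic (column names mirror the SQL; `rank(method="dense")` stands in for `dense_rank()`):

```python
import pandas as pd

# Toy locations_union: one row per (subscriber, period ordinal) with its location
locations_union = pd.DataFrame(
    {
        "subscriber": ["a"] * 6 + ["b"] * 6,
        "ordinal": list(range(6)) * 2,
        "pcod": ["X", "X", "Y", "X", "X", "X", "X", "Y", "X", "Y", "X", "Y"],
    }
)

# Equivalent of: ordinal - dense_rank() OVER (PARTITION BY subscriber, pcod ORDER BY ordinal)
# Note: pandas groupby drops NaN keys by default, whereas PARTITION BY groups NULLs
# together; this toy data has no missing locations.
dense_rank = (
    locations_union.groupby(["subscriber", "pcod"])["ordinal"]
    .rank(method="dense")
    .astype(int)
)
locations_union["stay_id"] = locations_union["ordinal"] - dense_rank

# Length of each stay, then the longest stay per subscriber
stay_lengths = locations_union.groupby(["subscriber", "pcod", "stay_id"]).size()
longest_stay = stay_lengths.groupby(level="subscriber").max()

stay_length_threshold = 3
highly_mobile = longest_stay < stay_length_threshold
print(longest_stay)
print(highly_mobile)
```

Subscriber `a` has a run of three consecutive periods at `X` (ordinals 3 to 5), so with a threshold of 3 they count as stable; subscriber `b` alternates every period, so their longest stay is 1 and they would be classed as highly mobile.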

In [41]:
mobility_classification_1_morecte = CustomQuery(
    f"""
    WITH locations_union AS ({locations_union}),
         most_recent_period AS ({majority_locations[-1].get_query()}),
         long_term_activity AS ({long_term_activity_1_using_cte}),
         long_term_mobility AS ({long_term_mobility_using_cte})
    SELECT
        subscriber,
        CASE
            WHEN coalesce({loc_cols_string}) IS NULL THEN 'unlocatable'
            WHEN sometimes_inactive THEN 'sometimes_inactive'
            WHEN sometimes_unlocatable THEN 'sometimes_unlocatable'
            WHEN longest_stay < {stay_length_threshold} THEN 'highly_mobile'
            ELSE 'stable'
        END AS value
    FROM most_recent_period
    LEFT JOIN long_term_activity USING (subscriber)
    LEFT JOIN long_term_mobility USING (subscriber)
    """,
    column_names=["subscriber", "value"],
)

In [42]:
explain_mobclass_1 = mobility_classification_1.explain(analyse=True)
explain_mobclass_1_cte = mobility_classification_1_cte.explain(analyse=True)
explain_mobclass_2 = mobility_classification_2.explain(analyse=True)

In [43]:
explain_mobclass_1_morecte = mobility_classification_1_morecte.explain(analyse=True)

In [44]:
print(explain_mobclass_1)

Hash Left Join  (cost=114002.97..116711.72 rows=99985 width=65) (actual time=79246.048..84183.307 rows=99985 loops=1)
  Hash Cond: (xd42cc81f0d4bdcecfcba23513c65dd6f.subscriber = long_term_mobility.subscriber)
  ->  Hash Left Join  (cost=13324.22..15520.54 rows=99985 width=45) (actual time=20940.835..23882.861 rows=99985 loops=1)
        Hash Cond: (xd42cc81f0d4bdcecfcba23513c65dd6f.subscriber = long_term_activity.subscriber)
        ->  Seq Scan on xd42cc81f0d4bdcecfcba23513c65dd6f  (cost=0.00..1933.85 rows=99985 width=43) (actual time=0.056..956.117 rows=99985 loops=1)
        ->  Hash  (cost=13321.72..13321.72 rows=200 width=35) (actual time=20940.720..20941.009 rows=100000 loops=1)
              Buckets: 131072 (originally 1024)  Batches: 1 (originally 1)  Memory Usage: 7567kB
              ->  Subquery Scan on long_term_activity  (cost=13267.05..13321.72 rows=200 width=35) (actual time=9158.853..19904.466 rows=100000 loops=1)
                    ->  Finalize GroupAggregate  (cost=

In [45]:
print(explain_mobclass_1_cte)

Hash Left Join  (cost=129181.78..131890.54 rows=99985 width=65) (actual time=86556.478..91536.524 rows=99985 loops=1)
  Hash Cond: (xd42cc81f0d4bdcecfcba23513c65dd6f.subscriber = long_term_mobility.subscriber)
  CTE locations_union
    ->  Append  (cost=0.00..14602.59 rows=599906 width=47) (actual time=133.262..17063.626 rows=599906 loops=1)
          ->  Seq Scan on x5573b404d543101dd839fb3af497bf61  (cost=0.00..1933.83 rows=99983 width=47) (actual time=133.242..1082.219 rows=99983 loops=1)
          ->  Seq Scan on xc569af3ede1d4e394fdefb6c742ed587  (cost=0.00..1933.81 rows=99981 width=47) (actual time=0.034..954.042 rows=99981 loops=1)
          ->  Seq Scan on x22e1a6e2dd32fddb57fa47642c3460ff  (cost=0.00..1933.82 rows=99982 width=47) (actual time=0.033..968.393 rows=99982 loops=1)
          ->  Seq Scan on xa77df7638f260964ecd150ea4bf2dda4  (cost=0.00..1933.86 rows=99986 width=47) (actual time=0.039..1015.119 rows=99986 loops=1)
          ->  Seq Scan on xb06ca27f33181417112b609b3

In [46]:
print(explain_mobclass_2)

Hash Left Join  (cost=145560.67..157600.47 rows=99985 width=65) (actual time=92275.066..108862.491 rows=99985 loops=1)
  Hash Cond: (xd42cc81f0d4bdcecfcba23513c65dd6f.subscriber = long_term_mobility.subscriber)
  ->  Hash Right Join  (cost=44881.93..56409.29 rows=99985 width=45) (actual time=33453.105..48061.353 rows=99985 loops=1)
        Hash Cond: (x5573b404d543101dd839fb3af497bf61.subscriber = xd42cc81f0d4bdcecfcba23513c65dd6f.subscriber)
        ->  Hash Left Join  (cost=41698.26..51850.90 rows=99980 width=35) (actual time=31370.347..44026.401 rows=99928 loops=1)
              Hash Cond: (x5573b404d543101dd839fb3af497bf61.subscriber = x5573b404d543101dd839fb3af497bf61_1.subscriber)
              ->  Hash Join  (cost=15918.13..24725.71 rows=99980 width=33) (actual time=10107.734..20816.059 rows=99928 loops=1)
                    Hash Cond: (x5573b404d543101dd839fb3af497bf61.subscriber = xd42cc81f0d4bdcecfcba23513c65dd6f_1.subscriber)
                    ->  Hash Join  (cost=12734.4

In [47]:
print(explain_mobclass_1_morecte)

Hash Left Join  (cost=129181.78..131890.54 rows=99985 width=65) (actual time=83015.694..87783.800 rows=99985 loops=1)
  Hash Cond: (xd42cc81f0d4bdcecfcba23513c65dd6f.subscriber = long_term_mobility.subscriber)
  CTE locations_union
    ->  Append  (cost=0.00..14602.59 rows=599906 width=47) (actual time=55.137..16213.928 rows=599906 loops=1)
          ->  Seq Scan on x5573b404d543101dd839fb3af497bf61  (cost=0.00..1933.83 rows=99983 width=47) (actual time=55.116..991.097 rows=99983 loops=1)
          ->  Seq Scan on xc569af3ede1d4e394fdefb6c742ed587  (cost=0.00..1933.81 rows=99981 width=47) (actual time=0.033..933.682 rows=99981 loops=1)
          ->  Seq Scan on x22e1a6e2dd32fddb57fa47642c3460ff  (cost=0.00..1933.82 rows=99982 width=47) (actual time=0.031..924.716 rows=99982 loops=1)
          ->  Seq Scan on xa77df7638f260964ecd150ea4bf2dda4  (cost=0.00..1933.86 rows=99986 width=47) (actual time=0.036..922.035 rows=99986 loops=1)
          ->  Seq Scan on xb06ca27f33181417112b609b3ff26

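For a dataset with 15000 subscribers, approach 1 is the most efficient, and using a CTE for the locations union makes essentially no difference. Using CTEs for all sub-parts of the query results in the same query plan as using a single CTE.

For a dataset with 100000 subscribers, approach 1 is still the most efficient (about 84 s, against about 109 s for approach 2), but the gap between the two approaches is smaller. Here, using a CTE for the locations union makes the query slightly slower (about 92 s, so again not much difference), and using CTEs for everything again gives the same query plan (identical cost estimates, about 88 s). The roughly 4 s between two runs of that same plan gives a rough idea of run-to-run noise. If FlowDB is running PostgreSQL 12 or later, this is consistent with PostgreSQL's default CTE handling: a CTE referenced only once is inlined into the main query, while `locations_union`, which is referenced twice, is materialised (hence the `CTE locations_union` node in both plans).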
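Whether two plans are "the same" can be checked mechanically by stripping the run-dependent `actual time` annotations and comparing what is left. A rough heuristic sketch: the function name `plan_shape` and the list of run-specific lines it drops are assumptions, not a complete account of EXPLAIN ANALYZE output.

```python
import re

# Run-dependent parts of EXPLAIN ANALYZE text output
ACTUAL = re.compile(r"\s*\(actual time=[^)]*\)")
RUNTIME_ONLY_PREFIXES = (
    "Planning Time:",
    "Execution Time:",
    "Sort Method:",
    "Buckets:",
    "Worker ",  # per-worker details such as "Worker 0:  Sort Method: ..."
    "Workers Launched:",
)


def plan_shape(plan_text: str) -> list:
    """Return the plan lines with run-specific annotations removed."""
    shape = []
    for line in plan_text.splitlines():
        if line.strip().startswith(RUNTIME_ONLY_PREFIXES):
            continue  # memory usage, timings, etc. that can vary between runs
        shape.append(ACTUAL.sub("", line).rstrip())
    return shape


# First lines of the one-CTE and all-CTE plans printed above
cte_plan = """Hash Left Join  (cost=129181.78..131890.54 rows=99985 width=65) (actual time=86556.478..91536.524 rows=99985 loops=1)
  Hash Cond: (xd42cc81f0d4bdcecfcba23513c65dd6f.subscriber = long_term_mobility.subscriber)
  CTE locations_union"""
morecte_plan = """Hash Left Join  (cost=129181.78..131890.54 rows=99985 width=65) (actual time=83015.694..87783.800 rows=99985 loops=1)
  Hash Cond: (xd42cc81f0d4bdcecfcba23513c65dd6f.subscriber = long_term_mobility.subscriber)
  CTE locations_union"""
print(plan_shape(cte_plan) == plan_shape(morecte_plan))
```

Applied to the full `explain_mobclass_1_cte` and `explain_mobclass_1_morecte` strings, this should return `True` if the plans really are identical apart from timings.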

### Check that the approaches give the same results

In [48]:
result_1 = mobility_classification_1.get_dataframe()
result_1_cte = mobility_classification_1_cte.get_dataframe()
result_2 = mobility_classification_2.get_dataframe()

In [49]:
result_1.groupby("value").count()

                       subscriber
value
highly_mobile               15995
sometimes_inactive             57
sometimes_unlocatable        5102
stable                      77745
unlocatable                  1086


In [50]:
result_1_cte.groupby("value").count()

                       subscriber
value
highly_mobile               15995
sometimes_inactive             57
sometimes_unlocatable        5102
stable                      77745
unlocatable                  1086


In [51]:
result_2.groupby("value").count()

                       subscriber
value
highly_mobile               15995
sometimes_inactive             57
sometimes_unlocatable        5102
stable                      77745
unlocatable                  1086


In [52]:
result_1.groupby("value").count().sum()

subscriber    99985
dtype: int64
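Matching category counts are a strong hint, but not proof, that the approaches agree, since two subscribers could swap categories without changing the totals. A stricter per-subscriber check with pandas, as a sketch: the helper name `classification_differences` is hypothetical, and it assumes each result has one row per subscriber with `subscriber` and `value` columns, as the `get_dataframe()` results here do.

```python
import pandas as pd


def classification_differences(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    """Rows where two (subscriber, value) results disagree or a subscriber is missing."""
    merged = left.merge(
        right, on="subscriber", how="outer", suffixes=("_left", "_right"), indicator=True
    )
    differs = (merged["_merge"] != "both") | (merged["value_left"] != merged["value_right"])
    return merged[differs]


# Toy example: identical category counts, but "a" and "b" have swapped classes
result_x = pd.DataFrame({"subscriber": ["a", "b", "c"], "value": ["stable", "highly_mobile", "stable"]})
result_y = pd.DataFrame({"subscriber": ["a", "b", "c"], "value": ["highly_mobile", "stable", "stable"]})

# The category counts agree...
print(result_x.groupby("value").count().equals(result_y.groupby("value").count()))
# ...but the per-subscriber comparison reveals the swap
print(classification_differences(result_x, result_y))
```

In this notebook the equivalent check would be `classification_differences(result_1, result_2)` (and likewise for `result_1_cte`), which is expected to be empty if the approaches really agree.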