-
Notifications
You must be signed in to change notification settings - Fork 20
/
pwo.py
297 lines (256 loc) · 11.6 KB
/
pwo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# -*- coding: utf-8 -*-
"""
The population-weighted opportunities model uses
population distributions for predicting the
probability of movement between populated
areas. The model computes the attraction
between populated centers relative to the
distance between each origin and destination
location, and also relative to all other
locations equidistant from the destination. The model
works similarly to the Radiation Model.
The original publication suggests that its
ideal usage is in cities.
References
----------
Yan X-Y, Zhao C, Fan Y, Di Z, Wang W-X. 2014 "Universal predictability of mobility patterns in cities". J. R. Soc. Interface 11: 20140834. http://dx.doi.org/10.1098/rsif.2014.0834
"""
import warnings
from typing import List, Optional, Union, Tuple
import pandas as pd
from flowmachine.core.query import Query
from flowmachine.features.subscriber import daily_location
from flowmachine.utils import list_of_dates, standardise_date
from flowmachine.features.subscriber import ModalLocation
from flowmachine.core import make_spatial_unit
from flowmachine.core.spatial_unit import LonLatSpatialUnit
from flowmachine.features.spatial.distance_matrix import DistanceMatrix
import structlog
logger = structlog.get_logger("flowmachine.debug", submodule=__name__)
class _PopulationBuffer(Query):
    """
    Internal helper query pairing every location with every other location
    and attaching a buffer-population term derived from the distance matrix.

    Each output row carries the origin ("_from") and destination ("_to")
    location id columns, the origin and destination populations
    (``src_pop`` / ``sink_pop``) and ``buffer_population``, which is the
    *reciprocal* of a running population total (see ``_make_query``).

    Parameters
    ----------
    population_object : flowmachine.features.utilities.spatial_aggregate.SpatialAggregate
        An aggregated subscriber locating object
    distance_matrix : flowmachine.features.spatial.distance_matrix.DistanceMatrix
        A distance matrix; its spatial unit becomes this query's spatial unit
    """

    def __init__(self, population_object, distance_matrix):
        self.distance_matrix = distance_matrix
        self.population_object = population_object
        # The spatial unit is always taken from the distance matrix.
        self.spatial_unit = distance_matrix.spatial_unit

        super().__init__()

    @property
    def column_names(self) -> List[str]:
        id_columns = self.spatial_unit.location_id_columns
        names = [
            f"{column}_{suffix}" for suffix in ("from", "to") for column in id_columns
        ]
        names.extend(["buffer_population", "src_pop", "sink_pop"])
        return names

    def _make_query(self):
        """
        Build the SQL that computes, per origin/destination pair, the
        reciprocal of the population covered by the buffer.

        Returns
        -------
        str
            The SQL text of the query.
        """
        id_columns = self.spatial_unit.location_id_columns

        output_ids = ", ".join(
            f"{column}_{suffix}" for suffix in ("from", "to") for column in id_columns
        )
        partition_ids = ", ".join(f"{column}_to" for column in id_columns)
        paired_ids = ", ".join(
            f"hl_{suffix}.{column} as {column}_{suffix}"
            for column in id_columns
            for suffix in ("to", "from")
        )
        # A pair is kept when at least one id column differs, i.e. the origin
        # and destination are not the same location.
        different_location = " OR ".join(
            f"hl_from.{column} != hl_to.{column}" for column in id_columns
        )
        join_ids = ", ".join(
            f"{column}_{suffix}" for column in id_columns for suffix in ("to", "from")
        )

        # The windowed sum is a running total: within each destination
        # partition, rows are ordered by the distance matrix's value column,
        # so every row accumulates the populations of all origins that are no
        # farther from that destination than the current one.
        sql_lines = [
            "",
            "SELECT",
            f"{output_ids},",
            f"1/(sum(src_pop) over (partition by {partition_ids} order by value)+sink_pop-src_pop) as buffer_population,",
            "src_pop, sink_pop",
            "FROM (",
            "(SELECT * FROM",
            f"(SELECT {paired_ids}, hl_from.value as src_pop, hl_to.value as sink_pop",
            "FROM",
            f"({self.population_object.get_query()}) as hl_from",
            "LEFT JOIN",
            f"({self.population_object.get_query()}) as hl_to",
            "ON",
            f"{different_location}",
            ") pops",
            "LEFT JOIN",
            f"({self.distance_matrix.get_query()}) as dm",
            f"USING ({join_ids})",
            ")",
            ") distance_pop_matrix",
            "",
        ]
        return "\n".join(sql_lines)
class PopulationWeightedOpportunities(Query):
    """
    Population-weighted opportunities model [1]_.

    The model predicts the mobility between populated
    areas in cities based only on the population densities
    of those areas, their spatial distribution, and
    the number of people that depart a certain area. This
    model is useful for studying mobility pattern in cities.

    Parameters
    ----------
    start : str
        ISO format date string denoting the first day of data to include
    stop : str
        ISO format date string denoting the last day of data to include
    spatial_unit : flowmachine.core.spatial_unit.*SpatialUnit, default versioned-site
        Spatial unit to which subscriber locations will be mapped. See the
        docstring of make_spatial_unit for more information.
    departure_rate : float, int, or DataFrame, default 0.1
        Either one uniform departure rate, or a dataframe with a rate column
        and columns matching those of the spatial unit. If the latter, results
        are only returned for locations in this dataframe. Integer rates are
        converted to float.
    hours : 'all', or tuple of ints, default 'all'
        The hours of the day to include activity in
    method : {'last', 'most-common'}, default 'last'
        Method used to resolve a daily location
    table : str, or list of str, default 'all'
        Specify which event types to include. 'all', all available event
        types are included. Otherwise, should be a (schema qualified) list of
        events tables.
    subscriber_identifier : {'msisdn', 'imei'}, default 'msisdn'
        Either msisdn, or imei, the column that identifies the subscriber.
    subscriber_subset : flowmachine.core.Query, default None
        If provided, a query or table which has a column with a named
        subscriber to limit results to.

    Raises
    ------
    TypeError
        If departure_rate is neither a number nor a dataframe.

    Warns
    -----
    UserWarning
        Always emitted on construction: the model is experimental.

    Examples
    --------
    >>> p = PopulationWeightedOpportunities('2016-01-01', '2016-01-07', departure_rate=pd.DataFrame([{"site_id":'0xqNDj', "rate":0.9}]))

    One can also run the model with uniform departure
    rates for all locations as follows:

    >>> PopulationWeightedOpportunities('2016-01-01', '2016-01-07', departure_rate=0.5).head()
        origin  destination  prediction  probability
    0   0xqNDj  8wPojr       0.384117    0.010670
    1   0xqNDj  B8OaG5       0.344384    0.009566
    2   0xqNDj  DonxkP       0.715311    0.019870
    3   0xqNDj  zdNQx2       0.267854    0.007440

    Where prediction is the absolute number of people
    that move from one location to another. (This should
    be interpreted as a integer, but floats are provided
    for evaluating results in a continuous scale.) And
    probability is the predicted value over the total
    population leaving the origin (T_i). That is, how
    likely it is that a person leaving the origin will
    be found in a given destination.

    References
    ----------
    .. [1] Yan X-Y, Zhao C, Fan Y, Di Z, Wang W-X. 2014 "Universal predictability of mobility patterns in cities". J. R. Soc. Interface 11: 20140834. http://dx.doi.org/10.1098/rsif.2014.0834
    """

    def __init__(
        self,
        start: str,
        stop: str,
        *,
        spatial_unit: Optional[LonLatSpatialUnit] = None,
        departure_rate: Union[pd.DataFrame, float] = 0.1,
        hours: Union[str, Tuple[int, int]] = "all",
        method: str = "last",
        table: Union[str, List[str]] = "all",
        subscriber_identifier: str = "msisdn",
        subscriber_subset: Optional[Query] = None,
    ):
        warnings.warn(
            "The PopulationWeightedOpportunities model is currently **experimental**. "
            + "Please review Yan X-Y et al. "
            + "(http://dx.doi.org/10.1098/rsif.2014.0834) "
            + "before using this model in production."
        )

        if isinstance(departure_rate, pd.DataFrame):
            # Rename the location columns to match the "_from" columns we
            # join to later; the rate column keeps its name.
            rates = departure_rate.rename(
                columns=lambda x: x if x == "rate" else f"{x}_from"
            )
            # Put columns and rows into a canonical order so that equivalent
            # dataframes are stored identically. Rows are sorted as whole
            # rows: sorting each column on its own would detach every rate
            # from the location it belongs to.
            rates = rates.reindex(columns=sorted(rates.columns))
            self.departure_rate = rates.sort_values(
                by=list(rates.columns)
            ).reset_index(drop=True)
        elif isinstance(departure_rate, (int, float)) and not isinstance(
            departure_rate, bool
        ):
            # Store as a plain float so an integer rate such as 1 is accepted
            # and _make_query can rely on a single scalar type.
            self.departure_rate = float(departure_rate)
        else:
            raise TypeError(f"{departure_rate} must be a float or dataframe")

        self.start = standardise_date(start)
        self.stop = standardise_date(stop)
        if spatial_unit is None:
            self.spatial_unit = make_spatial_unit("versioned-site")
        else:
            self.spatial_unit = spatial_unit

        self.distance_matrix = DistanceMatrix(
            spatial_unit=self.spatial_unit, return_geometry=True
        )

        # Population per location: the aggregate of the modal location over
        # one daily location per date between start and stop.
        self.population_object = ModalLocation(
            *[
                daily_location(
                    d,
                    spatial_unit=self.spatial_unit,
                    hours=hours,
                    method=method,
                    table=table,
                    subscriber_identifier=subscriber_identifier,
                    ignore_nulls=True,
                    subscriber_subset=subscriber_subset,
                )
                for d in list_of_dates(self.start, self.stop)
            ]
        ).aggregate()

        self.population_buffer_object = _PopulationBuffer(
            population_object=self.population_object,
            distance_matrix=self.distance_matrix,
        )

    @staticmethod
    def _sql_literal(value) -> str:
        """
        Render one dataframe cell as a SQL literal for a VALUES list.

        Plain strings, floats and ints render exactly as Python's repr would
        show them (e.g. ``'abc'``, ``0.9``, ``3``); in addition, embedded
        single quotes are doubled, numpy scalars are unwrapped to Python
        scalars first, and missing values become ``NULL``.
        """
        unwrap = getattr(value, "item", None)
        if callable(unwrap):
            # numpy scalars: their repr is not valid SQL on recent numpy.
            value = unwrap()
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return "NULL"
        if isinstance(value, (bool, int)):
            return str(value)
        if isinstance(value, float):
            return repr(value)
        return "'" + str(value).replace("'", "''") + "'"

    @property
    def column_names(self) -> List[str]:
        return [
            "{}_{}".format(c, d)
            for d in ("from", "to")
            for c in self.spatial_unit.location_id_columns
        ] + ["prediction", "probability"]

    def _make_query(self):
        """
        Build the SQL for the model's predictions.

        Returns
        -------
        str
            The SQL text of the query.

        Raises
        ------
        ValueError
            If the stored departure rate is neither a float nor a dataframe.
        """
        if isinstance(self.departure_rate, float):
            # Uniform rate: every origin's outflow T_i is src_pop * rate.
            scaled_buffer_query = (
                f"SELECT buffer.src_pop*{self.departure_rate} as T_i, * FROM buffer"
            )
        elif isinstance(self.departure_rate, pd.DataFrame):
            # Per-location rates: inline the dataframe as a VALUES list and
            # join the buffer onto it, so only listed origins are returned.
            rate_columns = list(self.departure_rate.columns)
            value_rows = ", ".join(
                "(" + ", ".join(self._sql_literal(cell) for cell in row) + ")"
                for row in self.departure_rate.itertuples(index=False, name=None)
            )
            scaled_buffer_query = "\n".join(
                [
                    "",
                    "SELECT buffer.*, buffer.src_pop*rate as T_i FROM",
                    f"(VALUES {value_rows})",
                    f"AS t({', '.join(rate_columns)})",
                    "LEFT JOIN buffer",
                    f"USING ({', '.join(c for c in rate_columns if c != 'rate')})",
                    "",
                ]
            )
        else:
            raise ValueError(
                f"Unexpected departure rate type! Got {self.departure_rate}."
            )

        id_columns = self.spatial_unit.location_id_columns
        from_columns = ", ".join(f"{c}_from" for c in id_columns)
        from_to_columns = ", ".join(
            "{}_{}".format(c, d) for d in ("from", "to") for c in id_columns
        )

        # beta is the reciprocal of the total population; sigma is the
        # per-origin normalising sum. T_i is turned into NULL when zero so
        # that the probability division cannot divide by zero; COALESCE then
        # reports a probability of 0 for those rows.
        sql_lines = [
            "",
            f"WITH buffer AS ({self.population_buffer_object.get_query()}),",
            f"beta AS (SELECT 1.0/sum(value) as beta FROM ({self.population_object.get_query()}) pops),",
            "sigma AS (",
            "SELECT",
            f"{from_columns},",
            "sum(sink_pop*(buffer_population-(SELECT beta FROM beta))) as sigma",
            "FROM buffer",
            f"GROUP BY {from_columns}",
            ")",
            f"SELECT {from_to_columns}, prediction, COALESCE(prediction / T_i, 0.) as probability",
            "FROM",
            f"(SELECT {from_to_columns},",
            "NULLIF(T_i, 0) as T_i,",
            "(T_i*sink_pop*(buffer_population-(SELECT beta FROM beta)))/sigma as prediction",
            "FROM",
            f"({scaled_buffer_query}) scaled_buf",
            "LEFT JOIN",
            "sigma",
            f"USING ({from_columns}) ) pwo_pred",
            "",
        ]
        return "\n".join(sql_lines)