# Retrieval of individual human movement trajectories and collective OD matrices from mobile phone data

This notebook gives a glimpse of how pyspark can be used to retrieve individual human movements trajectories and collective OD matrices from mobile phone data. 
It can be run in a loop over multiple time intervals using the "Optional__Bash_command_for_loop.py" script. The "Optional__Bash_command_for_loop.py" script requires a "timestamps_df.csv" file as an input. This can be generated via the notebook "Optional__Retrieval_of_timestamps_from_calldata_filenames_to_run_analysis_in_a_loop.ipynb". In a loop the "Optional__Bash_command_for_loop.py" script will automatically set the start_timestamp and end_timestamp parameters for this notebook. If not run in a loop start and end timestmap can be set manually (as demonstrated below).

In [None]:
# Set timestamp parameters manually, when not using a loop
start_timestamp = '20210925001002' # example timestamp
end_timestamp   = '20210925011002' # example timestamp

# Part A: Retrieve human movement transitions between antennas

Load packages

In [None]:
import gzip
import shutil
import glob
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, DoubleType, LongType, StructField,  TimestampType
from datetime import datetime
from pyspark.sql.functions import countDistinct
import os
import pickle
import logging
import traceback
from datetime import datetime
import sys

Monitor process time

In [None]:
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time);

### A.0) Load data in PySpark

Define path names and parameters

In [None]:
path_to_zipped_calldata_on_server_from_notebook_working_path = '../data/0_input_data/calldata/zipped'
path_to_store_unzipped_txt_files                             = '../data/0_input_data/calldata/unzipped'
amount_of_additional_txt_files_to_unzip_and_read             = 1 # can be set to a higher number, if mobile phone records can appear in datafiles of subsequent hours/days (the timestmap of the filenames does not have to be consistent with the timestmaps of the actual mobile phone records within a .txt file) 

Convert timestamps to string format

In [None]:
start_timestamp = str(start_timestamp)
end_timestamp = str(end_timestamp)

#### A.0.0) Unzip data for time intervall of analysis specified by start and end timestamp (especially relevant when handling big data e.g. on a cluster)

In [None]:
# Get list of all filenames 
files_list = os.listdir(path_to_zipped_calldata_on_server_from_notebook_working_path) # get all file names
# Filter by prefix
files_list = [x for x in files_list if x.startswith('DDD')]
# Sort by name
files_list.sort()

# Subset file name list to relevant time intervall
start_index = files_list.index('DDD_21_' + start_timestamp + '.txt.gz') 
end_index = files_list.index('DDD_21_' + end_timestamp + '.txt.gz')
files_list = files_list[start_index:end_index + amount_of_additional_txt_files_to_unzip_and_read]

# Unzip raw datafiles within interval of analysis for computation, unzipped files will be removed later
for file in files_list:
    with gzip.open(os.path.join(path_to_zipped_calldata_on_server_from_notebook_working_path, file) , "rb") as gz:
            with open(os.path.join(path_to_store_unzipped_txt_files, file[:-3]), "wb") as f:
                shutil.copyfileobj(gz, f)

#### A.0.1) Create Spark session with unzipped data

For the processing of large data sets the code line of ".config("spark.driver.memory", "16g")" should be adjusted to the available RAM on a server.

In [None]:
# Set Spark spark.local.dir
import os
os.environ["SPARK_LOCAL_DIRS"] = "../data/spark"

# Create a spark session:
spark = (
    SparkSession.builder.master("local[*]")
    .config("spark.driver.memory", "16g")
    .appName("mobility_RJ")
    .getOrCreate())

# the schema of raw mobile phone records
schema = (
    StructType()
    .add("time",  StringType(), True, metadata={"maxlength":14, "minlength":14}) # the metadata is optional
    .add("user", StringType(), True, metadata={"maxlength":32, "minlength":32}) # the metadata is optional
    .add("zip1", IntegerType(), True)
    .add("zip2", IntegerType(), True)
    .add("lat", DoubleType(), True)
    .add("lon", DoubleType(), True))

# insert metadata
from pyspark.sql.functions import col
col("time").alias("time", metadata={"maxlength":14})
col("user").alias("time", metadata={"maxlength":32})

# specify pyspark options
data = spark.read.option("mode", "DROPMALFORMED").csv(os.path.join(path_to_store_unzipped_txt_files, '*.txt'), sep="|", schema=schema)

# print spark directory
print(os.environ['SPARK_LOCAL_DIRS'])

### A.1) Retrieval of individual human movement trajectories

#### A.1.0 Filter data records by timestamps

We filter data points whose timestamp is within the given time intervall. 
This step is necessary because sometimes there is a mismatch in the data between the split into data files and the contained data points. 
This allows us to first load a larger amount of data files and then filter away unwanted points:

In [None]:
filtered_data = data.rdd.filter(lambda row: row["time"] > start_timestamp and row["time"]  < end_timestamp)

Store some metadata about the data

In [None]:
metadata = {}
metadata["call_count"] = filtered_data.count() # count amount of phone connections in the data
metadata["start_timestamp"] = start_timestamp
metadata["end_timestamp"] = end_timestamp

#### A.1.1 Retrieve and store antenna locations from the data

Introduce nice consecutive indices for antennas. To do so, we create a mapping of `hash(lat, lon) -> idx` such that the index `idx` is consecutive across the antennas that are present in the data. The mapping is stored as a Python `dict` on the frontend but also distributed back to the cluster for use in further data transformations. The first step could be done once and loaded from disk when you are sure that all antennas are included. The second step needs to be performed even with the mapping being loaded from disk.

In [None]:
antennas_dict = dict(filtered_data.map(lambda row: (row["lat"], row["lon"])).distinct().zipWithIndex().collect())
antennas = spark.sparkContext.broadcast(antennas_dict)

# Additionally, we also write the antenna positions in indexed order into a file for further processing:
inv_antennas = {i: pos for pos, i in antennas_dict.items()}
with open("../data/1_intermediate_output/antenna_positions/antenna_positions_" + end_timestamp + ".csv", "w") as f:
    for i in range(len(inv_antennas)):
        f.write(f"{inv_antennas[i][0]}, {inv_antennas[i][1]}\n")

#### A.1.2 Derive human movement trajectories from sequential antenna connection (+ apply inter-event-time (IET) filtering)

We replace the `lat` and `lon` field of each connection in the original RDD with the index of the antennas and at the same time we will drop unnecessary data. 
Note that this RDD is never `collect`ed, which means that the entire evaluation is lazy and will be executed in one sweep with the follow-up data transformations. 
After this transformation, the rows are of the following form: `userid, (timestamp, antennaid)`:

In [None]:
preprocessed = filtered_data.map(lambda row: ( row["user"],(row["time"], antennas.value[(row["lat"], row["lon"])]),))

The next transformation is the cornerstone of the analysis as it does the tracking of all users in a single dataset sweep. After the grouping operation, we drop the userid as it is not needed anymore. The data then has the form `List[(timestamp, antenna_id)]` with one row per user.

In [None]:
grouped = preprocessed.groupByKey().map(lambda row: row[1].data).persist()

Store the amount of unique users in the metadata dictionary

In [None]:
metadata["user_count"] = grouped.count()

Next, we want to identify only transitions between antennas fullfilling special time constraints (measured in seconds). The following function extracts transitions for each single user according to these constraints.

In [None]:
minimum_stay_duration_threshold = 60*15  # Stays at an antenna below this threshold time will be aggregated into subsequent antenna transitions (=15 min)
maximum_inter_event_time = 60*60*4  # Transitions with a inter-time event above this threshold will not be considered in the analysis (=4std.)

In [None]:
def extract_transitions(filtering=True):
    def extractor(events):
        try:
            # Is this the correct sorting criterion?
            sorted_events = sorted(events, key=lambda e: e[0])
            time_between_events = [(datetime.strptime(b[0], "%Y%m%d%H%M%S") - datetime.strptime(a[0], "%Y%m%d%H%M%S")).seconds for a, b in zip(sorted_events[:-1], sorted_events[1:])]
            event_time = time_between_events + [minimum_stay_duration_threshold + 1]
            antennas = [e[1] for e in sorted_events]
            eventtimepairs = tuple(zip(sorted_events, event_time))
            if filtering:
                eventtimepairs = tuple(filter(lambda item: item[1] > minimum_stay_duration_threshold, eventtimepairs))
            ret = []
            for ((ts0, a0), _), ((ts1, a1), _) in zip(eventtimepairs[:-1], eventtimepairs[1:]):
                inter_event_time = (datetime.strptime(ts1, "%Y%m%d%H%M%S") - datetime.strptime(ts0, "%Y%m%d%H%M%S")).seconds
                if (not filtering) or inter_event_time < maximum_inter_event_time:
                    ret.append((a0, a1, inter_event_time))
            return ret
        except ValueError:
            return []
    return extractor

Next, we find all transitions in the dataset. The rows in our dataset are of the form `antenna1, antenna2` with one row per registered transition. Note that we still have not `collect`ed the result! 

In [None]:
transitions = grouped.flatMap(extract_transitions(filtering=True)).persist()

Store the amount of IET-filtered and IET-unfiltered transitions in the metadata dictionary

In [None]:
metadata["transition_count_unfiltered"] = grouped.flatMap(extract_transitions(filtering=False)).count()

In [None]:
metadata["transition_count_filtered"] = transitions.count()

Finally, we count the transitions for all pairs of antennas. The `countByValue` operations does an implicit collect:

In [None]:
transitions_counts = transitions.map(lambda row: (row[0], row[1])).countByValue()

Retrieve and store inter event times as stay times at previous antenna

In [None]:
stay_times = transitions.map(lambda row: (row[0], row[2])).groupByKey().map(lambda row: (row[0], np.mean(row[1].data), np.std(row[1].data), np.count_nonzero(row[1].data))).collect()

In [None]:
with open("../data/1_intermediate_output/stay_times_at_antennas/stay_times_at_antennas_" + end_timestamp + ".csv", "w") as f:
    f.write("lat, lon, mean, stddev, count\n")
    for antenna, mean, stddev, count in stay_times:
        f.write(f"{inv_antennas[antenna][0]}, {inv_antennas[antenna][1]}, {mean}, {stddev}, {count}\n")

These counts can be fed into a dense `numpy` data structure. An error will be returned if no transitions remains after filtering by IET threshold.

In [None]:
tower2tower = np.full((len(antennas_dict), len(antennas_dict)), 0)
entries = np.array([(i0, i1, v) for (i0, i1), v in transitions_counts.items()])
tower2tower[entries[:, 0], entries[:, 1]] = entries[:, 2]
np.save("../data/1_intermediate_output/tower2tower/tower2tower_" + end_timestamp + ".npy", tower2tower)

Save metadata with 'end_timestamps' in the filename

In [None]:
metadata_df = pd.DataFrame.from_dict(metadata.items())
metadata_df.columns = ['metadata', end_timestamp]
metadata_df.set_index('metadata', inplace=True)
metadata_df.to_csv('../data/1_intermediate_output/metadata/metadata_' + end_timestamp + '.csv')

# Part B) Conversion of Origin-Destination (OD) matrix from antenna to admin scale

Load packages

In [None]:
import pyproj
import functools
import geojson
import geopandas
import itertools
import matplotlib.pyplot as plt
import numpy as np
import shapely.geometry as geo
import scipy.spatial as spatial
import pandas as pd
from numpy import genfromtxt
import seaborn as sns 
import networkx as nx
import contextily as cx 
from contextily import add_basemap

Specify paths and input parameters

In [None]:
regions_geojson_file = ("../data/0_input_data/study_region/study_region_RJ.geojson")  # GeoJSON file with administrative regions used for the analysis
antennas_csv_file = "../data/1_intermediate_output/antenna_positions/antenna_positions_" + end_timestamp + ".csv"  # The csv file with antenna locations
max_antenna_range = 5000  # The maximum range of an antenna in meters. This can be used to crop infinite Voronoi cells to a reasonable size.
epsg_regions = 29193  # The EPSG code of the regions file

#### B.0) Create antenna to antenna OD matrix

Load admin regions

In [None]:
regions_df = geopandas.read_file(regions_geojson_file)
regions_df.crs = epsg_regions

Load antenna locations

In [None]:
antennas = np.genfromtxt(antennas_csv_file, delimiter=",")
projection = pyproj.Transformer.from_crs("EPSG:4326", f"EPSG:{epsg_regions}", always_xy=False).transform
antennas = np.apply_along_axis(lambda row: projection(*row), 1, antennas)
antenna_df = geopandas.GeoDataFrame(geometry=[geo.Point(a) for a in antennas]) # create GDF
antenna_df.crs = 29193#epsg_regions
antenna_df = antenna_df.to_crs(epsg="29193")

Plot study region with antennas

In [None]:
ax = regions_df.plot(figsize=(20, 8))
ax.set_axis_off()
ax = antenna_df.plot(ax=ax, color="black", markersize=2, aspect=1);

Build method to calculate a Voronoi tesselation and transform to a dataframe

In [None]:
def voronoi_finite_polygons_2d(vor, radius=None):
    """Reconstruct infinite Voronoi regions in a
    2D diagram to finite regions.
    Source:
    [https://stackoverflow.com/a/20678647/1595060](https://stackoverflow.com/a/20678647/1595060)
    """
    if vor.points.shape[1] != 2:
        raise ValueError("Requires 2D input")
    new_regions = []
    new_vertices = vor.vertices.tolist()
    center = vor.points.mean(axis=0)
    if radius is None:
        radius = vor.points.ptp().max()
    # Construct a map containing all ridges for a
    # given point
    all_ridges = {}
    for (p1, p2), (v1, v2) in zip(vor.ridge_points, vor.ridge_vertices):
        all_ridges.setdefault(p1, []).append((p2, v1, v2))
        all_ridges.setdefault(p2, []).append((p1, v1, v2))
    # Reconstruct infinite regions
    for p1, region in enumerate(vor.point_region):
        vertices = vor.regions[region]
        if all(v >= 0 for v in vertices):
            # finite region
            new_regions.append(vertices)
            continue
        # reconstruct a non-finite region
        ridges = all_ridges[p1]
        new_region = [v for v in vertices if v >= 0]
        for p2, v1, v2 in ridges:
            if v2 < 0:
                v1, v2 = v2, v1
            if v1 >= 0:
                # finite ridge: already in the region
                continue
            # Compute the missing endpoint of an
            # infinite ridge
            t = vor.points[p2] - vor.points[p1]  # tangent
            t /= np.linalg.norm(t)
            n = np.array([-t[1], t[0]])  # normal
            midpoint = vor.points[[p1, p2]].mean(axis=0)
            direction = np.sign(np.dot(midpoint - center, n)) * n
            far_point = vor.vertices[v2] + direction * radius
            new_region.append(len(new_vertices))
            new_vertices.append(far_point.tolist())
        # Sort region counterclockwise.
        vs = np.asarray([new_vertices[v] for v in new_region])
        c = vs.mean(axis=0)
        angles = np.arctan2(vs[:, 1] - c[1], vs[:, 0] - c[0])
        new_region = np.array(new_region)[np.argsort(angles)]
        new_regions.append(new_region.tolist())

    return new_regions, np.asarray(new_vertices)

Create Voronoi tesselation and transform to a dataframe

In [None]:
vor = spatial.Voronoi(antennas)
regions, vertices = voronoi_finite_polygons_2d(vor)
antenna_df["voronoi"] = [geo.Polygon([vertices[i] for i in reg]) for reg in regions]
antenna_df = antenna_df.set_geometry("voronoi")

As the Voronoi diagram extends beyond our specified geographic region, we now intersect each Voronoi region with the union of all regions

In [None]:
all_regions = functools.reduce(lambda a, b: a.union(b), regions_df.geometry, geo.MultiPolygon())
antenna_df["cut_voronoi"] = antenna_df.voronoi.intersection(all_regions)
antenna_df = antenna_df.set_geometry("cut_voronoi")

Plot tesselations (therefore drop antennas outside of tesselations)

In [None]:
antenna_df_filt = antenna_df[antenna_df.cut_voronoi.area > 0]
ax = antenna_df_filt.plot(figsize=(20, 8))
ax.set_axis_off()
ax = antenna_df_filt["geometry"].copy().plot(ax=ax, markersize=2, color="black");

Save tesselations in study regions

In [None]:
cut_voronoi = antenna_df_filt['cut_voronoi']
cut_voronoi.to_file('../data/1_intermediate_output/antenna_tesselations/tesselations_' + end_timestamp + '.shp')

#### B.1) Convert tower2tower matrix to admin2admin matrix

Calculate the transformation matrices that describe the relation ship between antennas and admin regions

In [None]:
admin2tower = np.zeros(shape=(antenna_df.shape[0], regions_df.shape[0]))
tower2admin = np.zeros(shape=(regions_df.shape[0], antenna_df.shape[0]))
for (i, antenna), (j, region) in itertools.product(antenna_df.iterrows(), regions_df.iterrows()):
    if not antenna["voronoi"].intersects(region["geometry"]):
        continue
    inside_area = all_regions.intersection(antenna["voronoi"]).area
    outside_area = (antenna["voronoi"].difference(all_regions).intersection(antenna["geometry"].buffer(max_antenna_range)).area)
    admin2tower[i, j] = antenna["voronoi"].intersection(region["geometry"]).area / (inside_area + outside_area)
    tower2admin[j, i] = (antenna["voronoi"].intersection(region["geometry"]).area/ region["geometry"].area)

Matrix multiplication to generate admin2admin matrix

In [None]:
tower2tower = np.load('../data/1_intermediate_output/tower2tower/tower2tower_' + end_timestamp + ".npy")  # Read matrices
admin2admin = np.matmul(np.matmul(tower2admin,tower2tower),admin2tower)                                   # Matrix multiplication
np.fill_diagonal(admin2admin, 0)                                                                          # Optional: Fill diagonal with zeros
np.save("../data/1_intermediate_output/admin2admin/admin2admin_" + end_timestamp + ".npy", admin2admin)   # Save

# Part C) Delete unzipped calldata files

Delete unzipped calldata files to not run out of memeory on server. If this notebook is run in a loop, the enxt loop will unzip raw calldata file of the next time interval of interest in the analysis

In [None]:
import os, shutil
folder = '../data/0_input_data/calldata/unzipped'
for filename in os.listdir(folder):
    file_path = os.path.join(folder, filename)
    try:
        if os.path.isfile(file_path) or os.path.islink(file_path):
            os.unlink(file_path)
        elif os.path.isdir(file_path):
            shutil.rmtree(file_path)
    except Exception as e:
        print('Failed to delete %s. Reason: %s' % (file_path, e))

Monitor runtime

In [None]:
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time)