# Home-work detection

In [1]:
%load_ext autoreload
%autoreload 2
%cd D:\mobi-seg-net

D:\mobi-seg-net


In [4]:
import pandas as pd
import numpy as np
import os
from tqdm import tqdm
import random
import sqlalchemy
from lib import workers as workers
from howde import HoWDe_labelling
from pyspark.sql.functions import col
import pyarrow.parquet as pq
import pyarrow as pa
import matplotlib.pyplot as plt
from collections import Counter
import pickle

In [3]:
# Pyspark set up
os.environ['JAVA_HOME'] = r"C:\Program Files\Eclipse Adoptium\jdk-11.0.27.6-hotspot"
from pyspark.sql import SparkSession
import sys
from pyspark import SparkConf
# Set up pyspark
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Create new context
mem="50g"
n_workers = 16
spark = SparkSession.builder.config("spark.sql.files.ignoreCorruptFiles","true")\
                                            .config("spark.driver.memory", mem) \
                                            .config("spark.driver.maxResultSize", "40g") \
                                            .config("spark.executer.memory", "40g") \
                                            .config("spark.sql.session.timeZone","Europe/Stockholm")\
                                            .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
                                            .config("spark.driver.maxResultSize", "40g")\
                                            .config("spark.kryoserializer.buffer.max", "128m")\
                                            .config("spark.storage.memoryFraction", "0.5")\
                                            .config("spark.sql.broadcastTimeout", "7200")\
                                            .master(f"local[{n_workers}]").getOrCreate()
java_version = spark._jvm.System.getProperty("java.version")
print(f"Java version used by PySpark: {java_version}")
print('Web UI:', spark.sparkContext.uiWebUrl)

Java version used by PySpark: 11.0.27
Web UI: http://C19YUEI.net.chalmers.se:4040


In [4]:
# Data location
user = workers.keys_manager['database']['user']
password = workers.keys_manager['database']['password']
port = workers.keys_manager['database']['port']
db_name = workers.keys_manager['database']['name']
engine = sqlalchemy.create_engine(f'postgresql://{user}:{password}@localhost:{port}/{db_name}?gssencmode=disable')

In [5]:
def within_box(lat, lng):
    if (lat >= workers.stockholm_box[1]) & (lat <= workers.stockholm_box[3]):
        if (lng >= workers.stockholm_box[0]) & (lng <= workers.stockholm_box[2]):
            return 1
    return 0

def hw_extract(data):
    if len(data[data['location_type'] == 'W']) > 0:
        w_list = data.loc[data['location_type'] == 'W', 'loc'].tolist()
        counter = Counter(w_list)
        most_common_element, count = counter.most_common(1)[0]
    else:
        most_common_element, count = 0, 0
    return pd.Series(dict(workplace_loc=most_common_element, workplace_count=count))

## 1. Load data

In [6]:
cols = ['device_aid', 'loc', 'localtime', 'l_localtime']
uuid_dict = dict()
df_list = []
for i in tqdm(range(0, 50)):
    # Align column names and timestamp format
    df_stops = pd.read_parquet(f"dbs/stops_pr/stops_pr_{i}.parquet",
                               columns=cols)
    df_stops.columns = ["useruuid", "loc", "start", "end"]
    b_dict = {x: y for x, y in zip(df_stops['useruuid'].unique(), range(1, df_stops['useruuid'].nunique() + 1))}
    uuid_dict.update(b_dict)
    df_stops['useruuid'] = df_stops['useruuid'].map(uuid_dict)
    df_list.append(df_stops)
df_stops = pd.concat(df_list)
del df_list

100%|██████████| 50/50 [01:04<00:00,  1.30s/it]


In [9]:
uuid_list = df_stops['useruuid'].unique()
uuid_batch_dict = {x: random.randint(1, 200) for x in uuid_list}
df_stops.loc[:, 'batch'] = df_stops['useruuid'].map(uuid_batch_dict)
print(df_stops.loc[:, 'batch'].nunique())

200


In [10]:
def write_group(data):
    # Write it back with compatible timestamp type (MILLIS)
    table = pa.Table.from_pandas(data)  # force timestamps to milliseconds
    pq.write_table(table, f'dbs/temp/stops_pr_{data.name}_millis.parquet', coerce_timestamps='ms')

In [11]:
tqdm.pandas()
df_stops.groupby('batch').progress_apply(lambda x: write_group(x))

100%|██████████| 200/200 [00:48<00:00,  4.13it/s]


In [12]:
# To test
# df_stops = df_stops.loc[df_stops['useruuid'].isin(df_stops['useruuid'].unique()[:1000]), :]
uuid_r_dict = {v: k for k, v in uuid_dict.items()}
# Save to file
with open('dbs/uuid_r_dict.pkl', 'wb') as f:
    pickle.dump(uuid_r_dict, f)

## 2. Convert data (50 batches) into correct time format

In [7]:
cols = ['device_aid', 'loc', 'localtime', 'l_localtime']
uuid_dict = dict()
df_list = []
for i in tqdm(range(0, 50)):
    # Align column names and timestamp format
    df_stops = pd.read_parquet(f"dbs/stops_pr/stops_pr_{i}.parquet",
                               columns=cols)
    df_stops.columns = ["useruuid", "loc", "start", "end"]
    # b_dict = {x: y for x, y in zip(df_stops['useruuid'].unique(), range(1, df_stops['useruuid'].nunique() + 1))}
    # uuid_dict.update(b_dict)
    # df_stops['useruuid'] = df_stops['useruuid'].map(uuid_dict)
    table = pa.Table.from_pandas(df_stops)  # force timestamps to milliseconds
    pq.write_table(table, f'dbs/hw_detection/stops_pr_{i}_millis.parquet', coerce_timestamps='ms')

100%|██████████| 50/50 [01:40<00:00,  2.00s/it]


## 3. Workplace detection

In [8]:
# Home-work detection
path2data = 'dbs/hw_detection'
files = os.listdir('dbs/hw_detection')
file_paths = [os.path.join(path2data, f) for f in files]

In [15]:
# Load your stop location data
# input_data = spark.read.parquet(*file_paths)

i = 42
input_data = spark.read.parquet(f'dbs/hw_detection/stops_pr_{i}_millis.parquet')
input_data = input_data.repartition('useruuid')
input_data.orderBy(['useruuid', 'start']).show(5)

+--------------------+---+-------------------+-------------------+-----------------+
|            useruuid|loc|              start|                end|__index_level_0__|
+--------------------+---+-------------------+-------------------+-----------------+
|00024acb-1397-459...| 13|2024-01-28 22:50:25|2024-01-29 03:44:04|          1303216|
|00024acb-1397-459...|  3|2024-01-29 09:51:30|2024-01-29 10:11:17|           320442|
|00024acb-1397-459...| 15|2024-01-29 12:08:35|2024-01-29 15:47:36|          1954576|
|00024acb-1397-459...| 14|2024-01-29 17:53:07|2024-01-29 20:53:06|           974049|
|00024acb-1397-459...|  3|2024-01-29 22:15:55|2024-01-30 06:19:16|          1630210|
+--------------------+---+-------------------+-------------------+-----------------+
only showing top 5 rows



In [16]:
input_data.printSchema()

root
 |-- useruuid: string (nullable = true)
 |-- loc: integer (nullable = true)
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- __index_level_0__: long (nullable = true)



In [13]:
# Run HoWDe labelling
with open('dbs/uuid_r_dict.pkl', 'rb') as f:
    uuid_r_dict = pickle.load(f)
df_w_list = []
i = 42
print(f'File {i} is being processed...')
# .limit(10000)
input_data = spark.read.parquet(f'dbs/hw_detection/stops_pr_{i}_millis.parquet') \
    .select("useruuid", "loc", "start", "end")

labeled_data = HoWDe_labelling(
    input_data,
    edit_config_default=None,
    range_window_home=28,
    range_window_work=42,
    dhn=3,
    dn_H=0.7,
    dn_W=0.5,
    hf_H=0.7,
    hf_W=0.4,
    df_W=0.6,
    output_format="stop",
    verbose=False,
)

File 42 is being processed...


In [14]:
labeled_data.show(5)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\ProgramData\anaconda3\envs\mobi\Lib\socket.py", line 721, in readinto
    raise
TimeoutError: timed out


In [None]:
# Show the results
df_l = labeled_data.toPandas()
df_l = df_l.loc[df_l['location_type'] == 'W', ['useruuid', 'loc', 'location_type']]
# df_l['useruuid'] = df_l['useruuid'].map(uuid_r_dict)
tqdm.pandas()
df_l = df_l.groupby('useruuid').progress_apply(hw_extract).reset_index()
print(f"No. of commuters: {df_l.shape[0]}")
df_w_list.append(df_l)

In [14]:
# input_data = input_data.repartition("useruuid")
input_data.rdd.getNumPartitions()

4

In [13]:
input_data.orderBy(['useruuid', 'start']).show()

+--------+---+-------------------+-------------------+
|useruuid|loc|              start|                end|
+--------+---+-------------------+-------------------+
|       1|  7|2024-02-22 03:34:06|2024-02-22 05:46:46|
|       1| 52|2024-02-22 07:15:48|2024-02-22 10:15:47|
|       1|  8|2024-02-22 13:38:52|2024-02-22 14:34:24|
|       1|  8|2024-02-23 08:57:47|2024-02-23 16:31:42|
|       1| 53|2024-02-24 20:37:38|2024-02-25 00:58:34|
|       1|  8|2024-02-26 06:49:14|2024-02-26 15:13:41|
|       1|  7|2024-02-26 17:19:34|2024-02-26 18:41:23|
|       1|  7|2024-02-26 19:06:44|2024-02-27 06:26:28|
|       1|  8|2024-02-27 07:51:55|2024-02-27 15:13:02|
|       1|  8|2024-02-28 06:54:46|2024-02-28 09:51:41|
|       1|  8|2024-02-28 13:20:08|2024-02-28 16:28:46|
|       1|  7|2024-02-28 17:22:07|2024-02-28 23:55:15|
|       1| 51|2024-03-21 11:48:49|2024-03-21 16:31:08|
|       1| 54|2024-03-24 10:21:38|2024-03-24 13:21:37|
|       1| 55|2024-03-24 14:36:12|2024-03-24 18:23:38|
|       1|

In [None]:
df_w = pd.concat(df_w_list)
df_w.to_sql('commuter_device', engine, schema='public', index=False,
            if_exists='append', method='multi', chunksize=5000)

In [16]:
len(df_l)

161

In [25]:
# Clear temporal files
folder = 'dbs/temp'

# Loop through all files in the folder
for filename in os.listdir(folder):
    file_path = os.path.join(folder, filename)
    try:
        if os.path.isfile(file_path):
            os.remove(file_path)
            print(f"Deleted file: {file_path}")
    except Exception as e:
        print(f"Error deleting {file_path}: {e}")

Deleted file: dbs/temp\stops_pr_0_millis.parquet


In [4]:
import pandas as pd
pd.read_parquet(f'dbs/cities/stockholm_stops_1.parquet').head()

Unnamed: 0,device_aid,h3_id,loc,latitude,longitude,dur,localtime,l_localtime,date,home,year,weekday,week,seq,batch
0,00071219-6ef6-4d98-ae3a-5a7d7da093e7,880886619bfffff,7,59.345862,18.099108,252.65,2024-01-22 02:17:28+01:00,2024-01-22 06:30:07+01:00,2024-01-22,0.0,2024,0,4,4,1
1,00071219-6ef6-4d98-ae3a-5a7d7da093e7,880886619bfffff,7,59.345862,18.099108,329.55,2024-01-23 02:26:44+01:00,2024-01-23 07:56:17+01:00,2024-01-23,0.0,2024,1,4,5,1
2,00071219-6ef6-4d98-ae3a-5a7d7da093e7,880886619bfffff,7,59.345862,18.099108,16.166667,2024-01-23 16:48:15+01:00,2024-01-23 17:04:25+01:00,2024-01-23,0.0,2024,1,4,6,1
3,00071219-6ef6-4d98-ae3a-5a7d7da093e7,88088662c5fffff,1,59.376227,17.921041,60.066667,2024-02-23 05:54:18+01:00,2024-02-23 06:54:22+01:00,2024-02-23,1.0,2024,4,8,7,1
4,00071219-6ef6-4d98-ae3a-5a7d7da093e7,8808866215fffff,2,59.358313,17.89977,119.5,2024-02-23 09:36:34+01:00,2024-02-23 11:36:04+01:00,2024-02-23,0.0,2024,4,8,8,1
