In [1]:
import os
from datetime import timedelta
from copy import deepcopy
import pandas as pd

data_dir = "/home/willkewang/Datasets/GLOBEM/"

# A minimal data_factory dict with folder paths
data_factory = {
    "feature_folder": {
        "INS-W": {
            1: "{}/INS-W_1/FeatureData/".format(data_dir),
            2: "{}/INS-W_2/FeatureData/".format(data_dir),
            3: "{}/INS-W_3/FeatureData/".format(data_dir),
            4: "{}/INS-W_4/FeatureData/".format(data_dir),
        }
    },
    "survey_folder": {
        "INS-W": {
            1: "{}/INS-W_1/SurveyData/".format(data_dir),
            2: "{}/INS-W_2/SurveyData/".format(data_dir),
            3: "{}/INS-W_3/SurveyData/".format(data_dir),
            4: "{}/INS-W_4/SurveyData/".format(data_dir),
        }
    },
    "participants_info_folder": {
        "INS-W": {
            1: "{}/INS-W_1/ParticipantsInfoData/".format(data_dir),
            2: "{}/INS-W_2/ParticipantsInfoData/".format(data_dir),
            3: "{}/INS-W_3/ParticipantsInfoData/".format(data_dir),
            4: "{}/INS-W_4/ParticipantsInfoData/".format(data_dir),
        }
    },
    
    # If only loading dep_weekly or dep_endterm,
    # an empty placeholder for threshold_book:
    "threshold_book": {
        # Example of other tasks if expanding beyond dep_weekly / dep_endterm
        # "some_other_survey_task": {"threshold_as_true":10, "threshold_as_false":5}
    }
}
def data_loader_read_label_file(institution: str, phase: int, prediction_target: str):
    if prediction_target == "dep_weekly":
        prediction_target_col = "dep"  # 'dep' is the column in dep_weekly.csv
        csv_path = data_factory["survey_folder"][institution][phase] + "dep_weekly.csv"
        df_label = pd.read_csv(csv_path)
    elif prediction_target == "dep_endterm":
        prediction_target_col = "dep"
        csv_path = data_factory["survey_folder"][institution][phase] + "dep_endterm.csv"
        df_label = pd.read_csv(csv_path)
    else:
        # For other custom tasks, read from other CSVs or threshold_book:
        raise ValueError(f"Unsupported prediction target: {prediction_target}")

    # Make sure date -> datetime, unify pid format
    df_label["date"] = pd.to_datetime(df_label["date"])
    df_label["pid"]  = df_label["pid"].apply(lambda x: f"{x}#{institution}_{phase}")

    # Drop duplicates
    df_label = df_label.drop_duplicates(["pid", "date"], keep="last")

    return df_label, prediction_target_col


def data_loader_single_dataset_label_based(
    institution: str,
    phase: int,
    prediction_target: str,
    flag_more_feat_types: bool = False
) -> pd.DataFrame:
    """
    Loads the 4-week window of daily features (from rapids.csv) up to each label date,
    returning a DataFrame with columns [pid, date, X_raw, y_raw, device_type].
    """
    # --- 1) Read rapids.csv as features ---
    df_full_rawdata = pd.read_csv(
        data_factory["feature_folder"][institution][phase] + "rapids.csv",
        low_memory=False
    )
    df_full_rawdata["date"] = pd.to_datetime(df_full_rawdata["date"])
    # unify the pid format
    df_full_rawdata["pid"] = df_full_rawdata["pid"].apply(lambda x: f"{x}#{institution}_{phase}")

    # --- 2) Read participant info (platform.csv) for device_type
    df_participant_file = pd.read_csv(
        data_factory["participants_info_folder"][institution][phase] + "platform.csv",
        low_memory=False
    )
    df_participant_file["pid"] = df_participant_file["pid"].apply(lambda x: f"{x}#{institution}_{phase}")
    df_participant_file = df_participant_file.set_index("pid")

    # --- 3) Load label file (dep_weekly or dep_endterm) ---
    df_label, prediction_target_col = data_loader_read_label_file(institution, phase, prediction_target)

    # --- 4) Decide which sensor columns to keep
    # If only the basic four sensor types (location, screen, sleep, steps),
    # set flag_more_feat_types=False, otherwise keep Bluetooth/call too.
    if not flag_more_feat_types:
        sensor_prefixes = ['f_loc', 'f_screen', 'f_slp', 'f_steps']
    else:
        sensor_prefixes = ['f_loc', 'f_screen', 'f_slp', 'f_steps', 'f_blue', 'f_call']

    retained_features = ["pid", "date"]
    for col in df_full_rawdata.columns:
        for ft in sensor_prefixes:
            if col.startswith(ft):
                retained_features.append(col)
                break

    # --- 5) Build a 4-week window of data for each label date --- ? is this sufficient?
    datapoints = []
    for _, row in df_label.iterrows():
        pid = row["pid"]
        date_end = row["date"]
        date_start = date_end - timedelta(days=27)  # 4 weeks ~ 28 days

        # slice the raw data for that pid, in [date_start, date_end]
        df_data_window = df_full_rawdata[df_full_rawdata["pid"] == pid]
        df_data_window = df_data_window[
            (df_data_window["date"] >= date_start) &
            (df_data_window["date"] <= date_end)
        ]
        if df_data_window.empty:
            continue

        # to ensure each day is present in X_raw, we do an outer merge with date range
        df_placeholder = pd.DataFrame({"date": pd.date_range(date_start, date_end)})
        df_placeholder["pid"] = pid
        df_data_window = pd.merge(
            df_placeholder,
            df_data_window[retained_features],
            on=["pid","date"],
            how="left"
        )

        # assemble the datapoint
        datapoint = {
            "pid": pid,
            "date": date_end, 
            "X_raw": df_data_window[retained_features],  # 4-week daily rows
            "y_raw": row[prediction_target_col],         # label (0 or 1)
            "device_type": df_participant_file.loc[pid]["platform"].split(";")[0]
        }
        datapoints.append(datapoint)

    df_datapoints = pd.DataFrame(datapoints)

    # optional: remove participants with fewer than 2 label points if prediction_target == 'dep_weekly'
    if prediction_target == "dep_weekly":
        pids_few_response = df_datapoints.groupby("pid").size()
        pids_few_response = pids_few_response[pids_few_response < 2].index
        df_datapoints = df_datapoints[~df_datapoints["pid"].isin(pids_few_response)]

    return df_datapoints

In [2]:
df_datapoints = data_loader_single_dataset_label_based(
    institution="INS-W",
    phase=1,
    prediction_target="dep_weekly",
    flag_more_feat_types=False    
)

print(df_datapoints.shape)
df_datapoints.head()

(2354, 5)


Unnamed: 0,pid,date,X_raw,y_raw,device_type
0,INS-W_001#INS-W_1,2018-04-04,pid date \ 0 INS-W_...,False,android
1,INS-W_001#INS-W_1,2018-04-08,pid date \ 0 INS-W_...,False,android
2,INS-W_001#INS-W_1,2018-04-11,pid date \ 0 INS-W_...,False,android
3,INS-W_001#INS-W_1,2018-04-18,pid date \ 0 INS-W_...,False,android
4,INS-W_001#INS-W_1,2018-04-22,pid date \ 0 INS-W_...,False,android


In [3]:
import re

def contains_dis(col_name: str) -> bool:
    # Split the column name by ':' and '_' using regex.
    tokens = re.split("[:_]", col_name)
    # Check if any of the tokens is 'dis'
    return "dis" in tokens

sample_X = df_datapoints.iloc[0]['X_raw']
for col in sample_X.columns:
    if contains_dis(col):
        print(col)

f_slp:fitbit_sleep_summary_rapids_sumdurationafterwakeupmain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_sumdurationasleepmain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_sumdurationawakemain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_sumdurationtofallasleepmain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_sumdurationinbedmain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_avgefficiencymain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_avgdurationafterwakeupmain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_avgdurationasleepmain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_avgdurationawakemain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_avgdurationtofallasleepmain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_avgdurationinbedmain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_countepisodemain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_firstbedtimemain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_lastbedtimemain_dis:14dhist
f_slp:fitbit_sleep_summary_rapids_firstwaketimemai

In [4]:
df_datapoints["X_raw"].iloc[2].shape

(28, 3755)

In [5]:
df_datapoints["y_raw"].value_counts()

False    1307
True     1047
Name: y_raw, dtype: int64

# Attempt ARM

In [3]:
import re
import pandas as pd

# For demonstration, select an example subject's data from your loaded dataset.
example = df_datapoints.iloc[0]
print("Example subject PID:", example["pid"])
print("Label date:", example["date"])

df_sensor = example["X_raw"]

# Function to check if a column name indicates a discretized feature.
def is_discretized_column(col_name: str) -> bool:
    tokens = re.split("[:_]", col_name)
    return "dis" in tokens

# Robust function to encode a series of discretized values.
def robust_encode(series: pd.Series) -> pd.Series:
    # Get the unique non-null values.
    unique_vals = series.dropna().unique()
    # Check if they are a subset of the expected discrete tokens.
    valid_tokens = {"l", "m", "h"}
    if len(unique_vals) > 0 and all(val in valid_tokens for val in unique_vals):
        mapping = {"l": 0, "m": 1, "h": 2}
        return series.map(mapping)
    else:
        # If not, try converting to numeric (in case it already is numeric).
        try:
            return pd.to_numeric(series)
        except Exception as e:
            # If conversion fails, return the original series.
            return series

# Suppose df_sensor is your sensor DataFrame (the X_raw part for one subject).
df_encoded = df_sensor.copy()

# Identify sensor columns (exclude metadata like "pid" and "date")
sensor_cols = [col for col in df_sensor.columns if col not in ["pid", "date"]]

# Now loop over sensor columns.
for col in sensor_cols:
    if is_discretized_column(col):
        # For columns with "dis", apply robust encoding.
        df_encoded[col] = robust_encode(df_sensor[col])
    else:
        # For columns without "dis", they might be all NaN or raw values.
        # If they are numeric, you can convert them; if they are all NaN, they won't affect ARM.
        try:
            df_encoded[col] = pd.to_numeric(df_sensor[col])
        except:
            df_encoded[col] = df_sensor[col]

# Now df_encoded should have numeric values for the discretized features.
print("Encoded sensor data (first few rows):")
display(df_encoded.head(2))


Example subject PID: INS-W_001#INS-W_1
Label date: 2018-04-04 00:00:00
Encoded sensor data (first few rows):


Unnamed: 0,pid,date,f_slp:fitbit_sleep_summary_rapids_sumdurationafterwakeupmain:14dhist,f_slp:fitbit_sleep_summary_rapids_sumdurationasleepmain:14dhist,f_slp:fitbit_sleep_summary_rapids_sumdurationawakemain:14dhist,f_slp:fitbit_sleep_summary_rapids_sumdurationtofallasleepmain:14dhist,f_slp:fitbit_sleep_summary_rapids_sumdurationinbedmain:14dhist,f_slp:fitbit_sleep_summary_rapids_avgefficiencymain:14dhist,f_slp:fitbit_sleep_summary_rapids_avgdurationafterwakeupmain:14dhist,f_slp:fitbit_sleep_summary_rapids_avgdurationasleepmain:14dhist,...,f_loc:phone_locations_doryab_timeattop2location_norm:weekend,f_loc:phone_locations_doryab_timeattop3location_norm:weekend,f_loc:phone_locations_doryab_totaldistance_norm:weekend,f_loc:phone_locations_doryab_varspeed_norm:weekend,f_loc:phone_locations_locmap_duration_in_locmap_study_norm:weekend,f_loc:phone_locations_locmap_percent_in_locmap_study_norm:weekend,f_loc:phone_locations_locmap_duration_in_locmap_exercise_norm:weekend,f_loc:phone_locations_locmap_percent_in_locmap_exercise_norm:weekend,f_loc:phone_locations_locmap_duration_in_locmap_greens_norm:weekend,f_loc:phone_locations_locmap_percent_in_locmap_greens_norm:weekend
0,INS-W_001#INS-W_1,2018-03-08,,,,,,,,,...,,,,,,,,,,
1,INS-W_001#INS-W_1,2018-03-09,,,,,,,,,...,,,,,,,,,,


In [24]:
df_encoded.shape

(28, 3755)

# ARM Execution

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings("ignore")

# -------------------------------
# Step 0. Define Xu's feature list (from his paper)
# -------------------------------
xu_features = [
    "f_screen:phone_screen_rapids_avgdurationunlock",
    "f_screen:phone_screen_rapids_stddurationunlock",
    "f_screen:phone_screen_rapids_countepisodeunlock",
    "f_screen:phone_screen_rapids_sumdurationunlock",
    "f_slp:fitbit_sleep_intraday_rapids_maxdurationasleepunifiedmain",
    "f_slp:fitbit_sleep_intraday_rapids_countepisodeasleepunifiedmain",
    "f_slp:fitbit_sleep_intraday_rapids_countepisodeawakeunifiedmain",
    "f_slp:fitbit_sleep_intraday_rapids_ratiodurationasleepunifiedwithinmain",
    "f_slp:fitbit_sleep_intraday_rapids_sumdurationasleepunifiedmain",
    "f_slp:fitbit_sleep_intraday_rapids_sumdurationawakeunifiedmain",
    "f_steps:fitbit_steps_intraday_rapids_avgdurationactivebout",
    "f_steps:fitbit_steps_intraday_rapids_avgdurationsedentarybout",
    "f_steps:fitbit_steps_intraday_rapids_countepisodeactivebout",
    "f_steps:fitbit_steps_intraday_rapids_countepisodesedentarybout",
    "f_steps:fitbit_steps_intraday_rapids_sumsteps",
    "f_loc:phone_locations_barnett_circdnrtn",
    "f_loc:phone_locations_doryab_timeathome",
    "f_loc:phone_locations_doryab_locationentropy",
    "f_loc:phone_locations_doryab_loglocationvariance",
    "f_loc:phone_locations_doryab_avglengthstayatclusters",
    "f_loc:phone_locations_doryab_movingtostaticratio",
    "f_loc:phone_locations_doryab_numberlocationtransitions",
    "f_loc:phone_locations_doryab_numberofsignificantplaces",
    "f_loc:phone_locations_doryab_outlierstimepercent",
    "f_loc:phone_locations_doryab_radiusgyration",
    "f_loc:phone_locations_doryab_totaldistance",
    "f_loc:phone_locations_locmap_duration_in_locmap_study",
    "f_loc:phone_locations_locmap_percent_in_locmap_study",
    "f_loc:phone_locations_locmap_duration_in_locmap_exercise",
    "f_loc:phone_locations_locmap_percent_in_locmap_exercise",
    "f_loc:phone_locations_locmap_duration_in_locmap_greens",
    "f_loc:phone_locations_locmap_percent_in_locmap_greens"
]

# -------------------------------
# Step 1. Create Sample Data for One Subject
# -------------------------------
# For demonstration, we simulate a subject's sensor data over 28 days.
dates = pd.date_range(start="2020-01-01", periods=28, freq="D")
np.random.seed(0)
# Simulate numeric sensor values for each Xu feature.
sensor_data = {feat: np.random.rand(28) for feat in xu_features}
df_sensor = pd.DataFrame(sensor_data, index=dates).reset_index().rename(columns={"index": "date"})
df_sensor["pid"] = "subject1"

# Simulate one subject's record (as if loaded by the data loader)
example = {
    "pid": "subject1",
    "date": df_sensor["date"].iloc[-1],
    "X_raw": df_sensor,   # sensor data for 28 days
    "y_raw": False,       # e.g., not depressed
    "device_type": "android"
}
df_datapoints = pd.DataFrame([example])

# -------------------------------
# Step 2. Discretize Sensor Values into "l", "m", "h"
# -------------------------------
def encode_low_med_high(series):
    # Use the 33rd and 66th percentiles as thresholds.
    if series.dropna().empty:
        return series
    q33, q66 = series.quantile(0.33), series.quantile(0.66)
    def mapper(val):
        if pd.isna(val):
            return np.nan
        if val < q33:
            return "l"
        elif val < q66:
            return "m"
        else:
            return "h"
    return series.apply(mapper)

# We only keep Xu's features (filtering out any extra sensors).
# In df_sensor, there are extra columns ("date" and "pid"). So our sensor columns are:
sensor_cols = [col for col in df_sensor.columns if col in xu_features]

# Apply the encoding to each sensor column.
df_encoded = df_sensor.copy()
for col in sensor_cols:
    df_encoded[col] = encode_low_med_high(df_sensor[col])

# -------------------------------
# Step 3. Map "l","m","h" to 0, 1, 2
# -------------------------------
# We set a simple mapping: l -> 0, m -> 1, h -> 2.
mapping = {"l": 0, "m": 1, "h": 2}
df_numeric = df_encoded.copy()
for col in sensor_cols:
    df_numeric[col] = df_numeric[col].map(mapping)

# -------------------------------
# Step 4. Create Transactions for ARM
# -------------------------------
# Here, each row (day) in df_numeric becomes one transaction: a list of numeric sensor codes.
transactions = (
    df_numeric[sensor_cols]
    .dropna(how="all")  # drop any days with all sensor data missing
    .apply(lambda row: [int(row[col]) for col in sensor_cols if pd.notna(row[col])], axis=1)
    .tolist()
)
print("Sample transactions (first 5 days):")
for t in transactions[:5]:
    print(t)

# -------------------------------
# Step 5. Run Association Rule Mining using Spark's FPGrowth
# -------------------------------
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth

# Initialize a SparkSession.
spark = SparkSession.builder.appName("ARMExample").master("local[*]").getOrCreate()

# Create a Spark DataFrame of transactions.
# Each transaction is assigned an ID.
df_transactions = spark.createDataFrame(
    [(i, t) for i, t in enumerate(transactions) if len(t) > 1],
    ["id", "items"]
)
print("Number of transactions:", df_transactions.count())

# Set stricter thresholds – because we have a small number of days.
# For example, require that an itemset appears in at least 30% of days (~9 out of 28) and with high confidence.
min_support = 0.3
min_confidence = 0.7

fpGrowth = FPGrowth(itemsCol="items", minSupport=min_support, minConfidence=min_confidence)
model = fpGrowth.fit(df_transactions)

# Retrieve the association rules as a Pandas DataFrame.
rules = model.associationRules.toPandas()
print("\nAssociation Rules:")
print(rules)

# Stop Spark
spark.stop()


Example subject PID: INS-W_001#INS-W_1
Label date: 2018-04-04 00:00:00

Example numeric sensor data (first few rows):


Unnamed: 0,pid,date,f_slp:fitbit_sleep_summary_rapids_sumdurationafterwakeupmain:14dhist,f_slp:fitbit_sleep_summary_rapids_sumdurationasleepmain:14dhist,f_slp:fitbit_sleep_summary_rapids_sumdurationawakemain:14dhist,f_slp:fitbit_sleep_summary_rapids_sumdurationtofallasleepmain:14dhist,f_slp:fitbit_sleep_summary_rapids_sumdurationinbedmain:14dhist,f_slp:fitbit_sleep_summary_rapids_avgefficiencymain:14dhist,f_slp:fitbit_sleep_summary_rapids_avgdurationafterwakeupmain:14dhist,f_slp:fitbit_sleep_summary_rapids_avgdurationasleepmain:14dhist,...,f_loc:phone_locations_doryab_timeattop2location_norm:weekend,f_loc:phone_locations_doryab_timeattop3location_norm:weekend,f_loc:phone_locations_doryab_totaldistance_norm:weekend,f_loc:phone_locations_doryab_varspeed_norm:weekend,f_loc:phone_locations_locmap_duration_in_locmap_study_norm:weekend,f_loc:phone_locations_locmap_percent_in_locmap_study_norm:weekend,f_loc:phone_locations_locmap_duration_in_locmap_exercise_norm:weekend,f_loc:phone_locations_locmap_percent_in_locmap_exercise_norm:weekend,f_loc:phone_locations_locmap_duration_in_locmap_greens_norm:weekend,f_loc:phone_locations_locmap_percent_in_locmap_greens_norm:weekend
0,INS-W_001#INS-W_1,2018-03-08,,,,,,,,,...,,,,,,,,,,
1,INS-W_001#INS-W_1,2018-03-09,,,,,,,,,...,,,,,,,,,,
2,INS-W_001#INS-W_1,2018-03-10,,,,,,,,,...,,,,,,,,,,
3,INS-W_001#INS-W_1,2018-03-11,,,,,,,,,...,,,,,,,,,,
4,INS-W_001#INS-W_1,2018-03-12,,,,,,,,,...,,,,,,,,,,


Sample transactions (first 5 days):
[418, 421, 424, 428, 430, 435, 436, 439, 442, 446, 448, 451, 456, 457, 646, 651, 654, 657, 661, 665, 668, 672, 675, 676, 679, 682, 686, 688, 691, 694, 697, 700, 704, 706, 710, 1669, 1672, 1675, 1679, 1681, 1686, 1687, 1690, 1693, 1697, 1699, 1702, 1707, 1709, 1897, 1902, 1905, 1908, 1912, 1916, 1919, 1923, 1926, 1927, 1930, 1933, 1937, 1939, 1942, 1945, 1948, 1951, 1955, 1957, 1961, 3165, 3168, 3170, 3174, 3177, 3180, 3181, 3184, 3188, 3190, 3193, 3198, 3201, 3204, 3206, 3209, 3213, 4172, 4174, 4177, 4181, 4183, 4187, 4190, 4192, 4195, 4199, 4201, 4205, 4209, 4212, 4219, 4222, 4227, 4228, 4232, 4235, 4238, 4241, 4245, 4247, 4249, 4253, 4255, 4259, 4261, 4265, 4268, 4271, 4401, 4404, 4407, 4410, 4416, 4419, 4421, 4425, 4428, 4431, 4432, 4435, 4439, 4441, 4444, 4449, 4452, 4455, 4457, 4460, 4463, 5666, 5669, 5672, 5675, 5678, 5682, 5684, 5687, 5690, 5692, 5696, 5700, 5702, 5705, 5708, 5710, 5713, 6721, 6724, 6729, 6731, 6735, 6738, 6741, 6744, 6747, 67

25/03/28 18:21:53 WARN Utils: Your hostname, willkewang-OMEN-by-HP resolves to a loopback address: 127.0.1.1; using 192.168.86.78 instead (on interface wlp3s0)
25/03/28 18:21:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/28 18:21:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.ExceptionInInitializerError
	at org.apache.spark.unsafe.array.ByteArrayMethods.<clinit>(ByteArrayMethods.java:52)
	at org.apache.spark.memory.MemoryManager.defaultPageSizeBytes$lzycompute(MemoryManager.scala:261)
	at org.apache.spark.memory.MemoryManager.defaultPageSizeBytes(MemoryManager.scala:251)
	at org.apache.spark.memory.MemoryManager.$anonfun$pageSizeBytes$1(MemoryManager.scala:270)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.memory.MemoryManager.<init>(MemoryManager.scala:270)
	at org.apache.spark.memory.UnifiedMemoryManager.<init>(UnifiedMemoryManager.scala:58)
	at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:207)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:325)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:196)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:279)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:464)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:53)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.IllegalStateException: java.lang.NoSuchMethodException: java.nio.DirectByteBuffer.<init>(long,int)
	at org.apache.spark.unsafe.Platform.<clinit>(Platform.java:113)
	... 27 more
Caused by: java.lang.NoSuchMethodException: java.nio.DirectByteBuffer.<init>(long,int)
	at java.base/java.lang.Class.getConstructor0(Class.java:3761)
	at java.base/java.lang.Class.getDeclaredConstructor(Class.java:2930)
	at org.apache.spark.unsafe.Platform.<clinit>(Platform.java:71)
	... 27 more


In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth

# Initialize a local SparkSession
spark = SparkSession.builder \
    .appName("ARMExample") \
    .master("local[*]") \
    .getOrCreate()

spark.stop()

In [2]:
import gc
gc.collect()

17

In [3]:
import os
import shutil

# You may need to adjust this path if you have a custom setting.
spark_temp_dir = "/tmp"

# List all items in the temporary directory
for item in os.listdir(spark_temp_dir):
    # Check if the item starts with 'spark-' (typical prefix for Spark temp directories)
    if item.startswith("spark-"):
        full_path = os.path.join(spark_temp_dir, item)
        try:
            shutil.rmtree(full_path)
            print(f"Removed temporary folder: {full_path}")
        except Exception as e:
            print(f"Could not remove {full_path}: {e}")

# MLXtend implementation

In [4]:
import pandas as pd
import numpy as np
from datetime import timedelta
from mlxtend.preprocessing import TransactionEncoder
from mlxtend.frequent_patterns import fpgrowth, association_rules

# --- STEP 0: Define a simple encoder function (if needed) ---
def encode_low_med_high(series):
    # If the series is all NaN, just return it
    if series.dropna().empty:
        return series
    # Compute the 33rd and 66th percentiles
    q33, q66 = series.quantile(0.33), series.quantile(0.66)
    # Map values: below q33 -> "l", between -> "m", above -> "h"
    def mapper(val):
        if pd.isna(val):
            return np.nan
        if val < q33:
            return "l"
        elif val < q66:
            return "m"
        else:
            return "h"
    return series.apply(mapper)

# --- STEP 1: Select one subject's data ---
# Assume df_datapoints is your loaded DataFrame with columns:
# [pid, date, X_raw, y_raw, device_type]
# And that X_raw is a DataFrame for a 4-week window (each row = one day).
example = df_datapoints.iloc[0]
print("Example subject PID:", example["pid"])
print("Label date:", example["date"])

# Extract the sensor data (the 4-week window)
df_sensor = example["X_raw"]

# --- STEP 2: Encode sensor features ---
# Make a copy to encode into low/med/high
df_encoded = df_sensor.copy()

# Identify sensor columns (we assume metadata "pid" and "date" are in the DataFrame)
sensor_cols = [col for col in df_sensor.columns if col not in ["pid", "date"]]

# Apply encoding column‐wise only if the column is numeric.
# (If already encoded to strings like "l", "m", "h", this step can be skipped.)
for col in sensor_cols:
    if pd.api.types.is_numeric_dtype(df_sensor[col]):
        df_encoded[col] = encode_low_med_high(df_sensor[col])
    else:
        # If not numeric, assume it's already encoded
        df_encoded[col] = df_sensor[col]

print("\nEncoded sensor data (first few rows):")
print(df_encoded.head())

# --- STEP 3: Create a mapping dictionary to assign unique numeric codes ---
# We’ll map each sensor column’s levels: "l" -> unique code, "m" -> code+1, "h" -> code+2.
mapping_dict = {}
counter = 1
for col in sensor_cols:
    mapping_dict[col] = {"l": counter, "m": counter + 1, "h": counter + 2}
    counter += 3

# Create a numeric version of the sensor data.
df_numeric = df_encoded.copy()

def map_val(val, col):
    if pd.isna(val):
        return np.nan
    try:
        return mapping_dict[col][str(val).lower()]
    except KeyError:
        # If the value isn’t one of l, m, h (for example, in metadata), return NaN.
        return np.nan

for col in sensor_cols:
    df_numeric[col] = df_numeric[col].apply(lambda v: map_val(v, col))

print("\nExample numeric sensor data (first few rows):")
print(df_numeric.head())

# --- STEP 4: Create transactions for ARM ---
# Each row in df_numeric represents one day.
# We convert each day into a list of item codes (ignoring NaNs).
transactions = (
    df_numeric[sensor_cols]
    .dropna(how="all")  # drop days with all missing sensor data
    .apply(lambda row: [int(row[col]) for col in sensor_cols if pd.notna(row[col])], axis=1)
    .tolist()
)

print("\nSample transactions (first 5 days):")
for t in transactions[:5]:
    print(t)

# --- STEP 5: Run FP-growth using mlxtend ---
# Transform the transactions using TransactionEncoder.
te = TransactionEncoder()
te_ary = te.fit(transactions).transform(transactions)
df_trans = pd.DataFrame(te_ary, columns=te.columns_)

# Run FP-growth to extract frequent itemsets.
# Set min_support as the fraction of days in which the itemset appears.
min_support = 0.3  # adjust as needed
frequent_itemsets = fpgrowth(df_trans, min_support=min_support, use_colnames=True)
print("\nFrequent itemsets:")
print(frequent_itemsets)

# Extract association rules from the frequent itemsets.
# Set min_threshold for confidence.
min_confidence = 0.7  # adjust as needed
rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=min_confidence)
print("\nAssociation Rules:")
print(rules)


Example subject PID: INS-W_001#INS-W_1
Label date: 2018-04-04 00:00:00

Encoded sensor data (first few rows):
                 pid       date  \
0  INS-W_001#INS-W_1 2018-03-08   
1  INS-W_001#INS-W_1 2018-03-09   
2  INS-W_001#INS-W_1 2018-03-10   
3  INS-W_001#INS-W_1 2018-03-11   
4  INS-W_001#INS-W_1 2018-03-12   

  f_slp:fitbit_sleep_summary_rapids_sumdurationafterwakeupmain:14dhist  \
0                                                NaN                     
1                                                NaN                     
2                                                NaN                     
3                                                NaN                     
4                                                NaN                     

  f_slp:fitbit_sleep_summary_rapids_sumdurationasleepmain:14dhist  \
0                                                NaN                
1                                                NaN                
2                          

KeyboardInterrupt: 

# With Filtering

In [3]:
import pandas as pd
import numpy as np
from datetime import timedelta
import warnings
warnings.filterwarnings("ignore")

# ================================
# SETTINGS & FUNCTIONS
# ================================

# Xu's published base features (the ones used in his interpretable model)
xu_features = [
    "f_screen:phone_screen_rapids_avgdurationunlock",
    "f_screen:phone_screen_rapids_stddurationunlock",
    "f_screen:phone_screen_rapids_countepisodeunlock",
    "f_screen:phone_screen_rapids_sumdurationunlock",
    "f_slp:fitbit_sleep_intraday_rapids_maxdurationasleepunifiedmain",
    "f_slp:fitbit_sleep_intraday_rapids_countepisodeasleepunifiedmain",
    "f_slp:fitbit_sleep_intraday_rapids_countepisodeawakeunifiedmain",
    "f_slp:fitbit_sleep_intraday_rapids_ratiodurationasleepunifiedwithinmain",
    "f_slp:fitbit_sleep_intraday_rapids_sumdurationasleepunifiedmain",
    "f_slp:fitbit_sleep_intraday_rapids_sumdurationawakeunifiedmain",
    "f_steps:fitbit_steps_intraday_rapids_avgdurationactivebout",
    "f_steps:fitbit_steps_intraday_rapids_avgdurationsedentarybout",
    "f_steps:fitbit_steps_intraday_rapids_countepisodeactivebout",
    "f_steps:fitbit_steps_intraday_rapids_countepisodesedentarybout",
    "f_steps:fitbit_steps_intraday_rapids_sumsteps",
    "f_loc:phone_locations_barnett_circdnrtn",
    "f_loc:phone_locations_doryab_timeathome",
    "f_loc:phone_locations_doryab_locationentropy",
    "f_loc:phone_locations_doryab_loglocationvariance",
    "f_loc:phone_locations_doryab_avglengthstayatclusters",
    "f_loc:phone_locations_doryab_movingtostaticratio",
    "f_loc:phone_locations_doryab_numberlocationtransitions",
    "f_loc:phone_locations_doryab_numberofsignificantplaces",
    "f_loc:phone_locations_doryab_outlierstimepercent",
    "f_loc:phone_locations_doryab_radiusgyration",
    "f_loc:phone_locations_doryab_totaldistance",
    "f_loc:phone_locations_locmap_duration_in_locmap_study",
    "f_loc:phone_locations_locmap_percent_in_locmap_study",
    "f_loc:phone_locations_locmap_duration_in_locmap_exercise",
    "f_loc:phone_locations_locmap_percent_in_locmap_exercise",
    "f_loc:phone_locations_locmap_duration_in_locmap_greens",
    "f_loc:phone_locations_locmap_percent_in_locmap_greens"
]

# Function to check if a column belongs to one of the Xu features.
def is_xu_feature(col):
    # In our data, sensor column names may have an extra suffix (e.g., an epoch),
    # so we check if the column starts with one of the xu_features.
    for feat in xu_features:
        if col.startswith(feat):
            return True
    return False

# A simple function to discretize a numeric series into "l", "m", "h" based on its 33rd and 66th percentiles.
def encode_low_med_high(series):
    if series.dropna().empty:
        return series
    q33, q66 = series.quantile(0.33), series.quantile(0.66)
    def mapper(val):
        if pd.isna(val):
            return np.nan
        if val < q33:
            return "l"
        elif val < q66:
            return "m"
        else:
            return "h"
    return series.apply(mapper)

# A mapping function that (for each sensor) assigns a unique numeric code to each level.
# We simply offset each sensor’s codes by its index*3 so that, for example, sensor 1’s
# l, m, h are 0,1,2 and sensor 2’s are 3,4,5, etc.
def map_level_to_code(val, col_index):
    level_map = {"l": 0, "m": 1, "h": 2}
    return col_index * 3 + level_map.get(val, -1)  # if value not found, return -1

# ================================
# MAIN SCRIPT FOR ONE SUBJECT
# ================================

# (Assume df_datapoints is already loaded by your data loader.)
# For demonstration, select the first subject's row.
example = df_datapoints.iloc[0]
print("Subject PID:", example["pid"])
print("Label date:", example["date"])

# Get the raw sensor data (assumed to be a DataFrame with a "date" column and sensor columns)
df_sensor = example["X_raw"]

# Filter sensor columns to only those in Xu's base feature list.
# We also always keep the "date" column.
sensor_cols = [col for col in df_sensor.columns if col != "date" and is_xu_feature(col)]
df_filtered = df_sensor[["date"] + sensor_cols].copy()

# For each sensor column, if it is numeric, discretize it into "l", "m", "h".
for col in sensor_cols:
    if pd.api.types.is_numeric_dtype(df_filtered[col]):
        df_filtered[col] = encode_low_med_high(df_filtered[col])
    # (If the column already contains strings like "l", "m", "h", we assume it is already encoded.)

# Now, we need to create daily transactions.
# For simplicity, assume that df_filtered contains one row per day.
# (If there are multiple rows per day, you may need to groupby date and take the last non-NaN value per sensor.)
transactions = []
for idx, row in df_filtered.iterrows():
    # Build a transaction: for each sensor column, if there is a non-NaN value, map it to its unique code.
    transaction = []
    for col in sensor_cols:
        val = row[col]
        if pd.notna(val):
            code = map_level_to_code(val, sensor_cols.index(col))
            transaction.append(code)
    if len(transaction) > 1:  # Only consider transactions with more than one item.
        transactions.append(transaction)

print("Sample transactions (first 5 days):")
for t in transactions[:5]:
    print(t)

# -------------------------------
# Step 5. Run Association Rule Mining using Spark's FPGrowth
# -------------------------------
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth

# Initialize a SparkSession (local mode).
spark = SparkSession.builder.appName("ARMExample").master("local[*]").getOrCreate()

# Create a Spark DataFrame from the transactions.
# Each transaction (i.e., a day) is assigned an ID.
df_transactions = spark.createDataFrame(
    [(i, t) for i, t in enumerate(transactions) if len(t) > 1],
    ["id", "items"]
)
print("Number of transactions (days):", df_transactions.count())

# Set thresholds according to a stricter setting (adapted to a small number of days).
# For instance, require that an itemset appears in at least 30% of days (~9 out of 28 days) and has high confidence.
min_support = 0.3
min_confidence = 0.7

fpGrowth = FPGrowth(itemsCol="items", minSupport=min_support, minConfidence=min_confidence)
model = fpGrowth.fit(df_transactions)

# Retrieve the association rules as a Pandas DataFrame.
rules = model.associationRules.toPandas()
print("\nAssociation Rules:")
print(rules)

# Stop the Spark session to release resources.
spark.stop()


Subject PID: INS-W_001#INS-W_1
Label date: 2018-04-04 00:00:00
Sample transactions (first 5 days):
[90, 93, 96, 99, 102, 246, 249, 252, 255, 258, 402, 405, 408, 411, 414, 558, 561, 564, 567, 570, 714, 717, 720, 723, 726, 870, 873, 876, 879, 882, 1028, 1031, 1032, 1037, 1038, 1184, 1187, 1188, 1193, 1195, 1340, 1343, 1344, 1349, 1350, 1406, 1409, 1412, 1415, 1418, 1419, 1496, 1499, 1500, 1505, 1506, 1562, 1564, 1567, 1570, 1573, 1576, 1652, 1655, 1656, 1661, 1663, 1718, 1721, 1724, 1727, 1730, 1731, 1808, 1811, 1812, 1817, 1818, 1964, 1967, 1968, 1973, 1974, 2119, 2123, 2124, 2129, 2130, 2276, 2279, 2280, 2285, 2286, 2342, 2345, 2348, 2351, 2354, 2357, 2430, 2433, 2438, 2439, 2442, 2498, 2501, 2504, 2507, 2510, 2512, 2586, 2589, 2594, 2595, 2598, 2654, 2657, 2660, 2663, 2666, 2669, 2742, 2745, 2750, 2751, 2754, 2810, 2811, 2816, 2819, 2822, 2823, 2900, 2901, 2906, 2907, 2912, 2965, 2967, 2970, 2973, 2976, 2979, 3056, 3059, 3060, 3065, 3068, 3122, 3123, 3128, 3131, 3134, 3135, 3212, 3213

25/03/30 20:30:23 WARN Utils: Your hostname, willkewang-OMEN-by-HP resolves to a loopback address: 127.0.1.1; using 192.168.86.78 instead (on interface wlp3s0)
25/03/30 20:30:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/30 20:30:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Number of transactions (days): 2


25/03/30 21:31:21 ERROR DiskBlockObjectWriter: Exception occurred while manually close the output stream to file /tmp/blockmgr-439b5045-cfad-480b-ad67-c7c1bd3c4754/12/temp_shuffle_36b3bca5-2e75-493c-a068-0e7aa1415390, No space left on device
25/03/30 21:31:21 ERROR DiskBlockObjectWriter: Exception occurred while manually close the output stream to file /tmp/blockmgr-439b5045-cfad-480b-ad67-c7c1bd3c4754/0a/temp_shuffle_7516e873-6c1a-43db-95b5-8fced5b99205, No space left on device
25/03/30 21:31:21 ERROR DiskBlockObjectWriter: Exception occurred while manually close the output stream to file /tmp/blockmgr-439b5045-cfad-480b-ad67-c7c1bd3c4754/36/temp_shuffle_d542394a-769d-49b5-a3db-6d5360577091, No space left on device
25/03/30 21:31:21 ERROR DiskBlockObjectWriter: Exception occurred while manually close the output stream to file /tmp/blockmgr-439b5045-cfad-480b-ad67-c7c1bd3c4754/19/temp_shuffle_aebf0b9e-dab3-4e51-bece-389c0d48d4a2, No space left on device
25/03/30 21:31:21 ERROR DiskBloc

Py4JJavaError: An error occurred while calling o72.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 9.0 failed 1 times, most recent failure: Lost task 14.0 in stage 9.0 (TID 207) (willkewang-omen-by-hp.lan executor driver): java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at org.apache.spark.io.MutableCheckedOutputStream.write(MutableCheckedOutputStream.scala:43)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
	at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1580)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.SerializationStream.writeKey(Serializer.scala:132)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:309)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3997)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3994)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at org.apache.spark.io.MutableCheckedOutputStream.write(MutableCheckedOutputStream.scala:43)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
	at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1580)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.SerializationStream.writeKey(Serializer.scala:132)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:309)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
