In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Build (or reuse) the Spark session.
# spark.ui.showConsoleProgress is a core Spark config, not a SQL runtime config.
# It has to be supplied before the SparkContext starts. Calling spark.conf.set()
# on it after creation is rejected by Spark 3.x ("Cannot modify the value of a
# Spark config").
spark = (
    SparkSession.builder
    .appName("TreeBased")
    .config("spark.sql.shuffle.partitions", "200")  # tune to your cores
    .config("spark.ui.showConsoleProgress", "true")
    .getOrCreate()
)


In [None]:
# Column layout shared by all four sensor CSVs (every field nullable):
# three integer id/time columns, three float axes, then four string tags.
schema = StructType(
    [StructField(name, LongType(), True) for name in ("Index", "Arrival_Time", "Creation_Time")]
    + [StructField(axis, DoubleType(), True) for axis in ("x", "y", "z")]
    + [StructField(name, StringType(), True) for name in ("User", "Model", "Device", "gt")]
)

# Root folder holding the four sensor CSVs. Set the SENSOR_DATA_DIR environment
# variable to point elsewhere instead of editing every path below.
import os

DATA_DIR = os.environ.get("SENSOR_DATA_DIR", "/content/drive/MyDrive/Bigdata (1)")

# One CSV per source; keys are "<sensor>_<device_kind>" as used when reading.
paths = {
    "accel_phone": f"{DATA_DIR}/Phones_accelerometer.csv",
    "gyro_phone":  f"{DATA_DIR}/Phones_gyroscope.csv",
    "accel_watch": f"{DATA_DIR}/Watch_accelerometer.csv",
    "gyro_watch":  f"{DATA_DIR}/Watch_gyroscope.csv",
}

def read_with(sensor, device_kind, path):
    """Load one sensor CSV and tag every row with where it came from.

    Args:
        sensor: value stored in the new ``sensor`` column ('accel' or 'gyro').
        device_kind: value stored in the new ``device_kind`` column ('phone' or 'watch').
        path: CSV location; parsed with a header row and the module-level ``schema``.

    Returns:
        DataFrame with the schema columns followed by ``sensor`` and ``device_kind``.
    """
    frame = spark.read.csv(path, header=True, schema=schema)
    tags = {"sensor": sensor, "device_kind": device_kind}
    for tag_name, tag_value in tags.items():
        frame = frame.withColumn(tag_name, lit(tag_value))
    return frame

# Stack the four (sensor, device) sources into one frame; columns are matched by name.
sources = [
    ("accel", "phone", "accel_phone"),
    ("gyro",  "phone", "gyro_phone"),
    ("accel", "watch", "accel_watch"),
    ("gyro",  "watch", "gyro_watch"),
]
raw = None
for sensor_name, kind_name, path_key in sources:
    part = read_with(sensor_name, kind_name, paths[path_key])
    raw = part if raw is None else raw.unionByName(part)

In [None]:
# Timestamp normalisation. Creation_Time is scaled to epoch seconds by magnitude:
# values > 1e14 are treated as nanoseconds, values > 1e11 as milliseconds, and
# anything smaller as already being seconds.
from pyspark.sql.functions import timestamp_seconds  # requires Spark >= 3.1

ts_sec = (
    when(col("Creation_Time") > 1e14, col("Creation_Time") / 1e9)
    .when(col("Creation_Time") > 1e11, col("Creation_Time") / 1e3)
    .otherwise(col("Creation_Time").cast("double"))
)

df = (raw
      .withColumn("ts_sec", ts_sec)
      # timestamp_seconds keeps the fractional part of ts_sec.
      # to_timestamp(from_unixtime(...)) formats to whole seconds, which would
      # collapse every sample within a second onto one instant and distort
      # sub-second windowing.
      .withColumn("ts", timestamp_seconds(col("ts_sec")))
      # Euclidean magnitude of the three axes.
      .withColumn("m", sqrt(col("x")**2 + col("y")**2 + col("z")**2))
      # Drop rows without a ground-truth label (SQL null or the literal string "null").
      .filter(col("gt").isNotNull() & (col("gt") != "null"))
     ).cache()

# Sanity checks: covered time span and label distribution.
df.select(min("ts").alias("min_ts"), max("ts").alias("max_ts")).show()
df.groupBy("gt").count().orderBy(desc("count")).show(10, False)

In [None]:
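# Optional: persist `df` as Parquet partitioned by User/Device/sensor and reload it
# (a precompiled copy is available at https://drive.google.com/drive/folders/1txVGk0HsBMto9J9rr0rbLoEm7DkvVgEu?usp=sharing).
# NOTE: the write path ("Bigdata") and the read path ("Bigdata-Compile") below differ; align them before uncommenting.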
# (df
#  .repartition("User","Device","sensor")
#  .write.mode("overwrite").partitionBy("User","Device","sensor")
#  .parquet("/content/drive/MyDrive/Bigdata"))
# df = spark.read.parquet("/content/drive/MyDrive/Bigdata-Compile").cache()

In [None]:
# Keep only samples whose timestamp falls in calendar years 2014-2016 (inclusive).
# Rows outside that range (or with a null ts) are dropped — presumably clock
# readings in an unexpected unit/epoch; TODO confirm.
in_study_period = F.year("ts").between(2014, 2016)
df_clean = df.filter(in_study_period).cache()

print("Before:", df.count(), "rows")
print("After:", df_clean.count(), "rows")

In [None]:
from pyspark.sql.functions import window, col, count, avg, stddev_samp, min, max, corr, lit
from pyspark.sql.window import Window

# Sliding windows 2.56 s long that advance every 1.28 s (50 % overlap), so one
# sample can belong to more than one window.
wdef = window(col("ts"), "2.56 seconds", "1.28 seconds")

# Attach the window key to every sample. window_id is the window's start
# timestamp (kept as a timestamp: the starts are sub-second apart, so casting
# to whole seconds would merge distinct windows). Without this step `window_id`
# does not exist and both groupBy calls below fail.
df_win = (df_clean
          .withColumn("win", wdef)
          .withColumn("window_id", col("win.start"))
          .drop("win"))

# Per-window, per-sensor summary statistics; windows with < 10 samples are discarded.
feat = (df_win.groupBy("User","Device","Model","sensor","window_id")
        .agg(
            count(lit(1)).alias("n"),
            avg("x").alias("x_mean"), stddev_samp("x").alias("x_std"),
            avg("y").alias("y_mean"), stddev_samp("y").alias("y_std"),
            avg("z").alias("z_mean"), stddev_samp("z").alias("z_std"),
            avg("m").alias("m_mean"), stddev_samp("m").alias("m_std"),
            min("m").alias("m_min"), max("m").alias("m_max"),
            avg(col("m")*col("m")).alias("m_energy"),
            corr("x","y").alias("xy_corr"),
            corr("y","z").alias("yz_corr"),
            corr("x","z").alias("xz_corr"),
        )
        .filter(col("n") >= 10)
)

# Majority ground-truth label per window, pooled over both sensors.
lab_counts = (df_win.groupBy("User","Device","Model","window_id","gt")
                .agg(F.count("*").alias("cnt")))

# Ties on cnt are broken by gt so the chosen label is deterministic across runs.
rk = Window.partitionBy("User","Device","Model","window_id").orderBy(F.desc("cnt"), F.asc("gt"))

labels = (lab_counts.withColumn("r", F.row_number().over(rk))
          .filter(col("r")==1)
          .select("User","Device","Model","window_id","gt")
          .withColumnRenamed("gt","label"))



In [None]:
# Place accelerometer and gyroscope features for the same window side by side.
JOIN_KEYS = ["User", "Device", "Model", "window_id"]


def sensor_block(features, sensor, suffix):
    """Return the rows of ``features`` for one sensor, with value columns renamed.

    The ``sensor`` column is dropped; join keys keep their names and every other
    column gets ``suffix`` appended (e.g. ``x_mean`` -> ``x_mean_acc``).
    """
    subset = features.filter(col("sensor") == sensor).drop("sensor")
    renamed = [name if name in JOIN_KEYS else name + suffix for name in subset.columns]
    return subset.toDF(*renamed)


feat_acc = sensor_block(feat, "accel", "_acc")
feat_gyro = sensor_block(feat, "gyro", "_gyro")

# Full outer join keeps windows seen by only one sensor; the missing sensor's
# statistics are filled with 0.0 (na.fill replaces both null and NaN).
fused = feat_acc.join(feat_gyro, JOIN_KEYS, "outer").na.fill(0.0)

# Attach the majority label; windows without a label are dropped by the inner join.
# The using-join already emits the key columns first, so no reordering is needed.
data = fused.join(labels, JOIN_KEYS, "inner")

print("Final joined windows:", data.count())
data.groupBy("label").count().show()

In [None]:
# Model inputs: every per-sensor statistic (columns suffixed _acc or _gyro).
feature_cols = [name for name in data.columns if name.endswith(("_acc", "_gyro"))]

# Identifiers and the label first, followed by the feature columns.
id_and_label_cols = ["window_id", "User", "Device", "Model", "label"]
data = data.select(*id_and_label_cols, *feature_cols)

In [None]:
# Group k-fold split by User: all windows of one user share a fold, so fold 0
# (the test set) contains only users never seen in training.
from pyspark.sql.functions import abs as spark_abs, hash as spark_hash
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

k = 5

# Assign folds round-robin over the sorted distinct users.
# abs(hash(User)) % k had two problems:
#   - it can be negative (abs overflows for the minimum int hash);
#   - with only a handful of users it can leave fold 0 empty.
# This assignment is deterministic and never negative, and it puts at least one
# user in every fold whenever there are >= k users.
data_nofold = data.drop("fold")  # no-op if absent; keeps the cell safe to re-run
user_folds = (data_nofold.select("User").distinct()
              .withColumn("fold", (row_number().over(Window.orderBy("User")) - 1) % k))
data = (data_nofold.join(user_folds, on="User", how="inner")
        .select(*data_nofold.columns, "fold"))

train = data.filter(col("fold") != 0)
test  = data.filter(col("fold") == 0)

