In [1]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

from pyspark.sql.types import StructType, StructField, FloatType, StringType, TimestampType

In [2]:
# Populate os.environ from the project-level .env file. The path is resolved
# relative to the notebook's working directory; the call's return flag is the
# cell's displayed output.
load_dotenv(dotenv_path='../../.env')

True

In [None]:
# Storage and catalog settings, read from environment variables (populated from
# .env by load_dotenv). os.getenv yields None for any variable that is unset.
# NOTE(review): SOURCE_DIR and DEST_PATH both read PROCESSED_PATH, so input and
# output currently resolve to the same location — confirm this is intended.
BUCKET_NAME, SOURCE_DIR, DEST_PATH, DB_NAME, TABLE_NAME = (
    os.getenv(env_var)
    for env_var in ('BUCKET', 'PROCESSED_PATH', 'PROCESSED_PATH', 'DB_NAME', 'TABLE_NAME')
)

In [3]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("activity-classifier")
    .getOrCreate()
)

In [4]:
# Data schema: one timestamp column, float measurement columns and string
# columns, listed in column order. Every field is declared nullable.
schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in (
        ("stamp", TimestampType()),
        ("yaw", FloatType()),
        ("pitch", FloatType()),
        ("roll", FloatType()),
        ("rotation_rate_x", FloatType()),
        ("rotation_rate_y", FloatType()),
        ("rotation_rate_z", FloatType()),
        ("user_acceleration_x", FloatType()),
        ("user_acceleration_y", FloatType()),
        ("user_acceleration_z", FloatType()),
        ("location_type", StringType()),
        ("latitude_distance_from_mean", FloatType()),
        ("longitude_distance_from_mean", FloatType()),
        ("altitude_distance_from_mean", FloatType()),
        ("course", FloatType()),
        ("speed", FloatType()),
        ("horizontal_accuracy", FloatType()),
        ("vertical_accuracy", FloatType()),
        ("battery_state", StringType()),
        ("user_activity_label", StringType()),
    )
])

In [None]:
# Load the processed activity records from "<BUCKET>/<PROCESSED_PATH>".
# NOTE(review): whether BUCKET already carries a URI scheme (e.g. "s3a://") is
# not visible here — confirm against the .env file.
# NOTE(review): `schema` is not applied to this read; add `.schema(schema)` if
# it is meant to be enforced on load.

# Fail fast: os.getenv returns None for unset variables, which would otherwise
# silently produce a path such as "None/None".
if not BUCKET_NAME or not SOURCE_DIR:
    raise ValueError("BUCKET and PROCESSED_PATH must both be set (check the .env file)")

# No `mode` read option: DROPMALFORMED is a CSV/JSON parsing option and is not
# used by the Parquet data source.
data = spark.read.parquet(f"{BUCKET_NAME}/{SOURCE_DIR}")

## Data Processing

In [None]:
# String-typed categorical columns that get indexed in the next step.
string_cols = [
    'location_type',
    'battery_state',
]

In [None]:
# Index every categorical column into a companion "<column>_indexed" column.
# NOTE(review): handleInvalid is not set even though these columns are nullable;
# confirm how null or unseen labels should be treated.
string_indexer = StringIndexer(
    inputCols=string_cols,
    outputCols=[f"{col_name}_indexed" for col_name in string_cols],
)