In [1]:
from itertools import chain
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.sql.functions import when, col, create_map, lit
from pyspark.sql import SparkSession
# Obtain (or reuse) the Spark session for this notebook, registered under the app name "BD".
spark = (
    SparkSession.builder
    .appName("BD")
    .getOrCreate()
)

In [2]:
def get_numerical(data):
    """
    List the columns of ``data`` whose dtype string is exactly 'int' or 'double'.

    ``data`` only needs a ``dtypes`` attribute yielding (name, dtype) pairs;
    the original column order is preserved. Other numeric dtype strings
    (e.g. 'bigint', 'float') are not selected.
    """
    numeric_kinds = ('int', 'double')
    selected = []
    for name, kind in data.dtypes:
        if kind in numeric_kinds:
            selected.append(name)
    return selected


def clean(data):
    """
    Prepare the raw DataFrame for modelling.

    Steps, in order:
      1. Replace missing "tempo" values with 0.
      2. Drop every row that still contains a null in any column.
      3. Replace the string columns "key" and "mode" with integer codes
         looked up from the tables below.

    The lookup runs after the null-dropping step, so rows whose "key" or
    "mode" value is not present in the tables are not filtered out here.

    Returns the transformed DataFrame.
    """
    key_codes = {'A': 0, 'A#': 1, 'B': 2, 'C': 3, 'C#': 4, 'D': 5, 'D#': 6, 'E': 7, 'F': 8, 'F#': 9, 'G': 10, 'G#': 11}
    mode_codes = {'Minor': 0, 'Major': 1}

    cleaned = data.fillna(0, subset=["tempo"]).dropna()

    for column_name, codes in (("key", key_codes), ("mode", mode_codes)):
        # create_map expects a flat [k0, v0, k1, v1, ...] sequence of literal columns.
        flat_pairs = [lit(item) for pair in codes.items() for item in pair]
        lookup = create_map(flat_pairs)
        cleaned = cleaned.withColumn(column_name, lookup[col(column_name)])

    return cleaned


def norm_data(data, col, mean=None, std=None, maxv=None, minv=None):
    """
    Clip one column to mean +/- 3 * std, then min-max scale it.

    Any statistic that is not supplied is computed from ``data``. Passing the
    statistics returned for one split applies the identical transformation to
    another split.

    Args:
        data (DataFrame): The input DataFrame.
        col (str): The name of the column to normalize. Note that inside this
            function the name shadows ``pyspark.sql.functions.col``.
        mean (float, optional): Mean used to centre the clipping window. Computed from ``data`` when omitted.
        std (float, optional): Standard deviation used for the clipping window. Computed from ``data`` when omitted.
        maxv (float, optional): Maximum used for scaling. Computed from the clipped column when omitted.
        minv (float, optional): Minimum used for scaling. Computed from the clipped column when omitted.

    Returns:
        Tuple[DataFrame, float, float, float, float]: The normalized DataFrame
        followed by mean, standard deviation, maximum and minimum, in that order.
    """
    if mean is None:
        mean = data.agg({col: "mean"}).collect()[0][0]
    if std is None:
        std = data.agg({col: "stddev"}).collect()[0][0]

    # Clip outliers to the three-sigma window in a single pass.
    lower = mean - 3 * std
    upper = mean + 3 * std
    values = data[col]
    data = data.withColumn(
        col,
        when(values < lower, lower).when(values > upper, upper).otherwise(values),
    )

    # Min/max are taken AFTER clipping so the scaled range reflects the clipped data.
    if maxv is None:
        maxv = data.agg({col: "max"}).collect()[0][0]
    if minv is None:
        minv = data.agg({col: "min"}).collect()[0][0]

    span = maxv - minv
    if span == 0:
        # A constant column would otherwise be divided by zero. Map every
        # non-null value to 0.0 instead; nulls stay null because `when`
        # without `otherwise` yields null for unmatched rows.
        scaled = when(data[col].isNotNull(), 0.0)
    else:
        scaled = (data[col] - minv) / span
    data = data.withColumn(col, scaled)

    return data, mean, std, maxv, minv

In [3]:
# Load the raw CSV, clean it, and split off a 10% hold-out set.
label = ["music_genre"]
seed = 1235
data = clean(
    spark.read.csv(
        "../data/training.csv",
        header=True,
        inferSchema=True,
        nullValue="",
        sep=";",
    )
)
dev, test = data.randomSplit([0.9, 0.1], seed=seed)

# Fit the normalization statistics on dev and remember them per column
# as (mean, std, maxv, minv).
col_params = {}
for col_name in get_numerical(dev):
    dev, *fitted = norm_data(dev, col_name)
    col_params[col_name] = tuple(fitted)

# Apply the dev statistics to test so both splits share the same scaling.
for col_name in get_numerical(test):
    test = norm_data(test, col_name, *col_params[col_name])[0]

In [4]:
# Build the pipeline (label indexing -> feature assembly -> MLP) and fit it
# with 5-fold cross-validation scored by F1.
label_col = "music_genre_index"
feature_cols = get_numerical(dev)

indexer = StringIndexer(inputCols=label, outputCols=[label_col])
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Seed the classifier so weight initialisation is repeatable between runs.
estimator = MultilayerPerceptronClassifier(featuresCol="features", labelCol=label_col, seed=seed)
pipeline = Pipeline(stages=[indexer, assembler, estimator])

# The input layer must be as wide as the assembled feature vector, so derive it
# from the feature list instead of hard-coding it.
# NOTE(review): the output width of 10 assumes ten distinct genres -- confirm against the data.
layers = [len(feature_cols), 20, 20, 10]

params = (ParamGridBuilder()
    .addGrid(estimator.layers, [layers])
    .addGrid(estimator.maxIter, [1000])
    .addGrid(estimator.blockSize, [512])
    .build())
evaluator = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol="prediction", metricName="f1")
# Seed the cross-validator as well so the fold assignment is repeatable.
cross = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=evaluator, numFolds=5, seed=seed)

model = cross.fit(dev)

In [6]:
# Evaluate the fitted cross-validation model on the held-out test split.
# `test` was normalized with the statistics fitted on `dev`, and `evaluator`
# is configured with metricName="f1", so the printed value is the F1 score
# of the predictions on that split.
predictions = model.transform(test)
f1 = evaluator.evaluate(predictions)
print(f"f1: {f1}")

test count:  3029
test count:  3029
f1: 0.5869630239773582
