In [1]:
#importing libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, Normalizer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.pipeline import Pipeline


spark = SparkSession.builder.appName("NN music class").getOrCreate()


# import csv files
df1 = spark.read.csv(
    path="/user1/songs.csv",
    sep='\t',
    header = True,
    quote = '"',
    inferSchema= True)

df2 = spark.read.csv(
    path="/user1/acoustic_features.csv",
    sep='\t',
    header = True,
    quote = '"',
    inferSchema= True)


# Create views for df
df1.createOrReplaceTempView("features")
df2.createOrReplaceTempView("songs")

#show spark UI
spark

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [2]:
df1.show(truncate=5)

+-------+---------+---------+-------+----------+--------+---------+
|song_id|song_name|billboard|artists|popularity|explicit|song_type|
+-------+---------+---------+-------+----------+--------+---------+
|  3e...|    th...|    ('...|  {'...|        86|    true|     Solo|
|  5p...|    Wi...|    ('...|  {'...|        87|    true|     Solo|
|  2x...|    SI...|    ('...|  {'...|        85|    true|     Solo|
|  3K...|    Su...|    ('...|  {'...|        92|   false|    Co...|
|  1r...|    Hi...|    ('...|  {'...|        86|   false|     Solo|
|  0b...|    Al...|    ('...|  {'...|        63|   false|     Solo|
|  5h...|    It...|    ("...|  {'...|        52|   false|     Solo|
|  2E...|    Ro...|    ("...|  {'...|        53|   false|     Solo|
|  0M...|    A ...|    ('...|  {'...|         3|   false|     Solo|
|  6Z...|    Ji...|    ('...|  {'...|        51|   false|     Solo|
|  1x...|    Mo...|    ('...|  {'...|        81|    true|     Solo|
|  4P...|    Th...|    ('...|  {'...|        48|

In [3]:
# SQL join df1+df2
joined_df = spark.sql("""
    SELECT *
    FROM features
    INNER JOIN songs ON features.song_id = songs.song_id
""")

#drop columns
for x in ['songs.song_id','song_name','billboard','artists','features.song_id']:
    joined_df = joined_df.drop(joined_df[x])

In [4]:
#SQL queries before feature engineering
joined_df.createOrReplaceTempView("df")
result = spark.sql("""
    SELECT DISTINCT time_signature
    FROM df
""")
result.show()

result = spark.sql("""
    SELECT DISTINCT key
    FROM df
""")
result.show()

result = spark.sql("""
    SELECT DISTINCT mode

    FROM df
""")
result.show()

result = spark.sql("""
    SELECT DISTINCT song_type

    FROM df
""")
result.show()

                                                                                

+--------------+
|time_signature|
+--------------+
|             1|
|             3|
|             5|
|             4|
|             0|
+--------------+

+---+
|key|
+---+
|  1|
|  6|
|  3|
|  5|
|  9|
|  4|
|  8|
|  7|
| 10|
| 11|
|  2|
|  0|
+---+

+----+
|mode|
+----+
|   1|
|   0|
+----+

+-------------+
|    song_type|
+-------------+
|Collaboration|
|         Solo|
+-------------+



In [7]:
#feature engineering based on EDA
from pyspark.sql.functions import col,when

joined_df = joined_df.withColumn('song_type',when(col('song_type')=='Solo',1).otherwise(0))
joined_df = joined_df.withColumn('popularity',when(col('popularity')>=50,1).otherwise(0))
joined_df.printSchema()

root
 |-- popularity: integer (nullable = false)
 |-- explicit: boolean (nullable = true)
 |-- song_type: integer (nullable = false)
 |-- duration_ms: integer (nullable = true)
 |-- mode: integer (nullable = true)
 |-- time_signature: integer (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)



In [8]:
#Normalize and create vector for NN
columns_to_normalize = [
 'explicit',
 'song_type',
 'duration_ms',
 'mode',
 'time_signature',
 'acousticness',
 'danceability',
 'energy',
 'instrumentalness',
 'liveness',
 'loudness',
 'speechiness',
 'valence',
 'tempo']

#assembler
assembler = VectorAssembler(inputCols=columns_to_normalize, outputCol="features")

#normalizer
normalizer = Normalizer(inputCol="features", outputCol="normalized_features", p=2.0)

#put all together in pipeline
pipeline = Pipeline(stages=[assembler, normalizer])

#final df
normalized_df = pipeline.fit(joined_df).transform(joined_df)
#show normalized vector
normalized_df.select("normalized_features").show(5)

2023-09-23 01:12:37,851 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+
| normalized_features|
+--------------------+
|[4.82346067092288...|
|[4.95881589199965...|
|[3.19672615899143...|
|[0.0,0.0,0.999999...|
|[0.0,5.2370548235...|
+--------------------+
only showing top 5 rows



In [9]:
#NN creation
layers = [14, 64, 32, 16, 2] 

#Neural network params
classifier = MultilayerPerceptronClassifier(
    layers=layers,
    labelCol="popularity",  
    featuresCol="normalized_features",
    maxIter=100,            
    blockSize=128,         
    seed=42
)

# final pipeline
pipeline = Pipeline(stages=[assembler, normalizer, classifier])

# Train test split

train_ratio = .8
test_ratio = .2
train_data, test_data = joined_df.randomSplit([train_ratio, test_ratio], seed=42)

# Fit the model 
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluation pf different metrics

metric_names = ["accuracy", "f1", "weightedPrecision", "weightedRecall"]

for metric_name in metric_names:

    evaluator = MulticlassClassificationEvaluator(
        labelCol="popularity",
        predictionCol="prediction",
        metricName=metric_name)


# print each metric
    metric_value = evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    print(f"{metric_name}: {metric_value:.4f}")




2023-09-23 01:12:55,220 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2023-09-23 01:12:55,221 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


accuracy: 0.7087
f1: 0.5878
weightedPrecision: 0.5022
weightedRecall: 0.7087


In [10]:
print(classifier.explainParams())

blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. (default: 128, current: 128)
featuresCol: features column name. (default: features, current: normalized_features)
initialWeights: The initial weights of the model. (undefined)
labelCol: label column name. (default: label, current: popularity)
layers: Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons. (current: [14, 64, 32, 16, 2])
maxIter: max number of iterations (>= 0). (default: 100, current: 100)
predictionCol: prediction column name. (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (defaul

In [11]:
sc.stop()