In [54]:
import os
import threading
from concurrent.futures import ThreadPoolExecutor

import numpy
from numpy import allclose
from IPython.display import display
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    collect_list,
    mean as _mean,
    monotonically_increasing_id,
    row_number,
    stddev as _stddev,
)
from pyspark.sql.window import Window
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator





In [12]:
# findspark locates the local Spark installation and puts its pyspark on sys.path.
# NOTE(review): on a fresh kernel this has to run before any `pyspark` import,
# so it probably belongs above the imports cell — confirm with Restart & Run All.
import findspark

findspark.init()
spark_home = findspark.find()
spark_home



'C:\\Program Files\\Spark\\spark-3.5.1-bin-hadoop3'

In [13]:
# Create a SparkSession, or reuse the one already running in this kernel.
spark = (
    SparkSession.builder
    .appName("My Spark App")
    .getOrCreate()
)

# Load data

In [18]:
# Read the cleaned songs table (header row, inferred column types) and drop
# every row that contains a null in any column.
features_genre_df = (
    spark.read.csv("../databases/OLD/cleaned_songs.csv", header=True, inferSchema=True)
    .na.drop()
)

features_genre_df.printSchema()
print(features_genre_df.count())


root
 |-- genre: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- lyrics: string (nullable = true)

18184


# Prepare data for classification: select the numeric feature columns and normalize them

In [19]:
# Every column except the string columns "genre" and "lyrics" is treated as a feature.
features = [
    name
    for name in features_genre_df.columns
    if name not in ("genre", "lyrics")
]

print(features_genre_df.count())


18184


In [63]:
def apply_z_map_reduce(one_feature_df):
    """Compute mean and population standard deviation with a thread-based map-reduce.

    The rows of ``one_feature_df`` are numbered 0..count-1 and split into one
    contiguous chunk per CPU core. Each chunk is "mapped" on a worker thread to
    the partial sums ``(sum, sum_of_squares, row_count)``. The partials are then
    "reduced" into the final statistics.

    Parameters
    ----------
    one_feature_df : pyspark.sql.DataFrame
        DataFrame whose *first* column holds the numeric values.

    Returns
    -------
    tuple[float, float]
        ``(mean, stddev)``, where stddev is the population standard deviation
        (it divides by N, not N - 1).

    Raises
    ------
    ValueError
        If the DataFrame has no rows (mean/stddev are undefined).
    RuntimeError
        If the map phase did not cover every row exactly once.
    """
    # monotonically_increasing_id() is unique but NOT consecutive: the partition
    # id lives in its upper bits. Range filters on it would therefore silently
    # skip rows whenever the data has more than one partition. row_number() over
    # that id gives a gap-free 0..count-1 index that keeps the original order.
    # The un-partitioned window moves the column through a single partition,
    # which is acceptable for a single numeric column. The result is cached so
    # the window is evaluated once, not once per chunk.
    indexed_df = one_feature_df.withColumn(
        "index",
        row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,
    ).cache()

    def z_score_map(start_index, size):
        """Map rows [start_index, start_index + size) to (sum, sum_of_squares, row_count)."""
        data = (
            indexed_df
            .filter((col("index") >= start_index) & (col("index") < start_index + size))
            .toPandas()
            .to_numpy()
        )
        values = data[:, 0].astype(float)
        return float(values.sum()), float((values * values).sum()), len(values)

    def z_score_reduce(partials, expected_count):
        """Reduce per-chunk (sum, sum_of_squares, row_count) tuples into (mean, stddev)."""
        sum_value = sum(partial[0] for partial in partials)
        sum_of_squares = sum(partial[1] for partial in partials)
        total = sum(partial[2] for partial in partials)
        if total != expected_count:
            raise RuntimeError(f"map phase covered {total} of {expected_count} rows")
        mean = sum_value / total
        # E[x^2] - E[x]^2 can come out slightly negative from float rounding
        # (e.g. for a constant column). A negative base would make ** 0.5
        # return a complex number, so clamp at zero.
        variance = max((sum_of_squares - sum_value ** 2 / total) / total, 0.0)
        return mean, variance ** 0.5

    try:
        # count() also materializes the cache before the worker threads start.
        count = indexed_df.count()
        if count == 0:
            raise ValueError("cannot compute mean/stddev of an empty DataFrame")

        # os.cpu_count() may return None. Never start more workers than rows.
        num_workers = max(1, min(os.cpu_count() or 1, count))
        chunk_size = count // num_workers
        starts = [i * chunk_size for i in range(num_workers)]
        # The last chunk also absorbs the remainder so every row is covered.
        sizes = [chunk_size] * (num_workers - 1) + [chunk_size + count % num_workers]

        # Unlike bare threading.Thread, executor.map re-raises any worker
        # exception here. The reduce step therefore never runs on chunks that
        # silently failed to load.
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            partials = list(executor.map(z_score_map, starts, sizes))
    finally:
        indexed_df.unpersist()

    return z_score_reduce(partials, count)


In [64]:
  
    
def apply_max_map_reduce(one_feature_df):
    """Return the largest absolute value in a single-column DataFrame via a thread-based map-reduce.

    The rows are numbered 0..count-1 and split into one contiguous chunk per CPU
    core. Each chunk is mapped on a worker thread to its own max |x|, and the
    per-chunk maxima are reduced to the overall maximum.

    Parameters
    ----------
    one_feature_df : pyspark.sql.DataFrame
        DataFrame whose *first* column holds the numeric values.

    Returns
    -------
    float
        ``max(|x|)`` over all rows.

    Raises
    ------
    ValueError
        If the DataFrame has no rows.
    RuntimeError
        If the map phase did not cover every row exactly once.
    """
    # monotonically_increasing_id() is unique but NOT consecutive: the partition
    # id lives in its upper bits. Range filters on it would therefore silently
    # skip rows whenever the data has more than one partition. row_number() over
    # that id gives a gap-free 0..count-1 index that keeps the original order.
    # The un-partitioned window moves the column through a single partition,
    # which is acceptable for a single numeric column. The result is cached so
    # the window is evaluated once, not once per chunk.
    indexed_df = one_feature_df.withColumn(
        "index",
        row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,
    ).cache()

    def abs_max_map(start_index, size):
        """Map rows [start_index, start_index + size) to (max |x| or None if empty, row_count)."""
        data = (
            indexed_df
            .filter((col("index") >= start_index) & (col("index") < start_index + size))
            .toPandas()
            .to_numpy()
        )
        values = data[:, 0].astype(float)
        chunk_max = float(abs(values).max()) if len(values) else None
        return chunk_max, len(values)

    def abs_max_reduce(partials, expected_count):
        """Reduce per-chunk (max |x|, row_count) pairs into the overall maximum."""
        covered = sum(row_count for _, row_count in partials)
        if covered != expected_count:
            raise RuntimeError(f"map phase covered {covered} of {expected_count} rows")
        return max(chunk_max for chunk_max, _ in partials if chunk_max is not None)

    try:
        # count() also materializes the cache before the worker threads start.
        count = indexed_df.count()
        if count == 0:
            raise ValueError("cannot compute the max of an empty DataFrame")

        # os.cpu_count() may return None. Never start more workers than rows.
        num_workers = max(1, min(os.cpu_count() or 1, count))
        chunk_size = count // num_workers
        starts = [i * chunk_size for i in range(num_workers)]
        # The last chunk also absorbs the remainder so every row is covered.
        sizes = [chunk_size] * (num_workers - 1) + [chunk_size + count % num_workers]

        # executor.map re-raises worker exceptions, so a failed chunk cannot be
        # silently left out of the maximum.
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            partials = list(executor.map(abs_max_map, starts, sizes))
    finally:
        indexed_df.unpersist()

    return abs_max_reduce(partials, count)
        
        

In [65]:

# Normalize every feature column in two passes:
#   1. z-score:        x -> (x - mean) / stddev
#   2. max-abs scale:  divide by the largest |z| so values lie in [-1, 1]
# NOTE: this rebinds features_genre_df. Re-running the cell therefore
# normalizes already-normalized data, so re-run the load cell first.
for feature in features:
    features_genre_df = features_genre_df.withColumn(feature, col(feature).cast("double"))

    # Map-reduce over the column to get its mean and standard deviation.
    mean, stddev = apply_z_map_reduce(features_genre_df.select(feature))

    # Spark turns x / 0 into null (or raises when spark.sql.ansi.enabled is on),
    # which would wipe out a constant column. Only centre it in that case.
    if stddev > 0:
        features_genre_df = features_genre_df.withColumn(feature, (col(feature) - mean) / stddev)
    else:
        features_genre_df = features_genre_df.withColumn(feature, col(feature) - mean)

    # Largest absolute z-score. Dividing by it maps the column into [-1, 1].
    max_value = apply_max_map_reduce(features_genre_df.select(feature))

    # An all-zero column is already in range. Dividing by 0 would null it out.
    if max_value > 0:
        features_genre_df = features_genre_df.withColumn(feature, col(feature) / max_value)

    print("Standard Deviation:", stddev)
    print("Mean:", mean)
    print("Max Value:", max_value)
    

0 1515
1515 1515
3030 1515
4545 1515
6060 1515
7575 1515
9090 1515
10605 1515
12120 1515
13635 1515
15150 1515
16665 1519
Standard Deviation: 0.31230487177106014
Mean: 7.95929908614499e-12
Max Value: 3.2019993615118003
0 1515
1515 1515
3030 1515
4545 1515
6060 1515
7575 1515
9090 1515
10605 1515
12120 1515
13635 1515
15150 1515
16665 1519
Standard Deviation: 0.001582451594984059
Mean: -0.9940974564614363
Max Value: 3.729999424483588
0 1515
1515 1515
3030 1515
4545 1515
6060 1515
7575 1515
9090 1515
10605 1515
12120 1515
13635 1515
15150 1515
16665 1519
Standard Deviation: 3.6230354921589285
Mean: 5.362145390070922
Max Value: 1.556113546811969
0 1515
1515 1515
3030 1515
4545 1515
6060 1515
7575 1515
9090 1515
10605 1515
12120 1515
13635 1515
15150 1515
16665 1519
Standard Deviation: 2.9428110994334817
Mean: -6.773701241134752
Max Value: 5.5216927658025305
0 1515
1515 1515
3030 1515
4545 1515
6060 1515
7575 1515
9090 1515
10605 1515
12120 1515
13635 1515
15150 1515
16665 1519
Standard De

In [67]:
# Export the normalized features to a single CSV file. toPandas() collects the
# whole DataFrame onto the driver before pandas writes it out.
(
    features_genre_df
    .toPandas()
    .to_csv("../databases/OLD/normalized_songs.csv", header=True, index=False)
)