In [1]:
# --- CONFIGURATION ---

# Java configuration
jarConfigPath = "/Users/d.veragillard/edu/semester/WIM-1/big-data-advanced-database/bd-project/postgresql-42.7.1.jar"

# Spark configuration
allocated_memory = "5g"  
allocated_cores = "6"  

# Database configuration
database_url = "jdbc:postgresql://localhost:5432/musicbrainz"
properties = {"user": "musicbrainz", "password": "musicbrainz", "driver": "org.postgresql.Driver"}

# --- END OF CONFIGURATION ---

#### Setup

- Initialize Spark session connecting to the Postgres DB

In [2]:
import findspark
findspark.init()
import pyspark # only run after findspark.init()

from pyspark.sql import SparkSession

# Initialize Spark session
# We attach more memory to the driver and executors(https://spark.apache.org/docs/latest/tuning.html#memory-management-overview)
# We use the G1 garbage collector for better performance(https://spark.apache.org/docs/latest/tuning.html#garbage-collection-tuning)
# We add more cores to the driver and executors(https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism)
spark = SparkSession \
    .builder \
    .appName("MusicBrainz PostgreSQL Connection") \
    .config("spark.jars", jarConfigPath) \
    .config("spark.executor.memory", allocated_memory) \
    .config("spark.driver.memory", allocated_memory) \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC") \
    .config("spark.executor.cores", allocated_cores) \
    .config("spark.driver.cores", allocated_cores) \
    .getOrCreate()

23/12/31 18:48:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Data Collection

- Get the relevant data from Postgres
- Already do cleaning in this stage by only selecting relevant columns

First get general Artist and Area(The country to predict) and additional Artist/Country information that could hint about the artist country:

In [3]:
from pyspark.sql.functions import broadcast

# Read data from artist and area tables with only necessary columns
artist_df = spark.read.jdbc(url=database_url, table="artist", properties=properties).select("id", "name", "area")
area_df = spark.read.jdbc(url=database_url, table="area", properties=properties).select("id", "name")

# Assuming area_df is smaller and can be broadcasted
# Broadcast join for artist and area tables
artist_country_df = artist_df.join(broadcast(area_df), artist_df.area == area_df.id)

# Select relevant columns
artist_country_df = artist_country_df.select(artist_df.name, area_df.name.alias("country"))

# Read more that could be useful for the analysis
language_df = spark.read.jdbc(url=database_url, table="language", properties=properties).select("id", "name")
alias_df = spark.read.jdbc(url=database_url, table="artist_alias", properties=properties).select("artist", "name")

# Join tables...
# Use explicit column names to avoid ambiguity
artist_language_df = artist_df.join(language_df, artist_df.id == language_df.id).select(artist_df.name, language_df.name.alias("language"))
artist_alias_df = artist_df.join(alias_df, artist_df.id == alias_df.artist).select(artist_df.name, alias_df.name.alias("alias"))

# Combining all data into one dataframe with left outer join
combined_artist_df = artist_country_df \
    .join(artist_alias_df, ["name"], "left_outer") \
    .join(artist_language_df, ["name"], "left_outer")

#### Data preprocessing

##### Data cleaning

Handle missing data. F.ex all the NULLs:

In [4]:
from pyspark.sql.functions import col, when

# Dropping rows where 'country', 'name' is null or empty
combined_artist_df = combined_artist_df.filter(combined_artist_df.country.isNotNull())
combined_artist_df = combined_artist_df.filter(combined_artist_df.name.isNotNull())

# Remove all rows in combined_artist_df that have null values
combined_artist_df = combined_artist_df.na.drop()
print("Number of rows: " + str(combined_artist_df.count()))

combined_artist_df.show()

                                                                                

Number of rows: 13892


                                                                                

+------+-------------+--------------------+--------+
|  name|      country|               alias|language|
+------+-------------+--------------------+--------+
|*NSYNC|United States|              N*SYNC|  Uzekwe|
|*NSYNC|United States|             ´N Sync|  Uzekwe|
|*NSYNC|United States|              N'Sync|  Uzekwe|
|*NSYNC|United States|             'N Sync|  Uzekwe|
|*NSYNC|United States|              'NSync|  Uzekwe|
|*NSYNC|United States|             N' Sync|  Uzekwe|
|*NSYNC|United States|              N-SYNC|  Uzekwe|
|*NSYNC|United States|               NSYNC|  Uzekwe|
|*NSYNC|United States|             N'Synch|  Uzekwe|
|*NSYNC|United States|              N Sync|  Uzekwe|
|*NSYNC|United States|              Nsynch|  Uzekwe|
|*NSYNC|United States|              ★NSYNC|  Uzekwe|
|*NSYNC|United States|              ★NSync|  Uzekwe|
|*NSYNC|United States|              *NSYNC|  Uzekwe|
|   666|      Germany|         Sei Sei Sei|   Nsari|
|   666|      Germany|Triangle Six of D...|   

##### Feature transformation

Transform feature strings into more suitable formats. To do this:

1. Use `StringIndexer` to convert the strings in the columns into indices(Like unique IDs)
2. Then use `OneHotEncoder` to convert the categorical indices into a binary vector(F.ex `[0,1,0,...]`)


In [5]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# String Indexing for all categorical columns
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(combined_artist_df) 
            for column in ["name", "country", "language", "alias"]]

# One-Hot Encoding for all indexed columns
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=indexer.getOutputCol()+"_vec") 
            for indexer in indexers]

23/12/31 18:48:32 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

##### Feature normalization

Scale transformed values to fixed range. To do this:

1. Use `VectorAssembler` to combine multiple columns into a single vector column. Helps with machine learning algorithms
2. Then apply `StandardScaler`. It helps, to make sure that the model is not influenced by features with larger scales

In [6]:
# Vector Assembling all the features
assemblerInputs = [encoder.getOutputCol() for encoder in encoders]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# Feature normalization
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

Finally combine all steps into one transformation / normalization pipeline and run it:

In [7]:
# Building a Pipeline for transformations
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])

# Transforming the data
model = pipeline.fit(combined_artist_df)
transformed_df = model.transform(combined_artist_df)

23/12/31 18:48:39 WARN DAGScheduler: Broadcasting large task binary with size 1559.5 KiB
23/12/31 18:48:40 WARN DAGScheduler: Broadcasting large task binary with size 1543.4 KiB
                                                                                

#### Data Splitting

- Generate test, train and validation datasets

In [8]:
# Splitting the data into training, validation, and testing sets
train_data, val_data, test_data = transformed_df.randomSplit([0.7, 0.15, 0.15], seed=42)

# Show the count of each dataset
print(f"Training Data Count: {train_data.count()}")
print(f"Validation Data Count: {val_data.count()}")
print(f"Testing Data Count: {test_data.count()}")

23/12/31 18:48:42 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


Training Data Count: 9823


23/12/31 18:48:44 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


Validation Data Count: 2008


23/12/31 18:48:47 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


Testing Data Count: 2064



#### Training

- Select model
- Train model

In [9]:
from pyspark.ml.classification import LogisticRegression

# Initialize the Logistic Regression model
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="country_index")

# Fit the model on the training data
lrModel = lr.fit(train_data)

# Print the coefficients and intercept
print("Coefficients: " + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

# You can also print a summary of the model over the training set
trainingSummary = lrModel.summary
print("Accuracy: ", trainingSummary.accuracy)
print("False Positive Rate: ", trainingSummary.weightedFalsePositiveRate)
print("True Positive Rate: ", trainingSummary.weightedTruePositiveRate)
print("F-Measure: ", trainingSummary.weightedFMeasure())
print("Precision: ", trainingSummary.weightedPrecision)
print("Recall: ", trainingSummary.weightedRecall)


23/12/31 18:48:51 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:48:52 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:48:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/12/31 18:48:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/12/31 18:48:52 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:48:53 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:48:54 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:48:54 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:48:54 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:48:54 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:48:55 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 

Coefficients: DenseMatrix([[-5.99243545e-01,  4.24459041e-01,  1.33329163e-01, ...,
              -1.49719556e-02, -2.32857031e-02,  0.00000000e+00],
             [ 3.66389123e-01,  3.05987535e-01,  1.51954467e-01, ...,
               1.89706244e-03, -9.75152186e-03,  0.00000000e+00],
             [ 3.35403633e-01, -1.15056554e-01,  1.27548917e-01, ...,
               4.58226118e-04, -1.32741295e-03,  0.00000000e+00],
             ...,
             [ 1.66738882e-04, -1.67219635e-04, -5.79973378e-04, ...,
               1.43369402e-05,  1.48928215e-05,  0.00000000e+00],
             [-1.66760880e-04, -7.70518054e-04, -1.37538846e-03, ...,
               1.14108275e-05,  1.24972812e-05,  0.00000000e+00],
             [ 8.39534774e-04, -8.63281495e-04, -2.91724274e-03, ...,
               7.27501149e-05,  7.57907907e-05,  0.00000000e+00]])
Intercept: [7.960963422339186,6.707949359498985,4.949541056970745,3.58626118846711,3.6417932808680957,3.634700081158457,3.4849159877524314,3.3063714835

23/12/31 18:49:23 WARN DAGScheduler: Broadcasting large task binary with size 34.6 MiB


Accuracy:  1.0
False Positive Rate:  0.0
True Positive Rate:  1.0000000000000058
F-Measure:  1.0000000000000058
Precision:  1.0000000000000058
Recall:  1.0000000000000058


                                                                                

#### Evaluation

- Validate model performance
- Adjust parameters respectively (Hyperparameter Tuning, ...)

In [10]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create a ParamGrid for tuning parameters
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.maxIter, [10, 50, 100]) \
    .build()

# Create a CrossValidator
cv = CrossValidator(estimator=lr, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=MulticlassClassificationEvaluator(labelCol="country_index", predictionCol="prediction"), 
                    numFolds=3)

# Run cross-validation, and choose the best set of parameters.
cvModel = cv.fit(train_data)

# Use the best model to make predictions on the validation data
val_predictions = cvModel.transform(val_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="country_index", predictionCol="prediction")
accuracy = evaluator.evaluate(val_predictions, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(val_predictions, {evaluator.metricName: "f1"})

print(f"Validation Accuracy: {accuracy}")
print(f"Validation F1 Score: {f1}")

23/12/31 18:49:25 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:49:26 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:49:26 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:49:27 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:49:27 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:49:27 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:49:27 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:49:28 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:49:28 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:49:28 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:49:28 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/12/31 18:49:29 WARN DAGScheduler: Broadcasting larg



23/12/31 18:51:53 ERROR Instrumentation: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:75)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:53)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.ex

ConnectionRefusedError: [Errno 61] Connection refused

#### Testing

- On unseen Data

In [None]:
# Use the best model to make predictions on the test data
test_predictions = cvModel.transform(test_data)

# Evaluate the model on test data
test_accuracy = evaluator.evaluate(test_predictions, {evaluator.metricName: "accuracy"})
test_f1 = evaluator.evaluate(test_predictions, {evaluator.metricName: "f1"})

print(f"Test Accuracy: {test_accuracy}")
print(f"Test F1 Score: {test_f1}")

#### Deployment

- Deploy into Cloud?

#### Monitoring / Maintainance

- ?