In [1]:
# Installing pyspark on local machine
!pip install pyspark 



In [2]:
# Importing installed pyspark
import pyspark

In [3]:
# Importing PySpark Session 
from pyspark.sql import SparkSession

In [4]:
# Creating Spark and setting a name for the application
spark=SparkSession.builder.appName('DBDA Project').getOrCreate()

In [5]:
spark

In [6]:
#REMOVING STRUCTURAL ERRORS FROM DATASET

In [10]:
# Replacing Column number with header name 
music=spark.read.option('header','true').csv(r"C:\Users\pisal\OneDrive\Desktop\8_march_project\final_spotify.csv")

In [11]:
music

DataFrame[index_id: string, track_id: string, artists: string, album_name: string, track_name: string, popularity: string, duration_ms: string, explicit: string, danceability: string, energy: string, key: string, loudness: string, mode: string, speechiness: string, acousticness: string, instrumentalness: string, liveness: string, valence: string, tempo: string, time_signature: string, track_genre: string, spotify_release_date: string]

In [12]:
#displaying the edited file
music.show()

+--------+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+--------------------+
|index_id|            track_id|             artists|          album_name|          track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|track_genre|spotify_release_date|
+--------+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+--------------------+
|       0|5SuOikwiRyPMVoIQD...|         Gen Hoshino|              Comedy|              Comedy|        73|     230666|   FALSE|       0.676| 0.461|  1|  -6.746| 

In [13]:
## Check the schema of the dataset
music.printSchema()

root
 |-- index_id: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- explicit: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- instrumentalness: string (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- tempo: string (nullable = true)
 |-- time_signature: string (nullable = true)
 |-- track_genre: string (nullable = true)
 |-- spotify_release_date: string (nullable = true)



In [14]:
#TYPECASTING to use data in valid datatype

In [15]:
# Changing the datatype of the columns
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import BooleanType
from pyspark.sql.types import DateType

music = music.withColumn("popularity",music["popularity"].cast(IntegerType()))
music = music.withColumn("duration_ms",music["duration_ms"].cast(DoubleType()))
music = music.withColumn("explicit",music["explicit"].cast(BooleanType()))
music = music.withColumn("danceability",music["danceability"].cast(DoubleType()))
music = music.withColumn("energy",music["energy"].cast(DoubleType()))
music = music.withColumn("key",music["key"].cast(IntegerType()))
music = music.withColumn("loudness",music["loudness"].cast(DoubleType()))
music = music.withColumn("mode",music["mode"].cast(BooleanType()))
music = music.withColumn("speechiness",music["speechiness"].cast(DoubleType()))
music = music.withColumn("acousticness",music["acousticness"].cast(DoubleType()))
music = music.withColumn("instrumentalness",music["instrumentalness"].cast(DoubleType()))
music = music.withColumn("liveness",music["liveness"].cast(DoubleType()))
music = music.withColumn("valence",music["valence"].cast(DoubleType()))
music = music.withColumn("spotify_release_date",music["spotify_release_date"].cast(DateType()))
music = music.withColumn("tempo",music["tempo"].cast(DoubleType()))
music = music.withColumn("time_signature",music["time_signature"].cast(DoubleType()))

In [16]:
music.groupBy("track_genre").count().show()

+-----------+-----+
|track_genre|count|
+-----------+-----+
|      anime| 1000|
|          3|   23|
|      0.576|    1|
|alternative| 1000|
|death-metal| 1000|
|     74.077|    1|
|    105.188|    1|
|     68.958|    2|
|    ambient| 1000|
|   cantopop| 1000|
|      blues| 1000|
|  breakbeat| 1000|
|     68.453|    1|
|      dance| 1000|
|     brazil| 1000|
|    151.539|    1|
|     117.11|    1|
|     76.791|    1|
|      chill| 1000|
|  bluegrass| 1000|
+-----------+-----+
only showing top 20 rows



In [17]:
# to verify whether the data types are changed or not
music.printSchema()

root
 |-- index_id: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: boolean (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- track_genre: string (nullable = true)
 |-- spotify_release_date: date (nullable = true)



In [18]:
#REMOVING THE NULL VALUES

In [19]:
# Deleting null values
music.na.drop(how='any').count()

113865

In [20]:
# Rows with any null value are deleted

In [21]:
music.show(truncate=False)

+--------+----------------------+------------------------------------+------------------------------------------------------+--------------------------------+----------+-----------+--------+------------+------+---+--------+-----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+--------------------+
|index_id|track_id              |artists                             |album_name                                            |track_name                      |popularity|duration_ms|explicit|danceability|energy|key|loudness|mode |speechiness|acousticness|instrumentalness|liveness|valence|tempo  |time_signature|track_genre|spotify_release_date|
+--------+----------------------+------------------------------------+------------------------------------------------------+--------------------------------+----------+-----------+--------+------------+------+---+--------+-----+-----------+------------+----------------+--------+-------+-------+--------------

In [22]:
#FEATURE EXTRACTION 

In [23]:
from pyspark.sql.functions import col

X = music.drop("track_id", "index_id", "artists", "album_name", "track_name", "track_genre", "duration_ms", "explicit", "key","mode", "spotify_release_date")

X = X.select([col(c).cast("double") for c in X.columns])  # convert columns to double type

y = music.select("track_genre")

In [24]:
X.printSchema()

root
 |-- popularity: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)



In [25]:
from pyspark.ml.linalg import Vector 
from pyspark.ml.feature import VectorAssembler

In [26]:
X.columns,len(X.columns)

(['popularity',
  'danceability',
  'energy',
  'loudness',
  'speechiness',
  'acousticness',
  'instrumentalness',
  'liveness',
  'valence',
  'tempo',
  'time_signature'],
 11)

In [27]:
assembler = VectorAssembler(inputCols = ['popularity','danceability','energy','loudness','speechiness','acousticness','instrumentalness','liveness','valence','tempo','time_signature'], outputCol="features")

In [28]:
assembled_data=assembler.setHandleInvalid("skip").transform(X)

In [29]:
assembled_data.show(truncate=False)

+----------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+--------------+------------------------------------------------------------------------+
|popularity|danceability|energy|loudness|speechiness|acousticness|instrumentalness|liveness|valence|tempo  |time_signature|features                                                                |
+----------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+--------------+------------------------------------------------------------------------+
|73.0      |0.676       |0.461 |-6.746  |0.143      |0.0322      |1.01E-6         |0.358   |0.715  |87.917 |4.0           |[73.0,0.676,0.461,-6.746,0.143,0.0322,1.01E-6,0.358,0.715,87.917,4.0]   |
|55.0      |0.42        |0.166 |-17.235 |0.0763     |0.924       |5.56E-6         |0.101   |0.267  |77.489 |4.0           |[55.0,0.42,0.166,-17.235,0.0763,0.924,5.56E-6,0.101,0.267,77.489,4.0]   |
|57.0      |0.4

In [30]:
from pyspark.ml.feature import StandardScaler

In [31]:
scale=StandardScaler(inputCol='features',outputCol='standardized')

In [32]:
scaleModel = scale.fit(assembled_data)

In [33]:
#Model Selection and Fiting

In [34]:
music.select("track_genre").show(11)

+-----------+
|track_genre|
+-----------+
|   acoustic|
|   acoustic|
|   acoustic|
|   acoustic|
|   acoustic|
|   acoustic|
|   acoustic|
|   acoustic|
|   acoustic|
|   acoustic|
|   acoustic|
+-----------+
only showing top 11 rows



Logistic Regression

In [35]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

# Drop the irrelevant columns
data = music.drop('index_id','track_id', 'artists', 'album_name','mode',
                             'track_name','key','duration_ms','explicit', 'spotify_release_date')

# Convert the genre column to numerical labels
labelIndexer = StringIndexer(inputCol='track_genre', outputCol='label', handleInvalid="skip").fit(data)
data = labelIndexer.transform(data)
data = data.drop("track_genre")

# # Split the data into training and test sets
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=123)

# # Assemble the features into a vector
assembler = VectorAssembler(inputCols=data.columns, outputCol='features')
assembled_data=assembler.setHandleInvalid("skip").transform(trainingData)

trainingData = assembler.transform(trainingData)
testData = assembler.transform(testData)

# Train a Multiclass Logistic Regression model
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial", labelCol="label")
start_time = time.time()
lrModel = lr.fit(trainingData)
end_time = time.time()
fitting_time = end_time - start_time

# Make predictions on the test set
predictions = lrModel.transform(testData)

# Evaluate the performance of the model using accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Logistic Regression")
print("Accuracy: %.2f%%" % (accuracy * 100.0))
print("Time taken to fit the model: {:.2f} seconds".format(fitting_time))


Logistic Regression
Accuracy: 0.80%
Time taken to fit the model: 17.32 seconds


In [36]:
testData

DataFrame[popularity: int, danceability: double, energy: double, loudness: double, speechiness: double, acousticness: double, instrumentalness: double, liveness: double, valence: double, tempo: double, time_signature: double, label: double, features: vector]

In [37]:
# define udf to convert VectorType to array of lists
vector_to_list = udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))

# apply udf to VectorType column and create new column
df = testData.withColumn("features_list", vector_to_list(col("features")))


NameError: name 'udf' is not defined

In [38]:
testData.show(3)

+----------+------------+-------+--------+-----------+------------+----------------+--------+-------+-------+--------------+-----+--------------------+
|popularity|danceability| energy|loudness|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|label|            features|
+----------+------------+-------+--------+-----------+------------+----------------+--------+-------+-------+--------------+-----+--------------------+
|         0|      0.0688|  0.237| -18.403|     0.0408|        0.64|           0.977|   0.099| 0.0363| 76.487|           4.0|  3.0|[0.0,0.0688,0.237...|
|         0|      0.0713|0.00696| -39.507|     0.0405|       0.994|           0.915|   0.071| 0.0977|213.848|           3.0|  3.0|[0.0,0.0713,0.006...|
|         0|      0.0877| 0.0452| -22.712|     0.0431|       0.988|           0.936|   0.109| 0.0353| 85.425|           4.0|  3.0|[0.0,0.0877,0.045...|
+----------+------------+-------+--------+-----------+------------+----------------+----

In [39]:
df.select("features_list").first()

NameError: name 'df' is not defined

In [40]:
from pyspark.sql import Row
first_row = df.first().asDict()
selected_cols = {k:v for k,v in first_row.items() if k != "features"}


NameError: name 'df' is not defined

In [41]:
withoutFeatures = {k:v for k,v in selected_cols.items() if k != "features_list"}

NameError: name 'selected_cols' is not defined

In [42]:
dict_values_lst = []
for i in withoutFeatures.values():
  dict_values_lst.append(i)

NameError: name 'withoutFeatures' is not defined

In [43]:
a = [0, 0.0, 0.04, -29.714, 0.0, 0.928, 0.956, 0.115, 0.0, 0.0, 0.0, 109.0]

In [44]:
len(testData.schema), testData.schema

(13,
 StructType([StructField('popularity', IntegerType(), True), StructField('danceability', DoubleType(), True), StructField('energy', DoubleType(), True), StructField('loudness', DoubleType(), True), StructField('speechiness', DoubleType(), True), StructField('acousticness', DoubleType(), True), StructField('instrumentalness', DoubleType(), True), StructField('liveness', DoubleType(), True), StructField('valence', DoubleType(), True), StructField('tempo', DoubleType(), True), StructField('time_signature', DoubleType(), True), StructField('label', DoubleType(), False), StructField('features', VectorUDT(), True)]))

In [45]:
from pyspark.ml.linalg import Vectors
vector = Vectors.dense(a)
print(vector)

[0.0,0.0,0.04,-29.714,0.0,0.928,0.956,0.115,0.0,0.0,0.0,109.0]


In [46]:
dict_values_lst.append(vector)

In [47]:
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.ml.linalg import VectorUDT
# Get the schema of the existing Training DataFrame 
test_schema = StructType([StructField('popularity', IntegerType(), True), StructField('danceability', DoubleType(), True), StructField('energy', DoubleType(), True), StructField('loudness', DoubleType(), True), StructField('speechiness', DoubleType(), True), StructField('acousticness', DoubleType(), True), StructField('instrumentalness', DoubleType(), True), StructField('liveness', DoubleType(), True), StructField('valence', DoubleType(), True), StructField('tempo', DoubleType(), True), StructField('time_signature', DoubleType(), True), StructField('label', DoubleType(), False),StructField('features', VectorUDT(), True)])
# Create a DataFrame from the list of data and the schema
test_df = spark.createDataFrame([dict_values_lst], test_schema)
print(test_schema)

ValueError: Length of object (1) does not match with length of fields (13)

In [48]:
predictions = lrModel.transform(test_df)

NameError: name 'test_df' is not defined

In [49]:
# Evaluate the performance of the model using accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Logistic Regression")
print("Accuracy: %.2f%%" % (accuracy * 100.0))
print("Time taken to fit the model: {:.2f} seconds".format(fitting_time))

Logistic Regression
Accuracy: 0.80%
Time taken to fit the model: 17.32 seconds


In [50]:
# music_encoded.printSchema()

In [51]:
# df = music_encoded

In [None]:
# from pyspark.sql.functions import udf
# from pyspark.sql.types import IntegerType
# from pyspark.ml.linalg import VectorUDT

# # assuming that you have already loaded the DataFrame into a variable called "df"

# # define a UDF to convert a vector to an integer
# vector_to_int_udf = udf(lambda v: int(v[0]), IntegerType())

# # add a new column "int_col" to the DataFrame, which applies the UDF to the "vector_col" column
# df_with_int_col = df.withColumn("int_col", vector_to_int_udf(df["track_genre_encoded"]))

# # show the DataFrame with the new "int_col" column
# df_with_int_col.show()


In [None]:
# from pyspark.sql.functions import udf
# from pyspark.ml.linalg import Vectors
# from pyspark.sql.types import StringType

# # assuming that you have a DataFrame called "df" with a Vector column "vector_col"

# # define a UDF to convert the Vector column to a String column
# def vector_to_string(vector):
#     return str(vector)

# vector_to_string_udf = udf(vector_to_string, StringType())

# # add a new column called "string_col" with the converted values
# df = df.withColumn("string_col", vector_to_string_udf("track_genre_encoded"))

# # show the updated DataFrame
# df.show()


In [None]:
# df.printSchema()

In [None]:
# from pyspark.sql.functions import udf
# from pyspark.sql.types import IntegerType
# from pyspark.ml.linalg import VectorUDT

# # assuming that you have already loaded the DataFrame into a variable called "df"

# # define a UDF to convert a vector to an integer
# vector_to_int_udf = udf(lambda v: int(v[0]), IntegerType())

# # add a new column "int_col" to the DataFrame, which applies the UDF to the "vector_col" column
# df_with_int_col = df.withColumn("int_col", vector_to_int_udf(df["string_col"]))

# # show the DataFrame with the new "int_col" column
# df_with_int_col.show()


In [54]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

# Drop the irrelevant columns
data = music.drop('index_id','track_id', 'artists', 'album_name','mode','track_name','key','duration_ms','explicit',
                  'spotify_release_date')

# Convert the genre column to numerical labels
labelIndexer = StringIndexer(inputCol='track_genre', outputCol='label', handleInvalid="skip").fit(data)
data = labelIndexer.transform(data)
data = data.drop("track_genre")

# # Split the data into training and test sets
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=123)

# # Assemble the features into a vector
assembler = VectorAssembler(inputCols=data.columns, outputCol='features')
assembled_data=assembler.setHandleInvalid("skip").transform(data)

trainingData = assembler.transform(trainingData)
testData = assembler.transform(testData)

# Train the Random Forest model
rf = RandomForestClassifier(numTrees=40, maxBins=200, featureSubsetStrategy='sqrt', labelCol='label')
start_time = time.time()
rfModel = rf.fit(trainingData)
end_time = time.time()
fitting_time = end_time - start_time

# Make predictions on the test data
predictions = rfModel.transform(testData)

# # Evaluate the accuracy of the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print("Random Forest Classifier")
print("Accuracy: %.2f%%" % (accuracy * 100.0))
print("Time taken to fit the model: {:.2f} seconds".format(fitting_time))

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [53]:
testData.show()

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [None]:
trainingData.printSchema

In [None]:
testData.printSchema

In [55]:
y = rfModel.transform([50,0.369,0.598,-6.984,0.0304,0.005110,0.000000,0.0466,148.014,4])

NameError: name 'rfModel' is not defined

In [None]:
a = ([50,0.369,0.598,-6.984,0.0304,0.005110,0.000000,0.0466,148.014,4])

In [None]:
predictions = rfModel.transform([50,0.369,0.598,-6.984,0.0304,0.005110,0.000000,0.0466,148.014,4])

In [None]:
feature_cols = ["popularity", "danceability", "energy", "loudness", "speechiness", "acousticness", "instrumentalness", "liveness", "valence", "tempo", "time_signature"]

In [None]:
#b = ([(0.0,0.0688,0.237,-18.403,0.0408,0.64,0.977,0.099,0.0363,76.487,4.0)])

In [None]:
#a = (('50','0.369','0.598','-6.984','0.0304','0.005110','0.000000','0.0466','148.014','4'))

In [None]:
sample = spark.createDataFrame([(0.0, 0.0688, 0.237, -18.403, 0.0408, 0.64, 0.977, 0.099, 0.0363, 76.487, 4.0)], feature_cols)

In [None]:
sample = assembler.transform(sample)

In [None]:
predictions = rfModel.predict(sample)

In [None]:
predictions = rfModel.predict(sample)
predictions

In [None]:
predictions.show()

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Define the features to be used in the model
feature_cols = ["popularity", "danceability", "energy", "loudness", "speechiness", 
                "acousticness", "instrumentalness", "liveness", "valence", "tempo", 
                "time_signature"]

# Define the label column
label_col = "label"

# Create a VectorAssembler to convert the features into a vector format
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features4", handleInvalid = "skip")
data = data.drop("track_genre")
data = assembler.transform(data)

# Split the data into training and test sets
(training_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

# Train a random forest model using the training dataset
rf = RandomForestClassifier(numTrees=10, maxDepth=5, labelCol=label_col, featuresCol="features4")
model = rf.fit(training_data)

# Create a sample to test
sample = spark.createDataFrame([(0.0, 0.0688, 0.237, -18.403, 0.0408, 0.64, 0.977, 0.099, 0.0363, 76.487, 4.0)], feature_cols)

# Convert the sample into a vector format
sample = assembler.transform(sample)

# Use the model to make predictions on the sample
predictions = rf.transform(sample)

# View the predictions
predictions.show()


In [None]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# Create a sample track as a Row object
sample_track = Row(track_id='sample_id', danceability=0.8, energy=0.9, key=2, loudness=-6.0, mode=1, 
                   speechiness=0.1, acousticness=0.2,instrumentalness=0.0, liveness=0.7, valence=0.8, tempo=120.0)

# Create a DataFrame from the sample track
sample_df = spark.createDataFrame([sample_track])

# Assemble the features into a vector
assembler = VectorAssembler(inputCols=['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 
                                       'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo'],
                            outputCol='features')

sample_df = assembler.transform(sample_df)

# Scale the features
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures', withStd=True, withMean=False)
sample_df = scaler.fit(sample_df).transform(sample_df)

# Make a prediction using the trained model
predictions = rfModel.transform(sample_df)

# Print the predicted genre
predicted_genre = predictions.select('prediction').collect()[11][11]
print('Predicted genre:', predicted_genre)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Select the relevant columns and rename them
data = data.select(col('valence').alias('label'), col('acousticness'), col('danceability'), col('energy'), col('instrumentalness'), col('loudness'), col('speechiness'), col('tempo'))

# Split the data into training and testing sets
(train_data, test_data) = data.randomSplit([0.7, 0.3], seed=123)

# Vectorize the features using a VectorAssembler
assembler = VectorAssembler(inputCols=['acousticness', 'danceability', 'energy', 'instrumentalness', 'loudness', 'speechiness', 'tempo'], outputCol='features')
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

# Define the random forest model
rf = RandomForestClassifier(labelCol='label', featuresCol='features', numTrees=10, maxDepth=5)

# Train the model
model = rf.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the performance of the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)

# Stop the SparkSession
spark.stop()
