In [1]:
from pyspark.sql import SparkSession

# Obtain the notebook's Spark session (an existing one is reused if present),
# registered under the application name 'BigDataProject'.
spark = (
    SparkSession.builder
    .appName('BigDataProject')
    .getOrCreate()
)

In [2]:
# Read the raw music dataset. The first CSV row supplies the column names and
# Spark infers each column's type from the data instead of reading strings.
alldata = spark.read.csv(
    "/FileStore/tables/musicdata.csv",
    header=True,
    inferSchema=True,
)
# Preview the first rows to sanity-check the load.
alldata.show()

In [3]:
# Print the column names and the types Spark inferred for the raw dataset.
alldata.printSchema()

In [4]:
#from pyspark.sql.types import IntegerType
#alldata = alldata.withColumn("duration", alldata["duration"].cast(IntegerType()))
#alldata = alldata.withColumn("track_comments", alldata["track_comments"].cast(IntegerType()))

In [5]:
# List every column available in the raw dataset (input for the selection below).
alldata.columns

In [6]:
# Columns kept for modelling: track metadata, audio descriptors, rank columns
# and the artist/song scores. 'song_hotttnesss' (last entry) is used later in
# the notebook as the regression label.
selected_columns = [
    'duration',
    'track_comments',
    'track_instrumental',
    'track_listens',
    'track_favorites',
    'acousticness',
    'danceability',
    'energy',
    'instrumentalness',
    'liveness',
    'speechiness',
    'tempo',
    'valence',
    'artist_discovery_rank',
    'artist_familiarity_rank',
    'artist_hotttnesss_rank',
    'song_currency_rank',
    'song_hotttnesss_rank',
    'artist_discovery',
    'artist_familiarity',
    'artist_hotttnesss',
    'song_currency',
    'song_hotttnesss',
]

# Project the raw frame down to just those columns.
selecteddata = alldata.select(selected_columns)

In [7]:
# Discard every row that has a null in any of the selected columns, so the
# frames derived from finaldata below contain no missing values.
finaldata = selecteddata.na.drop()

In [8]:
# Preview the cleaned rows that remain after dropping nulls.
finaldata.show()

In [9]:
#from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer
#genre_indexer = StringIndexer(inputCol = 'genre', outputCol = 'GenreIndex')
#genre_encoder = OneHotEncoder(inputCol = 'GenreIndex', outputCol = 'GenreVector')
#trackexplicit_indexer = StringIndexer(inputCol = 'track_explicit', outputCol = 'TrackExplicitIndex')
#trackexplicit_encoder = OneHotEncoder(inputCol = 'TrackExplicitIndex', outputCol = 'TrackExplicitVector')

In [10]:
# Show the columns of the cleaned frame; these feed the feature vector below.
finaldata.columns

In [11]:
# VectorAssembler has to be imported in this cell: the only other import of it
# visible in this notebook is commented out, so on a fresh kernel the
# constructor call below failed with NameError.
from pyspark.ml.feature import VectorAssembler

# Combine all predictor columns into one vector column named 'features'.
# The label column 'song_hotttnesss' is intentionally not among the inputs.
# NOTE(review): 'song_hotttnesss_rank' is presumably derived from the label
# itself; confirm it is a legitimate predictor and not target leakage.
assembler = VectorAssembler(
    inputCols=[
        'duration',
        'track_comments',
        'track_instrumental',
        'track_listens',
        'track_favorites',
        'acousticness',
        'danceability',
        'energy',
        'instrumentalness',
        'liveness',
        'speechiness',
        'tempo',
        'valence',
        'artist_discovery_rank',
        'artist_familiarity_rank',
        'artist_hotttnesss_rank',
        'song_currency_rank',
        'song_hotttnesss_rank',
        'artist_discovery',
        'artist_familiarity',
        'artist_hotttnesss',
        'song_currency',
    ],
    outputCol='features',
)

In [12]:
# Apply the assembler: returns finaldata with the extra 'features' vector column.
all_data_new = assembler.transform(finaldata)

In [13]:
# Inspect the assembled feature vectors on their own.
all_data_new.select('features').show()

In [14]:
# Reduce the frame to the two columns the regressors consume:
# the label ('song_hotttnesss') and the assembled 'features' vector.
final_data = all_data_new.select('song_hotttnesss', 'features')

In [15]:
# Randomly split the modelling frame into roughly 70% training / 30% test rows.
# A fixed seed keeps the split, and therefore the metrics computed from it,
# repeatable across "Restart & Run All"; without it every run drew a new split.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

In [16]:
from pyspark.ml.regression import LinearRegression

# Baseline estimator: linear regression on the full 'features' vector,
# predicting 'song_hotttnesss' and writing its output to 'predicted_hottness'.
lr = LinearRegression(
    featuresCol='features',
    labelCol='song_hotttnesss',
    predictionCol='predicted_hottness',
)

In [17]:
# Fit the baseline linear regression on the training split.
lr_Model = lr.fit(train_data)

In [18]:
# Score the test split with the baseline model; the result carries the model's
# prediction column alongside the original columns.
# NOTE(review): predicted_data is not read again in the visible notebook.
predicted_data = lr_Model.transform(test_data)

In [19]:
# Evaluate the baseline model on the held-out test split; the returned summary
# exposes the metrics read in the following cells.
summary = lr_Model.evaluate(test_data)

In [20]:
# Root-mean-squared error of the baseline model on the test split.
summary.rootMeanSquaredError

In [21]:
# R^2 (coefficient of determination) of the baseline model on the test split.
summary.r2

In [22]:
# Fitted coefficients of the baseline model, one per entry of the 'features'
# vector, in the order the columns were given to the assembler.
lr_Model.coefficients

In [23]:
# Draw the train/test split used by the feature-selection experiment below.
# This rebinds train_data/test_data, replacing whatever they held before.
# A fixed seed keeps the split, and the metrics derived from it, repeatable;
# without it every run drew a different split.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

In [24]:
from pyspark.ml.feature import ChiSqSelector

# Chi-squared feature selection in 'fpr' mode: reads the 'features' vector,
# uses 'song_hotttnesss' as the label and writes the kept entries to
# 'selectedFeatures'.
# NOTE(review): the chi-squared test is meant for categorical labels, while
# 'song_hotttnesss' looks like a continuous score. Confirm this selector is
# appropriate here, or consider a selector designed for continuous labels.
selector = ChiSqSelector(
    featuresCol="features",
    outputCol="selectedFeatures",
    labelCol="song_hotttnesss",
    selectorType='fpr',
)

# Fit the selector on the training split only, then apply it to that split.
selector_model = selector.fit(train_data)
result = selector_model.transform(train_data)

# Report the fpr threshold the selector is configured with, then preview the output.
print(selector.getFpr())
result.show()

In [25]:
# Rebuild the training frame from the selector output: keep the label and the
# reduced vector, and rename that vector to 'features' so the regressor's
# featuresCol setting can stay the same.
train_data = (
    result
    .select('song_hotttnesss', 'selectedFeatures')
    .withColumnRenamed('selectedFeatures', 'features')
)
train_data.show()

In [26]:
# Apply the selector fitted on the training split to the test split, keep the
# label and the reduced vector, and rename that vector to 'features'.
# NOTE(review): this cell overwrites test_data, so it is not idempotent.
# Re-running it would feed the already-reduced 'features' column back into the
# selector; re-create the split first if this cell must be run again.
test_data = (
    selector_model.transform(test_data)
    .select('song_hotttnesss', 'selectedFeatures')
    .withColumnRenamed('selectedFeatures', 'features')
)
test_data.show()

In [27]:
from pyspark.ml.regression import LinearRegression

# Second estimator, trained on the selector-reduced 'features' vector; it
# predicts 'song_hotttnesss' and writes to 'predicted_song_hotness'.
lr_selected = LinearRegression(
    featuresCol='features',
    labelCol='song_hotttnesss',
    predictionCol='predicted_song_hotness',
)

In [28]:
# Fit the second linear regression on the training split with selected features.
lr_selected_Model = lr_selected.fit(train_data)

In [29]:
# Score the selected-feature test split with the second model.
predicted_data_2 = lr_selected_Model.transform(test_data)

In [30]:
# Preview the test rows together with the second model's predictions.
predicted_data_2.show()

In [31]:
# Evaluate the selected-feature model on the test split; the metrics are read below.
summary_2 = lr_selected_Model.evaluate(test_data)

In [32]:
# R^2 of the selected-feature model on the test split.
summary_2.r2

In [33]:
# Root-mean-squared error of the selected-feature model on the test split.
summary_2.rootMeanSquaredError

In [34]:
# Fitted coefficients of the selected-feature model, one per entry of the
# reduced 'features' vector produced by the selector.
lr_selected_Model.coefficients