The purpose of this project is to analyze K-pop album sales data and build a predictive model that explains and forecasts sales performance. By cleaning the dataset, exploring key factors such as chart position, release timing, and cumulative sales, and then applying a linear regression model, the project aims to identify which variables most strongly influence album success and to evaluate how accurately future sales can be predicted.

In [0]:
path = "/FileStore/tables/Kpop_4th_gen_Sales___Sheet1-1.csv"
df = spark.read.csv(path, header=True, inferSchema=True)
df.printSchema()
display(df)

root
 |-- Artist: string (nullable = true)
 |-- title: string (nullable = true)
 |-- date: date (nullable = true)
 |-- country: string (nullable = true)
 |-- sales: string (nullable = true)
 |-- peak_chart: integer (nullable = true)



Artist,title,date,country,sales,peak_chart
IZONE,Bloom*iz,2020-02-17,KOR,492504.0,2.0
IZONE,Bloom*iz,2020-02-17,JPN,23960.0,3.0
IZONE,TWELVE,2020-10-21,JPN,135113.0,1.0
IZONE,Color*Iz,2018-10-29,KOR,282816.0,2.0
IZONE,Color*Iz,2018-10-29,JPN,48734.0,1.0
IZONE,Heart*Iz,2020-04-01,KOR,306812.0,1.0
IZONE,Heart*Iz,2020-04-01,JPN,40065.0,4.0
IZONE,Oneiric Diary,2020-06-15,KOR,559138.0,2.0
IZONE,Oneiric Diary,2020-06-15,JPN,19928.0,8.0
IZONE,One-reeler/Act IV,2020-12-07,KOR,447749.0,1.0


In [0]:
# Drop every row that contains a null value in any column
df = df.dropna()

In [0]:
from pyspark.sql.functions import sum, col
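# Cast the sales column from string to integer so we can perform calculations on it.
# By default, strings that cannot be parsed as a number become null here, after dropna() has already run.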
df = df.withColumn('sales', col('sales').cast('integer'))
df.printSchema()

root
 |-- Artist: string (nullable = true)
 |-- title: string (nullable = true)
 |-- date: date (nullable = true)
 |-- country: string (nullable = true)
 |-- sales: integer (nullable = true)
 |-- peak_chart: integer (nullable = true)



In [0]:
	
from pyspark.sql.functions import sum, avg, max, count
# Count the number of releases per artist
display(df.groupBy("Artist").count())

# The pie chart shows that Stray Kids has the most releases. Ateez is second with 15.2% of the releases, and Cravity has the fewest with only 2.17%.

Artist,count
IZONE,10
Ateez,14
Loona,9
TXT,9
Verivery,6
(G)I-dle,9
Treasure,5
Everglow,5
ITZY,6
Cravity,2


Databricks visualization. Run in Databricks to view.

In [0]:
display(df.groupBy("Country").count())
# Count the number of releases per country
# The bar plot shows that most releases are from Korea, where K-pop originated. China has the fewest, with only 1 release.

Country,count
KOR,56
CHN,1
JPN,35


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import sum, col
display(df.groupBy('artist').agg(sum('sales').alias('total_sales')) \
.orderBy(col('total_sales').desc()))
# groupBy with the sum aggregate gives each artist's total sales, sorted from highest to lowest

artist,total_sales
IZONE,2308085
Stray Kids,1916965
TXT,1692632
Ateez,1588505
Treasure,1112672
(G)I-dle,787686
ITZY,560073
Loona,357581
Verivery,212097
Cravity,161664


In [0]:
from pyspark.sql.functions import when
# Use when() to rate each release based on its sales
df=df.withColumn("sales_success", \
                when(df["sales"] < 50000, "low").\
                when(df["sales"] < 100000, "moderate").\
                when(df["sales"] < 500000, "high").\
                otherwise("very high"))

display(df)

Artist,title,date,country,sales,peak_chart,sales_success
IZONE,Bloom*iz,2020-02-17,KOR,492504.0,2,high
IZONE,Bloom*iz,2020-02-17,JPN,23960.0,3,low
IZONE,TWELVE,2020-10-21,JPN,135113.0,1,high
IZONE,Color*Iz,2018-10-29,KOR,282816.0,2,high
IZONE,Color*Iz,2018-10-29,JPN,,1,very high
IZONE,Heart*Iz,2020-04-01,KOR,306812.0,1,high
IZONE,Heart*Iz,2020-04-01,JPN,40065.0,4,low
IZONE,Oneiric Diary,2020-06-15,KOR,559138.0,2,very high
IZONE,Oneiric Diary,2020-06-15,JPN,19928.0,8,low
IZONE,One-reeler/Act IV,2020-12-07,KOR,447749.0,1,high
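
One row above has an empty sales value but is still labelled very high. That is consistent with how the chained conditions behave: a comparison against null is never true, so the row falls through every branch to otherwise(). A minimal plain-Python sketch of that logic, not the Spark API itself; both function names are hypothetical, and the null guard in the second one is a suggested variation rather than something the notebook does:

```python
def bucket_like_when_chain(sales):
    """Mimic the when()/otherwise() chain: a null never satisfies a comparison."""
    if sales is not None and sales < 50000:
        return "low"
    if sales is not None and sales < 100000:
        return "moderate"
    if sales is not None and sales < 500000:
        return "high"
    return "very high"  # also reached when sales is None


def bucket_with_null_guard(sales):
    """Same thresholds, but a missing value stays unlabelled."""
    if sales is None:
        return None
    return bucket_like_when_chain(sales)


print(bucket_like_when_chain(23960))
print(bucket_like_when_chain(None))
print(bucket_with_null_guard(None))
```

In Spark, the equivalent guard would be an extra leading branch that checks the column with isNull() before the sales thresholds are tested.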


In [0]:
from pyspark.sql.functions import when
# Use when() to create an is_chart_topper column that flags the top-rated releases. isin() checks peak_chart against the values 1, 2 and 3: rows that match are marked True and all others False.
display(df.withColumn('is_chart_topper', when(col('peak_chart').isin([1, 2, 3]), True).otherwise(False)))

Artist,title,date,country,sales,peak_chart,sales_success,is_chart_topper
IZONE,Bloom*iz,2020-02-17,KOR,492504.0,2,high,True
IZONE,Bloom*iz,2020-02-17,JPN,23960.0,3,low,True
IZONE,TWELVE,2020-10-21,JPN,135113.0,1,high,True
IZONE,Color*Iz,2018-10-29,KOR,282816.0,2,high,True
IZONE,Color*Iz,2018-10-29,JPN,,1,very high,True
IZONE,Heart*Iz,2020-04-01,KOR,306812.0,1,high,True
IZONE,Heart*Iz,2020-04-01,JPN,40065.0,4,low,False
IZONE,Oneiric Diary,2020-06-15,KOR,559138.0,2,very high,True
IZONE,Oneiric Diary,2020-06-15,JPN,19928.0,8,low,False
IZONE,One-reeler/Act IV,2020-12-07,KOR,447749.0,1,high,True


In [0]:
# Calculate each artist's average sales by grouping by artist and applying avg() to sales
display(df.groupBy("Artist").agg(avg("sales").alias("avg_sales")).orderBy(col("avg_sales").desc()))

# The bar plot of average sales per artist shows that IZONE has the highest average, at more than 250k, closely followed by Treasure at around 220k. Everglow has the lowest average sales.

Artist,avg_sales
IZONE,256453.88888888888
Treasure,222534.4
TXT,188070.22222222225
Ateez,113464.64285714286
Stray Kids,112762.64705882352
ITZY,93345.5
(G)I-dle,87520.66666666667
Cravity,80832.0
Loona,39731.22222222222
Verivery,35349.5


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import year, to_date
# Extract the year from the date with year(), then filter for releases from 2019 through 2020
df = df.withColumn('release_year', year(df['date']))
df_2019_2020 = df.filter((df['release_year'] >= 2019) & (df['release_year'] <= 2020))
display(df_2019_2020)
# Group by artist to count the releases each one had in these years
display(df_2019_2020.groupBy("artist").count())
# The pie chart shows that Ateez and Stray Kids had the most releases in 2019 and 2020, and Cravity again had the fewest

Artist,title,date,country,sales,peak_chart,sales_success,release_year
IZONE,Bloom*iz,2020-02-17,KOR,492504,2,high,2020
IZONE,Bloom*iz,2020-02-17,JPN,23960,3,low,2020
IZONE,TWELVE,2020-10-21,JPN,135113,1,high,2020
IZONE,Heart*Iz,2020-04-01,KOR,306812,1,high,2020
IZONE,Heart*Iz,2020-04-01,JPN,40065,4,low,2020
IZONE,Oneiric Diary,2020-06-15,KOR,559138,2,very high,2020
IZONE,Oneiric Diary,2020-06-15,JPN,19928,8,low,2020
IZONE,One-reeler/Act IV,2020-12-07,KOR,447749,1,high,2020
ITZY,It'z Icy,2019-05-29,KOR,149349,3,high,2019
ITZY,It'z Icy,2019-05-29,JPN,15742,12,low,2019


artist,count
IZONE,8
Ateez,10
Loona,3
TXT,8
Verivery,5
(G)I-dle,7
Treasure,3
Everglow,5
ITZY,6
Cravity,2


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import to_date, current_date, datediff
# Use current_date() and datediff() to get the number of days between each release date and the day this cell is run
df = df.withColumn('days_since_release', datediff(current_date(), df['date']))
display(df)

Artist,title,date,country,sales,peak_chart,sales_success,release_year,days_since_release
IZONE,Bloom*iz,2020-02-17,KOR,492504.0,2,high,2020,2052
IZONE,Bloom*iz,2020-02-17,JPN,23960.0,3,low,2020,2052
IZONE,TWELVE,2020-10-21,JPN,135113.0,1,high,2020,1805
IZONE,Color*Iz,2018-10-29,KOR,282816.0,2,high,2018,2528
IZONE,Color*Iz,2018-10-29,JPN,,1,very high,2018,2528
IZONE,Heart*Iz,2020-04-01,KOR,306812.0,1,high,2020,2008
IZONE,Heart*Iz,2020-04-01,JPN,40065.0,4,low,2020,2008
IZONE,Oneiric Diary,2020-06-15,KOR,559138.0,2,very high,2020,1933
IZONE,Oneiric Diary,2020-06-15,JPN,19928.0,8,low,2020,1933
IZONE,One-reeler/Act IV,2020-12-07,KOR,447749.0,1,high,2020,1758
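
Because current_date() is evaluated when the cell runs, days_since_release changes from day to day. The values shown above are consistent with a run date of 2025-09-30; that date is inferred from the output, not stated in the notebook. A minimal sketch with Python's datetime module, using the inferred date as a fixed reference (`REFERENCE_DATE` and `days_since` are illustrative names):

```python
from datetime import date

# Inferred from the output above, not stated in the notebook
REFERENCE_DATE = date(2025, 9, 30)


def days_since(release_date, reference=REFERENCE_DATE):
    """Number of days from release_date to the reference date."""
    return (reference - release_date).days


print(days_since(date(2020, 2, 17)))   # Bloom*iz
print(days_since(date(2018, 10, 29)))  # Color*Iz
print(days_since(date(2020, 12, 7)))   # One-reeler/Act IV
```

Pinning the reference date in this way would keep the feature stable across reruns.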


In [0]:
from pyspark.sql.functions import cume_dist
from pyspark.sql.window import Window
# Create a window spec partitioned by artist and country and ordered by sales
windowSpec = Window.partitionBy('Artist', 'country').orderBy('sales')
# Use the window spec to calculate the cumulative distribution of sales within each artist and country
df = df.withColumn('cumulative_sales_dist', cume_dist().over(windowSpec))
display(df)

Artist,title,date,country,sales,peak_chart,sales_success,release_year,days_since_release,cumulative_sales_dist
(G)I-dle,Dumdi Dumdi,2020-08-03,CHN,182749.0,0,high,2020,1884,1.0
(G)I-dle,I Trust,2020-04-06,JPN,1380.0,48,low,2020,2003,0.3333333333333333
(G)I-dle,Oh My God,2020-08-26,JPN,4229.0,23,low,2020,1861,0.6666666666666666
(G)I-dle,Latata,2019-07-29,JPN,8229.0,5,low,2019,2255,1.0
(G)I-dle,I Made,2019-02-26,KOR,51041.0,2,moderate,2019,2408,0.2
(G)I-dle,I Am,2018-05-02,KOR,51831.0,6,moderate,2018,2708,0.4
(G)I-dle,Dumdi Dumdi,2020-08-03,KOR,125801.0,2,high,2020,1884,0.6
(G)I-dle,I Trust,2020-04-06,KOR,154499.0,1,high,2020,2003,0.8
(G)I-dle,I Burn,2021-01-11,KOR,207927.0,3,high,2021,1723,1.0
Ateez,Treasure Epilogue: Action to Answer,2020-01-06,JPN,1472.0,31,low,2020,2094,0.1428571428571428
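
For each row, cume_dist() returns the fraction of rows in the same partition whose ordering value is less than or equal to that row's value. A minimal plain-Python sketch of that definition (not the Spark implementation), checked against the (G)I-dle KOR partition shown above; `cume_dist_values` is a hypothetical helper name:

```python
def cume_dist_values(values):
    """For each value, the fraction of values in the list that are <= it."""
    n = len(values)
    return [sum(1 for other in values if other <= v) / n for v in values]


# Sales for the (G)I-dle / KOR partition, copied from the output above
gidle_kor_sales = [51041, 51831, 125801, 154499, 207927]

for sales, dist in zip(gidle_kor_sales, cume_dist_values(gidle_kor_sales)):
    print(sales, dist)
```

A value of 1.0 therefore marks the best-selling release within its artist and country, and a single-row partition such as (G)I-dle CHN is always 1.0.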


In [0]:
windowSpec = Window.partitionBy("artist").orderBy("date").rowsBetween(Window.unboundedPreceding, 0)

# Calculate the running total of sales for each artist over time
df = df.withColumn("running_total_sales", sum("sales").over(windowSpec))
display(df)

Artist,title,date,country,sales,peak_chart,sales_success,release_year,days_since_release,cumulative_sales_dist,running_total_sales
(G)I-dle,I Am,2018-05-02,KOR,51831.0,6,moderate,2018,2708,0.4,51831.0
(G)I-dle,I Made,2019-02-26,KOR,51041.0,2,moderate,2019,2408,0.2,102872.0
(G)I-dle,Latata,2019-07-29,JPN,8229.0,5,low,2019,2255,1.0,111101.0
(G)I-dle,I Trust,2020-04-06,JPN,1380.0,48,low,2020,2003,0.3333333333333333,112481.0
(G)I-dle,I Trust,2020-04-06,KOR,154499.0,1,high,2020,2003,0.8,266980.0
(G)I-dle,Dumdi Dumdi,2020-08-03,CHN,182749.0,0,high,2020,1884,1.0,449729.0
(G)I-dle,Dumdi Dumdi,2020-08-03,KOR,125801.0,2,high,2020,1884,0.6,575530.0
(G)I-dle,Oh My God,2020-08-26,JPN,4229.0,23,low,2020,1861,0.6666666666666666,579759.0
(G)I-dle,I Burn,2021-01-11,KOR,207927.0,3,high,2021,1723,1.0,787686.0
Ateez,Treasure EP.1: All to Zero,2018-10-24,KOR,85109.0,7,moderate,2018,2533,0.1428571428571428,85109.0
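
The running total is a cumulative sum over each artist's rows in date order. A minimal plain-Python sketch with itertools.accumulate, using the (G)I-dle sales in the order displayed above:

```python
from itertools import accumulate

# (G)I-dle sales in the displayed date order
gidle_sales = [51831, 51041, 8229, 1380, 154499, 182749, 125801, 4229, 207927]

running_totals = list(accumulate(gidle_sales))
print(running_totals)
```

The last value matches the total sales computed for (G)I-dle earlier. Rows that share a release date, such as the KOR and JPN rows of I Trust, have no defined order under orderBy("date") alone, so their intermediate totals can differ between runs unless a second ordering column is added.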


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col, round

def sales_per_day_udf(x, y):
    # x is the number of sales and y is the number of days since the album was released
    # Return zero if either value is missing or the album was only just released
    if x is None or y is None or y <= 0:
        return 0.0  # a Python float to match FloatType; an int here can come back as null
    else:
        return float(x) / float(y)

# Register the function as a UDF and round the result to whole sales per day
sales_per_day = udf(sales_per_day_udf, FloatType())
df = df.withColumn("sales_per_day", round(sales_per_day(df["sales"], df["days_since_release"])))
display(df)


Artist,title,date,country,sales,peak_chart,sales_success,release_year,days_since_release,cumulative_sales_dist,running_total_sales,sales_per_day
(G)I-dle,I Am,2018-05-02,KOR,51831.0,6,moderate,2018,2708,0.4,51831.0,19.0
(G)I-dle,I Made,2019-02-26,KOR,51041.0,2,moderate,2019,2408,0.2,102872.0,21.0
(G)I-dle,Latata,2019-07-29,JPN,8229.0,5,low,2019,2255,1.0,111101.0,4.0
(G)I-dle,I Trust,2020-04-06,JPN,1380.0,48,low,2020,2003,0.3333333333333333,112481.0,1.0
(G)I-dle,I Trust,2020-04-06,KOR,154499.0,1,high,2020,2003,0.8,266980.0,77.0
(G)I-dle,Dumdi Dumdi,2020-08-03,CHN,182749.0,0,high,2020,1884,1.0,449729.0,97.0
(G)I-dle,Dumdi Dumdi,2020-08-03,KOR,125801.0,2,high,2020,1884,0.6,575530.0,67.0
(G)I-dle,Oh My God,2020-08-26,JPN,4229.0,23,low,2020,1861,0.6666666666666666,579759.0,2.0
(G)I-dle,I Burn,2021-01-11,KOR,207927.0,3,high,2021,1723,1.0,787686.0,121.0
Ateez,Treasure EP.1: All to Zero,2018-10-24,KOR,85109.0,7,moderate,2018,2533,0.1428571428571428,85109.0,34.0


This code trains a linear regression model to predict album sales. It first drops rows with missing sales, then assembles chart position, days since release, cumulative sales distribution, and running total sales into a single feature vector. The data is split 70/30 into training and test sets, the model is fit on the training set, and its fit is reported as R-squared, RMSE, and MSE. Finally, the model is evaluated on the held-out test set to see how well it predicts sales for rows it has not seen.

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Drop rows with missing sales, then pack the predictors into one vector column.
# Note: cumulative_sales_dist and running_total_sales are themselves computed from sales.
df_cleaned = df.dropna(subset=["sales"])
assembler = VectorAssembler(
    inputCols=["peak_chart", "days_since_release", "cumulative_sales_dist", "running_total_sales"],
    outputCol="features",
    handleInvalid="skip"
)
df_features = assembler.transform(df_cleaned)

# 70/30 train/test split; no seed is set, so the split and the metrics vary between runs
(trainData, testData) = df_features.randomSplit([0.7, 0.3])
lr = LinearRegression(featuresCol="features", labelCol="sales")
lrModel = lr.fit(trainData)

# Metrics on the training data
trainingSummary = lrModel.summary
print(f"R-squared: {trainingSummary.r2}")
print(f"Root Mean Squared Error: {trainingSummary.rootMeanSquaredError}")
print(f"Mean Squared Error: {trainingSummary.meanSquaredError}")

# RMSE on the held-out test data
predictions = lrModel.transform(testData)
evaluator = RegressionEvaluator(labelCol="sales", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Test RMSE: {rmse}")

R-squared: 0.40971901451637105
Root Mean Squared Error: 106837.52332629781
Mean Squared Error: 11414256390.49723
Test RMSE: 103853.47773406806
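
The reported numbers are related: the mean squared error is the square of the RMSE, and R-squared compares the model's squared error with that of always predicting the mean. A minimal plain-Python sketch of these definitions; the `actual` and `predicted` lists are made-up toy values for illustration, not data from the notebook:

```python
import math


def mse(actual, predicted):
    """Mean of the squared prediction errors."""
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)


def rmse(actual, predicted):
    """Square root of the mean squared error, in the same units as sales."""
    return math.sqrt(mse(actual, predicted))


def r_squared(actual, predicted):
    """1 minus (residual sum of squares / total sum of squares)."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1 - ss_res / ss_tot


# Toy values, purely illustrative
actual = [100.0, 200.0, 300.0]
predicted = [110.0, 190.0, 310.0]

print(mse(actual, predicted), rmse(actual, predicted), r_squared(actual, predicted))

# Consistency check on the reported training metrics: RMSE squared should equal MSE
print(math.isclose(106837.52332629781 ** 2, 11414256390.49723, rel_tol=1e-9))
```

Read this way, an R-squared of about 0.41 means the model accounts for roughly 41% of the variance in sales on the training data, and the test RMSE is the same RMSE formula applied to the held-out rows.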
