In [1]:
# Inject CSS so <pre> output blocks keep their whitespace and do not wrap
# (presumably to keep wide Spark `show()` tables aligned — confirm).
# `HTML` and `display` are imported from the public `IPython.display` module so the
# cell does not depend on IPython internals or on the injected `display` global.
from IPython.display import HTML, display

display(HTML("<style>pre { white-space: pre !important; }</style>"))


#LIBRARIES
import matplotlib.pyplot as plt
from collections import OrderedDict
import seaborn as sns

import pandas as pd

import simpy

from plotnine import *
import plotly.graph_objects as go
import numpy as np


# PYSPARK 
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession 
from pyspark.conf import SparkConf
from pyspark.sql.functions import *
from pyspark.mllib.stat import Statistics
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType, StringType
from pyspark.sql.window import Window

In [2]:
# Create (or reuse) the Spark session: local master with 4 threads and
# spark.driver.maxResultSize set to 8g.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .master("local[4]")
    .config("spark.driver.maxResultSize", "8g")
    .getOrCreate()
)

In [3]:
# Load the artists CSV; the first line is used as the header row.
artist_df = spark.read.csv("../data/artists.csv", header=True)

In [4]:
# Load the tracks CSV; the first line is used as the header row.
tracks_df = spark.read.csv("../data/tracks.csv", header=True)

In [10]:
# Preview the first 10 artist rows, print the schema, and display the row count
# (the count is the cell's last expression, so it is the cell output).
artist_df.show(10)
artist_df.printSchema()
artist_df.count()

+--------------------+---------+------+--------------------+----------+
|                  id|followers|genres|                name|popularity|
+--------------------+---------+------+--------------------+----------+
|0DheY5irMjBUeLybb...|      0.0|    []|Armid & Amir Zare...|         0|
|0DlhY15l3wsrnlfGi...|      5.0|    []|         ปูนา ภาวิณี|         0|
|0DmRESX2JknGPQyO1...|      0.0|    []|               Sadaa|         0|
|0DmhnbHjm1qw6NCYP...|      0.0|    []|           Tra'gruda|         0|
|0Dn11fWM7vHQ3rinv...|      2.0|    []|Ioannis Panoutsop...|         0|
|0DotfDlYMGqkbzfBh...|      7.0|    []|       Astral Affect|         0|
|0DqP3bOCiC48L8SM9...|      1.0|    []|           Yung Seed|         0|
|0Drs3maQb99iRglyT...|      0.0|    []|               Wi'Ma|         0|
|0DsPeAi1gxPPnYjgp...|      0.0|    []|             lentboy|         0|
|0DtvnTxgZ9K5YaPS5...|     20.0|    []|            addworks|         0|
+--------------------+---------+------+--------------------+----

1104349

In [9]:
# Preview the first 10 track rows, print the schema, and display the row count
# (the count is the cell's last expression, so it is the cell output).
tracks_df.show(10)
tracks_df.printSchema()
tracks_df.count()

+--------------------+--------------------+----------+-----------+--------+-------------------+--------------------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+
|                  id|                name|popularity|duration_ms|explicit|            artists|          id_artists|release_date|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|
+--------------------+--------------------+----------+-----------+--------+-------------------+--------------------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+
|35iwgR4jXetI318WE...|               Carve|         6|     126903|       0|            ['Uli']|['45tIt06XoI0Iio4...|  1922-02-22|       0.645| 0.445|  0| -13.338|   1|      0.451|       0.674|           0.744|   0.151|  0.127|104.851|             3|


586672

# Data Cleaning

In [None]:
# List the distinct values present in the `explicit` column (at most 10 shown).
tracks_df.select("explicit").distinct().show(10)

In [None]:
# Missing values: for each frame, count per column the rows where isnan() is true.
for frame in (tracks_df, artist_df):
    nan_counts = [
        count(when(isnan(column_name), column_name)).alias(column_name)
        for column_name in frame.columns
    ]
    frame.select(nan_counts).show()

In [None]:
# Null values: for each raw frame, count per column the rows where the value is null.
for frame in (tracks_df, artist_df):
    null_counts = [
        count(when(col(column_name).isNull(), column_name)).alias(column_name)
        for column_name in frame.columns
    ]
    frame.select(null_counts).show()

### Cast data type

In [None]:
# Cast the two numeric artist columns to integers (the CSV was read without
# schema inference, so they arrive as strings).
artist_df = (
    artist_df
    .withColumn("followers", col("followers").cast(IntegerType()))
    .withColumn("popularity", col("popularity").cast(IntegerType()))
)

In [None]:
# Print the artist schema to check the column types after the cast cell.
artist_df.printSchema()

In [None]:
# Target Spark type for every track column that should not stay a string.
track_column_types = {
    "duration_ms": IntegerType(),
    "popularity": IntegerType(),
    "explicit": IntegerType(),
    "release_date": DateType(),
    "danceability": FloatType(),
    "energy": FloatType(),
    "key": IntegerType(),
    "loudness": FloatType(),
    "mode": IntegerType(),
    "speechiness": FloatType(),
    "acousticness": FloatType(),
    "instrumentalness": FloatType(),
    "liveness": FloatType(),
    "valence": FloatType(),
    "tempo": FloatType(),
    "time_signature": IntegerType(),
}

# Replace each listed column in place with its cast version; column order is unchanged.
for column_name, spark_type in track_column_types.items():
    tracks_df = tracks_df.withColumn(column_name, col(column_name).cast(spark_type))

tracks_df.printSchema()

In [None]:
# Null values per column after the type casts.
# Uses `tracks_df` / `artist_df` (the frames the cast cells reassign): the previously
# referenced `tracks_df_1` / `artist_df_1` are never defined in this notebook, so the
# cell raised NameError on a fresh kernel.
tracks_df.select([count(when(col(c).isNull(), c)).alias(c) for c in tracks_df.columns]).show()
artist_df.select([count(when(col(c).isNull(), c)).alias(c) for c in artist_df.columns]).show()

In [None]:
# Drop tracks whose release_date is null, and replace null artist popularity/followers with 0.
# Built from `tracks_df` / `artist_df`: the previously referenced `tracks_df_1` /
# `artist_df_1` are never defined in this notebook (NameError on a fresh kernel).
# NOTE(review): later cells continue from `tracks_df` / `artist_df`, not from these
# `_2` frames — confirm which frames the rest of the pipeline is meant to use.
tracks_df_2 = tracks_df.filter(col("release_date").isNotNull())
artist_df_2 = (
    artist_df
    .withColumn("popularity", coalesce(artist_df["popularity"], lit(0)))
    .withColumn("followers", coalesce(artist_df["followers"], lit(0)))
)


In [None]:
# Null values: re-count nulls per column on the filtered/filled `_2` frames.
for frame in (tracks_df_2, artist_df_2):
    null_counts = [
        count(when(col(column_name).isNull(), column_name)).alias(column_name)
        for column_name in frame.columns
    ]
    frame.select(null_counts).show()

In [None]:
# Row count of the tracks frame after rows with a null release_date were removed.
tracks_df_2.count()

In [None]:
# Row count of the artist frame after nulls were replaced with 0.
artist_df_2.count()

In [None]:
import datetime

# Track age in years: days between today's date and release_date, divided by 365.
# The value depends on the date the cell is run.
days_since_release = datediff(current_date(), col("release_date"))
tracks_df = tracks_df.withColumn("age", days_since_release / 365)

# Data Integration

In [None]:
# Turn the stringified `genres` list into an array column: strip the leading "[",
# the trailing "]" and every apostrophe, then split on ", ".
genres_text = regexp_replace(col("genres"), r"(^\[)|(\]$)|(')", "")
artist_df = artist_df.withColumn("genres", split(genres_text, ", "))

In [None]:
# Turn the stringified `id_artists` list into an array column: strip the leading "[",
# the trailing "]" and every apostrophe, then split on ", ".
artist_ids_text = regexp_replace(col("id_artists"), r"(^\[)|(\]$)|(')", "")
tracks_df_wk0 = tracks_df.withColumn("id_artists", split(artist_ids_text, ", "))
tracks_df_wk0

In [None]:
# Window grouping all rows that share the same `id_track`; the per-track artist
# aggregates in the integration cell are computed over it.
windowSpec = Window.partitionBy("id_track")

In [None]:
# Track-level columns carried unchanged through both selections below.
track_feature_cols = [
    "release_date", "danceability", "energy", "key", "loudness", "mode",
    "speechiness", "acousticness", "instrumentalness", "liveness", "valence",
    "tempo", "time_signature",
]

# One row per (track, artist): explode the artist-id array and rename the track columns
# that would otherwise clash with the artist columns after the join.
tracks_df_wk1 = tracks_df_wk0.select(
    col("id").alias("id_track"),
    "duration_ms",
    col("popularity").alias("popularity_track"),
    "explicit",
    explode(tracks_df_wk0.id_artists).alias("id_artist"),
    *track_feature_cols,
    "age",
)

# Left join keeps tracks whose artist id has no match in artist_df.
tracks_with_artists = tracks_df_wk1.join(
    artist_df, tracks_df_wk1.id_artist == artist_df.id, "left"
)

# Aggregate artist followers/popularity and genres per track over the window, then
# collapse back to one row per track with distinct().
tracks_df_wk2 = (
    tracks_with_artists
    .withColumn("sum_artist_followers", sum(col("followers")).over(windowSpec))
    .withColumn("sum_artist_popularity", sum(col("popularity")).over(windowSpec))
    .withColumn("avg_artist_followers", avg(col("followers")).over(windowSpec))
    .withColumn("avg_artist_popularity", avg(col("popularity")).over(windowSpec))
    .withColumn("collect_list_genres", collect_list("genres").over(windowSpec))
    # Flatten the per-artist genre arrays, de-duplicate, and drop empty-string entries.
    .withColumn(
        "genres",
        array_remove(array_distinct(flatten(col("collect_list_genres"))), ""),
    )
    .drop("collect_list_genres")
    .select(
        "id_track",
        "popularity_track",
        "duration_ms",
        "genres",
        *track_feature_cols,
        "sum_artist_followers",
        "sum_artist_popularity",
        "avg_artist_followers",
        "avg_artist_popularity",
        "age",
    )
    .distinct()
)



In [None]:
# Show up to 10 distinct per-track genre arrays without truncating the cell text.
tracks_df_wk2.select("genres").distinct().show(10, truncate=False)

In [None]:
# Preview the integrated per-track frame.
tracks_df_wk2.show()

In [None]:
# Print the schema of the integrated per-track frame.
tracks_df_wk2.printSchema()

In [None]:
# Null count per column of the integrated frame (the left join can leave the artist
# aggregates null for tracks with no matching artist).
tracks_df_wk2.select([count(when(col(c).isNull(), c)).alias(c) for c in tracks_df_wk2.columns]).show()

In [None]:
# Replace null artist aggregates with 0, one column at a time.
df = tracks_df_wk2
for aggregate_col in (
    "sum_artist_followers",
    "sum_artist_popularity",
    "avg_artist_followers",
    "avg_artist_popularity",
):
    df = df.withColumn(aggregate_col, coalesce(tracks_df_wk2[aggregate_col], lit(0)))

In [None]:
# Null count per column after the artist aggregates were filled with 0.
remaining_nulls = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df.columns
]
df.select(remaining_nulls).show()

In [None]:
# Row count before rows containing nulls are dropped.
df.count()

In [None]:

# Drop every row that has a null in any column, then display the remaining row count.
df = df.dropna(how="any")
df.count()

# Correlation

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

In [None]:
from pyspark.sql import functions as f

### Normalization

In [None]:
# Numeric columns to min-max scale before computing correlations.
columns_to_scale = [
    "popularity_track", "duration_ms", "danceability", "energy", "loudness",
    "speechiness", "acousticness", "instrumentalness", "liveness", "valence",
    "tempo", "time_signature", "sum_artist_followers", "sum_artist_popularity",
    "age",
]

# MinMaxScaler works on vector columns, so each column is first wrapped in its own
# one-element vector ("<name>_vec") and then scaled into "<name>_scaled".
assemblers = [
    VectorAssembler(inputCols=[name], outputCol=name + "_vec")
    for name in columns_to_scale
]
scalers = [
    MinMaxScaler(inputCol=name + "_vec", outputCol=name + "_scaled")
    for name in columns_to_scale
]

# All assemblers run before all scalers.
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(df)
enriched_df = scalerModel.transform(df)

In [None]:

# Map each "<name>_scaled" column back to its original name, and keep only those columns.
names = {f"{base}_scaled": base for base in columns_to_scale}
scaledData = enriched_df.select(
    [f.col(scaled).alias(original) for scaled, original in names.items()]
)

In [None]:
# Preview the scaled columns.
scaledData.show()

In [None]:
# Print the schema of the scaled frame.
scaledData.printSchema()

### Correlation matrix

In [None]:
from pyspark.ml.stat import Correlation

In [None]:
import pandas as pd

In [None]:
# Assemble every scaled column into a single vector column, the input format
# Correlation.corr takes.
vector_col = "corr_features"
assembler = VectorAssembler(outputCol=vector_col, inputCols=scaledData.columns)
df_vector = assembler.transform(scaledData).select(vector_col)

# The result is a one-row frame whose single column is named "pearson(<vector_col>)";
# `.values` gives the matrix entries as a flat array.
matrix = Correlation.corr(df_vector, vector_col)
pearson_column = "pearson({})".format(vector_col)
first_row = matrix.collect()[0]
corrmatrix = first_row[pearson_column].values

In [None]:
# Reshape the flat correlation values into a square table labelled by feature name.
feature_names = scaledData.columns
pd.DataFrame(
    corrmatrix.reshape(-1, len(feature_names)),
    index=feature_names,
    columns=feature_names,
)

# Correlation on the filtered dataframe

In [None]:
from pyspark.sql.functions import col

# Keep only tracks whose popularity is strictly greater than 40.
is_popular = col("popularity_track") > 40
df1 = df.filter(is_popular)

In [None]:
# Release-date range for the filtered analysis; between() includes both bounds.
dates = ("2015-01-01",  "2021-01-01")
range_start, range_end = dates
df2 = df1.where(col("release_date").between(range_start, range_end))

In [None]:
# Row count after the popularity and release-date filters.
df2.count()

In [None]:
# Same numeric columns as for the full dataset, min-max scaled on the filtered frame.
columns_to_scale = [
    "popularity_track", "duration_ms", "danceability", "energy", "loudness",
    "speechiness", "acousticness", "instrumentalness", "liveness", "valence",
    "tempo", "time_signature", "sum_artist_followers", "sum_artist_popularity",
    "age",
]

# Each column is wrapped in a one-element vector ("<name>_vec") and then scaled
# into "<name>_scaled".
assemblers = [
    VectorAssembler(inputCols=[feature], outputCol=feature + "_vec")
    for feature in columns_to_scale
]
scalers = [
    MinMaxScaler(inputCol=feature + "_vec", outputCol=feature + "_scaled")
    for feature in columns_to_scale
]

pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(df2)
enriched_df = scalerModel.transform(df2)

In [None]:
# Keep only the "<name>_scaled" columns of the filtered frame, renamed back to "<name>".
names = {f"{base}_scaled": base for base in columns_to_scale}
scaledData1 = enriched_df.select(
    [f.col(scaled).alias(original) for scaled, original in names.items()]
)

In [None]:
# Assemble the scaled columns of the filtered frame into one vector column.
vector_col = "corr_features"
assembler = VectorAssembler(outputCol=vector_col, inputCols=scaledData1.columns)
df_vector = assembler.transform(scaledData1).select(vector_col)

# Correlation.corr returns a one-row frame whose column is named "pearson(<vector_col>)";
# `.values` gives the matrix entries as a flat array.
matrix = Correlation.corr(df_vector, vector_col)
pearson_column = "pearson({})".format(vector_col)
first_row = matrix.collect()[0]
corrmatrix = first_row[pearson_column].values

In [None]:
# Reshape the flat correlation values of the filtered frame into a labelled square table.
feature_names = scaledData1.columns
pd.DataFrame(
    corrmatrix.reshape(-1, len(feature_names)),
    index=feature_names,
    columns=feature_names,
)

In [None]:
# Preview the cleaned, integrated (unfiltered) frame.
df.show()

In [12]:
# Write the artist frame as JSON. mode("overwrite") lets the cell be re-run when the
# target directory already exists; the default save mode raises in that case.
# NOTE(review): the recorded run failed with UnsatisfiedLinkError in
# NativeIO$Windows.access0 — presumably the Hadoop native libraries (winutils.exe /
# hadoop.dll) were not available to the JVM on Windows; confirm the setup.
# NOTE(review): the target is named "cleanedDataset" but this writes `artist_df`,
# not the integrated `df` — confirm which frame is meant to be saved.
artist_df.write.mode("overwrite").json('../data/cleanedDataset')

Py4JJavaError: An error occurred while calling o58.json.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:853)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1435)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:493)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:678)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:332)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:402)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:375)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
	... 33 more


In [11]:
# Write the artist frame as Parquet. mode("overwrite") lets the cell be re-run when the
# target directory already exists; the default save mode raises in that case.
# NOTE(review): the recorded run failed with UnsatisfiedLinkError in
# NativeIO$Windows.access0 — presumably the Hadoop native libraries (winutils.exe /
# hadoop.dll) were not available to the JVM on Windows; confirm the setup.
# NOTE(review): the target is named "cleanedDataset" but this writes `artist_df`,
# not the integrated `df` — confirm which frame is meant to be saved.
artist_df.write.mode("overwrite").parquet("../data/cleanedDataset.parquet")

Py4JJavaError: An error occurred while calling o55.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:874)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1435)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:493)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:678)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:332)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:402)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:375)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
	... 33 more


In [None]:
import os

In [None]:
# Point HADOOP_HOME at the local Hadoop install, but keep any value already set in the
# environment instead of overriding it with this machine-specific path.
# NOTE(review): presumably this must run before the SparkSession is created to have any
# effect on the Spark JVM — confirm, and move it to the top of the notebook if so.
os.environ.setdefault("HADOOP_HOME", "C:\\hadoop-3.3.1")

In [None]:
# Display only the variable of interest; dumping all of os.environ would put every
# environment variable (possibly including credentials) into the saved notebook output.
os.environ.get("HADOOP_HOME")

In [None]:
artist_df.