In [27]:
from dotenv import load_dotenv
import os

# variable_loads
load_dotenv()

# Set AWS credentials dari file .env
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
hadoop_aws_jar = os.getenv('hadoop_aws_jar')
aws_sdk_jar = os.getenv('aws_sdk_jar')

In [38]:
# Cell 3: Create Spark Session with AWS S3 Configuration and JARs
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, mean,when

# Paths to the required JAR files
hadoop_aws_jar = "./hadoop-aws-3.3.1.jar"
aws_sdk_jar = "./aws-java-sdk-bundle-1.11.901.jar"

# Create Spark session with the JARs and AWS S3 configuration
spark = SparkSession.builder \
    .appName("Spotify Data Transformation") \
    .config("spark.driver.extraClassPath", f"{hadoop_aws_jar}:{aws_sdk_jar}") \
    .config("spark.executor.extraClassPath", f"{hadoop_aws_jar}:{aws_sdk_jar}") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.awsAccessKeyId", aws_access_key_id) \
    .config("spark.hadoop.fs.s3a.awsSecretAccessKey", aws_secret_access_key) \
    .getOrCreate()


In [29]:
input_path = "s3a://intermediary-bucket-house/spotify_songs/spotify_songs.csv"
df = spark.read.csv(input_path, header=True, inferSchema=True)
df.show(5)  # test if it connect to s3 or not

+--------------------+--------------------+----------------+----------------+--------------------+--------------------+------------------------+-------------+--------------------+--------------+-----------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+
|            track_id|          track_name|    track_artist|track_popularity|      track_album_id|    track_album_name|track_album_release_date|playlist_name|         playlist_id|playlist_genre|playlist_subgenre|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration_ms|
+--------------------+--------------------+----------------+----------------+--------------------+--------------------+------------------------+-------------+--------------------+--------------+-----------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+
|6f807x0im

In [30]:
# total row and dataset
print((df.count(), len(df.columns)))

(32833, 23)


In [31]:
# !pip3 install pandas

# total data in column
# import pandas as pd

df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 32833 entries, 0 to 32832
Data columns (total 23 columns):
 #   Column                    Non-Null Count  Dtype  
---  ------                    --------------  -----  
 0   track_id                  32833 non-null  object 
 1   track_name                32833 non-null  object 
 2   track_artist              32833 non-null  object 
 3   track_popularity          32833 non-null  int32  
 4   track_album_id            32833 non-null  object 
 5   track_album_name          32833 non-null  object 
 6   track_album_release_date  32833 non-null  object 
 7   playlist_name             32833 non-null  object 
 8   playlist_id               32833 non-null  object 
 9   playlist_genre            32833 non-null  object 
 10  playlist_subgenre         32833 non-null  object 
 11  danceability              32833 non-null  object 
 12  energy                    32833 non-null  object 
 13  key                       32833 non-null  object 
 14  loudne

In [32]:
# checking if data is empty or not
df.toPandas().isna().sum().sort_values(ascending=False)

track_id                    0
track_name                  0
track_artist                0
track_popularity            0
track_album_id              0
track_album_name            0
track_album_release_date    0
playlist_name               0
playlist_id                 0
playlist_genre              0
playlist_subgenre           0
danceability                0
energy                      0
key                         0
loudness                    0
mode                        0
speechiness                 0
acousticness                0
instrumentalness            0
liveness                    0
valence                     0
tempo                       0
duration_ms                 0
dtype: int64

In [33]:
# counting unique value
for column in df.columns:
    unique_count = df.select(column).distinct().count()
    print(f"Unique values in {column}: {unique_count}")

Unique values in track_id: 28356
Unique values in track_name: 23450
Unique values in track_artist: 10693
Unique values in track_popularity: 101
Unique values in track_album_id: 22545
Unique values in track_album_name: 19744
Unique values in track_album_release_date: 4531
Unique values in playlist_name: 451
Unique values in playlist_id: 479
Unique values in playlist_genre: 14
Unique values in playlist_subgenre: 27
Unique values in danceability: 827
Unique values in energy: 953
Unique values in key: 23
Unique values in loudness: 10223
Unique values in mode: 13
Unique values in speechiness: 1271
Unique values in acousticness: 3731
Unique values in instrumentalness: 4729
Unique values in liveness: 1629
Unique values in valence: 1364
Unique values in tempo: 17689
Unique values in duration_ms: 19790


In [76]:
# check if there are  duplicates or no

total_rows = df.count()
unique_rows = df.distinct().count()

duplicates = total_rows == unique_rows

print(f"Df doesn't contain duplicates: {duplicates}")

                                                                                

Df doesn't contain duplicates: True


                                                                                

In [13]:
# Because there is no such thing duplicates and no empy dataset is actually good

In [35]:
# check type data in dataframe
columns_types = df.dtypes

for column, dtype in columns_types:
    print(f"{column}: {dtype}")

track_id: string
track_name: string
track_artist: string
track_popularity: int
track_album_id: string
track_album_name: string
track_album_release_date: string
playlist_name: string
playlist_id: string
playlist_genre: string
playlist_subgenre: string
danceability: string
energy: string
key: string
loudness: string
mode: string
speechiness: double
acousticness: double
instrumentalness: double
liveness: double
valence: double
tempo: double
duration_ms: double


In [36]:
# transform track_album_release_date  into year and month
from pyspark.sql.functions import year,month
# year
df = df.withColumn('album_release_year', year(df['track_album_release_date']))
# # month
df = df.withColumn('album_release_month', month(df['track_album_release_date']))
# show df
df.select('album_release_year', 'album_release_month').show(5)

+------------------+-------------------+
|album_release_year|album_release_month|
+------------------+-------------------+
|              2019|                  6|
|              2019|                 12|
|              2019|                  7|
|              2019|                  7|
|              2019|                  3|
+------------------+-------------------+
only showing top 5 rows



In [39]:
# filling invalid value with 0
df = df.withColumn("album_release_year", when(col("album_release_year").cast("int").isNotNull(), col("album_release_year")).otherwise(0))

In [41]:
# invalid fformating fix
from pyspark.sql.functions import regexp_replace
df = df.withColumn("track_name", regexp_replace("track_name", '"', ''))

In [47]:
# invalid formatting fix
df = df.withColumn("track_album_name", regexp_replace("track_album_name", '"', ''))

In [51]:
# invalid formatting fix
df = df.withColumn("track_artist", regexp_replace("track_artist", '"', ''))

In [54]:
# Invalid digit, Value 'i', Pos 0, Type: Double
df = df.withColumn("danceability", when(col("danceability").cast("double").isNotNull(), col("danceability")).otherwise(None))

In [58]:
# invalid formatting fix
df = df.withColumn("playlist_name", regexp_replace("playlist_name", '"', ''))

In [70]:
# invalid formatting fix
df = df.withColumn("playlist_subgenre", regexp_replace("playlist_subgenre", '"', ''))

In [62]:
df.show(5)

+--------------------+--------------------+----------------+----------------+--------------------+--------------------+-------------+--------------------+--------------+-----------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+------------------+-------------------+
|            track_id|          track_name|    track_artist|track_popularity|      track_album_id|    track_album_name|playlist_name|         playlist_id|playlist_genre|playlist_subgenre|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration_ms|album_release_year|album_release_month|
+--------------------+--------------------+----------------+----------------+--------------------+--------------------+-------------+--------------------+--------------+-----------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+--------

In [73]:
df.select('danceability','energy','key','loudness','mode','speechiness','acousticness') \
  .show(5)

+------------+------+---+--------+----+-----------+------------+
|danceability|energy|key|loudness|mode|speechiness|acousticness|
+------------+------+---+--------+----+-----------+------------+
|       0.748| 0.916|  6|  -2.634|   1|     0.0583|       0.102|
|       0.726| 0.815| 11|  -4.969|   1|     0.0373|      0.0724|
|       0.675| 0.931|  1|  -3.432|   0|     0.0742|      0.0794|
|       0.718|  0.93|  7|  -3.778|   1|      0.102|      0.0287|
|        0.65| 0.833|  1|  -4.672|   1|     0.0359|      0.0803|
+------------+------+---+--------+----+-----------+------------+
only showing top 5 rows



In [71]:
# drop track_albume_release_date, track_id, track_album_id, playlist_id
df = df.drop('track_album_release_date','track_id','track_album_id','playlist_id')
df.printSchema()

root
 |-- track_name: string (nullable = true)
 |-- track_artist: string (nullable = true)
 |-- track_popularity: integer (nullable = true)
 |-- track_album_name: string (nullable = true)
 |-- playlist_name: string (nullable = true)
 |-- playlist_genre: string (nullable = true)
 |-- playlist_subgenre: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- album_release_year: integer (nullable = true)
 |-- album_release_month: integer (nullable = true)



In [69]:
# check type data in dataframe
columns_types = df.dtypes

for column, dtype in columns_types:
    print(f"{column}: {dtype}")

track_name: string
track_artist: string
track_popularity: int
track_album_name: string
playlist_name: string
playlist_genre: string
playlist_subgenre: string
danceability: string
energy: string
key: string
loudness: string
mode: string
speechiness: double
acousticness: double
instrumentalness: double
liveness: double
valence: double
tempo: double
duration_ms: double
album_release_year: int
album_release_month: int


In [78]:
df.toPandas().info()

                                                                                

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 32833 entries, 0 to 32832
Data columns (total 21 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   track_name           32833 non-null  object 
 1   track_artist         32833 non-null  object 
 2   track_popularity     32833 non-null  int32  
 3   track_album_name     32833 non-null  object 
 4   playlist_name        32833 non-null  object 
 5   playlist_genre       32833 non-null  object 
 6   playlist_subgenre    32833 non-null  object 
 7   danceability         32821 non-null  object 
 8   energy               32833 non-null  object 
 9   key                  32833 non-null  object 
 10  loudness             32833 non-null  object 
 11  mode                 32833 non-null  object 
 12  speechiness          32833 non-null  float64
 13  acousticness         32833 non-null  float64
 14  instrumentalness     32833 non-null  float64
 15  liveness             32833 non-null 

In [72]:
# upload new csv back to s3 using pyspark

df = df.coalesce(1)
output_path = "s3a://intermediary-bucket-house/spotify_songs_transformed"
df.write.csv(output_path, header=True, mode='overwrite')

24/07/13 11:32:18 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
24/07/13 11:32:18 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

In [39]:
# upload using pandas
import boto3

pandas_df = df.toPandas()

output_path = "spotify_songs_transformed.csv"
pandas_df.to_csv(output_path, index=False)

s3 = boto3.client('s3', region_name='ap-southeast-3')
bucket_name = 'intermediary-bucket-house'
s3_path = 'spotify_songs_transformed.csv'

s3.upload_file(output_path, bucket_name, s3_path)

                                                                                