In [0]:
import json
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
import time


In [0]:

main_start_time = time.time()

# Initialize Spark DataFrames
# Define the schema for the artist DataFrame
artist_schema = StructType([
    StructField("ArtistID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Followers", IntegerType(), True)
])

genre_schema = StructType([
    StructField("GenreID", IntegerType(), True),
    StructField('Name', StringType(), True)
])

artist_genre_schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("ArtistID", StringType(), True),
    StructField("GenreID", IntegerType(), True)
])

df_artist = spark.createDataFrame([], schema=artist_schema)
df_genre = spark.createDataFrame([], schema = genre_schema)
df_artist_genre = spark.createDataFrame([], schema = artist_genre_schema)



# Read data from DBFS
file_path = '/mnt/spotifyetlprojectdl/raw/followed_artists_list.json'
data = spark.read.text(file_path)


# Calculate the max_genre_id based on the "Genre" DataFrame
max_genre_id = df_genre.agg({"GenreID": "max"}).collect()[0][0] or 0

artist_count = 0
genre_data = []


# Extract text content and collect the data
lines = data.rdd.map(lambda r: r.value).collect()
for line in lines:
    # Load JSON data from the line
    all_artists = json.loads(line)

    # Iterate over each artist in the loaded data
    for artist in all_artists['items']:
        artist_count += 1
        
        artist_name = artist['name']
        artist_id = artist['id']
        artist_followers = int(artist['followers']['total'])
        # Create a Spark DataFrame Row
        artist_row = Row(ArtistID=artist_id, Name=artist_name, Followers=artist_followers)

        # Append the Row to the artist DataFrame
        df_artist = df_artist.union(spark.createDataFrame([artist_row], schema=artist_schema))
        # Extract artist genres
        artist_genre = artist['genres']
        #unique_genres.update(artist_genre)
        print(f"{artist_count}:{artist_name}")
        
        # Process genres using Spark operations
        for genre_name in artist_genre:
            genre_data.append(genre_name)      
    
    
    print('-----------------------------------------------------------------------')

# Convert genre_data to a set to get distinct values
distinct_genre_data = set(genre_data)
# Create a mapping of genre names to GenreID
genre_id_mapping = {genre: i + 1 for i, genre in enumerate(distinct_genre_data)}
# Create a list of Row objects for the DataFrame
genre_rows = [Row(GenreID=genre_id, Name=genre) for genre, genre_id in genre_id_mapping.items()]
# Add the rows to the existing df_genre DataFrame
df_genre = df_genre.union(spark.createDataFrame(genre_rows, schema=genre_schema))

# Record the end time
main_end_time = time.time()
# Calculate the total runtime
total_run_time = main_end_time - main_start_time

# Convert the elapsed time to minutes and seconds
minutes, seconds = divmod(total_run_time, 60)
# Print the runtime
print(f"Total runtime: {int(minutes)} minutes and {round(seconds, 2)} seconds")



1:YG
2:Rygin King
3:Pablo YG
4:Nadia Rose
5:DJ Khaled
6:Mustard
7:TeeZandos
8:Big Sean
9:Patrice Roberts
10:Quada
11:Kid Cudi
12:Nicki Minaj
13:STARSET
14:Malie Donn
15:Chip
16:Destra
17:Farmer Nappy
18:Wiz Khalifa
19:Gucci Mane
20:Chief Keef
-----------------------------------------------------------------------
21:2 Chainz
22:Nessa Preppy
23:Megan Thee Stallion
24:Lady Leshurr
25:Squash
26:Lady Gaga
27:Nailah Blackman
28:P!nk
29:Future
30:21 Savage
31:The Weeknd
32:YNW Melly
33:KES the Band
34:Kevin Gates
35:Problem Child
36:Nadia Batson
37:Rich The Kid
38:Rick Ross
39:Anne-Marie
40:Panic! At The Disco
-----------------------------------------------------------------------
41:Meek Mill
42:Britney Spears
43:Dexta Daps
44:Jada Kingdom
45:Masicka
46:Prince Swanny
47:Alkaline
48:Vybz Kartel
49:Rico Nasty
50:Stormzy
51:Rytikal
52:Kendrick Lamar
53:Tory Lanez
54:Najeeriii
55:Skepta
56:450
57:Tommy Lee Sparta
58:Motto
59:Teejay
60:Ace Hood
---------------------------------------------------

In [0]:
df_genre.show()
row_count = df_genre.count()
print(f"Number of rows in df_genre: {row_count}")

+-------+------------------+
|GenreID|              Name|
+-------+------------------+
|      1|  birmingham grime|
|      2|         afroswing|
|      3|       future rock|
|      4|        piano rock|
|      5|              trap|
|      6|           art pop|
|      7|        trap queen|
|      8|   dirty south rap|
|      9|     barbadian pop|
|     10|       atl hip hop|
|     11|       florida rap|
|     12|    canadian metal|
|     13|            riddim|
|     14|       modern rock|
|     15|              rock|
|     16|instrumental grime|
|     17|      grenada soca|
|     18|        uk hip hop|
|     19|       melodic rap|
|     20|           pop rap|
+-------+------------------+
only showing top 20 rows

Number of rows in df_genre: 100


In [0]:
main_start_time = time.time()
artist_count = 0
# Calculate the max_artist_genre_id based on the "Artist_Genre" DataFram
max_artist_genre_id = df_artist_genre.agg({"ID": "max"}).collect()[0][0] or 0

artist_genre_data = []
for line in lines:
    # Load JSON data from the line
    all_artists = json.loads(line)

    # Iterate over each artist in the loaded data
    for artist in all_artists['items']:
        artist_count += 1
        # Extract artist genres
        artist_id = artist['id']
        artist_name = artist['name']
        artist_genre = artist['genres']
        
        print(f"{artist_count}:{artist_name}")
        print(f"Genres:{artist_genre}")
        print("Updating Genres...")
        print('-----------------------------------------------------------------------')
        # Process genres using Spark operations
        for genre_name in artist_genre:
            genre_id = df_genre.filter(col("Name") == genre_name).select("GenreID").collect()[0][0]
            #print(f"Genre ID: {genre_id} |  Genre: {genre_name}")
            
            max_artist_genre_id += 1
            artist_genre_id = max_artist_genre_id 
            artist_genre_data.append((artist_genre_id, artist_id, genre_id))


#artist_genre_df = spark.createDataFrame(artist_genre_data, schema=artist_genre_schema)
#df_artist_genre = df_artist_genre.union(artist_genre_df)
df_artist_genre = df_artist_genre.union(spark.createDataFrame(artist_genre_data, schema=artist_genre_schema))
df_artist_genre.show()
row_count = df_artist_genre.count()
print(f"Number of rows in df_artist_genre: {row_count}")



# Record the end time
main_end_time = time.time()
# Calculate the total runtime
total_run_time = main_end_time - main_start_time

# Convert the elapsed time to minutes and seconds
minutes, seconds = divmod(total_run_time, 60)
# Print the runtime
print(f"Total runtime: {int(minutes)} minutes and {round(seconds, 2)} seconds")

1:YG
Genres:['cali rap', 'hip hop', 'pop rap', 'rap', 'southern hip hop', 'trap']
Updating Genres...
-----------------------------------------------------------------------
2:Rygin King
Genres:['dancehall', 'jamaican dancehall', 'traphall']
Updating Genres...
-----------------------------------------------------------------------
3:Pablo YG
Genres:['dancehall']
Updating Genres...
-----------------------------------------------------------------------
4:Nadia Rose
Genres:[]
Updating Genres...
-----------------------------------------------------------------------
5:DJ Khaled
Genres:['hip hop', 'miami hip hop', 'pop rap', 'rap']
Updating Genres...
-----------------------------------------------------------------------
6:Mustard
Genres:['cali rap', 'pop rap', 'rap', 'southern hip hop', 'trap']
Updating Genres...
-----------------------------------------------------------------------
7:TeeZandos
Genres:[]
Updating Genres...
------------------------------------------------------------------

In [0]:
main_start_time = time.time()
# Save processed-genre-data to CSV
df_genre.write.mode("overwrite").csv("/mnt/spotifyetlprojectdl/processed/processed-genre-data")

# Save processed-artist-data to CSV
df_artist.write.mode("overwrite").csv("/mnt/spotifyetlprojectdl/processed/processed-artist-data")

# Save processed-artist-genre-data to CSV
df_artist_genre.write.mode("overwrite").csv("/mnt/spotifyetlprojectdl/processed/processed-artist-genre-data")

# Record the end time
main_end_time = time.time()
# Calculate the total runtime
total_run_time = main_end_time - main_start_time

# Convert the elapsed time to minutes and seconds
minutes, seconds = divmod(total_run_time, 60)
# Print the runtime
print(f"Total runtime: {int(minutes)} minutes and {round(seconds, 2)} seconds")

Total runtime: 0 minutes and 27.53 seconds


In [0]:
# Assuming df_artist_genre is your DataFrame
# Find and display all duplicate rows
duplicates = df_artist_genre.subtract(df_artist_genre.dropDuplicates())
duplicates.show()


+---+--------+-------+
| ID|ArtistID|GenreID|
+---+--------+-------+
+---+--------+-------+



In [0]:
duplicates = df_genre.subtract(df_genre.dropDuplicates())
duplicates.show()

+-------+----+
|GenreID|Name|
+-------+----+
+-------+----+

