In [None]:
import json
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

In [None]:
# Schemas for the three output tables, built field by field with StructType.add.
# Every column is declared nullable. Each table starts out as an empty
# DataFrame that later cells extend via union.
artist_schema = (
    StructType()
    .add("ArtistID", StringType(), True)
    .add("Name", StringType(), True)
    .add("Followers", IntegerType(), True)
)

genre_schema = (
    StructType()
    .add("GenreID", IntegerType(), True)
    .add("Name", StringType(), True)
)

artist_genre_schema = (
    StructType()
    .add("ID", IntegerType(), True)
    .add("ArtistID", StringType(), True)
    .add("GenreID", IntegerType(), True)
)

df_artist, df_genre, df_artist_genre = (
    spark.createDataFrame([], schema=table_schema)
    for table_schema in (artist_schema, genre_schema, artist_genre_schema)
)



In [None]:
# Load the raw followed-artists export from the DBFS mount as plain text:
# one row per line of the file, held in a single `value` column.
file_path = "/mnt/rawdata/followed_artists_data/followed_artists.json"
data = spark.read.format("text").load(file_path)

In [None]:
# Parse every JSON line, build the artist rows and collect every raw genre name.
# Rows are accumulated in a plain Python list and converted to a DataFrame once
# at the end. Unioning a one-row DataFrame per artist grows the Spark logical
# plan by one Union node per iteration, which becomes slow quickly.
artist_count = 0
artist_rows = []
genre_data = []  # every genre occurrence; de-duplicated in the next cell

# Each row of `data` holds one line of the raw file. Every line is parsed as a
# standalone JSON document that carries an 'items' list of artists.
lines = data.rdd.map(lambda r: r.value).collect()
for line in lines:
    all_artists = json.loads(line)

    for artist in all_artists['items']:
        artist_count += 1
        artist_name = artist['name']

        artist_rows.append(
            Row(
                ArtistID=artist['id'],
                Name=artist_name,
                Followers=int(artist['followers']['total']),
            )
        )
        genre_data.extend(artist['genres'])
        print(f"{artist_count}:{artist_name}")

    print('-----------------------------------------------------------------------')

# Single union instead of one per artist (createDataFrame accepts an empty list
# when an explicit schema is given).
df_artist = df_artist.union(spark.createDataFrame(artist_rows, schema=artist_schema))

1:Maroon 5
2:Taylor Swift
3:Selena Gomez
4:Shakira
5:Ellie Goulding
6:Aimer
7:LiSA
8:SUGA
9:Nicki Minaj
10:Wiz Khalifa
11:David Guetta
12:Maria Becerra
13:Olivia Rodrigo
14:Future
15:AURORA
16:The Weeknd
17:Christina Aguilera
18:Justin Bieber
19:Anne-Marie
20:Post Malone
21:Halsey
22:Oasis
23:Stray Kids
24:The Kid LAROI
25:Ice Spice
26:BTS
27:TheFatRat
28:Drake
29:Michael Jackson
30:Fossils
31:Disney
32:NiziU
33:BLACKPINK
34:milet
35:Ericovich
36:Fall Out Boy
37:Luis Fonsi
38:Daddy Yankee
39:Arijit Singh
40:Adam Levine
41:Adele
42:Mariah Carey
43:Cardi B
44:Camila Cabello
45:Ava Max
46:Gracie Abrams
47:Glass Animals
48:Imagine Dragons
49:DJ Snake
50:OneRepublic
-----------------------------------------------------------------------


In [None]:

# Assign a GenreID to every genre name not yet present in df_genre.
# - Names are sorted so IDs are reproducible. Iterating a plain `set` of
#   strings follows Python's per-process hash randomisation, so the same genre
#   could receive a different ID after every kernel restart.
# - Names already in df_genre are skipped, and new IDs continue from the
#   current maximum. Re-running this cell therefore does not append duplicate
#   genres with colliding IDs.
existing_genres = {r["Name"] for r in df_genre.select("Name").collect()}
start_id = df_genre.agg({"GenreID": "max"}).collect()[0][0] or 0

new_genres = sorted(set(genre_data) - existing_genres)
genre_id_mapping = {genre: start_id + i for i, genre in enumerate(new_genres, start=1)}
genre_rows = [Row(GenreID=genre_id, Name=genre) for genre, genre_id in genre_id_mapping.items()]
df_genre = df_genre.union(spark.createDataFrame(genre_rows, schema=genre_schema))


In [None]:
# Preview the first rows of the genre lookup table, then report its total size.
df_genre.show()
row_count = df_genre.count()
print("Number of rows in df_genre: {}".format(row_count))

+-------+------------------+
|GenreID|              Name|
+-------+------------------+
|      1|         gauze pop|
|      2|         j-poprock|
|      3|australian hip hop|
|      4|        beatlesque|
|      5|    queens hip hop|
|      6|     latin hip hop|
|      7|        piano rock|
|      8|               edm|
|      9|  modern bollywood|
|     10|      british soul|
|     11|              trap|
|     12|  deep talent show|
|     13|             k-pop|
|     14|             j-pop|
|     15|        pov: indie|
|     16|     colombian pop|
|     17|       trap latino|
|     18|         pop dance|
|     19|           traprun|
|     20|          pop punk|
+-------+------------------+
only showing top 20 rows

Number of rows in df_genre: 73


In [None]:
# Build the artist–genre link table (one row per artist/genre pair).
# The genre name -> GenreID lookup is collected into a dict once. Previously a
# separate filter+collect Spark job ran for every (artist, genre) pair.
artist_count = 0
max_artist_genre_id = df_artist_genre.agg({"ID": "max"}).collect()[0][0] or 0
genre_id_by_name = {
    r["Name"]: r["GenreID"] for r in df_genre.select("Name", "GenreID").collect()
}

artist_genre_data = []
for line in lines:
    all_artists = json.loads(line)

    for artist in all_artists['items']:
        artist_count += 1
        artist_id = artist['id']
        artist_name = artist['name']
        artist_genre = artist['genres']

        print(f"{artist_count}:{artist_name}")
        print(f"Genres:{artist_genre}")
        print("Updating Genres...")
        print('-----------------------------------------------------------------------')

        for genre_name in artist_genre:
            # KeyError here means the genre cell was not run after parsing.
            max_artist_genre_id += 1
            artist_genre_data.append(
                (max_artist_genre_id, artist_id, genre_id_by_name[genre_name])
            )

df_artist_genre = df_artist_genre.union(spark.createDataFrame(artist_genre_data, schema=artist_genre_schema))
df_artist_genre.show()
row_count = df_artist_genre.count()
print(f"Number of rows in df_artist_genre: {row_count}")

1:Maroon 5
Genres:['pop']
Updating Genres...
-----------------------------------------------------------------------
2:Taylor Swift
Genres:['pop']
Updating Genres...
-----------------------------------------------------------------------
3:Selena Gomez
Genres:['pop', 'post-teen pop']
Updating Genres...
-----------------------------------------------------------------------
4:Shakira
Genres:['colombian pop', 'dance pop', 'latin pop', 'pop']
Updating Genres...
-----------------------------------------------------------------------
5:Ellie Goulding
Genres:['indietronica', 'metropopolis', 'pop', 'uk pop']
Updating Genres...
-----------------------------------------------------------------------
6:Aimer
Genres:['anime', 'anime rock', 'j-pixie', 'j-pop', 'j-poprock']
Updating Genres...
-----------------------------------------------------------------------
7:LiSA
Genres:['anime', 'anime rock', 'j-pixie', 'j-pop']
Updating Genres...
------------------------------------------------------------

In [None]:
# Persist the genre table as CSV, replacing any output from a previous run.
# Spark writes no header row by default; readers must supply the schema.
(
    df_genre.write
    .mode("overwrite")
    .csv("/mnt/processed-data/processed-genre-data")
)

In [None]:
# Persist the artist table as CSV, replacing any output from a previous run.
# Spark writes no header row by default; readers must supply the schema.
(
    df_artist.write
    .mode("overwrite")
    .csv("/mnt/processed-data/processed-artists-data")
)

In [None]:
# Persist the artist–genre link table as CSV, replacing any previous output.
# Spark writes no header row by default; readers must supply the schema.
(
    df_artist_genre.write
    .mode("overwrite")
    .csv("/mnt/processed-data/processed-artists_genre-data")
)

In [None]:
# Check for duplicate artist–genre links.
# `subtract` is EXCEPT DISTINCT, so `df.subtract(df.dropDuplicates())` is always
# empty and cannot detect duplicates. `ID` is a generated surrogate key, so
# full-row duplicates are impossible anyway. Group on the natural key instead;
# any row shown here is a duplicated (ArtistID, GenreID) pair.
duplicates = (
    df_artist_genre
    .groupBy("ArtistID", "GenreID")
    .count()
    .filter(col("count") > 1)
)
duplicates.show()


+---+--------+-------+
| ID|ArtistID|GenreID|
+---+--------+-------+
+---+--------+-------+



In [None]:

# Check for genre names that appear more than once (the name -> GenreID lookup
# used to build the link table assumes names are unique).
# `df.subtract(df.dropDuplicates())` would always be empty, because `subtract`
# is EXCEPT DISTINCT, so group and count instead.
duplicates = (
    df_genre
    .groupBy("Name")
    .count()
    .filter(col("count") > 1)
)
duplicates.show()

+-------+----+
|GenreID|Name|
+-------+----+
+-------+----+



In [None]:
# Check for artists that appear more than once, even with differing follower
# counts. `df.subtract(df.dropDuplicates())` would always be empty, because
# `subtract` is EXCEPT DISTINCT, so group on ArtistID and count instead.
duplicates = (
    df_artist
    .groupBy("ArtistID")
    .count()
    .filter(col("count") > 1)
)
duplicates.show()

+--------+----+---------+
|ArtistID|Name|Followers|
+--------+----+---------+
+--------+----+---------+

