In [0]:
import json 
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
import time

In [0]:
artist_schema = StructType([
  StructField("ArtistID", StringType(), True),
  StructField("Name", StringType(), True),
  StructField("followers", IntegerType(), True)
])

genre_schema = StructType([
  StructField("GenreID", IntegerType(), True),
  StructField("Name", StringType(), True)
])

artist_genre_schema = StructType([
  StructField("ID", IntegerType(), True),
  StructField("ArtistID", StringType(), True),
  StructField("GenreID", IntegerType(), True)
])



df_artist = spark.createDataFrame([], schema = artist_schema)
df_genre = spark.createDataFrame([], schema = genre_schema)
df_artist_genre = spark.createDataFrame([], schema = artist_genre_schema)



file_path = "/mnt/raw/followed_artist_data/followed_artists_list.json"
data = spark.read.text(file_path)

artist_count = 0
genre_data = []

lines = data.rdd.map(lambda r:r.value).collect()

for line in lines:
    all_artists = json.loads(line)


for artist in all_artists['items']:
    artist_name = artist['name']
    artist_id = artist['id']
    artist_followers = int(artist['followers']['total'])


    artist_row = Row(ArtistID = artist_id, Name = artist_name, followers = artist_followers)
                     
    df_artist = df_artist.union(spark.createDataFrame([artist_row], schema=artist_schema))

    artist_genre = artist['genres']
    print(f"{artist_count}:{artist_name}")

    for genre_name in artist_genre:
        genre_data.append(genre_name)

distinct_genre_data = set(genre_data)


genre_id_mapping = {genre: i + 1 for i, genre in enumerate(distinct_genre_data)}

genre_rows = [Row(GenreID = genre_id , Name = genre) for genre, genre_id in genre_id_mapping.items()]

df_genre = df_genre.union(spark.createDataFrame(genre_rows, schema = genre_schema))

0:Khalid Khan
0:Color Music Choir
0:Ali Gatie
0:Prem Dhillon
0:Nusrat Fateh Ali Khan
0:Mohammed Rafi
0:Pankaj Udhas
0:R. D. Burman
0:Kumar Sanu
0:Alan Walker
0:DIVINE
0:Parmish Verma
0:Karan Aujla
0:Sidhu Moose Wala
0:Diljit Dosanjh
0:Imagine Dragons
0:Drake
0:Post Malone
0:Kishore Kumar


In [0]:
df_genre.show()
row_count = df_genre.count()
print(f"Number of rows in df_genre: {row_count}")

+-------+--------------------+
|GenreID|                Name|
+-------+--------------------+
|      1|                sufi|
|      2|         punjabi pop|
|      3|        desi hip hop|
|      4|    canadian hip hop|
|      5|         melodic rap|
|      6|      pakistani folk|
|      7|   classic bollywood|
|      8|         modern rock|
|      9|   pakistani hip hop|
|     10|    modern bollywood|
|     11|             pop rap|
|     12|       indian fusion|
|     13|classic pakistani...|
|     14|             qawwali|
|     15|               filmi|
|     16|             chutney|
|     17|        canadian pop|
|     18|                 rap|
|     19|              ghazal|
|     20|           desi trap|
+-------+--------------------+
only showing top 20 rows

Number of rows in df_genre: 28


In [0]:
max_artist_genre_id = df_artist_genre.agg({"ArtistID": "max"}).collect()[0][0] or 0
artist_genre_data = []

for line in lines:
    all_artists = json.loads(line)


for artist in all_artists['items']:
    artist_id = artist['id']
    artist_genre = artist['genres']


    for genre_name in artist_genre:
      genre_id = df_genre.filter(col("Name") == genre_name).select("GenreID").collect()[0][0]

      max_artist_genre_id += 1
      artist_genre_id = max_artist_genre_id
      artist_genre_data.append((artist_genre_id, artist_id, genre_id))




df_artist_genre = df_artist_genre.union(spark.createDataFrame(artist_genre_data, schema = artist_genre_schema))
df_artist_genre.show()
row_count = df_artist_genre.count()
print(f"Number of rows in df_artist_genre: {row_count}")

+---+--------------------+-------+
| ID|            ArtistID|GenreID|
+---+--------------------+-------+
|  1|4rTv3Ejc7hKMtmoBO...|      4|
|  2|6IP4VnqS1pOiQcPVP...|      2|
|  3|5HcunTidTUrOaf8V0...|     13|
|  4|5HcunTidTUrOaf8V0...|     15|
|  5|5HcunTidTUrOaf8V0...|     12|
|  6|5HcunTidTUrOaf8V0...|      6|
|  7|5HcunTidTUrOaf8V0...|     14|
|  8|5HcunTidTUrOaf8V0...|      1|
|  9|5HcunTidTUrOaf8V0...|     26|
| 10|0gXDpqwYNDODn7fB0...|      7|
| 11|0gXDpqwYNDODn7fB0...|     15|
| 12|4Qpbhxe0sO2zhvUVf...|      7|
| 13|4Qpbhxe0sO2zhvUVf...|     15|
| 14|4Qpbhxe0sO2zhvUVf...|     19|
| 15|2JSYASbWU5Y0fVpts...|     16|
| 16|2JSYASbWU5Y0fVpts...|      7|
| 17|2JSYASbWU5Y0fVpts...|     15|
| 18|4K6blSRoklNdpw4mz...|     15|
| 19|4K6blSRoklNdpw4mz...|     10|
| 20|7vk5e3vY1uw9plTHJ...|     28|
+---+--------------------+-------+
only showing top 20 rows

Number of rows in df_artist_genre: 47


In [0]:
df_genre.write.mode("overwrite").csv("/mnt/processed/processed-genre-data")
df_artist.write.mode("overwrite").csv("/mnt/processed/processed-artist-data")
df_artist_genre.write.mode("overwrite").csv("/mnt/processed/processed-artist-genre-data")



In [0]:
duplicates = df_artist_genre.subtract(df_artist_genre.dropDuplicates())
duplicates.show()

+---+--------+-------+
| ID|ArtistID|GenreID|
+---+--------+-------+
+---+--------+-------+

