In [1]:
# Step 1: Create the bronze-layer database (no-op if it already exists),
# storing its managed tables under the given GCS location.
create_bronze_db_sql = """
CREATE DATABASE IF NOT EXISTS artist_bronze_db 
LOCATION 'gs://pq-bucket-spotify/pq.db';
"""
spark.sql(create_bronze_db_sql)


ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used


DataFrame[]

In [2]:
# Step 2: Switch the session's current database so that unqualified table
# names in later SQL statements resolve inside artist_bronze_db.
use_bronze_db_sql = "USE artist_bronze_db;"
spark.sql(use_bronze_db_sql)


DataFrame[]

In [3]:
# Step 3: Load the processed Spotify export (a JSON file, read with
# spark.read.json) and register it as the temporary view `album_info` so it
# can be queried from Spark SQL.
# NOTE(review): spark.read.json expects one JSON object per line by default;
# pass multiLine=True if the export is ever a single pretty-printed document.
data_path = 'gs://spotify-etl-12/processed_spotify_data_2025-03-22T13-54-14-250618.json'
df = spark.read.json(data_path)
df.createOrReplaceTempView("album_info")


                                                                                

In [4]:
# Step 4: Persist the temporary view as a Parquet table in the bronze database.
# The target is qualified with the database name for two reasons:
# - The temp view read here is also named `album_info`, and unqualified names
#   resolve to a temp view before any catalog table, so the bare name is
#   ambiguous.
# - The write no longer depends on the earlier USE statement having run.
# The unqualified name in the SELECT still reads the temporary view.
# NOTE: IF NOT EXISTS means re-running this cell with new input data leaves
# an existing table unchanged.
spark.sql("""
CREATE TABLE IF NOT EXISTS artist_bronze_db.album_info
USING PARQUET
AS SELECT * FROM album_info;
""")



DataFrame[]

In [12]:
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import StructType, ArrayType

def flatten_df(df):
    """Flatten all struct and array columns of a Spark DataFrame.

    - Struct columns are replaced by one top-level column per field, named
      ``<parent>_<field>``.
    - Array columns (of structs or of simple types) are exploded in place
      with ``explode_outer``. This yields one row per element; a null or
      empty array yields one row with a null value.

    The loop repeats until no StructType/ArrayType column remains. Nested
    structs and arrays produced by an earlier step are therefore flattened
    on later iterations.

    NOTE: each array column is exploded independently. A row with several
    array columns becomes the cartesian product of their elements.

    Args:
        df: Input pyspark DataFrame.

    Returns:
        A DataFrame whose top-level columns contain no StructType or ArrayType.
    """
    def _quote(name):
        # Backtick-quote a name so dots/spaces in it are not parsed as
        # nested-field access by col(); literal backticks are doubled.
        return "`" + name.replace("`", "``") + "`"

    def _complex_fields(frame):
        # Top-level columns that still need flattening, in schema order.
        return {field.name: field.dataType
                for field in frame.schema.fields
                if isinstance(field.dataType, (StructType, ArrayType))}

    complex_fields = _complex_fields(df)

    while complex_fields:
        col_name, dtype = complex_fields.popitem()

        if isinstance(dtype, StructType):
            # Promote each struct field to its own top-level column.
            expanded = [col(f"{_quote(col_name)}.{_quote(k)}").alias(f"{col_name}_{k}")
                        for k in dtype.names]
            df = df.select("*", *expanded).drop(col_name)
        else:
            # ArrayType: one row per element. If the element is a struct (or
            # an array), the resulting column is complex again. The refresh
            # below picks it up, so it is flattened on a later iteration; no
            # dotted "parent.child" keys are needed.
            df = df.withColumn(col_name, explode_outer(col(_quote(col_name))))

        # Pick up new complex columns (from struct expansion) and columns
        # that are still complex after an explode.
        complex_fields.update(_complex_fields(df))
    return df

In [13]:
# Flatten every nested struct/array column of the loaded data, then inspect
# the result: first the schema, then a 20-row preview with untruncated values.
flat_df = flatten_df(df)
flat_df.printSchema()
flat_df.show(n=20, truncate=False)

root
 |-- album_id: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- external_url: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- total_tracks: long (nullable = true)
 |-- tracks_duration_ms: long (nullable = true)
 |-- tracks_id: string (nullable = true)
 |-- tracks_name: string (nullable = true)
 |-- images_height: long (nullable = true)
 |-- images_url: string (nullable = true)
 |-- images_width: long (nullable = true)
 |-- artists_id: string (nullable = true)
 |-- artists_name: string (nullable = true)

+----------------------+----------+-----------------------------------------------------+------------+------------+------------------+----------------------+--------------------------------+-------------+----------------------------------------------------------------+------------+----------------------+------------+
|album_id              |album_name|external_url                                         |release_date|total_tracks|tra

In [6]:
# Staging bucket used by the spark-bigquery connector when writing to BigQuery.
# The connector documents this option as a bare bucket name (no "gs://"
# scheme); only some connector versions strip a prefix if one is given.
spark.conf.set('temporaryGcsBucket', 'bigquery-bucket-spotify')

In [14]:
# Write the flattened album data to BigQuery, replacing any existing table.
# The table id is given as "project:dataset.table". No per-write staging
# bucket option is set, so the session-level `temporaryGcsBucket` conf is
# what the connector will see.
(
    flat_df.write
    .mode('overwrite')
    .format('bigquery')
    .option('table', 'orbital-bee-454310-a5:albums_ds.ALBUM_INFO')
    .save()
)

                                                                                