In [2]:
from pymongo import MongoClient, database, collection
from pymongo.errors import ConnectionFailure, OperationFailure
from contextlib import contextmanager

""" Context manager for mongoDB connection. """
@contextmanager
def mongoDB_client(username: str, password: str, 
                    host: str = 'mongo', port: int = 27017):
    """Yield a ``MongoClient`` for ``host:port`` and close it when the block exits.

    Args:
        username: MongoDB user name; percent-escaped before being put in the URI.
        password: MongoDB password; percent-escaped before being put in the URI.
        host: MongoDB host name.
        port: MongoDB port.

    Yields:
        The ``MongoClient`` built from the connection URI.

    Raises:
        ConnectionFailure, OperationFailure: logged and re-raised so the caller
            can see that the work inside the ``with`` block did not complete.
    """
    # Local import keeps this block self-contained (standard library only).
    from urllib.parse import quote_plus

    # Reserved characters such as '@', ':' or '/' inside the credentials would
    # otherwise corrupt the URI, so both parts are percent-escaped.
    path = f"mongodb://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}"
    client = None

    try:
        print("Starting connect mongoDB...")
        client = MongoClient(path)

        # NOTE(review): this message is printed as soon as the client object is
        # constructed; confirm whether an explicit server round trip is wanted.
        print("Client connected successfully!")
        yield client

    except ConnectionFailure:
        print("Connection to mongoDB failed!")
        # Swallowing here would either hide a failure raised inside the with
        # body or surface as "generator didn't yield"; propagate instead.
        raise

    except OperationFailure:
        print("Operation failed!")
        raise

    finally:
        # The client is still None when MongoClient(...) itself raised.
        if client is not None:
            client.close()
            print("The connection to MongoDB has stopped!")

""" Class mongoDB for operations. """
class mongoDB_operations:
    """Helper around a ``MongoClient`` for database/collection setup and bulk inserts."""

    def __init__(self, client: MongoClient):
        """Store the client every operation runs against.

        Raises:
            TypeError: if ``client`` is not a ``MongoClient``.
        """
        if not isinstance(client, MongoClient):
            raise TypeError('client must be MongoClient!')

        self.client = client

    def check_database_exists(self, database_name: str) -> bool:
        """Return True when ``database_name`` is among the server's database names."""
        return database_name in self.client.list_database_names()

    def check_collection_exists(self, database_obj: database.Database, collection: str) -> bool:
        """Return True when ``collection`` is listed in the database named like ``database_obj``.

        Raises:
            TypeError: if ``database_obj`` is not a ``database.Database``.
        """
        if not isinstance(database_obj, database.Database):
            raise TypeError("database_obj must be a database.Database!")

        # Looked up by name through self.client, so the listing always uses
        # this instance's client.
        return collection in self.client[database_obj.name].list_collection_names()

    def create_database_if_not_exists(self, database_name: str) -> database.Database:
        """Return the handle for ``database_name``, reporting whether it already exists.

        Nothing is sent to the server to create the database here: MongoDB
        materialises a database on the first write into it.
        """
        if self.check_database_exists(database_name):
            print(f"Don't create the database '{database_name}' because it already exists.")
        else:
            print(f"Successfully created database '{database_name}'.")

        return self.client[database_name]

    def create_collection_if_not_exists(self, database_obj: database.Database, collection: str) -> collection.Collection:
        """Create ``collection`` in the given database when it is missing and return it.

        Raises:
            TypeError: if ``database_obj`` is not a ``database.Database``.
        """
        if not isinstance(database_obj, database.Database):
            raise TypeError("database_obj must be a database.Database!")

        target_db = self.client[database_obj.name]

        if self.check_collection_exists(database_obj, collection):
            print(f"Don't create the collection '{collection}' because it already exists.")
        else:
            # Create it explicitly so the success message below is true.
            target_db.create_collection(collection)
            print(f"Successfully created collection '{collection}'.")

        return target_db[collection]

    def insert_data(self, collection_obj: collection.Collection, data: list[dict]):
        """Bulk-insert ``data`` into ``collection_obj``.

        Returns:
            The result object of ``insert_many``, or ``None`` when ``data`` is
            empty and nothing was inserted.

        Raises:
            TypeError: if ``data`` is not a list of dicts or ``collection_obj``
                is not a ``collection.Collection``.
        """
        if not isinstance(data, list) or not all(isinstance(item, dict) for item in data):
            raise TypeError("data must be a list of dictionaries!")

        if not isinstance(collection_obj, collection.Collection):
            raise TypeError("collection_obj must be a collection.Collection!")

        # insert_many rejects an empty document list, so an empty input
        # (e.g. a CSV with only a header) is treated as a no-op.
        if not data:
            print(f"No data to insert into collection '{collection_obj.name}'.")
            return None

        result = collection_obj.insert_many(data)

        print(f"Successfully inserted data into collection '{collection_obj.name}'.")
        return result

In [24]:
import pandas as pd

""" Convert data to dictionaries. """
def get_dict_data(csv_path) -> list[dict]:
    """Read a CSV file and return its rows as a list of dictionaries.

    Args:
        csv_path: path (or any source accepted by ``pd.read_csv``) of the CSV file.

    Returns:
        One dict per CSV row, keyed by column name (``orient='records'``).
    """
    frame = pd.read_csv(csv_path)

    # Separate name for the converted rows: the result is a list, not a DataFrame.
    records = frame.to_dict(orient = 'records')

    return records

def load_mongodb_artist(artist_path: str = '/opt/data/Artist.csv',
                        username: str = 'huynhthuan', password: str = 'password',
                        database_name: str = 'artist_database',
                        collection_name: str = 'artist_collection'):
    """Load the artist CSV into a MongoDB collection.

    Args:
        artist_path: path of the CSV file to load.
        username: MongoDB user name passed to ``mongoDB_client``.
        password: MongoDB password passed to ``mongoDB_client``.
        database_name: name of the target database.
        collection_name: name of the target collection.

    NOTE(review): every call inserts all rows again; confirm whether re-running
    this cell is expected to duplicate documents.
    """
    with mongoDB_client(username = username, password = password) as client:
        # Keep the raw client and the helper under separate names.
        operations = mongoDB_operations(client)

        #create artist database
        artist_database = operations.create_database_if_not_exists(database_name = database_name)

        #create artist collection
        artist_collection = operations.create_collection_if_not_exists(database_obj = artist_database,
                                                                       collection = collection_name)

        #get data
        records = get_dict_data(artist_path)

        #insert artist data
        operations.insert_data(collection_obj = artist_collection, data = records)

# Run the artist load with its default arguments when this cell executes.
load_mongodb_artist()

Starting connect mongoDB...
Client connected successfully!
Don't create the database 'artist_database' because it already exists.
Don't create the collection 'artist_collection' because it already exists.
Successfully inserted data into collection 'artist_collection'.
The connection to MongoDB has stopped!


In [21]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DateType, FloatType, BooleanType

""" Function for getting schemas. """
def get_schema(table_name: str) -> StructType:
    """Return the Spark ``StructType`` registered for ``table_name``.

    Known names are 'artist', 'album', 'track' and 'trackfeature'; any other
    name raises ``KeyError``. Every field is declared nullable.
    """
    def as_struct(columns):
        # Each (name, type) pair becomes a nullable StructField, in order.
        return StructType([StructField(name, dtype, True) for name, dtype in columns])

    # Artist columns.
    artist_columns = [
        ('Artist_ID',    StringType()),
        ('Artist_Name',  StringType()),
        ('Genres',       ArrayType(StringType(), True)),
        ('Followers',    IntegerType()),
        ('Popularity',   IntegerType()),
        ('Artist_Image', StringType()),
        ('Artist_Type',  StringType()),
        ('External_Url', StringType()),
        ('Href',         StringType()),
        ('Artist_Uri',   StringType()),
        ('idx',          IntegerType()),
    ]

    # Album columns.
    album_columns = [
        ('Artist',               StringType()),
        ('Artist_ID',            StringType()),
        ('Album_ID',             StringType()),
        ('Name',                 StringType()),
        ('Type',                 StringType()),
        ('Genres',               ArrayType(StringType(), True)),
        ('Label',                StringType()),
        ('Popularity',           StringType()),
        ('Available_Markets',    StringType()),
        ('Release_Date',         DateType()),
        ('ReleaseDatePrecision', StringType()),
        ('TotalTracks',          IntegerType()),
        ('Copyrights',           StringType()),
        ('Restrictions',         StringType()),
        ('External_URL',         StringType()),
        ('Href',                 StringType()),
        ('Image',                StringType()),
        ('Uri',                  StringType()),
        ('idx',                  IntegerType()),
    ]

    # Track columns.
    track_columns = [
        ("Artists",          StringType()),
        ("Album_ID",         StringType()),
        ("Album_Name",       StringType()),
        ("Track_ID",         StringType()),
        ("Name",             StringType()),
        ("Track_Number",     IntegerType()),
        ("Type",             StringType()),
        ("AvailableMarkets", StringType()),
        ("Disc_Number",      StringType()),
        ("Duration_ms",      IntegerType()),
        ("Explicit",         StringType()),
        ("External_urls",    StringType()),
        ("Href",             StringType()),
        ("Restrictions",     StringType()),
        ("Preview_url",      StringType()),
        ("Uri",              StringType()),
        ("Is_Local",         StringType()),
        ('idx',              IntegerType()),
    ]

    # Track-feature columns.
    trackfeature_columns = [
        ("Track_ID",         StringType()),
        ("Danceability",     FloatType()),
        ("Energy",           FloatType()),
        ("Key",              IntegerType()),
        ("Loudness",         FloatType()),
        ("Mode",             IntegerType()),
        ("Speechiness",      FloatType()),
        ("Acousticness",     FloatType()),
        ("Instrumentalness", FloatType()),
        ("Liveness",         FloatType()),
        ("Valence",          FloatType()),
        ("Tempo",            FloatType()),
        ("Time_signature",   IntegerType()),
        ("Track_href",       StringType()),
        ("Type_Feature",     StringType()),
        ("Analysis_Url",     StringType()),
        ('idx',              IntegerType()),
    ]

    # All four schemas are built before the lookup, then the requested one is returned.
    schemas_by_table = {
        'artist': as_struct(artist_columns),
        'album': as_struct(album_columns),
        'track': as_struct(track_columns),
        'trackfeature': as_struct(trackfeature_columns),
    }

    return schemas_by_table[table_name]

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pymongo import MongoClient
from pyspark import SparkConf
from contextlib import contextmanager
import pyspark.sql

""" Context manager for creating Spark Session. """
@contextmanager
def get_sparkSession(appName: str, master: str = 'local'):
    """Yield a configured ``SparkSession`` and stop it when the block exits.

    Args:
        appName: Spark application name.
        master: Spark master URL.
    """
    # Settings applied on top of the app name and master, in this order.
    # The "spark.executor.instances" = "2" entry was commented out in the
    # original configuration and stays disabled.
    extra_settings = (
        ("spark.executor.memory", "4g"),
        ("spark.executor.cores", "2"),
        ("spark.sql.shuffle.partitions", "4"),
        ("spark.sql.legacy.timeParserPolicy", "LEGACY"),
        ("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0"),
    )

    conf = SparkConf().setAppName(appName).setMaster(master)
    for setting_key, setting_value in extra_settings:
        conf = conf.set(setting_key, setting_value)

    # The session is built before the try block, so stop() only runs for a
    # session that actually exists.
    spark = SparkSession.builder.config(conf = conf).getOrCreate()
    print(f"Successfully created Spark Session with app name: {appName} and master: {master}!")

    try:
        yield spark
    finally:
        spark.stop()
        print("Successfully stopped Spark Session!")


""" Read data from mongoDB. """
def read_mongoDB(spark: SparkSession, database_name: str, collection_name: str, chunk_params: list = None,
                 schema: StructType = None, username: str = 'huynhthuan', password: str = 'password', 
                 host: str = 'mongo', port: int = 27017) -> pyspark.sql.DataFrame:
    """Load a MongoDB collection as a Spark DataFrame.

    Args:
        spark: active Spark session.
        database_name: MongoDB database to read from.
        collection_name: MongoDB collection to read from.
        chunk_params: optional list; only type-checked here, not used by the read.
        schema: optional schema applied to the reader; inferred when omitted.
        username: MongoDB user name; percent-escaped before being put in the URI.
        password: MongoDB password; percent-escaped before being put in the URI.
        host: MongoDB host name.
        port: MongoDB port.

    Returns:
        The loaded DataFrame, or ``None`` when building or loading the reader
        raised (the error is printed, not propagated).

    Raises:
        TypeError: if ``spark``, ``chunk_params`` or ``schema`` has the wrong type.
    """
    # Local import keeps this block self-contained (standard library only).
    from urllib.parse import quote_plus

    #check params
    if not isinstance(spark, SparkSession):
        raise TypeError("spark must be a SparkSession!")

    if chunk_params is not None and not isinstance(chunk_params, list):
        # The check is for a list, so the message says list (it used to say dict).
        raise TypeError("chunk_params must be a list!")

    if schema is not None and not isinstance(schema, StructType):
        raise TypeError("schema must be a StructType!")

    # Percent-escape the credentials so reserved characters cannot corrupt the URI.
    uri = (f"mongodb://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}"
           f"/{database_name}.{collection_name}?authSource=admin")

    print(f"Starting to read data from database '{database_name}' and collection '{collection_name}'...")

    try:
        # NOTE(review): 'header' looks like a CSV reader option; confirm the
        # MongoDB connector needs it.
        reader = spark.read.format('mongodb') \
                           .option("spark.mongodb.read.connection.uri", uri) \
                           .option('header', 'true')

        if schema is not None:
            reader = reader.schema(schema)

        return reader.load()

    except Exception as error:
        # Keep the best-effort contract (callers receive None) but show the cause.
        print(f"An error occurred while reading data from mongoDB: {error}")
        return None


""" Read data from HDFS. """
def read_HDFS(spark: SparkSession, HDFS_dir: str, file_type: str) -> pyspark.sql.DataFrame:
    """Load a dataset from the HDFS data lake as a Spark DataFrame.

    Args:
        spark: active Spark session.
        HDFS_dir: path below ``hdfs://namenode:9000/datalake/``.
        file_type: Spark data source format name passed to ``format()``.

    Returns:
        The loaded DataFrame, or ``None`` when loading raised (the error is
        printed, not propagated).

    Raises:
        TypeError: if ``spark`` is not a ``SparkSession``.
    """
    #check params
    if not isinstance(spark, SparkSession):
        raise TypeError("spark must be a SparkSession!")

    #set HDFS path
    HDFS_path = f"hdfs://namenode:9000/datalake/{HDFS_dir}"

    print(f"Starting to read data from {HDFS_path}...")

    try:
        return spark.read.format(file_type).option('header', 'true').load(HDFS_path)

    except Exception as error:
        # Keep the best-effort contract (callers receive None) but show the cause.
        print(f"An error occurred while reading data from HDFS: {error}")
        return None


""" Write data into HDFS. """
def write_HDFS(spark: SparkSession, data: pyspark.sql.DataFrame, direct: str, file_type: str):
    """Append a DataFrame to a directory of the HDFS data lake.

    Args:
        spark: active Spark session (only type-checked here).
        data: DataFrame to write.
        direct: target path below ``hdfs://namenode:9000/datalake/``.
        file_type: Spark data source format name passed to ``format()``.

    Raises:
        TypeError: if ``spark`` is not a ``SparkSession`` or ``data`` is not a
            DataFrame. Errors raised by the write itself are printed, not
            propagated.
    """
    #check params
    if not isinstance(spark, SparkSession):
        raise TypeError("spark must be a SparkSession!")

    if not isinstance(data, pyspark.sql.DataFrame):
        raise TypeError("data must be a DataFrame!")

    #set HDFS path
    HDFS_path = f"hdfs://namenode:9000/datalake/{direct}"
    # Strip trailing slashes so 'a/b/' is still reported as 'b' rather than ''.
    table_name = direct.rstrip('/').split('/')[-1]

    print(f"Starting to upload '{table_name}' into {HDFS_path}...")

    try:
        data.write.format(file_type) \
                  .option('header', 'true') \
                  .mode('append') \
                  .save(HDFS_path)

        print(f"Successfully uploaded '{table_name}' into HDFS.")

    except Exception as error:
        # Keep the best-effort contract (no exception escapes) but show the cause.
        print(f"An error occurred while upload data into HDFS: {error}")

In [27]:

""" Load all csv files into mongoDB."""
if __name__ == "__main__":
    with get_sparkSession(appName = "init_load") as spark:
        # Target collection URIs, one per CSV file.
        uri_artist_name = "mongodb://huynhthuan:password@mongo:27017/music_database.artist_name_collection?authSource=admin"
        uri_artist = "mongodb://huynhthuan:password@mongo:27017/music_database.artist_collection?authSource=admin"
        uri_album = "mongodb://huynhthuan:password@mongo:27017/music_database.album_collection?authSource=admin"
        uri_track = "mongodb://huynhthuan:password@mongo:27017/music_database.track_collection?authSource=admin"
        uri_trackfeature = "mongodb://huynhthuan:password@mongo:27017/music_database.trackfeature_collection?authSource=admin"

        def load_csv(csv_path):
            # Every source file is read the same way: CSV with a header row.
            return spark.read.option('header', 'true').csv(csv_path)

        # All five files are read before the first write starts.
        df_ArtistName = load_csv("/opt/data/ArtistName.csv")
        df_Artist = load_csv("/opt/data/Artist.csv")
        df_Album = load_csv("/opt/data/Album.csv")
        df_Track = load_csv("/opt/data/Track.csv")
        df_TrackFeature = load_csv("/opt/data/TrackFeature.csv")

        # (DataFrame, destination URI) pairs, written in this order.
        write_plan = (
            (df_ArtistName, uri_artist_name),
            (df_Artist, uri_artist),
            (df_Album, uri_album),
            (df_Track, uri_track),
            (df_TrackFeature, uri_trackfeature),
        )

        for frame, target_uri in write_plan:
            # Overwrite mode: the target collection content is replaced.
            writer = frame.write.format('mongoDB')
            writer = writer.option("spark.mongodb.write.connection.uri", target_uri)
            writer.mode("overwrite").save()

Successfully created Spark Session with app name: init_load and master: local!


24/11/14 05:09:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/14 05:09:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/14 05:09:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/14 05:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/14 05:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/14 05:10:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/14 0

Successfully stopped Spark Session!


In [None]:
from pyspark.sql.functions import split, col, get_json_object, to_date, regexp_replace, length

""" Applying schemas and loading data from MongoDB into HDFS."""
def bronze_task():
    """Read the four music collections from MongoDB and append them to HDFS as parquet.

    Artist and album data are cast/reordered and rebuilt against the schema from
    ``get_schema`` before being written; track and track-feature data are read
    with their schema applied and written as-is. All work runs inside one Spark
    session named 'Bronze_task'.
    """
    #get spark Session
    with get_sparkSession(appName = 'Bronze_task') as spark:
        """------------------------ BRONZE ARTIST ------------------------"""
        # read_mongoDB returns None when its read raised; the attribute access
        # below would then fail inside the try and be reported by the except.
        artist_data = read_mongoDB(spark, database_name = 'music_database', collection_name = 'artist_collection')
        print("Starting bronze preprocessing for artist data...")
        #preprocessing before loading data
        try:
            # Genres: comma-separated string -> array; Followers/Popularity -> int;
            # External_Url: keep only the '$.spotify' value of the JSON string.
            artist_data = artist_data.withColumn('Genres', split(col('Genres'), ",")) \
                                    .withColumn('Followers', col('Followers').cast('int')) \
                                    .withColumn('Popularity', col('Popularity').cast('int')) \
                                    .withColumn('External_Url', get_json_object(col('External_Url'),'$.spotify')) \
            #reorder columns after reading (this comment line is joined to the statement above by the trailing backslash)
            artist_data = artist_data.select('Artist_ID', 'Artist_Name', 'Genres', 
                                            'Followers', 'Popularity', 'Artist_Image', 
                                            'Artist_Type', 'External_Url', 'Href', 'Artist_Uri')
            #applying schema        
            # NOTE(review): get_schema('artist') also declares an 'idx' field that is
            # not in the select list above - confirm the row width matches the schema.
            artist_data = spark.createDataFrame(artist_data.rdd, schema = get_schema('artist'))

            print("Finished bronze preprocessing for artist data.")

            #upload data into HDFS
            write_HDFS(spark, data = artist_data, direct = 'bronze_data/bronze_artist', file_type = 'parquet')
        except Exception as e:
            print(f"An error occurred while preprocessing bronze data: {e}")


        """------------------------ BRONE ALBUM ------------------------"""
        album_data = read_mongoDB(spark, database_name = 'music_database', collection_name = 'album_collection')
        print("Starting bronze preprocessing for album data...")
        try:
            # Popularity/TotalTracks -> int; Release_Date parsed with pattern MM/dd/yyyy.
            album_data = album_data.withColumn('Popularity', col('Popularity').cast('int')) \
                                .withColumn('Release_Date', to_date('Release_Date', "MM/dd/yyyy")) \
                                .withColumn('TotalTracks', col('TotalTracks').cast('int'))
            #reorder columns after reading
            album_data = album_data.select('Artist', 'Artist_ID', 'Album_ID', 'Name', 'Type', 'Genres', 
                                        'Label', 'Popularity', 'Available_Markets', 'Release_Date', 
                                        'ReleaseDatePrecision', 'TotalTracks', 'Copyrights', 'Restrictions', 
                                        'External_URL', 'Href', 'Image', 'Uri')
            # NOTE(review): get_schema('album') declares Popularity as StringType and
            # Genres as an array, while Popularity is cast to int above and Genres is
            # not split here; the schema also has an extra 'idx' field - confirm.
            album_data = spark.createDataFrame(album_data.rdd, schema = get_schema('album'))
            print("Finished bronze preprocessing for album data.")
            #upload data into HDFS
            write_HDFS(spark, data = album_data, direct = 'bronze_data/bronze_album', file_type = 'parquet')
        except Exception as e:
            print(f"An error occurred while preprocessing bronze data: {e}")


        """------------------------ BRONZE TRACK -------------------------"""
        # No try/except here: if read_mongoDB returned None, write_HDFS raises
        # TypeError ("data must be a DataFrame!") and the task stops.
        track_data = read_mongoDB(spark, database_name = 'music_database', collection_name = 'track_collection', 
                                  schema = get_schema('track'))
        #upload data into HDFS
        write_HDFS(spark, data = track_data, direct = 'bronze_data/bronze_track', file_type = 'parquet')


        """------------------------ BRONZE TRACK FEATURE ------------------------"""
        # Same pattern as the track section: schema applied at read time, no local error handling.
        track_feature_data = read_mongoDB(spark, database_name = 'music_database', collection_name = 'trackfeature_collection', 
                                          schema = get_schema('trackfeature'))
        #upload data into HDFS
        write_HDFS(spark, data = track_feature_data, direct = 'bronze_data/bronze_track_feature', file_type = 'parquet')


# Entry point of this cell: run the bronze task between a start and an end banner.
if __name__ == "__main__":
    print("------------------------------- Bronze task starts! -------------------------------")
    bronze_task()
    print("------------------------------ Bronze task finished! -------------------------------")

-------------------------------Bronze task starts!-------------------------------
Successfully created Spark Session with app name: Bronze_task and master: local!
Starting to read data from database 'music_database' and collection 'track_collection'...
Starting to upload 'bronze_track' into hdfs://namenode:9000/datalake/bronze_data/bronze_track...


                                                                                

Successfully uploaded 'bronze_track' into HDFS.
250001 500000
Starting to read data from database 'music_database' and collection 'track_collection'...


                                                                                

Starting to upload 'bronze_track' into hdfs://namenode:9000/datalake/bronze_data/bronze_track...


                                                                                

Successfully uploaded 'bronze_track' into HDFS.
500001 750000
Starting to read data from database 'music_database' and collection 'track_collection'...


                                                                                

Starting to upload 'bronze_track' into hdfs://namenode:9000/datalake/bronze_data/bronze_track...


                                                                                

Successfully uploaded 'bronze_track' into HDFS.
750001 1000000
Starting to read data from database 'music_database' and collection 'track_collection'...


                                                                                

Starting to upload 'bronze_track' into hdfs://namenode:9000/datalake/bronze_data/bronze_track...


                                                                                

Successfully uploaded 'bronze_track' into HDFS.
1000001 1250000
Starting to read data from database 'music_database' and collection 'track_collection'...


                                                                                

Starting to upload 'bronze_track' into hdfs://namenode:9000/datalake/bronze_data/bronze_track...


                                                                                

Successfully uploaded 'bronze_track' into HDFS.
1250001 1500000
Starting to read data from database 'music_database' and collection 'track_collection'...
Successfully stopped Spark Session!


Py4JJavaError: An error occurred while calling o2234.isEmpty.
: com.mongodb.spark.sql.connector.exceptions.MongoSparkException: Partitioning failed. Partitioner calling collStats command failed
	at com.mongodb.spark.sql.connector.read.MongoInputPartitionHelper.generateMongoBatchPartitions(MongoInputPartitionHelper.java:79)
	at com.mongodb.spark.sql.connector.read.MongoBatch.planInputPartitions(MongoBatch.java:52)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions$lzycompute(BatchScanExec.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions(BatchScanExec.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:173)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:172)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.supportsColumnar(BatchScanExec.scala:36)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:149)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
	at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:476)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:162)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:162)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:175)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:175)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4206)
	at org.apache.spark.sql.Dataset.isEmpty(Dataset.scala:622)
	at jdk.internal.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.mongodb.spark.sql.connector.exceptions.MongoSparkException: Partitioner calling collStats command failed
	at com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.storageStats(PartitionerHelper.java:119)
	at com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner.generatePartitions(SamplePartitioner.java:94)
	at com.mongodb.spark.sql.connector.read.MongoInputPartitionHelper.generateMongoBatchPartitions(MongoInputPartitionHelper.java:52)
	... 67 more
Caused by: com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=UNKNOWN, servers=[{address=mongo:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: mongo}, caused by {java.net.UnknownHostException: mongo}}]
	at com.mongodb.internal.connection.BaseCluster.createAndLogTimeoutException(BaseCluster.java:392)
	at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:148)
	at com.mongodb.internal.connection.SingleServerCluster.selectServer(SingleServerCluster.java:46)
	at com.mongodb.internal.binding.ClusterBinding.getReadConnectionSource(ClusterBinding.java:108)
	at com.mongodb.client.internal.ClientSessionBinding.getConnectionSource(ClientSessionBinding.java:128)
	at com.mongodb.client.internal.ClientSessionBinding.getReadConnectionSource(ClientSessionBinding.java:92)
	at com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:141)
	at com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:122)
	at com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$4(SyncOperationHelper.java:186)
	at com.mongodb.internal.operation.SyncOperationHelper.lambda$decorateReadWithRetries$12(SyncOperationHelper.java:289)
	at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
	at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:191)
	at com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:173)
	at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:189)
	at com.mongodb.internal.operation.AggregateOperation.execute(AggregateOperation.java:153)
	at com.mongodb.internal.operation.AggregateOperation.execute(AggregateOperation.java:44)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:153)
	at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:130)
	at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:90)
	at com.mongodb.client.internal.MongoIterableImpl.first(MongoIterableImpl.java:101)
	at com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.lambda$storageStats$0(PartitionerHelper.java:109)
	at com.mongodb.spark.sql.connector.config.AbstractMongoConfig.withCollection(AbstractMongoConfig.java:210)
	at com.mongodb.spark.sql.connector.config.ReadConfig.withCollection(ReadConfig.java:47)
	at com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.storageStats(PartitionerHelper.java:105)
	... 69 more


In [25]:
import pyspark
from pyspark.sql.functions import explode_outer, ltrim

class SilverLayer:
    """Apply the Silver-layer cleaning steps to a Spark DataFrame.

    The constructor only validates and stores the configuration. The
    DataFrame is transformed when process() (or one of the individual step
    methods) is called. Every step replaces the stored DataFrame with its
    result, so calling a step again applies it to the already-transformed
    data.
    """

    def __init__(self, data: pyspark.sql.DataFrame,
                 drop_columns: list = None,
                 drop_null_columns: list = None,
                 fill_nulls_columns: dict = None,
                 duplicate_columns: list = None,
                 nested_columns: list = None,
                 rename_columns: dict = None,
                 ):
        """Validate and store the data and the per-step configuration.

        Args:
            data: DataFrame to clean.
            drop_columns: columns removed by drop().
            drop_null_columns: columns inspected by drop_null().
            fill_nulls_columns: maps a column (the key is passed as the
                fillna ``subset``) to the value that replaces its nulls.
            duplicate_columns: columns that identify duplicates in
                handle_duplicate().
            nested_columns: columns exploded by handle_nested().
            rename_columns: maps old column name -> new column name.

        Raises:
            TypeError: if ``data`` is not a DataFrame, or an optional
                argument is given with the wrong container type.
        """
        # data is mandatory: every step calls a DataFrame method on it, so a
        # missing frame is rejected here instead of failing later.
        if not isinstance(data, pyspark.sql.DataFrame):
            raise TypeError("data must be a DataFrame!")

        # Optional settings: None means "skip that step"; otherwise the
        # container type must match. The message names the real parameter.
        optional_settings = (
            ("drop_columns", drop_columns, list),
            ("drop_null_columns", drop_null_columns, list),
            ("fill_nulls_columns", fill_nulls_columns, dict),
            ("duplicate_columns", duplicate_columns, list),
            ("nested_columns", nested_columns, list),
            ("rename_columns", rename_columns, dict),
        )
        for param_name, param_value, expected_type in optional_settings:
            if param_value is not None and not isinstance(param_value, expected_type):
                raise TypeError(f"{param_name} must be a {expected_type.__name__}!")

        self._data = data
        self._drop_columns = drop_columns
        self._drop_null_columns = drop_null_columns
        self._fill_nulls_columns = fill_nulls_columns
        self._duplicate_columns = duplicate_columns
        self._nested_columns = nested_columns
        self._rename_columns = rename_columns

    def drop(self):
        """Remove every column listed in ``drop_columns``."""
        self._data = self._data.drop(*self._drop_columns)

    def drop_null(self):
        """Drop rows whose ``drop_null_columns`` values are all null.

        ``how="all"`` means a row is removed only when every listed column
        is null; with a single listed column that is simply "column is null".
        """
        self._data = self._data.dropna(subset = self._drop_null_columns, how = "all")

    def fill_null(self):
        """Replace nulls using the ``fill_nulls_columns`` mapping.

        One fillna call is issued per entry: the key is the ``subset`` and
        the value is the replacement.
        """
        for column_list, value in self._fill_nulls_columns.items():
            self._data = self._data.fillna(value = value, subset = column_list)

    def rename(self):
        """Rename columns according to ``rename_columns`` (old -> new)."""
        for old_name, new_name in self._rename_columns.items():
            self._data = self._data.withColumnRenamed(old_name, new_name)

    def handle_duplicate(self):
        """Drop rows that repeat the same values in ``duplicate_columns``."""
        self._data = self._data.dropDuplicates(self._duplicate_columns)

    def handle_nested(self):
        """Explode each column in ``nested_columns`` and left-trim the result.

        explode_outer produces one row per element, and ltrim then strips
        leading whitespace from the exploded value.
        """
        for column in self._nested_columns:
            self._data = self._data.withColumn(column, explode_outer(column)) \
                                   .withColumn(column, ltrim(column))

    def process(self) -> pyspark.sql.DataFrame:
        """Run every configured step in a fixed order and return the result.

        A step is skipped when its setting is None or empty. Order:
        drop -> drop_null -> fill_null -> handle_duplicate -> handle_nested
        -> rename. De-duplication runs before nested columns are exploded,
        so the rows created by exploding are not collapsed again. Renaming
        runs last, so every other setting refers to the original column names.

        Returns:
            The transformed DataFrame (also kept on the instance).
        """
        #drop unnecessary columns
        if self._drop_columns:
            self.drop()

        #drop rows whose listed columns are all null
        if self._drop_null_columns:
            self.drop_null()

        #fill null values
        if self._fill_nulls_columns:
            self.fill_null()

        #handle duplicate rows
        if self._duplicate_columns:
            self.handle_duplicate()

        #handle nested columns
        if self._nested_columns:
            self.handle_nested()

        #rename columns
        if self._rename_columns:
            self.rename()

        return self._data

In [26]:
from pyspark.sql.functions import col, year
with get_sparkSession("silver_task") as spark:
    """ Silver artist data. """
    # Load the bronze artist table.
    bronze_artist = read_HDFS(spark, HDFS_dir="bronze_data/bronze_artist", file_type='parquet')
    # Configure the cleaning steps; SilverLayer only stores them until process() runs.
    artist_layer = SilverLayer(
        data=bronze_artist,
        drop_columns=['Artist_Type', 'Href', 'Artist_Uri'],
        drop_null_columns=['Artist_ID'],
        fill_nulls_columns={'Followers': 0, 'Popularity': 0},
        duplicate_columns=['Artist_ID'],
        nested_columns=['Genres'],
        rename_columns={
            'Artist_ID': 'id',
            'Artist_Name': 'name',
            'Genres': 'genres',
            'Followers': 'followers',
            'Popularity': 'popularity',
            'Artist_Image': 'link_image',
            'External_Url': 'url',
        },
    )
    # Run the configured steps.
    print("Processing for 'silver_artist' ...")
    silver_artist = artist_layer.process()
    print("Finished processing for 'silver_artist'.")
    # Persist the cleaned table to the silver zone.
    write_HDFS(spark, data=silver_artist, direct="silver_data/silver_artist", file_type='parquet')


    """ Silver album data. """
    # Load the bronze album table.
    bronze_album = read_HDFS(spark, HDFS_dir='bronze_data/bronze_album', file_type='parquet')
    # Configure the cleaning steps (no nested columns for albums).
    album_layer = SilverLayer(
        data=bronze_album,
        drop_columns=['Genres', 'Available_Markets', 'Restrictions', 'Href', 'Uri'],
        drop_null_columns=['Album_ID'],
        fill_nulls_columns={'Popularity': 0, 'TotalTracks': 0},
        duplicate_columns=['Album_ID'],
        rename_columns={
            'Artist': 'artist',
            'Artist_ID': 'artist_id',
            'Album_ID': 'id',
            'Name': 'name',
            'Type': 'type',
            'Label': 'label',
            'Popularity': 'popularity',
            'Release_Date': 'release_date',
            'ReleaseDatePrecision': 'release_date_precision',
            'TotalTracks': 'total_tracks',
            'Copyrights': 'copyrights',
            'External_URL': 'url',
            'Image': 'link_image',
        },
    )
    # Run the configured steps.
    print("Processing for 'silver_album' ...")
    silver_album = album_layer.process()
    print("Finished processing for 'silver_album'.")
    # Persist the cleaned table to the silver zone.
    write_HDFS(spark, data=silver_album, direct='silver_data/silver_album', file_type='parquet')


    """ Silver track data. """
    # Load the bronze track table.
    bronze_track = read_HDFS(spark, HDFS_dir='bronze_data/bronze_track', file_type='parquet')
    # Configure the cleaning steps (no null filling, no nested columns).
    track_layer = SilverLayer(
        data=bronze_track,
        drop_columns=['Artists', 'Type', 'AvailableMarkets', 'Href', 'Uri', 'Is_Local'],
        drop_null_columns=['Track_ID'],
        duplicate_columns=['Track_ID'],
        rename_columns={
            'Album_ID': 'album_id',
            'Album_Name': 'album_name',
            'Track_ID': 'id',
            'Name': 'name',
            'Track_Number': 'track_number',
            'Disc_Number': 'disc_number',
            'Duration_ms': 'duration_ms',
            'Explicit': 'explicit',
            'External_urls': 'url',
            'Restrictions': 'restriction',
            'Preview_url': 'preview',
        },
    )
    # Run the configured steps.
    print("Processing for 'silver_track' ...")
    silver_track = track_layer.process()
    print("Finished processing for 'silver_track'.")
    # Persist the cleaned table to the silver zone.
    write_HDFS(spark, data=silver_track, direct='silver_data/silver_track', file_type='parquet')


    """ Silver track feature data. """
    # Load the bronze track-feature table.
    bronze_track_feature = read_HDFS(spark, HDFS_dir='bronze_data/bronze_track_feature', file_type='parquet')
    # Configure the cleaning steps (no null filling, no nested columns).
    track_feature_layer = SilverLayer(
        data=bronze_track_feature,
        drop_columns=['Track_href', 'Type_Feature', 'Analysis_Url'],
        drop_null_columns=['Track_ID'],
        duplicate_columns=['Track_ID'],
        rename_columns={
            'Track_ID': 'id',
            'Danceability': 'danceability',
            'Energy': 'energy',
            'Key': 'key',
            'Loudness': 'loudness',
            'Mode': 'mode',
            'Speechiness': 'speechiness',
            'Acousticness': 'acousticness',
            'Instrumentalness': 'instrumentalness',
            'Liveness': 'liveness',
            'Valence': 'valence',
            'Tempo': 'tempo',
            'Time_signature': 'time_signature',
        },
    )
    # Run the configured steps.
    print("Processing for 'silver_track_feature' ...")
    silver_track_feature = track_feature_layer.process()
    print("Finished processing for 'silver_track_feature'.")
    # Persist the cleaned table to the silver zone.
    write_HDFS(spark, data=silver_track_feature, direct='silver_data/silver_track_feature', file_type='parquet')


Successfully created Spark Session with app name: silver_task and master: local[4]!
Starting to read data from hdfs://namenode:9000/datalake/bronze_data/bronze_artist...
Processing for 'silver_artist' ...
Finished processing for 'silver_artist'.
Starting to upload 'silver_artist' into hdfs://namenode:9000/datalake/silver_data/silver_artist...


                                                                                

Successfully uploaded 'silver_artist' into HDFS.
Starting to read data from hdfs://namenode:9000/datalake/bronze_data/bronze_album...
Processing for 'silver_album' ...
Finished processing for 'silver_album'.
Starting to upload 'silver_album' into hdfs://namenode:9000/datalake/silver_data/silver_album...


                                                                                

Successfully uploaded 'silver_album' into HDFS.
Starting to read data from hdfs://namenode:9000/datalake/bronze_data/bronze_track...
Processing for 'silver_track' ...
Finished processing for 'silver_track'.
Starting to upload 'silver_track' into hdfs://namenode:9000/datalake/silver_data/silver_track...


                                                                                

Successfully uploaded 'silver_track' into HDFS.
Starting to read data from hdfs://namenode:9000/datalake/bronze_data/bronze_track_feature...
Processing for 'silver_track_feature' ...
Finished processing for 'silver_track_feature'.
Starting to upload 'silver_track_feature' into hdfs://namenode:9000/datalake/silver_data/silver_track_feature...


                                                                                

Successfully uploaded 'silver_track_feature' into HDFS.
Successfully stopped Spark Session!


In [33]:
from pyspark.sql.functions import col
with get_sparkSession('test') as spark:
    # Spot-check the silver album table: show the rows for one artist.
    d = read_HDFS(spark, HDFS_dir = 'silver_data/silver_album', file_type = 'parquet')
    d.where(col('artist') == "Charlie Puth").show(truncate = False)
    # Spot-check the silver track table: show the tracks of one album id.
    data = read_HDFS(spark, HDFS_dir = 'silver_data/silver_track', file_type = 'parquet')
    data.where(col('album_id') == '36I3qGod6ZymqApmj1ckU4').show(truncate = False)
    # Other ad-hoc checks, kept disabled:
    # d = read_HDFS(spark, HDFS_dir = 'bronze_data/bronze_artist', file_type = 'parquet')
    # d.filter(d['Artist_Name'] == "Michael Jackson").show(truncate = False)
    # c = read_HDFS(spark, HDFS_dir = 'silver_data/silver_track_feature', file_type = 'parquet')
    # c.filter(c['id'] == '2GE2p2dFpxp7OIz2QOFBzW').show()


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pyspark/context.py", line 378, in signal_handler
    raise KeyboardInterrupt()
KeyboardInterrupt


KeyboardInterrupt: 

In [29]:
""" Gold layer. """
#Handle each table individually
from pyspark.sql.functions import monotonically_increasing_id, concat, col, lit, count
with get_sparkSession('spark_for_gold_task') as spark:
    #Read the four silver tables from HDFS
    silver_artist = read_HDFS(spark, HDFS_dir = 'silver_data/silver_artist', file_type = 'parquet')
    silver_album = read_HDFS(spark, HDFS_dir = 'silver_data/silver_album', file_type = 'parquet')
    silver_track = read_HDFS(spark, HDFS_dir = 'silver_data/silver_track', file_type = 'parquet')
    silver_track_feature = read_HDFS(spark, HDFS_dir = 'silver_data/silver_track_feature', file_type = 'parquet')


    """ Create dim_genres table. """
    #list all distinct, non-null genres in artist table
    dim_genres = silver_artist.select('genres').distinct()
    dim_genres = dim_genres.filter(col('genres').isNotNull())
    #add primary key: "gns" + a generated id
    dim_genres = dim_genres.withColumn("id", monotonically_increasing_id()) \
                           .withColumn("id", concat(lit("gns"), col('id')))
    #reorder columns
    dim_genres = dim_genres.select("id", "genres")
    #load data into HDFS
    write_HDFS(spark, data = dim_genres, direct = 'gold_data/dim_genres', file_type = 'parquet')
    #Re-read the persisted table before it is joined below. The DataFrame is
    #lazy, so reusing it would recompute monotonically_increasing_id(), which
    #is non-deterministic; the keys used in dim_artist_genres could then
    #differ from the keys that were just written to gold_data/dim_genres.
    dim_genres = read_HDFS(spark, HDFS_dir = 'gold_data/dim_genres', file_type = 'parquet')


    """ Create dim_artist table. """
    #drop the genres column and collapse the now-identical rows
    dim_artist = silver_artist.drop('genres').distinct()
    write_HDFS(spark, data = dim_artist, direct = 'gold_data/dim_artist', file_type = 'parquet')


    """ Create dim_artist_genres table. """
    #select necessary columns in artist table
    dim_artist_genres = silver_artist.select('id', 'genres') \
                                     .withColumnRenamed('id', 'artist_id')
    #join on the genre text to map artist IDs to the persisted genre IDs
    #(inner join: artists whose genres value is null get no row here)
    dim_artist_genres = dim_artist_genres.join(dim_genres, on = 'genres', how = 'inner') \
                                         .withColumnRenamed('id', 'genres_id')
    #drop genres column, leaving only (artist_id, genres_id)
    dim_artist_genres = dim_artist_genres.drop('genres')
    #load data into HDFS
    write_HDFS(spark, data = dim_artist_genres, direct = 'gold_data/dim_artist_genres', file_type = 'parquet')


    """ Create dim_album table. """
    #just drop unnecessary columns
    dim_album = silver_album.drop('artist', 'artist_id', 'total_tracks', 'release_date_precision')
    #load data into HDFS
    write_HDFS(spark, data = dim_album, direct = 'gold_data/dim_album', file_type = 'parquet')


    """ Create dim_track_feature table. """
    #the silver table is written unchanged as dim_track_feature
    dim_track_feature = silver_track_feature
    #load data into HDFS
    write_HDFS(spark, data = dim_track_feature, direct = 'gold_data/dim_track_feature', file_type = 'parquet')


    """ Create fact_track table. """
    #drop album name and rename track id column
    fact_track = silver_track.drop('album_name') \
                             .withColumnRenamed('id', 'track_id')
    #album_id -> artist_id lookup taken from the silver album table; kept under
    #its own name so silver_album still refers to the full table
    album_artist_keys = silver_album.select('id', 'artist_id') \
                                    .withColumnRenamed('id', 'album_id')
    #inner join: tracks whose album_id has no match in the lookup are dropped
    fact_track = fact_track.join(album_artist_keys, on = 'album_id', how = 'inner')
    #reorder columns
    fact_track = fact_track.select('track_id', 'artist_id', 'album_id', 'name', 'track_number',
                                   'disc_number', 'duration_ms', 'explicit', 'url', 'restriction', 'preview')
    #load data into HDFS
    write_HDFS(spark, data = fact_track, direct = 'gold_data/fact_track', file_type = 'parquet')
    

Successfully created Spark Session with app name: spark_for_gold_task and master: local[4]!
Starting to read data from hdfs://namenode:9000/datalake/silver_data/silver_artist...
Starting to read data from hdfs://namenode:9000/datalake/silver_data/silver_album...
Starting to read data from hdfs://namenode:9000/datalake/silver_data/silver_track...
Starting to read data from hdfs://namenode:9000/datalake/silver_data/silver_track_feature...
Starting to upload 'dim_genres' into hdfs://namenode:9000/datalake/gold_data/dim_genres...


                                                                                

Successfully uploaded 'dim_genres' into HDFS.
Starting to upload 'dim_artist' into hdfs://namenode:9000/datalake/gold_data/dim_artist...
Successfully uploaded 'dim_artist' into HDFS.
Starting to upload 'dim_artist_genres' into hdfs://namenode:9000/datalake/gold_data/dim_artist_genres...


                                                                                

Successfully uploaded 'dim_artist_genres' into HDFS.
Starting to upload 'dim_album' into hdfs://namenode:9000/datalake/gold_data/dim_album...


                                                                                

Successfully uploaded 'dim_album' into HDFS.
Starting to upload 'dim_track_feature' into hdfs://namenode:9000/datalake/gold_data/dim_track_feature...


                                                                                

Successfully uploaded 'dim_track_feature' into HDFS.
Starting to upload 'fact_track' into hdfs://namenode:9000/datalake/gold_data/fact_track...


                                                                                

Successfully uploaded 'fact_track' into HDFS.
Successfully stopped Spark Session!


In [None]:
with get_sparkSession('test') as spark:
    # Load the gold dim_artist table, print its row count, then preview its
    # rows without truncating cell values.
    data = read_HDFS(spark, HDFS_dir = "gold_data/dim_artist", file_type = 'parquet')
    row_total = data.count()
    print(row_total)
    data.show(truncate = False)

Successfully created Spark Session with app name: test and master: local!
Starting to read data from hdfs://namenode:9000/datalake/gold_data/dim_artist...
13101
+--------------------+--------------------+---------+----------+--------------------+--------------------+
|                  id|                name|followers|popularity|          link_image|                 url|
+--------------------+--------------------+---------+----------+--------------------+--------------------+
|00FQb4jTyendYWaN8...|        Lana Del Rey| 40266213|        92|https://i.scdn.co...|https://open.spot...|
|00LY9cN1x8xfhdAa1...|    Gould Piano Trio|      941|        14|https://i.scdn.co...|https://open.spot...|
|00hfOKuZEhvKKNXmV...|       Janine Jansen|    53875|        44|https://i.scdn.co...|https://open.spot...|
|00nOi0g1Bq8i4tY2H...|      Ken Waterworth|        0|         0|                null|https://open.spot...|
|00pDcSoftlUplmExU...|             Subbota|     2702|        27|https://i.scdn.co...|https