# PySpark

In [2]:
import os
from pymongo.mongo_client import MongoClient
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import *
import pandas as pd
from datetime import datetime

# IO manger

Spark Session

In [3]:
from contextlib import contextmanager

@contextmanager
def SparkIO(conf: SparkConf = SparkConf()):
    app_name = conf.get("spark.app.name")
    master = conf.get("spark.master")
    print(f'Create SparkSession app {app_name} with {master} mode')
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    try:
        yield spark
    finally:
        print(f'Stop SparkSession app {app_name}')
        spark.stop()


In [4]:
from pymongo.errors import ConnectionFailure
from contextlib import contextmanager
import os

@contextmanager
def MongodbIO():
    user = os.getenv("MONGODB_USER")
    password = os.getenv("MONGODB_PASSWORD")
    uri = f"mongodb+srv://{user}:{password}@python.zynpktu.mongodb.net/?retryWrites=true&w=majority"
    try:
        client = MongoClient(uri)
        print(f"MongoDB Connected")
        yield client
    except ConnectionFailure:
        print(f"Failed to connect with MongoDB")
        raise ConnectionFailure
    finally:
        print("Close connection to MongoDB")
        client.close()

## Bronze 

In [18]:
def createSchema(df: pd.DataFrame):
    """This function create Pyspark Schema"""
    field = []

    for col in df.columns:
        dtype = str(df[col].dtype)

        if dtype == 'object':
            field_type = StringType()
        elif 'int' in dtype:
            field_type = IntegerType()
        elif 'bool' in dtype: 
            field_type = BooleanType()
        elif 'float' in dtype: 
            field_type = FloatType()
        elif dtype == 'double':
            field_type = DoubleType()
        else:
            field_type = StringType()
        
        field.append(StructField(col, field_type, True))

    return StructType(field)


def bronze_layer_task(spark: SparkSession, db: str, table_name: str) -> None:
    """Extract data from MongoDB to HDFS at bronze layer"""
    user = os.getenv("MONGODB_USER")
    password = os.getenv("MONGODB_PASSWORD")
    hdfs_uri = f"hdfs://namenode:8020/bronze_layer/{table_name}.parquet"
    uri = f"mongodb+srv://{user}:{password}@python.zynpktu.mongodb.net/?retryWrites=true&w=majority"
    
    # mongo_data = pd.DataFrame(list(collection.find({}, {"_id": 0}))) # eliminate the _id field
    spark_data = spark.read.format("mongodb").option("uri", uri).option('database', db).option('collection', table_name).load()
    
    # try:
    #     spark_data = spark.createDataFrame(mongo_data, schema=mongo_data.columns.tolist())
    # except Exception as e:
    #     print(f"Error to Create Spark DataFrame {table_name} {e}")
    #     print(f"Start Create Schema for {table_name}")

    #     schema = createSchema(mongo_data)
    #     spark_data = spark.createDataFrame(mongo_data, schema=schema)
    

    print(f"Writing {table_name}")
    try:
        spark_data.write.parquet(hdfs_uri, mode="overwrite")
        print(f"Bronze: Successfully push {table_name}.parquet")
    except Exception as e:
        print(spark_data.printSchema())
        print(e)

def IngestHadoop(spark: SparkSession):
    """Extract data From MongoDb and Load to HDFS"""

    # Connect to MongoDB

    database_name = "remake_spotify_crawling_data"
    # database_name = os.getenv("MONGODB_DATABASE")
    
    
    with MongodbIO() as client:
        mongo_db = client[database_name] 
        collections = mongo_db.list_collection_names() #get all collectons
    
        #Running task concurrently
        for collection in collections:
            print(f"{collection} start being Ingested...")
            bronze_layer_task(spark, database_name, collection) #collection is also the name of table

There is 4 tables:
- artists_data.parquet
- songs_data.parquet
- genres_data.parquet
- albums_data.parquet

location: hdfs://namenode:8020/bronze_layer/{table_name}.parquet

## Silver

### Schema

![Schema](./spotify.png)

Target: 
- using pyspark Cleaning, droping duplicated, drop unusable column (Read [EDA](https://colab.research.google.com/drive/15uM8Uvj1I89zjtJrVn-Z7mvkfyCWo50T?usp=sharing)), format type, there are many duplicated observation.
- join dim artist and dim albums -> join_artist_albums table (clean table before merge) (task 1)
- clean genre, then write back to silver(task 2) -> clean_genre table
- clean songs (task 3) -> clean_songs table (return None)
- The location of silver: hdfs_uri = f"hdfs://namenode:8020/silver_layer/{table_name}.parquet" with table_name is name of result table


Requirements:
- Input of silver main task (spark session), Output: None
- silver main task may have many child tasks, concurrently or sequencially
- Child task input (spark session), any extended params or return base on you, ensure write back result in hdfs with related uri
- Writing (print out) logs every action, handle error and exception (raise it if neccesary)

Dont forget to add your main task to main function !

In [11]:
# Run some code here
def silver_layer_task(spark: SparkSession):
    '''Do some Cleaning tasks for silver layer'''
    # task 1
    # task 2
    # task 3 ...

# Main

In [19]:
def pipeline_B():
    """ELT pipeline with pyspark"""

    user = os.getenv("MONGODB_USER")
    password = os.getenv("MONGODB_PASSWORD")
    uri = f"mongodb+srv://{user}:{password}@python.zynpktu.mongodb.net/?retryWrites=true&w=majority"
    conf = (SparkConf().setAppName("ETL-app-{}".format(datetime.today()))
        .set("spark.executor.memory", "8g")
        .set("spark.mongodb.read.connection.uri",uri)
        .set("spark.mongodb.write.connection.uri", uri)
        .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.1")
        .setMaster("local[*]")
        )

    with SparkIO(conf) as spark:
        IngestHadoop(spark) # <----- bronze task
        # add silver tasks here <-------
        

In [20]:
%%time
pipeline_B()

Create SparkSession app ETL-app-2023-11-30 08:50:34.862617 with local[*] mode
MongoDB Connected
tracks_data start being Ingested...
Writing tracks_data


23/11/30 08:50:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

Bronze: Successfully push tracks_data.parquet
tracks_features_data start being Ingested...
Writing tracks_features_data


                                                                                

Bronze: Successfully push tracks_features_data.parquet
artists_data start being Ingested...
Writing artists_data
root
 |-- _id: string (nullable = true)
 |-- external_urls: struct (nullable = true)
 |    |-- spotify: string (nullable = true)
 |-- followers: struct (nullable = true)
 |    |-- href: void (nullable = true)
 |    |-- total: integer (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- href: string (nullable = true)
 |-- id: string (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- height: integer (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- width: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- uri: string (nullable = true)

None
Parquet data source does not support struct<href:void,total:int> data type.
albums_data start bei

[Stage 2:>                                                          (0 + 1) / 1]

Bronze: Successfully push albums_data.parquet
Close connection to MongoDB
Stop SparkSession app ETL-app-2023-11-30 08:50:34.862617
CPU times: user 321 ms, sys: 116 ms, total: 437 ms
Wall time: 2min 38s


                                                                                