In [1]:
class Singleton:
    """Class-level singleton: "instantiating" it hands back the class itself.

    All state lives in class attributes, so every reference obtained through
    ``Singleton()`` shares the same ``value`` list.
    """

    # Shared storage; a class attribute on purpose so every holder sees it.
    value = list()

    def __new__(cls):
        """Announce the call and return the class object, not a new instance."""
        print("in init")
        return cls

    @staticmethod
    def static_method():
        """Placeholder static method; does nothing and returns None."""
        return None

    @classmethod
    def class_method(cls):
        """Print the shared ``value`` list."""
        shared = cls.value
        print(shared)

In [2]:
# Both calls go through Singleton.__new__, which returns the class itself,
# so A and B refer to the same object and the two ids printed below match.
A = Singleton()
B = Singleton()

print("ID of A : {}".format(id(A)))
# Label fixed: this line reports B, not A.
print("ID of B : {}".format(id(B)))

in init
in init
ID of A : 3231402878544
ID of A : 3231402878544


In [3]:
# A and B are both the Singleton class itself, so the three appends below
# all land in the single class-level list.
A.value.append("A")

B.value.append("C")

Singleton.value.append("D")

D = Singleton()

print(D.value)

# Label each line with the name whose id it actually shows (previously every
# line was labelled "id(Singleton)").
print(f"id(Singleton)\t={id(Singleton)}")
print(f"id(A)\t\t={id(A)}")
print(f"id(B)\t\t={id(B)}")
print(f"id(D)\t\t={id(D)}")


in init
['A', 'C', 'D']
id(Singleton)	=3231402878544
id(Singleton)	=3231402878544
id(Singleton)	=3231402878544
id(Singleton)	=3231402878544


In [4]:
# Is there any way to find out the memory footprint?
# One option: ask psutil about the current process and print the `rss`
# field of its memory_info().

import os, psutil
process = psutil.Process(os.getpid())  # handle to this process, looked up by PID
print(process.memory_info().rss)


79941632


In [5]:
from functools import wraps

def singleton(class_):
    """Class decorator that makes ``class_`` yield one shared instance.

    The first call of the returned callable constructs ``class_`` with the
    given arguments and caches the result; every later call returns that
    cached object and ignores its arguments.

    :param class_: the class to wrap
    :return: a function (carrying ``class_``'s metadata via ``wraps``) that
        returns the single cached instance
    """
    cache = {}

    @wraps(class_)
    def wrapper(*args, **kwargs):
        # Fast path: the instance already exists.
        if class_ in cache:
            return cache[class_]
        created = class_(*args, **kwargs)
        cache[class_] = created
        return created

    return wrapper

@singleton
class MyClass:
    """Empty demo class whose construction is routed through ``singleton``."""

# MyClass is wrapped by the singleton decorator, so both calls return the
# same cached instance and the two ids printed below match.
A = MyClass()
B = MyClass()

print("ID of A : {}".format(id(A)))
# Label fixed: this line reports B, not A.
print("ID of B : {}".format(id(B)))

ID of A : 3231422611664
ID of A : 3231422611664


In [6]:

# Print the `rss` field again, after the decorator-based singleton cell, so
# it can be compared with the figure printed earlier.
# Relies on `os` and `psutil` having been imported by an earlier cell.
process = psutil.Process(os.getpid())
print(process.memory_info().rss)

79966208


### Real-life example

- Database connection

In [11]:
#Game!

class RankBoard:
    """Score board kept as a class-level singleton.

    ``RankBoard()`` returns the class itself, and all scores live in the
    class attribute ``_rank``, so every holder shares one board.
    """

    # Maps player name -> score; shared by everyone who holds the board.
    _rank = dict()

    def __new__(cls):
        """Return the class object instead of creating an instance."""
        return cls

    @classmethod
    def print(cls):
        """Write the board to stdout, highest score first, then a blank line."""
        print("----------RankBoard----------")
        ranked = sorted(cls._rank.items(), key=lambda entry: entry[1], reverse=True)
        for player, points in ranked:
            print(f"\t{player}\t|\t{points}\t|")
        print()

    @classmethod
    def add_player(cls, name, score):
        """Record ``score`` for ``name``, replacing any earlier entry."""
        cls._rank[name] = score
    
from abc import ABC,abstractmethod
class IPlayer(ABC):
    """Abstract interface for players: subclasses must provide ``add_player``."""
    # NOTE(review): the abstract method is declared static here, while the
    # Player class in this notebook implements it as an instance method
    # (``add_player(self, name, score)``) — confirm which form is intended.
    @staticmethod
    @abstractmethod
    def add_player(name,score):
        """Register ``name`` with ``score``; concrete classes must implement this."""
        
        
class Player(IPlayer):
    """Concrete player that forwards score updates to the shared RankBoard."""

    def __init__(self):
        """Keep a reference to the board; ``RankBoard()`` returns the class itself."""
        board = RankBoard()
        self.rankboard = board

    def add_player(self, name, score):
        """Store ``score`` for ``name`` on the board this player holds."""
        board = self.rankboard
        board.add_player(name, score)

class WeeklyLottery:
    """Weekly lottery that picks its candidates from the shared RankBoard."""

    def __init__(self):
        """Attach the board; ``RankBoard()`` returns the RankBoard class itself."""
        self.rankboard = RankBoard()

    def select_ten(self):
        """
        Rank all players by score and return at most ten of them.

        The complete ranking (player names, highest score first, matching the
        order used by ``RankBoard.print``) is stored on ``self.current_rank``.

        :return: list of the names of the ten highest-scoring players, or
            fewer when the board holds fewer than ten entries
        """
        scores = self.rankboard._rank
        # Sort the names by their score. Sorting the dict with
        # ``key=lambda x: x[1]`` compared the second *character* of each name
        # and raised IndexError for one-character names.
        self.current_rank = sorted(scores, key=scores.get, reverse=True)
        # A slice caps the result at ten; the former while-loop returned on
        # its first pass and therefore never limited anything.
        return self.current_rank[:10]


        

In [None]:
# Build a lottery (its constructor attaches the RankBoard) and show the value
# returned by select_ten() as the cell's output.
a= WeeklyLottery()
a.select_ten()

In [10]:
# Each Player stores RankBoard() as `rankboard`; RankBoard.__new__ returns the
# class itself, so both players write into the same board.
Player1 = Player()
Player1.add_player("Migo",99)

Player2 = Player()
Player2.add_player("Shaun",32)

# Printing through Player2 therefore also lists the entry added via Player1.
Player2.rankboard.print()



----------RankBoard----------
	Migo	|	99	|
	Shaun	|	32	|



In [None]:
class HiveHandler:
    """
    Handler class for Hive.

    Reads the Hive settings for a Hadoop cluster from ``get_secret()`` and
    opens a connection either directly or through a service-discovery mode.
    """

    def __init__(self, database="default", hadoop_cluster=None):
        """
        Load the Hive settings for ``hadoop_cluster`` and initialise state.

        :param database: name of the Hive database to connect to
        :param hadoop_cluster: value passed to ``HadoopCluster`` to pick the cluster
        """
        # Initialise the connection slots first so that __del__ stays safe
        # even if one of the lookups below raises.
        self._conn = None
        self._cursor = None

        self._hadoop_cluster = HadoopCluster(hadoop_cluster)

        # Secret keys use underscores where the cluster value uses dashes.
        hive_secret = get_secret()["hive"][
            self._hadoop_cluster.value.replace("-", "_")
        ]
        self._service_discovery_mode = hive_secret.get(
            "service_discovery_mode"
        )
        self._host = hive_secret.get("host", "")
        self._port = hive_secret.get("port", 10000)
        self._database = database

    def __del__(self):
        """Close the Hive connection, if one is held."""
        # __del__ also runs for objects whose __init__ raised part-way (or
        # that were created without __init__), in which case ``_conn`` may
        # never have been assigned.
        conn = getattr(self, "_conn", None)
        if conn:
            Logger.info("closing hive connection...")
            try:
                conn.close()
            finally:
                # Forget the handle even when close() fails.
                self._conn = None

    def _get_connection(self):
        """
        Connect to the hive database.

        Without a service-discovery mode a direct Kerberos-authenticated
        connection to the configured host and port is opened; in ZooKeeper
        mode the connection is obtained through
        ``_get_hive_connection_discovered_by_zoo_keeper``.

        :return: connection
        :raises ServiceDiscoveryModeNotFoundError: when a discovery mode is
            configured but is not ``ServiceDiscoveryMode.ZOO_KEEPER``
        """
        Logger.info(
            "Connecting Hive ... (cluster: %s, database: %s)",
            self._hadoop_cluster.value,
            self._database,
        )
        if self._service_discovery_mode is None:
            return hive.Connection(
                host=self._host,
                port=self._port,
                database=self._database,
                auth="KERBEROS",
                kerberos_service_name="hive",
            )

        if (
            ServiceDiscoveryMode(self._service_discovery_mode)
            == ServiceDiscoveryMode.ZOO_KEEPER
        ):
            return self._get_hive_connection_discovered_by_zoo_keeper()

        raise ServiceDiscoveryModeNotFoundError

In [None]:
"""
Implements the spark handler class.
"""
import os

from pyspark.sql import SparkSession

from src.config.enums import SourcingClusterType

from .. import logger
from ..kerberos_handler import KerberosHandler

# Name of the environment variable that SparkHandler sets to its cluster in
# _set_env() and removes again in stop().
ACTIVE_SPARK_SESSION_CLUSTER = "ACTIVE_SPARK_SESSION_CLUSTER"


class SparkHandler:  # pylint: disable=too-few-public-methods
    """
    SparkHandler class.

    Lazily builds a Hive-enabled SparkSession for one cluster, exporting the
    environment variables of the Hadoop client first when no cluster is
    marked as active in the environment.
    """

    def __init__(self, cluster="hadoop-dev", master="local[*]"):
        """
        :param cluster: cluster name, validated through ``SourcingClusterType``
        :param master: Spark master URL passed to the session builder
        """
        self._cluster = SourcingClusterType(cluster).value
        self._master = master  # change master to "yarn" if YARN is needed
        self._hadoop_client_home = "/hadoop-client-env-v2/target"
        self._spark = None

    def _set_env(self) -> None:
        """
        Set the environment needed to use spark.

        Marks this handler's cluster as active, exports the JAVA/SPARK/
        HADOOP/HIVE locations below ``_hadoop_client_home`` and finally calls
        ``KerberosHandler().issue_ticket()``.

        :return: None
        """
        os.environ[ACTIVE_SPARK_SESSION_CLUSTER] = self._cluster
        logger.info(
            f"spark env set... ({ACTIVE_SPARK_SESSION_CLUSTER}: "
            f"{os.environ[ACTIVE_SPARK_SESSION_CLUSTER]})"
        )

        logger.info(
            "set environ variables... "
            "(JAVA_HOME, SPARK_HOME, HADOOP_HOME, HADOOP_CONF_DIR, HIVE_HOME)"
        )
        os.environ[
            "JAVA_HOME"
        ] = f"{self._hadoop_client_home}/jdk/jdk8u242-b08"
        os.environ[
            "SPARK_HOME"
        ] = f"{self._hadoop_client_home}/{self._cluster}/spark"
        os.environ[
            "HADOOP_HOME"
        ] = f"{self._hadoop_client_home}/{self._cluster}/hadoop"
        os.environ[
            "HADOOP_CONF_DIR"
        ] = f"{self._hadoop_client_home}/{self._cluster}/hadoop/conf"
        os.environ[
            "HIVE_HOME"
        ] = f"{self._hadoop_client_home}/{self._cluster}/hive"

        KerberosHandler().issue_ticket()

    def _get_spark_session(self) -> SparkSession:
        """
        Build (or fetch, via ``getOrCreate``) the spark session.

        The environment is prepared first when no active cluster is recorded
        in ``ACTIVE_SPARK_SESSION_CLUSTER``.

        :return: spark session
        """
        # NOTE(review): when the marker is already set to a *different*
        # cluster the environment is left untouched — confirm this is intended.
        if os.environ.get(ACTIVE_SPARK_SESSION_CLUSTER) is None:
            self._set_env()
        return (
            SparkSession.builder.master(self._master)
            .appName("andromeda-operator")
            .enableHiveSupport()
            .getOrCreate()
        )

    @property
    def spark(self) -> SparkSession:
        """
        Return the spark session, creating it on first access.

        :return: spark session
        """
        if self._spark is None:
            self._spark = self._get_spark_session()
        return self._spark

    def stop(self) -> None:
        """
        Stop the spark session and clear the active-cluster marker.

        :return: None
        """
        self.spark.stop()
        # Drop the cached handle so a later ``spark`` access builds a fresh
        # session instead of handing back the stopped one.
        self._spark = None
        # Use a default: the marker may already be gone (for example after a
        # previous stop()), and a bare pop() would raise KeyError.
        os.environ.pop(ACTIVE_SPARK_SESSION_CLUSTER, None)
        logger.info(
            f"spark stopped... ({ACTIVE_SPARK_SESSION_CLUSTER}: "
            f"{os.environ.get(ACTIVE_SPARK_SESSION_CLUSTER)})"
        )