In [None]:
from src.utils.logging import setup_logging
import yaml
from pyspark.sql import SparkSession

# Load the run configuration; the `with` block guarantees the file handle is
# closed (the previous inline `open(...)` leaked it).
# NOTE(review): the filename was 'congig.yaml', presumably a typo; confirm
# the file on disk is config.yaml.
# NOTE(review): FullLoader can construct some Python-specific objects; if the
# config only holds plain mappings/lists/scalars, `yaml.safe_load` is safer.
with open('config.yaml', encoding='utf-8') as config_file:
    config = yaml.load(config_file, Loader=yaml.FullLoader)

logger = setup_logging(name='RUN', level='DEBUG')

# The builder method is `appName` (camelCase); `.appname` raised
# AttributeError before any session was created.
spark = (
    SparkSession
    .builder
    .appName('appname')
    .master('yarn')
    .config('name', 'value')
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
import logging 
import pandas as pd

class Step:
    """
    Base class for pipeline steps.

    Stores the shared run context so that subclasses can use it as
    ``self.config``, ``self.logger`` and ``self.spark``. Subclasses must
    override :meth:`execute`.
    """

    def __init__(self, config, logger: logging.Logger, spark):
        """
        :param config: run configuration (the object loaded from the YAML
            config in the setup cell).
        :param logger: logger used for progress messages.
        :param spark: Spark session the step runs queries against.
        """
        self.config = config
        self.logger = logger
        self.spark = spark

    def execute(self, data_store: pd.DataFrame):
        """
        Run the step.

        The annotation must be ``pd.DataFrame``. The misspelt ``pd.Dataframe``
        was evaluated at definition time and raised AttributeError, which
        broke the whole cell.

        :param data_store: input data for the step.
        :raises NotImplementedError: always; subclasses must override this.
        """
        raise NotImplementedError(
            f'{type(self).__name__} must implement execute()'
        )

In [None]:
from src.steps.base_class import Step
from pyspark.sql.functions import lit
import pyspark
import gc


class TargetConstructor(Step):
    """
    Collects the target sample for the model.

    Loads "unique" clients (labelled with a constant ``target = 1``) and
    "mass" clients from the SQL files listed in ``config['sql_paths']``.
    It unions them and overwrites the ORC table
    ``config['temp_tables_paths']['namepath']`` with the result. It returns
    the ``hid_party`` / ``month_end_dt`` columns of that union.
    """

    def __init__(self, config, logger, spark):
        super().__init__(config=config, logger=logger, spark=spark)

    def __read_sql(self, sql_key: str) -> pyspark.sql.DataFrame:
        """
        Run the SQL template ``config['sql_paths'][sql_key]`` through Spark.

        The template is filled with ``str.format``, passing
        ``config['dates']['date']`` as both positional arguments.
        The file is closed as soon as it has been read.
        """
        with open(self.config['sql_paths'][sql_key], encoding='utf-8') as sql_file:
            query_template = sql_file.read()

        report_date = self.config['dates']['date']
        return self.spark.sql(query_template.format(report_date, report_date))

    def __load_uniq_clients(self) -> pyspark.sql.DataFrame:
        """
        Load the unique clients.

        Renames ``start_month`` to ``report_date`` and adds a constant
        ``target = 1`` column.
        """
        self.logger.info('1.1: loading unique clients')

        return (
            self.__read_sql('uniq_clients')
            .withColumnRenamed('start_month', 'report_date')
            .withColumn('target', lit(1))
        )

    def __load_mass_clients(self) -> pyspark.sql.DataFrame:
        """Load the mass clients via the ``mass_clients`` SQL template."""
        self.logger.info('1.2: loading mass clients')

        return self.__read_sql('mass_clients')

    def __union_and_write_pre_final_target(self,
                                           uniq_clients: pyspark.sql.DataFrame,
                                           mass_clients: pyspark.sql.DataFrame):
        """
        Union the 0s and 1s and save the result to the datastore.

        Renames ``report_date`` to ``month_end_dt`` and overwrites the
        ``config['temp_tables_paths']['namepath']`` table (ORC, 10
        partitions).

        :return: DataFrame with columns ``hid_party`` and ``month_end_dt``.
        """
        self.logger.info('1.3: making target union and loading to datastore')

        # NOTE(review): `union` matches columns by position, not by name. This
        # assumes the mass_clients query yields the same columns in the same
        # order as uniq_clients; confirm, or switch to `unionByName`.
        target_final = (
            uniq_clients.union(mass_clients)
            .withColumnRenamed('report_date', 'month_end_dt')
        )

        (target_final.repartition(10)
            .write.mode('overwrite').format('orc')
            .saveAsTable(self.config['temp_tables_paths']['namepath']))

        # NOTE(review): target_final is lazy, so consuming this result re-runs
        # both source queries. Reading the saved table back with
        # `self.spark.table(...)` would avoid that.
        return target_final.select('hid_party', 'month_end_dt')

    def __clean_workspace_after_step_1(self,
                                       uniq_clients: pyspark.sql.DataFrame,
                                       mass_clients: pyspark.sql.DataFrame):
        """
        Release the intermediate DataFrames of step 1.

        Nothing in this class caches them, so ``unpersist`` is expected to
        have no effect. It is kept as a safeguard in case caching is added
        upstream.
        """
        self.logger.info('1.4: clean workspace after step 1')

        uniq_clients.unpersist()
        mass_clients.unpersist()
        gc.collect()

    def execute(self, data_store=None):
        """
        Run step 1: load both client sets, union and save them, then clean up.

        :param data_store: unused; accepted so the signature is compatible
            with ``Step.execute(data_store)``.
        :return: DataFrame with columns ``hid_party`` and ``month_end_dt``.
        """
        uniq_clients = self.__load_uniq_clients()
        mass_clients = self.__load_mass_clients()

        target_for_pre_selection = self.__union_and_write_pre_final_target(uniq_clients, mass_clients)

        self.__clean_workspace_after_step_1(uniq_clients, mass_clients)

        return target_for_pre_selection

# Run step 1 and keep the hid_party / month_end_dt frame it returns.
tc = TargetConstructor(config=config, logger=logger, spark=spark)
target_for_pre_selection = tc.execute()