### Агрегаты первый круг 

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, to_timestamp, substring, rank, expr, substring, current_timestamp, concat, lit, avg, least, row_number, max,  last
from pyspark.sql.functions import to_date, regexp_replace, length, reverse, datediff, greatest, months_between, coalesce, count, desc, abs, asc, add_months
from pyspark.sql.window import Window
from pyspark.sql.types import DecimalType, IntegerType, StructType, StructField, StringType, TimestampType, LongType, ByteType  
from pyspark.sql import functions as F
import os 

In [4]:
spark = SparkSession \
    .builder \
    .appName("spark") \
    .config("spark.jars", "hadoop-aws-3.3.2.jar,aws-java-sdk-bundle-1.11.1026.jar") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.hadoop.hive.metastore.uris", os.environ.get("METASTORE_URI")) \
    .config("spark.sql.warehouse.dir", os.environ.get("METASTORE_WAREHOUSE_DIR")) \
    .config("spark.hadoop.fs.s3a.access.key", os.environ.get("METASTORE_AWS_ACCESS_KEY_ID")) \
    .config("spark.hadoop.fs.s3a.secret.key", os.environ.get("METASTORE_AWS_SECRET_ACCESS_KEY")) \
    .config("spark.hadoop.fs.s3a.endpoint", os.environ.get("FEAST_S3_ENDPOINT_URL")) \
    .config("spark.sql.debug.maxToStringFields", "100") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .enableHiveSupport() \
    .getOrCreate()

23/08/23 08:48:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
spark.sql("USE stage")
sft_df = spark.read.table("cre_sf_singleformattype").alias("sft")
mt_df = spark.read.table("cre_sf_maintype").alias("mt")
l_df = spark.read.table("cre_sf_loanstype").alias("l")
ov_df = spark.read.table("cre_sf_loansoverviewtype").alias("ov")

23/08/23 08:49:10 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [6]:
# точно ок 
t1_df = sft_df.join(mt_df, mt_df.hjid == sft_df.main) \
    .join(l_df, l_df.loanstypes_loan_hjid == sft_df.loans) \
    .join(ov_df, ov_df.hjid == sft_df.loansoverview, "left") \
    .where(col("relationship").cast("int") <= 5) \
    .where(~col("TYPE_").isin("17", "18", "19")) \
    .where(expr("length(coalesce(replace(regexp_replace(pmtstring84m, 'X+$', ''), '9', '5'), '')) <= 2000")) \
    .withColumn("rep_type", expr("'IKAR'")) \
    .withColumn("second_name", expr("''")) \
    .withColumn("first_name", expr("''")) \
    .withColumn("third_name", expr("''")) \
    .withColumn("birth_dt", expr("''")) \
    .withColumn("cre_natural_loan_request_id", col("applicationuid")) \
    .withColumn("name_id", col("mt.hjid")) \
    .withColumn("report_id", col("mt.hjid")) \
    .withColumn("loan_id", col("l.hjid")) \
    .withColumn("report_dt", to_timestamp(expr("substring(mt.reportdatetime, 1, 8)"), 'ddMMyyyy')) \
    .withColumn("class_type", 
                expr("CASE WHEN cast(l.relationship as int) <= 4 THEN 'D' WHEN cast(l.relationship as int) = 5 THEN 'G' ELSE NULL END")) \
    .withColumn("is_bank", col("l.isown")) \
    .withColumn("date_start", to_timestamp(col("l.opendate"), 'ddMMyyyy')) \
    .withColumn("date_end_plan", to_timestamp(col("l.finalpmtdate"), 'ddMMyyyy')) \
    .withColumn("date_end_fact", to_timestamp(col("l.factclosedate"), 'ddMMyyyy')) \
    .withColumn("credit_amount", col("l.creditlimit")) \
    .withColumn("current_debt_rur", col("l.outstanding")) \
    .withColumn("delinquent_debt_rur", col("l.delqbalance")) \
    .withColumn("product", expr(
                "CASE WHEN l.type_ = '1' THEN 'АК' WHEN l.type_ = '6' THEN 'ИК' WHEN l.type_ = '7' THEN 'КК' " \
                "WHEN l.type_ = '8' THEN 'КК' WHEN l.type_ = '21' THEN 'МЗ' WHEN l.type_ = '9' THEN 'ПК' ELSE 'Прочее' END")) \
    .withColumn("interest_rate_month", col("l.interestrate") / 100 / 12) \
    .withColumn("is_debtor", expr("CASE WHEN cast(l.relationship as int) <= 4 THEN 1 ELSE 0 END")) \
    .withColumn("is_guarantor", expr("CASE WHEN cast(l.relationship as int) = 5 THEN 1 ELSE 0 END")) \
    .withColumn("infconfirmdate", to_timestamp(col("l.Infconfirmdate"), 'ddMMyyyy')) \
    .withColumn("currentdelq", col("l.currentdelq").cast("int")) \
    .withColumn("inquiry1week", col("ov.inquiry1week")) \
    .withColumn("inquiry1month", col("ov.inquiry1month")) \
    .withColumn("inquiry3month", col("ov.inquiry3month")) \
    .withColumn("inquiry6month", col("ov.inquiry6month")) \
    .withColumn("pmtstringstart", to_timestamp(col("l.pmtstringstart"), "ddMMyyyy")) \
    .withColumn("next_pmt", col("l.nextpmt")) \
    .withColumn("business_category", expr("coalesce(l.businesscategory, 1)")) \
    .withColumn("status", col("l.status")) \
    .withColumn("relationship", col("l.relationship")) \
    .withColumn("type_", col("l.type_")) \
    .withColumn("uuid", col("l.uuid")) \
    .withColumn("credit_cost_rate", col("l.creditcostrate")) \
    .withColumn("programcode", expr("'01.01.06'")) \
    .withColumn("principal_past_due", col("l.principal_past_due")) \
    .withColumn("collateralcode", col("l.collateralcode")) \
    .withColumn("principal_outstanding", col("l.principal_outstanding")) \
    .withColumn("rn", expr("rank() over (partition by mt.applicantid order by to_timestamp(mt.reportdatetime, 'ddMMyyyyHHmmss') desc, mt.hjid desc)")) \
    .select("applicationuid", "rep_type", "second_name", "first_name", "third_name", "birth_dt", "applicantid",
            "cre_natural_loan_request_id", "name_id", "report_id", "loan_id", "report_dt", "class_type", "is_bank",
            "date_start", "date_end_plan", "date_end_fact", "credit_amount", "current_debt_rur", "delinquent_debt_rur",
            "product", "currency", "interest_rate_month", "is_debtor", "is_guarantor", "pmtstring84m", "pmtstringstart",
            "infconfirmdate", "currentdelq", "inquiry1week", "inquiry1month", "inquiry3month", "inquiry6month",
            "next_pmt", "business_category", "status", "relationship", "type_", "uuid", "credit_cost_rate", "programcode",
            "principal_past_due", "collateralcode", "principal_outstanding", "rn")
nvg_data_gr_loans_np = t1_df.filter(F.col("rn") == 1).distinct()

In [7]:
# Создаем DataFrame с одной строкой, содержащей дату '2023-03-01' в столбце 'application_date' ОК !!!
data = [('2023-03-01',)]
nvg_cre_date = spark.createDataFrame(data, ['application_date'])
# Конвертируем столбец 'application_date' в формат timestamp
nvg_cre_date = nvg_cre_date.withColumn('application_date', to_timestamp(nvg_cre_date['application_date'], 'yyyy-MM-dd'))

In [8]:
#!ОК
nvg_data_gr_month_pmt = nvg_data_gr_loans_np.select( 
    col("applicationuid"),
    col("rep_type"),
    col("second_name"),
    col("first_name"),
    col("third_name"),
    col("birth_dt"),
    col("applicantid"),
    col("name_id"),
    col("report_id"),
    col("report_dt"),
    col("loan_id"),
    col("class_type"),
    col("is_bank"),
    col("date_start"),
    col("date_end_plan"),
    col("date_end_fact"),
    col("credit_amount"),
    col("current_debt_rur"),
    col("delinquent_debt_rur"),
    col("product"),
    col("currency"),
    col("interest_rate_month"),
    col("is_debtor"),
    col("is_guarantor"),
    col("pmtstring84m"),
    col("pmtstringstart"),
    col("infconfirmdate"),
    col("currentdelq"),
    col("inquiry1week"),
    col("inquiry1month"),
    col("inquiry3month"),
    col("inquiry6month"),
    col("next_pmt"),
    col("business_category"),
    col("status"),
    col("relationship"),
    col("type_"),
    col("uuid"),
    col("collateralcode"),
    col("next_pmt").alias("month_pmt"),  
    substring(current_timestamp().cast("string"), 1, 19).alias("t_changed_dttm")  # Добавляем столбец t_changed_dttm с текущим временем
)

In [9]:
# Выполняем кросс-джойн между 'nvg_data_gr_month_pmt' и 'nvg_cre_date' !ОК
nvg_data_gr_cre_mart = nvg_data_gr_month_pmt.crossJoin(nvg_cre_date)

# Преобразуем типы данных в DataFrame 'nvg_data_gr_cre_mart'
nvg_data_gr_cre_mart = nvg_data_gr_cre_mart.withColumn("name_id", col("name_id").cast(DecimalType(19, 0))) \
    .withColumn("report_id", concat(col("rep_type"), lit("_"), col("report_id").cast("string"))) \
    .withColumn("loan_id", col("loan_id").cast(DecimalType(19, 0))) \
    .withColumn("is_bank", col("is_bank").cast(IntegerType())) \
    .withColumn("credit_amount", col("credit_amount").cast(DecimalType(19, 2))) \
    .withColumn("current_debt_rur", col("current_debt_rur").cast(DecimalType(19, 2))) \
    .withColumn("delinquent_debt_rur", col("delinquent_debt_rur").cast(DecimalType(19, 2))) \
    .withColumn("interest_rate_month", col("interest_rate_month").cast(DecimalType(38, 6))) \
    .withColumn("inquiry1week", col("inquiry1week").cast(DecimalType(10, 0))) \
    .withColumn("inquiry1month", col("inquiry1month").cast(DecimalType(10, 0))) \
    .withColumn("inquiry3month", col("inquiry3month").cast(DecimalType(10, 0))) \
    .withColumn("inquiry6month", col("inquiry6month").cast(DecimalType(10, 0))) \
    .withColumn("next_pmt", col("next_pmt").cast(DecimalType(19, 2))) \
    .withColumn("business_category", col("business_category").cast(DecimalType(15, 0))) \
    .withColumn("month_pmt", col("month_pmt").cast(DecimalType(38, 6))) \
    .withColumn("t_changed_dttm", current_timestamp())\
    .withColumn("t_deleted_flg", lit(0).cast(ByteType()))\
    .withColumn("t_process_task_id", lit(None).cast(IntegerType())) \
    .withColumn("t_active_flg", lit(1).cast(ByteType())) \
    .select(
        "applicationuid",
        "application_date",
        "rep_type",
        "second_name",
        "first_name",
        "third_name",
        "birth_dt",
        "applicantid",
        "name_id",
        "report_id",
        "report_dt",
        "loan_id",
        "class_type",
        "is_bank",
        "date_start",
        "date_end_plan",
        "date_end_fact",
        "credit_amount",
        "current_debt_rur",
        "delinquent_debt_rur",
        "product",
        "currency",
        "interest_rate_month",
        "is_debtor",
        "is_guarantor",
        "pmtstring84m",
        "pmtstringstart",
        "infconfirmdate",
        "currentdelq",
        "inquiry1week",
        "inquiry1month",
        "inquiry3month",
        "inquiry6month",
        "next_pmt",
        "business_category",
        "status",
        "relationship",
        "type_",
        "month_pmt",
        "uuid",
        "collateralcode",
        "t_changed_dttm",
        "t_deleted_flg",
        "t_process_task_id",
        "t_active_flg"
    )

In [10]:
#ОК
nvg_data_gr_cre_dlq_t2 = nvg_data_gr_cre_mart.withColumn("l_val", lit(0)) \
    .withColumn("worst_status", lit(0)) \
    .withColumn("count_day", lit(0)) \
    .select(
        col("application_date"),
        col("report_id"),
        col("report_dt").alias("report_date"),
        col("applicantid"),
        col("loan_id"),
        col("pmtstringstart"),
        # Обрезаем правый "хвост", если он весь состоит из 'X' и заменяем '9' на '5'
        regexp_replace(regexp_replace(col("pmtstring84m"), 'X+$', ''), '9', '5').alias("pmtstring84m"),
        # Создаем обратную строку для pmtstring84m
        reverse(regexp_replace(regexp_replace(col("pmtstring84m"), 'X+$', ''), '9', '5')).alias("pmtstring84m_rev"),
        # Вычисляем длину строки без 'X' и '9'
        length(regexp_replace(col("pmtstring84m"), 'X+$', '')).alias("len"),
        # Задаем значения по умолчанию для остальных столбцов
        col("l_val"),
        col("worst_status"),
        col("count_day")
).distinct()

In [11]:
 #!ОК ( какая-то подставная табличка )
schema = StructType([
    StructField("application_date", TimestampType()),
    StructField("report_id", StringType()),
    StructField("report_date", TimestampType()),
    StructField("applicantid", StringType()),
    StructField("loan_id", DecimalType(19, 0)),
    StructField("pmtstringstart", TimestampType()),
    StructField("pmtstring84m", StringType()),
    StructField("pmtstring84m_rev", StringType()),
    StructField("len", IntegerType()),
    StructField("l_val", DecimalType(11, 1)),
    StructField("worst_status", DecimalType(11, 1)),
    StructField("count_day", LongType())
])

# Создаем пустой DataFrame с указанной схемой
nvg_data_gr_cre_dlq_t3 = spark.createDataFrame([], schema)

In [12]:
#!ОК! ( какая-то подставная табличка )
new_nvg_data_gr_cre_dlq_t1 = nvg_data_gr_cre_dlq_t3.select(
    col("application_date"),
    col("report_id"),
    col("report_date"),
    col("applicantid"),
    col("loan_id"),
    col("pmtstringstart").alias("dlq_end_dt"),
    # Используем функцию greatest для выбора максимального значения между l_val и worst_status
    when(col("l_val") == 1.5, col("l_val")).otherwise(greatest(col("l_val"), col("worst_status"))).alias("worst_status"),
    col("count_day")
).where(col("l_val").between(1.5, 5))

In [13]:
nvg_data_gr_cre_dlq_t4 = nvg_data_gr_cre_dlq_t2.select(   #!ОК!
    col("application_date"),
    col("report_id"),
    col("report_date"),
    col("applicantid"),
    col("loan_id"),
    col("pmtstringstart"),
    col("pmtstring84m"),
    col("pmtstring84m_rev").alias("st"),
    col("pmtstring84m_rev"),
    col("len"),
    col("l_val"),
    col("worst_status"),
    col("count_day")
).where(col("len") > 1)

In [14]:
#Выполняем выборку данных из DataFrame 'nvg_data_gr_cre_dlq_t2' с условием len = 1 #!ОК!
nvg_data_gr_cre_dlq_t5 = nvg_data_gr_cre_dlq_t2.select(
    col("application_date"),
    col("report_id"),
    col("report_date"),
    col("applicantid"),
    col("loan_id"),
    col("pmtstringstart"),
    col("pmtstring84m"),
    col("pmtstring84m_rev"),
    when(substring(col("pmtstring84m_rev"), 1, 1) == 'X', 1)
    .when(substring(col("pmtstring84m_rev"), 1, 1) == 'A', 1.5)
    .when(substring(col("pmtstring84m_rev"), 1, 1).isin('0', '1', '2', '3', '4', '5', '7', '8', '9'),
          col("pmtstring84m_rev").cast("int"))
    .otherwise(1).cast(DecimalType(11, 1)).alias("worst_status"),
    when(substring(col("pmtstring84m_rev"), 1, 1) == '5', 1).otherwise(0).alias("count_day")
).where(col("len") == 1)

In [15]:
#!ОК!
nvg_data_gr_cre_dlq_t1 = nvg_data_gr_cre_dlq_t5.select(
    col("application_date"),
    col("report_id"),
    col("report_date"),
    col("applicantid"),
    col("loan_id"),
    to_timestamp(col("pmtstringstart")).alias("dlq_end_dt"),
    col("worst_status").cast(DecimalType(19, 1)),
    col("count_day").cast(LongType())
).where(col("worst_status").between(1.5, 5))

In [16]:
# ОК
df1 = nvg_data_gr_cre_dlq_t1.select(
    col("application_date"),
    col("report_id"),
    col("report_date").alias("report_dt"),
    col("applicantid"),
    col("loan_id"),
    col("dlq_end_dt"),
    when(col("worst_status") != 5, 
         when(col("worst_status") == 1.5, 29)
         .when(col("worst_status") == 2, 59)
         .when(col("worst_status") == 3, 89)
         .when(col("worst_status") == 4, 119))
    .otherwise(datediff(col("dlq_end_dt"), 
                        (col("dlq_end_dt") - expr("INTERVAL 121 DAY") - expr("INTERVAL 1 MONTH") * (col("count_day") - 1)))
               ).alias("delinq_length"),
    when(col("report_date").between(
            col("dlq_end_dt") - expr("INTERVAL 1 DAY") * (when(col("worst_status") == 1.5, 29)
                                                         .when(col("worst_status") == 2, 59)
                                                         .when(col("worst_status") == 3, 89)
                                                         .when(col("worst_status") == 4, 119)
                                                         .otherwise(datediff(col("dlq_end_dt"), 
                                                                            (col("dlq_end_dt") - expr("INTERVAL 121 DAY") - expr("INTERVAL 1 MONTH") * (col("count_day") - 1)))
                                                        )),
            (col("dlq_end_dt") + expr("INTERVAL 1 MONTH")) - expr("INTERVAL 1 DAY")
         ), 1).otherwise(0).alias("is_active")
)

# Выполняем вторую часть SELECT с LEFT JOIN
df2 = nvg_data_gr_cre_mart.alias("ist").join(
    nvg_data_gr_cre_dlq_t1.alias("scr"),
    (col("ist.applicantid") == col("scr.applicantid"))
    & (col("ist.loan_id") == col("scr.loan_id"))
    & (datediff(col("ist.Infconfirmdate"), col("scr.dlq_end_dt")).cast("int").between(1, 30)),
    "left"
).where(col("currentdelq") > 0).select(
    col("ist.application_date"),
    col("ist.report_id"),
    col("ist.report_dt"),
    col("ist.applicantid"),
    col("ist.loan_id"),
    coalesce(col("scr.dlq_end_dt"), col("ist.Infconfirmdate")).alias("dlq_end_dt"),
    col("currentdelq").alias("delinq_length"),
    when(col("report_dt").between(
            coalesce(col("dlq_end_dt"), col("Infconfirmdate")) - expr("INTERVAL 1 DAY") * col("currentdelq"),
            (coalesce(col("dlq_end_dt"), col("Infconfirmdate")) + expr("INTERVAL 1 MONTH")) - expr("INTERVAL 1 DAY")
        ),
        1
    ).otherwise(0).alias("is_active")
)


# Объединяем результаты и выполняем группировку
nvg_data_gr_cre_dlq = df1.unionAll(df2).groupBy(
    "application_date", "report_id", "report_dt", "applicantid", "loan_id", "dlq_end_dt", "is_active"
).agg(
    max("delinq_length").alias("delinq_length")
).select(
    "application_date", "report_id", "report_dt", "applicantid", "loan_id", "dlq_end_dt", "delinq_length", "is_active"
)

In [17]:
#ОК
nvg_data_gr_agr_all = nvg_data_gr_cre_mart.select(
    col("applicationuid"),
    col("application_date"),
    col("applicantid"),
    col("report_dt"),
    col("loan_id"),
    col("class_type"),
    col("is_bank"),
    col("date_start"),
    col("date_end_plan"),
    when(
        (col("date_end_fact").isNull()) &
        (months_between(col("application_date"), col("report_dt")) > 1) &
        (col("application_date") > col("date_end_plan")),
        col("date_end_plan")
    ).otherwise(col("date_end_fact")).alias("date_end_fact"),
    col("credit_amount"),
    col("credit_amount").alias("card_limit"),
    when(col("STATUS") == "14", lit(0))
    .when((col("product") == "КК") & (months_between(col("application_date"), col("report_dt")) > 1), lit(0))
    .when(months_between(col("application_date"), col("report_dt")) > 1,
    greatest(col("current_debt_rur") - coalesce(col("MONTH_PMT"), lit(0)) * months_between(col("application_date"), col("report_dt")), lit(0)))
    .otherwise(col("current_debt_rur")).alias("current_debt_rur"),
    coalesce(col("MONTH_PMT"), lit(0)).alias("next_pmt"),
    when(col("STATUS") == "14", lit(0))
    .when((col("product") == "КК") & (months_between(col("application_date"), col("report_dt")) > 1), lit(0))
    .otherwise(col("delinquent_debt_rur"))
    .alias("delinquent_debt_rur"),
    col("product"),
    lit(0).alias("is_diff"),
    lit(0).alias("is_individual_pp"),
    col("currency"),
    col("interest_rate_month"),
    col("is_debtor"),
    col("is_guarantor"),
    when(
        col("collateralcode").isNull(), 1
    ).otherwise(0).alias("coll_not_flg"),
    when(
        col("collateralcode").isin('11', '12'), 1
    ).otherwise(0).alias("coll_real_flg"),
    when(
        col("collateralcode").isin('1', '01'), 1
    ).otherwise(0).alias("coll_auto_flg")
)


In [18]:
# ОК!
window_spec1 = Window.partitionBy(
    "applicationuid", "application_date", "applicantid", "class_type", "is_bank"
).orderBy(
    "date_start", col("date_end_fact").asc_nulls_last(), col("loan_id").desc()
).rowsBetween(
    Window.unboundedPreceding, Window.currentRow - 1
)

window_spec2 = Window.partitionBy(
    "applicationuid", "application_date", "applicantid", "class_type"
).orderBy(
    "date_start", col("date_end_fact").asc_nulls_last(), col("loan_id").desc()
).rowsBetween(
    Window.unboundedPreceding, Window.currentRow - 1
)



nvg_data_gr_bki_t1 = nvg_data_gr_agr_all.select(
    col("applicationuid"),
    col("application_date"),
    col("applicantid"),
    col("report_dt"),
    col("loan_id"),
    col("class_type"),
    col("is_bank"),
    col("date_start"),
    col("date_end_plan"),
    col("date_end_fact"),
    col("credit_amount"),
    col("card_limit"),
    col("current_debt_rur"),
    col("next_pmt"),
    col("delinquent_debt_rur"),
    col("product"),
    col("is_diff"),
    col("is_individual_pp"),
    col("currency"),
    col("interest_rate_month"),
    col("is_debtor"),
    col("is_guarantor"),
    col("credit_amount").alias("credit_amount_rur"),
    col("card_limit").alias("card_limit_rur"),
    col("credit_amount").alias("credit_amount_rur_0"),
    col("card_limit").alias("card_limit_rur_0"),
    col("credit_amount").alias("credit_amount_rur_b"),
    col("card_limit").alias("card_limit_rur_b"),
    (datediff(col("date_end_plan"), col("application_date")) / (365.25 / 12)).cast(DecimalType(26,10)).alias("remain_term"),
    when(col("product") == "ИК", 1).otherwise(0).alias("is_ik"),
    when(col("product") == "ПК", 1).otherwise(0).alias("is_pk"),
    when(col("product") == "АК", 1).otherwise(0).alias("is_ak"),
    when(col("product") == "КК", 1).otherwise(0).alias("is_kk"),
    when(col("product") == "МЗ", 1).otherwise(0).alias("is_mk"),
    when(col("product") == "Прочее", 1).otherwise(0).alias("is_ok"),
    col("coll_not_flg"),
    col("coll_real_flg"),
    col("coll_auto_flg"),
    when((col("date_end_plan") != col("date_start")) & (datediff(col("date_end_fact"), col("date_start")) / datediff(col("date_end_plan"), col("date_start")) <= 0.9), 1).otherwise(0).alias("is_advanced"),
    when((col("date_end_plan") != col("date_start")) & (datediff(col("date_end_fact"), col("date_start")) / datediff(col("date_end_plan"), col("date_start")) <= 0.75), 1).otherwise(0).alias("is_advanced_25"),
    max(coalesce(col("date_end_fact"), col("application_date"))).over(window_spec1).alias(
        "prev_date_end_fact__bank"
    ),
    max(coalesce(col("date_end_fact"), col("application_date"))).over(window_spec2).alias(
        "prev_date_end_fact"
    )
)

In [19]:
#ОК!
nvg_data_gr_bki_t2 = nvg_data_gr_bki_t1.select(
    "applicationuid",
    "application_date",
    "applicantid",
    "report_dt",
    "loan_id",
    "class_type",
    "is_bank",
    "date_start",
    "date_end_plan",
    "date_end_fact",
    "credit_amount",
    "card_limit",
    "current_debt_rur",
    "next_pmt",
    "delinquent_debt_rur",
    "product",
    "is_diff",
    "is_individual_pp",
    "currency",
    "interest_rate_month",
    "is_debtor",
    "is_guarantor",
    "credit_amount_rur",
    "card_limit_rur",
    "credit_amount_rur_0",
    "card_limit_rur_0",
    "credit_amount_rur_b",
    "card_limit_rur_b",
    "remain_term",
    "is_ik",
    "is_pk", 
    "is_ak",
    "is_kk",
    "is_mk",
    "is_ok",
    "is_advanced",
    "is_advanced_25",
    "prev_date_end_fact__bank",
    "prev_date_end_fact",
    "coll_not_flg",
    "coll_real_flg",
    "coll_auto_flg",
    when(
        (col("date_end_fact").isNotNull()) |
        (col("is_kk") == 0) & (coalesce(col("current_debt_rur"), lit(0)) + coalesce(col("delinquent_debt_rur"), lit(0)) == 0),  
        0
    ).otherwise(1).alias("is_open"),

    greatest(
        col("date_start"),
        when(col("prev_date_end_fact__bank").isNull(), col("date_start"))
        .otherwise(col("prev_date_end_fact__bank") + expr("INTERVAL 1 day"))
    ).alias("for_length__date_start__bank"),
    greatest(
        col("date_start"),
        when(col("prev_date_end_fact").isNull(), col("date_start"))
        .otherwise(col("prev_date_end_fact") + expr("INTERVAL 1 day"))
    ).alias("for_length__date_start"),
    when(col("date_end_fact").isNotNull(), col("date_end_fact")).otherwise(col("application_date"))
    .alias("for_length__date_end_fact")
)

In [20]:
window_spec_start_desc = Window.partitionBy(
    "applicationuid", "applicantid", "application_date", "is_debtor", "is_open", "is_kk", "is_bank"
).orderBy(desc("date_start"), desc(abs("loan_id")))

window_spec_start_asc = Window.partitionBy(
    "applicationuid", "applicantid", "application_date", "is_debtor", "is_open", "is_kk", "is_bank"
).orderBy(asc("date_start"), asc(abs("loan_id")))

window_spec_end_desc = Window.partitionBy(
    "applicationuid", "applicantid", "application_date", "is_debtor", "is_open", "is_kk", "is_bank"
).orderBy(desc("date_end_fact"), desc(abs("loan_id")))

window_spec_start_pk_desc = Window.partitionBy(
    "applicationuid", "applicantid", "application_date", "is_debtor", "is_pk", "is_bank"
).orderBy(desc("date_start"), desc(abs("loan_id")))

window_spec_start_pk_asc = Window.partitionBy(
    "applicationuid", "applicantid", "application_date", "is_debtor", "is_pk", "is_bank"
).orderBy(asc("date_start"), asc(abs("loan_id")))


nvg_data_gr_bki_t3 = nvg_data_gr_bki_t2.select(
    "applicationuid", "application_date", "applicantid", "report_dt", "loan_id", "class_type",
    "is_bank", "date_start", "date_end_plan", "date_end_fact", "credit_amount", "card_limit",
    "current_debt_rur", "next_pmt", "delinquent_debt_rur", "product", "is_diff", "is_individual_pp",
    "currency", "interest_rate_month", "is_debtor", "is_guarantor", "credit_amount_rur",
    "card_limit_rur", "credit_amount_rur_0", "card_limit_rur_0", "credit_amount_rur_b",
    "card_limit_rur_b", "remain_term", "is_ik", "is_pk", "is_ak", "is_kk", "is_mk", "is_ok",
    "is_advanced", "is_advanced_25", "prev_date_end_fact__bank", "prev_date_end_fact",
    "is_open", "for_length__date_start__bank", "for_length__date_start", "for_length__date_end_fact",
    "coll_not_flg", "coll_real_flg", "coll_auto_flg",
    (
        when(col("is_open") == 0, lit(0))
        .when(col("is_kk") == 1, 0.05 * col("card_limit_rur"))
        .when(col("remain_term") <= 0, lit(0))
        .when(col("is_diff") == 1, ((coalesce(col("current_debt_rur"), lit(0)) * (1 / col("remain_term") + col("interest_rate_month"))) + coalesce(col("delinquent_debt_rur"), lit(0))))
        .when(pow(1 + col("interest_rate_month"), col("remain_term")) == 0, lit(0))
        .when(1 - 1 / pow(1 + col("interest_rate_month"), col("remain_term")) == 0, lit(0))
        .otherwise(((coalesce(col("current_debt_rur"), lit(0)) * col("interest_rate_month") / (1 - 1 / pow(1 + col("interest_rate_month"), col("remain_term")))) + coalesce(col("delinquent_debt_rur"), lit(0))))
    * when(col("class_type") == 'G', lit(0.2)).otherwise(lit(1))).alias("liability_rur"),
    (
        when(col("is_open") == 0, lit(0))
        .when(col("is_bank") == 0, col("next_pmt"))
        .when(col("is_kk") == 1, 0.05 * col("card_limit_rur"))
        .when(col("remain_term") <= 0, lit(0))
        .when(col("is_diff") == 1, ((coalesce(col("current_debt_rur"), lit(0)) * (1 / col("remain_term") + col("interest_rate_month"))) + coalesce(col("delinquent_debt_rur"), lit(0))))
        .when(pow(1 + col("interest_rate_month"), col("remain_term")) == 0, lit(0))
        .when(1 - 1 / pow(1 + col("interest_rate_month"), col("remain_term")) == 0, lit(0))
        .otherwise(((coalesce(col("current_debt_rur"), lit(0)) * col("interest_rate_month") / (1 - 1 / pow(1 + col("interest_rate_month"), col("remain_term")))) + coalesce(col("delinquent_debt_rur"), lit(0))))
    * when(col("class_type") == 'G', lit(0.2)).otherwise(lit(1))).alias("liability_rur_rkk"),
    row_number().over(window_spec_start_desc).alias("start_desc_num"),
    row_number().over(window_spec_start_asc).alias("start_asc_num"),
    row_number().over(window_spec_end_desc).alias("end_desc_num"),
    row_number().over(window_spec_start_pk_desc).alias("start_pk_desc_num"),
    row_number().over(window_spec_start_pk_asc).alias("start_pk_asc_num")
) 

In [21]:
window_spec_liab_pk_desc = Window.partitionBy(
    "applicationuid", "applicantid", "application_date", "is_debtor", "is_open", "is_pk", "is_bank"
).orderBy(desc("liability_rur_rkk"), desc(abs("loan_id")))

window_spec_liab_ik_desc = Window.partitionBy(
    "applicationuid", "applicantid", "application_date", "is_debtor", "is_open", "is_ik", "is_bank"
).orderBy(desc("liability_rur_rkk"), desc(abs("loan_id")))

window_spec_liab_kk_desc = Window.partitionBy(
    "applicationuid", "applicantid", "application_date", "is_debtor", "is_open", "is_kk", "is_bank"
).orderBy(desc("liability_rur_rkk"), desc(abs("loan_id")))

window_spec_liab_ak_desc = Window.partitionBy(
    "applicationuid", "applicantid", "application_date", "is_debtor", "is_open", "is_ak", "is_bank"
).orderBy(desc("liability_rur_rkk"), desc(abs("loan_id")))

# ОК !
nvg_data_gr_bki_t4 =  nvg_data_gr_bki_t3.select(
    "applicationuid", "application_date", "applicantid", "report_dt", "loan_id", "class_type",
    "is_bank", "date_start", "date_end_plan", "date_end_fact", "credit_amount", "card_limit",
    "current_debt_rur", "next_pmt", "delinquent_debt_rur", "product", "is_diff", "is_individual_pp",
    "currency", "interest_rate_month", "is_debtor", "is_guarantor", "credit_amount_rur",
    "card_limit_rur", "credit_amount_rur_0", "card_limit_rur_0", "credit_amount_rur_b",
    "card_limit_rur_b", "remain_term", "is_ik", "is_pk", "is_ak", "is_kk", "is_mk", "is_ok",
    "is_advanced", "is_advanced_25", "prev_date_end_fact__bank", "prev_date_end_fact",
    "is_open", "for_length__date_start__bank", "for_length__date_start", "for_length__date_end_fact",
    "liability_rur", "liability_rur_rkk","start_desc_num","start_asc_num","end_desc_num","start_pk_desc_num", "start_pk_asc_num",
    row_number().over(window_spec_liab_pk_desc).alias("liab_pk_desc_num"),
    row_number().over(window_spec_liab_ik_desc).alias("liab_ik_desc_num"),
    row_number().over(window_spec_liab_kk_desc).alias("liab_kk_desc_num"),
    row_number().over(window_spec_liab_ak_desc).alias("liab_ak_desc_num")
)


In [22]:
#ОК
joined_df = nvg_data_gr_cre_dlq.alias("a").join(
    nvg_data_gr_cre_mart.alias("b"),
    (col("b.applicantid") == col("a.applicantid")) &
    (col("b.report_id") == col("a.report_id")) &
    (col("b.loan_id") == col("a.loan_id")) &
    (col("b.application_date") == col("a.application_date")),
    "inner"
).select(
    col("b.applicationuid"),
    col("a.loan_id"),
    col("b.is_bank"),
    col("b.application_date"),
    col("a.applicantid"),
    col("a.dlq_end_dt").alias("delinq_cutoff_date"),
    col("a.delinq_length"),
    when(months_between(col("b.application_date"), col("a.report_dt")) > 1, 0).otherwise(col("a.is_active")).alias("is_active")
)

result = joined_df.groupBy(
    col("applicationuid"),
    col("application_date"),
    col("loan_id"),
    col("is_bank"),
    col("applicantid"),

).agg( 
 count(when(col("delinq_cutoff_date") < col("application_date"), 1)).alias("count_any"),
 count(when((add_months(col("application_date"), -5*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")), 1)).alias("count_any_5y"),
 max(when(col("is_active") == 1, col("delinq_length")).otherwise(0)).alias("active_delinq_length"),
 count(when((add_months(col("application_date"), -5*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (1 <= col("delinq_length")) & (col("delinq_length") < 30), 1)).alias("count_1_29_5y"),
 count(when((add_months(col("application_date"), -5*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (30 <= col("delinq_length")) & (col("delinq_length") < 60), 1)).alias("count_30_59_5y"),
 count(when((add_months(col("application_date"), -5*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (60 <= col("delinq_length")) & (col("delinq_length") < 90), 1)).alias("count_60_89_5y"),
 count(when((add_months(col("application_date"), -5*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (90 <= col("delinq_length")) & (col("delinq_length") < 120), 1)).alias("count_90_119_5y"),
 count(when((add_months(col("application_date"), -5*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (120 <= col("delinq_length")), 1)).alias("count_120_5y"),
 count(when((add_months(col("application_date"), -3*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (1 <= col("delinq_length")) & (col("delinq_length") < 30), 1)).alias("count_1_29_3y"),
 count(when((add_months(col("application_date"), -3*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (30 <= col("delinq_length")) & (col("delinq_length") < 60), 1)).alias("count_30_59_3y"),
 count(when((add_months(col("application_date"), -3*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (60 <= col("delinq_length")) & (col("delinq_length") < 90), 1)).alias("count_60_89_3y"),
 count(when((add_months(col("application_date"), -3*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (90 <= col("delinq_length")) & (col("delinq_length") < 120), 1)).alias("count_90_119_3y"),
 count(when((add_months(col("application_date"), -3*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (120 <= col("delinq_length")), 1)).alias("count_120_3y"),
 count(when((add_months(col("application_date"), -1*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (1 <= col("delinq_length")) & (col("delinq_length") < 30), 1)).alias("count_1_29_1y"),
 count(when((add_months(col("application_date"), -1*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (30 <= col("delinq_length")) & (col("delinq_length") < 60), 1)).alias("count_30_59_1y"),
 count(when((add_months(col("application_date"), -1*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (60 <= col("delinq_length")) & (col("delinq_length") < 90), 1)).alias("count_60_89_1y"),
 count(when((add_months(col("application_date"), -1*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (90 <= col("delinq_length")) & (col("delinq_length") < 120), 1)).alias("count_90_119_1y"),
 count(when((add_months(col("application_date"), -1*12) <= col("delinq_cutoff_date")) & (col("delinq_cutoff_date") < col("application_date")) & (120 <= col("delinq_length")), 1)).alias("count_120_1y")
)


In [23]:
#ОК
nvg_data_gr_bki_t6 = nvg_data_gr_cre_dlq.alias("a").join(
    nvg_data_gr_cre_mart.alias("b"),
    (col("b.applicantid") == col("a.applicantid")) &
    (col("b.report_id") == col("a.report_id")) &
    (col("b.loan_id") == col("a.loan_id")) &
    (col("b.application_date") == col("a.application_date")),
    "inner"
).select(
    col("b.applicationuid"),
    col("a.loan_id"),
    col("b.is_bank"),
    col("b.application_date"),
    col("a.applicantid"),
    col("b.date_start"),
    when((col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)")) < col("b.date_start"),
         col("b.date_start")).otherwise(col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)")).alias("delinquency_start_dt"),
    datediff(least(col("a.dlq_end_dt"), expr("ADD_MONTHS(b.date_start, 6)")) + expr("interval 1 day"),
             when((col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)")) < col("b.date_start"),
                  col("b.date_start")).otherwise(col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)"))).alias("delinq_length6m"),
    datediff(least(col("a.dlq_end_dt"), expr("ADD_MONTHS(b.date_start, 12)")) + expr("interval 1 day"),
             when((col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)")) < col("b.date_start"),
                  col("b.date_start")).otherwise(col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)"))).alias("delinq_length12m"),
    col("a.delinq_length"),
    when((months_between(when((col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)")) < col("b.date_start"),
                               col("b.date_start")).otherwise(col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)")),
                        col("b.date_start")) < 2) & (col("a.delinq_length") > 90), 1).otherwise(0).alias("first_payment_default"),
    when((months_between(when((col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)")) < col("b.date_start"),
                               col("b.date_start")).otherwise(col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)")),
                        col("b.date_start")) > 2) &
         (months_between(when((col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)")) < col("b.date_start"),
                               col("b.date_start")).otherwise(col("a.dlq_end_dt") - expr("interval 1 day * (-1 + a.delinq_length)")),
                        col("b.date_start")) < 3) &
         (col("a.delinq_length") > 90), 1).otherwise(0).alias("second_payment_default")
)

In [24]:
# ОК!
nvg_data_gr_bki_t7 = nvg_data_gr_bki_t6.groupBy(
    "applicationuid",
    "application_date",
    "loan_id",
    "is_bank",
    "applicantid"
).agg(
    count(when(
        (col("is_bank") == 1) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 6)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length6m") > 0), 1)
    ).alias("bank_count_first_6m_1"),

    count(when(
        (col("is_bank") == 1) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 6)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length6m") > 30), 1)
    ).alias("bank_count_first_6m_30"),

    count(when(
        (col("is_bank") == 1) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 6)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length6m") > 60), 1)
    ).alias("bank_count_first_6m_60"),

    count(when(
        (col("is_bank") == 1) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 6)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length6m") > 90), 1)
    ).alias("bank_count_first_6m_90"),

    count(when(
        (col("is_bank") == 1) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 12)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length12m") > 0), 1)
    ).alias("bank_count_first_12m_1"),

    count(when(
        (col("is_bank") == 1) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 12)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length12m") > 30), 1)
    ).alias("bank_count_first_12m_30"),

    count(when(
        (col("is_bank") == 1) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 12)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length12m") > 60), 1)
    ).alias("bank_count_first_12m_60"),

    count(when(
        (col("is_bank") == 1) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 12)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length12m") > 90), 1)
    ).alias("bank_count_first_12m_90"),

    count(when(
        (col("is_bank") == 0) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 6)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length6m") > 0), 1)
    ).alias("bki_count_first_6m_1"),

    count(when(
        (col("is_bank") == 0) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 6)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length6m") > 30), 1)
    ).alias("bki_count_first_6m_30"),

    count(when(
        (col("is_bank") == 0) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 6)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length6m") > 60), 1)
    ).alias("bki_count_first_6m_60"),

    count(when(
        (col("is_bank") == 0) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 6)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length6m") > 90), 1)
    ).alias("bki_count_first_6m_90"),

    count(when(
        (col("is_bank") == 0) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 12)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length12m") > 0), 1)
    ).alias("bki_count_first_12m_1"),

    count(when(
        (col("is_bank") == 0) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 12)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length12m") > 30), 1)
    ).alias("bki_count_first_12m_30"),

    count(when(
        (col("is_bank") == 0) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 12)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length12m") > 60), 1)
    ).alias("bki_count_first_12m_60"),

    count(when(
        (col("is_bank") == 0) &
        (col("delinquency_start_dt") <= add_months(col("date_start"), 12)) &
        (col("delinquency_start_dt") < col("application_date")) &
        (col("delinq_length12m") > 90), 1)
    ).alias("bki_count_first_12m_90"),

    max(col("first_payment_default")).alias("first_payment_default"),
    max(col("second_payment_default")).alias("second_payment_default")
).select("applicationuid",
    "application_date",
    "loan_id",
    "is_bank",
    "applicantid",
    "bank_count_first_6m_1",
    "bank_count_first_6m_30",
    "bank_count_first_6m_60",
    "bank_count_first_6m_90",
    "bank_count_first_12m_1",
    "bank_count_first_12m_30",
    "bank_count_first_12m_60",
    "bank_count_first_12m_90",
    "bki_count_first_6m_1",
    "bki_count_first_6m_30",
    "bki_count_first_6m_60",
    "bki_count_first_6m_90",
    "bki_count_first_12m_1",
    "bki_count_first_12m_30",
    "bki_count_first_12m_60",
    "bki_count_first_12m_90",
    "first_payment_default",
    "second_payment_default"
        )

In [25]:
#ОК
nvg_data_gr_bki_payment = nvg_data_gr_cre_dlq.alias("a").join(
    nvg_data_gr_cre_mart.alias("b"),
    (col("b.applicantid") == col("a.applicantid")) &
    (col("b.report_id") == col("a.report_id")) &
    (col("b.loan_id") == col("a.loan_id")) &
    (col("b.application_date") == col("a.application_date")),
    "inner"
).join(
    nvg_data_gr_bki_t4.alias("t4"),
    (col("b.applicantid") ==col("t4.applicantid")) &
    (col("b.loan_id") == col("t4.loan_id")) &
    (col("b.application_date") == col("t4.application_date")),
    "inner"
).where(
    col("t4.is_bank") == 0
).groupBy(
    "b.applicationuid",
    "b.application_date",
    "a.loan_id",
    "b.is_bank",
    "a.applicantid",
    "t4.is_ik",
    "t4.is_pk",
    "t4.is_kk",
    "t4.liability_rur_rkk",
    "t4.liab_pk_desc_num",
    "t4.liab_ik_desc_num",
    "t4.liab_kk_desc_num"
).agg(
count(when(
    (add_months(col("b.application_date"), -12) <= col("a.dlq_end_dt")) &
    (col("a.dlq_end_dt") < col("b.application_date")) &
    (1 <= col("a.delinq_length")) & (col("a.delinq_length") < 30), 1)
).alias("bank_count_1_29_1y"),
count(when(
    (add_months(col("b.application_date"), -12) <= col("a.dlq_end_dt")) &
    (col("a.dlq_end_dt") < col("b.application_date")) &
    (30 <= col("a.delinq_length")) & (col("a.delinq_length") < 60), 1)
).alias("bank_count_30_59_1y"),

count(when(
    (add_months(col("b.application_date"), -12) <= col("a.dlq_end_dt")) &
    (col("a.dlq_end_dt") < col("b.application_date")) &
    (60 <= col("a.delinq_length")) & (col("a.delinq_length") < 90), 1)
).alias("bank_count_60_89_1y"),

count(when(
    (add_months(col("b.application_date"), -12) <= col("a.dlq_end_dt")) &
    (col("a.dlq_end_dt") < col("b.application_date")) &
    (90 <= col("a.delinq_length")) & (col("a.delinq_length") < 120), 1)
).alias("bank_count_90_119_1y"),

count(when(
    (add_months(col("b.application_date"), -12) <= col("a.dlq_end_dt")) &
    (col("a.dlq_end_dt") < col("b.application_date")) &
    (120 <= col("a.delinq_length")), 1)
).alias("bank_count_120_1y")

).select(
    col("b.applicationuid"),
    col("b.application_date"),
    col("a.loan_id"),
    col("b.is_bank"),
    col("a.applicantid"),
    col("t4.is_ik"),
    col("t4.is_pk"),
    col("t4.is_kk"),
    col("t4.liability_rur_rkk"),
    col("t4.liab_pk_desc_num").alias("liab_pk_desc_any_num"),
    col("t4.liab_ik_desc_num").alias("liab_ik_desc_any_num"),
    col("t4.liab_kk_desc_num").alias("liab_kk_desc_any_num"),
    col("bank_count_1_29_1y"),
    col("bank_count_30_59_1y"),
    col("bank_count_60_89_1y"),
    col("bank_count_90_119_1y"),
    col("bank_count_120_1y")
    
)

In [26]:
#ОК
result_df = nvg_data_gr_bki_t4.alias("a") \
    .join(result.alias("delinq"),
          (col("delinq.application_date") == col("a.application_date")) & 
          (col("delinq.loan_id") == col("a.loan_id")) & 
          (col("delinq.is_bank") == col("a.is_bank")) & 
          (col("delinq.applicantid") == col("a.applicantid")) & 
          (col("delinq.applicationuid") == col("a.applicationuid")),
          "left"
         ) \
    .join(nvg_data_gr_bki_t7.alias("del_first"),
          (col("del_first.application_date") == col("a.application_date")) & 
          (col("del_first.loan_id") == col("a.loan_id")) & 
          (col("del_first.is_bank") == col("a.is_bank")) & 
          (col("del_first.applicantid") == col("a.applicantid")) & 
          (col("del_first.applicationuid") == col("a.applicationuid")),
          "left"
         ) \
    .join(nvg_data_gr_cre_mart.select(
            "application_date",
            "applicantid",
            coalesce(col("inquiry1week"), lit(0)).alias("inquiry1week"),
            coalesce(col("inquiry1month"), lit(0)).alias("inquiry1month"),
            coalesce(col("inquiry3month"), lit(0)).alias("inquiry3month"),
            coalesce(col("inquiry6month"), lit(0)).alias("inquiry6month")
        ).distinct().alias("cre"),
          (col("cre.application_date") == col("a.application_date")) & 
          (col("cre.applicantid") == col("a.applicantid")),
          "left"
         ) \
    .join(nvg_data_gr_bki_payment.alias("pay"),
          (col("pay.application_date") == col("a.application_date")) & 
          (col("pay.loan_id") == col("a.loan_id")) & 
          (col("pay.is_bank") == col("a.is_bank")) & 
          (col("pay.applicantid") == col("a.applicantid")) & 
          (col("pay.applicationuid") == col("a.applicationuid")),
          "left"
         ) \
    .select(
        "a.*",
        "delinq.*",
        "del_first.*",
        col("cre.application_date").alias("cre_application_date"),
        col("cre.applicantid").alias("cre_applicantid"),
        col("cre.inquiry1week").alias("inquiry1week"),
        col("cre.inquiry1month").alias("inquiry1month"),
        col("cre.inquiry3month").alias("inquiry3month"),
        col("cre.inquiry6month").alias("inquiry6month"),
        "pay.*"
    )

In [27]:
##### Агрегаты 
nvg_data_gr_bki_aggr_spark_df = result_df.groupBy(
    col("a.applicationuid"),
    col("a.application_date"),
    col("a.applicantid")
).agg(
    #------------------------------------------------------------------------
    # количество открытых
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 1), 1)).cast("int").alias("cnt_opened"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 1) & (col("a.is_ik") == 1), 1)).cast("int").alias("mrtg_open"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 1) & (col("a.is_pk") == 1), 1)).cast("int").alias("cl_open"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 1) & (col("a.is_ak") == 1), 1)).cast("int").alias("auto_open"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 1) & (col("a.is_kk") == 1), 1)).cast("int").alias("card_open"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 1) & (col("a.is_mk") == 1), 1)).cast("int").alias("micro_open"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 1) & (col("a.is_ok") == 1), 1)).cast("int").alias("other_open"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 1) & (F.add_months(col("a.application_date"), -6) <= col("date_start")), 1)).cast("int").alias("cnt_opened_6m"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 1) & (F.add_months(col("a.application_date"), -12) <= col("date_start")), 1)).cast("int").alias("cnt_opened_1y"),
        #------------------------------------------------------------------------
    # количество закрытых    
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0), 1)).cast("int").alias("cnt_closed"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_ik") == 1), 1)).cast("int").alias("mrtg_closed"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1), 1)).cast("int").alias("cl_closed"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_ak") == 1), 1)).cast("int").alias("auto_closed"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 1), 1)).cast("int").alias("card_closed"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_mk") == 1), 1)).cast("int").alias("micro_closed"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_ok") == 1), 1)).cast("int").alias("other_closed"),    
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") >= 500000), 1)).cast("int").alias("cl_closed_gr500k"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") >= 100000), 1)).cast("int").alias("cl_closed_gr100k"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") >= 50000), 1)).cast("int").alias("cl_closed_gr50k"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") < 50000), 1)).cast("int").alias("cl_closed_ls50k"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (F.add_months(col("a.application_date"),-6) <= col("a.date_end_fact")), 1)).cast("int").alias("cnt_agr_closed_last6m"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (F.add_months(col("a.application_date"),-12) <= col("a.date_end_fact")), 1)).cast("int").alias("cnt_agr_closed_last1y"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (F.add_months(col("a.application_date"),-12*3) <= col("a.date_end_fact")), 1)).cast("int").alias("cnt_agr_closed_last3y"),
    #--Классический вариант--
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (coalesce(col("delinq.count_any"), lit(0)) == 0), 1)).cast("int").alias("cnt_closed_no_del"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (coalesce(col("delinq.count_any"), lit(0)) == 0), 1)).cast("int").alias("cl_closed_no_del"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_ik") == 1) & (coalesce(col("delinq.count_any"), lit(0)) == 0), 1)).cast("int").alias("mrtg_closed_no_del"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_ok") == 1) & (coalesce(col("delinq.count_any"), lit(0)) == 0), 1)).cast("int").alias("other_closed_no_del"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (coalesce(col("delinq.count_any"), lit(0)) == 0) & (col("a.credit_amount_rur_0") >= 500000), 1)).cast("int").alias("cnt_closed_gr500k"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (coalesce(col("delinq.count_any"), lit(0)) == 0) & (col("a.credit_amount_rur_0") >= 100000), 1)).cast("int").alias("cnt_closed_gr100k"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (coalesce(col("delinq.count_any"), lit(0)) == 0) & (col("a.credit_amount_rur_0") >= 50000), 1)).cast("int").alias("cnt_closed_gr50k"),
    #------------------------------------------------------------------------
    #количество закрытых досрочно
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_advanced") == 1) & (col("a.is_ik") == 1), 1)).cast("int").alias("mrtg_adv_closed"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_advanced") == 1) & (col("a.is_pk") == 1), 1)).cast("int").alias("cl_adv_closed"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_advanced") == 1) & (col("a.is_ok") == 1), 1)).cast("int").alias("other_adv_closed"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_advanced") == 1) & (col("a.is_kk") == 0), 1)).cast("int").alias("cnt_adv_repayment"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_advanced") == 1) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") >= 500000), 1)).cast("int").alias("cnt_adv_repayment_gr_500k"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_advanced") == 1) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") >= 100000), 1)).cast("int").alias("cnt_adv_repayment_gr_100k"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_advanced") == 1) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") >= 50000), 1)).cast("int").alias("cnt_adv_repayment_gr_50k"),
    count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_advanced") == 1) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") < 50000), 1)).cast("int").alias("cnt_adv_repayment_ls_50k"),
    #------------------------------------------------------------------------
    #--Доп.агрегаты для витрины досрочного погашение
     (count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (col("a.is_advanced") == 1), 1)) /
     count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0), 1))).cast("decimal(38,16)").alias("ratio_adv_repayment"),
    (count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") >= 500000) & (col("a.is_advanced") == 1), 1)) /
     count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") >= 500000), 1))).cast("decimal(38,16)").alias("ratio_adv_repayment_adv_gr_500k"),
    (count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") >= 100000) & (col("a.is_advanced") == 1), 1)) /
     count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") >= 100000), 1))).cast("decimal(38,16)").alias("ratio_adv_repayment_adv_gr_100k"),
    (count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") >= 50000) & (col("a.is_advanced") == 1), 1)) /
     count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") >= 50000), 1))).cast("decimal(38,16)").alias("ratio_adv_repayment_adv_gr_50k"),
    (count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") < 50000) & (col("a.is_advanced") == 1), 1)) /
     count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_kk") == 0) & (col("a.credit_amount_rur_0") < 50000), 1))).cast("decimal(38,16)").alias("ratio_adv_repayment_adv_ls_50k"),
    (count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.is_advanced") == 1), 1)) /
     count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1), 1))).cast("decimal(38,16)").alias("ratio_cl_closed"),
    (count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") >= 500000) & (col("a.is_advanced") == 1), 1)) /
     count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") >= 500000), 1))).cast("decimal(38,16)").alias("ratio_cl_closed_gr500k"),
    (count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") >= 100000) & (col("a.is_advanced") == 1), 1)) /
     count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") >= 100000), 1))).cast("decimal(38,16)").alias("ratio_cl_closed_gr100k"),
    (count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") >= 50000) & (col("a.is_advanced") == 1), 1)) /
     count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") >= 50000), 1))).cast("decimal(38,16)").alias("ratio_cl_closed_gr50k"),
    (count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") < 50000) & (col("a.is_advanced") == 1), 1)) /
     count(when((col("a.is_debtor") == 1) & (col("a.is_open") == 0) & (col("a.is_pk") == 1) & (col("a.credit_amount_rur_0") < 50000), 1))).cast("decimal(38,16)").alias("ratio_cl_closed_ls50k"),
    coalesce(avg(when((col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_advanced") == 1), col("a.credit_amount_rur_0"))), lit(0)).cast("decimal(38,16)").alias("avg_liab_sum_total_adv_agr"),
    avg(when((col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_open") == 0), (datediff(col("a.date_end_plan"), col("a.date_end_fact")) / 365.25 / 12))).cast("decimal(38,16)").alias("avg_diff_plan_fact_closed"),  # Разница в  10-м знаке 
    F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("is_advanced") == 1),col("a.credit_amount_rur_0")).otherwise(0)).cast(DecimalType(28, 6)).alias("total_liab_sum_bank_adv_agr"),
    F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_advanced") == 1), col("a.credit_amount_rur_0") ).otherwise(0)).cast(DecimalType(28, 6)).alias("total_liab_sum_bki_adv_agr"),
    
    count( when((col("a.is_debtor") == 1) & (col("a.is_advanced_25") == 1) & (col("a.is_kk") == 0),1)).cast(IntegerType()).alias("cnt_adv25_closed"),
    
    F.sum( when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_advanced_25") == 1),col("a.credit_amount_rur_0")).otherwise(0)).cast(DecimalType(28, 6)).alias("total_liab_sum_bank_adv25_agr"),
    
    F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_advanced_25") == 1),col("credit_amount_rur_0")).otherwise(0)).cast(DecimalType(28, 6)).alias("total_liab_sum_bki_adv25_agr"),
    
    #-- количество на просрочке
    #--Классический вариант-----
    
    count(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("delinq.active_delinq_length") > 0) &(col("a.delinquent_debt_rur") >= 500), 1)).cast(IntegerType()).alias("curdel_0plus_bank"),
    
    count( when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("delinq.active_delinq_length") > 0) &(col("a.delinquent_debt_rur") >= 500), 1 )).cast(IntegerType()).alias("curdel_0plus_bki"),
    
    count(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("delinq.active_delinq_length") >= 30) &(col("a.delinquent_debt_rur") >= 500), 1 )).cast(IntegerType()).alias("curdel_30plus_bank"),
    
    count(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("delinq.active_delinq_length") >= 30) &(col("a.delinquent_debt_rur") >= 500),1)).cast(IntegerType()).alias("curdel_30plus_bki"),
    
    count(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("delinq.active_delinq_length") >= 60) &(col("a.delinquent_debt_rur") >= 500),1 )).cast(IntegerType()).alias("curdel_60plus_bank"),
    
    count( when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("delinq.active_delinq_length") >= 60) &(col("a.delinquent_debt_rur") >= 500),1)).cast(IntegerType()).alias("curdel_60plus_bki"),
    
    count(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("delinq.active_delinq_length") > 0) &(0 < col("a.delinquent_debt_rur")) & (col("a.delinquent_debt_rur") < 500),1)).cast(IntegerType()).alias("curdel_tech_bank"),
    
    count(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("delinq.active_delinq_length") > 0) &(0 < col("a.delinquent_debt_rur")) & (col("a.delinquent_debt_rur") < 500),1) ).cast(IntegerType()).alias("curdel_tech_bki"),
    #------------------------------------------------------------------------
    #-- просрочки
    #--Классический вариант-----
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_1_29_5y"))), lit(0)).cast("int").alias("bank_1_29_5y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_30_59_5y"))), lit(0)).cast("int").alias("bank_30_59_5y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_60_89_5y"))), lit(0)).cast("int").alias("bank_60_89_5y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_90_119_5y"))), lit(0)).cast("int").alias("bank_90_119_5y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_120_5y"))), lit(0)).cast("int").alias("bank_120plus_5y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_1_29_3y"))), lit(0)).cast("int").alias("bank_1_29_3y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_30_59_3y"))), lit(0)).cast("int").alias("bank_30_59_3y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_60_89_3y"))), lit(0)).cast("int").alias("bank_60_89_3y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_90_119_3y"))), lit(0)).cast("int").alias("bank_90_119_3y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_120_3y"))), lit(0)).cast("int").alias("bank_120plus_3y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_1_29_1y"))), lit(0)).cast("int").alias("bank_1_29_1y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_30_59_1y"))), lit(0)).cast("int").alias("bank_30_59_1y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_60_89_1y"))), lit(0)).cast("int").alias("bank_60_89_1y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_90_119_1y"))), lit(0)).cast("int").alias("bank_90_119_1y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("delinq.count_120_1y"))), lit(0)).cast("int").alias("bank_120plus_1y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_1_29_5y"))), lit(0)).cast("int").alias("bki_1_29_5y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_30_59_5y"))), lit(0)).cast("int").alias("bki_30_59_5y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_60_89_5y"))), lit(0)).cast("int").alias("bki_60_89_5y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_90_119_5y"))), lit(0)).cast("int").alias("bki_90_119_5y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_120_5y"))), lit(0)).cast("int").alias("bki_120plus_5y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_1_29_3y"))), lit(0)).cast("int").alias("bki_1_29_3y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_30_59_3y"))), lit(0)).cast("int").alias("bki_30_59_3y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_60_89_3y"))), lit(0)).cast("int").alias("bki_60_89_3y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_90_119_3y"))), lit(0)).cast("int").alias("bki_90_119_3y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_120_3y"))), lit(0)).cast("int").alias("bki_120plus_3y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_1_29_1y"))), lit(0)).cast("int").alias("bki_1_29_1y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_30_59_1y"))), lit(0)).cast("int").alias("bki_30_59_1y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_60_89_1y"))), lit(0)).cast("int").alias("bki_60_89_1y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_90_119_1y"))), lit(0)).cast("int").alias("bki_90_119_1y_debtor"),
    coalesce(F.sum(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1), col("delinq.count_120_1y"))), lit(0)).cast("int").alias("bki_120plus_1y_debtor"),
    #------------------------------------------------------------------------
    #-- текущая просрочка   
    F.max(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("delinq.active_delinq_length") > 0),
            col("delinq.active_delinq_length")).otherwise(0)).cast(IntegerType()).alias("maxcurdel_bank"),
    
    F.max(when((col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("delinq.active_delinq_length") > 0),
            col("delinq.active_delinq_length")).otherwise(0)).cast(IntegerType()).alias("maxcurdel_bki"),
    #------------------------------------------------------------------------
    #-- срок, NULL будет означать отсутствие соответствующих договоров
    #avg
    #avg
    #avg
    avg(
        when(
            (col("a.is_debtor") == 1) & 
            (col("a.is_kk") == 0),
            datediff((col("a.date_end_fact") + expr("interval 1 day")), col("a.date_start")) / (365.25 / 12)
        )
    ).cast("decimal(38,16)").alias("avg_term_fact_closed"),
    
    avg(
        when(
            (col("a.is_debtor") == 1) & 
            (col("a.is_kk") == 0) &
            (col("a.is_open") == 0),
            datediff((col("a.date_end_plan") + expr("interval 1 day")), col("a.date_start")) / (365.25 / 12)
        )
    ).cast("decimal(38,16)").alias("avg_term_plan_closed"),
    
    avg(
        when(
            (col("a.is_debtor") == 1) & 
            (col("a.is_kk") == 0) &
            (col("a.is_open") == 1),
            datediff((col("a.date_end_plan") + expr("interval 1 day")), col("a.date_start")) / (365.25 / 12)
        )
    ).cast("decimal(38,16)").alias("avg_term_plan_open"),
    #------------------------------------------------------------------------
    #-- длина истории с самого ранеего договора

    F.max(
        when(
            (col("a.is_bank") == 1) &
            (col("a.is_debtor") == 1),
            datediff((col("a.application_date") + expr("interval 1 day")), col("a.date_start")) / (365.25 / 12)
        ).otherwise(0)
    ).cast("decimal(38,16)").alias("length_bank"),
    
    F.max(
        when(
            (col("a.is_bank") == 0) &
            (col("a.is_debtor") == 1),
            datediff((col("a.application_date") + expr("interval 1 day")), col("a.date_start")) / (365.25 / 12)
        ).otherwise(0)
    ).cast("decimal(38,16)").alias("length_bki"),
    #max
    #max
    #-- длина истории по-честному с перерывами
    #sum
    #sum
    F.sum(
        when(
            (col("a.is_bank") == 1) &
            (col("a.is_debtor") == 1) &
            (col("a.for_length__date_end_fact") >= col("a.for_length__date_start__bank")),
            datediff((col("a.for_length__date_end_fact") + expr("interval 1 day")), col("a.for_length__date_start__bank")) / (365.25 / 12)
        )
    ).cast("decimal(38,16)").alias("length_bank_total"),
    
    F.sum(
        when(
            (col("a.is_debtor") == 1) &
            (col("a.for_length__date_end_fact") >= col("a.for_length__date_start")),
            datediff((col("a.for_length__date_end_fact") + expr("interval 1 day")), col("a.for_length__date_start")) / (365.25 / 12)
        )
    ).cast("decimal(38,16)").alias("length_total"),
        #-- флаги ипотечных кредитов
    max(
        when(
            (col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_ik") == 1) & (col("a.is_open") == 1) &
            (coalesce(col("delinq.count_any_5y"), F.lit(0)) == 0) &
            (col("a.date_start") < F.add_months(col("a.application_date"), -6)) &
            (coalesce(col("delinq.active_delinq_length"), F.lit(0)) == 0) &
            (coalesce(col("a.delinquent_debt_rur"), F.lit(0)) == 0),
            1
        ).otherwise(0)
    ).cast("tinyint").alias("good_mortgage_bank_open"),
    
    max(
        when(
            (col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("a.is_ik") == 1) & (col("a.is_open") == 1) &
            (coalesce(col("delinq.count_any_5y"), F.lit(0)) == 0) &
            (col("a.date_start") < F.add_months(col("a.application_date"), -6)) &
            (coalesce(col("delinq.active_delinq_length"), F.lit(0)) == 0) &
            (coalesce(col("a.delinquent_debt_rur"), F.lit(0)) == 0),
            1
        ).otherwise(0)
    ).cast("tinyint").alias("good_mortgage_bki_open"),
    
    max(
        when(
            (col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_ik") == 1) & (col("a.is_open") == 0) &
            (coalesce(col("delinq.count_any"), F.lit(0)) == 0),
            1
        ).otherwise(0)
    ).cast("tinyint").alias("good_mortgage_bank_closed"),
    
    max(
        when(
            (col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("a.is_ik") == 1) & (col("a.is_open") == 0) &
            (coalesce(col("delinq.count_any"), F.lit(0)) == 0),
            1
        ).otherwise(0)
    ).cast("tinyint").alias("good_mortgage_bki_closed"),
    #------------------------------------------------------------------------
    #-- сумма кредита 
        coalesce(
        avg(
            when(
                (col("a.is_debtor") == 1) & (col("a.is_kk") == 0),
                col("a.credit_amount_rur_0")
            )
        ),
        lit(0)
    ).cast(DecimalType(38, 16)).alias("avg_liab_sum_total"),
    
    coalesce(
        avg(
            when(
                (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_open") == 1),
                col("a.credit_amount_rur_0")
            )
        ),
        lit(0)
    ).cast(DecimalType(38, 16)).alias("avg_liab_sum_total_open_agr"),
        coalesce(
        avg(
            when(
                (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_open") == 0),
                col("a.credit_amount_rur_0")
            )
        ),
        lit(0)
    ).cast(DecimalType(38, 16)).alias("avg_liab_sum_total_closed_agr"),
    
    max(
        when(
            (col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_open") == 1),
            col("a.credit_amount_rur_0")
        ).otherwise(0)
    ).cast(DecimalType(28, 6)).alias("max_liab_sum_bank_open"),
    max(
        when(
            (col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_open") == 1),
            col("a.credit_amount_rur_0")
        ).otherwise(0)
    ).cast(DecimalType(28, 6)).alias("max_liab_sum_bki_open"),
    
    max(
        when(
            (col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_open") == 0),
            col("a.credit_amount_rur_0")
        ).otherwise(0)
    ).cast(DecimalType(28, 6)).alias("max_liab_sum_bank_closed"),
    
    max(
        when(
            (col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_open") == 0),
            col("a.credit_amount_rur_0")
        ).otherwise(0)
    ).cast(DecimalType(28, 6)).alias("max_liab_sum_bki_closed"),
    
    F.sum(
        when(
            (col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_open") == 1),
            col("a.credit_amount_rur_0")
        ).otherwise(0)
    ).cast(DecimalType(28, 6)).alias("total_liab_sum_bank_open_agr"),
    
    F.sum(
        when(
            (col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_open") == 1),
            col("a.credit_amount_rur_0")
        ).otherwise(0)
    ).cast(DecimalType(28, 6)).alias("total_liab_sum_bki_open_agr"),
    
    F.sum(
        when(
            (col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_open") == 0),
            col("a.credit_amount_rur_0")
        ).otherwise(0)
    ).cast(DecimalType(28, 6)).alias("total_liab_sum_bank_closed_agr"),
    
    F.sum(
        when(
            (col("a.is_bank") == 0) & (col("a.is_debtor") == 1) & (col("a.is_kk") == 0) & (col("a.is_open") == 0),
            col("a.credit_amount_rur_0")
        ).otherwise(0)
    ).cast(DecimalType(28, 6)).alias("total_liab_sum_bki_closed_agr"),
    
    #------------------------------------------------------------------------
    #-- текущая задолженнсть; отдельные агрегаты сделаны намерено
    
    F.sum(
        when(
            col("a.is_debtor") == 1,
            coalesce(col("a.current_debt_rur"), lit(0)) + coalesce(col("a.delinquent_debt_rur"), lit(0))
        ).otherwise(0)
    ).cast("decimal(38,16)").alias("outstanding"),
    
    F.sum(
        when(
            (col("a.is_debtor") == 1) & 
            (col("a.is_open") == 1),
            coalesce(col("a.current_debt_rur"), lit(0)) + coalesce(col("a.delinquent_debt_rur"), lit(0))
        ).otherwise(0)
    ).cast("decimal(38,16)").alias("outstanding_open"),
    
    F.avg(
        when(
            (col("a.is_debtor") == 1) & 
            (col("a.delinquent_debt_rur") > 0),
            col("a.delinquent_debt_rur")
        )
    ).cast("decimal(38,16)").alias("avg_delinquent_debt_total"),
    
    F.sum(
        when(
            col("a.is_debtor") == 1,
            col("a.delinquent_debt_rur")
        ).otherwise(0)
    ).cast("decimal(28,6)").alias("curr_arrear_rur"),
    
    #------------------------------------------------------------------------
    #-- Утилизация: сумма задолженности / сумма лимитов (по курсу на последнюю отчётную дату; в текущей ситуации - конец месяца)
    #-- NULL будет означать отсутствие карт
    #- по открытым картам
    
       when(F.sum(when((col("a.is_debtor") == 1) & (col("a.is_kk") == 1) & (col("a.is_open") == 1),col("a.card_limit_rur_b"))) > 0,
        (F.sum( when((col("a.is_debtor") == 1) & (col("a.is_kk") == 1) & (col("a.is_open") == 1),coalesce(col("a.current_debt_rur"), lit(0)) + coalesce(col("a.delinquent_debt_rur"), lit(0)))
            ) / F.sum(
                when(
                    (col("a.is_debtor") == 1) & (col("a.is_kk") == 1) & (col("a.is_open") == 1),
                    col("a.card_limit_rur_b")
                )
            )
        ).cast(DecimalType(38, 16))
    ).otherwise(None).alias("utilization_avg"),
    
    #------------------------------------------------------------------------
    #-- считаем количество разных видов обязательств: 
    #-- ипотека, потреб, авто, карта, поручительство, в БКИ - микро, прочее
    
        (
        coalesce(max(col("a.is_guarantor")), lit(0)) +
        coalesce(max(when(col("a.is_debtor") == 1, col("a.is_ik"))), lit(0)) +
        coalesce(max(when(col("a.is_debtor") == 1, col("a.is_pk"))), lit(0)) +
        coalesce(max(when(col("a.is_debtor") == 1, col("a.is_ak"))), lit(0)) +
        coalesce(max(when(col("a.is_debtor") == 1, col("a.is_kk"))), lit(0)) +
        coalesce(max(when(col("a.is_debtor") == 1, col("a.is_mk"))), lit(0)) +
        coalesce(max(when(col("a.is_debtor") == 1, col("a.is_ok"))), lit(0))
    ).cast(IntegerType()).alias("cnt_liability_types"),
    
    #-- платёжная нагрузка считается по всем договорам (внутренним, внешним), поручительствам
    coalesce(F.sum(col("a.liability_rur")), lit(0)).cast("decimal(38,16)").alias("total_curr_payment"),
    #------------------------------------------------------------------------
    #-- если все обязательства - это поручительства
    F.when(count(when(col('a.is_guarantor') == 1, 1)) == count('*'), 1).otherwise(0).cast('tinyint').alias('only_guarantees'),
    #-- общее количество запросов в БКИ берётся из данных CRE
    #-- если данные в CRE есть, но они NULL, то заменям на 0
    #-- если данные в CRE отсутствуют, то NULL
    F.max(col('inquiry1week')).cast('int').alias('cnt_inquiry_last1w'),
    F.max(col('inquiry1month')).cast('int').alias('cnt_inquiry_last1m'),
    F.max(col('inquiry3month')).cast('int').alias('cnt_inquiry_last3m'),
    F.max(col('inquiry6month')).cast('int').alias('cnt_inquiry_last6m'),
    #------------------------------------------------------------------------
    #-- Агрегаты для ПКБ-----------------------------------------------------
    #------------------------------------------------------------------------
    
    F.max(col('a.report_dt')).cast('string').alias('CRE_DATE'),
    coalesce(F.sum(col('a.liability_rur_rkk')), lit(0)).cast('decimal(38,16)').alias('total_curr_payment_rkk'),
    coalesce(F.sum(when(col('a.is_bank') == 1, col('a.liability_rur'))), lit(0)).cast('decimal(38,16)').alias('BANK_LIAB_PAYM'),
    coalesce(F.sum(when(col('a.is_bank') == 0, col('a.liability_rur_rkk'))), lit(0)).cast('decimal(38,16)').alias('BKI_LIAB_PAYM'),
     F.max(
        when(
            (col("a.is_bank") == 0) &
            (col("a.is_debtor") == 1) &
            (col("a.is_kk") == 0) &
            (col("a.is_open") == 0) &
            (col("a.end_desc_num") == 1),
            (datediff(col("a.application_date"), col("a.date_end_fact"))) / lit(30.437500)
            
        )
    ).cast("decimal(38,10)").alias("bki_loan_lst_close_m_term"),
      F.max(
        when(
            (col("a.is_bank") == 0) &
            (col("a.is_debtor") == 1) &
            (col("a.is_kk") == 0) &
            (col("a.is_open") == 1) &
            (col("a.start_desc_num") == 1),
            col("a.remain_term")
        )
    ).cast("decimal(38,16)").alias("bki_loan_till_lst_close_m_term"),
    
    F.max(
        when(
            (col("a.is_bank") == 1) &
            (col("a.is_debtor") == 1) &
            (col("a.is_kk") == 0) &
            (col("a.is_open") == 0) &
            (col("a.end_desc_num") == 1),
            datediff(col("a.application_date"), col("a.date_end_fact")) / lit(30.437500)
        )
    ).cast("decimal(38,10)").alias("bank_loan_lst_close_m_term"),
    
    F.max(
        when(
            (col("a.is_bank") == 1) &
            (col("a.is_debtor") == 1) &
            (col("a.is_kk") == 0) &
            (col("a.is_open") == 1) &
            (col("a.start_desc_num") == 1),
            col("a.remain_term")
        )
    ).cast("decimal(38,16)").alias("bank_loan_till_lst_close_m_term"),
    max(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_pk") == 1) & (col("a.start_pk_desc_num") == 1), (col("a.interest_rate_month") * 100 * 12))).cast(DecimalType(28, 6)).alias("bank_loan_lst_rate"), 
    max(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_pk") == 1) & (col("a.start_pk_asc_num") == 1), (col("a.interest_rate_month") * 100 * 12))).cast(DecimalType(28, 6)).alias("bank_loan_fst_rate"),  
    when(count(when(col("a.is_bank") == 1, lit(1))).alias("a.is_bank_count") > 0, lit(1)).otherwise(lit(0)).cast("tinyint").alias("bank_ki_flg"),
    when(count(when(col("a.is_bank") == 0, lit(1))).alias("a.is_bank_count") > 0, lit(1)).otherwise(lit(0)).cast("tinyint").alias("bki_app_flg"),
    #------------------------------------------------------------------------
    F.max(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_pk") == 1), col("a.liability_rur_rkk"))).cast("decimal(38,16)").alias("max_paym_bank"),
    F.min(when((col("a.is_bank") == 1) &(col("a.is_debtor") == 1) & (col("a.is_pk") == 1), col("a.liability_rur_rkk"))).cast("decimal(38,16)").alias("min_paym_bank"),
    F.max(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) & (col("a.is_pk") == 1) &(col("a.is_open") == 1),col("a.liability_rur_rkk"))).cast("decimal(38,16)").alias("max_paym_bank_act"),
    F.min(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1) &(col("a.is_pk") == 1) & (col("a.is_open") == 1),col("a.liability_rur_rkk"))).cast("decimal(38,16)").alias("min_paym_bank_act"),
    F.sum( when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("a.delinquent_debt_rur")).otherwise(0)).cast("decimal(28,6)").alias("current_overdue_bank_amt"),
    #--Количество выходов на просрочку 0+/30+/60+/90+ по данным БКИ/Банка за первые 6/12 месяцев с даты выдачи кредита.
    #--BANK 6m/12m
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("del_first.bank_count_first_6m_1"))), lit(0)).cast("int").alias("bank_count_first_6m_1"),
    coalesce(F.sum(when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1), col("del_first.bank_count_first_6m_30"))),lit(0)).cast("int").alias("bank_count_first_6m_30"),
    coalesce( F.sum( when((col("a.is_bank") == 1) &(col("a.is_debtor") == 1),col("del_first.bank_count_first_6m_60"))),lit(0)).cast("int").alias("bank_count_first_6m_60"),
    coalesce(F.sum(when((col("a.is_bank") == 1) &(col("a.is_debtor") == 1), col("del_first.bank_count_first_6m_90"))),lit(0)).cast("int").alias("bank_count_first_6m_90"),
    coalesce(F.sum( when((col("a.is_bank") == 1) & (col("a.is_debtor") == 1),col("del_first.bank_count_first_12m_1"))),lit(0)).cast("int").alias("bank_count_first_12m_1"),
    coalesce(F.sum(when((col("a.is_bank") == 1) &(col("a.is_debtor") == 1), col("del_first.bank_count_first_12m_30") ) ),lit(0)).cast("int").alias("bank_count_first_12m_30"),
    coalesce(F.sum(when((col("a.is_bank") == 1) &(col("a.is_debtor") == 1),col("del_first.bank_count_first_12m_60"))),lit(0)).cast("int").alias("bank_count_first_12m_60"),
    coalesce(F.sum( when((col("a.is_bank") == 1) &(col("a.is_debtor") == 1),col("del_first.bank_count_first_12m_90"))),lit(0)).cast("int").alias("bank_count_first_12m_90"),
    #--BKI 6m/12m
    coalesce(F.sum(when((col("a.is_bank") == 0) &(col("a.is_debtor") == 1),col("del_first.bki_count_first_6m_1"))),lit(0)).cast("int").alias("bki_count_first_6m_1"),
    coalesce(F.sum( when((col("a.is_bank") == 0) &(col("a.is_debtor") == 1),col("del_first.bki_count_first_6m_30")) ),lit(0)).cast("int").alias("bki_count_first_6m_30"),
    coalesce(F.sum( when((col("a.is_bank") == 0) &(col("a.is_debtor") == 1),col("del_first.bki_count_first_6m_60"))),lit(0)).cast("int").alias("bki_count_first_6m_60"),
    coalesce(F.sum( when( (col("a.is_bank") == 0) &(col("a.is_debtor") == 1),col("del_first.bki_count_first_6m_90"))),lit(0)).cast("int").alias("bki_count_first_6m_90"),
    coalesce(F.sum( when( (col("a.is_bank") == 0) &(col("a.is_debtor") == 1),col("del_first.bki_count_first_12m_1"))),lit(0)).cast("int").alias("bki_count_first_12m_1"),
    coalesce(F.sum(when((col("a.is_bank") == 0) &(col("a.is_debtor") == 1),col("del_first.bki_count_first_12m_30"))),lit(0)).cast("int").alias("bki_count_first_12m_30"),
    coalesce( F.sum(when((col("a.is_bank") == 0) &(col("a.is_debtor") == 1),col("del_first.bki_count_first_12m_60"))),lit(0)).cast("int").alias("bki_count_first_12m_60"),
    coalesce( F.sum(when((col("a.is_bank") == 0) &(col("a.is_debtor") == 1),col("del_first.bki_count_first_12m_90"))),lit(0)).cast("int").alias("bki_count_first_12m_90"),
    #----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    #----------Флаг выхода на просрочку с первого/второго платежа. 
    #----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    coalesce(F.max(col('first_payment_default')), lit(0)).cast('tinyint').alias('first_payment_default'),
    coalesce(F.max(when(col('first_payment_default') == 0, col('second_payment_default'))), lit(0)).cast('tinyint').alias('second_payment_default'),
    substring(current_timestamp().cast('string'), 1, 19).alias('t_changed_dttm'), 
    lit(0).cast('tinyint').alias('t_deleted_flg'),
    lit(1).cast('tinyint').alias('t_active_flg')    
)