#Импорты и инициализация спарк сесии

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions  as fnct
from pyspark.sql.types import *
from pyspark.conf import SparkConf
import os
import logging

In [None]:

spark = (SparkSession.builder.master('local').appName('Spark').config(conf=SparkConf()).getOrCreate())

updates=spark.read.parquet('/content/updates.parquet', header =True, inferSchema=True)

sources=spark.read.parquet('/content/sources.parquet', header =True, inferSchema=True)

client_attribute=spark.read.parquet('/content/client_attribute.parquet', header =True, inferSchema=True)
# добавляем конструктор логов, который будет записывать все логи с уровнем INFO и выше в файл "py_log.log". Каждый раз новые логи присоединяются к текущим
import logging
logging.basicConfig(level=logging.INFO, force=True, filename='/content/py_log.log',filemode="a",
                    format="%(asctime)s %(levelname)s %(message)s")

try:
  monthly_report=spark.read.parquet('/content/monthly_report', header =True, inferSchema=True).show()
except:
  schema = StructType([
    StructField('client_id',
                IntegerType(), True),
    StructField('id_attribute',
                IntegerType(), True),
    StructField('name',
                StringType(), True),
    StructField('dtype',
                StringType(), True),
    StructField('value',
                StringType(), True),
    StructField('report_dt',
                StringType(), True)
  ])
  monthly_report=spark.createDataFrame([], schema)

try:
  daily_report=spark.read.parquet('/content/daily_report', header =True, inferSchema=True).show()
except:
  schema = StructType([
    StructField('client_id',
                IntegerType(), True),
    StructField('id_attribute',
                IntegerType(), True),
    StructField('name',
                StringType(), True),
    StructField('dtype',
                StringType(), True),
    StructField('value',
                StringType(), True),
    StructField('row_actual_from',
                StringType(), True),
    StructField('row_actual_to',
                StringType(), True)
  ])
  daily_report=spark.createDataFrame([], schema)

In [None]:
# подсчет количества источников
list_of_files=os.listdir('/content/sources')
if len(list_of_files)>3: # если количество не совпадает, сообщаем от необходимости запросить документацию и пересмотреть пайплайн забора данных
  logging.error('New sources was added. Ask for documentation ')
else:
  logging.info('All ' + str(len(list_of_files)) + ' sources was read successfully')

In [None]:
list_of_files.remove('.ipynb_checkpoints')

#Инициализация функций

In [None]:
# функции очистки/проверки данных
def check_types_deb(data):
  mydic = dict({"client_id": "int",
       "onl_bank_active_1m_nflag": "int",
       "auto_pay_active_qty": "int",
       "cl_income_1m_amt": "decimal(18, 2)",
       "dep_acc_1st_open_dt": "timestamp",
       "wdr_cash_6m_amt": "decimal(18, 2)",
       "cash_op_6m_amt": "decimal(18, 2)",
       "cash_3m_qty": "decimal(18, 2)",
       "lst_balance_amt": "decimal(18, 2)",
       "card_active_1m_nflag": "int",
       "row_update_dtime": "timestamp",
       "row_loading_id": "bigint",
       "row_hash_val": "string",
       "report_dt": "string"})
  for key in mydic:
    if dict(data.dtypes)[key] != mydic.get(key):
     try:
        data = data.withColumn(key, data[key].cast(mydic.get(key)))
     except Error:
       logging.error("failed to cast source columns to correct types",exc_info=True)
  return (data)
def check_types_credit(data):
  mydic = dict({"client_id": "int",
                "client_income_amt": "int",
                "oi_total_amt": "int",
                "act_pl_os_rub_amt": "decimal(18, 2)",
                "payroll_client_nflag": "int",
                "inf_payroll_rub_amt": "decimal(18, 2)",
                "legal_entity_amt": "decimal(18, 2)",
                "otf_loan_rub_amt": "decimal(18, 2)",
                "otf_fee_rub_amt": "decimal(18, 2)",
                "inf_transfer_rub_amt": "decimal(18, 2)",
                "cc_ever_nflag": "int", "row_update_dtime":
                "timestamp", "row_loading_id": "bigint",
                "row_hash_val": "string", "report_dt": "string"})
  for key in mydic:
    if dict(data.dtypes)[key] != mydic.get(key):
     try:
        data = data.withColumn(key, data[key].cast(mydic.get(key)))
     except Error:
       logging.error("failed to cast source columns to correct types",exc_info=True)
  return (data)
def check_types_client(data):
  mydic = dict({"client_id": "int",
                "srv_mb_nflag": "int",
                "cc_stoplist": "tinyint",
                "lne_tot_debt_int_ovrd_rub_amt": "decimal(18, 2)",
                "lne_tot_debt_ovrd_rub_amt": "decimal(18, 2)",
                "row_update_dtime": "timestamp",
                "row_loading_id": "bigint",
                "row_hash_val": "string",
                "row_actual_from": "string",
                "row_actual_to": "string"})
  for key in mydic:
    if dict(data.dtypes)[key] != mydic.get(key):
     try:
        data = data.withColumn(key, data[key].cast(mydic.get(key)))
     except Error:
       logging.error("failed to cast source columns to correct types",exc_info=True)
  return (data)

#проверка типов
def check_types(source, name):
  if name=='deb_cards_info':
    return(check_types_deb(source))
  elif name=='credit_cards_info':
    return(check_types_credit(source))
  else:
    return(check_types_client(source))

#общая проверка
def check_data(data, name):
  # Убрать строки с null в client_id или с null во всех атрибутах кроме client_id
  condition=''
  columns=data.columns
  columns.pop(0)
  for attribute in columns:
    condition+='and '+attribute+' is null '
  condition=condition[4:]
  clean_data_prep=data.where('client_id is not null and not ('+condition+ ')')
  # проверить соответствие типов и преобразовать к кооректным
  clean_data=check_types(clean_data_prep, name)
  return(clean_data)

#Процесс забора и очистки данных

In [None]:
# запустить процесс для каждого источника
for name in list_of_files:
  source=spark.read.parquet('/content/sources/'+name, header =True, inferSchema=True)
  # проверяем по какой колонке партиционирован источник
  if sources.select(sources.partitioned_by).where(sources.name==name).collect()[0][0]=='report_dt': # есть, работаем с scd1

    #Есть строки с report_dt источника > max(report_dt) таблицы monthly_report?
    max_dt=source.selectExpr("max(report_dt)").collect()[0][0]
    our_max_dt=monthly_report.where(monthly_report.name.isin(source.columns)).select(fnct.col('report_dt')).agg(fnct.max("report_dt").alias('our_max_dt'))
    if our_max_dt.collect()[0][0] is None or max_dt>our_max_dt.collect()[0][0]: #есть

      #какие новые партиции появились?
      if our_max_dt.collect()[0][0] is None: #если первый раз забираем данные из источника
        new_partitions_dt=source.select('report_dt').distinct().collect()
      else: #если не первый
        new_partitions_dt=source.select('report_dt').where(source.report_dt>our_max_dt.collect()[0][0]).distinct().collect()
      #запускаем процесс для каждой новой партиции
      for partition_dt in new_partitions_dt:
        #берем новые данные
        new_data_raw=source.where(source.report_dt==partition_dt[0])
        #очищаем данные
        new_data=check_data(new_data_raw, name)

        #преобразуем данные в строчный тип
        attribute_cols=new_data.columns
        new_data_casted=new_data.select([fnct.col(c).cast("string") for c in attribute_cols])

        #переворачиваем новые данные в вертикальный формат
        for tech_column in ['row_update_dtime', 'row_loading_id', 'row_hash_val']:
          attribute_cols.remove(tech_column)
        stack_expr = "stack({num}, {cols})".format(
          num=len(attribute_cols),
          cols=", ".join([f"'{a}', {a}" for a in attribute_cols])
        )
        new_data_vert=new_data_casted.select('client_id', 'report_dt', fnct.expr(stack_expr))

        #дополняем данные столбцами id_attribute, dtype
        new_data_final=new_data_vert.join(client_attribute, new_data_vert.col0==client_attribute.name, 'left').where('id_attribute is not null').selectExpr('client_id', 'id_attribute', 'col0 as name', 'dtype', 'col1 as value', 'report_dt')

        #добавляем новые данные в конечную таблицу
        monthly_report=monthly_report.unionAll(new_data_final)
        logging.info('Partition '+str(partition_dt[0])+' succesfully inserted from source ' + name)

        #обновляем журнал обновлений
        attribute_cols=new_data.columns
        for tech_column in ['report_dt', 'client_id', 'row_loading_id', 'row_hash_val']:
          attribute_cols.remove(tech_column)
        stack_expr = "stack({num}, {cols})".format(
          num=len(attribute_cols),
          cols=", ".join([f"'{a}', {a}" for a in attribute_cols])
        )
        new_data_vert=new_data_casted.select('row_update_dtime', fnct.expr(stack_expr))
        id_source_df=sources.select('id_source').where(sources.name==name)

        new_updates=new_data_vert.join(client_attribute, new_data_vert.col0==client_attribute.name, 'left').join(id_source_df, how='cross').where('id_attribute is not null').selectExpr('id_attribute', 'id_source', 'row_update_dtime').groupBy('id_attribute', 'id_source').agg(fnct.max('row_update_dtime').alias('update_dt_sources'))
        updates=updates.unionAll(new_updates).groupBy('id_attribute', 'id_source').agg(fnct.max('update_dt_sources').alias('update_dt_sources'))

    else: #нет
      logging.info('No new data in source' + name)

  else: # нет, работаем с scd2
    #Есть строки с row_actual_to источника = update_dt_sources таблицы обновлений?
    max_dt=source.where("row_actual_to!='9999-01-01 00:00:00'").selectExpr("max(row_actual_to)").collect()[0][0]
    our_max_dt=daily_report.where(daily_report.name.isin(source.columns)).where("row_actual_to!='9999-01-01 00:00:00'").select(fnct.col('row_actual_to')).agg(fnct.max("row_actual_to").alias('our_max_dt'))
    if our_max_dt.collect()[0][0] is None or max_dt>our_max_dt.collect()[0][0]: #есть

      #собираем все новые партиции?
      if our_max_dt.collect()[0][0] is None: #если первый раз забираем данные из источника
        new_partitions_dt=source.select('row_actual_to').distinct().collect()
      else: #если не первый
        new_partitions_dt=source.where(source.row_actual_to>our_max_dt.collect()[0][0]).select('row_actual_to').distinct().collect()

      #запускаем процесс для каждой новой партиции
      for partition_dt in new_partitions_dt:
        #берем новые данные
        new_data_raw=source.where(source.row_actual_to==partition_dt[0])
        #очищаем данные
        new_data=check_data(new_data_raw, name)

        #преобразуем данные в строчный тип
        attribute_cols=new_data.columns
        new_data_casted=new_data.select([fnct.col(c).cast("string") for c in attribute_cols])

        #переворачиваем новые данные в вертикальный формат
        for tech_column in ['row_update_dtime', 'row_loading_id', 'row_hash_val']:
          attribute_cols.remove(tech_column)
        stack_expr = "stack({num}, {cols})".format(
          num=len(attribute_cols),
          cols=", ".join([f"'{a}', {a}" for a in attribute_cols])
        )
        new_data_vert=new_data_casted.select('client_id', 'row_actual_from', 'row_actual_to', fnct.expr(stack_expr))
        #дополняем данные столбцами id_attribute, dtype
        new_data_final=new_data_vert.join(client_attribute, new_data_vert.col0==client_attribute.name, 'left').where('id_attribute is not null').selectExpr('client_id', 'id_attribute', 'col0 as name', 'dtype', 'col1 as value', 'row_actual_from', 'row_actual_to')

        #добавляем новые данные в конечную таблицу
        daily_report=daily_report.where("row_actual_to!='9999-01-01 00:00:00'").unionAll(new_data_final).groupBy('client_id', 'id_attribute', 'name', 'dtype', 'value').agg(fnct.min('row_actual_from').alias('row_actual_from'), fnct.max('row_actual_to').alias('row_actual_to'))
        logging.info('Partition '+str(partition_dt[0])+' succesfully inserted from source ' + name)

        #обновляем журнал обновлений
        attribute_cols=new_data.columns
        for tech_column in ['row_actual_to', 'row_actual_from', 'row_loading_id', 'row_hash_val']:
          attribute_cols.remove(tech_column)
        stack_expr = "stack({num}, {cols})".format(
          num=len(attribute_cols),
          cols=", ".join([f"'{a}', {a}" for a in attribute_cols])
        )
        new_data_vert=new_data_casted.select('row_update_dtime', 'client_id', fnct.expr(stack_expr))
        try:
          new_updates=new_updates.unionAll(new_data_vert)
        except:
          new_updates=new_data_vert
      id_source_df=sources.select('id_source').where(sources.name==name)
      new_updates=new_updates.groupBy('client_id', 'col0', 'col1').agg(fnct.min('row_update_dtime').alias('row_update_dtime')).groupBy('col0').agg(fnct.max('row_update_dtime').alias('row_update_dtime')).select('row_update_dtime', 'col0').join(client_attribute, new_updates.col0==client_attribute.name, 'inner').join(id_source_df, how='cross').where('id_attribute is not null').selectExpr('id_attribute', 'id_source', 'row_update_dtime')
      updates=updates.unionAll(new_updates).groupBy('id_attribute', 'id_source').agg(fnct.max('update_dt_sources').alias('update_dt_sources'))

    else: #нет
      logging.info('No new data in source' + name)

In [None]:
monthly_report.write.mode("overwrite").parquet("/content/results/monthly_report", "overwrite", 'report_dt')
daily_report.write.mode("overwrite").parquet("/content/results/daily_report", "overwrite", 'row_actual_to')

#Показываем результат

In [None]:
monthly_report.show(100)

+---------+------------+--------------------+--------------+-------------------+-------------------+
|client_id|id_attribute|                name|         dtype|              value|          report_dt|
+---------+------------+--------------------+--------------+-------------------+-------------------+
|        1|          15|onl_bank_active_1...|           int|                  1|2024-11-24 00:00:00|
|        1|          16| auto_pay_active_qty|           int|                  1|2024-11-24 00:00:00|
|        1|          17|    cl_income_1m_amt|decimal(18, 2)|               1.00|2024-11-24 00:00:00|
|        1|          18| dep_acc_1st_open_dt|     timestamp|2024-11-24 00:00:00|2024-11-24 00:00:00|
|        1|          19|     wdr_cash_6m_amt|decimal(18, 2)|               1.00|2024-11-24 00:00:00|
|        1|          20|      cash_op_6m_amt|decimal(18, 2)|               1.00|2024-11-24 00:00:00|
|        1|          21|         cash_3m_qty|decimal(18, 2)|               1.00|2024-11-24 

In [None]:
daily_report.show(100)

+---------+------------+--------------------+--------------+-----+-------------------+-------------------+
|client_id|id_attribute|                name|         dtype|value|    row_actual_from|      row_actual_to|
+---------+------------+--------------------+--------------+-----+-------------------+-------------------+
|        1|           1|        srv_mb_nflag|           int|    1|2024-11-24 00:00:00|9999-01-01 00:00:00|
|        1|           2|         cc_stoplist|       tinyint|    1|2024-11-24 00:00:00|9999-01-01 00:00:00|
|        1|           3|lne_tot_debt_int_...|decimal(18, 2)| 1.00|2024-11-24 00:00:00|9999-01-01 00:00:00|
|        1|           4|lne_tot_debt_ovrd...|decimal(18, 2)| 1.00|2024-11-24 00:00:00|9999-01-01 00:00:00|
|        2|           1|        srv_mb_nflag|           int|    1|2024-11-24 00:00:00|9999-01-01 00:00:00|
|        2|           2|         cc_stoplist|       tinyint|    1|2024-11-24 00:00:00|9999-01-01 00:00:00|
|        2|           3|lne_tot_debt_

In [None]:
updates.show(50)

+------------+---------+-------------------+
|id_attribute|id_source|  update_dt_sources|
+------------+---------+-------------------+
|           1|        3|2024-11-24 00:00:00|
|           2|        3|2024-11-24 00:00:00|
|           3|        3|2024-11-25 00:00:00|
|           4|        3|2024-11-24 00:00:00|
|           5|        2|2024-11-24 00:00:00|
|           6|        2|2024-11-24 00:00:00|
|           7|        2|2024-11-24 00:00:00|
|           8|        2|2024-11-24 00:00:00|
|           9|        2|2024-11-24 00:00:00|
|          10|        2|2024-11-24 00:00:00|
|          11|        2|2024-11-24 00:00:00|
|          12|        2|2024-11-24 00:00:00|
|          13|        2|2024-11-24 00:00:00|
|          14|        2|2024-11-24 00:00:00|
|          15|        1|2024-11-24 00:00:00|
|          16|        1|2024-11-24 00:00:00|
|          17|        1|2024-11-24 00:00:00|
|          18|        1|2024-11-24 00:00:00|
|          19|        1|2024-11-24 00:00:00|
|         