In [20]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

spark = SparkSession.builder \
    .appName("fill_f101_round_f") \
    .enableHiveSupport() \
    .getOrCreate()

In [2]:
on_date = '2018-02-01'

In [3]:
md_account_d = spark.read.parquet('/user/hive/warehouse/ds.db/md_account_d')

In [4]:
second_level_accounts = md_account_d.select(F.substring('account_number', 1, 5))

In [29]:
def fill_f101_round_f(on_date: str, spark):
    # 0. Расчет дат
    report_on_date = datetime.strptime(on_date, "%Y-%m-%d")
    report_month = report_on_date - relativedelta(months=1)
    prev_month_start = report_month.replace(day=1)
    prev_month_end = (prev_month_start + relativedelta(months=1)) - relativedelta(days=1)
    balance_in_date = (prev_month_start - relativedelta(days=1)).strftime("%d.%m.%Y")
    prev_month_start_str = prev_month_start.strftime("%Y-%m-%d")
    prev_month_end_str = prev_month_end.strftime("%Y-%m-%d")
    
    # 1. Справочник счетов (актуальные все)
    accounts = spark.table("DS.MD_ACCOUNT_D") \
        .withColumn("ledger_account", F.expr("substring(account_number, 1, 5)")) \
        .select("account_rk", "ledger_account", "currency_code", "char_type")

    print(balance_in_date)
    # 2. Остатки на начало периода из DS.FT_BALANCE_F
    balances_in = spark.table("DS.FT_BALANCE_F") \
        .where(F.col("on_date") == balance_in_date) \
        .select("account_rk", 
                F.col("balance_out").alias("balance_in"))
    
    # 3. Остатки на конец периода из DM.DM_ACCOUNT_BALANCE_F
    balances_out = spark.table("DM.DM_ACCOUNT_BALANCE_F") \
        .where(F.col("on_date") == prev_month_end) \
        .select("account_rk", 
                F.col("balance_out").alias("balance_out_val"),
                F.col("balance_out_rub").alias("balance_out_rub"))

    # 4. Обороты по дням с суммами в валюте и в рублях
    turnovers = spark.table("DM.DM_ACCOUNT_TURNOVER_F")\
    .select("account_rk", "debet_amount", "credit_amount", 
            "debet_amount_rub", "credit_amount_rub")
    turnovers.show()
    # 5. Присоединяем всё
    df = accounts \
        .join(balances_in, "account_rk", "left") \
        .join(balances_out, "account_rk", "left") \
        .join(turnovers, "account_rk", "left") \
        .join(spark.table("DS.MD_LEDGER_ACCOUNT_S").select("ledger_account", "chapter"), 
              "ledger_account", "left")

    # 6. Агрегация по балансовому счету второго порядка
    result = df.groupBy("ledger_account", "chapter", "char_type").agg(
        F.first(F.lit(prev_month_start_str)).alias("from_date"),
        F.first(F.lit(prev_month_end_str)).alias("to_date"),

        # Остатки на начало
        F.sum(F.when(F.col("currency_code").isin("810", "643"), F.col("balance_in"))).alias("balance_in_rub"),
        F.sum(F.when(~F.col("currency_code").isin("810", "643"), F.col("balance_in"))).alias("balance_in_val"),
        F.sum(F.col("balance_in")).alias("balance_in_total"),

        # Обороты дебет
        F.sum(F.when(F.col("currency_code").isin("810", "643"), F.col("debet_amount_rub"))).alias("turn_deb_rub"),
        F.sum(F.when(~F.col("currency_code").isin("810", "643"), F.col("debet_amount_rub"))).alias("turn_deb_val"),
        F.sum(F.col("debet_amount_rub")).alias("turn_deb_total"),

        # Обороты кредит
        F.sum(F.when(F.col("currency_code").isin("810", "643"), F.col("credit_amount_rub"))).alias("turn_cre_rub"),
        F.sum(F.when(~F.col("currency_code").isin("810", "643"), F.col("credit_amount"))).alias("turn_cre_val"),
        F.sum(F.col("credit_amount_rub")).alias("turn_cre_total"),

        # Остатки на конец
        F.sum(F.when(F.col("currency_code").isin("810", "643"), F.col("balance_out_rub"))).alias("balance_out_rub"),
        F.sum(F.when(~F.col("currency_code").isin("810", "643"), F.col("balance_out_val"))).alias("balance_out_val"),
        F.sum(
            F.when(F.col("currency_code").isin("810", "643"), F.col("balance_out_rub"))
             .otherwise(F.col("balance_out_val"))
        ).alias("balance_out_total")
    )

    return result

In [30]:
f101_round_f = fill_f101_round_f(on_date, spark)

31.12.2017
+----------+-----------------+-----------------+-----------------+-----------------+
|account_rk|     debet_amount|    credit_amount| debet_amount_rub|credit_amount_rub|
+----------+-----------------+-----------------+-----------------+-----------------+
|     13619|   11579.74000000|   26389.41000000|   11579.74000000|   26389.41000000|
|     13630|78498976.71000000|80020220.76000000|78498976.71000000|80020220.76000000|
|     13631|  868633.91000000|  909738.98000000|  868633.91000000|  909738.98000000|
|     13632| 3820310.58000000| 3262575.86000000| 3820310.58000000| 3262575.86000000|
|     13658|   31966.89000000|   69235.39000000|   31966.89000000|   69235.39000000|
|     13685|  258141.85000000|  155499.70000000|  258141.85000000|  155499.70000000|
|     13688|     272.50000000|   86664.74000000|     272.50000000|   86664.74000000|
|     13873|   33513.35000000|   14956.22000000|   33513.35000000|   14956.22000000|
|     13883|   58205.40000000|   76771.39000000|   582

In [31]:
f101_round_f.show(vertical=True, truncate=False)

-RECORD 0---------------------------------
 ledger_account    | 30102                
 chapter           | А                    
 char_type         | А                    
 from_date         | 2018-01-01           
 to_date           | 2018-01-31           
 balance_in_rub    | 3.401354052039999E9  
 balance_in_val    | NULL                 
 balance_in_total  | 3.401354052039999E9  
 turn_deb_rub      | 782147331.70000000   
 turn_deb_val      | NULL                 
 turn_deb_total    | 782147331.70000000   
 turn_cre_rub      | 787848410.30000000   
 turn_cre_val      | NULL                 
 turn_cre_total    | 787848410.30000000   
 balance_out_rub   | 3304435715.84000000  
 balance_out_val   | NULL                 
 balance_out_total | 3304435715.84000000  
-RECORD 1---------------------------------
 ledger_account    | 30109                
 chapter           | А                    
 char_type         | П                    
 from_date         | 2018-01-01           
 to_date   