# Клиенты и счета

## Инициализация

In [85]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql import functions as F
from pyspark.sql.window import Window


In [86]:
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("project4")
    .config("spark.jars", "./jars/postgresql-42.5.0.jar")
    .getOrCreate()
)

## Загрузка данных

In [87]:
schema = StructType() \
      .add("ClientId",IntegerType(),True) \
      .add("ClientName",StringType(),True) \
      .add("Type",StringType(),True) \
      .add("Form",StringType(),True) \
      .add("RegisterDate",DateType(),True) \
      
df_clients = (
    spark.read
        .options(delimiter=';')
        .option("header",True)
        .schema(schema)
        .csv("./data/Clients.csv")
)

df_clients.printSchema(), df_clients.show(10)

root
 |-- ClientId: integer (nullable = true)
 |-- ClientName: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Form: string (nullable = true)
 |-- RegisterDate: date (nullable = true)

+--------+----------+----+----+------------+
|ClientId|ClientName|Type|Form|RegisterDate|
+--------+----------+----+----+------------+
|       1|  Клиент 1|   Ф|   -|  2020-11-01|
|       2|  Клиент 2|   Ю| ООО|  2020-11-02|
|       3|  Клиент 3|   Ф|   -|  2020-11-03|
|       4|  Клиент 4|   Ю|  ИП|  2020-11-04|
|       5|  Клиент 5|   Ф|   -|  2020-11-01|
|       6|  Клиент 6|   Ю|  АО|  2020-11-02|
|       7|  Клиент 7|   Ф|   -|  2020-11-03|
|       8|  Клиент 8|   Ю| ПАО|  2020-11-01|
|       9|  Клиент 9|   Ф|   -|  2020-11-02|
|      10| Клиент 10|   Ю| ООО|  2020-11-03|
+--------+----------+----+----+------------+
only showing top 10 rows



(None, None)

In [88]:
schema = StructType() \
      .add("AccountID",IntegerType(),True) \
      .add("AccountNum",StringType(),True) \
      .add("ClientId",IntegerType(),True) \
      .add("DateOpen",DateType(),True) \
      
df_accounts = (
    spark.read
        .options(delimiter=';')
        .option("header",True)
        .schema(schema)
        .csv("./data/Account.csv")
)

df_accounts.printSchema(), df_accounts.show(10)

root
 |-- AccountID: integer (nullable = true)
 |-- AccountNum: string (nullable = true)
 |-- ClientId: integer (nullable = true)
 |-- DateOpen: date (nullable = true)

+---------+--------------------+--------+----------+
|AccountID|          AccountNum|ClientId|  DateOpen|
+---------+--------------------+--------+----------+
|        1|40702810927050000337|       1|2020-11-01|
|        2|40802810300000009067|       2|2020-11-02|
|        3|40802810300000009708|       3|2020-11-03|
|        4|40802810800000030701|       4|2020-11-04|
|        5|40802810300000011071|       5|2020-11-01|
|        6|40802810100000063339|       6|2020-11-02|
|        7|40702810823620000031|       7|2020-11-03|
|        8|40802810409260005894|       8|2020-11-04|
|        9|40802810905030000004|       9|2020-11-01|
|       10|40802810400020001097|      10|2020-11-02|
+---------+--------------------+--------+----------+
only showing top 10 rows



(None, None)

In [89]:
schema = StructType() \
    .add("Currency", StringType(), True) \
    .add("Rate", StringType(), True) \
    .add("RateDate", DateType(), True) \

df_rates = (
    spark.read
    .options(delimiter=';')
    .option("header", True)
    .schema(schema)
    .csv("./data/Rate.csv")
)

df_rates = df_rates.withColumn("Rate", F.regexp_replace("Rate", ",", ".").cast(DoubleType()))

df_rates = (
    df_rates.select(F.col("Currency"), F.struct(F.col("RateDate"), F.col("Rate")).alias("vs"))
    .groupBy(F.col("Currency"))
    .agg(F.max(F.col("vs")).alias("vs"))
    .select("Currency", "vs.RateDate", "vs.Rate")
)

df_rates.printSchema(), df_rates.show()


root
 |-- Currency: string (nullable = true)
 |-- RateDate: date (nullable = true)
 |-- Rate: double (nullable = true)

+--------+----------+-----+
|Currency|  RateDate| Rate|
+--------+----------+-----+
|      EU|2020-01-02|91.27|
|     RUB|2020-01-02|  1.0|
|     USD|2020-01-02|80.23|
+--------+----------+-----+



(None, None)

In [90]:
schema = StructType() \
    .add("AccountDB", IntegerType(), True) \
    .add("AccountCR", IntegerType(), True) \
    .add("DateOp", DateType(), True) \
    .add("Amount", StringType(), True) \
    .add("Currency", StringType(), True) \
    .add("Comment", StringType(), True) \


df_operations = (
    spark.read
    .options(delimiter=';')
    .option("header", True)
    .schema(schema)
    .csv("./data/Operation.csv")
)

df_operations = df_operations.withColumn(
    "Amount", F.regexp_replace("Amount", ",", ".").cast(DoubleType()))

df_operations = df_operations.join(
    df_rates, df_operations.Currency == df_rates.Currency)

# всё в рублях
df_operations = (
    df_operations
    .withColumn("Amount", F.round(F.col("Amount") * F.col("Rate"), 2))
    .select("AccountDB", "AccountCR", "DateOp", "Amount", "Comment")
)

df_operations.printSchema(), df_operations.show(10)


root
 |-- AccountDB: integer (nullable = true)
 |-- AccountCR: integer (nullable = true)
 |-- DateOp: date (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Comment: string (nullable = true)

+---------+---------+----------+------+--------------------+
|AccountDB|AccountCR|    DateOp|Amount|             Comment|
+---------+---------+----------+------+--------------------+
|        1|     4999|2020-11-01|   0.1|а/м, а\м, автомоб...|
|        2|     5000|2020-11-02| 16.05| сою, соя, зерно,...|
|        3|     5001|2020-11-03| 27.38|сигар, табачн, та...|
|        4|     5002|2020-11-04|   0.4|гсм, бензин, керо...|
|        5|     5003|2020-11-01| 40.12|компью, монитор, ...|
|        6|     5004|2020-11-02| 54.76|ювелир,  юв., лом...|
|        7|     5005|2020-11-03|   0.7|хоз, бытхим, подг...|
|        8|     5006|2020-11-04| 64.18|турусл, турист, з...|
|        9|     5007|2020-11-01| 82.14|торгвыруч, оплату...|
|       10|     5008|2020-11-02|   1.0|а/м, а\м, автомоб...|
+---

(None, None)

In [91]:
# Загрузка списков из postgres

# url = "jdbc:postgresql://localhost:5432/postgres"
# creds = {"user": "postgres", "password": "myPassword"}
# lists = spark.read.jdbc(url, "de_sprint.lists", properties = creds)
# lists.printSchema(), lists.show()

In [92]:
# для теста

list1 = "%а/м%, %а\м%, %автомобиль %, %автомобили %, %транспорт%, %трансп%средс%, %легков%, %тягач%, %вин%, %vin%,%viн:%, %fоrd%, %форд%,%кiа%, %кия%, %киа%%мiтsuвisнi%, %мицубиси%, %нissан%, %ниссан%, %sсанiа%, %вмw%, %бмв%, %аudi%, %ауди%, %jеер%, %джип%, %vоlvо%, %вольво%, %тоyота%, %тойота%, %тоиота%, %нyuнdаi%, %хендай%, %rенаulт%, %рено%, %реugеот%, %пежо%, %lаdа%, %лада%, %dатsuн%, %додж%, %меrсеdеs%, %мерседес%, %vоlкswаgен%, %фольксваген%, %sкоdа%, %шкода%, %самосвал%, %rover%, %ровер%"
list2= "%сою%, %соя%, %зерно%, %кукуруз%, %масло%, %молок%, %молоч%, %мясн%, %мясо%, %овощ%, %подсолн%, %пшениц%, %рис%, %с/х%прод%, %с/х%товар%, %с\х%прод%, %с\х%товар%, %сахар%, %сельск%прод%, %сельск%товар%, %сельхоз%прод%, %сельхоз%товар%, %семен%, %семечк%, %сено%, %соев%, %фрукт%, %яиц%, %ячмен%, %картоф%, %томат%, %говя%, %свин%, %курин%, %куриц%, %рыб%, %алко%, %чаи%, %кофе%, %чипс%, %напит%, %бакале%, %конфет%, %колбас%, %морож%, %с/м%, %с\м%, %консерв%, %пищев%, %питан%, %сыр%, %макарон%, %лосос%, %треск%, %саир%, % филе%, % хек%, %хлеб%, %какао%, %кондитер%, %пиво%, %ликер%"

## Подготовка данных

In [93]:
# Будем делать только на первую дату

data_mart_date = "2020-11-01"

In [94]:
debet =  (
    df_operations
    .where(df_operations.DateOp == data_mart_date)
    .groupBy("AccountDB")
    .agg(
        F.round(F.sum(F.col("Amount")), 2).alias("SumAmountDB")
    )
)

debet.show(5), debet.count()

+---------+-----------+
|AccountDB|SumAmountDB|
+---------+-----------+
|      833|  877595.71|
|     1645| 1283174.66|
|     1829|  902867.22|
|     3749|  951583.38|
|     3997| 1345448.57|
+---------+-----------+
only showing top 5 rows



(None, 5000)

In [95]:
credit =  (
    df_operations
    .where(df_operations.DateOp == data_mart_date)
    .groupBy("AccountCR")
    .agg(
        F.round(F.sum(F.col("Amount")), 2).alias("SumAmountCR")
    )
)

credit.show(5), credit.count()

+---------+-----------+
|AccountCR|SumAmountCR|
+---------+-----------+
|     5803|  1260934.0|
|     9427| 1356886.63|
|    10623| 1547420.02|
|    12027|  1595717.6|
|    12799| 1446167.08|
+---------+-----------+
only showing top 5 rows



(None, 5000)

## Витрины

### 1. Витрина corporate_payments

In [96]:
# AccountId	        ИД счета
# ClientId	        Ид клиента счета
# PaymentAmt	    Сумма операций по счету, где счет клиента указан в дебете проводки
# EnrollementAmt	Сумма операций по счету, где счет клиента указан в кредите проводки
# TaxAmt		    Сумму операций, где счет клиента указан в дебете, и счет кредита 40702
# ClearAmt	        Сумма операций, где счет клиента указан в кредите, и счет дебета 40802
# CarsAmt		    Сумма операций, где счет клиента указан в дебете проводки и назначение платежа не содержит слов по маскам Списка 1
# FoodAmt		    Сумма операций, где счет клиента указан в кредите проводки и назначение платежа содержит слова по Маскам Списка 2
# FLAmt		        Сумма операций с физ. лицами. Счет клиента указан в дебете проводки, а клиент в кредите проводки – ФЛ.
# CutoffDt	        Дата операции;


# Client
#  |-- ClientId: integer (nullable = true)
#  |-- ClientName: string (nullable = true)
#  |-- Type: string (nullable = true)
#  |-- Form: string (nullable = true)
#  |-- RegisterDate: date (nullable = true)

# Account
#  |-- AccountID: integer (nullable = true)
#  |-- AccountNum: string (nullable = true)
#  |-- ClientId: integer (nullable = true)
#  |-- DateOpen: date (nullable = true)

# Rate
#  |-- Currency: string (nullable = true)
#  |-- RateDate: date (nullable = true)
#  |-- Rate: double (nullable = true)

# Operations
#  |-- AccountDB: integer (nullable = true)
#  |-- AccountCR: integer (nullable = true)
#  |-- DateOp: date (nullable = true)
#  |-- Amount: double (nullable = true)
#  |-- Currency: string (nullable = true)
#  |-- Comment: string (nullable = true)

from functools import reduce

grouped_payment = (
    df_operations
    .where(df_operations.DateOp == data_mart_date)
    .join(df_accounts, df_accounts.AccountID == df_operations.AccountDB)
    .groupBy("AccountDB")
    .agg(
        F.sum("Amount").alias("PaymentAmt")
    )
)

grouped_tax = (
    df_operations
    .where(df_operations.DateOp == data_mart_date)
    .join(df_accounts, (df_accounts.AccountID == df_operations.AccountCR) & (df_accounts.AccountNum.startswith("40702")))
    .groupBy("AccountDB")
    .agg(
        F.sum("Amount").alias("TaxAmt")
    )
)

grouped_cars = (
    df_operations
    .where((df_operations.DateOp == data_mart_date) & (~reduce(lambda a, b: a | b, (df_operations.Comment.like(pat.replace("\\", "\\\\")) for pat in list1.split(",")))))
    .groupBy("AccountDB")
    .agg(
        F.sum("Amount").alias("CarsAmt")
    )
)


grouped_enrollement = (
    df_operations
    .where(df_operations.DateOp == data_mart_date)
    .groupBy("AccountCR")
    .agg(
        F.sum("Amount").alias("EnrollementAmt")
    )
)

grouped_clear = (
    df_operations
    .where(df_operations.DateOp == data_mart_date)
    .join(df_accounts, (df_accounts.AccountID == df_operations.AccountDB) & (df_accounts.AccountNum.startswith("40802")))
    .groupBy("AccountCR")
    .agg(
        F.sum("Amount").alias("ClearAmt")
    )
)

grouped_food = (
    df_operations
    .where((df_operations.DateOp == data_mart_date) & (~reduce(lambda a, b: a | b, (df_operations.Comment.like(pat.replace("\\", "\\\\")) for pat in list2.split(",")))))
    .groupBy("AccountCR")
    .agg(
        F.sum("Amount").alias("FoodAmt")
    )
)


grouped_fl = (
    df_operations
    .where(df_operations.DateOp == data_mart_date)
    .join(df_accounts.alias("acc_db"), F.col("acc_db.AccountID") == df_operations.AccountDB)
    .join(df_accounts.alias("acc_cr"), F.col("acc_cr.AccountID") == df_operations.AccountCR)
    .join(df_clients.where(F.col("Type") == "Ф").alias("cl_cr"), F.col("acc_db.ClientId") == F.col("cl_cr.ClientId"))
    .groupBy("AccountDB")
    .agg(
        F.sum("Amount").alias("FLAmt")
    )
)


corporate_payments = (
    df_accounts
    .join(grouped_payment, df_accounts.AccountID == grouped_payment.AccountDB, "left")
    .join(grouped_tax, df_accounts.AccountID == grouped_tax.AccountDB, "left")
    .join(grouped_cars, df_accounts.AccountID == grouped_cars.AccountDB, "left")
    .join(grouped_enrollement, df_accounts.AccountID == grouped_enrollement.AccountCR, "left")
    .join(grouped_clear, df_accounts.AccountID == grouped_clear.AccountCR, "left")
    .join(grouped_food, df_accounts.AccountID == grouped_food.AccountCR, "left")
    .join(grouped_fl, df_accounts.AccountID == grouped_fl.AccountDB, "left")
    .select("AccountID", "ClientId", 
            F.when(F.col("PaymentAmt").isNotNull(), F.round(F.col("PaymentAmt"), 2)).otherwise(F.lit(0.0)).alias("PaymentAmt"), 
            F.when(F.col("EnrollementAmt").isNotNull(), F.round(F.col("EnrollementAmt"), 2)).otherwise(F.lit(0.0)).alias("EnrollementAmt"),
            F.when(F.col("TaxAmt").isNotNull(), F.round(F.col("TaxAmt"), 2)).otherwise(F.lit(0.0)).alias("TaxAmt"),
            F.when(F.col("ClearAmt").isNotNull(), F.round(F.col("ClearAmt"), 2)).otherwise(F.lit(0.0)).alias("ClearAmt"),
            F.when(F.col("CarsAmt").isNotNull(), F.round(F.col("CarsAmt"), 2)).otherwise(F.lit(0.0)).alias("CarsAmt"),
            F.when(F.col("FoodAmt").isNotNull(), F.round(F.col("FoodAmt"), 2)).otherwise(F.lit(0.0)).alias("FoodAmt"),
            F.when(F.col("FLAmt").isNotNull(), F.round(F.col("FLAmt"), 2)).otherwise(F.lit(0.0)).alias("FLAmt"))
)

corporate_payments.orderBy("AccountID").show()

corporate_payments.write\
    .format("parquet")\
    .option("path", "data/" + data_mart_date + "/corporate_payments")\
    .mode("overwrite")\
    .save()


                                                                                

+---------+--------+----------+--------------+---------+----------+----------+----------+----------+
|AccountID|ClientId|PaymentAmt|EnrollementAmt|   TaxAmt|  ClearAmt|   CarsAmt|   FoodAmt|     FLAmt|
+---------+--------+----------+--------------+---------+----------+----------+----------+----------+
|        1|       1|1239646.48|           0.0|      0.0|       0.0|1239646.38|       0.0|1239646.48|
|        2|       2|       0.0|           0.0|      0.0|       0.0|       0.0|       0.0|       0.0|
|        3|       3|       0.0|           0.0|      0.0|       0.0|       0.0|       0.0|       0.0|
|        4|       4|       0.0|    1698092.02|      0.0|       0.0|       0.0|1698092.02|       0.0|
|        5|       5| 856586.88|           0.0|      0.0|       0.0| 375166.76|       0.0| 856586.88|
|        6|       6|       0.0|           0.0|      0.0|       0.0|       0.0|       0.0|       0.0|
|        7|       7|       0.0|           0.0|      0.0|       0.0|       0.0|       0.0|  

                                                                                

### 2. Витрина corporate_account

In [97]:
# AccountID	    ИД счета
# AccountNum	Номер счета
# DateOpen	    Дата открытия счета
# ClientId	    ИД клиента
# ClientName	Наименование клиента
# TotalAmt	    Общая сумма оборотов по счету. Считается как сумма PaymentAmt и EnrollementAmt
# CutoffDt	    Дата операции

total = debet.join(credit, debet.AccountDB == credit.AccountCR, "outer")
total = total.na.fill(value=0, subset=["SumAmountDB", "SumAmountCR"])
total = total.withColumn("AccountID", F.when(F.col("AccountDB").isNotNull(), F.col("AccountDB")).otherwise(F.col("AccountCR")))\
    .withColumn("TotalAmt", total.SumAmountDB + total.SumAmountCR)\
    .drop("AccountDB", "SumAmountDB", "AccountCR", "SumAmountCR")
total = total.join(df_accounts.alias("a"), total.AccountID == df_accounts.AccountID)\
    .select(total.AccountID, "ClientId", "TotalAmt")

# total.show()

corporate_account = (
    df_accounts
    .join(df_clients.alias("c"), df_accounts.ClientId == df_clients.ClientId)
    .join(total.alias("t"), total.AccountID == df_accounts.AccountID, "inner")
    .select(df_accounts.AccountID, "AccountNum", "DateOpen", "c.ClientId", "ClientName", "TotalAmt")    
)

corporate_account.show()

corporate_account.write\
    .format("parquet")\
    .option("path", "data/" + data_mart_date + "/corporate_account")\
    .mode("overwrite")\
    .save()

+---------+--------------------+----------+--------+----------+----------+
|AccountID|          AccountNum|  DateOpen|ClientId|ClientName|  TotalAmt|
+---------+--------------------+----------+--------+----------+----------+
|        1|40702810927050000337|2020-11-01|       1|  Клиент 1|1239646.48|
|        4|40802810800000030701|2020-11-04|       4|  Клиент 4|1698092.02|
|        5|40802810300000011071|2020-11-01|       5|  Клиент 5| 856586.88|
|        8|40802810409260005894|2020-11-04|       8|  Клиент 8|1504628.29|
|        9|40802810905030000004|2020-11-01|       9|  Клиент 9| 1354229.6|
|       12|40702810836260000957|2020-11-04|      12| Клиент 12|1110519.85|
|       13|40702810000000150277|2020-11-01|      13| Клиент 13| 1239964.2|
|       16|40702810400000099734|2020-11-04|      16| Клиент 16| 1698504.8|
|       17|40702810600000057437|2020-11-01|      17| Клиент 17| 856891.34|
|       20|40702810473000000378|2020-11-04|      20| Клиент 20|1504946.02|
|       21|40702810300000

### 3. Витрина corporate_info

In [98]:
# ClientId	    ИД клиента (PK)
# ClientName	Наименование клиента
# Type	        Тип клиента (ФЛ, ЮЛ)
# Form	        Организационно-правовая форма (ООО, ИП и т.п.)
# RegisterDate	Дата регистрации клиента
# TotalAmt	    Сумма операций по всем счетам клиент. Считается как сумма corporate_account.total_amt по всем счетам.
# CutoffDt	    Дата операции

grouped = total.groupby("ClientId").agg(F.sum("TotalAmt").alias("TotalAmt"))

corporate_info = (
    df_clients.join(grouped, df_clients.ClientId == grouped.ClientId)
    .select(df_clients.ClientId, "ClientName", "Type", "Form", "RegisterDate", F.round("TotalAmt", 2).alias("TotalAmt"))
)

corporate_info.show()

corporate_info.write\
    .format("parquet")\
    .option("path", "data/" + data_mart_date + "/corporate_info")\
    .mode("overwrite")\
    .save()

+--------+-----------+----+----+------------+----------+
|ClientId| ClientName|Type|Form|RegisterDate|  TotalAmt|
+--------+-----------+----+----+------------+----------+
|     148| Клиент 148|   Ю|  ИП|  2020-11-01| 1703045.6|
|     496| Клиент 496|   Ю| ПАО|  2020-11-02| 1715016.8|
|     833| Клиент 833|   Ф|   -|  2020-11-03|2604170.91|
|    1088|Клиент 1088|   Ю| ПАО|  2020-11-03|1533223.45|
|    1580|Клиент 1580|   Ю|  ИП|  2020-11-01|1546250.14|
|    1645|Клиент 1645|   Ф|   -|  2020-11-03|2435103.26|
|    1829|Клиент 1829|   Ф|   -|  2020-11-02|2663704.82|
|    3749|Клиент 3749|   Ф|   -|  2020-11-04|2778468.98|
|    3997|Клиент 3997|   Ф|   -|  2020-11-03|2557054.45|
|    4101|Клиент 4101|   Ф|   -|  2020-11-02|3107966.58|
|    4900|Клиент 4900|   Ю|  ИП|  2020-11-03| 1866514.4|
|    5156|Клиент 5156|   Ю|  ИП|  2020-11-04|1640931.89|
|    5300|Клиент 5300|   Ю|  ИП|  2020-11-01|1644744.58|
|    5803|Клиент 5803|   Ф|   -|  2020-11-03| 1260934.0|
|    6336|Клиент 6336|   Ю| ПАО