## Тестовое задание на должность Data Engineer

## Входные данные
```
+-------+----------+----------+---------+
|partner|  rep_date|     value|  article|
+-------+----------+----------+---------+
|  00001|2021-05-17|   8951.36|    begin|
|  00001|2021-05-17|-117015.26|comission|
|  00001|2021-05-17|     -0.15|      end|
|  00002|2021-03-05|  -8517.04|    begin|
|  00002|2021-03-05|     -0.92|comission|
|  00002|2021-03-05|  -6706.29|      end|
|  00002|2021-05-09|   -245.28|    begin|
|  00002|2021-05-09|  -1053.11|comission|
|  00002|2021-05-09|      -4.3|      end|
|  00004|2021-11-04|    -81.37|    begin|
|  00004|2021-11-04|    245.12|comission|
|  00004|2021-11-04|   -326.49|      end|
|  00006|2021-12-05|   -4847.4|    begin|
|  00006|2021-12-05|  30825.59|comission|
|  00006|2021-12-05|     -6.49|      end|
|  00006|2021-12-28|      7.98|    begin|
|  00006|2021-12-28|    238.45|comission|
|  00006|2021-12-28|   6619.69|      end|
|  00008|2021-04-19| -56554.39|    begin|
|  00008|2021-04-19|      5.97|comission|
+-------+----------+----------+---------+
```
Датасет описывает активность клиента и содержит колонки:
- `partner` - номер клиента
- `rep_date` - дата
- `article` - название статьи (`begin` - остатки по счетам клиента в начале дня, `end` - остатки по счетам клиента в конце дня, `comission` - комиссия)
- `value` - значение

## Теория
Клиент `partner` является **активным** на определенный день `rep_date`, если у него есть ненулевое изменение остатков по счетам в течение дня без учета списания комиссий `comission` банком (пассивное действие, не требующее участия клиента). В остальных случаях он считается **неактивным**.

_Например, если изменение остатков по счетам у клиента за день составило 100 рублей, и комиссия составила 100 рублей, то он тратил деньги только на комисию, поэтому он неактивен. Если клиент тратил деньги не только на комиссию, то он - активен._

Если клиент был неактивным `N=30` или более дней, то он переходит в **отток**. Если клиент вернулся в банк после периода неактивности в `N=30` или более дней, то он становится **новым** клиентом.

Задача банка - предотвращать отток, так как удерживать старых клиентов - намного дешевле, чем привлекать новых. Банк использует модели для поиска клиентов с высоким риском оттока и коммуницирует с ними с целью их удержания в банке. Для этого клиенту, например, могут быть предложены более выгодные условия по какому-либо продукту или более высокий cashback.  

## Задача
Собрать датасет для аналитиков и Data Scientist-ов с разметкой периодов активности клиента.

Выходные данные должны иметь структуру:
- `partner` - номер клиента
- `rep_date` - дата
- `life` - номер жизни клиента

_Например, клиент Х активен 10 дней: 2022-01-07, 2022-01-09, 2022-01-13, 2022-02-21, 2022-03-24, 2022-04-02, 2022-05-13, 2022-05-23, 2022-05-31. Первые три даты относятся к первой жизни клиента (между ними менее 30 дней). Далее клиент оттекает и возвращается только 2022-02-21 (это вторая жизнь клиента). Третья жизнь клиента начинается с 2022-05-13._

```
+-------+----------+----+
|partner|  rep_date|life|
+-------+----------+----+
|  03255|2022-01-07|   0|
|  03255|2022-01-09|   0|
|  03255|2022-01-13|   0|
|  03255|2022-02-21|   1|
|  03255|2022-03-24|   2|
|  03255|2022-04-02|   2|
|  03255|2022-05-13|   3|
|  03255|2022-05-23|   3|
|  03255|2022-05-31|   3|
+-------+----------+----+

```

## Требования
* Задание необходимо выполнить на Spark (PySpark, Spark SQL, Scala)

## Решение

1) Устанавливаем необходимые библиотеки:

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 47 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 57.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=33725c4fa144afe30937a2feceb2816eec4c9b0c0a820399d40a1b8a6e753f90
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import when

2) Инициализируем спарк-сессию и прочитаем датасет:

In [None]:
spark = SparkSession.builder.getOrCreate()

dataset = spark.read.parquet("/dataset")
dataset = dataset.select("*").orderBy("partner", "rep_date", "article")
dataset.show()

+-------+----------+---------+---------+
|partner|  rep_date|    value|  article|
+-------+----------+---------+---------+
|  00000|2021-06-30|    -0.11|    begin|
|  00000|2021-06-30|     0.62|comission|
|  00000|2021-06-30|159632.82|      end|
|  00001|2021-05-17|   895.14|    begin|
|  00001|2021-05-17|    -1.17|comission|
|  00001|2021-05-17|  -146.45|      end|
|  00001|2021-07-31|     1.49|    begin|
|  00001|2021-07-31|     1.25|comission|
|  00001|2021-07-31| -6766.27|      end|
|  00002|2021-03-05|    -8.52|    begin|
|  00002|2021-03-05|    -0.92|comission|
|  00002|2021-03-05| -6706.29|      end|
|  00002|2021-03-23|     27.4|    begin|
|  00002|2021-03-23|   259.22|comission|
|  00002|2021-03-23|     4.06|      end|
|  00002|2021-05-09|-24527.77|    begin|
|  00002|2021-05-09| -1053.11|comission|
|  00002|2021-05-09| -4303.97|      end|
|  00004|2021-09-27|     1.28|    begin|
|  00004|2021-09-27|    -47.7|comission|
+-------+----------+---------+---------+
only showing top

3) Транспонируем изначачальный датасет с помощью метода pivot(), что позволит нам определять активность партнёра по формуле: begin.value + comission.value - end.value. Если значение вычисления равно нулю, значит в этот день партнёр был неактивен и эта дата нас не интересует.
Для метода pivot() требуется обернуть значения второй транспонируемой колонки в функцию агрегации, использование которой не даст нам нужный результат. Для решения этой проблемы сгруппируем и отсортируем наш датасет по id партнёров и датам отчёта:


In [None]:
transposed = dataset\
        .groupBy("partner", "rep_date")\
        .pivot("article").sum("value")\
        .where(F.col("begin")+F.col("comission")-F.col("end") != 0)\
        .orderBy("partner", "rep_date")

transposed.createOrReplaceTempView("transposed")

spark.sql("select * from transposed").show()

+-------+----------+---------+---------+---------+
|partner|  rep_date|    begin|comission|      end|
+-------+----------+---------+---------+---------+
|  00000|2021-06-30|    -0.11|     0.62|159632.82|
|  00001|2021-05-17|   895.14|    -1.17|  -146.45|
|  00001|2021-07-31|     1.49|     1.25| -6766.27|
|  00002|2021-03-05|    -8.52|    -0.92| -6706.29|
|  00002|2021-03-23|     27.4|   259.22|     4.06|
|  00002|2021-05-09|-24527.77| -1053.11| -4303.97|
|  00004|2021-09-27|     1.28|    -47.7|    11.64|
|  00004|2021-11-04|    -0.81|    31.84|   -32.65|
|  00005|2021-05-05|   112.61|     0.94|  1503.41|
|  00006|2021-11-01|  -812.79|  -699.48|  -113.31|
|  00006|2021-12-05|    -0.48|    30.83| -6491.63|
|  00006|2021-12-28|    79.82| 23844.79|     6.62|
|  00008|2021-01-28|148218.19|     1.55| -9552.59|
|  00008|2021-04-19|  -565.54|   5973.2| 50609.74|
|  00008|2021-11-01| -1538.83|    11.09|-79920.15|
|  00008|2021-11-28|  -130.77| -1202.58|    94.25|
|  00009|2021-01-02|    35.88| 

4) Создание временного представления промежуточной таблицы transfer:

В запросе временной таблицы temp мы выбираем предыдущие даты активности партнёра и нынешнюю дату активности из transposed.

Затем из temp мы берём идентификатор партнёра, дату репорта и проставляем значение жизни на основе разницы в днях (этот столбец можно назвать индикатором начала новой жизни)


In [None]:
transfer = spark.sql('''
with temp as (
  select LAG(rep_date, 1, null) 
    Over (PARTITION BY partner ORDER BY rep_date) as prev_date, partner, rep_date
    from transposed
)

select partner, rep_date, 
  case 
    when DATEDIFF(day, prev_date, rep_date) >= 30 then 1
    else 0
  end as life
  from temp
  order by partner, rep_date
''')

transfer.createOrReplaceTempView('transfer')
transfer.show(10)

+-------+----------+----+
|partner|  rep_date|life|
+-------+----------+----+
|  00000|2021-06-30|   0|
|  00001|2021-05-17|   0|
|  00001|2021-07-31|   1|
|  00002|2021-03-05|   0|
|  00002|2021-03-23|   0|
|  00002|2021-05-09|   1|
|  00004|2021-09-27|   0|
|  00004|2021-11-04|   1|
|  00005|2021-05-05|   0|
|  00006|2021-11-01|   0|
+-------+----------+----+
only showing top 10 rows



5) Получаем результирующую таблицу:

Выбираем столбцы, которые нам нужны, а суммирование значения life мы оборачиваем в оконную функцию, которая разбивает на партиции по идентификаторам партнёра в порядке даты репорта и значения life.


In [None]:
rslt = spark.sql('''
select partner, rep_date, (sum(life) 
  over (partition by partner order by rep_date, life)) as life 
  from transfer
''')

In [None]:
rslt.show(50)

+-------+----------+----+
|partner|  rep_date|life|
+-------+----------+----+
|  00000|2021-06-30|   0|
|  00001|2021-05-17|   0|
|  00001|2021-07-31|   1|
|  00002|2021-03-05|   0|
|  00002|2021-03-23|   0|
|  00002|2021-05-09|   1|
|  00004|2021-09-27|   0|
|  00004|2021-11-04|   1|
|  00005|2021-05-05|   0|
|  00006|2021-11-01|   0|
|  00006|2021-12-05|   1|
|  00006|2021-12-28|   1|
|  00008|2021-01-28|   0|
|  00008|2021-04-19|   1|
|  00008|2021-11-01|   2|
|  00008|2021-11-28|   2|
|  00009|2021-01-02|   0|
|  00009|2021-05-25|   1|
|  00010|2021-04-26|   0|
|  00011|2021-09-07|   0|
|  00012|2021-01-27|   0|
|  00012|2021-02-22|   0|
|  00012|2021-04-05|   1|
|  00012|2021-05-02|   1|
|  00012|2021-11-30|   2|
|  00012|2021-12-05|   2|
|  00013|2021-04-25|   0|
|  00014|2021-02-19|   0|
|  00014|2021-12-26|   1|
|  00015|2021-06-09|   0|
|  00015|2021-09-21|   1|
|  00015|2021-11-06|   2|
|  00015|2021-11-08|   2|
|  00016|2021-02-26|   0|
|  00017|2021-02-25|   0|
|  00018|202