# Подготовка датасетов

In [1]:
# Импорт библиотек
import random
import pandas as pd
from datetime import timedelta, datetime

# Show every column when a DataFrame is displayed
pd.set_option('display.max_columns', None)

# Seed the RNG so the randomly generated transactions are reproducible on re-run
SEED = 42
random.seed(SEED)

## Подготовка статистической информации

In [2]:
# Read the raw export, drop its trailing 12 columns and rename the client
# number column to the key name used by the transaction data.
data = (
    pd.read_csv('BankChurners.csv', sep=',')
    .iloc[:, :-12]
    .rename(columns={'CLIENTNUM': 'user_id'})
)
data.head(5)

Unnamed: 0,user_id,Attrition_Flag,Customer_Age,Gender,Dependent_count,Education_Level,Marital_Status,Income_Category,Card_Category,Months_on_book,Total_Relationship_Count
0,768805383,Existing Customer,45,M,3,High School,Married,$60K - $80K,Blue,39,5
1,818770008,Existing Customer,49,F,5,Graduate,Single,Less than $40K,Blue,44,6
2,713982108,Existing Customer,51,M,3,Graduate,Married,$80K - $120K,Blue,36,4
3,769911858,Existing Customer,40,F,4,High School,Unknown,Less than $40K,Blue,34,3
4,709106358,Existing Customer,40,M,3,Uneducated,Married,$60K - $80K,Blue,21,5


In [3]:
# Persist the trimmed static dataset without the DataFrame index
data.to_csv('./BankChurners_static_data.csv', index=False)

## Генерация транзакций

In [4]:
# Event types a transaction can have. Order matters: the sampling weights
# passed to random.choices are matched to these entries positionally.
operations_list = [
    'buy',
    'deposit',
    'open_product',
    'close_product',
]


# Time iteration helper
def datetime_range(start, delta, count=1051200):
    """Yield ``count`` evenly spaced values beginning at ``start``.

    Args:
        start: First value to yield.
        delta: Step added between consecutive values.
        count: Number of values to yield. Defaults to 1051200, i.e. the
            number of 30-second steps in a 365-day year (365 * 24 * 60 * 2).

    Yields:
        ``start``, ``start + delta``, ... for ``count`` values in total.
    """
    current = start
    for _ in range(count):
        yield current
        current += delta

Создание id транзакции

In [5]:
# Sequential transaction ids 1..1051200 (one per generated transaction)
transaction_id = list(range(1, 1051201))

Выбор пользователя

In [7]:
# Read back the ids of all customers from the saved static dataset
customer_id = (
    pd.read_csv('BankChurners_static_data.csv')
    .loc[:, 'user_id']
    .tolist()
)

# Pick a random customer (with replacement) for each of the 1051200 transactions
user_id = [
    random.choice(customer_id)
    for _slot in range(1051200)
]

Выбор рандомной операции с вероятностями

In [8]:
# One weighted draw per transaction: buy 46%, deposit 44%, open/close product 5% each.
# A single call with k= replaces one random.choices call (and throwaway list) per element.
events = random.choices(operations_list, weights=[46, 44, 5, 5], k=1051200)

Генерация времени транзакции

In [9]:
# One formatted timestamp per transaction: 30-second steps starting at 2021-01-01 00:00:00
dts = [
    moment.strftime('%Y-%m-%d %H:%M:%S')
    for moment in datetime_range(start=datetime(2021, 1, 1), delta=timedelta(seconds=30))
]

Деньги

In [10]:
# 'buy' and 'deposit' events get a random amount in [100, 5000];
# every other event type carries no money.
money = [
    random.randint(100, 5000) if kind in ('buy', 'deposit') else 0
    for kind in events
]

Создание и сохранение датафрейма

In [11]:
# Assemble the transaction table column by column. Unlike zip(), building from
# a mapping raises if the generated lists ever differ in length instead of
# silently truncating to the shortest one, and avoids materialising row tuples.
df = pd.DataFrame({
    'transaction_id': transaction_id,
    'user_id': user_id,
    'event': events,
    'event_date': dts,
    'money': money,
})

# Persist the generated transactions without the DataFrame index
df.to_csv(path_or_buf='./BankChurners_transaction.csv', index=False)

In [12]:
# Preview the first five generated transactions (head() defaults to 5 rows)
df.head()

Unnamed: 0,transaction_id,user_id,event,event_date,money
0,1,714745608,buy,2021-01-01 00:00:00,4636
1,2,713271708,deposit,2021-01-01 00:00:30,3259
2,3,712009083,buy,2021-01-01 00:01:00,842
3,4,822125283,buy,2021-01-01 00:01:30,1322
4,5,710825658,deposit,2021-01-01 00:02:00,3690


# Построение витрины

In [None]:
# The PySpark shell for this section is started from a terminal with:
#   /spark2.4/bin/pyspark --master local[1] --driver-memory 512m --num-executors 1 --executor-memory 512m
# It is kept as a comment because it is a shell command, not Python, and would
# raise a SyntaxError if executed as part of this cell.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType

# Reuse the session provided by the PySpark shell when there is one; otherwise
# create a local session so the cells below can run outside the shell as well.
spark = SparkSession.builder.master('local[1]').getOrCreate()

Загрузка данных

In [None]:
# Static customer attributes. No explicit schema or inferSchema option is passed.
static_data = (
    spark.read
    .option('sep', ',')
    .option('header', True)
    .csv("file:///home/BD_228_yriabinin/data/BankChurners_static_data.csv")
)

# Generated transaction log, read with the same options
transact = (
    spark.read
    .option('sep', ',')
    .option('header', True)
    .csv("file:///home/BD_228_yriabinin/data/BankChurners_transaction.csv")
)

Количество открытых продуктов

In [None]:
# Per-user number of 'open_product' events, attached to the customer table.
# The left join keeps customers without such events (their count is NULL).
temp_df = (
    transact
    .where(F.col('event') == 'open_product')
    .groupBy('user_id')
    .agg(F.count('event').alias('Open_Prod'))
)
static_data = static_data.join(temp_df, on='user_id', how='left')

Количество закрытых продуктов

In [None]:
# Per-user number of 'close_product' events, attached to the customer table.
# The left join keeps customers without such events (their count is NULL).
temp_df = (
    transact
    .where(F.col('event') == 'close_product')
    .groupBy('user_id')
    .agg(F.count('event').alias('Close_Prod'))
)
static_data = static_data.join(temp_df, on='user_id', how='left')

Изменение количества продуктов к концу года и отношение количества продуктов в конце года к началу года

In [None]:
# Open_Prod / Close_Prod come from left joins, so they are NULL for customers
# with no such events. NULL would propagate through the arithmetic and the
# later fill-with-zero step would turn the whole result into 0, so a missing
# count is treated as 0 here instead.
opened_cnt = F.coalesce(F.col('Open_Prod'), F.lit(0))
closed_cnt = F.coalesce(F.col('Close_Prod'), F.lit(0))

static_data = (
    static_data
    # Product count at year end = initial count + opened - closed
    .withColumn(
        'Total_Rel_Chng',
        (F.col('Total_Relationship_Count') + opened_cnt - closed_cnt).cast('integer'),
    )
    # Ratio of the year-end product count to the initial count, rounded to 2 decimals
    .withColumn(
        'Total_Rel_Chng_Ratio',
        F.round(F.col('Total_Rel_Chng') / F.col('Total_Relationship_Count'), 2),
    )
)

Количество открытых продуктов по кварталам

In [None]:
# Half-open [start, end) bounds of each 2021 quarter, written as string literals
# in the same 'YYYY-MM-DD HH:MM:SS' format as event_date.
quarter_bounds = [
    ('Q1', '2021-01-01 00:00:00', '2021-04-01 00:00:00'),
    ('Q2', '2021-04-01 00:00:00', '2021-07-01 00:00:00'),
    ('Q3', '2021-07-01 00:00:00', '2021-10-01 00:00:00'),
    # Exclusive year-end bound instead of the inclusive '2021-12-31 23:59:30',
    # so Q4 does not depend on what the last generated timestamp happens to be.
    ('Q4', '2021-10-01 00:00:00', '2022-01-01 00:00:00'),
]

open_events = transact.filter(transact.event == 'open_product')

# Adds Open_Prod_Q1 .. Open_Prod_Q4: per-user count of opened products in each quarter
for q_label, q_start, q_end in quarter_bounds:
    temp_df = (
        open_events
        .filter((F.col('event_date') >= q_start) & (F.col('event_date') < q_end))
        .groupBy('user_id')
        .agg(F.count('event').alias('Open_Prod_' + q_label))
    )
    static_data = static_data.join(temp_df, 'user_id', 'left')

Количество закрытых продуктов по кварталам

In [None]:
# Half-open [start, end) bounds of each 2021 quarter, written as string literals
# in the same 'YYYY-MM-DD HH:MM:SS' format as event_date.
quarter_bounds = [
    ('Q1', '2021-01-01 00:00:00', '2021-04-01 00:00:00'),
    ('Q2', '2021-04-01 00:00:00', '2021-07-01 00:00:00'),
    ('Q3', '2021-07-01 00:00:00', '2021-10-01 00:00:00'),
    # Exclusive year-end bound instead of the inclusive '2021-12-31 23:59:30',
    # so Q4 does not depend on what the last generated timestamp happens to be.
    ('Q4', '2021-10-01 00:00:00', '2022-01-01 00:00:00'),
]

close_events = transact.filter(transact.event == 'close_product')

# Adds Close_Prod_Q1 .. Close_Prod_Q4: per-user count of closed products in each quarter
for q_label, q_start, q_end in quarter_bounds:
    temp_df = (
        close_events
        .filter((F.col('event_date') >= q_start) & (F.col('event_date') < q_end))
        .groupBy('user_id')
        .agg(F.count('event').alias('Close_Prod_' + q_label))
    )
    static_data = static_data.join(temp_df, 'user_id', 'left')

Количество обращений в банк

In [None]:
# Per-user number of product open/close events, stored as Contacts_Count_12_mon
temp_df = (
    transact
    .where(F.col('event').isin('open_product', 'close_product'))
    .groupBy('user_id')
    .agg(F.count('event').alias('Contacts_Count_12_mon'))
)
static_data = static_data.join(temp_df, on='user_id', how='left')

Количество транзакций в год

In [None]:
# Per-user number of monetary events ('buy' or 'deposit') over the whole log
temp_df = (
    transact
    .where(F.col('event').isin('buy', 'deposit'))
    .groupBy('user_id')
    .agg(F.count('event').alias('Total_Trans_Ct_12_mon'))
)
static_data = static_data.join(temp_df, on='user_id', how='left')

Количество транзакций по кварталам

In [None]:
# Half-open [start, end) bounds of each 2021 quarter, written as string literals
# in the same 'YYYY-MM-DD HH:MM:SS' format as event_date.
quarter_bounds = [
    ('Q1', '2021-01-01 00:00:00', '2021-04-01 00:00:00'),
    ('Q2', '2021-04-01 00:00:00', '2021-07-01 00:00:00'),
    ('Q3', '2021-07-01 00:00:00', '2021-10-01 00:00:00'),
    # Exclusive year-end bound instead of the inclusive '2021-12-31 23:59:30',
    # so Q4 does not depend on what the last generated timestamp happens to be.
    ('Q4', '2021-10-01 00:00:00', '2022-01-01 00:00:00'),
]

money_events = transact.filter((transact.event == 'buy') | (transact.event == 'deposit'))

# Adds Trans_Ct_Q1 .. Trans_Ct_Q4: per-user count of buy/deposit events in each quarter
for q_label, q_start, q_end in quarter_bounds:
    temp_df = (
        money_events
        .filter((F.col('event_date') >= q_start) & (F.col('event_date') < q_end))
        .groupBy('user_id')
        .agg(F.count('event').alias('Trans_Ct_' + q_label))
    )
    static_data = static_data.join(temp_df, 'user_id', 'left')

Сумма по расходным операциям

In [None]:
# Per-user total of the 'money' column over 'buy' events, cast to integer
temp_df = (
    transact
    .where(F.col('event') == 'buy')
    .groupBy('user_id')
    .agg(F.sum('money').cast('integer').alias('Total_Buy_Amt'))
)
static_data = static_data.join(temp_df, on='user_id', how='left')

Сумма по приходным операциям

In [None]:
# Per-user total of the 'money' column over 'deposit' events, cast to integer
temp_df = (
    transact
    .where(F.col('event') == 'deposit')
    .groupBy('user_id')
    .agg(F.sum('money').cast('integer').alias('Total_Dep_Amt'))
)
static_data = static_data.join(temp_df, on='user_id', how='left')

Заполнение нулями пустых значений

In [None]:
# Replace missing values with 0; the left joins leave NULLs for customers
# that had no matching events.
static_data = static_data.fillna(0)

Сохранение датафрейма в файл

In [None]:
# Collapse to a single partition and write the result as CSV with a header row
(
    static_data
    .repartition(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save('file:///home/BD_228_yriabinin/data/output.csv')
)