<a href="https://colab.research.google.com/github/YulyaZh/-/blob/main/Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

"""Apache Spark: задача кредитного скоринга
# Финальное задание
Перепешите код ниже на pyspark.  
Для оценки модели используйте [BinaryClassificationEvaluator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.BinaryClassificationEvaluator.html)


In [2]:
from matplotlib import pyplot as plt
from matplotlib import rcParams
import seaborn as sns
import numpy as np

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

# Создаём спарк-сессию

In [None]:
spark = SparkSession.builder \
    .appName('Spark_app') \
    .getOrCreate()

#Считываем данные
# Ниже для тех, у кого хоть раз были просрчоки больше 60 дней, ставим в таргет 1.
# Загружаем данные

In [None]:
data = spark.read.csv("application_record.csv",  encoding = 'utf-8')
record = spark.read.csv("credit_record.csv", encoding = 'utf-8')

# Добавляем срок кредита к параметрам выдачи кредита

In [None]:
begin_month = record.groupBy("ID").agg(F.min(F.col("MONTHS_BALANCE")).alias("begin_month")).withColumn("begin_month", F.col("begin_month") * -1)
new_data = data.join(begin_month, on="ID", how="left")

# # Больше 60, то это просрочка, ставим - Yes, если просрочка есть за срок кредита,то так же ставим Yes

In [None]:
record = spark.read.csv("/content/application_record.csv", header=True)

# Создаем новый столбец 'dep_value' и устанавливаем его значение в зависимости от условий

In [None]:
record = record.withColumn('dep_value', F.lit(None))
record = record.withColumn('dep_value', F.when(record['STATUS'] == '2', 'Yes').otherwise(record['dep_value']))
record = record.withColumn('dep_value', F.when(record['STATUS'] == '3', 'Yes').otherwise(record['dep_value']))
record = record.withColumn('dep_value', F.when(record['STATUS'] == '4', 'Yes').otherwise(record['dep_value']))
record = record.withColumn('dep_value', F.when(record['STATUS'] == '5', 'Yes').otherwise(record['dep_value']))

# Группируем по 'ID' и считаем количество записей

In [None]:
cpunt = record.groupby('ID').agg(F.count('dep_value').alias('dep_count'))

# Устанавливаем значения 'dep_value' в зависимости от количества записей

In [None]:
cpunt = cpunt.withColumn('dep_value', F.when(cpunt['dep_count'] > 0, 'Yes').otherwise('No'))

# Джойним все данные вместе

In [None]:
new_data = new_data.join(cpunt.select('ID', 'dep_value'), on="ID", how="inner")

# Заменяем значения 'Yes' и 'No' на 1 и 0 в столбце 'dep_value'

In [None]:
new_data = new_data.withColumn('target', F.when(new_data['dep_value'] == 'Yes', 1).otherwise(F.when(new_data['dep_value'] == 'No', 0)))

# Удаляем столбец 'dep_value'

In [None]:
new_data = new_data.drop('dep_value')

# Выводим первые строки нового датасета с помощью PySpark

In [None]:
new_data.show()

# Оставим только часть признаков

In [None]:
selected_features = ['AMT_INCOME_TOTAL', 'CODE_GENDER', 'FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'CNT_CHILDREN']
selected_target = ['target']

# Выбираем только заданные признаки и целевую переменную

In [None]:
dataset = new_data.select(selected_features + selected_target)

# Преобразуем целевую переменную в числовой формат

In [None]:
dataset = dataset.withColumn(selected_target[0], dataset[selected_target[0]].cast('double'))

# Преобразуем данные векторный формат для обучения модели

In [None]:
vector_assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
input_data = vector_assembler.transform(dataset)

# Разделим выборку на обучающую и тестовую

In [None]:
(train_data, test_data) = input_data.randomSplit([0.7, 0.3], seed=42)


# Применяем StringIndexer для преобразования категориальных признаков в численные

In [None]:
string_indexer = StringIndexer(inputCol="CODE_GENDER", outputCol="CODE_GENDER_INDEX")
model = string_indexer.fit(train_data)
indexed_train_data = model.transform(train_data)

# Применяем OneHotEncoder для преобразования численных индексов в бинарные признаки

In [None]:
onehot_encoder = OneHotEncoder(inputCols=["CODE_GENDER_INDEX"], outputCols=["CODE_GENDER_ONEHOT"])
onehot_model = onehot_encoder.fit(indexed_train_data)
encoded_train_data = onehot_model.transform(indexed_train_data)

# Повторяем преобразования для тестовых данных

In [None]:
indexed_test_data = model.transform(test_data)
encoded_test_data = onehot_model.transform(indexed_test_data)

# Собираем численные признаки в один вектор

In [None]:
vector_assembler = VectorAssembler(inputCols=["AMT_INCOME_TOTAL", "CNT_CHILDREN"], outputCol="numerical_features")
assembled_train_data = vector_assembler.transform(train_data)
assembled_test_data = vector_assembler.transform(test_data)

# Применяем MinMaxScaler для масштабирования численных признаков

In [None]:
scaler = MinMaxScaler(inputCol="numerical_features", outputCol="scaled_features")
scaler_model = scaler.fit(assembled_train_data)
scaled_train_data = scaler_model.transform(assembled_train_data)
scaled_test_data = scaler_model.transform(assembled_test_data)

# Объединяем преобразованные данные

In [None]:
X_train = scaled_train_data.join(encoded_train_data, on="ID", how="inner")
X_test = scaled_test_data.join(encoded_test_data, on="ID", how="inner")

"""#  Модель"""

# Создаем экземпляр модели логистической регрессии

In [None]:
lr = LogisticRegression(featuresCol='features', labelCol='target')

# Обучаем модель на обучающем датасете

In [None]:
lr_model = lr.fit(X_train)

# Вычисляем оценку модели на обучающем и тестовом датасетах

In [None]:
evaluator = BinaryClassificationEvaluator(labelCol="target")

In [None]:
train_score = evaluator.evaluate(lr_model.transform(X_train))
test_score = evaluator.evaluate(lr_model.transform(X_test))

In [None]:
print(f'Оценка модели на обучающем датасете: {train_score}, на тестовом датасете: {test_score}')