# Финальное задание
Перепишите код на PySpark.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import pyspark.sql.functions as F
from pyspark.ml.linalg import DenseVector
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler

In [2]:


# Build (or reuse) a Spark session that runs locally with master 'local[*]'.
session_builder = (
    SparkSession.builder
    .master('local[*]')
    .appName("Example")
    .config("spark.some.config.option", "some-value")
)
spark = session_builder.getOrCreate()

# SparkContext that backs the session above.
sc = spark.sparkContext

In [3]:
# Import only the type classes used below instead of `import *`, so it is
# clear where each name comes from and the notebook namespace stays clean.
from pyspark.sql.types import (
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Explicit schema used when reading application_record.csv.
# NOTE: the field name 'CNT_CHILDREN ' contains a trailing space. Later cells
# of this notebook (the column select and the VectorAssembler inputs) refer to
# the column with that exact name, so it is intentionally left unchanged here.
# NOTE(review): the public Kaggle version of application_record.csv has an
# extra FLAG_MOBIL column between DAYS_EMPLOYED and FLAG_WORK_PHONE. If this
# file has it too, every field from FLAG_WORK_PHONE onward is shifted by one,
# because a user-supplied schema is applied by position. Confirm against the
# CSV header.
schema = StructType([
    StructField('ID', IntegerType()),
    StructField('CODE_GENDER', StringType()),
    StructField('FLAG_OWN_CAR', StringType()),
    StructField('FLAG_OWN_REALTY', StringType()),
    StructField('CNT_CHILDREN ', FloatType()),
    StructField('AMT_INCOME_TOTAL', FloatType()),
    StructField('NAME_INCOME_TYPE', StringType()),
    StructField('NAME_EDUCATION_TYPE', StringType()),
    StructField('NAME_FAMILY_STATUS', StringType()),
    StructField('NAME_HOUSING_TYPE', StringType()),
    StructField('DAYS_BIRTH', FloatType()),
    StructField('DAYS_EMPLOYED', FloatType()),
    StructField('FLAG_WORK_PHONE', FloatType()),
    StructField('FLAG_PHONE', FloatType()),
    StructField('FLAG_EMAIL', FloatType()),
    StructField('OCCUPATION_TYPE', StringType()),
    StructField('CNT_FAM_MEMBERS', FloatType())
])

# Explicit schema used when reading credit_record.csv.
schema2 = StructType([
    StructField('ID', IntegerType()),
    StructField('MONTHS_BALANCE', FloatType()),
    StructField('STATUS', StringType())
])
 

In [4]:
# Load the application records with the explicit schema; the first CSV line
# is treated as a header.
data = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv("application_record.csv")
)

In [5]:
# Print the column names and types of the application DataFrame.
data.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN : float (nullable = true)
 |-- AMT_INCOME_TOTAL: float (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- DAYS_BIRTH: float (nullable = true)
 |-- DAYS_EMPLOYED: float (nullable = true)
 |-- FLAG_WORK_PHONE: float (nullable = true)
 |-- FLAG_PHONE: float (nullable = true)
 |-- FLAG_EMAIL: float (nullable = true)
 |-- OCCUPATION_TYPE: string (nullable = true)
 |-- CNT_FAM_MEMBERS: float (nullable = true)



In [6]:
# Load the credit records with the explicit schema; the first CSV line is
# treated as a header.
record = (
    spark.read
    .option("header", "true")
    .schema(schema2)
    .csv("credit_record.csv")
)

# Preview the rows, then print the column types.
record.show()
record.printSchema()

+-------+--------------+------+
|     ID|MONTHS_BALANCE|STATUS|
+-------+--------------+------+
|5001711|           0.0|     X|
|5001711|          -1.0|     0|
|5001711|          -2.0|     0|
|5001711|          -3.0|     0|
|5001712|           0.0|     C|
|5001712|          -1.0|     C|
|5001712|          -2.0|     C|
|5001712|          -3.0|     C|
|5001712|          -4.0|     C|
|5001712|          -5.0|     C|
|5001712|          -6.0|     C|
|5001712|          -7.0|     C|
|5001712|          -8.0|     C|
|5001712|          -9.0|     0|
|5001712|         -10.0|     0|
|5001712|         -11.0|     0|
|5001712|         -12.0|     0|
|5001712|         -13.0|     0|
|5001712|         -14.0|     0|
|5001712|         -15.0|     0|
+-------+--------------+------+
only showing top 20 rows

root
 |-- ID: integer (nullable = true)
 |-- MONTHS_BALANCE: float (nullable = true)
 |-- STATUS: string (nullable = true)



In [7]:
# Per ID: take the minimum MONTHS_BALANCE and multiply it by -1,
# exposing the result as 'begin_month'.
min_months_balance = F.min('MONTHS_BALANCE')
new_df = (
    record
    .groupBy('ID')
    .agg((min_months_balance * -1).alias('begin_month'))
)
new_df.show()

+-------+-----------+
|     ID|begin_month|
+-------+-----------+
|5001812|       22.0|
|5001849|        8.0|
|5001921|       19.0|
|5003338|       33.0|
|5003386|        7.0|
|5003485|       16.0|
|5003623|       44.0|
|5004426|       21.0|
|5004485|       14.0|
|5004511|       49.0|
|5004620|       16.0|
|5004650|       37.0|
|5004774|       28.0|
|5005000|        6.0|
|5005607|       35.0|
|5005681|       30.0|
|5009033|       16.0|
|5009304|       34.0|
|5009355|       25.0|
|5009429|        2.0|
+-------+-----------+
only showing top 20 rows



In [8]:
# Preview only: attach begin_month to every credit row (left join on ID).
# `record` itself is not modified by this cell.
record_with_begin_month = record.join(new_df, how='left', on='ID')
record_with_begin_month.show()


+-------+--------------+------+-----------+
|     ID|MONTHS_BALANCE|STATUS|begin_month|
+-------+--------------+------+-----------+
|5001711|           0.0|     X|        3.0|
|5001711|          -1.0|     0|        3.0|
|5001711|          -2.0|     0|        3.0|
|5001711|          -3.0|     0|        3.0|
|5001712|           0.0|     C|       18.0|
|5001712|          -1.0|     C|       18.0|
|5001712|          -2.0|     C|       18.0|
|5001712|          -3.0|     C|       18.0|
|5001712|          -4.0|     C|       18.0|
|5001712|          -5.0|     C|       18.0|
|5001712|          -6.0|     C|       18.0|
|5001712|          -7.0|     C|       18.0|
|5001712|          -8.0|     C|       18.0|
|5001712|          -9.0|     0|       18.0|
|5001712|         -10.0|     0|       18.0|
|5001712|         -11.0|     0|       18.0|
|5001712|         -12.0|     0|       18.0|
|5001712|         -13.0|     0|       18.0|
|5001712|         -14.0|     0|       18.0|
|5001712|         -15.0|     0| 

In [9]:
# Preview only: show `record` with a constant-zero 'dep_value' column.
# `record` itself is not modified by this cell.
zero_dep_value = F.lit(0)
record.withColumn('dep_value', zero_dep_value).show()

+-------+--------------+------+---------+
|     ID|MONTHS_BALANCE|STATUS|dep_value|
+-------+--------------+------+---------+
|5001711|           0.0|     X|        0|
|5001711|          -1.0|     0|        0|
|5001711|          -2.0|     0|        0|
|5001711|          -3.0|     0|        0|
|5001712|           0.0|     C|        0|
|5001712|          -1.0|     C|        0|
|5001712|          -2.0|     C|        0|
|5001712|          -3.0|     C|        0|
|5001712|          -4.0|     C|        0|
|5001712|          -5.0|     C|        0|
|5001712|          -6.0|     C|        0|
|5001712|          -7.0|     C|        0|
|5001712|          -8.0|     C|        0|
|5001712|          -9.0|     0|        0|
|5001712|         -10.0|     0|        0|
|5001712|         -11.0|     0|        0|
|5001712|         -12.0|     0|        0|
|5001712|         -13.0|     0|        0|
|5001712|         -14.0|     0|        0|
|5001712|         -15.0|     0|        0|
+-------+--------------+------+---

In [10]:
# dep_value is 1 when STATUS is one of '2', '3', '4', '5' and 0 otherwise.
flagged_statuses = ('2', '3', '4', '5')
has_flagged_status = F.col('STATUS').isin(*flagged_statuses)

record = record.withColumn(
    'dep_value',
    F.when(has_flagged_status, 1).otherwise(0),
)

record.show()

+-------+--------------+------+---------+
|     ID|MONTHS_BALANCE|STATUS|dep_value|
+-------+--------------+------+---------+
|5001711|           0.0|     X|        0|
|5001711|          -1.0|     0|        0|
|5001711|          -2.0|     0|        0|
|5001711|          -3.0|     0|        0|
|5001712|           0.0|     C|        0|
|5001712|          -1.0|     C|        0|
|5001712|          -2.0|     C|        0|
|5001712|          -3.0|     C|        0|
|5001712|          -4.0|     C|        0|
|5001712|          -5.0|     C|        0|
|5001712|          -6.0|     C|        0|
|5001712|          -7.0|     C|        0|
|5001712|          -8.0|     C|        0|
|5001712|          -9.0|     0|        0|
|5001712|         -10.0|     0|        0|
|5001712|         -11.0|     0|        0|
|5001712|         -12.0|     0|        0|
|5001712|         -13.0|     0|        0|
|5001712|         -14.0|     0|        0|
|5001712|         -15.0|     0|        0|
+-------+--------------+------+---

In [11]:
# Per ID: sum of dep_value over all of its credit rows, exposed as 'target'.
dep_value_total = F.sum('dep_value').alias('target')
cpunt = record.groupBy('ID').agg(dep_value_total)
cpunt.show()

+-------+------+
|     ID|target|
+-------+------+
|5001812|     0|
|5001849|     0|
|5001921|     0|
|5003338|     0|
|5003386|     0|
|5003485|     0|
|5003623|     0|
|5004426|     0|
|5004485|     0|
|5004511|     0|
|5004620|     0|
|5004650|     0|
|5004774|     0|
|5005000|     0|
|5005607|     0|
|5005681|     0|
|5009033|     0|
|5009304|     0|
|5009355|     0|
|5009429|     0|
+-------+------+
only showing top 20 rows



In [12]:
# Collapse the per-ID sum into a binary label: 1 if the sum is positive, else 0.
has_positive_sum = F.col('target') > 0
cpunt = cpunt.withColumn('target', F.when(has_positive_sum, 1).otherwise(0))

In [13]:
# Keep only applications whose ID also appears in the label table (inner join).
data_new = data.join(cpunt, how='inner', on='ID')

In [None]:
# Columns kept for modelling; 'CNT_CHILDREN ' is spelled with the trailing
# space exactly as it is declared in the schema.
selected_columns = [
    'AMT_INCOME_TOTAL',
    'CODE_GENDER',
    'FLAG_OWN_CAR',
    'FLAG_OWN_REALTY',
    'CNT_CHILDREN ',
    'target',
]
df = data_new.select(*[col(name) for name in selected_columns])
df.show()

+----------------+-----------+------------+---------------+-------------+------+
|AMT_INCOME_TOTAL|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN |target|
+----------------+-----------+------------+---------------+-------------+------+
|        427500.0|          M|           Y|              Y|          0.0|     0|
|        427500.0|          M|           Y|              Y|          0.0|     0|
|        112500.0|          M|           Y|              Y|          0.0|     0|
|        270000.0|          F|           N|              Y|          0.0|     0|
|        270000.0|          F|           N|              Y|          0.0|     0|
|        270000.0|          F|           N|              Y|          0.0|     0|
|        270000.0|          F|           N|              Y|          0.0|     0|
|        283500.0|          F|           N|              Y|          0.0|     0|
|        283500.0|          F|           N|              Y|          0.0|     0|
|        283500.0|          

In [None]:
# Show only the rows whose target is non-zero.
nonzero_target = F.col('target') != 0
df.filter(nonzero_target).show()

In [None]:
# Count missing values per column.
# Values that are absent in the CSV are loaded as null rather than NaN, so a
# NaN-only check reports 0 for real gaps. Null is therefore checked for every
# column, and NaN is checked only for float/double columns, the only types
# that can hold it. This also avoids casting string columns to numbers.
missing_counts = []
for column_name, column_type in df.dtypes:
    is_missing = F.col(column_name).isNull()
    if column_type in ('float', 'double'):
        is_missing = is_missing | F.isnan(F.col(column_name))
    # count() ignores nulls, so only rows matching `is_missing` are counted.
    missing_counts.append(F.count(F.when(is_missing, 1)).alias(column_name))

df.select(missing_counts).show()

In [None]:
# Split into train and test sets (70% / 30%).
# A fixed seed is passed so that rerunning the notebook does not draw a new
# random split (and therefore different metrics) every time.
train, test = df.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Row count of the training split (printed) ...
print(train.count())
# ... and of the test split (shown as the cell's output value).
test.count()

In [None]:
categorial_variables = ['CODE_GENDER', 'FLAG_OWN_CAR', 'FLAG_OWN_REALTY']

# For each categorical column: map its string values to indices, then one-hot
# encode the index. Both stages are fitted on the training split only and then
# applied to the training and the test split.
for variable in categorial_variables:
    index_column = variable + '_index'
    vector_column = variable + "_vec"

    indexer = StringIndexer(inputCol=variable, outputCol=index_column).fit(train)
    train, test = indexer.transform(train), indexer.transform(test)

    encoder = OneHotEncoder(inputCol=index_column, outputCol=vector_column).fit(train)
    train, test = encoder.transform(train), encoder.transform(test)

In [None]:
# Preview the training split, now including the *_index and *_vec columns.
train.show()

In [None]:
# Inputs that are combined into the single 'features' vector column:
# two numeric columns plus the three one-hot encoded vectors.
feature_columns = [
    'AMT_INCOME_TOTAL',
    'CNT_CHILDREN ',
    'CODE_GENDER_vec',
    'FLAG_OWN_CAR_vec',
    'FLAG_OWN_REALTY_vec',
]
asselmbler = VectorAssembler(outputCol='features', inputCols=feature_columns)

In [None]:
# Add the assembled 'features' column to both splits.
train, test = (asselmbler.transform(split) for split in (train, test))

In [None]:
# Linear regression estimator: 'features' is the input vector, 'target' the label.
lrgen = LinearRegression(
    featuresCol='features',
    labelCol='target',
)

In [None]:
# Fit the linear regression on the training split.
linear_model = lrgen.fit(train)

# Metrics reported by the fitted model's summary (computed on the training data).
trainingSummary = linear_model.summary
train_rmse = trainingSummary.rootMeanSquaredError
train_r2 = trainingSummary.r2
print("RMSE: %f" % train_rmse)
print("r2: %f" % train_r2)

In [None]:
# Score the test split and keep only the label and the model's prediction.
test_predictions = linear_model.transform(test)
result = test_predictions.select('target', 'prediction')
result.show()

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# r2 of the test predictions via a standalone evaluator. The value is stored
# in `r2` but not printed; the numbers printed below come from
# `test_evaluation`.
lr_evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='target',
    metricName="r2",
)
r2 = lr_evaluator.evaluate(result)

# Evaluation summary of the linear model on the test split.
test_evaluation = linear_model.evaluate(test)
test_rmse = test_evaluation.rootMeanSquaredError
test_r2 = test_evaluation.r2
print('RMSE:{:.3}'.format(test_rmse))
print('r2:{:.3}'.format(test_r2))

RMSE на обучающей и тестовой выборках близки.

In [None]:
# Display the fitted coefficients of the linear model.
linear_model.coefficients

Регрессия случайного леса

In [None]:
from pyspark.ml.regression import RandomForestRegressor

# Random forest regressor (10 trees, depth up to 5, fixed seed) trained on the
# same training split; the features column is left at the estimator's default.
rf = RandomForestRegressor(
    labelCol='target',
    numTrees=10,
    maxDepth=5,
    seed=101,
)
rf_model = rf.fit(train)

In [None]:
# Score the test split with the random forest; `result` is overwritten with
# the label and this model's prediction.
rf_predictions = rf_model.transform(test)
result = rf_predictions.select('target', 'prediction')
result.show()

In [None]:
# RMSE of the random forest predictions held in `result`.
rf_evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='target',
    metricName="rmse",
)
rmse = rf_evaluator.evaluate(result)
print("Среднеквадратическая ошибка тестовых данных (rmse): {}".format(rmse))