# Таблица 

In [88]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, asc
import pyspark.sql.functions as f
from pyspark.sql.functions import lower
from pyspark.sql.functions import abs
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col,isnan, when, count
spark = SparkSession.builder.appName("Introduction to Spark").getOrCreate()
df_pyspark = spark.read.csv("/mnt/c/Users/data.csv", header = True, inferSchema = True)
df_pyspark.show(5)

+--------+------------------+---------+---------+------------+----------------+----------------+------+-----------+----+------------------+--------------------+
|children|     days_employed|dob_years|education|education_id|   family_status|family_status_id|gender|income_type|debt|      total_income|             purpose|
+--------+------------------+---------+---------+------------+----------------+----------------+------+-----------+----+------------------+--------------------+
|       1|-8437.673027760233|       42|   высшее|           0| женат / замужем|               0|     F|  сотрудник|   0| 253875.6394525987|       покупка жилья|
|       1|-4024.803753850451|       36|  среднее|           1| женат / замужем|               0|     F|  сотрудник|   0|112080.01410244203|приобретение авто...|
|       0|-5623.422610230956|       33|  Среднее|           1| женат / замужем|               0|     M|  сотрудник|   0|145885.95229686378|       покупка жилья|
|       3|-4124.747206540018|     

# Поиск столбцов с пропусками

In [49]:
df_pyspark.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_pyspark.columns]).show()


+--------+-------------+---------+---------+------------+-------------+----------------+------+-----------+----+------------+-------+
|children|days_employed|dob_years|education|education_id|family_status|family_status_id|gender|income_type|debt|total_income|purpose|
+--------+-------------+---------+---------+------------+-------------+----------------+------+-----------+----+------------+-------+
|       0|         2174|        0|        0|           0|            0|               0|     0|          0|   0|        2174|      0|
+--------+-------------+---------+---------+------------+-------------+----------------+------+-----------+----+------------+-------+



In [50]:
print(df_pyspark.count())

21525


# Какую долю составляют пропущенные значения в каждом из столбцов с пропусками?

In [51]:
print((2174/21525*100))

10.099883855981417


# Делаем значения целочисленными

In [52]:
df_pyspark = df_pyspark.withColumn('total_income', df_pyspark['total_income'].cast(IntegerType()))

In [53]:
df_pyspark.show(5)

+--------+------------------+---------+---------+------------+----------------+----------------+------+-----------+----+------------+--------------------+
|children|     days_employed|dob_years|education|education_id|   family_status|family_status_id|gender|income_type|debt|total_income|             purpose|
+--------+------------------+---------+---------+------------+----------------+----------------+------+-----------+----+------------+--------------------+
|       1|-8437.673027760233|       42|   высшее|           0| женат / замужем|               0|     F|  сотрудник|   0|      253875|       покупка жилья|
|       1|-4024.803753850451|       36|  среднее|           1| женат / замужем|               0|     F|  сотрудник|   0|      112080|приобретение авто...|
|       0|-5623.422610230956|       33|  Среднее|           1| женат / замужем|               0|     M|  сотрудник|   0|      145885|       покупка жилья|
|       3|-4124.747206540018|       32|  среднее|           1| женат /

# Заполняем пропуски медианным значением:

In [66]:
df_pyspark=df_pyspark.fillna(df_pyspark.select(f.mean(df_pyspark['total_income'])).collect()[0][0])

# Положительные значения days_employed

In [86]:
df_pyspark = df_pyspark.withColumn('days_employed', df_pyspark['days_employed'].cast(IntegerType()))
df_pyspark = df_pyspark.withColumn('days_employed', abs(df_pyspark.days_employed)).show()

+--------+-------------+---------+-------------------+------------+--------------------+----------------+------+-----------+----+------------------+--------------------+
|children|days_employed|dob_years|          education|education_id|       family_status|family_status_id|gender|income_type|debt|      total_income|             purpose|
+--------+-------------+---------+-------------------+------------+--------------------+----------------+------+-----------+----+------------------+--------------------+
|       2|         5718|       39|            Среднее|           1|     женат / замужем|               0|     M|  сотрудник|   0| 83892.81598726273|покупка жилья для...|
|       0|         5229|       43|             ВЫСШЕЕ|           0|Не женат / не зам...|               4|     F|  сотрудник|   0|203279.59982532426|строительство жил...|
|       0|        63046|       66|            среднее|           1|     женат / замужем|               0|     F|  пенсионер|   0|63046.497661473615|  

# Медианные значения в days_employed

In [81]:
df_pyspark=df_pyspark.fillna(df_pyspark.select(f.mean(df_pyspark['days_employed'])).collect()[0][0])

# Приведение к нижнему регистру

In [102]:
df_pyspark = df_pyspark.withColumn("education", lower(df_pyspark["education"]))
df_pyspark.show(20)

+--------+-------------------+---------+-------------------+------------+--------------------+----------------+------+-----------+----+------------------+--------------------+
|children|      days_employed|dob_years|          education|education_id|       family_status|family_status_id|gender|income_type|debt|      total_income|             purpose|
+--------+-------------------+---------+-------------------+------------+--------------------+----------------+------+-----------+----+------------------+--------------------+
|       2|  -5718.70890108714|       39|            среднее|           1|     женат / замужем|               0|     M|  сотрудник|   0| 83892.81598726273|покупка жилья для...|
|       0|  -5229.55206892918|       43|             высшее|           0|Не женат / не зам...|               4|     F|  сотрудник|   0|203279.59982532426|строительство жил...|
|       0| 384733.81603224203|       56|            среднее|           1|      вдовец / вдова|               2|     F|  

# Удаление строк-дубликатов

In [103]:
df_pyspark = df_pyspark.distinct()


In [104]:
print(df_pyspark.count())

21454


# Создание двух новых датафреймов

In [105]:
df_pyspark.createOrReplaceTempView('dubli')
new_education_df=spark.sql("select education, education_id from dubli group by education, education_id")
new_family_df=spark.sql("select family_status, family_status_id from dubli group by family_status, family_status_id")

new_education_df.show()
new_family_df.show()

df_pyspark.drop('education')
df_pyspark.drop('family_status')

+-------------------+------------+
|          education|education_id|
+-------------------+------------+
|     ученая степень|           4|
|             высшее|           0|
|          начальное|           3|
|неоконченное высшее|           2|
|            среднее|           1|
+-------------------+------------+

+--------------------+----------------+
|       family_status|family_status_id|
+--------------------+----------------+
|    гражданский брак|               1|
|      вдовец / вдова|               2|
|Не женат / не зам...|               4|
|     женат / замужем|               0|
|           в разводе|               3|
+--------------------+----------------+



DataFrame[children: int, days_employed: double, dob_years: int, education: string, education_id: int, family_status_id: int, gender: string, income_type: string, debt: int, total_income: double, purpose: string]

# На основании диапазонов, указанных ниже, создали столбец total_income_category с категориями:

In [106]:
from pyspark.sql.functions import udf 
def income_group(income): 
    if income <= 30000: 
        return "E"
    elif income <= 50000:
        return "D"
    elif income <= 200000:
        return "C"
    elif income <= 1000000:
        return "B"
    else: 
        return "A"

totalincome_group_udf = udf(income_group) 
df_pyspark = df_pyspark.withColumn("total_income_category", totalincome_group_udf(df_pyspark.total_income))

df_pyspark.show(20)

+--------+-------------------+---------+-------------------+------------+--------------------+----------------+------+-----------+----+------------------+--------------------+---------------------+
|children|      days_employed|dob_years|          education|education_id|       family_status|family_status_id|gender|income_type|debt|      total_income|             purpose|total_income_category|
+--------+-------------------+---------+-------------------+------------+--------------------+----------------+------+-----------+----+------------------+--------------------+---------------------+
|       0| 384733.81603224203|       56|            среднее|           1|      вдовец / вдова|               2|     F|  пенсионер|   0|162416.13776348685|на покупку своего...|                    C|
|       0|  393452.1445002313|       62|            среднее|           1|     женат / замужем|               0|     F|  пенсионер|   0|173640.35088220533|строительство жил...|                    C|
|       0|

# Создали функцию, которая на основании данных из столбца purpose сформирует новый столбец purpose_category, в который войдут следующие категории

In [107]:
from pyspark.sql.functions import udf 
def purpose_group(purpose): 
    if 'автомобил' in purpose: 
        return "операции с автомобилем"
    elif 'жиль' in purpose or 'недвижимо' in purpose:
        return "операции с недвижимостью"
    elif 'свадь' in purpose:
        return "проведение свадьбы"
    else:
        return "получение образования"

purpose_group_udf = udf(purpose_group) 
df_pyspark = df_pyspark.withColumn("purpose_category", purpose_group_udf(df_pyspark.purpose))

df_pyspark.show()

+--------+-------------------+---------+-------------------+------------+--------------------+----------------+------+-----------+----+------------------+--------------------+---------------------+--------------------+
|children|      days_employed|dob_years|          education|education_id|       family_status|family_status_id|gender|income_type|debt|      total_income|             purpose|total_income_category|    purpose_category|
+--------+-------------------+---------+-------------------+------------+--------------------+----------------+------+-----------+----+------------------+--------------------+---------------------+--------------------+
|       0| 384733.81603224203|       56|            среднее|           1|      вдовец / вдова|               2|     F|  пенсионер|   0|162416.13776348685|на покупку своего...|                    C|операции с автомо...|
|       0|  393452.1445002313|       62|            среднее|           1|     женат / замужем|               0|     F|  пенс