In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Introduction to Spark").getOrCreate()
df_pyspark = spark.read.csv("/mnt/c/dsp234/data.csv", header = True, inferSchema = True)
df_pyspark.show(20)

24/04/01 17:44:15 WARN Utils: Your hostname, DESKTOP-UJM3BA1 resolves to a loopback address: 127.0.1.1; using 172.22.176.199 instead (on interface eth0)
24/04/01 17:44:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/01 17:44:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+--------+-------------------+---------+-------------------+------------+----------------+----------------+------+-----------+----+------------------+--------------------+
|children|      days_employed|dob_years|          education|education_id|   family_status|family_status_id|gender|income_type|debt|      total_income|             purpose|
+--------+-------------------+---------+-------------------+------------+----------------+----------------+------+-----------+----+------------------+--------------------+
|       1| -8437.673027760233|       42|             высшее|           0| женат / замужем|               0|     F|  сотрудник|   0| 253875.6394525987|       покупка жилья|
|       1| -4024.803753850451|       36|            среднее|           1| женат / замужем|               0|     F|  сотрудник|   0|112080.01410244203|приобретение авто...|
|       0| -5623.422610230956|       33|            Среднее|           1| женат / замужем|               0|     M|  сотрудник|   0|145885.95

## Первая таблица показывает пропуски в каждом из столбцов, вторая таблица показывает количество всех ненулевых данных в каждом из столбцов, в третьей произведена замена данных в столбце 'total_income' с значений равных NULL на медианное значение со всего столбца
## в `days_employed` и `total_income` доля незаполненных данных составляет <b>11,2%</b>
## Пропуски данных в столбцах таблицы могут возникать по разным причинам. Некоторые из наиболее распространенных причин включают в себя: 
1. Недостоверность или неполные данные: Иногда данные просто не были собраны или введены правильно, что приводит к пропускам информации в таблице.
2. Ошибки в сборе данных: В процессе сбора данных могут возникать ошибки, например, если ответ не был предоставлен или был неправильно записан.
3. Технические проблемы: Некоторые пропуски данных могут быть вызваны техническими проблемами, такими как сбои в программном обеспечении, потеря данных или сбои в сети.
4. Изменения в структуре данных: Изменения в структуре данных или в самих данных могут привести к пропускам информации в таблице.
## Заполнение пропусков медианным значением в таблице для количественных переменных нужно для сохранения формы распределенния данных, сохранения статистических свойств, простоты и эффективности анализа

In [2]:
import pyspark.sql.functions as f
from pyspark.sql.functions import col,isnan, when, count

df_pyspark.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_pyspark.columns]).show()
df_pyspark.select([count( c).alias(c) for c in df_pyspark.columns]).show()

df_pyspark = df_pyspark.na.fill(df_pyspark.select(f.mean(df_pyspark['total_income'])).collect()[0][0])
df_pyspark.show(20)

                                                                                

+--------+-------------+---------+---------+------------+-------------+----------------+------+-----------+----+------------+-------+
|children|days_employed|dob_years|education|education_id|family_status|family_status_id|gender|income_type|debt|total_income|purpose|
+--------+-------------+---------+---------+------------+-------------+----------------+------+-----------+----+------------+-------+
|       0|         2174|        0|        0|           0|            0|               0|     0|          0|   0|        2174|      0|
+--------+-------------+---------+---------+------------+-------------+----------------+------+-----------+----+------------+-------+

+--------+-------------+---------+---------+------------+-------------+----------------+------+-----------+-----+------------+-------+
|children|days_employed|dob_years|education|education_id|family_status|family_status_id|gender|income_type| debt|total_income|purpose|
+--------+-------------+---------+---------+------------+--

## замена аномальных значений, все отрицательные стали положительными
## далее все значения равные NULL были приравнены среднему по столбцу

In [3]:
from pyspark.sql.functions import abs
df_pyspark = df_pyspark.withColumn('days_employed', abs(df_pyspark.days_employed))

df_pyspark = df_pyspark.na.fill(df_pyspark.select(f.mean(df_pyspark['days_employed'])).collect()[0][0])
df_pyspark.show(20)

+--------+------------------+---------+-------------------+------------+----------------+----------------+------+-----------+----+------------------+--------------------+
|children|     days_employed|dob_years|          education|education_id|   family_status|family_status_id|gender|income_type|debt|      total_income|             purpose|
+--------+------------------+---------+-------------------+------------+----------------+----------------+------+-----------+----+------------------+--------------------+
|       1| 8437.673027760233|       42|             высшее|           0| женат / замужем|               0|     F|  сотрудник|   0| 253875.6394525987|       покупка жилья|
|       1| 4024.803753850451|       36|            среднее|           1| женат / замужем|               0|     F|  сотрудник|   0|112080.01410244203|приобретение авто...|
|       0| 5623.422610230956|       33|            Среднее|           1| женат / замужем|               0|     M|  сотрудник|   0|145885.95229686

## измение типа значений в стобце `total_income` на целочисленный тип

In [4]:
from pyspark.sql.types import IntegerType
df_pyspark = df_pyspark.withColumn('total_income', df_pyspark['total_income'].cast(IntegerType()))
df_pyspark.show(20)

+--------+------------------+---------+-------------------+------------+----------------+----------------+------+-----------+----+------------+--------------------+
|children|     days_employed|dob_years|          education|education_id|   family_status|family_status_id|gender|income_type|debt|total_income|             purpose|
+--------+------------------+---------+-------------------+------------+----------------+----------------+------+-----------+----+------------+--------------------+
|       1| 8437.673027760233|       42|             высшее|           0| женат / замужем|               0|     F|  сотрудник|   0|      253875|       покупка жилья|
|       1| 4024.803753850451|       36|            среднее|           1| женат / замужем|               0|     F|  сотрудник|   0|      112080|приобретение авто...|
|       0| 5623.422610230956|       33|            Среднее|           1| женат / замужем|               0|     M|  сотрудник|   0|      145885|       покупка жилья|
|       3|

## замена значений в столбце `education` к нижнему регистру и удаление строк-дубликатов

In [5]:
from pyspark.sql.functions import lower
print(df_pyspark.count)
df_pyspark = df_pyspark.distinct()
print(df_pyspark.count)
df_pyspark = df_pyspark.withColumn("education", lower(df_pyspark["education"]))
df_pyspark.show(20)

<bound method DataFrame.count of DataFrame[children: int, days_employed: double, dob_years: int, education: string, education_id: int, family_status: string, family_status_id: int, gender: string, income_type: string, debt: int, total_income: int, purpose: string]>
<bound method DataFrame.count of DataFrame[children: int, days_employed: double, dob_years: int, education: string, education_id: int, family_status: string, family_status_id: int, gender: string, income_type: string, debt: int, total_income: int, purpose: string]>


[Stage 18:>                                                         (0 + 1) / 1]

+--------+------------------+---------+---------+------------+--------------------+----------------+------+-----------+----+------------+--------------------+
|children|     days_employed|dob_years|education|education_id|       family_status|family_status_id|gender|income_type|debt|total_income|             purpose|
+--------+------------------+---------+---------+------------+--------------------+----------------+------+-----------+----+------------+--------------------+
|       0|4341.7867754100425|       53|  среднее|           1|    гражданский брак|               1|     F|  компаньон|   0|      261369|операции с недвиж...|
|       0|12930.541677797675|       61|  среднее|           1|Не женат / не зам...|               4|     F|  компаньон|   0|      173896|        недвижимость|
|       1| 3267.738265311034|       45|   высшее|           0|    гражданский брак|               1|     F|  сотрудник|   0|      118552|на проведение сва...|
|       2|2844.5470858598133|       40|  средн

                                                                                

## Создание двух новых датафреймов с хранением id значений столбца `education` и `family_status` и удаление этих столбцом из начальной таблицы

In [6]:
from pyspark.sql.functions import desc, asc
df_pyspark.createOrReplaceTempView('dubli')
new_edu_df = spark.sql("select education, education_id from dubli group by education, education_id")
new_fam_df = spark.sql("select family_status, family_status_id from dubli group by family_status, family_status_id")

new_edu_df.show()
new_fam_df.show()

df_pyspark.drop('education')
df_pyspark.drop('family_status')

+-------------------+------------+
|          education|education_id|
+-------------------+------------+
|     ученая степень|           4|
|             высшее|           0|
|          начальное|           3|
|неоконченное высшее|           2|
|            среднее|           1|
+-------------------+------------+

+--------------------+----------------+
|       family_status|family_status_id|
+--------------------+----------------+
|    гражданский брак|               1|
|      вдовец / вдова|               2|
|Не женат / не зам...|               4|
|     женат / замужем|               0|
|           в разводе|               3|
+--------------------+----------------+



DataFrame[children: int, days_employed: double, dob_years: int, education: string, education_id: int, family_status_id: int, gender: string, income_type: string, debt: int, total_income: int, purpose: string]

## Создание столбца `total_income_category` на основе диапазона с категориями
0–30000 — 'E';
30001–50000 — 'D';
50001–200000 — 'C';
200001–1000000 — 'B';
1000001 и выше — 'A'.

In [7]:
from pyspark.sql.functions import udf 
def income_group(income): 
    if income <= 30000: 
        return "E"
    elif income <= 50000:
        return "D"
    elif income <= 200000:
        return "C"
    elif income <= 1000000:
        return "B"
    else: 
        return "A"

totalincome_group_udf = udf(income_group) 
df_pyspark = df_pyspark.withColumn("total_income_category", totalincome_group_udf(df_pyspark.total_income))

df_pyspark.show(20)

[Stage 29:>                                                         (0 + 1) / 1]

+--------+------------------+---------+---------+------------+--------------------+----------------+------+-----------+----+------------+--------------------+---------------------+
|children|     days_employed|dob_years|education|education_id|       family_status|family_status_id|gender|income_type|debt|total_income|             purpose|total_income_category|
+--------+------------------+---------+---------+------------+--------------------+----------------+------+-----------+----+------------+--------------------+---------------------+
|       0|4341.7867754100425|       53|  среднее|           1|    гражданский брак|               1|     F|  компаньон|   0|      261369|операции с недвиж...|                    B|
|       0|12930.541677797675|       61|  среднее|           1|Не женат / не зам...|               4|     F|  компаньон|   0|      173896|        недвижимость|                    C|
|       1| 3267.738265311034|       45|   высшее|           0|    гражданский брак|            

                                                                                

In [10]:
from pyspark.sql.functions import udf 
def purpose_group(purpose): 
    if 'автомобил' in purpose: 
        return "операции с автомобилем"
    elif 'жиль' in purpose or 'недвижимо' in purpose:
        return "операции с недвижимостью"
    elif 'свадь' in purpose:
        return "проведение свадьбы"
    else:
        return "получение образования"

purpose_group_udf = udf(purpose_group) 
df_pyspark = df_pyspark.withColumn("purpose_category", purpose_group_udf(df_pyspark.purpose))

df_pyspark.show()

+--------+------------------+---------+---------+------------+--------------------+----------------+------+-----------+----+------------+--------------------+---------------------+--------------------+
|children|     days_employed|dob_years|education|education_id|       family_status|family_status_id|gender|income_type|debt|total_income|             purpose|total_income_category|    purpose_category|
+--------+------------------+---------+---------+------------+--------------------+----------------+------+-----------+----+------------+--------------------+---------------------+--------------------+
|       0|4341.7867754100425|       53|  среднее|           1|    гражданский брак|               1|     F|  компаньон|   0|      261369|операции с недвиж...|                    B|операции с недвиж...|
|       0|12930.541677797675|       61|  среднее|           1|Не женат / не зам...|               4|     F|  компаньон|   0|      173896|        недвижимость|                    C|операции с н