**Начало работы с PySpark**

In [1]:
# Установка
!pip install pyspark==3.0.1 py4j==0.10.9

Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.2/204.2 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m21.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612225 sha256=28999ebd0b3bf1b7ffd8527714b002cd594a2325e570df4c94a528d325a4d8fb
  Stored in directory: /root/.cache/pip/wheels/19/b0/c8/6cb894117070e130fc44352c2a13f15b6c27e440d04a84fb48
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
    Uninstalling 

In [4]:
# Импортируем точку входа
from pyspark.sql import SparkSession
# Импортируем функции spark, пригодится позже
from pyspark.sql import functions as f

# Создаём сессию в Spark
# Используем local, так как работаем в автоновном режиме
# Задаём имя нашей сессии
# Непосредственно команда создания сессии
spark = SparkSession.builder.master("local[*]").appName('PySpark_Tutorial').getOrCreate()

**Чтение данных**

In [18]:
# В переменную df сохраняем таблицу
# Важно задать header=True, чтобы названия столбцов не сместились
df = spark.read.csv('/content/wine.csv', header=True)
# Выведем схему данных используемого датафрейма
# Схема отображает структуру датафрейма: название столбца и его тип данных
# Схему данных также можно создать самостоятельно
df.printSchema()

root
 |-- fixed acidity: string (nullable = true)
 |-- volatile acidity: string (nullable = true)
 |-- citric acid: string (nullable = true)
 |-- residual sugar: string (nullable = true)
 |-- chlorides: string (nullable = true)
 |-- free sulfur dioxide: string (nullable = true)
 |-- total sulfur dioxide: string (nullable = true)
 |-- density: string (nullable = true)
 |-- pH: string (nullable = true)
 |-- sulphates: string (nullable = true)
 |-- alcohol: string (nullable = true)
 |-- quality: string (nullable = true)



In [19]:
# Вывод датафрейма
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

**Манипуляции со столбцами**

In [20]:
# Создадим новый столбец Sum, как произведение значений столбцов pH и alcohol (как пример)
# Для удобства округляем до второго знака полученный столбец
df = df.withColumn('Mul_pH_alc', f.round(df.pH*df.alcohol, 2))
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+----------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|Mul_pH_alc|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+----------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|     32.99|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|     31.36|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|     31.95|
|         11.2|            0.28|  

In [21]:
# Переименуем созданный ранее столбец
df = df.withColumnRenamed('Mul_pH_alc', 'pH-Alc')
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|pH-Alc|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5| 32.99|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5| 31.36|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5| 31.95|
|         11.2|            0.28|       0.56|           1.9

In [22]:
# Удалим созданный столбец
df = df.drop('pH-Alc')
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

**Работа с пропущенными значениями**

In [29]:
# Подсчёт количества строк
df.count()

1599

In [34]:
# Как проверить датафрейм на наличие пустых значений?? df.Empty?

In [23]:
# Удаление строк с пропущенными значениями
df.na.drop()
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [38]:
# Замена отсутствующих значений новыми
df.na.replace(0, 1)
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

**Получение данных**

In [39]:
# Получение нескольких столбцов
df.select('fixed acidity', 'volatile acidity').show(5)

+-------------+----------------+
|fixed acidity|volatile acidity|
+-------------+----------------+
|          7.4|             0.7|
|          7.8|            0.88|
|          7.8|            0.76|
|         11.2|            0.28|
|          7.4|             0.7|
+-------------+----------------+
only showing top 5 rows



In [45]:
# фильтрация данных на основе заданного условия
# выведем только те строки, значения quality которых больше 5
df1 = df.filter(df.quality >5)
df1.show(5)
df1.count()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|                60.0|  0.998|3.16|     0.58|    9.8|      6|
|          7.3|            0.65|        0.0|           1.2|    0.065|               15.0|                21.0| 0.9946|3.39|     0.47|   10.0|      7|
|          7.8|            0.58|       0.02|           2.0|    0.073|                9.0|                18.0| 0.9968|3.36|     0.57|    9.5|      7|
|          8.5|            0.28|       0.56|           1.8|    0.092|               35.0|           

855

In [46]:
# Используем метод between, который вернёт строки, входящие в промежуток по указанному столбцу
df.filter(df.sulphates.between(0.5, 0.7)).show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

**Сортировка**

In [64]:
# Сортировка выбранных столбцов по возрастанию
df.select(['quality', 'pH']).orderBy('quality', 'pH').show(5)

+-------+----+
|quality|  pH|
+-------+----+
|      3|3.16|
|      3|3.25|
|      3|3.31|
|      3|3.32|
|      3|3.38|
+-------+----+
only showing top 5 rows



In [65]:
# Сортировка выбранных столбцов по убыванию
df.select(['quality', 'pH']).orderBy('quality', 'pH', ascending=False).show(5)

+-------+----+
|quality|  pH|
+-------+----+
|      8|3.72|
|      8|3.56|
|      8| 3.5|
|      8|3.46|
|      8|3.35|
+-------+----+
only showing top 5 rows



**Агрегрирование**

По сути именно такая запись даёт результат аналогичный GROUP BY в SQL, происходит группировка по некоторому признаку и для всех значений, сгруппированных по этому признаку возможно применение групповых операций.

In [61]:
df.groupBy('quality')\
      .agg(f.min('pH').alias('минимальная кислотность'),
           f.max('pH').alias('максимальная кислотность'),
           f.round(f.sum('pH'), 2).alias('суммарная кислотность'),
           f.round(f.avg('pH'), 2).alias('средняя кислотность')).show()

+-------+-----------------------+------------------------+---------------------+-------------------+
|quality|минимальная кислотность|максимальная кислотность|суммарная кислотность|средняя кислотность|
+-------+-----------------------+------------------------+---------------------+-------------------+
|      7|                   2.92|                    3.78|               654.86|               3.29|
|      3|                   3.16|                    3.63|                33.98|                3.4|
|      8|                   2.88|                    3.72|                58.81|               3.27|
|      5|                   2.88|                    3.74|              2250.67|                3.3|
|      6|                   2.86|                    4.01|              2116.93|               3.32|
|      4|                   2.74|                     3.9|               179.22|               3.38|
+-------+-----------------------+------------------------+---------------------+-----------