# Урок 3. DataFrames

### Как создать DataFrame
Есть несколько способов для создания DataFrame.
* из python list
* прочитать из файла
* прочитать таблицу из Hive
* прочитать таблицу по jdbc
* range


In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=fb7a32f9ef407c644ac2b5bbcdf0dfd43a2a7c46d71acdadfca0825de3e0b5c3
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


### из python list

In [4]:
# Здесь мы инициализируем spark сессию
from pyspark.sql import SparkSession
spark = (SparkSession.builder.master('local[*]').getOrCreate())
data = [('Ivan', 10),
        ('Petr', 20),
        ('Elena', 30)]

columns = ['name', 'age']

df = spark.createDataFrame(data=data, schema=columns)
df.show()

+-----+---+
| name|age|
+-----+---+
| Ivan| 10|
| Petr| 20|
|Elena| 30|
+-----+---+



### прочитать из файла

In [6]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.master('local[*]').getOrCreate())

df = spark.read.option('header', 'true').csv('data_3.csv')
df.show()

+-----+---+
| name|age|
+-----+---+
| Ivan| 10|
| Petr| 20|
|Elena| 30|
+-----+---+



### прочитать таблицу из Hive

В крупных компаниях часто используют СУБД
Apache Hive, коннект с которой доступен в spark
сразу.
Инициализируем спарк сессию, но с настройкой
enableHiveSupport(). Эта настройка позволяет нам
считывать и записывать таблицы, используя только
название таблицы, т.е вам не нужно прописывать
путь хранения данных.
В 5 строке мы используем метод table(), параметр
которого это название таблицы. Этот метод вернет
нам DataFrame с данными из таблицы Hive

In [None]:
# from pyspark.sql import SparkSession
# spark = (SparkSession.builder.enableHiveSupport().getOrCreate())

# df = spark.table('table_name')
# df.show()

### прочитать таблицу по jdbc

JDBC (Java Database Connector) - специальная
библиотека, написанная на java, позволяющая
подключаться к базе данных (Postgres, Oracle,
GreenPlum и тд) и читать или записывать туда
данные.
Здесь мы инициализируем спарк сессию, в
переменной creds у нас словарь, в котором указаны
логин и пароль для подключения к базе данных
Затем мы читаем данные с помощью метода jdbc, в
нем указываем url базы данных, нужную нам
таблицу (которую мы хотим прочитать), properties -
это наш словарь creds, в котором указаны наши
логин и пароль.

In [None]:
# from pyspark.sql import SparkSession
# spark = (SparkSession.builder.master('local[*]').getOrCreate())

# creds = {'user': 'name', 'password': '12345'}
# df = spark.read.jdbc('url', 'table', properties=creds)

### range

Самый простой способ создать датафрейм, в
котором будут числа, а столбец будет называться id
инициализируем спарк сессию, затем используем
метод range и указываем количество чисел в
датафрейме

In [9]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.master('local[*]').getOrCreate())

df = spark.range(100)
df.show(5)

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
only showing top 5 rows



## Функции
#### DataFrames в Apache Spark предоставляют множество функций для работы с данными. Некоторые из них:
* select: выбор столбцов из датафрейма
* filter: фильтрация строк датафрейма на основе условия
* withColumn: создание столбца в датафрейме
* withColumnRenamed: переименование столбца в датафрейме
* groupBy: группировка строк датафрейма по значениям в столбцах
* agg: агрегация данных в датафрейме с использованием агрегатных функций, таких как сумма, среднее
и т.д.
* join: объединение двух датафреймов на основе заданных условий
* orderBy: сортировка строк датафрейма по значениям в столбцах

Кроме того, в DataFrames есть множество других функций, таких как distinct, dropDuplicates, count,
mean, sum, max, min и т.д., которые позволяют проводить анализ данных и получать информацию о данных.

In [None]:
# select
from pyspark.sql.functions import col
df.select(col('id')) # вернет одну колонку id

In [None]:
# filter
df.filter(col('id') < 50) # вернет колонку id со значениями менее 50

In [None]:
# withColumn
df.withColumn('new', col('id') % 2)
# первый параметр - имя нового столбца
# второй параметр - значения нового столбца. Здесь значения определяются таким образом,
# то есть в столбце будут значение 0 и 1

In [None]:
# withColumnRenamed
df.withColumnRenamed('id', 'id_rename')

In [None]:
# groupby
df.withColumn('new', col('id') % 2).groupby(col('new'))

In [None]:
# agg
(df.withColumn('new', col('id') % 2).groupby(col('new')).agg(max(col('id'))))
# здесь мы делаем агрегацию - находим максимальное число в id при группировке по new

In [None]:
# Join
df2= spark.range(10).withColumnRenamed('id', 'id2')
df.join(df2,col('id') == col('id2'), 'inner')
# соединяем оба датафрейма по столбцам id и id2
# тип джойна inner, есть и другие

In [None]:
# orderBy
df.orderBy(col('id').desc) # по убыванию
df.orderBy(col('id').asc) # по возрастанию

In [None]:
# distinct
df.distinct()
# убираем дубли из датафрейма

## Группировки и агрегации

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, collect_list
spark = (SparkSession.builder.master('local[*]').getOrCreate())

df = spark.read.option('header', 'true').csv('data_33.csv')

df.groupBy(col('name')).agg(mean(col('age')).alias('count_age')).show()

+-----+------------------+
| name|         count_age|
+-----+------------------+
| Lena|              50.0|
| Petr|              13.0|
| Ivan|53.333333333333336|
|Katya|              22.5|
+-----+------------------+



In [14]:
df.groupBy(col('name')).agg(collect_list(col('age')).alias('count_age')).show()

+-----+------------+
| name|   count_age|
+-----+------------+
| Lena|[25, 80, 45]|
| Petr|     [25, 1]|
| Ivan|[50, 50, 60]|
|Katya|    [10, 35]|
+-----+------------+



## Оконные функции
Оконная функция — это функция, которая выполняет агрегирующую функцию на определенном наборе (окне,
партиции) строк и результат записывает в новый столбец в таблице

к оконным функциям относятся такие функции, как count, mean, sum, max, min и т.д., а также ранжирующие функции row_number, rank, dense_rank, функции смещения lag (смещает все значения назад), lead (смещает все значения вперед)

In [15]:
# Общие правила применения оконных функций
# импортируем нужные функции
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

In [None]:
df.withColumn('rank_col', rank().over(Window.partitionBy('partition_col').orderBy('orderby_col')))

In [20]:
# When и otherwise
from pyspark.sql.functions import when, col

spark = (SparkSession.builder.getOrCreate())

df = spark.range(100)

df.withColumn('test_col', when(col('id') > 50, 'value_more_50').otherwise('value_less_50')).show(2)

+---+-------------+
| id|     test_col|
+---+-------------+
|  0|value_less_50|
|  1|value_less_50|
+---+-------------+
only showing top 2 rows



#### Pivot
Pivot - позволяет развернуть сгруппированные значения строк в столбцы


In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, collect_list
spark = (SparkSession.builder.master('local[*]').getOrCreate())

# df = spark.read.option('header', 'true').csv('data_pivot.csv')


data = [('Ivan', 'Moscow', 150000),
        ('Lena', 'Moscow', 100000),
        ('Ivan', 'Moscow', 20000),
        ('Petr', 'Sochi', 60000),
         ('Roma', 'Sochi', 50000),
        ('Roma', 'Sochi', 10000)]


df = spark.createDataFrame(data=data).toDF('name', 'city', 'salary')
df.show()

+----+------+------+
|name|  city|salary|
+----+------+------+
|Ivan|Moscow|150000|
|Lena|Moscow|100000|
|Ivan|Moscow| 20000|
|Petr| Sochi| 60000|
|Roma| Sochi| 50000|
|Roma| Sochi| 10000|
+----+------+------+



In [37]:
# группировка по столбцам и нахождения средней зп по городам
df.groupby(col('city'), col('name')).agg(mean(col('salary'))).show()

+------+----+-----------+
|  city|name|avg(salary)|
+------+----+-----------+
|Moscow|Ivan|    85000.0|
|Moscow|Lena|   100000.0|
| Sochi|Petr|    60000.0|
| Sochi|Roma|    30000.0|
+------+----+-----------+



In [38]:
# развернем данные
df.groupby('city').pivot('name').agg(mean('salary')).show()

+------+-------+--------+-------+-------+
|  city|   Ivan|    Lena|   Petr|   Roma|
+------+-------+--------+-------+-------+
|Moscow|85000.0|100000.0|   NULL|   NULL|
| Sochi|   NULL|    NULL|60000.0|30000.0|
+------+-------+--------+-------+-------+



#### Cast
Функция Cast - меняет тип данных

In [46]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder.master('local[*]').getOrCreate())

df = spark.range(100)
df.printSchema()
# длинное число

root
 |-- id: long (nullable = false)



In [47]:
new_df = df.select(col('id').cast('string'))
new_df.printSchema()

# или

# from pyspark.sql.types import StringType
# new_df2 = df.select(col('id').cast(StringType))
# длинное число заменено на строкое значение

root
 |-- id: string (nullable = false)



In [48]:
# другой способ
new_df2 = df.withColumn('id', col('id').cast('string'))
new_df2.printSchema()

root
 |-- id: string (nullable = false)



### UDF
UDF - user defined function - это функции, определяемые пользователем.
Эти функции используются, когда вы реализовывайте более сложную логику.

In [53]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
spark = (SparkSession.builder.master('local[*]').getOrCreate())

# df = spark.read.option('header', 'true').csv('data_pivot.csv')


data = [('Iphone', 'Iphone14', 150000),
        ('Huawei', 'Mate14', 100000),
        ('Samsung', 'A14', 20000),
        ('Iphone', 'Iphone13', 60000),
         ('Huawei', 'Mate12', 50000),
        ('Samsung', 'A11', 5000)]


df_phones = spark.createDataFrame(data=data).toDF('company', 'model', 'price')
df_phones.show()

+-------+--------+------+
|company|   model| price|
+-------+--------+------+
| Iphone|Iphone14|150000|
| Huawei|  Mate14|100000|
|Samsung|     A14| 20000|
| Iphone|Iphone13| 60000|
| Huawei|  Mate12| 50000|
|Samsung|     A11|  5000|
+-------+--------+------+



In [54]:
def recognize_model(model: str) -> str:
    if '14' in model:
        return 'new model'
    else:
        return 'old model'

recognize_model_udf = udf(lambda x: recognize_model(x))

new_df = df_phones.withColumn('version_model', recognize_model_udf('model')).show()

+-------+--------+------+-------------+
|company|   model| price|version_model|
+-------+--------+------+-------------+
| Iphone|Iphone14|150000|    new model|
| Huawei|  Mate14|100000|    new model|
|Samsung|     A14| 20000|    new model|
| Iphone|Iphone13| 60000|    old model|
| Huawei|  Mate12| 50000|    old model|
|Samsung|     A11|  5000|    old model|
+-------+--------+------+-------------+



### Сохранение данных
Для сохранения данных из DataFrames в Apache Spark можно использовать метод write объекта DataFrame,
который позволяет записывать данные в различные источники, такие как файлы, базы данных, S3 и другие.
Например, чтобы сохранить данные в CSV файл, можно использовать следующий код:
df.write.csv("path/to/output/folder", header=True)
В данном примере мы записываем данные в CSV файл, указывая путь к выходной папке и опцию
header=True, которая указывает на запись заголовка в файл.

In [56]:
df_phones.write.csv("phones.csv", header=True)