In [None]:
!pip install pyspark >> None

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import *
import pyspark.sql.functions as F

###Задание 1. Вывод данных в консоль

1. Создаем сессию Spark с именем "CountElements".
2. Читаем поток данных с использованием источника "rate", который автоматически генерирует данные. Этот источник используется для тестирования и отладки, поскольку он позволяет легко создавать потоки данных без необходимости подключения к внешним системам.
3. Записываем полученные данные в консоль с использованием режима вывода "append". В этом режиме в консоль будут выводиться только новые строки, полученные в каждом микро-пакете данных.
4. Запускаем поток обработки данных и ожидает его завершения.

In [None]:
spark = SparkSession.builder.appName("CountElements").getOrCreate()
df = spark.readStream.format("rate").load()
query = df.writeStream.outputMode("append").format("console").start()
query.awaitTermination(20)
query.stop()

###Задание 2. Фильтрация чисел (четные)

1. Создаем сессию Spark с именем приложения "FilterEvenNumbers". Это необходимо для инициализации Spark и подготовки к выполнению операций с данными.

2. Читаем потоковые данные из источника, используя формат "rate". Формат "rate" генерирует данные с заданной скоростью, что полезно для тестирования и демонстрации обработки потоковых данных.

3. Фильтруем полученные данные, оставляя только те строки, где значение поля "value" делится на 2 без остатка, то есть числа, являющиеся четными.

4. Записываем отфильтрованные данные обратно в консоль с использованием режима вывода "append". Это означает, что при каждом обновлении данных в потоке, новые данные будут добавлены к уже существующим, не удаляя предыдущие записи.

5. Запускаем потоковую запись и ожидает завершения работы запроса в течение 5 секунд. Это позволяет увидеть результаты обработки данных в консоли в течение этого времени.


In [None]:
spark = SparkSession.builder.appName("FilterEvenNumbers").getOrCreate()
df = spark.readStream.format("rate").load()
df_even = df.filter("value % 2 == 0")
query = df_even.writeStream.outputMode("append").format("console").start()
query.awaitTermination(5)
query.stop()

###Задание 3. Сгруппировать по значениям и посчитать количество


1. Создаем сессию Spark с именем приложения "GroupByValue". Это необходимо для работы с Spark и его функциональностью, включая потоковую обработку данных.

2. Читаем потоковые данные из источника, используя формат "rate". Формат "rate" генерирует данные с постоянной скоростью, что удобно для тестирования и демонстрации потоковой обработки.

3. Группируем полученные данные по столбцу "value" и подсчитывает количество записей для каждого уникального значения в этом столбце. Это позволяет агрегировать данные по определенному критерию.

4. Запускаем потоковую запись результатов агрегации в консоль с использованием режима вывода "update". Режим "update" означает, что в консоль будут выводиться только обновленные результаты агрегации, а не полный набор данных при каждом обновлении. Это делает вывод более читаемым и эффективным, особенно когда данные постоянно обновляются.

5. Ожидаем завершения потоковой обработки в течение 5 секунд. Это означает, что поток будет работать в течение этого времени, а затем завершится.


In [None]:
spark = SparkSession.builder.appName("GroupByValue").getOrCreate()
df = spark.readStream.format("rate").load()
df_grouped = df.groupBy("value").count()
query = df_grouped.writeStream.outputMode("update").format("console").start()
query.awaitTermination(5)
query.stop()

###Задание 4. Вычислить сумму значений

1. Создаем экземпляр SparkSession с именем приложения "SumValues". Этот шаг необходим для инициализации Spark и подготовки среды для выполнения операций с данными.

2. Читаем поток данных из источника, используя формат "rate". Формат "rate" генерирует данные с заданной скоростью, что полезно для тестирования и отладки. В данном случае, данные будут генерироваться автоматически без необходимости подключения к внешним источникам данных.

3. Выполняем агрегацию данных, вычисляя сумму значений в столбце "value" и сохраняя результат в новом столбце "total". Это делается с помощью метода selectExpr, который позволяет выполнять SQL-подобные выражения для трансформации данных.

4. Записываем результаты агрегации в консоль с использованием формата "console". Это позволяет наблюдать за изменениями в реальном времени, так как данные будут выводиться в консоль.

5. Запускаем запрос на потоковую обработку данных с использованием режима вывода "update". В этом режиме, при каждом обновлении данных, будет выводиться только текущее состояние агрегации, что позволяет видеть актуальную сумму значений.

6. Ожидаем завершения обработки потока данных в течение 5 секунд с помощью метода awaitTermination. Это означает, что приложение будет работать в течение этого времени, обрабатывая поступающие данные и выводя результаты в консоль.

In [None]:
spark = SparkSession.builder.appName("SumValues").getOrCreate()
df = spark.readStream.format("rate").load()
df_sum = df.selectExpr("sum(value) AS total")
query = df_sum.writeStream.outputMode("update").format("console").start()
query.awaitTermination(5)
query.stop()

###Задание 5. Найти максимальное значение

1. Создаем сессию Spark с именем приложения "MaxValue" с помощью SparkSession.builder.appName("MaxValue").getOrCreate(). Это необходимо для работы с Spark и его функциональностью, включая структурированный потоковый анализ.

2. Читаем поток данных с использованием источника "rate". Источник "rate" генерирует данные с заданной скоростью, что полезно для тестирования и разработки. В данном случае, данные читаются без дополнительных параметров, что означает использование значений по умолчанию для генерации данных.

3. Выполняем агрегацию данных, вычисляя максимальное значение поля 'value' в каждом микро-батче данных. Это достигается с помощью метода agg(f.max('value')), где f - это ссылка на модуль pyspark.sql.functions, который предоставляет функции для работы с данными.

4. Записываем результаты агрегации в консоль с использованием writeStream.outputMode("update").format("console").start(). Здесь outputMode("update") указывает, что при каждом обновлении данных в потоке, результаты агрегации будут выводиться в консоль. format("console") указывает, что вывод должен быть направлен в консоль, а не в файл или базу данных.

5. Ожидаем завершения потокового запроса с помощью query.awaitTermination(5), что означает, что поток будет работать в течение 5 секунд, после чего будет выполнено завершение работы.

In [None]:
spark = SparkSession.builder.appName("MaxValue").getOrCreate()
df = spark.readStream.format("rate").load()
df_max = df.agg(F.max('value'))
query = df_max.writeStream.outputMode("update").format("console").start()
query.awaitTermination(5)
query.stop()

### Задание 6. Вычислить скользящее окно по значению

1. Создаем сессию Spark с именем "SlidingWindow".
2. Читаем потоковые данные из источника "rate", который генерирует данные с постоянной скоростью.
3. Группируем эти данные по временным окнам, размером в 10 минут, и подсчитывает количество записей в каждом окне.
4. Записываем результаты обработки в консоль в режиме обновления (outputMode("update")), что означает, что в консоль будут выводиться только обновленные агрегированные результаты.
5. Запускаем потоковую запрос и ожидает его завершения в течение 5 секунд.

In [None]:
spark = SparkSession.builder.appName("SlidingWindow").getOrCreate()
df = spark.readStream.format("rate").load()
df_windowed = df.groupBy(window("timestamp", "10 minutes")).count()
query = df_windowed.writeStream.outputMode("update").format("console").start()
query.awaitTermination(5)
query.stop()

### Задание 7. Соединение потоков данных

1. Создаем сессию Spark с именем приложения "JoinStreams".
2. Читаем два потока данных, используя формат "rate", который генерирует данные с определенной скоростью. По умолчанию, каждый поток будет генерировать одну строку в секунду с полями timestamp и value.
3. Соединяем два потока данных по полю value. Это означает, что для каждой пары строк из двух потоков, где значение поля value совпадает, будет создана новая строка в результате соединения.
4. Записываем результат соединения в консоль с использованием режима вывода "append". Это означает, что при каждом обновлении данных в потоках, новые строки будут добавляться к выводу, не удаляя предыдущие строки.
5. Запускаем запрос на потоковую обработку и ожидает его завершения в течение 5 секунд.


In [None]:
spark = SparkSession.builder.appName("JoinStreams").getOrCreate()
df1 = spark.readStream.format("rate").load()
df2 = spark.readStream.format("rate").load()
df_joined = df1.join(df2, "value")
query = df_joined.writeStream.outputMode("append").format("console").start()
query.awaitTermination(5)
query.stop()

### Задание 8. Запись данных в файл

1. Создаем экземпляр SparkSession с именем приложения "WriteToFile". SparkSession является точкой входа для работы с Spark и предоставляет API для работы с DataFrame и DataSet.

2. Читаем потоковые данные с использованием формата "rate". Формат "rate" генерирует данные с заданной скоростью, что полезно для тестирования и отладки потоковых приложений. В данном случае, данные будут генерироваться без дополнительных параметров, что означает использование значений по умолчанию.

3. Записываем полученные потоковые данные в формате Parquet в указанный каталог "output/". Parquet - это эффективный формат хранения данных, который обеспечивает высокую производительность и эффективное сжатие.

4. Запускаем потоковую запись данных и ожидает завершения работы запроса в течение 5 секунд. Метод awaitTermination блокирует выполнение программы до тех пор, пока потоковая запись не будет остановлена или не произойдет ошибка.


In [None]:
spark = SparkSession.builder.appName("WriteToFile").getOrCreate()
df = spark.readStream.format("rate").load()
query = df.writeStream \
    .format("parquet") \
    .option("path", "/content/output/") \
    .option("checkpointLocation", "/content/checkpoint/") \
    .start()
query.awaitTermination(5)
query.stop()