# Spark RDD
В **Apache Spark** существует интерфейс - RDD API. В нём производится работа с RDD напрямую.

<b>RDD </b>(resilent distrubuted dataset) - это фундаментальная структура данных Spark, которая представляет собой неизменяемый набор данных, который вычисляются и располагается на разных узлах кластера.

* [Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
* [Документация](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html)

In [1]:
!pip3 install pyspark pyarrow



In [20]:
# Вариант 1. Создаем spark-сессию, явно указывая ее параметры
from pyspark.sql import SparkSession

from pyspark import SparkConf, SparkContext

"""
conf = (
    SparkConf()
        .set('spark.ui.port', '4050')
        .setMaster('local[*]')
)
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
"""

"\nconf = (\n    SparkConf()\n        .set('spark.ui.port', '4050')\n        .setMaster('local[*]')\n)\nsc = SparkContext(conf=conf)\nspark = SparkSession(sc)\n"

In [21]:
# Вариант 2. Создаем дефолтную spark-сессию
from pyspark.sql import SparkSession

# Создаем SparkSession
spark = SparkSession.builder.appName("RDD_Example").getOrCreate()

Для того, чтобы создать RDD необходимо к некоторой коллекции объектов применимеить операциюю parallelize. В результате работы spark разобъёт данные на куски (партиции) и отправит её части на разные worker ноды.

In [22]:
# Создаем RDD из списка
data = [("Иван", 30), ("Мария", 25), ("Алексей", 35), ("Елена", 40)]
rdd = spark.sparkContext.parallelize(data)

# Выведем данные RDD
print(rdd.collect())  # [('Иван', 30), ('Мария', 25), ('Алексей', 35), ('Елена', 40)]


[('Иван', 30), ('Мария', 25), ('Алексей', 35), ('Елена', 40)]


Посмотрим на количество партиций.

In [23]:
rdd.getNumPartitions()

2

Spark выбирает количество партиций для каждого RDD в зависимости от конфигруции кластера - по количеству доступных вычислительных ресурсов CPU. Но можно менять количество партиций, как в большую так и в меньшую сторону. Для того чтобы поменять количество партиций в существует два метода:

*   repartition - позволяет как увеличивать, так и уменьшать количество партиций, но производит полную перетасовку (shuffle) данных между узлами
*   coelesce - метод, с помощью которого можно только уменьшать размер партций. Не требует полной перетасовки данных между исполнителями, поэтому более эффективен.



In [24]:
repartitioned = rdd.repartition(5)
print(f"Num partitions after repartition: {repartitioned.getNumPartitions()}")

coelesced = repartitioned.coalesce(2)
print(f"Num partitions after coelesce: {coelesced.getNumPartitions()}")

Num partitions after repartition: 5
Num partitions after coelesce: 2


Обратной операцией к операции parallelize является метод collect, который наоборот создаёт коллекцию из данных, хранящихся на различных worker нодах.

In [25]:
rdd.collect()

[('Иван', 30), ('Мария', 25), ('Алексей', 35), ('Елена', 40)]

In [26]:
# К элементам RDD можно применять различные операции
# Допустим, мы хотим увеличить возраст каждого человека на 1 год
rdd_transformed = rdd.map(lambda x: (x[0], x[1] + 1))
print(rdd_transformed.collect())

[('Иван', 31), ('Мария', 26), ('Алексей', 36), ('Елена', 41)]


In [27]:
# Оставим только людей старше 30 лет
rdd_filtered = rdd.filter(lambda x: x[1] > 30)
print(rdd_filtered.collect())

[('Алексей', 35), ('Елена', 40)]


In [32]:
# Отсортируем людей во возрасту
rdd.sortByKey(keyfunc=lambda x: x[1]).collect()

[('Мария', 25), ('Иван', 30), ('Алексей', 35), ('Елена', 40)]

In [33]:
# Отсортируем людей по именам
rdd.sortByKey().collect()

[('Алексей', 35), ('Елена', 40), ('Иван', 30), ('Мария', 25)]

In [35]:
# Допустим, у нас есть повторяющиеся имена, и мы хотим найти их средний возраст

data_with_duplicates = [("Иван", 30), ("Мария", 25), ("Иван", 40), ("Мария", 35)]
rdd_with_dup = spark.sparkContext.parallelize(data_with_duplicates)

In [36]:
rdd_with_dup.collect()

[('Иван', 30), ('Мария', 25), ('Иван', 40), ('Мария', 35)]

In [40]:
rdd_grouped = rdd_with_dup.groupByKey()

In [41]:
rdd_grouped.collect()

[('Иван', <pyspark.resultiterable.ResultIterable at 0x7c699f390950>),
 ('Мария', <pyspark.resultiterable.ResultIterable at 0x7c699f390210>)]

In [42]:
[(key, list(values)) for key, values in rdd_grouped.collect()]

[('Иван', [30, 40]), ('Мария', [25, 35])]

In [44]:
# Хотим найти сумму возраста повторяющихся имен
rdd_with_dup.groupByKey().mapValues(sum).collect()

[('Иван', 70), ('Мария', 60)]

In [47]:
# Хотим найти количество повторяющихся имен
rdd_with_dup.groupByKey().mapValues(lambda x: len(list(x))).collect()

[('Иван', 2), ('Мария', 2)]

In [49]:
# Хотим найти средний возраст повторяющихся имен
rdd_with_dup.groupByKey().mapValues(lambda x: sum(list(x)) / len(list(x))).collect()

[('Иван', 35.0), ('Мария', 30.0)]

RDD API поддердивает прямую загрузку из текстового файла. При этом каждая строка будет интерпретироваться, как отдельный элемент RDD

In [50]:
! echo "Hello, sample RDD" > text.txt
! echo "This RDD contains three lines" >> text.txt
! echo "This is the last line" >> text.txt
! echo "" >> text.txt
! echo "Just kidding, it contains five lines" >> text.txt

In [51]:
text_data = sc.textFile('text.txt')
text_data, text_data.collect()

(text.txt MapPartitionsRDD[140] at textFile at NativeMethodAccessorImpl.java:0,
 ['Hello, sample RDD',
  'This RDD contains three lines',
  'This is the last line',
  '',
  'Just kidding, it contains five lines'])

Теперь к данному RDD можно применять стандартные Spark операции

In [52]:
distinct_words = (
    text_data
        .filter(lambda x: len(x)) # отбираем только не пустые строки
        .flatMap(lambda x: x.split(' ')) # разбиваем все строки на слова и переводим список
        .distinct() # берём только уникальные слова
)
# Будьте внимательны, если такой файл существует,
# то spark будет выдавать ошибку
distinct_words.saveAsTextFile('words.txt')

# можно вывести отладочную информацию по данному RDD
print(distinct_words.toDebugString().decode())

Py4JJavaError: An error occurred while calling o899.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/content/words.txt already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:299)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1623)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1623)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1609)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1609)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:563)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


Снизу вверх показаны все низкоуровневые операции (lineage), которые были применены к данному RDD c самого начала его создания

Для того, чтобы переиспользовать посчитанные значения в рамках текущей сессии стоит использовать метод `.cache`, который сохраняет результат вычислений вершины графа вычислений в оперативной памяти. Это нужно для того, чтобы оперции, работающие поверх данной получали результат операций из оперативной памяти, а не считывались с диска.

Метод `.persist` позволяет сохранять промежуточные вычисления в рамках текущей сессии с более тонкой настройкой места хранения (жёсткий диск, оперативная память, ...)

In [13]:
distinct_words_cached = distinct_words.cache()
print(distinct_words_cached.toDebugString().decode())

(2) PythonRDD[22] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
 |  MapPartitionsRDD[18] at mapPartitions at PythonRDD.scala:160 [Memory Serialized 1x Replicated]
 |  ShuffledRDD[17] at partitionBy at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]
 +-(2) PairwiseRDD[16] at distinct at <ipython-input-12-89f2b19c0dba>:5 [Memory Serialized 1x Replicated]
    |  PythonRDD[15] at distinct at <ipython-input-12-89f2b19c0dba>:5 [Memory Serialized 1x Replicated]
    |  text.txt MapPartitionsRDD[14] at textFile at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]
    |  text.txt HadoopRDD[13] at textFile at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]


Как можно заметить после операции cache появились дополнительные вершины в графе вычислений, в которых указаны место расположения данных и количество их реплик: Memory Serialized 1x Replicated

In [14]:
distinct_words_cached.collect()
print(distinct_words_cached.toDebugString().decode())

(2) PythonRDD[22] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
 |       CachedPartitions: 2; MemorySize: 296.0 B; DiskSize: 0.0 B
 |  MapPartitionsRDD[18] at mapPartitions at PythonRDD.scala:160 [Memory Serialized 1x Replicated]
 |  ShuffledRDD[17] at partitionBy at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]
 +-(2) PairwiseRDD[16] at distinct at <ipython-input-12-89f2b19c0dba>:5 [Memory Serialized 1x Replicated]
    |  PythonRDD[15] at distinct at <ipython-input-12-89f2b19c0dba>:5 [Memory Serialized 1x Replicated]
    |  text.txt MapPartitionsRDD[14] at textFile at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]
    |  text.txt HadoopRDD[13] at textFile at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]


Для сохранения данных между сессиями можно использовать `.checkpoint`. Особенность этого метода — изменение графа вычислений.
Цепочка вычислений для сохраняемого RDD будет удалена.

Сокращение цепочки вычислений полезно в случае больших графов, например, в итеративных алгоритмах.

In [15]:
distinct_first_words = (
    text_data
        .filter(lambda x: len(x))
        .flatMap(lambda x: x.split(' ')[0])
        .distinct()
)

sc.setCheckpointDir('./checkpoints')

distinct_first_words.checkpoint()
print(distinct_first_words.toDebugString().decode())

(2) PythonRDD[27] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[26] at mapPartitions at PythonRDD.scala:160 []
 |  ShuffledRDD[25] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[24] at distinct at <ipython-input-15-b8ebd5c157fb>:5 []
    |  PythonRDD[23] at distinct at <ipython-input-15-b8ebd5c157fb>:5 []
    |  text.txt MapPartitionsRDD[14] at textFile at NativeMethodAccessorImpl.java:0 []
    |  text.txt HadoopRDD[13] at textFile at NativeMethodAccessorImpl.java:0 []


In [16]:
distinct_first_words.collect()
print(distinct_first_words.toDebugString().decode())

(2) PythonRDD[27] at RDD at PythonRDD.scala:53 []
 |  ReliableCheckpointRDD[28] at collect at <ipython-input-16-c1aaa7e99388>:1 []


Как можно увидеть предыдущий граф вычилений был полность удалён, и теперь вычисления начинаются с загрузки контрольной точки: ReliableCheckpointRDD

# Практика

Есть файл google_queries.csv, в нем указаны запросы пользователей в Google, количесво данных запросов в день, день запроса

In [53]:
!head google_queries.csv

request,number_of_views,date
карта,3,2025-02-11
погода,5,2025-02-13
котики,5,2025-02-11
игры,2,2025-02-14
погода,1,2025-02-17
ютуб,1,2025-02-13
погода,1,2025-02-18
вк,1,2025-02-15
авито,5,2025-02-11


In [57]:
from pyspark.sql import SparkSession

# Создаем SparkSession
spark = SparkSession.builder.appName("GoogleQueriesAnalysis").getOrCreate()
sc = spark.sparkContext  # Получаем SparkContext

# Загружаем файл в RDD
rdd = sc.textFile("google_queries.csv")

In [58]:
rdd.take(5)

['request,number_of_views,date',
 'карта,3,2025-02-11',
 'погода,5,2025-02-13',
 'котики,5,2025-02-11',
 'игры,2,2025-02-14']

In [59]:
# Пропускаем заголовок
header = rdd.first()
rdd = rdd.filter(lambda line: line != header)

In [60]:
rdd.take(5)

['карта,3,2025-02-11',
 'погода,5,2025-02-13',
 'котики,5,2025-02-11',
 'игры,2,2025-02-14',
 'погода,1,2025-02-17']

In [61]:
# Парсим данные (request, number_of_views, date)
rdd_parsed = rdd.map(lambda line: line.split(",")).map(lambda x: (x[0], int(x[1]), x[2]))

In [62]:
rdd_parsed.take(5)

[('карта', 3, '2025-02-11'),
 ('погода', 5, '2025-02-13'),
 ('котики', 5, '2025-02-11'),
 ('игры', 2, '2025-02-14'),
 ('погода', 1, '2025-02-17')]

In [64]:
# 1. Общее количество запросов
total_queries = rdd_parsed.count()
print(f"1. Общее количество запросов: {total_queries}")

1. Общее количество запросов: 255


In [65]:
# 2. Общее количество просмотров
total_views = rdd_parsed.map(lambda x: x[1]).sum()
print(f"2. Общее количество просмотров: {total_views}")

2. Общее количество просмотров: 743


In [66]:
# 3. Среднее количество просмотров на запрос
avg_views_per_query = total_views / total_queries if total_queries > 0 else 0
print(f"3. Среднее количество просмотров на запрос: {avg_views_per_query:.2f}")

3. Среднее количество просмотров на запрос: 2.91


In [67]:
# 4. Запрос с максимальным числом просмотров
max_request = rdd_parsed.max(key=lambda x: x[1])
print(f"4. Запрос с максимальным числом просмотров: {max_request}")

4. Запрос с максимальным числом просмотров: ('погода', 5, '2025-02-13')


In [68]:
# 5. Запрос с минимальным числом просмотров
min_request = rdd_parsed.min(key=lambda x: x[1])
print(f"5. Запрос с минимальным числом просмотров: {min_request}")

5. Запрос с минимальным числом просмотров: ('погода', 1, '2025-02-17')


In [69]:
# 6. Количество уникальных запросов
unique_requests = rdd_parsed.map(lambda x: x[0]).distinct().count()
print(f"6. Количество уникальных запросов: {unique_requests}")

6. Количество уникальных запросов: 14


In [70]:
# 7. Среднее количество просмотров на дату
views_per_date = rdd_parsed.map(lambda x: (x[2], x[1])).groupByKey().mapValues(lambda values: sum(values) / len(values))
avg_views_per_date = views_per_date.map(lambda x: x[1]).mean()
print(f"7. Среднее количество просмотров на дату: {avg_views_per_date:.2f}")

7. Среднее количество просмотров на дату: 2.92


In [71]:
# 8. Общее число дат, когда были поисковые запросы
unique_dates = rdd_parsed.map(lambda x: x[2]).distinct().count()
print(f"8. Общее число дат, когда были поисковые запросы: {unique_dates}")

8. Общее число дат, когда были поисковые запросы: 10


In [72]:
# 9. Запрос с наибольшим числом дней появления
query_day_count = rdd_parsed.map(lambda x: (x[0], x[2])).distinct().map(lambda x: (x[0], 1)) \
                            .reduceByKey(lambda a, b: a + b)
max_days_request = query_day_count.max(key=lambda x: x[1])
print(f"9. Запрос с наибольшим числом дней появления: {max_days_request}")

9. Запрос с наибольшим числом дней появления: ('фильмы', 10)


In [73]:
# 10. Самая популярная дата по просмотрам
popular_date = rdd_parsed.map(lambda x: (x[2], x[1])).reduceByKey(lambda a, b: a + b) \
                         .max(key=lambda x: x[1])
print(f"10. Самая популярная дата по просмотрам: {popular_date}")

10. Самая популярная дата по просмотрам: ('2025-02-11', 97)


In [74]:
# 11. Дата с наибольшим количеством уникальных запросов
unique_queries_per_date = rdd_parsed.map(lambda x: (x[2], x[0])).distinct() \
                                    .map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
max_unique_query_date = unique_queries_per_date.max(key=lambda x: x[1])
print(f"11. Дата с наибольшим количеством уникальных запросов: {max_unique_query_date}")

11. Дата с наибольшим количеством уникальных запросов: ('2025-02-11', 14)


In [75]:
# 12. Общее количество дней, когда было более 3 уникальных запросов
days_with_more_than_3_queries = unique_queries_per_date.filter(lambda x: x[1] > 3).count()
print(f"12. Общее количество дней с >3 уникальными запросами: {days_with_more_than_3_queries}")

12. Общее количество дней с >3 уникальными запросами: 10


In [76]:
# 13. Топ-3 самых популярных запроса по просмотрам
top_3_queries = rdd_parsed.map(lambda x: (x[0], x[1])).reduceByKey(lambda a, b: a + b) \
                          .top(3, key=lambda x: x[1])
print(f"13. Топ-3 самых популярных запроса по просмотрам: {top_3_queries}")

13. Топ-3 самых популярных запроса по просмотрам: [('игры', 68), ('авито', 65), ('вк', 61)]
