# Spark

## Недостатки MapReduce
1) **Минимум 2 итерации записи на HDD** за job'у => не работает в реальном времени. Хотелось бы хранить сплиты в RAM.

2) **Парадигма коротко живущих контейнеров.** Вспоминаем лекцию по YARN (приложения на сервис, приложения на задачу...). 
    * При запуске mapreduce-таски (например, маппера) YARN запустит контейнер
    * контейнер умрёт его когда таска завершится. 
    * При аварии контейнер перезапустится на другой машине. 
Это удобно но старт-стоп контейнеров даёт Overhead если MapReduce-задач много.

3) Нужно писать **очень много кода** (вспоминаем задачу на Join с позапрошлого семинара).

4) По сути 1 источник данных - диск (HDFS, локальная ФС клиента... но всё равно диск). Хотелось бы уметь читать / писать в другие источники (базы данных, облачные хранилища).

**Итог:** с MapReduce **можно** работать с BigData, но нельзя работать быстро.

* **200Х годы**: нам нужна отказоустойчивая система. RAM на серверах мало. Будем сохранять промежуточные данные **на диск**.
* **201Х годы**: 
    - Память становится дешевле и больше. 
    - Запросы от бизнеса на максимально быструю обработку (real-time).
 
Диск использовать нецелесообразно - возвращаемся к RAM.

## Составляющие Spark-экосистемы

Spark Написан на Scala, имеет Scala, Java, Python API.

1. Spark Core - разбор кода, распределённое выполнение, поддержка отказоустойчивости.
2. Аналог "стандартной библиотеки":
   * Spark SQL - высокоуровненвая обработка с помощью pandas-подобного синтаксиса или SQL.
   * Spark Streaming, Spark Structured Streaming - обработка (обновление результатов) данных в real-time
   * MLLib - инструментарий для ML. Помимо Spark использует сторонние библиотеки (например Breeze, написанный на Fortran).
3. Планировщики:
   * Standalone - легковесный Spark на 1 машине. Использует встроенный планировщик
   * Может использовать другие планировщики (например YARN, Mesos, Kubernetes).

Подробнее **[здесь](https://www.oreilly.com/library/view/learning-spark/9781449359034/ch01.html)**.

#### Источники данных
![Image](images/datasources.png)

В теории можем читать-писать в большое кол-во источников и приёмников данных.

На практике:
* Есть проблемы при взаимодействии с Hive (подробнее будет на лекции),
* И при подключении к Cassandra.
* Хорошо взаимодействует с Kafka.

## Архитектура Spark-приложения
![Image](images/cluster-overview.png)
(https://spark.apache.org/docs/latest/cluster-overview.html)

1. Driver program - управляющая программа.
2. SparkContext - это основной объект, с помощью которого мы взаимодействуем со Spark.
3. Cluster manager - планировщик (любой, см. выше).
4. Executor - по сути JVM на нодах.

В 1-м приближении работает также как и Hadoop. Единственное, контейнеры **долго живущие**. Контейнеры поднимаются 1 раз и умирают когда заканчивается SparkContext. Это позволяет хранить данные **в памяти JVM**. Быстрее RAM только кеши CPU, но это сложно реализуется (ассемблер).

## Возможности работы со Spark
##### Интерактивный shell

1. `spark2-shell` - запускает Scala-оболочку.
2. `pyspark2` - python оболочку.

В этих оболочках уже имеется готовый SparkContext (переменная `sc`).

##### Запуск файла на исполнение
`spark2-submit [params] <file>` - можем запускать как jar-файлы, так и коды на Python.

## Запуск Spark в Jupyter-ноутбуке:

```bash
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_PYTHON=/usr/bin/python3 PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip="*" --port=<PORT> --NotebookApp.token="<TOKEN>" --no-browser' pyspark2 --master=yarn --num-executors=<N>
```
 - **PORT** - порт, на котором откроется ноутбук.
 - **TOKEN** - токен, который нужно будет ввести для входа в Jupyter (любая строка). Не оставляйте токен пустым т.к. в этом случае к вашему ноутбуку смогут подключаться другие пользователи. `--NotebookApp.token="<TOKEN>"` можно не писать, тогда он сгенерится сам, а посмотреть его можно будет с помощью команды `jupyter notebook list`.
 - **N** - кол-во executors (YARN containers), выделенных на приложение. 
 
Подробнее в [Userguide](https://docs.google.com/document/d/1dmb8o3M2ZCsjPq3rJQqd-jNLQhiBXWbWZcTn9aYUAp8/edit).
 
#### Режимы запуска Spark
1. **local**. И драйвер, и worker стартуют на 1 машине. Можно указывать число ядер, выделенных на задачу. Например, `local[3]`. Указывать меньше 2 не рекомендуется т.к. всегда запускает 2 процесса: driver, worker.
2. **yarn**. Распределённый режим. Здесь можно дополнительно указать `--deploy-mode`. 
   * `cluster`. Драйвер на мастере либо на ноде. Рекомендуется для прода.
   * `client`. Драйвер на клиенте. Проще отлаживаться. Проще работать в интерактивном режиме (сейчас мы работаем в режиме `client`). Но грузит клиент. 
 
В аргументах PySpark можно указывать и многе другое, подробнее [здесь](http://spark.apache.org/docs/latest/configuration.html#application-properties).

In [3]:
sc

Можем изменить конфигурацию SparkContext, правда его придётся перезапустить.

In [4]:
import getpass
conf = sc.getConf().setAppName("the {}\'s spark app".format(getpass.getuser())).set("spark.python.profile","true")
sc.stop()
sc = SparkContext(conf=conf)

In [5]:
rdd = sc.textFile("/data/griboedov").map(lambda x: x.strip())

In [6]:
sc.textFile("/data/griboedov").map(lambda x: x.strip()).count()

2681

In [7]:
rdd.count()

2681

In [8]:
sc.show_profiles()

Profile of RDD<id=4>
         29590 function calls (29584 primitive calls) in 0.028 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     5365    0.020    0.000    0.020    0.000 {method 'read' of '_io.BufferedReader' objects}
     2684    0.002    0.000    0.024    0.000 serializers.py:677(loads)
     2684    0.001    0.000    0.027    0.000 rdd.py:1055(<genexpr>)
     2681    0.001    0.000    0.001    0.000 {method 'decode' of 'bytes' objects}
     2684    0.001    0.000    0.002    0.000 serializers.py:714(read_int)
     2684    0.001    0.000    0.025    0.000 serializers.py:686(load_stream)
     2681    0.001    0.000    0.001    0.000 util.py:97(wrapper)
     2681    0.000    0.000    0.001    0.000 <ipython-input-6-a3d81ddb872f>:1(<lambda>)
     2684    0.000    0.000    0.000    0.000 {built-in method unpack}
        6    0.000    0.000    0.027    0.005 {built-in method sum}
        3    0.000    0

## Resilient Distributed Dataset и ленивые вычисления

RDD - набор данных, распределённый по партициям (аналог сплитов в Hadoop). Основной примитив работы в Spark. 

##### Свойства
* Неизменяемый. Можем получить либо новый RDD, либо plain object
* Итерируемый. Можем делать обход RDD
* Восстанавливаемый. Каждая партиция помнит как она была получена (часть графа вычислений) и при утере может быть восстановлена.

Создать RDD можно:
* прочитав данные из источника
* получить новый RDD из существующего.

In [9]:
! hdfs dfs -cat /data/griboedov/gore_ot_uma-1.txt | head

=====	�������������� 1
����������������	��������������!.. ����! ������ ���������� �������� ������������!
����������������	���������� ������������������ ���������� - ����������,
����������������	"�������� ����������". - ���������� �������� ���� ��������,
����������������	���� ������, ���������������� ���� ������������������ ���� ����������.
����������������	������������ ������ ������������ ������ ��������������������,
����������������	���� ��������!.. �������������� ����...
����������������	��������������,
����������������	����! ���������� ����������������, ��������.
����������������	���������� ������������ �������� ���� ��������;
cat: Unable to write to output stream.


In [10]:
rdd = sc.textFile("/data/griboedov")

In [11]:
rdd

/data/griboedov MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0

Идём в [SparkHistory UI](http://localhost:18089/) (для этого нужно пробросить порт 18089). Далее переходим в incompleted applications (приложение не завершилось т.к. SparkContext жив) и видим, что в списке Job пусто.

Несмотря на это, сам RDD есть:

In [12]:
rdd.map(lambda x: x.strip())

PythonRDD[8] at RDD at PythonRDD.scala:53

Посчитаем кол-во объектов в RDD

In [14]:
rdd.count()

2681

Снова проверяем UI и... job'а появилась!

В Spark'е сть 2 типа операций над RDD:
* [трансформации](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations). Преобразуют RDD в новое RDD.
* [действия](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions). Преобразуют RDD в обычный объект.

Трансформации выполяются **лениво**. При вызове трансформации достраивается граф вычислений и больше ничего не происходит. 

Реальное выполнение графа происходит при вызове Action.

## WordCount на Spark

Мы уже прочитали данные, теперь попробуем посчитать на них WordCount.

In [19]:
# строим граф вычислений
rdd = sc.textFile("/data/griboedov")
rdd = rdd.map(lambda x: x.strip().lower()) # приводим к нижнему регистру
rdd = rdd.flatMap(lambda x: x.split(" ")) # выделяем слова
rdd = rdd.map(lambda x: (x, 1))  # собираем пары (word, 1)
rdd = rdd.reduceByKey(lambda a, b: a + b) # суммируем "1" с одинаковыми ключами

In [20]:
rdd = rdd.sortBy(lambda a: -a[1]) # сортируем по кол-ву встречаемости

In [24]:
rdd.take(10) # Action!

[('', 432),
 ('в', 344),
 ('-', 296),
 ('и', 295),
 ('не', 287),
 ('я', 139),
 ('с', 129),
 ('на', 126),
 ('что', 104),
 ('*', 94)]

### Типы трансформаций в Spark

![Image](images/stages.png)
(https://www.slideshare.net/LisaHua/spark-overview-37479609)

* Часть трансформаций (map, flatmap, ...) обрабатывает партиции независимо. Такие трансформации называются *narrow*. 
* reduce, sortBy аггрегируют данные и используют передачу по сети. Они называются *wide*. 
   * Wide-трансформации могут менять кол-во партиций.
   * По wide-трансформациям происходит деление job'ы на Stages.

Stage тоже делится на task'и. 1 task выполняется для одной партиции.

**Итак: Task << Stage << Job << Application.**

В Spark есть возможность вывести план job'ы.

In [25]:
print(rdd.toDebugString().decode('utf-8'))

(3) PythonRDD[58] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[53] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[52] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(3) PairwiseRDD[51] at sortBy at <ipython-input-20-d4e0638907ea>:1 []
    |  PythonRDD[50] at sortBy at <ipython-input-20-d4e0638907ea>:1 []
    |  MapPartitionsRDD[47] at mapPartitions at PythonRDD.scala:133 []
    |  ShuffledRDD[46] at partitionBy at NativeMethodAccessorImpl.java:0 []
    +-(3) PairwiseRDD[45] at reduceByKey at <ipython-input-19-83ffad06e866>:6 []
       |  PythonRDD[44] at reduceByKey at <ipython-input-19-83ffad06e866>:6 []
       |  /data/griboedov MapPartitionsRDD[43] at textFile at NativeMethodAccessorImpl.java:0 []
       |  /data/griboedov HadoopRDD[42] at textFile at NativeMethodAccessorImpl.java:0 []


Видим всего 3 трансформации. Где все остальные?

Spark написан на Scala, которая под капотом использует JVM. Чтоб делать вычисления в Python, нужно вытаскивать данные из JVM. А потом возвращаться обратно. Получаем OverHead на сериализацию-десериализацию. Чтоб overhead'ов было меньше, схлопываем узкие трансформации в одну.

**Весь пример целиком:** `/home/velkerr/seminars/pd2018/14-15-spark/griboedov.py`

Запустим с помощью `spark2-submit griboedov.py`

### Задача 1.

> При подсчёте отсеять пунктуацию и слова короче 3 символов. 
При фильтрации можно использовать регулярку: `re.sub(u"\\W+", " ", x.strip(), flags=re.U)`.

### Задача 2.

> Считать только имена собственные. Именами собственными в данном случае будем считать такие слова, у которых 1-я буква заглавная, остальные - прописные.

**Решение**: ` /home/velkerr/seminars/pd2018/14-15-spark/griboedov_adv.py`

## Аккумуляторы

Аналоги счётчиков в Hadoop. 
* Используется для легковесной аггрегации (без `reduceByKey` и дополнительных shuffle'ов)
* Если аккумулятор используется в трансформациях, то нельзя гарантировать консистентность (мы можем с помощью action'a вызвать DAG несколько раз). Можно использовать в `foreach()`.

**Объявление:** `cnt = sc.accumulator(start_val)`

**Использование:** 
   * Inline: `foreach(lambda x: cnt.add(x))`
   * Или же, с помощью своей функции:
    ```python
    def count_with_conditions(x):
        global cnt
        if ...:
            cnt += 1

    rdd.foreach(lambda x: count_with_conditions(x))
    ```

**Получение результата:** `cnt.value`

Подробнее в [документации](http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators).

### Задача 3.

> Переделайте задача 2 так, чтоб кол-во имён собственных вычислялось с помощью аккумулятора.

**Решение**: ` /home/velkerr/seminars/pd2018/14-15-spark/griboedov_accum.py`

## Broadcast-переменные

Аналог DistributedCache в Hadoop. Обычно используется когда мы хотим в спарке сделать Map-side join (т.е. имеется 2 датасета: 1 маленький, который и добавляем в broadcast, другой большой).

In [None]:
br_cast = sc.broadcast(["hadoop", "hive", "spark", 'zookeeper', 'kafka']) 

In [None]:
br_cast

In [None]:
br_cast.value[3]

## Кеширование

При перезапуске Action, пересчитывается весь граф вычислений. Это логично т.к. в трансформациях ничего не вычисляется. Полезно это тем, что если за время работы задачи данные обновились (дополнились), нам достаточно просто перевызвать Action.

Но если данные не меняются (например, при отладке), такой пересчёт даёт Overhead. Можно **закешировать** часть pipeline. Тогда при след. вызове Action, RDD считается с кеша и пересчёт начнётся с того места, где было кеширование. В History UI все Stage перед этим будут помечены "Skipped".

In [56]:
rdd = sc.textFile("/data/griboedov")
rdd = rdd.map(lambda x: x.strip().lower())
rdd = rdd.flatMap(lambda x: x.split(" "))
rdd = rdd.map(lambda x: (x, 1)).cache() # тут всё хорошо работает, кешируем
rdd = rdd.reduceByKey(lambda a, b: a + b) # а тут хотим отладить, поэтому будут перезапуски
rdd = rdd.sortBy(lambda a: -a[1])

In [57]:
rdd.take(5)

[('', 432), ('в', 344), ('-', 296), ('и', 295), ('не', 287)]

In [62]:
rdd.unpersist()

PythonRDD[556] at RDD at PythonRDD.scala:53

Кешировать можно с помощью двух операций:
* `cache()`
* `persist(storage_level)`

В `persist()` можно указать [StorageLevel](https://spark.apache.org/docs/2.1.2/api/python/_modules/pyspark/storagelevel.html), т.е. на какой носитель кешируем. Можем закешировать в диск, в память, на диск и / или память на несколько нод... или дать возможность Spark'у решить самому (на основе объёма кеша).

`cache()` - это простой вариант `persist()`, когда кешируем только в RAM.

In [5]:
rdd = sc.textFile("/data/griboedov")
rdd = rdd.map(lambda x: x.strip().lower())
rdd = rdd.flatMap(lambda x: x.split(" "))
rdd = rdd.map(lambda x: (x, 1)).cache() # тут всё хорошо работает, кешируем
rdd = rdd.reduceByKey(lambda a, b: a + b) # а тут хотим отладить, поэтому будут перезапуски

In [23]:
rdd = rdd.sortBy(lambda a: -a[1])

In [25]:
rdd.cache()

PythonRDD[97] at RDD at PythonRDD.scala:53

In [26]:
rdd.take(5)

[('', 432), ('в', 344), ('-', 296), ('и', 295), ('не', 287)]

# Практические задания

В hdfs в папке `/data/access_logs/big_log` лежит лог в формате

* IP-адрес пользователя (`195.206.123.39`),
* Далее идут два неиспользуемых в нашем случае поля (`-` и `-`),
* Время запроса (`[24/Sep/2015:12:32:53 +0400]`),
* Строка запроса (`"GET /id18222 HTTP/1.1"`),
* HTTP-код ответа (`200`),
* Размер ответа (`10703`),
* Реферер (источник перехода; `"http://bing.com/"`),
* Идентификационная строка браузера (User-Agent; `"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"`).

Созданы несколько семплов данных разного размера:
```
3.4 G    10.2 G   /data/access_logs/big_log
17.6 M   52.7 M   /data/access_logs/big_log_10000
175.4 M  526.2 M  /data/access_logs/big_log_100000
```

#### Пример парсинга логов

In [63]:
DATASET = "/data/access_logs/big_log_10000"

In [64]:
import re
import sys
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from datetime import datetime as dt

log_format = re.compile( 
    r"(?P<host>[\d\.]+)\s" 
    r"(?P<identity>\S*)\s" 
    r"(?P<user>\S*)\s"
    r"\[(?P<time>.*?)\]\s"
    r'"(?P<request>.*?)"\s'
    r"(?P<status>\d+)\s"
    r"(?P<bytes>\S*)\s"
    r'"(?P<referer>.*?)"\s'
    r'"(?P<user_agent>.*?)"\s*'
)

def parseLine(line):
    match = log_format.match(line)
    if not match:
        return ("", "", "", "", "", "", "" ,"", "")

    request = match.group('request').split()
    return (match.group('host'), match.group('time').split()[0], \
       request[0], request[1], match.group('status'), match.group('bytes'), \
        match.group('referer'), match.group('user_agent'),
        dt.strptime(match.group('time').split()[0], '%d/%b/%Y:%H:%M:%S').hour)


lines = sc.textFile(DATASET)
parsed_logs = lines.map(parseLine).cache()

In [66]:
parsed_logs.take(10)

[('109.105.128.100',
  '10/Dec/2015:00:00:00',
  'GET',
  '/id45574',
  '200',
  '27513',
  '-',
  'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.99 Safari/537.36',
  0),
 ('217.146.45.122',
  '10/Dec/2015:00:00:00',
  'GET',
  '/id40851',
  '200',
  '11914',
  '-',
  'Mozilla/5.0 (X11; Linux i686; rv:10.0.4) Gecko/20120421 Firefox/10.0.4',
  0),
 ('17.72.78.198',
  '10/Dec/2015:00:00:00',
  'GET',
  '/id58931',
  '200',
  '32457',
  '-',
  'Mozilla/5.0; TOB 6.11 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko',
  0),
 ('46.245.183.68',
  '10/Dec/2015:00:00:00',
  'GET',
  '/id19513',
  '200',
  '26190',
  '-',
  'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.99 Safari/537.36',
  0),
 ('91.197.164.156',
  '10/Dec/2015:00:00:01',
  'GET',
  '/id39028',
  '200',
  '14115',
  '-',
  'Mozilla/5.0 (X11; Fedora; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari

#### 2й вариант парсинга - с помощью namedtuple

```python
LogItem = namedtuple("LogItem", 
                     ["host", "time", "method", "path", "status", "length", "referer", "user_agent", "hour"])

def parseLine(line):
    match = log_format.match(line)
    if not match:
        return LogItem("", "", "", "", "", "", "" ,"", "")

    request = match.group('request').split()
    return LogItem(
        host=match.group('host'),
        time=match.group('time').split()[0],
        method=request[0],
        path=request[1],
        status=match.group('status'),
        length=match.group('bytes'),
        referer=match.group('referer'),
        user_agent=match.group('user_agent'),
        hour=dt.strptime(match.group('time').split()[0],'%d/%b/%Y:%H:%M:%S').hour
    )
```

Распарсили, получили RDD, закешировали.

In [18]:
parsed_logs.take(1)

[('109.105.128.100',
  '10/Dec/2015:00:00:00',
  'GET',
  '/id45574',
  '200',
  '27513',
  '-',
  'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.99 Safari/537.36',
  0)]

In [19]:
import re
import sys
from pyspark.sql import SparkSession, Row

log_format = re.compile(
    r"(?P<host>[\d\.]+)\s"
    r"(?P<identity>\S*)\s"
    r"(?P<user>\S*)\s"
    r"\[(?P<time>.*?)\]\s"
    r'"(?P<request>.*?)"\s'
    r"(?P<status>\d+)\s"
    r"(?P<bytes>\S*)\s"
    r'"(?P<referer>.*?)"\s'
    r'"(?P<user_agent>.*?)"\s*'
)


def parseLine(line):
    match = log_format.match(line)
    if not match:
        return ("", "", "", "", "", "", "", "", "")

    request = match.group('request').split()
    return (match.group('host'), match.group('time').split()[0],
        request[0], request[1], match.group('status'), int(match.group('bytes')),
        match.group('referer'), match.group('user_agent'),
        dt.strptime(match.group('time').split()[0], '%d/%b/%Y:%H:%M:%S').hour)


if __name__ == "__main__":
    spark_session = SparkSession.builder.master("yarn").appName("501 df").config("spark.ui.port", "18089").getOrCreate()
    lines = spark_session.sparkContext.textFile("%s" % sys.argv[1])
    parts = lines.map(parseLine)

### Задача 4.
> Напишите программу, выводящую на экран TOP5 ip адресов, в которых содержится хотя бы одна цифра 4, с наибольшим количеством посещений.
Каждая строка результата должна содержать IP адрес и число посещений, разделенные табуляцией, строки должны быть упорядочены по числу посещений по убыванию, например:
```
195.206.123.39<TAB>40
196.206.123.40<TAB>39
191.206.123.41<TAB>38
175.206.123.42<TAB>37
195.236.123.43<TAB>36
```

In [20]:
parsed_logs

PythonRDD[82] at RDD at PythonRDD.scala:53

### Задача 5.
>  Напишите программу, выводящую на экран суммарное распределение количества посетителей по часам (для каждого часа в сутках вывести количество посетителей, пришедших в этот час). Id посетителя = ip + user_agent.
Результат должен содержать час в сутках и число посетителей, разделенные табом и упорядоченные по часам. Например:
```
0<tab>10
1<tab>10
2<tab>10
…..
21<tab>30
22<tab>20
23<tab>10
```

In [21]:
parsed_logs

PythonRDD[82] at RDD at PythonRDD.scala:53

# Dataframe

In [36]:
df = spark.read.format('csv').option('sep', ' ').load('/data/access_logs/big_log_10000')


In [37]:
df.limit(1).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9
0,109.106.133.8,-,-,[12/Dec/2015:01:31:46,+0400],GET /id53821 HTTP/1.1,200,21546,-,Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4)...


In [38]:
df = spark.read.csv('/data/twitter/twitter_sample_small.txt', sep='\t')

In [39]:
df.limit(1).toPandas()

Unnamed: 0,_c0,_c1
0,12,2241


In [42]:
df.select(
    F.col("_c0").alias("destination"),
    F.col("_c1").alias("source")
).show(1)

+-----------+------+
|destination|source|
+-----------+------+
|         12|  2241|
+-----------+------+
only showing top 1 row



In [43]:
df1 = df.select(
    F.col("_c0").alias("destination"),
    F.col("_c1").alias("source")
)

In [46]:
df1.limit(10).toPandas()

Unnamed: 0,destination,source
0,12,2241
1,12,13349
2,12,41873
3,12,82473
4,12,414853
5,12,755452
6,12,758983
7,12,793023
8,12,794748
9,12,806280


In [47]:
df1.groupBy('source').count().show()

+--------+-----+
|  source|count|
+--------+-----+
|19593065|    1|
|20651832|    1|
|21215059|    1|
|21240863|    1|
|21705463|    1|
|22197844|    1|
|22377590|    1|
|22935113|    1|
|23993819|    1|
|24268091|    1|
|24299946|    1|
|24326074|    1|
|25053014|    1|
|26715269|    1|
|27121291|    1|
|27385940|    1|
|27628353|    1|
|28817312|    1|
|29847995|    1|
|30290044|    1|
+--------+-----+
only showing top 20 rows



In [50]:
df1.groupBy('source').count().orderBy(F.col('count').desc()).show()

+--------+-----+
|  source|count|
+--------+-----+
|      53|    7|
| 9598762|    4|
|15458708|    4|
|13342022|    4|
|      20|    4|
|19489341|    4|
|      12|    4|
|14206015|    3|
|26468557|    3|
|14287820|    3|
|     107|    3|
|52041136|    3|
|17184081|    3|
|19788155|    3|
|21494147|    3|
|18662758|    3|
|18234522|    3|
|16227030|    3|
|47516482|    3|
|      23|    3|
+--------+-----+
only showing top 20 rows



In [55]:
from pyspark.sql.functions import sum,avg,max,count

df1.groupBy("source").agg(count("*").alias("count"), avg("source").alias("mean")).show(10)

+--------+-----+-----------+
|  source|count|       mean|
+--------+-----+-----------+
|19593065|    1|1.9593065E7|
|20651832|    1|2.0651832E7|
|21215059|    1|2.1215059E7|
|21240863|    1|2.1240863E7|
|21705463|    1|2.1705463E7|
|22197844|    1|2.2197844E7|
|22377590|    1| 2.237759E7|
|22935113|    1|2.2935113E7|
|23993819|    1|2.3993819E7|
|24268091|    1|2.4268091E7|
+--------+-----+-----------+
only showing top 10 rows



#### Пример

In [31]:
import pyspark.sql.functions as F

TweeterDF = spark.read.format("csv")\
          .option("sep", "\t")\
          .load("/data/twitter/twitter_sample_small.txt")

PeakDF = TweeterDF\
    .select(
        F.col("_c0").alias("destination"),
        F.col("_c1").alias("source"))\
    .groupBy("source")\
    .agg(F.collect_list(F.col("destination")).alias("all_peak"))\
    .persist()

NewStageDF = PeakDF\
    .filter(F.col("source") == F.lit("12"))\
    .select(
        F.col("source").alias("path"),
        F.col("all_peak").alias("last_peak"))

while not NewStageDF.select(F.expr("array_contains(last_peak, '34')")).first()[0]:
    NewStageDF = NewStageDF\
        .alias("init")\
        .join(
            PeakDF.alias("df"),
            F.expr("array_contains(init.last_peak, df.source)"))\
        .distinct()\
        .select(
            F.concat_ws(",", F.col("init.path"), F.col("df.source")).alias("path"),
            F.col("df.all_peak").alias("last_peak"))

path = NewStageDF\
    .filter(F.expr("array_contains(last_peak, '34')"))\
    .select(F.concat_ws(",", F.col("path"), F.lit("34")).alias("path"))\
    .collect()[0][0]

PeakDF.unpersist()

print(path)


12,422,53,52,107,20,23,274,34
