Данные, с которыми будет работать - открытые данные из комиссии по ценным бумагам и биржам США.
Описанние данных лежит вот здесь - https://www.sec.gov/dera/data/edgar-log-file-data-set.html Здесь порядка терабайта логов с сервера за месяц, разбитых по дням.


Важно заранее создать папку для данных

```
hdfs dfs -mkdir -p /seclog
```

Note: запускать необходимо с головной машины кластера
```
apt-get update && apt-get install parallel
```

```
printf %s\\n {01..30} | parallel -k --lb 'wget http://www.sec.gov/dera/data/Public-EDGAR-log-file-data/2017/Qtr2/log201706{}.zip && unzip -p log201706{}.zip log201706{}.csv | tail -n +2 | hdfs dfs -put - /seclog/day_{}.csv && rm log201706{}.zip'
```

**Для семинара** - чтобы не ждать слишком долго, можно сказать 3 первых дня для дальнейшей работы.

После того, как кластер spark создался, можно начать пользоваться более удобным интерфейсом и работать в Jupyter, который хостится уже на кластере.

Он немножко сломан по умолчанию, но на официальном форуме рассказали, что это легко починить:

Необходимо подключиться к головной машине через ssh, открыть файл `/usr/bin/anaconda/lib/python2.7/site-packages/nbformat/_version.py` и заменить 5 на 4.

После этого остается перезагрузить Jupyter через ambari.

Сессия спарка доступна в ноутбуке через переменную `spark`.
Для того, чтобы спарк "прогрелся" и начал выполнять запросы, создадим контекст, который нам впоследствии потребуется.

In [None]:
sp = spark.sparkContext

In [None]:
data = sp.textFile("wasb:///seclog/day_01.csv")

Количество строк в файле

In [None]:
data.count()

Предыдущая задача решенная на спарке:

In [None]:
def get_user_and_size(line):
    columns = line.split(',')
    user, size = columns[0], columns[8]

    return user, float(size)

result = data.map(get_user_and_size).reduceByKey(lambda x, y: x+y).values().mean()
print(result)

Можно заметить, что получилось гораздо приятнее и быстрее, чем класический MR.

Полный список операций, которые можно делать на спарке - здесь 
* https://spark.apache.org/docs/latest/rdd-programming-guide.html
* http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD

Попробуем тогда запустить этот же алгоритм на всех данных, что у вас есть

In [None]:
full_data = sp.textFile("wasb:///seclog/day_*.csv")
result = full_data.map(get_user_and_size).reduceByKey(lambda x, y: x+y).values().mean()
print(result)

Если вы скачали хотя бы 3 дня из всего датасета, то можно заметить, что спустя некоторое время все грохнулось со страшной ошибкой

```
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 7.0 failed 4 times, most recent failure: Lost task 8.3 in stage 7.0 (TID 76, wn4-spark2.woms2y4mgyiehbhy33ebogdnwh.bx.internal.cloudapp.net, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 377, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1861, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/hdp/current/spark2-client/python/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/usr/hdp/current/spark2-client/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 5, in get_user_and_size
ValueError: could not convert string to float: 'null'

	at org.apache.spark.api.python.BasePythonRunnerReaderIterator.handlePythonException(PythonRunner.scala:452)
    ...
```

Часто причину ошибки можно понять сразу из этого трейсбека, однако если это не получается сделать сразу - полезно пойти и посмотреть в консоль спарка.
Конкретно здесь можно увидеть, что это питон жалуется на то, что строку null нельзя превратить в число.

В такой ситуации мы можем предпринять самое простое - просто выкинуть такие элементы из датасета.

Вариант 1 - закешировать результаты и использовать кешированный RDD.

In [None]:
data = full_data.filter(lambda x: x.split(',')[8] != 'null').cache()

result = data.map(get_user_and_size).reduceByKey(lambda x, y: x+y).values().mean()
print(result)

data_count = data.count()
print(data_count)

Важно отметить, что для подсчета общего количества не потребовалось еще раз фильтровать - спарк переиспользовал закешированный RDDшник. 
Однако кеш держится только в рамказ сессии - при перезапуске сессии, спарку придется пересчитать его заного. Более того, в процессе расчетов спарк может посчитать, что этот кеш нужно сбросить (например не будет места для его хранения) и тогда следующий расчет поверш кешированного rdd опять же будет его пересчитывать. 

Чтобы персистентно сохранить результат, можно явно записать результат в HDFS.

In [None]:
! hdfs dfs -mkdir -p /seclog/cleaned

In [None]:
data.saveAsTextFile("wasb:///seclog/cleaned/data.bin")

In [None]:
! hdfs dfs -ls /seclog/cleaned/data.bin

In [None]:
data = sp.textFile("wasb:///seclog/cleaned/data.bin")

Считаем общее количество пользователей

In [None]:
total_users = data.map(lambda x: x.split(',')[0]).distinct().count()
print(total_users)

Считаем общее количество пользователей, которые сидят ночью

In [None]:
total_night_users = data.filter(lambda x: int(x.split(',')[2].split(':')[0]) < 6)\
                    .map(lambda x: x.split(',')[0]).distinct()\
                    .count()
print(total_night_users)

Считаем 10 самых больших документа, которые скачивали пользователи

In [None]:
def get_name_and_size(line):
    columns = line.split(',')
    name, size = columns[6], columns[8]
    return float(size), name

top_10_large_result = data.map(get_name_and_size).sortByKey(ascending=False).values()\
                      .zipWithIndex().filter(lambda x: x[1] < 10).keys().collect()
print(result)

Если результатом является не какая-то одна статистика, а большой массив данных, то его нужно сохранять в HDFS. В противном случае, у вас лопнет жупитер вместе с головной нодой.

In [56]:
! hdfs dfs -mkdir -p /seclogres

In [None]:
result = data.map(get_name_and_size).sortByKey(ascending=False).values()\
        .zipWithIndex().filter(lambda x: x[1] < 10).keys()

result.saveAsTextFile("wasb:///seclogres/top_10_requests.txt")

In [64]:
! hdfs dfs -cat /seclogres/top_10_requests.txt/part-00000

.txt
.txt
.txt
.txt
.txt
.txt
.txt
.txt
.txt
.txt


**Задача**

* Посчитать топ 10 самых посещаемых страниц (считаются только успешные запросы - код 200)
* Посчитать суммарное количество людей по часам (~гистограма)
* Посчитать среднее количество людей по часам (~гистограма)

In [None]:
# DO IT

Вычисления можно также проводить и в более "ручном" режиме (примерно как в MR)

Ниже - вычисление среднего объема, который выкачивает каждый пользователь (смотри задачу выше), решенная немного другим подходом

In [None]:
def mapper(line):
    columns = line.split(',')
    user, size = columns[0], columns[8]
    return user, float(size)

def sum_reducer(item):
    key, values = item
    result = 0
    for value in values:
        result += value
    return result, 1

def mean_reducer(item):
    result_key, values = item
    summ, count = 0, 0
    for current_summ, current_count in values:
        summ += current_summ
        count += current_count
    return summ / count


result = data.map(mapper).groupByKey().map(sum_reducer).groupBy(lambda x: 1).map(mean_reducer).collect()
print(result)

Также можно явно работать с партициями, которыми оперирует спарк

In [None]:
def mean_reducer_2(values):
    summ, count = 0, 0
    for s, c in values:
        summ, count = summ + s, count + c
    return summ / count

result = data.map(mapper).groupByKey().map(sum_reducer).repartition(1).glom().map(mean_reducer_2).collect()
print(result)

Казалось бы, что мы уже попали с распределенный рай и можем ворочать большие данные, как нам вздумается. 
Однако ситуацию можно еще раз качественно улучшить!

Спарк предоставляет возможность фиксировать формат датасетов в виде таблиц и умеет запускать SQL поверх этих таблиц. Таким образом, не требуется даже составлять спарковские операции в нужном порядке с ручным написанием функций - достаточно просто запустить один SQL запрос. Можно еще отдельно отметить, что работать это будет даже быстрее, так как у спарка есть возможноть оптимизировать запросы под капотом и не возиться с медленным питоном.

In [None]:
df = spark.read.csv('wasb:///seclog/cleaned/data.bin', header=False, inferSchema=True)
df.show()

In [None]:
columns_name = ['ip', 'date', 'time', 'zone', 'cik', 'accession', 'doc', 'code', 'size', 'norefer', 'noagent', 'find', 'crawler', 'browser']
for index, name in enumerate(columns_name):
    df = df.withColumnRenamed('_c{}'.format(index), name)
df.show()

От датафрейма, можно вернутся к RDD, который лежит под этим датафреймом

In [None]:
df.rdd.count()

Для того, чтобы начать делать к нему запросы необходимо зарегистрировать его как временную таблицу

In [None]:
df.registerTempTable('logs')

In [None]:
newdf = spark.sql("""
SELECT count(*) FROM logs
""").toPandas()
newdf.head()

Решим задачу про среднее количество данных еще раз, но уже на SQL.

In [None]:
result = spark.sql("""
SELECT avg(size) 
FROM (
    SELECT sum(size) as size
    FROM logs
    GROUP BY ip
) as t
""")
result.show()

**Задача**

* Найти 10 пользователей (ip-адрессов), которые выкачали больше всего данных за все время используя Spark SQL.
* Найти 10 пользователей (ip-адрессов), у которых самое большое время сессии на сайте (решать можно используя любые инструменты Spark). Сессия на сайте - серия запросов к серверу, сделанных одним пользователем в заданный промежуток времени. Сессия считается завершенной, если в течение 30 минут от пользователя не поступило к серверу ни одного нового запроса. Время сессии - время от запроса, который открыл сессию до последнего запроса в этой сессии.
* Скачать все логи за год в кластер. Найти всех пользователей, которые заходили каждый день месяца с указанием - что за месяц (если такие есть). Если таких нет - найти пользователей, которые заходили наибольшее число раз в наибольшее количество дней (с указанием в какие месяца)

In [None]:
# DO IT