# Принципы обработки больших данных
Дмитрий Бугайченко, Mail.ru

## Принцип 1


__Большие данные – это то, что вы не можете втянуть в память на одной машине__

### Применительно к нашей задаче

* Объем данных (текст в гзипе) меньше гигабайта. Для обработки в таком случае обычно достаточно 6-8 гигабайт, но можно упереться рогом и уложится и в 4.
* И это хорошо, так как питон это не инструмент больших данных 
* Но остальные принципы продолжают работать

## Принцип 2

__Верное решение это простое решение__

### Применительно к нашей задаче

* Первый прогноз можно построить по счетчику общих друзей
* Выданный граф не симетричный (по сути двудольный), чтобы удобно считать общих друзей нужно его развернуть
* Имея в памяти основной и развернуты графы задача решается просто

## Попытка номер 1

Рекомендуем в качестве друзей тех, у кого много общих друзей с нами. Для этого втягиваем в память словарь вида [я->мои друзья], и его же в "развернутом виде" [я->те у кого я в друзьях]

Для начала импортнем пакеты для работы с файлами

In [1]:
# Для чтения/записи csv файлов
import csv
# Для работы с архивами
import gzip
# Для работы с файловой системой
import os

from collections import defaultdict

Законстантим относителные пути

In [2]:
# Пути к данным
dataPath = "./"
graphPath = os.path.join(dataPath, "trainGraph")

Теперь будем читать

In [3]:
mineFriends = defaultdict(list)
friendsOfMine = defaultdict(list)

# Итерируемся по файлам в папке
for file in [f for f in os.listdir(graphPath) if f.startswith("part")]:
    csvinput = gzip.open(os.path.join(graphPath, file))
    csv_reader = csv.reader(csvinput, delimiter='\t')
    # А теперь по строкам в файле
    for line in csv_reader:
        user = int(line[0]) 
        # Разбираем идшки и маски друзей
        for friendship in line[1].replace("{(", "").replace(")}", "").split("),("):
            parts=friendship.split(",")
            friend = int(parts[0])
            mineFriends[user] += [friend]
            friendsOfMine[friend] += [user]

Все бы ничего, но два словарика в памяти *уже* заняли больше трех гигабайт и дальше ворочать их будет неудобно. Но поробуем...

In [4]:
friendsOfFriends = defaultdict(lambda: defaultdict(int))

for (user,friends) in mineFriends.iteritems():
    for friend in friends:
        for theirFriend in friendsOfMine[friend]:
            friendsOfFriends[user][theirFriend] += 1

KeyboardInterrupt: 

На 22-х гигах пришлось прибить... Теперь надо еще и вычистить из памяти шлак.

In [5]:
del friendsOfFriends
del mineFriends
del friendsOfMine

## Принцип 3

__Семь раз отмерь, один втяни.__

### Применительно к нашей задаче

* Стоит заранее оценить объемы памяти, которые потребуются
* Использовать подходящие структуры данных

### Ислледуем данные

Если с ходу не получилось, значит пришло время задуматся о том, как использовать память эффективнее. Для начала давайте попробуем прикинуть базовые показатели графа с которым работаем.

In [6]:
numUsersFrom = 0
numLinks = 0
maxUserIdFrom = 0
maxUserIdTo = 0
allIds = set()

for file in [f for f in os.listdir(graphPath) if f.startswith("part")]:
    csvinput = gzip.open(os.path.join(graphPath, file))
    csv_reader = csv.reader(csvinput, delimiter='\t')
    for line in csv_reader:
        user = int(line[0])
        numUsersFrom += 1
        maxUserIdFrom = max(user,maxUserIdFrom)
        allIds.add(user)
        for friendship in line[1].replace("{(", "").replace(")}", "").split("),("):
            parts=friendship.split(",")
            friend = int(parts[0])
            allIds.add(friend)
            numLinks += 1
            maxUserIdTo = max(maxUserIdTo,friend)
            
print("Num users from {0}, max {1}, num total {2}, max {3}, num links {4}".format(
        numUsersFrom,maxUserIdFrom,len(allIds),maxUserIdTo,numLinks))

Num users from 107474, max 16619036, num total 12417564, max 16619131, num links 36192484


## Выбираем структуры данных

### Словари

* Легко пользоватся.
* Но _ОЧЕНЬ_ много накладных расходов: указатель на запись, указатель на ключ/значение, объектная обертка для ключа и значения.
* Итого: на пару инт-инт _накладных_ расходов 48 байт (facepalm).
* Данные размазаны по памяти и фрагментированны - кеш процессора резко терят эффективность.

### Списки

* Если ключ инт - можно использовать вместо словаря.
* НО! Список в питоне это массив указателей, а значит накладные расходы на указатель и объектную обертку ключа. остаются - 20 байт накладных расходов на запист :(.
* Проблема с потерей эффективности кеша сохраняется.

### Массивы numpy

* На пару инт-инт нужно всего 4 байта.
* Кеш процессора работает.
* Но нужно чтобы ключи шли последовательно (к счастью, в наших даных это так почти так: максимальный ид 16619131 при общем числе 12417564).
* Для того чтобы заполнить массив _очень_ желательно знать его размер изначально.
* Если делать двумерный массив 100к на 16м - будет очень, очень, очень больно...

### Спарс матрицы scipy

* Предназначены для хранения двмерных матриц с очень большим количеством нулей.
* Эффективные варианты хранят в виде нескольких массивов numpy.
* COO - три массива одинаковой длинны моделируют списов троек [(i,j,v)]. Просто строить, легко интегрировать с пандой.
* CSR - один маленьки массив с индексом начала для каждого ряда (монотонно растет), и два массива равно длинны с ид колонки и значением: [{i : [(j,v)]}]. Более компактна в памяти, быстрые опреации.
* CSC - тоже самое что CSR, но индексированны колонки.

### Применительно к нашей задаче

* Инициализруем граф в виде COO матрицы, затем переводим в CSR.
* Развернутый граф получаем с помощью транспонирования.
* Общих друзей считаем умножая матрицы.

## Попытка номер 2

Считаем на базе матриц!

In [7]:
# Почистим шлак и оставим константы чтоб потом не пресчитывать
del allIds
maxUserId = 16619131
numLinks = 36192484

In [8]:
# Эффективные массивы простых типов
import numpy
# Работа с матрицами (подсчет общих друзей реализован как умножение матрицы графа самое на себя)
import scipy
from scipy.sparse import coo_matrix, csr_matrix

Собственно давайте попробуем вчитать матрицу используя coc формат и затем преобразуем в csr.

In [9]:
# В этих массивах мы будем собирать данные. Инициализируем их заранее нужным размером чтобы 
# небыло лишнего копирования
formUser = numpy.zeros( (numLinks), dtype=numpy.int32 ) 
toUser = numpy.zeros( (numLinks), dtype=numpy.int32 ) 
data = numpy.ones( (numLinks), dtype=numpy.int16 ) 

# Здесь храним позицию, на которую надо записать новую связь
current = 0

# Итерируемся по файлам в папке... Да, опять. God bless Cmd+C/Cmd+V
for file in [f for f in os.listdir(graphPath) if f.startswith("part")]:
    csvinput = gzip.open(os.path.join(graphPath, file))
    csv_reader = csv.reader(csvinput, delimiter='\t')
    # А теперь по строкам в файле
    for line in csv_reader:
        user = int(line[0]) 
        maxUserId = max(user,maxUserId)
        # Разбираем идшки и маски друзей
        for friendship in line[1].replace("{(", "").replace(")}", "").split("),("):
            parts=friendship.split(",")
            # Записываем связь в массивы и двигаем указатель
            formUser[current] = user
            friend = int(parts[0])
            toUser[current] = friend
            maxUserId = max(friend,maxUserId)
            current += 1
            
    # Не забываем закрыть файл
    csvinput.close()
    
# Собственно из массивов создаем нашу матрицу
fullMatrix = coo_matrix(
    (data, (formUser, toUser)),
    shape=(maxUserId + 1, maxUserId + 1)).tocsr()

В матрице три массива, соответственно легко померять сколько памяти она занимает

In [10]:
print fullMatrix.data.nbytes + fullMatrix.indptr.nbytes + fullMatrix.indices.nbytes

283631436


Теперь аккуратно вычистим ненужное

In [11]:
del formUser
del toUser
del data

Матрицу надо развернуть. К счастью все уже украдено до нас и её можно просто транспонировать :). Главное не забыть перевести обрантно в csr формат (после транспонирования получим csc)

In [12]:
reversedMatrix = scipy.transpose(fullMatrix).tocsr()

Есть развернутая матрица, проверим сколько занимает

In [13]:
print reversedMatrix.data.nbytes + reversedMatrix.indptr.nbytes + reversedMatrix.indices.nbytes

283631436


Итого - две матрицу в сумме 540Мб в памяти :). Как будем считать общих друзей? Умножением матриц!

In [14]:
commonFriends = fullMatrix.dot(reversedMatrix)

Долговато, интересно, сколько получилось в памяти?

In [15]:
print commonFriends.data.nbytes + commonFriends.indptr.nbytes + commonFriends.indices.nbytes

3387510156


Мда, 3 с лишним гигабайта, а в мониторе на процессе почти 5... На ваших 6Гб ноутах может и не пролезть...

Эврика! Нам же нужно предиктить не для всех! Давайте отфильтруем только нужных :)

## Принцип 4

__Фильтрую ненужное сразу.__

### Применительно к нашей задаче

* Считать общих друзей только для 1/11-й.

## Попытка номер 3

Фильтруем данные и пробуем построить прогноз.

In [16]:
# Но сначала память подчистим
del commonFriends 

# Забьем ненужное 0-ями
for i in range(maxUserId + 1):
    if i % 11 != 7:
        ptr = fullMatrix.indptr[i]
        ptr_next = fullMatrix.indptr[i+1]
        if ptr != ptr_next:
            fullMatrix.data[ptr:ptr_next].fill(0)
            
# Чтобы нули не мешались при умножении, вычистим их и подуменьшим матрицу
fullMatrix.eliminate_zeros()

Сколько получилось после фильтрации?

In [17]:
print fullMatrix.data.nbytes + fullMatrix.indptr.nbytes + fullMatrix.indices.nbytes

85726554


85Мб :) Другое дело. Теперь можно и умножать

In [18]:
commonFriends = fullMatrix.dot(reversedMatrix)

Заметно быстрее :) А по памяти что?

In [19]:
print commonFriends.data.nbytes + commonFriends.indptr.nbytes + commonFriends.indices.nbytes

350058222


350Мб, у вас тоже наверняка в память влезет :)

### Генерируем прогноз

Ну а теперь, давайте записывать прогноз.

In [20]:
output = gzip.open( os.path.join(dataPath,"prediction.gz"), "w")
writer = csv.writer(output, delimiter='\t')

for i in range(maxUserId + 1):
    # Два указателя дают нам границы в которых лежат данные для этого i в матрице
    ptr = commonFriends.indptr[i]
    ptr_next = commonFriends.indptr[i+1] 
    # Если они не равны, значит данные есть и можно экспортировать
    if ptr != ptr_next:
        # Достаем счетчики общих друзей и создаем порядок на них от большего к меньшему
        counts = commonFriends.data[ptr:ptr_next]
        order = numpy.argsort(-counts)
        
        # Не забываем что из прогноза надо убрать себя и своих известных друзей
        mineFriends = set(fullMatrix.indices[fullMatrix.indptr[i]:fullMatrix.indptr[i+1]])
        mineFriends.add(i)
        
        # Достаем идшки друзей, сортируем, фильтруем, обрезаем и пишем
        ids = commonFriends.indices[ptr:ptr_next]        
        writer.writerow([i] + filter(lambda x: x not in mineFriends, ids[order]))      

# Не забываем закрыть файл
output.close()        

Хм, как долго-то... И сколько получилось в итоге?

In [21]:
print os.path.getsize(os.path.join(dataPath,"prediction.gz"))

182059146


Мда, 182Мб, не влезет ни в какие лимиты. Снова применим _4-й принцип_ - надо рубить хвосты!

## Попытка номер 4

Уменьшаем объем результата

In [22]:
output = gzip.open( os.path.join(dataPath,"prediction_short.gz"), "w")
writer = csv.writer(output, delimiter='\t')

for i in range(maxUserId + 1):
    ptr = commonFriends.indptr[i]
    ptr_next = commonFriends.indptr[i+1] 
    if ptr != ptr_next:
        counts = commonFriends.data[ptr:ptr_next]
        order = numpy.argsort(-counts)
        mineFriends = set(fullMatrix.indices[fullMatrix.indptr[i]:fullMatrix.indptr[i+1]])
        mineFriends.add(i)
        ids = commonFriends.indices[ptr:ptr_next]   
        # И собственно фильтр. 42
        writer.writerow([i] + filter(lambda x: x not in mineFriends, ids[order])[:42])      

# Не забываем закрыть файл
output.close() 

Гораздо быстрее! И сколько же там?

In [23]:
print os.path.getsize(os.path.join(dataPath,"prediction_short.gz"))

1471944


Полтора мегабайта! Первый сабмит готов! Что делать дальше?

## Принцип 5

__Слышишь тревожный шум вентилятора - сохраняйся__

### Применительно к нашей задаче

* Сохраним полученные результаты (граф, развернутый граф, счетчики общих друзей) в бинарном формате, чтобы не тратить время на чтение текстов.
* Подготовим выборки для тренировочного и тестового множества, если хотим пробовать машинное обучений.

## Cохраняемся

### Сначала данные по валидационному множеству

In [24]:
numpy.savez(os.path.join(dataPath, "commonFriends_validation.npz"),
            data=commonFriends.data,
            indices=commonFriends.indices,
            indptr=commonFriends.indptr,
            shape=commonFriends.shape)

Сколько занял файл?

In [25]:
print os.path.getsize(os.path.join(dataPath,"commonFriends_validation.npz"))

350058960


Те же 350Мб. Как теперь его вчитать обратно?

In [26]:
loaded = numpy.load(os.path.join(dataPath,"commonFriends_validation.npz"))
commonFriends_validation = csr_matrix((loaded['data'], loaded['indices'], loaded['indptr']), shape=loaded['shape'])


Перввая фича для валидациионного множества есть.

### Теперь трэйн и тест

Но полную матрицу мы уже пофильтровали... Но у нас же есть транспонированная полная матрица, транспонируем её еще раз!

In [27]:
train = scipy.transpose(reversedMatrix).tocsr()
test = train.copy()

Теперь пофильтруем каждую

In [28]:
for i in range(maxUserId + 1):
    if i % 11 != 3:
        ptr = train.indptr[i]
        ptr_next = train.indptr[i+1]
        if ptr != ptr_next:
            train.data[ptr:ptr_next].fill(0)
    if i % 11 != 9:
        ptr = test.indptr[i]
        ptr_next = test.indptr[i+1]
        if ptr != ptr_next:
            test.data[ptr:ptr_next].fill(0)

train.eliminate_zeros()
test.eliminate_zeros()

Ок - теперь у нас есть разметка для тестового и тренировочного множества. Сохраним её

In [29]:
numpy.savez(os.path.join(dataPath, "train_markup.npz"),
            data=train.data,
            indices=train.indices,
            indptr=train.indptr,
            shape=train.shape)

numpy.savez(os.path.join(dataPath, "test_markup.npz"),
            data=test.data,
            indices=test.indices,
            indptr=test.indptr,
            shape=test.shape)

Да, известную часть графа по валидационному множеству тоже сохраним

In [30]:
numpy.savez(os.path.join(dataPath, "validation_markup.npz"),
            data=fullMatrix.data,
            indices=fullMatrix.indices,
            indptr=fullMatrix.indptr,
            shape=fullMatrix.shape)

### Сохраняем распаршенные графы

И обратную матрицу с полным графом тоже сохраним, чтобы не парсить снова тексты

In [31]:
numpy.savez(os.path.join(dataPath, "reverseGraph.npz"),
            data=reversedMatrix.data,
            indices=reversedMatrix.indices,
            indptr=reversedMatrix.indptr,
            shape=reversedMatrix.shape)

validation_markup = fullMatrix
fullMatrix = scipy.transpose(reversedMatrix).tocsr()

numpy.savez(os.path.join(dataPath, "fullGraph.npz"),
            data=fullMatrix.data,
            indices=fullMatrix.indices,
            indptr=fullMatrix.indptr,
            shape=fullMatrix.shape)

Теперь подсчитаем общих друзей по тесту и трейну и сохраним их:

In [32]:
trainCommon = train.dot(reversedMatrix)
testCommon = test.dot(reversedMatrix)

# Надоело копировать - заведи метод!
numpy.savez(os.path.join(dataPath, "commonFriends_train.npz"),
            data=trainCommon.data,
            indices=trainCommon.indices,
            indptr=trainCommon.indptr,
            shape=trainCommon.shape)

numpy.savez(os.path.join(dataPath, "commonFriends_test.npz"),
            data=testCommon.data,
            indices=testCommon.indices,
            indptr=testCommon.indptr,
            shape=testCommon.shape)

Сколько получилось в памяти?

In [33]:
print trainCommon.data.nbytes + trainCommon.indptr.nbytes + trainCommon.indices.nbytes
print testCommon.data.nbytes + testCommon.indptr.nbytes + testCommon.indices.nbytes

373562826
371175918


Хм, на 20Мб больше чем на валидационном множестве... Ну понятно, для них же связи не прятали так активно.. Надо иметь ввиду что наша тренировочная выборка получилась смещенной!

# Итоги

Итого, сейчас в памяти:
* Полная и развернутая матрица графа.
* Три частичных матрицы графа (3/11)
* Три блока счетчиков общих друзей

И монитор системы показывает почти 5Гб - на 6 гигах памяти уже есть риск уйти в своп :(

## Принцип 6

__Жадность фраера погубит.__

### Применительно к нашей задаче

* Держать в памяти только то, что нужно.
* А раз уж мы в этом блокноте намусорили, надо его закрыть и начать готовить данные для машинного обучения в отдельном.

# Главный принцип действительно больших данных

__Разделяй и властвуй.__

<img src="./images/MapReduceExplained.png" />