# One Billion Row Challenge

Изначально [челендж](https://github.com/gunnarmorling/1brc) предназначен для соревнования программистов на Java. Мне стало интересно попробовать сделать то же самое на _интерактивном_ Python. 
Основная сложность в этом челендже - работа с памятью. Если создаются большие объекты - их необходимо свопировать на диск и это занимает время. При этом не факт, что ядро интерпретатора не разрушится. 

In [1]:
import pandas as pd, random, time, polars as pl
from os import cpu_count
from joblib import Parallel, delayed 
from tqdm import tqdm

In [29]:
#!pip install duckdb
import duckdb

## Обработка исходных данных в текстовом формате


### Генерация исходных данных 

Исходные данные для работы генерируются случайным образом и записываются на диск.  Я решил записывать данные на внешний SSD, подключенный через USB порт. Используется список всех погодных станций в мире, из которого отобрано 10к элементов. 

In [2]:
stations_path = './data/weather_stations.csv'
stations = pd.read_csv(stations_path, sep=';', skiprows=2, header=None, names=['City','Temp'])
print(stations.shape)
stations.head()

(44691, 2)


Unnamed: 0,City,Temp
0,Tokyo,35.6897
1,Jakarta,-6.175
2,Delhi,28.61
3,Guangzhou,23.13
4,Mumbai,19.0761


In [3]:
text_path = "/Volumes/Samsung_T5/Data/One Billion Row Challenge/data/measur.txt"
parq_path = "/Volumes/Samsung_T5/Data/One Billion Row Challenge/data/measur.parquet"

In [4]:

# Максимально 10 тыс станций для измерения температуры
weather_stations_10k = random.choices(list(stations.City), k=10000)
coldest_temp = -99.9
hottest_temp = 99.9
batch_size = 10000 # instead of writing line by line to file, process a batch of stations and put it to disk
num_rows_to_create = 1000_000_000
num_chunks = num_rows_to_create // batch_size

Генерация текстового файла. Использую внешний SSD для размещения файла. Алгоритм практически заимствован из соревнования. Маленькое исключение - формирование батча в виде списка и слияние перед записью оказалось чуть быстрее. 

In [5]:
was = time.time()
with open(text_path, 'w') as file:
    for n in tqdm(range(num_chunks)):
        selection = random.choices(weather_stations_10k, k=batch_size)
        batch = [f"{city};{random.uniform(coldest_temp, hottest_temp):.1f}\n" for city in selection]
        file.write(''.join(batch))
del selection, batch
print(f"It took {time.time()-was} sec to write file")

  0%|          | 0/100000 [00:00<?, ?it/s]

100%|██████████| 100000/100000 [15:40<00:00, 106.29it/s]


It takes 941.1888129711151 sec to write file


In [4]:
was = time.time()
cnt = 0
with open(text_path, "r") as f:
    while len(f.readline())>0 : cnt += 1
print(f"It took {time.time()-was} sec to read {cnt} lines.")    

It takes 391.10774302482605 sec to read 1000000000 lines.


### Решение задачи с помощью pandas

На моей машинке не очень много оперативной памяти и не удается решить задачу напрямую только методами pandas. Используется вариант кусочной обработки исходного файла:
- в память считывается кусок исходного файла в небольшой датафрейм,
- рассчитываются промежуточные статистики, и результат выгружается в список,
- список сливается в новый датафрейм,
- для итогового датафрейма определяются окончательные статистики. 

Для ускорения работы используем распараллеливание с помощью joblib. 

In [7]:
chunk_size  = 2_000_000


In [16]:
def process_chunk(chunk):
    res = chunk.groupby("City").aggregate({'Temp':['min','sum', 'count','max']}).reset_index()
    res.columns = ['_'.join(c) if len(c[1])>0 else c[0] for c in res.columns]
    return res

In [None]:
num_cpu = cpu_count()
print(f"Using {num_cpu} cpu cores")
multi_cpu = Parallel(n_jobs=num_cpu)

was = time.time()

interm = multi_cpu(delayed(process_chunk)(chunk) for chunk in pd.read_csv(text_path, sep=';', header=None, names=['City','Temp'],
                  chunksize=chunk_size)) 

r2 = (pd.concat(interm).groupby("City")
      .aggregate({'Temp_min':'min','Temp_sum':'sum', 'Temp_count':'count', 'Temp_max':'max'}).reset_index()
      .assign(Temp_mean= lambda x: x.Temp_sum / x.Temp_count)
      .drop(columns=['Temp_sum', 'Temp_count']))
del multi_cpu, interm
print(f"It took {time.time()-was} sec")
r2.head()


Удивительным образом, получилось довольно много - 420 секунд на чтение и обработку входного файла.  Похоже, что основным ограничителем здесь выступает процесс чтения куска данных методом `read_csv`.

### Решение задачи с помощью polars

Polars - актуальная библиотека для обработки больших массивов данных, написанная целиком на Rust. Ее отличительной особенностью является ленивая обработка входного файла, которая осуществляется параллельно, и только по специальной команде. 
     
Обработка файла большого объема требует использования двух нюансов: 
- необходимо в явном виде указать размер куска данных, обрабатываемого единовременно, так как встроенная формула слишком оптимистична;
- необходимо в явном виде указать кусочную обработку запросов, так как по умолчанию polars стремится прочесть все данные в память и делать запрос один раз, что приводит к проблемам памяти. 

В разработке решения мне помог https://github.com/lvgalvao/One-Billion-Row-Challenge-Python 

In [8]:
#Фиксирование размера одновременно обрабатываемого куска данных
pl.Config.set_streaming_chunk_size(chunk_size)  

was = time.time()
res = (pl.scan_csv(text_path, separator=';', has_header=False, new_columns=["City", "Temp"],  
                  schema={"City": pl.String, "Temp": pl.Float64}, low_memory=True)
                  .group_by('City')
                  .agg(pl.col('Temp').min().name.suffix('_min'), pl.col('Temp').mean().name.suffix('_mean'), pl.col('Temp').max().name.suffix('_max'))
                  .sort('City')
                  .collect(streaming=True))  #кусочная обработка!
print(f"It took {time.time()-was} sec")
res.head(5)


It takes 81.5114197731018 sec


City,Temp_min,Temp_mean,Temp_max
str,f64,f64,f64
"""A Yun Pa""",-99.9,-0.10654,99.9
"""Aabenraa""",-99.9,-0.32684,99.9
"""Aarau""",-99.9,-0.104795,99.9
"""Abadla""",-99.9,0.004239,99.9
"""Abadou""",-99.9,0.033001,99.9


Чтение и обработка файла c помощью polars заняли намного меньше времени - 82 сек.  Выигрыш примерно в 6 раз.
Но polars очень чувствителен к формату данных. У меня была версия кода, генерировавшая файл с пробелом перед окончанием строки. По непонятной причине polars не читал такой файл кусками и была проблема с работоспособностью кода. 


### Обработка данных с помощью DuckDB

DuckDB - это еще одна популярная библиотека для выполнения аналитических расчетов. Она позволяет обращаться к различным источникам данных для проведения операций аналитической свертки и группировки. В рамках этого анализа попробую использовать это решение сначала на текстовых данных. В разработке решения мне помог https://github.com/lvgalvao/One-Billion-Row-Challenge-Python 

In [31]:
query = """
        SELECT city,
            MIN(temperature) AS min_temperature,
            CAST(AVG(temperature) AS DECIMAL(3,1)) AS mean_temperature,
            MAX(temperature) AS max_temperature
        FROM read_csv("/Volumes/Samsung_T5/Data/One Billion Row Challenge/data/measur.txt", 
                    AUTO_DETECT=FALSE, sep=';', columns={'city':VARCHAR, 'temperature': 'DECIMAL(3,1)'})
        GROUP BY city
        ORDER BY city
"""
was = time.time()
df = duckdb.sql(query).df()
print(f"It took {time.time()-was} sec")
df.head()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

It took 48.0512969493866 sec


Unnamed: 0,city,min_temperature,mean_temperature,max_temperature
0,A Yun Pa,-99.9,-0.1,99.9
1,Aabenraa,-99.9,-0.3,99.9
2,Aarau,-99.9,-0.1,99.9
3,Abadla,-99.9,0.0,99.9
4,Abadou,-99.9,0.0,99.9


Обработка данных с помощью DuckDB заняла в случае текстового файла 48 секунд.  Это рекорд для текстовых файлов.


Обычный челендж на этом заканчивается.  Но у меня возникла идея попробовать все то же самое с помощью файлов parquet

## Обработка файлов parquet

### Генерация файла parquet c помощью fastparquet

Для генерации файла parquet можно использовать две библиотеки - fastparquet и pyarrow. Я сравнил оба варианта. Первоначальный вариант был на основе fastparquet. В результате нескольких итераций я решил генерировать куски (датафреймы) достаточно большого размера и дописывать их в один и тот же файл. Для ускорения генерации температур использован numpy. 


In [6]:
import fastparquet as fp
import numpy as np

In [10]:
was = time.time()
double_range = hottest_temp - coldest_temp
batch_size = 10_000_000
num_chunks = num_rows_to_create // batch_size

for i in tqdm(range(num_chunks)): #tqdm(range(10)): 
    selection = random.choices(weather_stations_10k, k=batch_size)
    temp = np.random.rand(batch_size) * double_range
    temp = temp + coldest_temp
    #for _ in range(batch_size):
    #    temp.append(random.uniform(coldest_temp, hottest_temp))
    df = pd.DataFrame({"City":selection, "Temp":np.around(temp, decimals=1)})
    if i < 1: fp.write(parq_path, df, append=False)
    else: fp.write(parq_path, df, append=True)
print(f"It took {time.time()-was} sec")
del selection, temp
pf = fp.ParquetFile(parq_path)
print(f"Total {pf.count()} lines and {len(pf.row_groups)} row groups in file")

100%|██████████| 100/100 [08:40<00:00,  5.21s/it]


Генерация файла данных с помомощью fast_parquet заняла в два раза меньше времени, но его объем получился больше чем у текстового файла - 21.3 GB. Вероятно есть еще какие-то тонкости. Внутри файла есть 100 групп строк и можно попробовать итерировать прямо по этим "естественным" группам строк. 

### Обработка fast parquet в pandas

Pandas из коробки допускает только чтение parquet файла целиком. Вероятно предполагается, что опытные пользователи используют возможности fast_parquet для итеративного чтения и последующей обработки файла.  В данном случае я решил не использовать многопроцессорной обработки. 

In [18]:
was = time.time()
pf = fp.ParquetFile(parq_path)
num_rg = len(pf.row_groups)
interm = []
for df in tqdm(pf.iter_row_groups(), total=num_rg):
    interm.append(process_chunk(df))

r2 = (pd.concat(interm).groupby("City")
      .aggregate({'Temp_min':'min','Temp_sum':'sum', 'Temp_count':'count', 'Temp_max':'max'}).reset_index()
      .assign(Temp_mean= lambda x: x.Temp_sum / x.Temp_count)
      .drop(columns=['Temp_sum', 'Temp_count']))
del interm
print(f"It took {time.time()-was} sec")
r2.head()

100%|██████████| 100/100 [06:28<00:00,  3.88s/it]


It takes 389.27612805366516 sec


Unnamed: 0,City,Temp_min,Temp_max,Temp_mean
0,A Yun Pa,-99.9,99.9,-33.057
1,Aabenraa,-99.9,99.9,177.683
2,Aalst,-99.9,99.9,-28.278
3,Aalten,-99.9,99.9,50.917
4,Abaji,-99.9,99.9,-129.605


Вариант с чтением и обработкой большого parquet файла занял 389 сек. 

### Генерация файла в pyarrow

Основная библиотека для работы с parquet - pyarrow. В данном случае использован тот же самый алгоритм генерации данных.

In [19]:
import pyarrow as pa, pyarrow.parquet as pq

double_range = hottest_temp - coldest_temp
batch_size = 10_000_000
num_chunks = num_rows_to_create // batch_size
parq_path = "/Volumes/Samsung_T5/Data/One Billion Row Challenge/data/measur2.parquet"
pqwriter = None

was = time.time()

for i in tqdm(range(num_chunks)): #tqdm(range(10)): 
    selection = random.choices(weather_stations_10k, k=batch_size)
    temp = np.random.rand(batch_size) * double_range
    temp = temp + coldest_temp
    table = pa.Table.from_pandas(pd.DataFrame({"City":selection, "Temp":np.around(temp, decimals=1)}))
    if i < 1: pqwriter = pq.ParquetWriter(parq_path, table.schema)            
    pqwriter.write_table(table)

# close the parquet writer
if pqwriter: pqwriter.close()
        
print(f"It took {time.time()-was} sec")
del selection, temp, table
pf = fp.ParquetFile(parq_path)
print(f"Total {pf.count()} lines and {len(pf.row_groups)} row groups in file")

100%|██████████| 100/100 [06:51<00:00,  4.11s/it]

It takes 411.12199091911316 sec
Total 1000000000 lines and 1000 row groups in file





Библиотека pyarrow работает чуть быстрее: 411 секунд вместо 520 секунд, и главное, формирует файл размером в семь раз меньше - 3.23 GB вместо 21.4 GB. 

Еще одна важная особенность - размер группы строк. Библиотека pyarrow использует размер 2**20 для формирования групп строк и каждая крайняя группа занимает меньше места. 

### Обработка pyarrow parquet в pandas

В данном случае читается группа строк в собственный формат pyarrow, а затем она конвертируется в DataFrame. В остальном алгоритм не отличается от реализованного выше. 

In [24]:
was = time.time()
parquet_file = pq.ParquetFile(parq_path)
interm = []
for i in tqdm(range(parquet_file.num_row_groups)):
    interm.append(process_chunk(parquet_file.read_row_group(i).to_pandas()))
parquet_file.close()
r2 = (pd.concat(interm).groupby("City")
      .aggregate({'Temp_min':'min','Temp_sum':'sum', 'Temp_count':'count', 'Temp_max':'max'}).reset_index()
      .assign(Temp_mean= lambda x: x.Temp_sum / x.Temp_count)
      .drop(columns=['Temp_sum', 'Temp_count']))
del interm
print(f"It took {time.time()-was} sec")
r2.head()
    

100%|██████████| 1000/1000 [03:53<00:00,  4.28it/s]


It takes 237.1843011379242 sec


Unnamed: 0,City,Temp_min,Temp_max,Temp_mean
0,A Yun Pa,-99.9,99.9,-9.8999
1,Aabenraa,-99.9,99.9,14.2728
2,Aalst,-99.9,99.9,16.7945
3,Aalten,-99.9,99.9,42.8011
4,Abaji,-99.9,99.9,6.7245


Результат в 237 секунд при чтении меньшего по объему файла порадовал.  Возникло желание попробовать сделать обработку на нескольких cpu с помощью joblib, как это было опробовано выше. 

In [25]:
was = time.time()
parquet_file = pq.ParquetFile(parq_path)
num_cpu = cpu_count()
print(f"Using {num_cpu} cpu cores")
multi_cpu = Parallel(n_jobs=num_cpu)

interm = multi_cpu(delayed(process_chunk)(parquet_file.read_row_group(i).to_pandas()) for i in range(parquet_file.num_row_groups))

parquet_file.close()

r2 = (pd.concat(interm).groupby("City")
      .aggregate({'Temp_min':'min','Temp_sum':'sum', 'Temp_count':'count', 'Temp_max':'max'}).reset_index()
      .assign(Temp_mean= lambda x: x.Temp_sum / x.Temp_count)
      .drop(columns=['Temp_sum', 'Temp_count']))
del interm
print(f"It took {time.time()-was} sec")
r2.head()
    

Using 8 cpu cores
It takes 171.3538670539856 sec


Unnamed: 0,City,Temp_min,Temp_max,Temp_mean
0,A Yun Pa,-99.9,99.9,-9.8999
1,Aabenraa,-99.9,99.9,14.2728
2,Aalst,-99.9,99.9,16.7945
3,Aalten,-99.9,99.9,42.8011
4,Abaji,-99.9,99.9,6.7245


Ускорение до 171 секунды порадовало. Но результату все равно далеко до polars на текстовых данных. 

### Обработка parquet на polars

In [27]:
#Фиксирование размера одновременно обрабатываемого куска данных
chunk_size  = 2_000_000
pl.Config.set_streaming_chunk_size(chunk_size)  

was = time.time()
res = (pl.scan_parquet(parq_path)
                  .group_by('City')
                  .agg(pl.col('Temp').min().name.suffix('_min'), pl.col('Temp').mean().name.suffix('_mean'), pl.col('Temp').max().name.suffix('_max'))
                  .sort('City')
                  .collect(streaming=True))
print(f"It took {time.time()-was} sec")
res.head(5)

It takes 39.408097982406616 sec


City,Temp_min,Temp_mean,Temp_max
str,f64,f64,f64
"""A Yun Pa""",-99.9,-0.099345,99.9
"""Aabenraa""",-99.9,0.071339,99.9
"""Aalst""",-99.9,0.168172,99.9
"""Aalten""",-99.9,0.429204,99.9
"""Abaji""",-99.9,0.033718,99.9


Этот результат в 39.4 секунды почти вне конкуренции. 

NB: В процессе работы над этим блокнотом я пробовал использовать polars для записи файла. Polars сделал файл размером 17.7 GB и при этом затем его обрабатывал в течение 1000 секунд.  Вероятно имеет смысл сохранять файлы из polars с преобразованием в pandas, а затем pyarrow table. 

### Обработка файла parquet в DuckDB

Попробуем запустить DuckDB поверх файла parquet.

In [33]:
query = """
        SELECT city,
            MIN(Temp) AS min_temp,
            CAST(AVG(Temp) AS DECIMAL(3,1)) AS mean_temp,
            MAX(Temp) AS max_temp
        FROM "/Volumes/Samsung_T5/Data/One Billion Row Challenge/data/measur2.parquet"
        GROUP BY city
        ORDER BY city
"""
was = time.time()
df = duckdb.sql(query).df()
print(f"It took {time.time()-was} sec")
df.head()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

It took 26.14646887779236 sec


Unnamed: 0,City,min_temp,mean_temp,max_temp
0,A Yun Pa,-99.9,-0.1,99.9
1,Aabenraa,-99.9,0.1,99.9
2,Aalst,-99.9,0.2,99.9
3,Aalten,-99.9,0.4,99.9
4,Abaji,-99.9,0.0,99.9


Результат в 26 секунд выглядит ошеломительным. 

## Итоги

- Для работы с файлами большого объема желательно использовать parquet, причем желательно сохранять естественный формат данных pyarrow с помощью библиотеки pyarrow.  Fastparquet больше не fast. 

- Файл parquet допускает дозапись данных. Можно большой массив данных, например продажи всех товаров розничной сети за несколько лет, записать в виде файла parquet, а затем и работать уже с ним.

- Для быстрого знакомства с csv файлом имеет смысл читать его с помощью duckdb. 

- Pandas может работать с большими данными при использовании кусочной обработки, для которой возможно понадобится использовать стороннюю библиотеку.  Распараллеливание (например с помощью joblib) возможно, но сильно большого выигрыша не дает - последовательное чтение файла является узким горлом. 

- Еще одна проблема обработки больших файлов на pandas - большое количество избыточного кода. Мне понадобилось 21 строка для кодирования всего процесса и это формирует большое поле для ошибки. 

- Polars работает быстрее чем pandas и на текстовых данных и на parquet файле. Ускорение составило 6 и 4 раза на разных входных форматах. Код уложился в 10 строк (сокращение в 2 раза) и намного более читаемый.  

- DuckDB оказалась самым быстрым решением и для текстовых данных и для бинарных в формате parquet. Размер программы примерно сопоставим с polars, но использует два языка - язык запросов SQL и код Python. Выгода в том, что не надо запоминать Polars API -- надо практиковаться в SQL запросах. 

- Сравнительного графика времен не будет - я "покупаю", а не "продаю".