In [None]:
# Website at https://parquet.apache.org/
"""
29-4-2022
Apache Parquet är ett open source, kolumn orienterat file format som är designat för data förvaring och hämtning.
Den har effektiv datakompression och kodning scheman med förbättrad prestanda att hantera komplex data i omgångar.
 
Parquet finns tillgänglig till språken Java, C++, Python med mera.

Fördelar parquet:
    - Parquet hållet metadata som är snabbt tillgängligt

Parquets kompression är baserat på kolumnernas värden. Om en kolumn har många unika värden,
t.ex. användar ID så har kolumnen hög kordinalitet. Om kolumnen har få unika värden t.ex.
land då har kolumnen låg kordinalitet. Låg kordinalitet leder till bättre kompression
med parquet för den kolumen. ([1]https://towardsdatascience.com/apache-arrow-read-dataframe-with-zero-memory-69634092b1a)

[1] Pratar ingående med exempel på hur man läser stora filer.

Man kan filtrera datasettet innan man laddar in det i minnet, med hjälp av 
parquets metadata som blandannat håller vilka kolumner den har, kan man välja
att endast ladda in dessa kolumner och därför spara minne och cpu cyklar.
"""

"""
https://arrow.apache.org/docs/python/getstarted.html
Apache arrow är en kombination av verktyg för att jobba med olika filtyper inom bigdata.
"""
 
"""
För att utföra testerna skulle man kunna generera skräpdata utan någon mening.
Jag väljer att använda riktiga dataset för att få en bra representation hur resultaten ser ut i verkligheten.
Två datasets kommer att användas, ett smalt dataset med få kolumner och ett brett dataset med många kolumner.
 
Formatet smalt mot brett är för att täcka det fallen där ett filformat kanske är bättre på att hantera
få antal kolumner mot många, vice versa.
 
Official Reddit r/place Dataset CSV:
https://www.kaggle.com/datasets/antoinecarpentier/redditrplacecsv

Reddits dataset är på 
 
Pratar om benchmarks
https://www.slideshare.net/HadoopSummit/file-format-benchmark-avro-json-orc-and-parquet

Github Logs:


Skillnaded mellan Parquet och Feather.
https://stackoverflow.com/questions/48083405/what-are-the-differences-between-feather-and-parquet
Går igenom timeit också.

timeit för att mäta tid på funktioner.
https://stackoverflow.com/a/55239060
from timeit import default_timer as timer
start = timer()
# Do something
end = timer()
time_in_seconds = end - start


Försökte att använda %%timeit på funktionerna
https://docs.python.org/3/library/timeit.html
"By default, timeit() temporarily turns off garbage collection during the timing."
går att fixa med:
 - timeit.Timer('for i in range(10): oct(i)', 'gc.enable()').timeit()


Steam Reviews Dataset: https://www.kaggle.com/datasets/najzeko/steam-reviews-2021
 - 21 Miljoner användarrecensioner av 300 spel på spelplattformen Steam.
 - 21 Miljoner rader
 - 23 Kolumner
 - 8.17 GB Stor
 
"""

"""
Jag väljer till att börja med att endast köra lokalt utan distibruerade filsystem, med hadoop och spark.
Detta leder till att jag begränas på hur mycket ram-minne jag har på dator. Mitt nuvarande system har 64 GB
i ramminne, vilket leder till att datasetten jag läser in för inte överstiga denna nivån. Man kan eventuellt läsa
filen i omgångar, får kolla närmare på detta.

"""
 
"""
Avgränsningar
Jag väljer att inte gå in på djupet på filformatens arkitekturer, då endast en
djupgående genomgång av en fil förtjänar en uppsats för sig själv.
"""

"""
Huvudsakliga testerna som kommer att genomföras är följande:
 - Läsning av fil (Mäta hur lång tid detta tar).
 - Skrivning av fil (Mäta hur lång tid detta tar).
 - Kompression (Hur mycket plats sparar man vid kompression för filformatet)
 - Hur mycket belaster filen minnet vid läsning (Minnesfotavtryck).
    + Memory Profiler @ pip

Testning
Parquet vs CSV
Kör med hemmagjord time funktionen för att mäta tid för funktionerna,
Konstigt beteende efter flera inläsningar, parquet 18s inläsning i nystart av vscode,
efter ett tag tar det 40s varje gång man läser in parquet filen.


Läsning:
1. Reddit antalet unika användare som deltog i eventet.
2. Populärast färgen
"""

"""
Arrow filformat 'memory-mapped' [1] tar knappt någon plats.

"""


In [34]:
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.json as json
import pyarrow.parquet as parquet
import pyarrow.orc as orc
from pyarrow import feather
from datetime import timedelta
import timeit
from timeit import default_timer as timer
import time
import gc
#days = pa.array([1, 12, 17, 23, 28], type=pa.int8())
file = None


def time_func(func, *args, **kwargs):
    start = timer()
    print(f'Starting {func.__name__}: [{start}]')
    result = func(*args, **kwargs)
    end = timer()
    print(f'Done {func.__name__}: [{end}]')
    total_in_seconds = end - start
    print(f'Total time: {timedelta(seconds=total_in_seconds)}')

    return result, total_in_seconds


reddit_place = "2022_place_canvas_history"
reddit_original_path = f'data/original/{reddit_place}.csv'
reddit_parquet_path = f'data/raw/parquet/{reddit_place}.parquet'
reddit_partitioned_path = f'data/partitioned/reddit'

In [None]:
# Getting raw data and transform to different formats
# Manually making sure that file is deleted from memory if rerunning cell.
if file:
    del file
    # Garbage collecion: https://docs.python.org/3/library/gc.html
    gc.collect()

    print('Clearing memory')
    time.sleep(2)


Clearing memory


In [None]:
file, t = time_func(
    csv.read_csv,
    reddit_original_path
)

Starting read_csv: [14360.043166249]
Done read_csv: [14398.871795605]
Total time: 0:00:38.828629


In [None]:
# We have data in pyarrow table, so we can save it to parquet.
result, time_seconds = time_func(
    parquet.write_table, file, reddit_parquet_path
)

Starting write_table: [12786.182915386]
Done write_table: [12829.513975965]
Total time: 0:00:43.331061


In [3]:
arrow_table, t = time_func(
    parquet.read_table,
    reddit_parquet_path
)

Starting read_table: [101217.530706251]
Done read_table: [101258.00013417]
Total time: 0:00:40.469428


In [4]:
arrow_df = arrow_table.to_pandas()
arrow_df.head()

Unnamed: 0,timestamp,user_id,pixel_color,coordinate
0,2022-04-04 00:53:51.577 UTC,ovTZk4GyTS1mDQnTbV+vDOCu1f+u6w+CkIZ6445vD4XN8a...,#00CCC0,8261048
1,2022-04-04 00:53:53.758 UTC,6NSgFa1CvIPly1VniNhlbrmoN3vgDFbMSKqh+c4TTfrr3d...,#94B3FF,5831031
2,2022-04-04 00:53:54.685 UTC,O5Oityp3Z3owzTuwM9XnMggpLcqKEumsOMKGhRiDTTImWb...,#6A5CFF,1873558
3,2022-04-04 00:54:57.541 UTC,tc273UiqS0wKa6VwiOs/iz/t4LyPYrhL2Q347awn11IQQE...,#009EAA,1627255
4,2022-04-04 00:55:16.307 UTC,OOWsU/HLb4UUkQwclDeXFtsJTOXMlAdNHiRpFA1Qk+SxUr...,#94B3FF,491478


In [5]:
arrow_df.shape

(160353104, 4)

In [17]:
import pandas as pd
arrow_df['conv_timestamp'] = pd.to_datetime(arrow_df['timestamp'])
#first_day = arrow_df['timestamp'].astype() < pd.Timestamp(2022, 4, 2)

In [6]:
a = arrow_df.groupby('coordinate')
a.head()

Unnamed: 0,timestamp,user_id,pixel_color,coordinate
0,2022-04-04 00:53:51.577 UTC,ovTZk4GyTS1mDQnTbV+vDOCu1f+u6w+CkIZ6445vD4XN8a...,#00CCC0,8261048
1,2022-04-04 00:53:53.758 UTC,6NSgFa1CvIPly1VniNhlbrmoN3vgDFbMSKqh+c4TTfrr3d...,#94B3FF,5831031
2,2022-04-04 00:53:54.685 UTC,O5Oityp3Z3owzTuwM9XnMggpLcqKEumsOMKGhRiDTTImWb...,#6A5CFF,1873558
3,2022-04-04 00:54:57.541 UTC,tc273UiqS0wKa6VwiOs/iz/t4LyPYrhL2Q347awn11IQQE...,#009EAA,1627255
4,2022-04-04 00:55:16.307 UTC,OOWsU/HLb4UUkQwclDeXFtsJTOXMlAdNHiRpFA1Qk+SxUr...,#94B3FF,491478
...,...,...,...,...
160353044,2022-04-05 00:13:59.177 UTC,NezxFMEgIDUQg9hW34kKrLPJ3AEKAR1Mh+2U/HxsGlslDy...,#FFFFFF,11661102
160353048,2022-04-05 00:13:59.233 UTC,uHV1sP1YB2WyDljN/Iy9nRR/+DKMa68kPQoNoeAzTSJy4/...,#FFFFFF,16721453
160353062,2022-04-05 00:13:59.418 UTC,LEtYgmofohe4IbF8fascHb//jOh3qsCcBkRLgf+xiW8OQW...,#FFFFFF,9641170
160353083,2022-04-05 00:13:59.709 UTC,h1PHSmLb9lHzIx3gniYiVxcIkxEjs4GhjD3M5JLGyX4XgH...,#FFFFFF,11861097


In [18]:
arrow_df.head()

Unnamed: 0,timestamp,user_id,pixel_color,coordinate,conv_timestamp
0,2022-04-04 00:53:51.577 UTC,ovTZk4GyTS1mDQnTbV+vDOCu1f+u6w+CkIZ6445vD4XN8a...,#00CCC0,8261048,2022-04-04 00:53:51.577000+00:00
1,2022-04-04 00:53:53.758 UTC,6NSgFa1CvIPly1VniNhlbrmoN3vgDFbMSKqh+c4TTfrr3d...,#94B3FF,5831031,2022-04-04 00:53:53.758000+00:00
2,2022-04-04 00:53:54.685 UTC,O5Oityp3Z3owzTuwM9XnMggpLcqKEumsOMKGhRiDTTImWb...,#6A5CFF,1873558,2022-04-04 00:53:54.685000+00:00
3,2022-04-04 00:54:57.541 UTC,tc273UiqS0wKa6VwiOs/iz/t4LyPYrhL2Q347awn11IQQE...,#009EAA,1627255,2022-04-04 00:54:57.541000+00:00
4,2022-04-04 00:55:16.307 UTC,OOWsU/HLb4UUkQwclDeXFtsJTOXMlAdNHiRpFA1Qk+SxUr...,#94B3FF,491478,2022-04-04 00:55:16.307000+00:00


In [40]:
arrow_df.to_csv(f'data/small/csv/{reddit_place}.csv')

In [None]:
pa.Table.from_pandas(arrow_df)

In [70]:
filter_mask = arrow_df['conv_timestamp'] < pd.Timestamp(2022, 4, 1, 14).tz_localize('utc')

In [71]:
filter_mask.value_counts()

False    159804317
True        548787
Name: conv_timestamp, dtype: int64

In [72]:
filter_mask.value_counts()
small_df = arrow_df[filter_mask]

In [73]:
small_df.head()
small_table = pa.Table.from_pandas(small_df)

In [74]:
csv.write_csv(small_table, f'data/small/before14/{reddit_place}.csv')

In [15]:
a.count()

Unnamed: 0_level_0,timestamp,user_id,pixel_color
coordinate,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
00,98807,98807,98807
01,9592,9592,9592
010,307,307,307
0100,88,88,88
01000,192,192,192
...,...,...,...
999995,284,284,284
999996,372,372,372
999997,876,876,876
999998,2707,2707,2707


In [None]:
metadata, t = time_func(
    parquet.read_metadata,
    reddit_parquet_path
)
print('-' * 20)
column_names = metadata.schema.names
print(f'Column Names [{", ".join(column_names)}]')

Starting read_metadata: [16044.791414067]
Done read_metadata: [16044.791936784]
Total time: 0:00:00.000523
--------------------
Column Names [timestamp, user_id, pixel_color, coordinate]


In [None]:
parquet.write_to_dataset(
    arrow_table, 
    root_path=reddit_partitioned_path, 
    partition_cols=['pixel_color'], 
)

In [None]:
# Writing memory mapped arrow file.
with pa.OSFile('test.arrow', 'wb') as sink:
    with pa.RecordBatchFileWriter(sink, arrow_table.schema) as writer:
        writer.write_table(arrow_table)

In [2]:
source = pa.memory_map('test.arrow', 'r')
table = pa.ipc.RecordBatchFileReader(source).read_all().column("user_id")
df = table.to_pandas()
df.head()

#### We now have a small file we can handle.

In [4]:
before14 = f'data/small/before14/{reddit_place}'
day1 = f'data/small/day1/{reddit_place}'

In [8]:
small_table = csv.read_csv(f'{day1}.csv')

In [17]:
%%timeit
parquet.write_table(small_table, f'{day1}.parquet')

5.56 s ± 498 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [18]:
%%timeit
csv.write_csv(small_table, f'{day1}.csv2')

18.4 s ± 1.41 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [19]:
%%timeit
orc.write_table(small_table, f'{day1}.orc')

6.63 s ± 65.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [20]:
%%timeit
feather.write_feather(small_table, f'{day1}.feather')

4.2 s ± 230 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## All files saved in their respective formats.

In [54]:
%%timeit
df = parquet.read_table(f'{day1}.parquet').to_pandas()

9.83 s ± 228 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [55]:
%%timeit
df = csv.read_csv(f'{day1}.csv2').to_pandas()

8.45 s ± 121 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [56]:
%%timeit
df = orc.read_table(f'{day1}.orc').to_pandas()

9.02 s ± 77.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [57]:
%%timeit
df = feather.read_feather(f'{day1}.feather')

8 s ± 187 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [58]:
%%timeit
df = feather.read_table(f'{day1}.feather').to_pandas()

8.08 s ± 63.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [68]:
del file
gc.collect()

15

In [None]:
from pyarrow.json import json

In [39]:
from pympler import asizeof

@consumed
def read_table(file_name, file_type='csv'):
    if file_type == 'csv':
        table = csv.read_csv(file_name)
    elif file_type == 'parquet':
        table = parquet.read_table(file_name)
    elif file_type == 'orc':
        table = orc.read_table(file_name)
    elif file_type == 'feather':
        table = feather.read_table(file_name)

    return table

table = read_table(f'{day1}.parquet', file_type='parquet')
table = read_table(f'{day1}.orc', file_type='orc')
table = read_table(f'{day1}.csv2', file_type='csv')
table = read_table(f'{day1}.feather', file_type='feather')

read_table: Consumed: 3427.5390625 MB.
read_table: Consumed: 1973.90234375 MB.
read_table: Consumed: 4271.8984375 MB.
read_table: Consumed: 1962.13671875 MB.


In [38]:
import psutil
import os

def get_process_memory():
    current_pid = os.getpid()
    process = psutil.Process(current_pid)
    rss = process.memory_info().rss
    return rss


def consumed(func):
    def wrapper(*args, **kwargs):
        gc.collect()
        time.sleep(5)
        pre_mem = get_process_memory()
        result = func(*args, **kwargs)
        post_mem = get_process_memory()
        consumed_mem = post_mem - pre_mem
        consumed_mem_MB = consumed_mem / (1024 ** 2)
        print(f'{func.__name__}: Consumed: {consumed_mem_MB} MB.')
        return result
    return wrapper