In [None]:
from dask import dataframe as dd
from datetime import datetime
import pandas as pd

In [None]:
def parse_datetime(dt):
    """Convert a timestamp string into integer milliseconds relative to the reference instant.

    Parameters
    ----------
    dt : str
        Text shaped like 'YYYY-MM-DD HH:MM:SS[.fff]' followed by a four-character
        suffix (presumably ' UTC' - the suffix is dropped without being checked).

    Returns
    -------
    int
        Unix time in milliseconds minus 1648806250315.

    Raises
    ------
    ValueError
        If the text matches neither the fractional nor the whole-second format.

    Notes
    -----
    The parsed value is naive, so ``datetime.timestamp()`` interprets it in the
    machine's local time zone; the result therefore depends on the TZ setting.
    NOTE(review): confirm this is intended given the dropped suffix.
    """
    date_format = '%Y-%m-%d %H:%M:%S.%f'
    text = dt[:-4]

    try:
        parsed = datetime.strptime(text, date_format)
    except ValueError:
        # Some values carry no fractional part; retry without the trailing '.%f'
        parsed = datetime.strptime(text, date_format[:-3])

    # Work in integers: multiplying the float seconds by 1000 and truncating can
    # land one millisecond short because of binary rounding.
    whole_seconds = int(parsed.replace(microsecond=0).timestamp())
    milliseconds = whole_seconds * 1000 + parsed.microsecond // 1000

    return milliseconds - 1648806250315

In [None]:
def parse_timestamp(timestamp):
    """Turn a relative millisecond offset back into a datetime truncated to the hour.

    Parameters
    ----------
    timestamp : int or float
        Milliseconds relative to the reference instant 1648806250315 (Unix ms),
        i.e. the value produced by ``parse_datetime``.

    Returns
    -------
    datetime.datetime
        Naive local-time datetime with minute, second and microsecond set to zero.

    Notes
    -----
    ``datetime.fromtimestamp`` converts to the machine's local time zone, which
    mirrors the local-time interpretation used when the offsets were created.
    """
    # Add the full millisecond reference before reducing to seconds. Adding only
    # the whole seconds (1648806250) would drop 315 ms and push events from the
    # first 315 ms of an hour into the previous hour.
    epoch_ms = int(timestamp) + 1648806250315
    moment = datetime.fromtimestamp(epoch_ms // 1000)

    return moment.replace(minute=0, second=0, microsecond=0, fold=0)

In [None]:
# Palette lookup tables, built once instead of on every call.
_COLOR_TO_INDEX = {
    '#000000': 0,
    '#00756F': 1,
    '#009EAA': 2,
    '#00A368': 3,
    '#00CC78': 4,
    '#00CCC0': 5,
    '#2450A4': 6,
    '#3690EA': 7,
    '#493AC1': 8,
    '#515252': 9,
    '#51E9F4': 10,
    '#6A5CFF': 11,
    '#6D001A': 12,
    '#6D482F': 13,
    '#7EED56': 14,
    '#811E9F': 15,
    '#898D90': 16,
    '#94B3FF': 17,
    '#9C6926': 18,
    '#B44AC0': 19,
    '#BE0039': 20,
    '#D4D7D9': 21,
    '#DE107F': 22,
    '#E4ABFF': 23,
    '#FF3881': 24,
    '#FF4500': 25,
    '#FF99AA': 26,
    '#FFA800': 27,
    '#FFB470': 28,
    '#FFD635': 29,
    '#FFF8B8': 30,
    '#FFFFFF': 31,
}
_INDEX_TO_COLOR = {index: color for color, index in _COLOR_TO_INDEX.items()}


def parse_color(pixel_color):
    """Translate between a hex colour string and its numeric palette index.

    Parameters
    ----------
    pixel_color : str or int-like
        A string such as '#FF4500' is mapped to its index (0-31); any other
        value is treated as an index and mapped back to the hex string.

    Returns
    -------
    int or str
        The index for a string input, the hex string for an index input.

    Raises
    ------
    KeyError
        If a string is not one of the known colours.
    IndexError
        If a non-string value is not one of the known indices.
    """
    # isinstance also accepts str subclasses (e.g. numpy.str_), which an exact
    # type comparison would wrongly send down the index branch.
    if isinstance(pixel_color, str):
        return _COLOR_TO_INDEX[pixel_color]

    try:
        return _INDEX_TO_COLOR[pixel_color]
    except (KeyError, TypeError):
        # Keep the exception type callers saw from the former list-scan lookup
        raise IndexError(f'unknown palette index: {pixel_color!r}') from None


## Оптимизиране на оригиналната информация. Премахване на колоната user_id

In [None]:
# Load the raw CSV. The converters turn each timestamp into a relative millisecond
# offset and each hex colour into its palette index while the file is parsed.
ddf_core = dd.read_csv(
    'data\\2022_place_canvas_history.csv',
    converters={'timestamp': parse_datetime, 'pixel_color': parse_color},
)

# astype returns a new frame, so the result must be assigned for the compact
# dtypes to take effect (palette indices are 0-31, hence uint8).
# NOTE(review): uint32 assumes the offsets are never negative - confirm.
ddf_core = ddf_core.astype({'timestamp': 'uint32', 'pixel_color': 'uint8'})

ddf_core = ddf_core.drop('user_id', axis=1)
ddf_core = ddf_core.sort_values('timestamp')

ddf_core.head()

In [None]:
# Persist the optimised frame as Parquet; later cells reload it from 'data\\data_core'.
ddf_core.to_parquet('data\\data_core')

## Експортиране на колоната user_id в отделен файл

In [None]:
# Re-read the raw CSV and discard the three columns that are not user_id.
# NOTE(review): rows stay in CSV order here, whereas the core frame is sorted by
# timestamp - confirm the two outputs are not expected to line up row by row.
ddf_users = dd.read_csv('data\\2022_place_canvas_history.csv').drop(
    columns=['timestamp', 'pixel_color', 'coordinate']
)

ddf_users.head()

In [None]:
# Write the remaining user column(s) to their own Parquet dataset.
ddf_users.to_parquet('data\\data_users')

## Завъртане на pixel_color колоната според datetime

In [None]:
# Reload the core data without coordinates, then decode both remaining columns:
# offsets become hour-truncated datetimes and palette indices become hex strings.
ddf_colors_pivot = dd.read_parquet('data\\data_core').drop(columns='coordinate')

ddf_colors_pivot = ddf_colors_pivot.assign(
    timestamp=ddf_colors_pivot['timestamp'].map(parse_timestamp),
    pixel_color=ddf_colors_pivot['pixel_color'].map(parse_color),
)
ddf_colors_pivot = ddf_colors_pivot.rename(columns={'timestamp': 'datetime'})

# Extra column carrying the index of the reset frame
ddf_colors_pivot['index'] = ddf_colors_pivot.reset_index().index

ddf_colors_pivot.head()

In [None]:
# Make 'pixel_color' categorical and resolve its categories (as_known) before pivoting
ddf_colors_pivot['pixel_color'] = (
    ddf_colors_pivot['pixel_color'].astype('category').cat.as_known()
)

ddf_colors_pivot.dtypes

In [None]:
# One row per hour, one column per colour, each cell counting the placements
ddf_colors_pivot = ddf_colors_pivot.pivot_table(
    index='datetime',
    columns='pixel_color',
    values='pixel_color',
    aggfunc='count',
)

ddf_colors_pivot.head()

In [None]:
# Every pivoted column holds a count, so the whole frame is cast to uint32 in one call
ddf_colors_pivot = ddf_colors_pivot.astype('uint32')

ddf_colors_pivot.dtypes

In [None]:
# Store the hourly per-colour counts as a Parquet dataset.
ddf_colors_pivot.to_parquet('data\\data_hourly-colors-count')

## Изнасяне на колоната datetime в отделен файл + брой пиксели поставени за всеки час

In [None]:
# Keep only the time column of the core data and decode it to hour-truncated datetimes
ddf_date = dd.read_parquet('data\\data_core').drop(columns=['pixel_color', 'coordinate'])

ddf_date = ddf_date.assign(timestamp=ddf_date['timestamp'].map(parse_timestamp))
ddf_date = ddf_date.rename(columns={'timestamp': 'datetime'})

ddf_date.head()

In [None]:
# Convert the mapped column to a proper datetime dtype
ddf_date = ddf_date.assign(datetime=dd.to_datetime(ddf_date['datetime']))

ddf_date.dtypes

In [None]:
# Count placements per hour and materialise the result; from here on ddf_date is a
# pandas Series. The first positional parameter of value_counts is `sort`, so the
# string that used to be passed there only acted as a truthy flag. It is spelled
# out explicitly now.
ddf_date = ddf_date.datetime.value_counts(sort=True).compute()

ddf_date.head()

In [None]:
# Turn the counts into a two-column frame ordered by hour. Naming the values and
# the index explicitly avoids relying on the auto-generated 'index' column, whose
# name depends on the pandas version that produced the value_counts result.
ddf_date = (
    ddf_date
    .rename('num_pixels')
    .rename_axis('datetime')
    .sort_index()
    .reset_index()
)

ddf_date.head()

In [None]:
# ddf_date was computed in an earlier cell, so this writes the in-memory result to Parquet.
ddf_date.to_parquet('data\\data_hourly-pixels')

## Завъртане на datetime колоната според coordinate, намиране на средния брой поставени пиксели за всяка точка за всеки час

In [None]:
#ddf_coords_pivot = dd.read_csv('data\\data_core')
#ddf_coords_pivot['timestamp'] = ddf_coords_pivot['timestamp'].map(parse_timestamp)

#ddf_coords_pivot = ddf_coords_pivot.rename(columns={'timestamp': 'datetime'})
#ddf_coords_pivot = ddf_coords_pivot.drop('pixel_color', axis=1)
#ddf_coords_pivot['index'] = ddf_coords_pivot.index

#ddf_coords_pivot['datetime'] = ddf_coords_pivot['datetime'].astype('category')
#ddf_coords_pivot['datetime'] = ddf_coords_pivot.datetime.cat.as_known()

#ddf_coords_pivot = ddf_coords_pivot.repartition(npartitions=120)

#ddf_coords_pivot = ddf_coords_pivot.pivot_table(index='coordinate', columns='datetime', values='coordinate', aggfunc='count')

#ddf_coords_pivot.to_parquet('data\\data_hourly-pixels-coords')

## Разделяне на X и Y координати в отделни колони в основния файл

In [None]:
def split_coords_points(ddf_point):
    """Split a comma-separated 'coordinate' column into numeric 'x' and 'y' columns.

    Parameters
    ----------
    ddf_point : DataFrame
        Frame with a string column 'coordinate'; the first two comma-separated
        fields of each value are used as x and y.

    Returns
    -------
    DataFrame
        A new frame without 'coordinate' and with 'x' and 'y' appended as uint16.
        The frame passed in is not modified.
    """
    # Split once with the vectorised string accessor; .str.get picks list elements
    parts = ddf_point['coordinate'].str.split(',')

    # assign returns a new frame, so the caller's frame keeps its original columns
    with_xy = ddf_point.assign(
        x=parts.str.get(0).astype('uint16'),
        y=parts.str.get(1).astype('uint16'),
    )

    return with_xy.drop('coordinate', axis=1)

In [None]:
def split_coords_rectangles(ddf_rectangle):
    """Expand rows whose 'coordinate' is 'x1,y1,x2,y2' into one row per covered point.

    Parameters
    ----------
    ddf_rectangle : dask DataFrame
        Frame with 'timestamp', 'pixel_color' and a string 'coordinate' column
        holding four comma-separated integers. It is iterated row by row, which
        computes its partitions; the frame itself is not modified.

    Returns
    -------
    dask DataFrame
        Four partitions with columns 'timestamp' (uint32), 'pixel_color' (uint8),
        'x' and 'y' (uint16); one row for every point of every rectangle.

    Raises
    ------
    ValueError
        If a coordinate does not consist of exactly four integers.
    """
    point_rows = []

    for rect in ddf_rectangle.itertuples():
        x1, y1, x2, y2 = (int(part) for part in rect.coordinate.split(','))

        # Both corners are inclusive, hence the +1 on each upper bound
        for x in range(x1, x2 + 1):
            for y in range(y1, y2 + 1):
                point_rows.append((rect.timestamp, rect.pixel_color, x, y))

    # Build the frame once; appending row by row to a DataFrame is quadratic
    pts_from_recs = pd.DataFrame(
        point_rows,
        columns=['timestamp', 'pixel_color', 'x', 'y'],
    )

    ddf_points = dd.from_pandas(pts_from_recs, npartitions=4)

    return ddf_points.astype({
        'timestamp': 'uint32',
        'pixel_color': 'uint8',
        'x': 'uint16',
        'y': 'uint16',
    })

In [None]:
# read_parquet does not document a `dtype` option, so the compact dtypes are
# enforced explicitly after loading.
ddf_commas = dd.read_parquet('data\\data_core').astype(
    {'timestamp': 'uint32', 'pixel_color': 'uint8'}
)

# The number of commas separates 'x,y' values (one comma) from longer coordinate lists
ddf_commas['comma_count'] = ddf_commas.coordinate.str.count(',')

ddf_commas.head()

In [None]:
# Rows with exactly one comma are single points; rows with more go to the
# rectangle frame. The helper column is dropped from both.
ddf_single = ddf_commas[ddf_commas.comma_count == 1].drop(columns='comma_count')

ddf_rect = ddf_commas[ddf_commas.comma_count > 1].drop(columns='comma_count')

In [None]:
# Replace the 'coordinate' column of the single-point rows with separate x / y columns.
ddf_single = split_coords_points(ddf_single)

ddf_single.head()

In [None]:
# Expand every multi-value coordinate row into one row per covered point.
ddf_rect = split_coords_rectangles(ddf_rect)

ddf_rect.head()

In [None]:
# Stack the point rows and the expanded rectangle rows, then restore time order
ddf_split = dd.concat(
    [ddf_single, ddf_rect],
    axis=0,
    interleave_partitions=True,
).sort_values('timestamp')

ddf_split.head()

In [None]:
# Save the frame with separate x / y columns as a Parquet dataset.
ddf_split.to_parquet('data\\data_split-coords')