In [58]:
import numpy as np
import pandas as pd

import dask.dataframe as dd
import dask.array as da
import dask.bag as db

In [89]:
chunksize = 10 ** 4
filepath = "data/2022_place_canvas_history.csv"
tfr = pd.read_csv(filepath, chunksize=chunksize, parse_dates=["timestamp"], infer_datetime_format=True, dtype={"coordinate": "str"})

ddf = dd.read_csv(filepath, blocksize="30MB")
ddf

Unnamed: 0_level_0,timestamp,user_id,pixel_color,coordinate
npartitions=723,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,object,object,object,object
,...,...,...,...
...,...,...,...,...
,...,...,...,...
,...,...,...,...


In [90]:
part = ddf.partitions[0]
part["timestamp"] = dd.to_datetime(part["timestamp"])
part.sort_values(by=["timestamp"], ascending=True)
part.compute().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 222600 entries, 0 to 222599
Data columns (total 4 columns):
 #   Column       Non-Null Count   Dtype              
---  ------       --------------   -----              
 0   timestamp    222600 non-null  datetime64[ns, UTC]
 1   user_id      222600 non-null  object             
 2   pixel_color  222600 non-null  object             
 3   coordinate   222600 non-null  object             
dtypes: datetime64[ns, UTC](1), object(3)
memory usage: 6.8+ MB


In [61]:
chunk = tfr.get_chunk()
chunk.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype              
---  ------       --------------  -----              
 0   timestamp    10000 non-null  datetime64[ns, UTC]
 1   user_id      10000 non-null  object             
 2   pixel_color  10000 non-null  object             
 3   coordinate   10000 non-null  object             
dtypes: datetime64[ns, UTC](1), object(3)
memory usage: 312.6+ KB


In [62]:
mode = part["user_id"].mode().compute()
part.loc[part["user_id"] == mode[0]].sort_values(by=["timestamp"], ascending=True).compute()


Unnamed: 0,timestamp,user_id,pixel_color,coordinate
4916,2022-04-03 17:38:25.783000+00:00,6xVYRRm/5uIgQtZ6/3kVmYLwIFkWH5jgkNtAzCEMvmvf4G...,#FFFFFF,1654260
4919,2022-04-03 17:38:25.784000+00:00,6xVYRRm/5uIgQtZ6/3kVmYLwIFkWH5jgkNtAzCEMvmvf4G...,#898D90,1653246


In [63]:
# add x and y cols
x_and_y = part["coordinate"].compute().str.split(",", expand=True)
x_and_y.columns = ["x", "y"]
dd.merge(part, x_and_y, left_index=True, right_index=True, how="outer").drop(["coordinate"], axis=1).compute()

Unnamed: 0,timestamp,user_id,pixel_color,x,y
0,2022-04-03 17:38:20.021000+00:00,p0sXpmkcmg1KLiCdK5e4xKdudb1f8cjscGs35082sKpGBf...,#FF4500,371,488
1,2022-04-03 17:38:20.024000+00:00,Ctar52ln5JEpXT+tVVc8BtQwm1tPjRwPZmPvuamzsZDlFD...,#51E9F4,457,493
2,2022-04-03 17:38:20.025000+00:00,rNMF5wpFYT2RAItySLf9IcFZwOhczQhkRhmTD4gv0K78Dp...,#000000,65,986
3,2022-04-03 17:38:20.025000+00:00,u0a7l8hHVvncqYmav27EARAE6ciLtpUTPXMI33lDrUmtj5...,#3690EA,73,961
4,2022-04-03 17:38:20.026000+00:00,L8P+AXoFbbXPh2zBAkkXk96UrkKpB5hLq5gBMwvgSV0H4f...,#FF4500,1865,290
...,...,...,...,...,...
44501,2022-04-03 17:39:19.242000+00:00,AUWiszWswTQzMpDkcbJ66rSnvCgId5IbvOOAnl5a1XXbgL...,#000000,72,943
44502,2022-04-03 17:39:19.244000+00:00,r62PY4tOxh0Isn2Qsvp8wHKqXEyymjndc4sXpKsReiXPM/...,#3690EA,75,952
44503,2022-04-03 17:39:19.247000+00:00,KAqGxjzE+p+U0W9101wNStMwsnWcUVMZbAKgSi73uWXFW4...,#FFFFFF,1855,646
44504,2022-04-03 17:39:19.249000+00:00,hFkvaiVXEcESWo/hNpDzRZXucyBU7TUGbZHOAtBXiyGtyL...,#3690EA,51,943


In [64]:
# chunk_max = []
# for chunk in tfr:
#     idx_max = chunk["user_id"].value_counts().argmax()
#     chunk_max.append(chunk["user_id"].value_counts().iloc[[idx_max]])

pdf = pd.DataFrame(part.compute())
idx_max = pdf["user_id"].value_counts().argmax()
most_freq_user = pdf["user_id"].value_counts().iloc[[idx_max]]
most_freq_user

6xVYRRm/5uIgQtZ6/3kVmYLwIFkWH5jgkNtAzCEMvmvf4GqclqwvKtQMQP1aHKiR4bfbecBpLkYRH8Ofvufgsw==    2
Name: user_id, dtype: int64

In [101]:
timestamp_idx = 1
user_id_idx = 2

threshold = 360 # in seconds

count_dict = {}
variance_dict = {}
time_since_dict = {}
last_pixel_time_dict = {}
thing1 = []
thing2 = []

for row in part.itertuples():
    user_id = row[user_id_idx]
    if user_id in last_pixel_time_dict.keys():
        if user_id in time_since_dict.keys():
            time_since_last_pixel = row[timestamp_idx] - last_pixel_time_dict[row[user_id_idx]]
            variance = time_since_dict[user_id] - time_since_last_pixel
            thing1.append(variance.seconds)
            if variance.seconds < threshold:
                if user_id not in count_dict.keys():
                    count_dict[user_id] = 0
                count_dict[user_id] += 1
            time_since_dict[user_id] = time_since_last_pixel
        else:
            time_since_dict[user_id] = row[timestamp_idx] - last_pixel_time_dict[user_id]
    last_pixel_time_dict[user_id] = row[timestamp_idx]


In [102]:
count_dict

{'OyMFA0s63ws7hiPqKESzQwEPTejT4Lh1zMzKD9e6Wb/6VeMsmZHgKLS1ZPyilgoq1F2mImTtwtNdMvUNJrb6vA==': 1}

In [103]:
time_since_dict

{'OyMFA0s63ws7hiPqKESzQwEPTejT4Lh1zMzKD9e6Wb/6VeMsmZHgKLS1ZPyilgoq1F2mImTtwtNdMvUNJrb6vA==': Timedelta('0 days 00:00:00.002000'),
 '6xVYRRm/5uIgQtZ6/3kVmYLwIFkWH5jgkNtAzCEMvmvf4GqclqwvKtQMQP1aHKiR4bfbecBpLkYRH8Ofvufgsw==': Timedelta('0 days 00:05:01.087000'),
 'BNOnkE1j28Tmy08CUKK9KlGjv7wxR0PrVkb5ppx4NyIRa9+Xs3X4jTV68ffQhN3IdaOnXRF4BOctVvSakOz37A==': Timedelta('0 days 00:05:05.482000'),
 '1FiF/OZl8Ry0ZBfXYTf4po24JUfV/boqCSGJ4Y8vgJIVhLcyBQFzGqjaY0imiWeRlxfU5/EZMJRgHvX/t3X5hw==': Timedelta('0 days 00:00:00.002000'),
 'FWKJw/bDGGeKOhFUOWkDPSOLW8fYKVpHNqaVAh6q4VDi16wga6hKlJ5rgTZItiMmC06sJRgw8Mv8IusHRmI0ow==': Timedelta('0 days 00:00:00.001000'),
 'zWhaupH3Fy5BNlS+peI3kjIcTXZJExETzFxkwojafRfWStqEWhAmiQgjC9PxDs1tuENo6n7WvkBWGqie0RK/qA==': Timedelta('0 days 00:00:00.001000'),
 'R3KP+dw41NakTkD0ilnB2kLrnA5K6XWLiiLCvsVMkoaqC3gL6PborKZ6fhgm1p9D9yJcUQloMW4i93/yRPr2SQ==': Timedelta('0 days 00:00:00.022000'),
 '5kevmGiNrlHmh3KgF3ItRDjcWUbqhlC5/icRTLw+YvlwdLxQVaZ1nkcIEjJbCTAXDaGVHEHHrVA2PCV7LAt7cw==

In [104]:
last_pixel_time_dict

{'p0sXpmkcmg1KLiCdK5e4xKdudb1f8cjscGs35082sKpGBfQIw92nZ7yGvWbQ/ggB1+kkRBaYu1zy6n16yL/yjA==': Timestamp('2022-04-03 17:38:20.021000+0000', tz='UTC'),
 'Ctar52ln5JEpXT+tVVc8BtQwm1tPjRwPZmPvuamzsZDlFDkeo3+ItUW89J1rXDDeho6A4zCob1MKmJrzYAjipg==': Timestamp('2022-04-03 17:38:20.024000+0000', tz='UTC'),
 'rNMF5wpFYT2RAItySLf9IcFZwOhczQhkRhmTD4gv0K78DpieXrVUw8T/MBAZjj2BIS8h5exPISQ4vlyzLzad5w==': Timestamp('2022-04-03 17:38:20.025000+0000', tz='UTC'),
 'u0a7l8hHVvncqYmav27EARAE6ciLtpUTPXMI33lDrUmtj5Ei3ixlfRuG28KUvs7r5LpeiE/iOKPALVjkILhrYg==': Timestamp('2022-04-03 17:38:20.025000+0000', tz='UTC'),
 'L8P+AXoFbbXPh2zBAkkXk96UrkKpB5hLq5gBMwvgSV0H4foa10rSyCwzWFrfvF2MUspFSJ9FkzNzY1//dL5u9A==': Timestamp('2022-04-03 17:38:20.026000+0000', tz='UTC'),
 'PNgTPhAA1zue68tZDNiqnNnKAsEyf4HSF826SBy5RHBebE+ocEHCh0wsOnAZtjdAWqfZyOR/Xb55zSiBSktBIw==': Timestamp('2022-04-03 17:38:20.027000+0000', tz='UTC'),
 'H9YaxLRMacKoLOjRnZ//0gJxUZ7JJAHPPahCReGZfxS88tZGT4e8tRkti55woOshlbdXc09zTvOao4uAcB/01A==': Timestamp('20

In [105]:
thing1

[Timedelta('0 days 00:05:01.087000'),
 86098,
 Timedelta('0 days 00:05:06.039000'),
 86093,
 Timedelta('0 days 00:00:00.002000'),
 306,
 Timedelta('0 days 00:05:05.482000'),
 86094]