In [1]:
from pyspark.sql import SparkSession
import imageio
import numpy as np
import shutil
from PIL import Image
import os
# Bind Spark to the loopback interface; set here, before the SparkSession
# below is created, so the setting is in place when the session starts.
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"


# Reference timestamp: time_delta() measures elapsed seconds from this moment.
min_date = '2022-04-01 12:00:00'

# Human-readable palette names, keyed by the integer form of each color's
# 0xRRGGBB hex code (the encoding int_to_rgb() decodes).
color_name = {
    16729344: 'red', 
    0: 'black', 
    6970623: 'periwinkle', 
    7143450: 'burgundy', 
    16775352: 'pale yellow', 
    2379940: 'dark blue', 
    8461983: 'dark purple', 
    16754688: 'orange', 
    40618: 'teal', 
    52416: 'light teal', 
    8318294: 'light green', 
    12451897: 'dark red', 
    16757872: 'beige', 
    9745407: 'lavender', 
    16777215: 'white', 
    11815616: 'purple', 
    41832: 'dark green', 
    10250534: 'brown', 
    14986239: 'pale purple', 
    16766517: 'yellow', 
    3576042: 'blue', 
    16751018: 'light pink', 
    5368308: 'light blue', 
    9014672: 'gray', 
    4799169: 'indigo', 
    30063: 'dark teal', 
    13948889: 'light gray', 
    5329490: 'dark gray', 
    52344: 'green', 
    16726145: 'pink', 
    7161903: 'dark brown', 
    14553215: 'magenta'
}

# Load the placement history parquet and register it as the SQL view `data`.
# Later queries read its user_id, x, y, color and timedelta columns.
db_path = r"../data/lossless_2022_place_history.parquet"
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)
df = spark.read.load(db_path, format="parquet", pathGlobFilter="*.parquet")
# DataFrame.repartition() returns a NEW DataFrame and leaves `df` untouched, so
# a bare `df.repartition("user_id")` call has no effect. If partitioning by user
# is really wanted, assign it: `df = df.repartition("user_id")`.
df.createOrReplaceTempView('data')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/10 19:15:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
#utility functions
def time_delta(curr_date : str, start_date : "str | None" = None) -> float:
    """Return the number of seconds from ``start_date`` to ``curr_date``.

    Both timestamps use the ``'YYYY-MM-DD HH:MM:SS'`` layout of ``min_date``;
    the seconds field may carry a fractional part.

    Args:
        curr_date: timestamp to measure.
        start_date: reference timestamp; defaults to the module-level
            ``min_date`` (read at call time).

    Returns:
        Elapsed seconds as a float; negative if ``curr_date`` is earlier.
    """
    from datetime import date

    def _split(stamp):
        # 'YYYY-MM-DD HH:MM:SS[.fff]' -> (date, hours, minutes, seconds)
        day_part, clock_part = stamp.split(' ')
        year, month, day = (int(part) for part in day_part.split('-'))
        hours, minutes, seconds = clock_part.split(':')
        return date(year, month, day), int(hours), int(minutes), float(seconds)

    if start_date is None:
        start_date = min_date

    curr_day, curr_h, curr_m, curr_s = _split(curr_date)
    start_day, start_h, start_m, start_s = _split(start_date)

    # Subtract every field (including the start's minutes and seconds) and use
    # real calendar dates so month/year boundaries are handled correctly.
    day_diff = (curr_day - start_day).days
    return (day_diff * 24 * 60 * 60
            + (curr_h - start_h) * 60 * 60
            + (curr_m - start_m) * 60
            + (curr_s - start_s))

def int_to_rgb(color_value : int) -> list[int]:
    """Split an integer color code 0xRRGGBB into ``[red, green, blue]`` channel values."""
    hex_digits = format(color_value, "06x")  # zero-padded, lowercase hex
    return [int(hex_digits[start:start + 2], 16) for start in range(0, 6, 2)]

In [3]:
# analysis functions
# Create most recent img from two coordinates
def draw_curr_img(top_left_x : int, top_left_y : int, bottom_right_x : int, bottom_right_y : int, time : float):
    """Render a rectangle of the canvas as it stood at ``time`` and save it as a PNG.

    Every placement inside the inclusive rectangle
    [top_left_x, bottom_right_x] x [top_left_y, bottom_right_y] with
    ``timedelta <= time`` is replayed in time order, so each pixel ends up
    with its latest color. Pixels with no placement stay dark grey
    (50, 50, 50) so that genuinely black pixels remain visible.

    Args:
        top_left_x, top_left_y: inclusive upper-left corner (canvas coordinates).
        bottom_right_x, bottom_right_y: inclusive lower-right corner.
        time: cutoff on the scale of the ``timedelta`` column (call sites pass
            values produced by time_delta()).

    The image is written to
    ``visualizations/pngs/{top_left_x}_{top_left_y}_{bottom_right_x}_{bottom_right_y}_{time}.png``;
    the directory is created if missing.
    """
    x_range = bottom_right_x - top_left_x + 1
    y_range = bottom_right_y - top_left_y + 1

    sql_to_execute = f"""
        SELECT x-{top_left_x} AS x_img, y-{top_left_y} AS y_img, color FROM data
        WHERE timedelta <= {time}
        AND x >= {top_left_x} AND x <= {bottom_right_x}
        AND y >= {top_left_y} AND y <= {bottom_right_y}
        ORDER BY timedelta ASC
    """

    # Rows are sorted by time, so a later placement overwrites an earlier one.
    coords = spark.sql(sql_to_execute).collect()

    img_set = np.full([y_range, x_range, 3], 50, dtype=np.uint8)  # dark grey background
    for x_img, y_img, color in coords:
        img_set[y_img, x_img] = int_to_rgb(color)

    file_name = f"{top_left_x}_{top_left_y}_{bottom_right_x}_{bottom_right_y}_{time}.png"
    # Write straight into the output directory instead of saving to the CWD and
    # moving, which fails (and leaves a stray file) when the directory is missing.
    out_dir = "visualizations/pngs"
    os.makedirs(out_dir, exist_ok=True)
    Image.fromarray(img_set, "RGB").save(f"{out_dir}/{file_name}")

# Create most recent img from two coordinates from a given user
def draw_curr_img_from_user(top_left_x : int, top_left_y : int, bottom_right_x : int, bottom_right_y : int, user_id : int, time : float):
    """Render only one user's placements inside a rectangle, as of ``time``.

    Placements by ``user_id`` inside the inclusive rectangle
    [top_left_x, bottom_right_x] x [top_left_y, bottom_right_y] with
    ``timedelta <= time`` are replayed in time order. The result is RGBA:
    pixels the user placed are opaque (alpha 255) in that user's latest color;
    all other pixels are fully transparent.

    Args:
        top_left_x, top_left_y: inclusive upper-left corner (canvas coordinates).
        bottom_right_x, bottom_right_y: inclusive lower-right corner.
        user_id: the single user whose placements are drawn.
        time: cutoff on the scale of the ``timedelta`` column.

    The image is written to
    ``visualizations/pngs/{user_id}_{top_left_x}_{top_left_y}_{bottom_right_x}_{bottom_right_y}_{time}.png``;
    the directory is created if missing.
    """
    x_range = bottom_right_x - top_left_x + 1
    y_range = bottom_right_y - top_left_y + 1

    sql_to_execute = f"""
        SELECT x-{top_left_x} AS x_img, y-{top_left_y} AS y_img, color FROM data
        WHERE timedelta <= {time}
        AND user_id = {user_id}
        AND x >= {top_left_x} AND x <= {bottom_right_x}
        AND y >= {top_left_y} AND y <= {bottom_right_y}
        ORDER BY timedelta ASC
    """

    # Rows are sorted by time, so a later placement overwrites an earlier one.
    coords = spark.sql(sql_to_execute).collect()

    # All-zero RGBA = fully transparent background (not grey).
    img_set = np.zeros([y_range, x_range, 4], dtype=np.uint8)
    for x_img, y_img, color in coords:
        img_set[y_img, x_img] = int_to_rgb(color) + [255]

    file_name = f"{user_id}_{top_left_x}_{top_left_y}_{bottom_right_x}_{bottom_right_y}_{time}.png"
    # Write straight into the output directory instead of saving to the CWD and
    # moving, which fails (and leaves a stray file) when the directory is missing.
    out_dir = "visualizations/pngs"
    os.makedirs(out_dir, exist_ok=True)
    Image.fromarray(img_set, "RGBA").save(f"{out_dir}/{file_name}")

# Create most recent img from two coordinates focused on a given user
def draw_curr_img_focus_users(top_left_x : int, top_left_y : int, bottom_right_x : int, bottom_right_y : int, user_id : list[int], time : float):
    """Render a rectangle as of ``time`` with some users' placements highlighted.

    The inclusive rectangle [top_left_x, bottom_right_x] x [top_left_y, bottom_right_y]
    is drawn in two passes, each replaying placements with ``timedelta <= time``
    in time order:

    1. placements by users NOT in ``user_id``, at alpha 100 (dimmed);
    2. placements by users in ``user_id``, drawn on top at alpha 255.

    Because the focus pass runs last, a focus user's latest pixel is shown
    even where another user painted over it afterwards. Pixels nobody placed
    stay fully transparent.

    Args:
        top_left_x, top_left_y: inclusive upper-left corner (canvas coordinates).
        bottom_right_x, bottom_right_y: inclusive lower-right corner.
        user_id: non-empty list of user ids to highlight.
        time: cutoff on the scale of the ``timedelta`` column.

    The image is written to
    ``visualizations/pngs/{user_id}_focus_{top_left_x}_{top_left_y}_{bottom_right_x}_{bottom_right_y}_{time}.png``
    (``user_id`` formatted as the list itself); the directory is created if missing.

    Raises:
        ValueError: if ``user_id`` is empty, since ``IN ()`` is not valid SQL.
    """
    if not user_id:
        raise ValueError("user_id must contain at least one user id")

    x_range = bottom_right_x - top_left_x + 1
    y_range = bottom_right_y - top_left_y + 1
    user_id_str = '(' + ', '.join(map(str, user_id)) + ')'

    def _placements(membership):
        # Time-ordered (x_img, y_img, color) rows for users `membership` the focus list.
        sql_to_execute = f"""
            SELECT x-{top_left_x} AS x_img, y-{top_left_y} AS y_img, color FROM data
            WHERE timedelta <= {time}
            AND user_id {membership} {user_id_str}
            AND x >= {top_left_x} AND x <= {bottom_right_x}
            AND y >= {top_left_y} AND y <= {bottom_right_y}
            ORDER BY timedelta ASC
        """
        return spark.sql(sql_to_execute).collect()

    # All-zero RGBA = fully transparent background (not grey).
    img_set = np.zeros([y_range, x_range, 4], dtype=np.uint8)
    # Background users first (dimmed), then the focus users on top (opaque).
    for membership, alpha in (("NOT IN", 100), ("IN", 255)):
        for x_img, y_img, color in _placements(membership):
            img_set[y_img, x_img] = int_to_rgb(color) + [alpha]

    file_name = f"{user_id}_focus_{top_left_x}_{top_left_y}_{bottom_right_x}_{bottom_right_y}_{time}.png"
    # Write straight into the output directory instead of saving to the CWD and
    # moving, which fails (and leaves a stray file) when the directory is missing.
    out_dir = "visualizations/pngs"
    os.makedirs(out_dir, exist_ok=True)
    Image.fromarray(img_set, "RGBA").save(f"{out_dir}/{file_name}")

# Create most recent img from a block given its relative position and size
def draw_curr_img_at_group(x_group : int, y_group: int, group_size : int, time : float):
    """Render one ``group_size`` x ``group_size`` block of the canvas as of ``time``.

    The block covers canvas x in [x_group*group_size, (x_group+1)*group_size)
    and y in [y_group*group_size, (y_group+1)*group_size). Placements with
    ``timedelta <= time`` are replayed in time order; pixels never placed stay
    dark grey (50, 50, 50) so genuinely black pixels remain visible.

    Args:
        x_group, y_group: block indices along x and y.
        group_size: block edge length in pixels.
        time: cutoff on the scale of the ``timedelta`` column.

    The image is written to
    ``visualizations/pngs/{x_group}_{y_group}_{group_size}_{time}.png``;
    the directory is created if missing.
    """
    x_start = x_group * group_size
    y_start = y_group * group_size

    sql_to_execute = f"""
        SELECT x-{x_start} AS x_img, y-{y_start} AS y_img, color FROM data
        WHERE timedelta <= {time}
        AND x >= {x_start} AND x < {x_start + group_size}
        AND y >= {y_start} AND y < {y_start + group_size}
        ORDER BY timedelta ASC
    """

    # Rows are sorted by time, so a later placement overwrites an earlier one.
    coords = spark.sql(sql_to_execute).collect()

    img_set = np.full([group_size, group_size, 3], 50, dtype=np.uint8)  # dark grey background
    for x_img, y_img, color in coords:
        img_set[y_img, x_img] = int_to_rgb(color)

    file_name = f"{x_group}_{y_group}_{group_size}_{time}.png"
    # Write straight into the output directory instead of saving to the CWD and
    # moving, which fails (and leaves a stray file) when the directory is missing.
    out_dir = "visualizations/pngs"
    os.makedirs(out_dir, exist_ok=True)
    Image.fromarray(img_set, "RGB").save(f"{out_dir}/{file_name}")

# Draw user placements in a block given block position and size
def draw_curr_img_at_group_from_users(x_group : int, y_group: int, group_size : int, user_id : list[int], time : float):
    """Render only the given users' placements in one block, as of ``time``.

    The block covers canvas x in [x_group*group_size, (x_group+1)*group_size)
    and y in [y_group*group_size, (y_group+1)*group_size). Placements by users
    in ``user_id`` with ``timedelta <= time`` are replayed in time order;
    every other pixel stays dark grey (50, 50, 50).

    Args:
        x_group, y_group: block indices along x and y.
        group_size: block edge length in pixels.
        user_id: non-empty list of user ids whose placements are drawn.
        time: cutoff on the scale of the ``timedelta`` column.

    The image is written to
    ``visualizations/pngs/{user_id}_{x_group}_{y_group}_{group_size}_{time}.png``
    (``user_id`` formatted as the list itself); the directory is created if missing.

    Raises:
        ValueError: if ``user_id`` is empty, since ``IN ()`` is not valid SQL.
    """
    if not user_id:
        raise ValueError("user_id must contain at least one user id")

    user_id_str = '(' + ', '.join(map(str, user_id)) + ')'
    x_start = x_group * group_size
    y_start = y_group * group_size

    sql_to_execute = f"""
        SELECT x-{x_start} AS x_img, y-{y_start} AS y_img, color FROM data
        WHERE timedelta <= {time}
        AND user_id IN {user_id_str}
        AND x >= {x_start} AND x < {x_start + group_size}
        AND y >= {y_start} AND y < {y_start + group_size}
        ORDER BY timedelta ASC
    """

    # Rows are sorted by time, so a later placement overwrites an earlier one.
    coords = spark.sql(sql_to_execute).collect()

    img_set = np.full([group_size, group_size, 3], 50, dtype=np.uint8)  # dark grey background
    for x_img, y_img, color in coords:
        img_set[y_img, x_img] = int_to_rgb(color)

    file_name = f"{user_id}_{x_group}_{y_group}_{group_size}_{time}.png"
    # Write straight into the output directory instead of saving to the CWD and
    # moving, which fails (and leaves a stray file) when the directory is missing.
    out_dir = "visualizations/pngs"
    os.makedirs(out_dir, exist_ok=True)
    Image.fromarray(img_set, "RGB").save(f"{out_dir}/{file_name}")

In [12]:
# Bounds (seconds since min_date) of the 12:00-18:00 window on 2022-04-01.
# Named explicitly rather than `min`/`max`, which would shadow the built-in
# min() and max() for every later cell.
window_start = time_delta("2022-04-01 12:00:00")
window_end = time_delta("2022-04-01 18:00:00")

In [13]:
# The three users with the most placements overall.
top_users_query = """
  SELECT user_id, COUNT(1) AS actions FROM data GROUP BY user_id ORDER BY actions DESC LIMIT 3
"""
top_users_df = spark.sql(top_users_query)
top_users_df.show()

[Stage 9:>                                                        (0 + 11) / 12]

+-------+-------+
|user_id|actions|
+-------+-------+
|  78352|    795|
| 122804|    781|
| 280238|    777|
+-------+-------+



                                                                                

In [14]:
# Distribution of the time between consecutive placements for each of the
# three most active users. `prev` is a placement's timedelta minus that user's
# previous one; it is NULL for a user's first placement, and the aggregates
# skip NULLs.
gap_stats_query = """
    WITH pixel_times AS (
        SELECT user_id, timedelta, (timedelta-LAG(timedelta) OVER (PARTITION BY user_id ORDER BY timedelta)) AS prev FROM data WHERE user_id IN (78352, 122804, 280238)
    )
       
    SELECT 
       user_id, MIN(prev), AVG(prev), MAX(prev),
       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY prev) AS p50,  
       PERCENTILE_CONT(0.9) WITHIN GROUP (ORDER BY prev) AS p90, 
       PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY prev) AS p99
    FROM pixel_times GROUP BY user_id
"""
gap_stats = spark.sql(gap_stats_query)
gap_stats.show()

+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|user_id|         min(prev)|         avg(prev)|         max(prev)|               p50|              p90|               p99|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|  78352|300.57399999999325| 376.3957279596977| 18960.30900000001|305.54449999999997|337.3666000000005| 878.5880799999782|
| 122804|300.02899999998044|383.56283589743595|27719.151000000005|  304.833999999988|369.4231000000029|1309.8635000000254|
| 280238| 300.1109999999753|382.65825386597936| 7073.528999999995| 317.6334999999963|404.1359999999995|1567.8967500000035|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+



                                                                                

In [17]:
# Placements per hour for the three most active users. `hr` buckets
# `timedelta` into whole hours (CAST to INTEGER truncates). One count per hour
# is printed per line, user by user.
top_users = (78352, 122804, 280238)
user_list_sql = ', '.join(map(str, top_users))
results = spark.sql(f"""
    WITH pixel_times AS (
        SELECT user_id, CAST(CAST(timedelta AS INTEGER)/3600 AS INTEGER) As hr FROM data WHERE user_id IN ({user_list_sql})
    ), hourly_pixels AS (
        SELECT user_id, hr, COUNT(1) AS actions FROM pixel_times GROUP BY user_id, hr
    )

    SELECT user_id, hr, actions FROM hourly_pixels ORDER BY user_id, hr
""").collect()

hourly_pixels = {user: [] for user in top_users}
for user, hr, count in results:
    hourly_pixels[user].append((hr, count))

# One slot per hour up to the latest hour any of these users was active,
# shared by all users so the printed series line up. Sizing from the data
# avoids a hard-coded length that raises IndexError on longer data.
n_hours = max((hr for _, hr, _ in results), default=-1) + 1
for user in hourly_pixels:
    actions = [0] * n_hours
    for hr, count in hourly_pixels[user]:
        # A negative bucket (placement an hour or more before min_date) would
        # silently index from the end of the list; skip it instead.
        if hr >= 0:
            actions[hr] = count
    for act in actions:
        print(act)
            

0
11
11
12
3
0
1
0
6
8
10
11
11
12
11
12
12
11
11
11
12
11
12
4
0
0
0
0
4
3
0
0
9
12
10
11
11
11
12
11
11
12
12
12
5
10
11
11
11
12
12
12
12
12
12
12
10
12
12
11
12
11
12
11
12
12
11
11
11
12
11
10
12
12
11
12
11
12
12
11
12
12
10
9
2
0
11
11
9
11
10
10
9
11
6
7
10
9
12
12
11
1
0
0
0
0
0
0
2
11
11
12
10
11
9
8
11
6
8
8
3
0
8
10
10
12
12
11
12
12
12
10
8
3
12
8
12
12
12
12
11
12
11
10
12
12
11
12
12
12
12
12
12
12
12
11
12
12
12
12
12
11
11
12
12
12
11
10
7
1
0
9
11
12
9
10
8
10
11
10
11
10
12
11
10
11
11
10
11
11
10
11
11
11
10
11
11
11
7
10
10
8
0
3
6
6
9
2
6
6
11
11
12
11
12
11
11
12
11
8
9
9
2
3
2
8
7
12
11
8
11
11
12
11
12
12
11
12
11
12
12
11
12
12
11
12
11
11
5
1
6
7
7
6
0


                                                                                

In [21]:
# 280238 User 3 Is Singled Out
# Placements by user 280238 per 50x50 block (block index = coordinate / 50,
# truncated), ordered by block position.
spark.sql("""
    WITH group_marker AS (
        SELECT user_id, CAST(x/50 AS INTEGER) AS x_group, CAST(y/50 AS INTEGER) AS y_group FROM data WHERE user_id = 280238
    ), grouped_placements AS (
        SELECT user_id, x_group, y_group, COUNT(1) AS actions FROM group_marker GROUP BY user_id, x_group, y_group
    )

    SELECT user_id, x_group, y_group, actions FROM grouped_placements ORDER BY x_group, y_group
""").show()

+-------+-------+-------+-------+
|user_id|x_group|y_group|actions|
+-------+-------+-------+-------+
| 280238|      7|      5|     60|
| 280238|      7|      6|      4|
| 280238|      8|      5|    672|
| 280238|      8|      6|     25|
| 280238|     24|      2|      1|
| 280238|     25|      1|      1|
| 280238|     26|     16|      7|
| 280238|     27|     16|      3|
| 280238|     33|     31|      4|
+-------+-------+-------+-------+



In [22]:
# Palette usage by user 280238, most-used color first (ties broken by color
# code so the order is deterministic).
colors = spark.sql("""
    SELECT user_id, color, COUNT(1) AS amt FROM data WHERE user_id = 280238 GROUP BY user_id, color ORDER BY amt DESC, color
""").collect()

for _, color_code, amount in colors:
    # Fall back to the hex code for any color missing from color_name
    # instead of aborting the loop with KeyError.
    label = color_name.get(color_code, f"#{color_code:06x}")
    print(f"{label} : {amount}")

dark blue : 538
yellow : 191
black : 16
orange : 13
white : 13
dark red : 4
indigo : 2


                                                                                

In [23]:
# User 1 and 2 (78352, 122804)
# Placements per user per 50x50 block, busiest first. user_id is the final
# sort key so ties print in a deterministic order.
spark.sql("""
    WITH group_marker AS (
        SELECT user_id, CAST(x/50 AS INTEGER) AS x_group, CAST(y/50 AS INTEGER) AS y_group FROM data WHERE user_id IN (78352, 122804)
    ), grouped_placements AS (
        SELECT user_id, x_group, y_group, COUNT(1) AS actions FROM group_marker GROUP BY user_id, x_group, y_group
    )

    SELECT user_id, x_group, y_group, actions FROM grouped_placements ORDER BY actions DESC, x_group, y_group, user_id
""").show()

+-------+-------+-------+-------+
|user_id|x_group|y_group|actions|
+-------+-------+-------+-------+
| 122804|     12|      5|     93|
|  78352|     33|      5|     91|
| 122804|     33|      4|     86|
|  78352|     18|     37|     80|
| 122804|     32|      4|     79|
| 122804|     18|     37|     74|
|  78352|     12|      4|     65|
|  78352|     32|      5|     62|
| 122804|     32|      5|     60|
|  78352|     12|      5|     55|
|  78352|     33|      4|     55|
| 122804|     12|      4|     52|
|  78352|     32|      4|     52|
| 122804|     33|      5|     45|
| 122804|     18|     36|     42|
|  78352|     31|      5|     36|
|  78352|     11|      4|     33|
| 122804|     39|      7|     28|
|  78352|     39|      7|     27|
|  78352|     18|     36|     26|
+-------+-------+-------+-------+
only showing top 20 rows



                                                                                

In [25]:
# User 1 and 2 (78352, 122804)
# Combined placements of both users per 50x50 block, busiest first.
spark.sql("""
    WITH group_marker AS (
        SELECT CAST(x/50 AS INTEGER) AS x_group, CAST(y/50 AS INTEGER) AS y_group FROM data WHERE user_id IN (78352, 122804)
    ), grouped_placements AS (
        SELECT x_group, y_group, COUNT(1) AS actions FROM group_marker GROUP BY x_group, y_group
    )

    SELECT x_group, y_group, actions FROM grouped_placements ORDER BY actions DESC, x_group, y_group
""").show()



+-------+-------+-------+
|x_group|y_group|actions|
+-------+-------+-------+
|     18|     37|    154|
|     12|      5|    148|
|     33|      4|    141|
|     33|      5|    136|
|     32|      4|    131|
|     32|      5|    122|
|     12|      4|    117|
|     18|     36|     68|
|     39|      7|     55|
|     11|      4|     53|
|     31|      5|     36|
|     11|      5|     34|
|     12|      9|     34|
|     27|     28|     31|
|     19|     37|     25|
|     35|      4|     23|
|     36|      9|     22|
|     26|     39|     17|
|     11|      9|     15|
|     13|      9|     15|
+-------+-------+-------+
only showing top 20 rows



                                                                                

In [4]:
# Snapshot cutoff for the drawings below, converted to seconds since min_date.
before_white_timestamp = "2022-04-04 12:00:00"
before_white = time_delta(before_white_timestamp)

In [5]:
# Draw images at active spots: each 50x50 block as a whole, then (where a user
# list is given) only those users' placements in that block.
# NOTE(review): (18, 37) gets no per-user image, though it was the busiest
# combined block for 78352/122804 in the grouped counts above. Intentional?
active_blocks = [
    (8, 5, [280238]),
    (12, 5, [122804, 78352]),
    (33, 5, [78352, 122804]),
    (18, 37, None),
]
for x_group, y_group, users in active_blocks:
    draw_curr_img_at_group(x_group, y_group, 50, before_white)
    if users is not None:
        draw_curr_img_at_group_from_users(x_group, y_group, 50, users, before_white)

                                                                                

In [37]:
# Enlarged images at similar blocks: each covers a 2x2 group of 50px blocks,
# starting at block (32, 4) and at block (7, 5).
for x_group, y_group in ((32, 4), (7, 5)):
    draw_curr_img(x_group*50, y_group*50, (x_group+2)*50-1, (y_group+2)*50-1, before_white)

In [41]:
# Focus on user placements in specific regions: (corners, users to highlight).
focus_regions = [
    ((392, 252, 433, 298), [280238]),
    ((1614, 212, 1688, 280), [122804, 78352]),
    ((892, 1830, 961, 1885), [122804, 78352]),
]
for (left, top, right, bottom), users in focus_regions:
    draw_curr_img_focus_users(left, top, right, bottom, users, before_white)

                                                                                