In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder.appName('sparksparkspark').config('spark.driver.memory', '25g').getOrCreate()

In [3]:
df = spark.read.parquet('2022_place.parquet')

In [4]:
df.show()

+--------------------+--------+-----------+----+----+
|           timestamp| user_id|pixel_color|   x|   y|
+--------------------+--------+-----------+----+----+
|2022-04-04 00:53:...| 8069230|    #00CCC0| 826|1048|
|2022-04-04 00:53:...| 1465682|    #94B3FF| 583|1031|
|2022-04-04 00:53:...| 4369819|    #6A5CFF|1873| 558|
|2022-04-04 00:54:...| 9487099|    #009EAA|1627| 255|
|2022-04-04 00:55:...| 3739625|    #94B3FF|  49|1478|
|2022-04-04 00:55:...| 2086589|    #E4ABFF| 408|1863|
|2022-04-04 00:55:...|  700254|    #94B3FF| 111|1582|
|2022-04-04 00:55:...|  495390|    #6A5CFF|1334|1840|
|2022-04-04 00:55:...| 9447904|    #6A5CFF|1908|1854|
|2022-04-04 00:56:...| 1098668|    #009EAA|1504|1722|
|2022-04-04 00:56:...| 7700138|    #94B3FF|1850|1809|
|2022-04-04 00:56:...|10000053|    #009EAA| 123|1108|
|2022-04-04 00:56:...| 9728790|    #94B3FF| 890| 971|
|2022-04-04 00:56:...| 4049244|    #00CCC0|1982|1827|
|2022-04-04 00:56:...| 6917476|    #6A5CFF|1211| 719|
|2022-04-04 00:56:...| 76207

In [5]:
df = df.orderBy('timestamp')

In [6]:
df = df.withColumn('timestamp_round', (F.unix_timestamp('timestamp') / 0.1).cast('bigint') * 0.1)

In [7]:
window_spec = Window.partitionBy('user_id').orderBy('timestamp_round')

In [8]:
df = df.withColumn('time_diff', F.col('timestamp_round') - F.lag('timestamp_round').over(window_spec))

In [None]:
window_spec = Window.partitionBy('user_id').orderBy('timestamp')

In [None]:
df = df.withColumn('same_diff', (F.col('time_diff') == F.lead('time_diff').over(window_spec)).cast('int'))

In [None]:
bots = df.groupBy('user_id').agg(F.sum('same_diff').alias('consecutive_duplicates'))

In [None]:
bots = bots.filter(F.col('consecutive_duplicates') > 0)

In [None]:
# just to make sure its working, 
# this takes a really long time to run... i wonder why? 
bots.show()

+-------+----------------------+
|user_id|consecutive_duplicates|
+-------+----------------------+
|     54|                     1|
|    191|                     2|
|    367|                     1|
|   1127|                     1|
|   1145|                     1|
|   1217|                     1|
|   1258|                     2|
|   1919|                     5|
|   2906|                     1|
|   2909|                     1|
|   2941|                     1|
|   3009|                     1|
|   3081|                     3|
|   3704|                     3|
|   3866|                     1|
|   4552|                     1|
|   5148|                     1|
|   6156|                    15|
|   6362|                     1|
|   6508|                     6|
+-------+----------------------+
only showing top 20 rows



In [15]:
bot_activity = df.join(bots, on='user_id').withColumn('user_type', F.lit('bot'))

In [None]:
window_spec = Window.partitionBy('x', 'y').orderBy('timestamp') 

In [None]:
bot_activity = bot_activity.withColumn('last_color', F.lag('pixel_color').over(window_spec))

In [23]:
# really really slow...
bot_activity.count()

54793646

In [33]:
bot_activity.filter(F.col('pixel_color') == F.col('last_color')).count()
# not the same, not sure why...

18321266

In [None]:
quantiles = [0, 0.1, 0.25, 0.5, 0.75, 0.9, 1]

quantile_expr = f"percentile_approx(time_diff, array({', '.join(map(str, quantiles))}))"

In [None]:
result = bot_activity.select(F.expr(quantile_expr)).collect()[0][0]

print(f"0th percentile: {result[0]}")
print(f"10th percentile: {result[1]}")
print(f"25th percentile: {result[2]}")
print(f"50th percentile (median): {result[3]}")
print(f"75th percentile: {result[4]}")
print(f"90th percentile: {result[5]}")
print(f"100th percentile: {result[6]}")


0th percentile: 0.0
10th percentile: 305.0
25th percentile: 311.0
50th percentile (median): 344.0
75th percentile: 616.0
90th percentile: 2194.0
100th percentile: 293307.0


In [27]:
human_activity = df.join(bots, on='user_id', how='left_anti').withColumn('user_type', F.lit('bot'))

In [None]:
human_activity = human_activity.withColumn('last_color', F.lag('pixel_color').over(window_spec))

In [31]:
human_activity.filter(F.col('pixel_color') == F.col('last_color')).count()

26465587

In [32]:
result = human_activity.select(F.expr(quantile_expr)).collect()[0][0]

print(f"0th percentile: {result[0]}")
print(f"10th percentile: {result[1]}")
print(f"25th percentile: {result[2]}")
print(f"50th percentile (median): {result[3]}")
print(f"75th percentile: {result[4]}")
print(f"90th percentile: {result[5]}")
print(f"100th percentile: {result[6]}")


0th percentile: 0.0
10th percentile: 312.0
25th percentile: 341.0
50th percentile (median): 600.0
75th percentile: 3016.0
90th percentile: 18797.0
100th percentile: 298857.0
