Goal: a function that computes, for a rectangle (x1, y1), (x2, y2) and a time t, the image drawn on the canvas at that time.

For every pixel in the rectangle, the most recently set pixel value before t has to be looked up in the main dataset.

The output is a DataFrame with these pixel values.

Idea:
- generate a DataFrame with all pixel coordinates of the rectangle
- join it with the main dataset
- find the newest timestamp per pixel
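
As a reference for small inputs, the three steps above can be sketched in plain Python without Spark. This is only a sketch: the function name `latest_pixels_reference`, the tuple layout `(x, y, t, pixel_color)` and the sample records are assumptions made for illustration, and the strict comparison `t < tz` is assumed to be the intended cut-off.

```python
def latest_pixels_reference(records, x1, y1, x2, y2, tz):
    """Return {(x, y): (t, color)} with the newest placement before tz.

    records: iterable of (x, y, t, pixel_color) tuples (hypothetical layout).
    Bounds are inclusive; pixels never set before tz are absent from the result.
    """
    latest = {}
    for x, y, t, color in records:
        if not (x1 <= x <= x2 and y1 <= y <= y2):
            continue  # outside the rectangle
        if t >= tz:
            continue  # placed at or after the requested time
        best = latest.get((x, y))
        if best is None or t > best[0]:
            latest[(x, y)] = (t, color)
    return latest


# Made-up sample records, for illustration only.
sample = [
    (100, 114, 617, "#FFD635"),
    (100, 114, 648, "#FFFFFF"),
    (100, 114, 1600, "#000000"),  # too late for tz = 1500
    (156, 116, 113, "#898D90"),
    (999, 999, 10, "#FF0000"),    # outside the rectangle
]
result = latest_pixels_reference(sample, 100, 100, 300, 150, 1500)
```

A dictionary like this does not scale to the full dataset, but it can be handy for checking the Spark result on a small slice.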

In [3]:
# Example input data
x1 = 100
y1 = 100
x2 = 300
y2 = 150
# 201 x 51 rectangle (both bounds are included below) - that is 10,251 pixels
tz = 1500

In [4]:
from pyspark.sql import SparkSession, DataFrame
import itertools
import numpy as np
import pyspark.sql.functions as F

spark = SparkSession.builder.appName('placegroups').getOrCreate()
spark.sparkContext.setCheckpointDir('../data/interim/checkpoints')

In [5]:
x_list = list(range(x1,x2+1))
y_list = list(range(y1,y2+1))

generatedData = list(itertools.product(x_list,y_list))

#print(generatedData)
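
Because both `range` calls include the upper bound (`x2 + 1`, `y2 + 1`), the generated list covers (x2 - x1 + 1) * (y2 - y1 + 1) coordinates, which is 201 x 51 = 10,251 for the example values. A quick check, with the helper names `coords`, `width`, `height` and `n_pixels` chosen only for this sketch:

```python
import itertools

x1, y1, x2, y2 = 100, 100, 300, 150  # example values from the input cell

# Same construction as in the cell above, without the intermediate lists.
coords = list(itertools.product(range(x1, x2 + 1), range(y1, y2 + 1)))

width = x2 - x1 + 1    # number of x values, both bounds included
height = y2 - y1 + 1   # number of y values, both bounds included
n_pixels = len(coords)
```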

In [6]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType

boxSchema = ["x","y"]
generatedFrame = spark.createDataFrame(data = generatedData, schema = boxSchema)

In [7]:
generatedFrame.show(5)

+---+---+
|  x|  y|
+---+---+
|100|100|
|100|101|
|100|102|
|100|103|
|100|104|
+---+---+
only showing top 5 rows



These are all the relevant pixels. Next, join them with the data from the main dataset.

In [14]:
from src.data.dataset_functions import get_dataframei

dataFrame = get_dataframei(50)

providing ../data/raw/2022_place_canvas_history-000000000050.csv ...
../data/raw/2022_place_canvas_history-000000000050.csv is already in data/raw
+----------+--------------------+-----------+----------+
| timestamp|             user_id|pixel_color|coordinate|
+----------+--------------------+-----------+----------+
|1649047095|vKtmwRQwAYtw5poO4...|    #FF3881|  804,1992|
|1649047095|eoZO4uYgBzVoDR0Rb...|    #FFFFFF|  1667,616|
|1649047095|Mdd6j8t/24pCeslV3...|    #D4D7D9|   392,467|
|1649047095|xHWcOdv98PR7yQNKD...|    #000000|   21,1691|
|1649047095|hI0Z36Jzkw+coR+im...|    #515252|  971,1026|
+----------+--------------------+-----------+----------+
only showing top 5 rows



                                                                                

In [15]:
dataFrame.show(5)

+--------------------+----+----+---+-----------+
|             user_id|   x|   y|  t|pixel_color|
+--------------------+----+----+---+-----------+
|vKtmwRQwAYtw5poO4...| 804|1992|  0|    #FF3881|
|eoZO4uYgBzVoDR0Rb...|1667| 616|  0|    #FFFFFF|
|Mdd6j8t/24pCeslV3...| 392| 467|  0|    #D4D7D9|
|xHWcOdv98PR7yQNKD...|  21|1691|  0|    #000000|
|hI0Z36Jzkw+coR+im...| 971|1026|  0|    #515252|
+--------------------+----+----+---+-----------+
only showing top 5 rows



In [16]:
# Inner join on both coordinates, then keep only placements strictly before tz.
relevantData = generatedFrame.alias("gf").join(
    dataFrame.alias("df"),
    (F.col("gf.x") == F.col("df.x")) & (F.col("gf.y") == F.col("df.y")),
)
relevantData = relevantData.where(F.col("df.t") < tz)

In [17]:
relevantData.show(5)



+---+---+--------------------+---+---+---+-----------+
|  x|  y|             user_id|  x|  y|  t|pixel_color|
+---+---+--------------------+---+---+---+-----------+
|100|114|+kRn/lyRystaDIFVb...|100|114|617|    #FFD635|
|100|114|Q4Unvumry0qe7SKyN...|100|114|648|    #FFFFFF|
|156|116|vzpcqFGg5QWGuCTf9...|156|116|113|    #898D90|
|156|116|UP5N60v2k9h4ji7Az...|156|116|265|    #000000|
|156|116|o5NK1a8XGb5KtT8zo...|156|116|269|    #00CC78|
+---+---+--------------------+---+---+---+-----------+
only showing top 5 rows



                                                                                

This is now all the pixel data for the relevant pixels in the rectangle.

Idea:
- sort by t in descending order
- drop duplicates on x, y

Then only the most recent value remains per pixel!

In [23]:
sortedData = relevantData.orderBy(F.col('t').desc())
sortedData.show(5)



+---+---+--------------------+---+---+----+-----------+
|  x|  y|             user_id|  x|  y|   t|pixel_color|
+---+---+--------------------+---+---+----+-----------+
|242|130|H80DxFk6VNyC2hEXZ...|242|130|1499|    #FFFFFF|
|172|147|UYNSYvMNvpzeMNU8c...|172|147|1499|    #FF99AA|
|159|108|w80Igl7pEwIHr8aom...|159|108|1498|    #FFFFFF|
|134|143|7CL2L6T2wfnQpibKH...|134|143|1496|    #000000|
|278|124|5vyQX/+QPZZKKADNu...|278|124|1496|    #BE0039|
+---+---+--------------------+---+---+----+-----------+
only showing top 5 rows



                                                                                

In [24]:
droppedData = sortedData.dropDuplicates(['x','y'])
droppedData.show(5)
droppedData.count()

                                                                                

+---+---+--------------------+---+---+----+-----------+
|  x|  y|             user_id|  x|  y|   t|pixel_color|
+---+---+--------------------+---+---+----+-----------+
|100|114|Q4Unvumry0qe7SKyN...|100|114| 648|    #FFFFFF|
|100|138|FXzBQaCFC8zH/TKFa...|100|138|1090|    #BE0039|
|101|105|rCHFYvafD1nKrDEzL...|101|105| 164|    #000000|
|102|133|46Q0DVs3l4i9CTylT...|102|133| 564|    #BE0039|
|103|106|igZa+H/H2SBbN+d5H...|103|106| 293|    #000000|
+---+---+--------------------+---+---+----+-----------+
only showing top 5 rows



                                                                                

637

Unfortunately this does not seem to preserve the ordering. dropDuplicates keeps an arbitrary row per (x, y), not necessarily the newest one.

https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first

In [32]:
from pyspark.sql import Window
# Number the rows of each pixel from newest to oldest and keep only the newest one.
window = Window.partitionBy("gf.x","gf.y").orderBy(F.col("t").desc())
sortedData = relevantData.withColumn('steps',F.row_number().over(window))
droppedData = sortedData.where(F.col('steps') == 1)


AttributeError: 'builtin_function_or_method' object has no attribute 'where'