In [2]:
import datetime
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import unix_timestamp, from_unixtime, col
from pyspark.sql import functions as F

In [3]:
fg1_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("ts", IntegerType(), True),
  StructField("label", StringType(), True)    
])

fg2_schema = StructType([
  StructField("id_2", IntegerType(), True),
  StructField("ts_2", IntegerType(), True),
  StructField("f2", StringType(), True)    
])

fg3_schema = StructType([
  StructField("id_3", IntegerType(), True),
  StructField("ts_3", IntegerType(), True),
  StructField("f3", StringType(), True)    
])

In [4]:
data1 = [[1, 5, "1x"],
         [1, 7, "1y"],
         [1, 4, "1z"],
         [2, 6, "2x"],
         [2, 8, "2y"]]

data2 = [[1, 5, "1x"],
         [1, 7, "1y"],
         [1, 4, "1z"],
         [2, 6, "2x"],
         [2, 8, "2y"]]

data3 = [[1, 10, "f3-1-10"],
         [1, 1, "f3-1-1"],
         [1, 6, "f3-1-6"],
         [2, 2, "f3-2-2"],
         [2, 8, "f3-2-8"]]

In [5]:
data1 = [[1, 5, "1x"],
         [1, 7, "1y"],
         [1, 4, "1z"],
         [2, 6, "2x"],
         [2, 8, "2y"]]

In [6]:
data3 = [[1, 10, "f3-1-10"],
         [1, 1, "f3-1-1"],
         [1, 6, "f3-1-6"],
         [2, 2, "f3-2-2"],
         [2, 8, "f3-2-8"]]

In [7]:
spark = spark = SparkSession.builder.master("local").appName("Three Way PIT").config("spark.some.config.option", "some-value").getOrCreate()

In [7]:
fg1 = spark.createDataFrame(data1, schema=fg1_schema) 
fg2 = spark.createDataFrame(data2, schema=fg2_schema) 
fg3 = spark.createDataFrame(data3, schema=fg3_schema) 

In [8]:
fg1.join(fg2, fg1.id == fg2.id_2).count()

13

In [9]:
fg1.join(fg2, fg1.id == fg2.id_2).show(50)

+---+---+-----+----+----+---+
| id| ts|label|id_2|ts_2| f2|
+---+---+-----+----+----+---+
|  1|  5|   1x|   1|   5| 1x|
|  1|  5|   1x|   1|   7| 1y|
|  1|  5|   1x|   1|   4| 1z|
|  1|  7|   1y|   1|   5| 1x|
|  1|  7|   1y|   1|   7| 1y|
|  1|  7|   1y|   1|   4| 1z|
|  1|  4|   1z|   1|   5| 1x|
|  1|  4|   1z|   1|   7| 1y|
|  1|  4|   1z|   1|   4| 1z|
|  2|  6|   2x|   2|   6| 2x|
|  2|  6|   2x|   2|   8| 2y|
|  2|  8|   2y|   2|   6| 2x|
|  2|  8|   2y|   2|   8| 2y|
+---+---+-----+----+----+---+

In [10]:
fg1.join(fg2, fg1.id == fg2.id_2).join(fg3, fg1.id == fg3.id_3).count()

35

In [11]:
fg1.join(fg2, fg1.id == fg2.id_2).join(fg3, fg1.id == fg3.id_3).show(50)

+---+---+-----+----+----+---+----+----+-------+
| id| ts|label|id_2|ts_2| f2|id_3|ts_3|     f3|
+---+---+-----+----+----+---+----+----+-------+
|  1|  5|   1x|   1|   5| 1x|   1|  10|f3-1-10|
|  1|  5|   1x|   1|   5| 1x|   1|   1| f3-1-1|
|  1|  5|   1x|   1|   5| 1x|   1|   6| f3-1-6|
|  1|  5|   1x|   1|   7| 1y|   1|  10|f3-1-10|
|  1|  5|   1x|   1|   7| 1y|   1|   1| f3-1-1|
|  1|  5|   1x|   1|   7| 1y|   1|   6| f3-1-6|
|  1|  5|   1x|   1|   4| 1z|   1|  10|f3-1-10|
|  1|  5|   1x|   1|   4| 1z|   1|   1| f3-1-1|
|  1|  5|   1x|   1|   4| 1z|   1|   6| f3-1-6|
|  1|  7|   1y|   1|   5| 1x|   1|  10|f3-1-10|
|  1|  7|   1y|   1|   5| 1x|   1|   1| f3-1-1|
|  1|  7|   1y|   1|   5| 1x|   1|   6| f3-1-6|
|  1|  7|   1y|   1|   7| 1y|   1|  10|f3-1-10|
|  1|  7|   1y|   1|   7| 1y|   1|   1| f3-1-1|
|  1|  7|   1y|   1|   7| 1y|   1|   6| f3-1-6|
|  1|  7|   1y|   1|   4| 1z|   1|  10|f3-1-10|
|  1|  7|   1y|   1|   4| 1z|   1|   1| f3-1-1|
|  1|  7|   1y|   1|   4| 1z|   1|   6| 

### With Time

In [12]:
fg1.join(fg2, (fg1.id == fg2.id_2) & (fg1.ts >= fg2.ts_2)).count()

9

In [13]:
fg1.join(fg2, (fg1.id == fg2.id_2) & (fg1.ts >= fg2.ts_2)).show()

+---+---+-----+----+----+---+
| id| ts|label|id_2|ts_2| f2|
+---+---+-----+----+----+---+
|  1|  5|   1x|   1|   5| 1x|
|  1|  5|   1x|   1|   4| 1z|
|  1|  7|   1y|   1|   5| 1x|
|  1|  7|   1y|   1|   7| 1y|
|  1|  7|   1y|   1|   4| 1z|
|  1|  4|   1z|   1|   4| 1z|
|  2|  6|   2x|   2|   6| 2x|
|  2|  8|   2y|   2|   6| 2x|
|  2|  8|   2y|   2|   8| 2y|
+---+---+-----+----+----+---+

In [14]:
fg1.join(fg2, (fg1.id == fg2.id_2) & (fg1.ts >= fg2.ts_2)).join(
    fg3, (fg1.id == fg3.id_3) & (fg1.ts >= fg3.ts_3)
).count()


14

In [15]:
fg1.join(fg2, (fg1.id == fg2.id_2) & (fg1.ts >= fg2.ts_2)).join(
    fg3, (fg1.id == fg3.id_3) & (fg1.ts >= fg3.ts_3)
).show(50)


+---+---+-----+----+----+---+----+----+------+
| id| ts|label|id_2|ts_2| f2|id_3|ts_3|    f3|
+---+---+-----+----+----+---+----+----+------+
|  1|  5|   1x|   1|   5| 1x|   1|   1|f3-1-1|
|  1|  5|   1x|   1|   4| 1z|   1|   1|f3-1-1|
|  1|  7|   1y|   1|   5| 1x|   1|   1|f3-1-1|
|  1|  7|   1y|   1|   5| 1x|   1|   6|f3-1-6|
|  1|  7|   1y|   1|   7| 1y|   1|   1|f3-1-1|
|  1|  7|   1y|   1|   7| 1y|   1|   6|f3-1-6|
|  1|  7|   1y|   1|   4| 1z|   1|   1|f3-1-1|
|  1|  7|   1y|   1|   4| 1z|   1|   6|f3-1-6|
|  1|  4|   1z|   1|   4| 1z|   1|   1|f3-1-1|
|  2|  6|   2x|   2|   6| 2x|   2|   2|f3-2-2|
|  2|  8|   2y|   2|   6| 2x|   2|   2|f3-2-2|
|  2|  8|   2y|   2|   6| 2x|   2|   8|f3-2-8|
|  2|  8|   2y|   2|   8| 2y|   2|   2|f3-2-2|
|  2|  8|   2y|   2|   8| 2y|   2|   8|f3-2-8|
+---+---+-----+----+----+---+----+----+------+

## Add window

In [16]:
from pyspark.sql import Window
from pyspark.sql.functions import lit, when, col, lag, rank, row_number, dense_rank, desc, asc

win2 = Window.partitionBy(["id", "ts"]).orderBy(col('ts_2').desc())
win3 = Window.partitionBy(["id", "ts"]).orderBy(col('ts_3').desc())

In [17]:
fg1.join(fg2, (fg1.id == fg2.id_2) & (fg1.ts >= fg2.ts_2)).join(
    fg3, (fg1.id == fg3.id_3) & (fg1.ts >= fg3.ts_3)
).withColumn("rank2", rank().over(win2)).withColumn(
    "rank3", rank().over(win3)
).withColumn(
    "row2", row_number().over(win2)
).withColumn(
    "row3", row_number().over(win3)
).withColumn(
    "dense2", dense_rank().over(win2)
).withColumn(
    "dense3", dense_rank().over(win3)
).show(
    50
)


+---+---+-----+----+----+---+----+----+------+-----+-----+----+----+------+------+
| id| ts|label|id_2|ts_2| f2|id_3|ts_3|    f3|rank2|rank3|row2|row3|dense2|dense3|
+---+---+-----+----+----+---+----+----+------+-----+-----+----+----+------+------+
|  1|  4|   1z|   1|   4| 1z|   1|   1|f3-1-1|    1|    1|   1|   1|     1|     1|
|  1|  5|   1x|   1|   5| 1x|   1|   1|f3-1-1|    1|    1|   1|   1|     1|     1|
|  1|  5|   1x|   1|   4| 1z|   1|   1|f3-1-1|    2|    1|   2|   2|     2|     1|
|  1|  7|   1y|   1|   7| 1y|   1|   6|f3-1-6|    1|    1|   1|   1|     1|     1|
|  1|  7|   1y|   1|   5| 1x|   1|   6|f3-1-6|    3|    1|   3|   2|     2|     1|
|  1|  7|   1y|   1|   4| 1z|   1|   6|f3-1-6|    5|    1|   5|   3|     3|     1|
|  1|  7|   1y|   1|   7| 1y|   1|   1|f3-1-1|    1|    4|   2|   4|     1|     2|
|  1|  7|   1y|   1|   5| 1x|   1|   1|f3-1-1|    3|    4|   4|   5|     2|     2|
|  1|  7|   1y|   1|   4| 1z|   1|   1|f3-1-1|    5|    4|   6|   6|     3|     2|
|  2

## One Window

In [18]:

# 1. Join the data
joined_data = fg1.join(fg2, (fg1.id == fg2.id_2) & (fg1.ts >= fg2.ts_2)).join(
    fg3, (fg1.id == fg3.id_3) & (fg1.ts >= fg3.ts_3)
)

# 2. Create window for partitioning and ordering the data
win = Window.partitionBy(["id", "ts"]).orderBy(desc("ts_2"), desc("ts_3"))

# 3. Rank the rows of each partition
ranked_data = joined_data.withColumn("rank", rank().over(win))

# 4. Take only the columns with rank == 1, for each partition
filtered_data = ranked_data.filter(col("rank") == 1)

filtered_data.explain(mode="formatted")


== Physical Plan ==
* Filter (17)
+- Window (16)
   +- * Sort (15)
      +- * SortMergeJoin Inner (14)
         :- * SortMergeJoin Inner (9)
         :  :- * Sort (4)
         :  :  +- Exchange (3)
         :  :     +- * Filter (2)
         :  :        +- * Scan ExistingRDD (1)
         :  +- * Sort (8)
         :     +- Exchange (7)
         :        +- * Filter (6)
         :           +- * Scan ExistingRDD (5)
         +- * Sort (13)
            +- Exchange (12)
               +- * Filter (11)
                  +- * Scan ExistingRDD (10)


(1) Scan ExistingRDD [codegen id : 1]
Output [3]: [id#0, ts#1, label#2]
Arguments: [id#0, ts#1, label#2], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter [codegen id : 1]
Input [3]: [id#0, ts#1, label#2]
Condition : (isnotnull(id#0) AND isnotnull(ts#1))

(3) Exchange
Input [3]: [id#0, ts#1, label#2]
Arguments: hashpartitioning(id#0, 200), ENSURE_REQUIREMENTS, [id=#79

In [19]:
filtered_data.show()

+---+---+-----+----+----+---+----+----+------+----+
| id| ts|label|id_2|ts_2| f2|id_3|ts_3|    f3|rank|
+---+---+-----+----+----+---+----+----+------+----+
|  1|  4|   1z|   1|   4| 1z|   1|   1|f3-1-1|   1|
|  1|  5|   1x|   1|   5| 1x|   1|   1|f3-1-1|   1|
|  1|  7|   1y|   1|   7| 1y|   1|   6|f3-1-6|   1|
|  2|  6|   2x|   2|   6| 2x|   2|   2|f3-2-2|   1|
|  2|  8|   2y|   2|   8| 2y|   2|   8|f3-2-8|   1|
+---+---+-----+----+----+---+----+----+------+----+

In [20]:
# 1. Join the data
fg2_sorted = fg2.orderBy(asc("ts_2"))
fg2_sorted.show()
joined_data = fg1.join(, "outer")

#joined_data.show()

An error was encountered:
invalid syntax (<stdin>, line 4)
  File "<stdin>", line 4
    joined_data = fg1.join(, "outer")
                           ^
SyntaxError: invalid syntax



In [21]:
win = Window.partitionBy(["id", "ts"]).orderBy(desc("ts_2"), desc("ts_3"))

In [22]:
fg1.join(fg2, (fg1.id == fg2.id_2) & (fg1.ts >= fg2.ts_2)) \
   .join(fg3, (fg1.id == fg3.id_3) & (fg1.ts >= fg3.ts_3)) \
   .withColumn("rank", rank().over(win)) \
   .withColumn("row", row_number().over(win)) \
   .withColumn("dense", dense_rank().over(win)) \
   .show(50)

+---+---+-----+----+----+---+----+----+------+----+---+-----+
| id| ts|label|id_2|ts_2| f2|id_3|ts_3|    f3|rank|row|dense|
+---+---+-----+----+----+---+----+----+------+----+---+-----+
|  1|  4|   1z|   1|   4| 1z|   1|   1|f3-1-1|   1|  1|    1|
|  1|  5|   1x|   1|   5| 1x|   1|   1|f3-1-1|   1|  1|    1|
|  1|  5|   1x|   1|   4| 1z|   1|   1|f3-1-1|   2|  2|    2|
|  1|  7|   1y|   1|   7| 1y|   1|   6|f3-1-6|   1|  1|    1|
|  1|  7|   1y|   1|   7| 1y|   1|   1|f3-1-1|   2|  2|    2|
|  1|  7|   1y|   1|   5| 1x|   1|   6|f3-1-6|   3|  3|    3|
|  1|  7|   1y|   1|   5| 1x|   1|   1|f3-1-1|   4|  4|    4|
|  1|  7|   1y|   1|   4| 1z|   1|   6|f3-1-6|   5|  5|    5|
|  1|  7|   1y|   1|   4| 1z|   1|   1|f3-1-1|   6|  6|    6|
|  2|  6|   2x|   2|   6| 2x|   2|   2|f3-2-2|   1|  1|    1|
|  2|  8|   2y|   2|   8| 2y|   2|   8|f3-2-8|   1|  1|    1|
|  2|  8|   2y|   2|   8| 2y|   2|   2|f3-2-2|   2|  2|    2|
|  2|  8|   2y|   2|   6| 2x|   2|   8|f3-2-8|   3|  3|    3|
|  2|  8

In [23]:
fg1.join(fg2, (fg1.id == fg2.id_2) & (fg1.ts >= fg2.ts_2)) \
   .join(fg3, (fg1.id == fg3.id_3) & (fg1.ts >= fg3.ts_3)) \
   .withColumn("rank", rank().over(win)) \
   .withColumn("row", row_number().over(win)) \
   .withColumn("dense", dense_rank().over(win)) \
   .filter(col("rank") == 1) \
   .show(50)

+---+---+-----+----+----+---+----+----+------+----+---+-----+
| id| ts|label|id_2|ts_2| f2|id_3|ts_3|    f3|rank|row|dense|
+---+---+-----+----+----+---+----+----+------+----+---+-----+
|  1|  4|   1z|   1|   4| 1z|   1|   1|f3-1-1|   1|  1|    1|
|  1|  5|   1x|   1|   5| 1x|   1|   1|f3-1-1|   1|  1|    1|
|  1|  7|   1y|   1|   7| 1y|   1|   6|f3-1-6|   1|  1|    1|
|  2|  6|   2x|   2|   6| 2x|   2|   2|f3-2-2|   1|  1|    1|
|  2|  8|   2y|   2|   8| 2y|   2|   8|f3-2-8|   1|  1|    1|
+---+---+-----+----+----+---+----+----+------+----+---+-----+

In [24]:
fg1.join(fg2, (fg1.id == fg2.id_2) & (fg1.ts >= fg2.ts_2)) \
   .join(fg3, (fg1.id == fg3.id_3) & (fg1.ts >= fg3.ts_3)) \
   .withColumn("rank", rank().over(win)) \
   .filter(col("rank") == 1) \
   .explain()

== Physical Plan ==
*(9) Filter (isnotnull(rank#962) AND (rank#962 = 1))
+- Window [rank(ts_2#7, ts_3#13) windowspecdefinition(id#0, ts#1, ts_2#7 DESC NULLS LAST, ts_3#13 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#962], [id#0, ts#1], [ts_2#7 DESC NULLS LAST, ts_3#13 DESC NULLS LAST]
   +- *(8) Sort [id#0 ASC NULLS FIRST, ts#1 ASC NULLS FIRST, ts_2#7 DESC NULLS LAST, ts_3#13 DESC NULLS LAST], false, 0
      +- *(8) SortMergeJoin [id#0], [id_3#12], Inner, (ts#1 >= ts_3#13)
         :- *(5) SortMergeJoin [id#0], [id_2#6], Inner, (ts#1 >= ts_2#7)
         :  :- *(2) Sort [id#0 ASC NULLS FIRST], false, 0
         :  :  +- Exchange hashpartitioning(id#0, 200), ENSURE_REQUIREMENTS, [id=#1152]
         :  :     +- *(1) Filter (isnotnull(id#0) AND isnotnull(ts#1))
         :  :        +- *(1) Scan ExistingRDD[id#0,ts#1,label#2]
         :  +- *(4) Sort [id_2#6 ASC NULLS FIRST], false, 0
         :     +- Exchange hashpartitioning(id_2#6, 200),

In [25]:
fg1.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[id#0,ts#1,label#2]

In [26]:
fg1.registerTempTable("fg1")
fg2.registerTempTable("fg2")
fg3.registerTempTable("fg3")

In [27]:
result = spark.sql(
    "SELECT id, ts, label, ts_2, f2, ts_3, f3 "
    "LEFT OUTER JOIN ( "
        "SELECT TOP 1 * "
        "FROM fg2 b "
        "WHERE b.id = a.id AND b.ts_2 <= a.ts "
        "ORDER BY b.ts_2 DESC "
    ")"
)

An error was encountered:

mismatched input 'OUTER' expecting {<EOF>, ';'}(line 1, pos 46)

== SQL ==
SELECT id, ts, label, ts_2, f2, ts_3, f3 LEFT OUTER JOIN ( SELECT TOP 1 * FROM fg2 b WHERE b.id = a.id AND b.ts_2 <= a.ts ORDER BY b.ts_2 DESC )
----------------------------------------------^^^

Traceback (most recent call last):
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 723, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/srv/hops/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.ParseException: 
mismatched input 'OUTER' expecting {<EOF>, ';'}(line 1, pos 46)

== SQL ==
SELECT id, ts, label, ts_2, f2, ts_3, f3 LEFT OUTER JOIN ( SELECT TOP 1 * FROM fg2 b WHERE b.id = a.id AND b.ts_2 <= a.ts ORDER BY b.ts_2 DES

In [28]:
result = spark.sql("with x as ("
                       "SELECT id, ts, label, ts_2, f2, ts_3, f3, RANK() OVER (PARTITION BY id, ts ORDER BY (ts_2, ts_3) DESC) AS rank "
                       "FROM fg1 a "
                       "join fg2 b "
                       "on a.id = b.id_2 "
                       "and a.ts >= b.ts_2 "
                       "join fg3 c "
                       "on a.id = c.id_3 "
                       "and a.ts >= c.ts_3) "
                   "select * from x where rank = 1");

In [29]:
result.show(50)

+---+---+-----+----+---+----+------+----+
| id| ts|label|ts_2| f2|ts_3|    f3|rank|
+---+---+-----+----+---+----+------+----+
|  1|  4|   1z|   4| 1z|   1|f3-1-1|   1|
|  1|  5|   1x|   5| 1x|   1|f3-1-1|   1|
|  1|  7|   1y|   7| 1y|   6|f3-1-6|   1|
|  2|  6|   2x|   6| 2x|   2|f3-2-2|   1|
|  2|  8|   2y|   8| 2y|   8|f3-2-8|   1|
+---+---+-----+----+---+----+------+----+

In [None]:
result.explain(True)

In [None]:
result2 = spark.sql("with "
                   "x1 ("
                       "select * from ("
                           "SELECT id, ts, label, ts_2, f2, RANK() OVER (PARTITION BY id, ts ORDER BY ts_2 DESC) AS rank "
                           "FROM fg1 a "
                           "join fg2 b "
                           "on a.id = b.id_2 "
                           "and a.ts >= b.ts_2) "
                       "where rank = 1), "
                   "x2 ("
                       "select * from ("
                           "SELECT id, ts, label, ts_3, f3, RANK() OVER (PARTITION BY id, ts ORDER BY ts_3 DESC) AS rank "
                           "FROM fg1 a "
                           "join fg3 b "
                           "on a.id = b.id_3 "
                           "and a.ts >= b.ts_3) "
                       "where rank = 1) "
                   "select * from x1 "
                   "join x2 "
                   "on x1.id = x2.id "
                   "and x1.ts = x2.ts");

In [None]:
result2.show(50)

In [None]:
result2.explain(True)

## Real Data

In [None]:
from pyspark.sql.functions import unix_timestamp, from_unixtime, col
from hops import hdfs as hdfs

In [None]:
fg01_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("ts", IntegerType(), True),
  StructField("label", StringType(), True)    
])

fg01=spark.read.csv("hdfs:///Projects/" + hdfs.project_name() + "/Jupyter/PIT-joins/example-data/5-10-2-out.csv", header=True, schema=fg01_schema)
#fg01=fg01.sort(col("id"),col("ts")).withColumn("ts", from_unixtime("ts").cast("timestamp"))
fg01=fg01.sort(col("id"),col("ts"))
fg01.show()

In [None]:
fg02_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("ts", IntegerType(), True),
  StructField("f2", StringType(), True)    
])

fg02=spark.read.csv("hdfs:///Projects/" + hdfs.project_name() + "/Jupyter/PIT-joins/example-data/5-10-3-out.csv", header=True, schema=fg02_schema)
#fg02=fg02.select([col("id").alias("id_2"), col("ts").alias("ts_2"), col("f2")]).withColumn("ts_2", from_unixtime("ts_2").cast("timestamp"))
fg02=fg02.select([col("id").alias("id_2"), col("ts").alias("ts_2"), col("f2")])
fg02.show()

In [None]:
fg03_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("ts", IntegerType(), True),
  StructField("f3", StringType(), True)    
])

fg03=spark.read.csv("hdfs:///Projects/" + hdfs.project_name() + "/Jupyter/PIT-joins/example-data/5-10-3-out.csv", header=True, schema=fg03_schema)
#fg03=fg03.select([col("id").alias("id_3"), col("ts").alias("ts_3"), col("f3")]).withColumn("ts_3", from_unixtime("ts_3").cast("timestamp"))
fg03=fg03.select([col("id").alias("id_3"), col("ts").alias("ts_3"), col("f3")])
fg03.show()

In [None]:
fg01.registerTempTable("fg01")
fg02.registerTempTable("fg02")
fg03.registerTempTable("fg03")

In [None]:
result = spark.sql("with x as ("
                       "SELECT id, ts, label, ts_2, f2, ts_3, f3, RANK() OVER (PARTITION BY id, ts ORDER BY (ts_2, ts_3) DESC) AS rank "
                       "FROM fg01 a "
                       "join fg02 b "
                       "on a.id = b.id_2 "
                       "and a.ts >= b.ts_2 "
                       "join fg03 c "
                       "on a.id = c.id_3 "
                       "and a.ts >= c.ts_3) "
                   "select * from x where rank = 1");

In [None]:
result.sort(col("id"),col("ts")).show(50)

In [None]:
spark.sql(
                       "SELECT id, ts, label, ts_2, f2, ts_3, f3, RANK() OVER (PARTITION BY id, ts ORDER BY (ts_2, ts_3) DESC) AS rank "
                       "FROM fg01 a "
                       "join fg02 b "
                       "on a.id = b.id_2 "
                       "and a.ts >= b.ts_2 "
                       "join fg03 c "
                       "on a.id = c.id_3 "
                       "and a.ts >= c.ts_3) "
).show(50)

In [None]:
result.explain(True)

In [None]:
result2 = spark.sql("with "
                   "x1 as ("
                       "select * from ("
                           "SELECT id, ts, label, ts_2, f2, RANK() OVER (PARTITION BY id, ts ORDER BY ts_2 DESC) AS rank "
                           "FROM fg01 a "
                           "join fg02 b "
                           "on a.id = b.id_2 "
                           "and a.ts >= b.ts_2) "
                       "where rank = 1), "
                   "x2 as ("
                       "select * from ("
                           "SELECT id, ts, label, ts_3, f3, RANK() OVER (PARTITION BY id, ts ORDER BY ts_3 DESC) AS rank "
                           "FROM fg01 a "
                           "join fg03 b "
                           "on a.id = b.id_3 "
                           "and a.ts >= b.ts_3) "
                       "where rank = 1) "
                   "select x1.id, x1.ts, x1.label, x1.f2, x2.f3 from x1 "
                   "join x2 "
                   "on x1.id = x2.id "
                   "and x1.ts = x2.ts");

In [None]:
result2.sort(col("id"),col("ts")).show(50)

In [None]:
result2.explain(True)

## 1 mio rows

In [1]:
fg01_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("ts", IntegerType(), True),
  StructField("label", StringType(), True)    
])

fg01=spark.read.csv("hdfs:///Projects/" + hdfs.project_name() + "/Jupyter/PIT-joins/example-data/100000-20-1-out.csv", header=True, schema=fg01_schema)
fg01=fg01.sort(col("id"),col("ts")).withColumn("ts", from_unixtime("ts").cast("timestamp"))
fg01.show()

Starting Spark application


ID,Application ID,Kind,State,Spark UI,Driver log
7,application_1642582607798_0006,pyspark,idle,Link,Link


SparkSession available as 'spark'.


An error was encountered:
name 'StructType' is not defined
Traceback (most recent call last):
NameError: name 'StructType' is not defined



In [None]:
fg02_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("ts", IntegerType(), True),
  StructField("f2", StringType(), True)    
])

fg02=spark.read.csv("hdfs:///Projects/" + hdfs.project_name() + "/Jupyter/PIT-joins/example-data/100000-20-2-out.csv", header=True, schema=fg02_schema)
fg02=fg02.select([col("id").alias("id_2"), col("ts").alias("ts_2"), col("f2")]).withColumn("ts_2", from_unixtime("ts_2").cast("timestamp"))
fg02.show()

In [None]:
fg03_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("ts", IntegerType(), True),
  StructField("f3", StringType(), True)    
])

fg03=spark.read.csv("hdfs:///Projects/" + hdfs.project_name() + "/Jupyter/PIT-joins/example-data/100000-20-2-out.csv", header=True, schema=fg03_schema)
fg03=fg03.select([col("id").alias("id_3"), col("ts").alias("ts_3"), col("f3")]).withColumn("ts_3", from_unixtime("ts_3").cast("timestamp"))
fg03.show()

In [None]:
fg01.registerTempTable("fg01")
fg02.registerTempTable("fg02")
fg03.registerTempTable("fg03")

In [None]:
result = spark.sql("with x as ("
                       "SELECT id, ts, label, ts_2, f2, ts_3, f3, ROW_NUMBER() OVER (PARTITION BY id, ts ORDER BY (ts_2, ts_3) DESC) AS rank "
                       "FROM fg01 a "
                       "join fg02 b "
                       "on a.id = b.id_2 "
                       "and a.ts >= b.ts_2 "
                       "join fg03 c "
                       "on a.id = c.id_3 "
                       "and a.ts >= c.ts_3) "
                   "select * from x where rank = 1");

In [None]:
result.show(10)

In [None]:
result.explain(True)

In [None]:
result2 = spark.sql("with "
                   "x1 as ("
                       "select * from ("
                           "SELECT id, ts, label, ts_2, f2, RANK() OVER (PARTITION BY id, ts ORDER BY ts_2 DESC) AS rank "
                           "FROM fg01 a "
                           "join fg02 b "
                           "on a.id = b.id_2 "
                           "and a.ts >= b.ts_2) "
                       "where rank = 1), "
                   "x2 as ("
                       "select * from ("
                           "SELECT id, ts, label, ts_3, f3, RANK() OVER (PARTITION BY id, ts ORDER BY ts_3 DESC) AS rank "
                           "FROM fg01 a "
                           "join fg03 b "
                           "on a.id = b.id_3 "
                           "and a.ts >= b.ts_3) "
                       "where rank = 1) "
                   "select c.id, c.ts, c.label, c.ts_2, c.f2, d.ts_3, d.f3, d.rank from x1 c "
                   "join x2 d "
                   "on c.id = d.id "
                   "and c.ts = d.ts");

In [None]:
result2.explain(True)

In [None]:
result.write.format("parquet").save("hdfs:///Projects/demo_fs_meb10000/Resources/option1", mode="overwrite")

In [None]:
result2.write.format("parquet").save("hdfs:///Projects/demo_fs_meb10000/Resources/option2", mode="overwrite")

# Real Feature Group PIT Join

In [None]:
from hops import hdfs
import hsfs
conn = hsfs.connection()
fs = conn.get_feature_store()

In [None]:
fg01_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("ts", IntegerType(), True),
  StructField("label", StringType(), True)    
])

fg01=spark.read.csv("hdfs:///Projects/" + hdfs.project_name() + "/Jupyter/PIT-joins/example-data/5-10-2-out.csv", header=True, schema=fg01_schema)
#fg01=fg01.sort(col("id"),col("ts")).withColumn("ts", from_unixtime("ts").cast("timestamp"))
fg01=fg01.sort(col("id"),col("ts")).withColumn("ts", col("ts").cast(LongType())).withColumn("id2", col("id") + 10000)

In [None]:
fg02_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("ts", IntegerType(), True),
  StructField("f2", StringType(), True)    
])

fg02=spark.read.csv("hdfs:///Projects/" + hdfs.project_name() + "/Jupyter/PIT-joins/example-data/5-10-3-out.csv", header=True, schema=fg02_schema)
#fg02=fg02.select([col("id").alias("id_2"), col("ts").alias("ts_2"), col("f2")]).withColumn("ts_2", from_unixtime("ts_2").cast("timestamp"))
fg02=fg02.select([col("id"), col("ts"), col("f2")]).withColumn("ts2", col("ts").cast(LongType())).select([col("id"), col("ts2"), col("f2")])

In [None]:
fg02.printSchema()

In [None]:
fg03_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("ts", IntegerType(), True),
  StructField("f3", StringType(), True)    
])

fg03=spark.read.csv("hdfs:///Projects/" + hdfs.project_name() + "/Jupyter/PIT-joins/example-data/5-10-3-out.csv", header=True, schema=fg03_schema)
#fg03=fg03.select([col("id").alias("id_3"), col("ts").alias("ts_3"), col("f3")]).withColumn("ts_3", from_unixtime("ts_3").cast("timestamp"))
fg03=fg03.select([col("id"), col("ts"), col("f3")]).withColumn("id2", col("id") + 10000).withColumn("ts", col("ts").cast(LongType()))

In [None]:
fg1 = fs.create_feature_group("fg1", statistics_config=False, online_enabled=True, primary_key=["id"], event_time="ts")

fg1.save(fg01)

In [None]:
fg = fs.get_feature_group("players_features")

In [None]:
query = fg1.select_all().join(fg.select_all(), left_on=["id"], right_on=["team_id"])

In [None]:
print(query.to_string())

In [None]:
fg2 = fs.create_feature_group("fg2", statistics_config=False, online_enabled=True, primary_key=["id"], event_time="ts2")

fg2.save(fg02)

In [None]:
fg3 = fs.create_feature_group("fg3", statistics_config=False, online_enabled=True, primary_key=["id", "id2"], event_time="ts")

fg3.save(fg03)

In [None]:
fg1 = fs.get_feature_group("fg1")
fg2 = fs.get_feature_group("fg2")
fg3 = fs.get_feature_group("fg3")

In [None]:
query = fg1.select_all().join(fg2.select_all(), on="id").join(fg3.select_all(), left_on=["id", "id2"], right_on=["id", "id2"])

In [None]:
print(query.to_string())

In [None]:
print(query.to_string(online=True))

In [None]:
query.read(online=True).sort(col("id"),col("ts")).show(50)

In [None]:
query.read().sort(col("id"),col("ts")).show(50)

In [None]:
query = fg1.select_all().join(fg2.select_all(), on="id")

In [None]:
print(query.to_string())

In [None]:
print(query.to_string(online=True))

In [None]:
query.read(online=True).sort(col("id"),col("ts")).show(50)

In [None]:
query.read().sort(col("id"),col("ts")).show(50)

In [None]:
fg1_nonhudi = fs.create_feature_group("fg1_nonhudi", statistics_config=False, online_enabled=True, primary_key=["id"], event_time="ts", time_travel_format=None)

fg1_nonhudi.save(fg01)

In [None]:
fg2_nonhudi = fs.create_feature_group("fg2_nonhudi", statistics_config=False, online_enabled=True, primary_key=["id"], event_time="ts2", time_travel_format=None)

fg2_nonhudi.save(fg02)

In [None]:
fg3_nonhudi = fs.create_feature_group("fg3_nonhudi", statistics_config=False, online_enabled=True, primary_key=["id", "id2"], event_time="ts", time_travel_format=None)

fg3_nonhudi.save(fg03)

In [None]:
fg1_nonhudi = fs.get_feature_group("fg1_nonhudi")
fg2_nonhudi = fs.get_feature_group("fg2_nonhudi")
fg3_nonhudi = fs.get_feature_group("fg3_nonhudi")

In [None]:
query = fg1_nonhudi.select_all().join(fg2_nonhudi.select_all(), on="id").join(fg3_nonhudi.select_all(), left_on=["id", "id2"], right_on=["id", "id2"])

In [None]:
print(query.to_string())

In [None]:
print(query.to_string(online=True))

In [None]:
query.read().sort(col("id"),col("ts")).show(50)

In [None]:
query.read(online=True).sort(col("id"),col("ts")).show(50)

In [None]:
query = fg1.select_all().join(fg2_nonhudi.select_all(), on="id").join(fg3_nonhudi.select_all(), left_on=["id", "id2"], right_on=["id", "id2"])

In [None]:
print(query.to_string())

In [None]:
query.read().sort(col("id"),col("ts")).show(50)

In [None]:
query.read(online=True).sort(col("id"),col("ts")).show(50)

In [None]:
ondmd = fs.get_on_demand_feature_group("season_features_on_demand")
teams = fs.get_feature_group("teams_features")

In [None]:
query = fg1.select_all().join(ondmd.select_all(), left_on=["id"], right_on=["team_id"])

In [None]:
print(query.to_string())

In [None]:
query.read().sort(col("id"),col("ts")).show(50)

In [None]:
sc = fs.get_storage_connector("demo_fs_meb10000_meb10000_onlinefeaturestore")

In [None]:
fg3_ondmd = fs.create_on_demand_feature_group("fg3_ondmd", sc, statistics_config=False, event_time="ts", query="select * from fg3_1")

fg3_ondmd.save()

In [None]:
fg3_ondmd.show(50)

In [None]:
fg3_ondmd = fs.get_on_demand_feature_group("fg3_ondmd")

In [None]:
query = fg1.select_all().join(fg2_nonhudi.select_all(), on="id").join(fg3_ondmd.select_all(), left_on=["id", "id2"], right_on=["id", "id2"])

In [None]:
print(query.to_string())

In [None]:
query.read().sort(col("id"),col("ts")).show(50)

In [None]:
query.to_string(online=True)

In [None]:
query.read(online=True).sort(col("id"),col("ts")).show(50)

## PIT with filters

In [None]:
fg1 = fs.get_feature_group("fg1")
fg2 = fs.get_feature_group("fg2")
fg3 = fs.get_feature_group("fg3")

In [None]:
query = fg1.select_all().filter(fg1["id"] == 1).join(fg2.select_all().filter(fg2["id"] == 1), on="id").join(fg3.select_all(), left_on=["id", "id2"], right_on=["id", "id2"])

In [None]:
print(query.to_string())

In [None]:
query.show(50)

## Features with default values

In [None]:
from hsfs.feature import Feature
from pyspark.sql.functions import lit

In [None]:
fg1_default = fs.create_feature_group("fg1_default", statistics_config=False, online_enabled=True, primary_key=["id"], event_time="ts")

fg1_default.save(fg01.select(["id", "label", "ts"]))

In [None]:
fg1_default_nonhudi = fs.create_feature_group("fg1_default_nonhudi", statistics_config=False, online_enabled=True, primary_key=["id"], event_time="ts", time_travel_format=None)

fg1_default_nonhudi.save(fg01.select(["id", "label", "ts"]))

In [None]:
fg1_default.append_features([Feature("id2", type="int", default_value="10")])

In [None]:
fg1_default_nonhudi.append_features([Feature("id2", type="int", default_value="10")])

In [None]:
fg1_default = fs.get_feature_group("fg1_default")
fg1_default_nonhudi = fs.get_feature_group("fg1_default_nonhudi")

In [None]:
fg1_default.insert(fg01, storage="offline")

In [None]:
fg1_default.select_all().show(50)

In [None]:
fg1_default.select_all().to_string()

In [None]:
query = fg1_default.select_all().join(fg2.select_all(), on="id").join(fg3.select_all(), left_on=["id", "id2"], right_on=["id", "id2"])

In [None]:
print(query.to_string())

In [None]:
query.read().show(50)

In [None]:
sc = fs.get_storage_connector("demo_fs_meb10000_Training_Datasets")

In [None]:
td = fs.create_training_dataset("time_travel", statistics_config=False, data_format="csv", storage_connector=sc)

In [None]:
td.save(query)

In [None]:
td = fs.get_training_dataset("time_travel")

In [None]:
td.query

In [None]:
print(td.get_query(False))

In [None]:
query = fg1_default.select_all().join(fg2.select_all(), on="id").join(fg3.select_all(), left_on=["id", "id2"], right_on=["id", "id2"], prefix="right_")

In [None]:
print(query.to_string())

In [None]:
td = fs.create_training_dataset("time_travel_prefix", statistics_config=False, data_format="csv", storage_connector=sc)

In [None]:
td.save(query)

In [None]:
query.read().show(50)

In [None]:
print(td.get_query(False))

In [None]:
td.query

In [None]:
print(query.as_of("20210706150910").to_string())

In [None]:
fg1_default.commit_details()

In [None]:
td = fs.create_training_dataset("time_travel_pit", statistics_config=False, data_format="csv", storage_connector=sc)

In [None]:
td.save(query.as_of("20210706150910"))

In [None]:
fg = fs.get_feature_group("games_features_hudi_tour")

In [None]:
fg.read().show(30)