In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, sequence, concat_ws, lit, rand, col
from pyspark.sql.types import DateType

# Build a synthetic event table: one row per (user, day, event type) with a random metric.
spark = SparkSession.builder.appName("AltBaseGen").getOrCreate()

# Generator settings - change these to resize the dataset.
NUM_USERS = 5000
FIRST_EVENT_DATE = "2024-01-01"
LAST_EVENT_DATE = "2024-03-31"
# Explicit seed for the metric column. Without it Spark picks a new seed on every run,
# so the notebook output could never be reproduced. Spark documents rand() as
# non-deterministic in the general case, so values are only repeatable while the
# partitioning of the input stays the same.
METRIC_SEED = 42

# User ids 1..NUM_USERS and every calendar day in the (inclusive) date range.
user_ids = spark.sql(f"SELECT explode(sequence(1, {NUM_USERS})) as user_id")
event_dates = spark.sql(
    f"SELECT explode(sequence(DATE'{FIRST_EVENT_DATE}', DATE'{LAST_EVENT_DATE}', INTERVAL 1 DAY)) as event_date"
)

event_types = ["login", "logout", "purchase", "browse", "error"]
event_type_df = spark.createDataFrame([(e,) for e in event_types], ["event_type"])

# Full cartesian product of users x days x event types
# (5000 x 91 x 5 = 2,275,000 rows with the settings above),
# plus an integer metric in the range 0..99.
df_events = (
    user_ids
    .crossJoin(event_dates)
    .crossJoin(event_type_df)
    .withColumn("metric", (rand(METRIC_SEED) * 100).cast("int"))
)

# Two copies of the same events that differ only in the name of the user id column;
# they are the two sides of the self-joins in the cells below.
df_events_a = df_events.withColumnRenamed("user_id", "user_id_a")
df_events_b = df_events.withColumnRenamed("user_id", "user_id_b")

df_events.printSchema()
df_events.show(10)



root
 |-- user_id: integer (nullable = false)
 |-- event_date: date (nullable = false)
 |-- event_type: string (nullable = true)
 |-- metric: integer (nullable = true)

+-------+----------+----------+------+
|user_id|event_date|event_type|metric|
+-------+----------+----------+------+
|      1|2024-01-01|     login|     8|
|      1|2024-01-02|     login|    74|
|      1|2024-01-03|     login|    80|
|      1|2024-01-04|     login|    34|
|      1|2024-01-05|     login|    52|
|      1|2024-01-06|     login|    17|
|      1|2024-01-07|     login|    43|
|      1|2024-01-08|     login|    50|
|      1|2024-01-09|     login|    87|
|      1|2024-01-10|     login|    32|
+-------+----------+----------+------+
only showing top 10 rows



In [0]:
# Self-join of the event table on event_date.
# Both inputs are derived from the same df_events, so `df_events_a.event_date` and
# `df_events_b.event_date` refer to the same underlying attribute. Each side therefore
# gets an explicit alias and the join key is referenced through it, instead of relying
# on Spark to guess which side each reference belongs to.
# NOTE: event_date, event_type and metric still appear once per side in the result;
# select them downstream as col("a.<name>") / col("b.<name>").
# NOTE: joining on the date alone pairs every row with every row of the same day,
# so the full result is extremely large - only preview it, do not count or collect it.
df_joined = df_events_a.alias("a").join(
    df_events_b.alias("b"),
    col("a.event_date") == col("b.event_date"),
    "inner",
)
df_joined.show(10)



+---------+----------+----------+------+---------+----------+----------+------+
|user_id_a|event_date|event_type|metric|user_id_b|event_date|event_type|metric|
+---------+----------+----------+------+---------+----------+----------+------+
|        1|2024-01-02|     login|    74|        1|2024-01-02|     login|    74|
|        1|2024-01-02|     login|    74|        2|2024-01-02|     login|     6|
|        1|2024-01-02|     login|    74|        3|2024-01-02|     login|    53|
|        1|2024-01-02|     login|    74|        4|2024-01-02|     login|    17|
|        1|2024-01-02|     login|    74|        5|2024-01-02|     login|     5|
|        1|2024-01-02|     login|    74|        6|2024-01-02|     login|    25|
|        1|2024-01-02|     login|    74|        7|2024-01-02|     login|    35|
|        1|2024-01-02|     login|    74|        8|2024-01-02|     login|    82|
|        1|2024-01-02|     login|    74|        9|2024-01-02|     login|    81|
|        1|2024-01-02|     login|    74|

In [0]:
from pyspark.sql.functions import col

# Give each side of the self-join a short alias so its columns can be qualified.
dfa = df_events_a.alias("a")
dfb = df_events_b.alias("b")

# LEFT JOIN on the same user and the same date, with keys referenced via the aliases.
same_user = col("a.user_id_a") == col("b.user_id_b")
same_date = col("a.event_date") == col("b.event_date")
df_left = dfa.join(dfb, on=same_user & same_date, how="left")

# Keep the join keys once (taken from side "a") and suffix the remaining columns
# with the side they came from, so the result has no duplicate column names.
selected_columns = [
    col("a.user_id_a").alias("user_id"),
    col("a.event_date"),
    col("a.event_type").alias("event_type_a"),
    col("a.metric").alias("metric_a"),
    col("b.event_type").alias("event_type_b"),
    col("b.metric").alias("metric_b"),
]
df_left_clean = df_left.select(*selected_columns)

# Preview the first ten rows.
display(df_left_clean.limit(10))

# Count the rows of the full result and report the total.
df_left_clean_count = df_left_clean.count()
print(f"Count of rows in df_left_clean: {df_left_clean_count}")


user_id,event_date,event_type_a,metric_a,event_type_b,metric_b
1,2024-01-08,login,50,login,50
1,2024-01-08,login,50,logout,69
1,2024-01-08,login,50,purchase,16
1,2024-01-08,login,50,browse,98
1,2024-01-08,login,50,error,27
1,2024-01-08,logout,69,login,50
1,2024-01-08,logout,69,logout,69
1,2024-01-08,logout,69,purchase,16
1,2024-01-08,logout,69,browse,98
1,2024-01-08,logout,69,error,27


Count of rows in df_left_clean: 11375000
