In [None]:
%pip install --upgrade pip --quiet
%pip install pyspark --quiet
%pip install -U -q PyDrive --quiet
%pip install numpy pandas --quiet

In [None]:
# Version pins. KAFKA_VERSION also selects the kafka-clients package, and
# PYSPARK_SCALA_VERSION + SPARK_VERSION form the spark-sql-kafka Maven coordinate used
# when the SparkSession is built, so they must match the installed PySpark build.
SCALA_VERSION = "2.12"
KAFKA_VERSION = "3.7.0"
PYSPARK_SCALA_VERSION = "2.12"
SPARK_VERSION = "3.5.1"

# Export the same values as environment variables ($NAME expands the Python variable),
# presumably so the shell helper scripts (e.g. start-kafka.sh) can read them.
%env SCALA_VERSION=$SCALA_VERSION
%env KAFKA_VERSION=$KAFKA_VERSION
%env PYSPARK_SCALA_VERSION=$PYSPARK_SCALA_VERSION
%env SPARK_VERSION=$SPARK_VERSION

In [None]:
# Start the local Kafka broker via the helper script (not shown here), then look for the
# start-up message in the last 100 lines of the broker log.
# NOTE(review): if start-kafka.sh returns before the broker is fully up, the grep may
# print nothing yet; re-run this check before continuing.
!bash start-kafka.sh
!tail -n 100 kafka/logs/server.log | grep -i "Kafka Server started"

In [None]:
# The two Kafka topics whose streams are sampled and compared, and the local broker address.
TOPIC1, TOPIC2 = "topic1", "topic2"
BOOTSTRAP_SERVER = ":".join(("127.0.0.1", "9092"))

In [None]:
# Create both topics on the local broker (create-topics.sh is not shown here) and remove
# any generator.log left from a previous run (presumably written by generator.py).
!bash create-topics.sh {BOOTSTRAP_SERVER} {TOPIC1} {TOPIC2}
!rm -rf generator.log

In [None]:
import subprocess
import sys

# Launch generator.py as a background process (Popen does not wait for it), passing both
# topic names and the broker address; presumably it produces the streams for both topics.
# sys.executable rather than "python3" from PATH guarantees the generator runs under the
# kernel's own interpreter, i.e. the environment the %pip installs targeted.
command = [
    sys.executable, "generator.py",
    "--topic1", TOPIC1,
    "--topic2", TOPIC2,
    "--bootstrap_server", BOOTSTRAP_SERVER
]

GENERATOR = subprocess.Popen(command)
print(f"GENERATOR PID: {GENERATOR.pid}")

In [None]:
from pyspark.sql import SparkSession

# Maven packages Spark resolves at start-up: the Structured Streaming Kafka source
# (built for PYSPARK_SCALA_VERSION / SPARK_VERSION) and the Kafka client library.
packages = [
    "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(PYSPARK_SCALA_VERSION, SPARK_VERSION),
    "org.apache.kafka:kafka-clients:{}".format(KAFKA_VERSION),
]

# Local session using all cores, with the packages above on the classpath.
spark = (
    SparkSession.builder
    .appName("PDD task-2")
    .master("local[*]")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

In [None]:
# Streaming source: subscribe to both topics and replay each from its earliest offset.
# Uses BOOTSTRAP_SERVER so the broker address is configured in exactly one place.
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER)
    .option("subscribe", f"{TOPIC1},{TOPIC2}")
    .option("startingOffsets", "earliest")
    .load()
)

In [None]:
from pyspark.sql.types import *
from pyspark.sql.streaming.state import GroupState
import pandas as pd
import random

# --- Sampling parameters -------------------------------------------------------------
SAMPLE_INTERVAL = 100_000  # time points between consecutive snapshots of a topic
SAMPLE_COUNT = 20          # snapshots to collect per topic before the query is stopped
SAMPLE_SIZE = 1_000        # capacity of both the RS and the BRS reservoir
LAMBDA = 1e-5              # BRS bias rate; larger values replace old points faster
# Per-point BRS insertion probability p_in = SAMPLE_SIZE * LAMBDA (0.01 with the values above).
BRS_INSERTION_PROBABILITY = SAMPLE_SIZE * LAMBDA
assert 0 < BRS_INSERTION_PROBABILITY <= 1, "SAMPLE_SIZE * LAMBDA must be a probability"


# JSON payload of each Kafka message.
DATA_SCHEMA = StructType([
    StructField("time_point", IntegerType(), True),
    StructField("value", FloatType(), True)
])

# One emitted snapshot: current RS and BRS contents plus the topic they belong to.
OUTPUT_SCHEMA = StructType([
    StructField("RS_time_point", ArrayType(IntegerType()), False),
    StructField("RS_value", ArrayType(FloatType()), False),
    StructField("BRS_time_point", ArrayType(IntegerType()), False),
    StructField("BRS_value", ArrayType(FloatType()), False),
    StructField("topic", StringType(), False)
])

# Per-topic state carried across micro-batches. `seen` counts every sampled point so RS
# implements Algorithm R exactly, independent of how time_point values are numbered.
STATE_SCHEMA = StructType([
    StructField("RS_time_point", ArrayType(IntegerType()), False),
    StructField("RS_value", ArrayType(FloatType()), False),
    StructField("BRS_time_point", ArrayType(IntegerType()), False),
    StructField("BRS_value", ArrayType(FloatType()), False),
    StructField("next_yield", IntegerType(), False),
    StructField("seen", LongType(), False)
])

def updateState(_, pdf_iter, state: GroupState):
    """Update one topic's RS and BRS reservoirs and emit periodic snapshots.

    Intended for ``groupBy("topic").applyInPandasWithState``.

    Args:
        _: grouping key (unused; the topic is read from the rows).
        pdf_iter: iterator of pandas DataFrames with ``time_point``, ``value``
            and ``topic`` columns.
        state: GroupState whose value is a tuple laid out as STATE_SCHEMA.

    Yields:
        A one-row pandas DataFrame matching OUTPUT_SCHEMA for every
        SAMPLE_INTERVAL boundary that the topic's time points pass.
    """
    if state.exists:
        rs_tp, rs_v, brs_tp, brs_v, next_yield_tp, seen = state.get
    else:
        rs_tp, rs_v, brs_tp, brs_v, next_yield_tp, seen = [], [], [], [], SAMPLE_INTERVAL, 0

    for pdf in pdf_iter:
        # from_json yields nulls for unparseable messages; such rows cannot be placed in time.
        pdf = pdf.dropna(subset=["time_point"]).sort_values(by="time_point")
        for row in pdf.itertuples(index=False):
            # int() normalizes in case nulls made pandas store the column as float.
            row_tp = int(row.time_point)
            row_v = row.value
            topic = row.topic

            # YIELD before sampling, so a snapshot never contains the point that crossed its
            # boundary; `while` keeps exactly one snapshot per interval even across gaps.
            while row_tp > next_yield_tp:
                print(f"Topic {topic}: passed {next_yield_tp} timepoint", end="\033[K\n", flush=True)
                next_yield_tp += SAMPLE_INTERVAL
                yield pd.DataFrame({
                    "RS_time_point": [rs_tp.copy()],
                    "RS_value": [rs_v.copy()],
                    "BRS_time_point": [brs_tp.copy()],
                    "BRS_value": [brs_v.copy()],
                    "topic": topic
                })

            # RS (Algorithm R): the seen-th point replaces a random slot with
            # probability SAMPLE_SIZE / seen once the reservoir is full.
            seen += 1
            if len(rs_tp) < SAMPLE_SIZE:
                rs_tp.append(row_tp)
                rs_v.append(row_v)
            else:
                pos = random.randrange(seen)
                if pos < SAMPLE_SIZE:
                    rs_tp[pos] = row_tp
                    rs_v[pos] = row_v

            # BRS: insert with probability p_in; the point then overwrites a random slot with
            # probability equal to the reservoir's fill fraction, otherwise it is appended.
            if random.random() < BRS_INSERTION_PROBABILITY:
                if random.random() < len(brs_tp) / SAMPLE_SIZE:
                    pos = random.randrange(len(brs_tp))
                    brs_tp[pos] = row_tp
                    brs_v[pos] = row_v
                else:
                    brs_tp.append(row_tp)
                    brs_v.append(row_v)

    state.update((rs_tp, rs_v, brs_tp, brs_v, next_yield_tp, seen))

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.streaming.state import GroupStateTimeout

# Snapshots collected so far, one DataFrame (OUTPUT_SCHEMA columns) per topic.
sample_dict = {
    topic_name: pd.DataFrame(columns=OUTPUT_SCHEMA.fieldNames())
    for topic_name in (TOPIC1, TOPIC2)
}

def collect_samples(next_sample, _):
    """foreachBatch sink: append each micro-batch's snapshots to ``sample_dict`` per topic.

    Args:
        next_sample: Spark DataFrame of snapshots emitted in this micro-batch.
        _: batch id (unused).
    """
    # sample_dict is only mutated item-wise, so no `global` declaration is needed.
    next_sample_pdf = next_sample.toPandas()
    if next_sample_pdf.empty:
        return
    for topic in (TOPIC1, TOPIC2):
        new_rows = next_sample_pdf[next_sample_pdf["topic"] == topic]
        if new_rows.empty:
            continue
        collected = sample_dict[topic]
        # Avoid concatenating with the initial empty frame: pandas >= 2.1 warns about it
        # and it contributes nothing.
        if collected.empty:
            sample_dict[topic] = new_rows.reset_index(drop=True)
        else:
            sample_dict[topic] = pd.concat([collected, new_rows], ignore_index=True)

# Streaming pipeline: parse the JSON payload, run one stateful sampler (updateState) per
# topic, and hand every micro-batch of emitted snapshots to collect_samples.
processing = (
    df
    .withColumn("data", F.from_json(F.col("value").cast("string"), DATA_SCHEMA))
    .select("data.*", "topic")  # -> time_point, value, topic
    .groupBy("topic")
    .applyInPandasWithState(
        updateState,
        outputStructType=OUTPUT_SCHEMA,
        stateStructType=STATE_SCHEMA,
        outputMode="append",
        timeoutConf=GroupStateTimeout.NoTimeout,
    )
    .writeStream
    .outputMode("append")
    .foreachBatch(collect_samples)
    .start()
)

# Poll the query until every topic has at least SAMPLE_COUNT snapshots. try/finally
# guarantees the query is stopped even if it fails or the cell is interrupted.
print("Collecting samples...")
try:
    while processing.isActive:
        processing.awaitTermination(5)
        if all(len(sample_pdf) >= SAMPLE_COUNT for sample_pdf in sample_dict.values()):
            print(f"{SAMPLE_COUNT} samples collected from each topic.")
            break
    else:
        # Reached only when the query stopped on its own before enough snapshots arrived.
        collected = {name: len(pdf) for name, pdf in sample_dict.items()}
        print(f"WARNING: query stopped before {SAMPLE_COUNT} samples were collected: {collected}")
finally:
    try:
        # Send stop() signal
        print("Sending stop() signal to the processing...")
        processing.stop()

        # Wait for shutdown
        print("Waiting 60sec for the processing to shutdown...")
        processing.awaitTermination(60)

    # Shutdown is best-effort: report errors raised while stopping instead of swallowing
    # them silently, but do not fail the cell over them.
    except Exception as exc:
        print(f"WARNING: ignored error while stopping the query: {exc!r}")

In [None]:
from scipy.stats import kstest

DEPTH_TP = 4  # number of preceding snapshots each snapshot is compared against


def _ks(topic_a, idx_a, topic_b, idx_b, column):
    """Two-sample KS test between the ``column`` arrays of two collected snapshots.

    Returns the scipy result, or a short note if either sample is empty
    (scipy raises ValueError on empty input).
    """
    sample_a = sample_dict[topic_a].loc[idx_a, column]
    sample_b = sample_dict[topic_b].loc[idx_b, column]
    if len(sample_a) == 0 or len(sample_b) == 0:
        return "skipped (empty sample)"
    return kstest(sample_a, sample_b)


def _period(idx):
    """Human-readable time range of snapshot ``idx``."""
    return f"{idx * SAMPLE_INTERVAL} - {(idx + 1) * SAMPLE_INTERVAL}"


# Trim the samples to have exactly SAMPLE_COUNT. errors="ignore" keeps the cell
# re-runnable: the first run has already dropped the "topic" column.
for topic, sample_pdf in sample_dict.items():
    sample_dict[topic] = sample_pdf.drop(columns=["topic"], errors="ignore").head(SAMPLE_COUNT)

# Fewer than SAMPLE_COUNT snapshots exist if the query stopped early; don't index past them.
n_snapshots = min(len(sample_pdf) for sample_pdf in sample_dict.values())

print("Kolmogorov-Smirnov test for stream distribution **COMPARISON**\n")
for now_tp in range(n_snapshots):
    print(f"now time period: {_period(now_tp)}")
    print(f" RS test result: {_ks(TOPIC1, now_tp, TOPIC2, now_tp, 'RS_value')}")
    print(f"BRS test result: {_ks(TOPIC1, now_tp, TOPIC2, now_tp, 'BRS_value')}")
    print()

print("\n\n\n")

print("Kolmogorov-Smirnov test for stream distribution **CHANGE**\n")
for now_tp in range(n_snapshots):
    # Compare against up to DEPTH_TP earlier snapshots, most recent first.
    for old_tp in reversed(range(max(0, now_tp - DEPTH_TP), now_tp)):
        print(f"old time period: {_period(old_tp)}")
        print(f"now time period: {_period(now_tp)}")
        for topic in (TOPIC1, TOPIC2):
            print(f"Topic: {topic}")
            print(f"     RS test result: {_ks(topic, now_tp, topic, old_tp, 'RS_value')}")
            print(f"    BRS test result: {_ks(topic, now_tp, topic, old_tp, 'BRS_value')}")
        print()

In [None]:
# Print every collected snapshot per topic, plus the length of each column's value.
# The row label is named row_idx (not `id`) so the builtin id() is not shadowed in the
# notebook's global namespace.
for topic, sample_pdf in sample_dict.items():
    print(f"TOPIC: {topic}")
    print(sample_pdf.to_string(max_colwidth=50))
    for row_idx, row in sample_pdf.iterrows():
        print(f"Row {row_idx}:")
        for col in sample_pdf.columns:
            print(f"    Column {col} length: {len(row[col])}")

In [None]:
GENERATOR.terminate()
spark.stop()
!bash kafka/bin/kafka-server-stop.sh