In [1]:
import os

from bin.config import *
from bin.consumer import Consumer

In [2]:
# Shared SparkSession for every consumer below. SPARK_PACKAGES is comma-joined
# into the `spark.jars.packages` setting; getOrCreate() reuses an existing
# session if one is already active in this kernel.
spark_session: SparkSession = SparkSession.builder.appName(  # type: ignore
    "Consumer data for Visualizing"
).config(
    "spark.jars.packages", ",".join(SPARK_PACKAGES)
).getOrCreate()

In [3]:
# One Consumer per topic, each paired with its schema and sharing the session.
# Tuple order fixes both construction order and the unpacked variable names.
capture_con: Consumer
segment_con: Consumer
prediction_con: Consumer
capture_con, segment_con, prediction_con = (
    Consumer(topic=topic_name, schema_list=schema_fields, spark_session=spark_session)
    for topic_name, schema_fields in (
        (CAPTURE_TOPIC, CAPTURE_SCHEMA_LIST),
        (SEGMENT_TOPIC, SEGMENT_SCHEMA_LIST),
        (PREDICTION_TOPIC, PREDICTION_SCHEMA_LIST),
    )
)

In [4]:
# Output CSV paths, all placed under BATCH_FOLDER. os.path.join inserts a
# separator only when one is missing, so these are correct whether or not
# BATCH_FOLDER ends with "/" (and also when it is a pathlib.Path).
BATCH_FILE_NAME: str = os.path.join(BATCH_FOLDER, "batch.csv")
CACHE_FILE_NAME: str = os.path.join(BATCH_FOLDER, "cache.csv")
PREDICTION_FILE_NAME: str = os.path.join(BATCH_FOLDER, "prediction.csv")

In [5]:
def store_dataframe_to_csv(df: DataFrame, path: str, **kwargs) -> None:
    """Write a Spark DataFrame to ``path`` as CSV by way of pandas.

    The whole frame is collected to the driver with ``toPandas()`` before
    writing, so this is only appropriate for small, already-aggregated results.

    Args:
        df: Spark DataFrame to export.
        path: Destination handed straight to ``pandas.DataFrame.to_csv``.
        **kwargs: Forwarded unchanged to ``to_csv`` (e.g. ``header``,
            ``index``, ``mode``).

    Returns:
        Whatever ``to_csv`` returns (``None`` when a path/buffer is given).
    """
    local_frame = df.toPandas()
    return local_frame.to_csv(path, **kwargs)

In [None]:
# def handle_batch_event_table(event_df: DataFrame, epoch_id: int) -> None:
#     result: DataFrame = event_df.join(segment_table, on="SegmentID", how="inner")
#     store_dataframe_to_csv(result, BATCH_NAME_FILE, header=True, index=False)
    
#     result = (
#         result.groupBy("Boro", "Timestamp")
#         .agg(F.sum(F.col("Vol")), F.count(F.col("Vol")))
#         .withColumn("Avg(Vol)", F.col("Sum(Vol)") / F.col("Count(Vol)"))
#         .drop("Sum(Vol)", "Count(Vol)")
#     )

#     store_dataframe_to_csv(
#         result, CACHE_NAME_FILE, header=False, index=False, mode="a"
#     )

In [None]:
# streaming_capture_df: DataFrame = capture_con.get_streaming_df()
# streaming_capture_query: StreamingQuery = capture_con.handle_batch_streaming_with_callable(
#     streaming_df, handle_batch_capture_table
# )



# print("Streaming query is running...")

# try:
#     streaming_query.awaitTermination()
# except KeyboardInterrupt:
#     print("Stopping streaming query...")
#     streaming_query.stop()
#     print("Streaming query stopped.")

In [8]:
# Batch (non-streaming) views of each topic. These DataFrames are lazy; the
# polling loop's output shows the max capture Timestamp advancing between
# iterations, so actions presumably re-read the source each time.
segment_df: DataFrame
capture_history_df: DataFrame
prediction_df: DataFrame
segment_df, capture_history_df, prediction_df = (
    consumer.get_history_df()
    for consumer in (segment_con, capture_con, prediction_con)
)

In [None]:
# def callable_handle_batch(prediction_df: DataFrame, epoch_id: int) -> None:
#     full_prediction_df: DataFrame = (
#         prediction_df.join(segment_df, on="SegmentID", how="inner")
#         .groupBy("Boro", "Timestamp")
#         .agg(F.mean("prediction_vol").alias("Vol"))
#         .orderBy("Timestamp", "Boro")
#     )
    
#     store_dataframe_to_csv(
#         full_prediction_df, 
#         path=PREDICTION_FILE_NAME,
#         header=True,
#         index=False,
#         mode="w+"
#     )

In [None]:
# prediction_query.stop()

# prediction_streaming_df: DataFrame = prediction_con.get_streaming_df()
# prediction_query: StreamingQuery = prediction_con.handle_batch_streaming_with_callable(
#     prediction_streaming_df, callable_handle_batch
# )

In [13]:
from pyspark.sql.window import Window

# Orders rows newest prediction run (prediction_ds) first.
# NOTE(review): there is no partitionBy, so Spark evaluates any window
# function over this spec on a single partition.
window_spec = Window.orderBy(F.desc("prediction_ds"))

In [None]:
def _write_snapshot(df: DataFrame, path: str) -> None:
    """Overwrite ``path`` with ``df`` as a headered CSV, without the pandas index."""
    store_dataframe_to_csv(df, path=path, header=True, index=False, mode="w+")


# Every DELAY seconds, rebuild three CSV snapshots from the history topics:
#   BATCH_FILE_NAME      - summed volume per (Long, Lat) at the newest timestamp
#   CACHE_FILE_NAME      - mean volume per borough over the last EXPIRE_TIME hours
#   PREDICTION_FILE_NAME - mean predicted volume per borough after the newest
#                          timestamp, taken from the latest prediction run
# Interrupt the kernel to stop the loop cleanly.
try:
    while True:
        loop_started_at: datetime = datetime.now()

        # Newest capture time seen so far; None while the capture topic is empty.
        current_timestamp = capture_history_df.agg(F.max("Timestamp")).first()[0]
        print(current_timestamp)

        if current_timestamp is None:
            # Without this guard, `None - timedelta` would crash the loop.
            print("No capture data yet; retrying after the delay.")
        else:
            minable_timestamp: datetime = current_timestamp - timedelta(hours=EXPIRE_TIME)
            print(minable_timestamp)

            # Rows inside the trailing EXPIRE_TIME-hour window.
            cache_history_df: DataFrame = capture_history_df.filter(
                F.col("Timestamp") > minable_timestamp
            )
            # Rows captured exactly at the newest timestamp.
            batch_history_df: DataFrame = capture_history_df.filter(
                F.col("Timestamp") == current_timestamp
            )
            # Keep only the latest prediction run (global max prediction_ds).
            # A 1-row aggregate joined back replaces an un-partitioned window,
            # which would have shuffled every prediction row onto one partition.
            # If there are no runs, the max is null and the join yields no rows.
            latest_run_df: DataFrame = prediction_df.agg(
                F.max("prediction_ds").alias("prediction_ds")
            )
            cache_prediction_df: DataFrame = (
                prediction_df.join(F.broadcast(latest_run_df), on="prediction_ds", how="inner")
                .filter(F.col("Timestamp") > current_timestamp)
                # Project before joining segments to avoid column-name clashes.
                .select("SegmentID", "Timestamp", "Direction", "prediction_vol")
            )

            # NOTE: the sum column is not aliased, so its CSV header is Spark's
            # default name for F.sum("Vol"); aliasing would change the file schema.
            full_batch_history_df: DataFrame = (
                batch_history_df.join(segment_df, on=["SegmentID"], how="inner")
                .groupBy("Timestamp", "Long", "Lat")
                .agg(F.sum("Vol"))
            )
            _write_snapshot(full_batch_history_df, BATCH_FILE_NAME)

            full_cache_history_df: DataFrame = (
                cache_history_df.join(segment_df, on=["SegmentID"], how="inner")
                .groupBy("Boro", "Timestamp")
                .agg(F.mean("Vol").alias("Vol"))
                .orderBy("Timestamp", "Boro")
            )
            _write_snapshot(full_cache_history_df, CACHE_FILE_NAME)

            full_cache_prediction_df: DataFrame = (
                cache_prediction_df.join(segment_df, on=["SegmentID"], how="inner")
                .groupBy("Boro", "Timestamp")
                .agg(F.mean("prediction_vol").alias("Vol"))
                .orderBy("Timestamp", "Boro")
            )
            _write_snapshot(full_cache_prediction_df, PREDICTION_FILE_NAME)

        # Sleep only for whatever is left of the DELAY-second polling period.
        elapsed_seconds: float = (datetime.now() - loop_started_at).total_seconds()
        print(elapsed_seconds)
        sleep(max(DELAY - elapsed_seconds, 0))
except KeyboardInterrupt:
    print("CSV export loop stopped.")
    

2017-06-20 06:45:00
2017-06-20 03:45:00
11.51273
2017-06-20 07:00:00
2017-06-20 04:00:00
6.155432
2017-06-20 07:15:00
2017-06-20 04:15:00
5.409148
2017-06-20 07:30:00
2017-06-20 04:30:00
5.385054
2017-06-20 07:45:00
2017-06-20 04:45:00
5.243362
2017-06-20 08:00:00
2017-06-20 05:00:00
5.056513
2017-06-20 08:15:00
2017-06-20 05:15:00
5.084335
2017-06-20 08:30:00
2017-06-20 05:30:00
4.988272
2017-06-20 08:45:00
2017-06-20 05:45:00
5.070641
2017-06-20 09:00:00
2017-06-20 06:00:00
4.79811
2017-06-20 09:15:00
2017-06-20 06:15:00
5.517992
2017-06-20 09:30:00
2017-06-20 06:30:00
4.780598
2017-06-20 09:45:00
2017-06-20 06:45:00
4.964492
2017-06-20 10:00:00
2017-06-20 07:00:00
4.942403
2017-06-20 10:15:00
2017-06-20 07:15:00
4.716591
