In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, BooleanType
from pyspark.sql.functions import col, lit, sum, window, udf
import time

# create SparkSession
spark = SparkSession.builder.appName("myApp").getOrCreate()

# define the schema for the DataFrame
schema = StructType([
    StructField("team", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("text", StringType(), True),
    StructField("location", StringType(), True),
    StructField("likes", IntegerType(), True),
    StructField("sentiment", IntegerType(), True)
])

#use gs link
csvpath = "gs://dataproc-staging-us-central1-300388429198-ocbfs3ky/inputdata/"

# read CSV file into streaming DataFrame with specified schema
df = (
    spark.readStream
    .schema(schema)
    .option("header", "false")
    .csv(csvpath)
)

# Perform data preprocessing steps on streaming_df (similar to the original code)

# define a user-defined function to check if a string is in English
def is_english(text):
    if text is None:
        return False
    try:
        text.encode('ascii')
    except UnicodeEncodeError:
        return False
    else:
        return True

# register the UDF
is_english_udf = udf(is_english, BooleanType())

# filter out non-English text
df_filtered = df.filter(is_english_udf(df["text"]))

# For aggregating data over a specified window of time, you can use the window function
# Here, we use a 1-hour window, sliding every 30 minutes
window_duration = "1 hour"
slide_duration = "30 minutes"

likes_by_location_windowed = (
    df_filtered
    .groupBy(
        window(col("timestamp"), window_duration, slide_duration),
        col("location")
    )
    .agg(sum("likes").alias("total_likes"))
)

# Start the streaming query and continuously output results to the console
query = (
    likes_by_location_windowed
    .writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .trigger(processingTime="30 seconds")
    .start()
)

query.awaitTermination()

23/04/30 03:12:13 WARN org.apache.spark.sql.streaming.StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6267ea50-32b1-4571-b278-d7270742c9a6. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/30 03:12:13 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------------------------------------------------------------------------+-----------+
|window                                    |location                                                                      |total_likes|
+------------------------------------------+------------------------------------------------------------------------------+-----------+
|{2023-04-17 17:00:00, 2023-04-17 18:00:00}| so we'll monitor him                                                         |null       |
|{2023-04-20 11:00:00, 2023-04-20 12:00:00}|No location available.                                                        |25         |
|{2023-04-12 23:30:00, 2023-04-13 00:30:00}|null                                                                          |null       |
|{2023-04-21 22:30:00, 2023-04-21 23:30:00}|null                                       

KeyboardInterrupt: 