In [1]:
%matplotlib inline 
%config InlineBackend.figure_format = 'retina' 
from IPython.core.interactiveshell import InteractiveShell 
InteractiveShell.ast_node_interactivity = "all"
import sys
import requests
import time
from pyspark.sql import SparkSession, functions as F
#parameter for master can be changed depending on the environment

# Start (or reuse) a Spark session for this exercise.
spark = SparkSession.builder.master("local").appName("Exercise2").getOrCreate()
url = "https://raw.githubusercontent.com/Xiru1024/BigDataExercise/refs/heads/main/exampleData.csv"
local_path = "/home/jovyan/BigDataExercise/exercise2/example.csv"

# Download BEFORE opening the target file and fail loudly on HTTP errors, so a
# failed request never leaves an empty file or an HTML error page saved as
# example.csv. The timeout keeps the cell from hanging on a stalled connection.
response = requests.get(url, timeout=30)
response.raise_for_status()
with open(local_path, "wb") as f:
    f.write(response.content)

# Static (batch) read of the downloaded file; the header row supplies the column
# names and the column types are inferred from the data.
df = spark.read.csv(local_path, header=True, inferSchema=True)

In [2]:
# Keep the schema inferred by the static read so it can be reused later.
staticSchema = df.schema
# Print the inferred column names and types.
df.printSchema()
# Make the static DataFrame queryable from Spark SQL as "weatherConditions".
df.createOrReplaceTempView("weatherConditions")


root
 |-- _c0: integer (nullable = true)
 |-- dateTime: string (nullable = true)
 |-- indicator_rain: integer (nullable = true)
 |-- precipitation: string (nullable = true)
 |-- indicator_temp: integer (nullable = true)
 |-- air_temperature: string (nullable = true)
 |-- indicator_wetb: integer (nullable = true)
 |-- wetb: string (nullable = true)
 |-- dewpt: string (nullable = true)
 |-- vappr: string (nullable = true)
 |-- relative_humidity: string (nullable = true)
 |-- msl: string (nullable = true)
 |-- indicator_wdsp: integer (nullable = true)
 |-- wind_speed: string (nullable = true)
 |-- indicator_wddir: integer (nullable = true)
 |-- wind_from_direction: integer (nullable = true)



Read the data as a stream

In [3]:
# Streaming CSV read over the exercise directory, reusing the schema captured
# from the static read. maxFilesPerTrigger limits how many new files are picked
# up per micro-batch.
streamingDF = (
    spark.readStream
    .format("csv")
    .schema(staticSchema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 2)
    .load("/home/jovyan/BigDataExercise/exercise2/")
)

In [4]:
# Sanity check: a DataFrame built via spark.readStream should report True here.
streamingDF.isStreaming

True

In [5]:
# air_temperature is read as a *string* column (see the printed schema above),
# so cast it explicitly before comparing. This makes the comparison numeric by
# construction instead of depending on Spark's implicit string-to-number
# coercion rules.
filteredDF = streamingDF.filter(F.col('air_temperature').cast('double') < 5)

Write the stream to a sink

In [6]:
# Start the streaming query, writing matching rows to the in-memory table
# "weatherQuery".
# NOTE: `df` is rebound here from the static DataFrame to the StreamingQuery
# handle returned by start(); the name is kept because the following cell calls
# df.awaitTermination().
df = (
    filteredDF.writeStream
    .format("memory")
    .queryName("weatherQuery")
    .outputMode("append")
    .trigger(once=True)
    .start()
)

# With trigger(once=True) the query stops by itself after processing the
# available data, so wait for it to terminate instead of sleeping a fixed 10 s.
# A fixed sleep can read the table before the batch has finished, and it hides
# a failed query; awaitTermination() surfaces the failure as an exception.
df.awaitTermination()
spark.sql("SELECT * FROM weatherQuery").show()

+---+-----------------+--------------+-------------+--------------+---------------+--------------+----+-----+-----+-----------------+------+--------------+----------+---------------+-------------------+
|_c0|         dateTime|indicator_rain|precipitation|indicator_temp|air_temperature|indicator_wetb|wetb|dewpt|vappr|relative_humidity|   msl|indicator_wdsp|wind_speed|indicator_wddir|wind_from_direction|
+---+-----------------+--------------+-------------+--------------+---------------+--------------+----+-----+-----+-----------------+------+--------------+----------+---------------+-------------------+
|298|12-jan-1990 20:00|             3|          0.0|             0|            4.9|             0| 4.3|  3.4|  7.8|               90|1025.5|             2|         1|              2|                210|
|300|12-jan-1990 22:00|             3|          0.0|             0|            3.9|             0| 3.2|  2.1|  7.1|               88|1025.9|             2|         1|              2|      

In [7]:
# `df` is the StreamingQuery handle assigned by the writeStream cell (not the
# static DataFrame from the first cell). Block until that query has stopped.
df.awaitTermination()