# Real-time Weather Prediction System 


### Thành viên nhóm: 
##### 21522232 - Tạ Anh Khoa
##### 21521214 - Đỗ Trọng Nhân
##### 18521440 - Nguyễn Đức Thịnh

In [1]:
import findspark
findspark.init()
import pyspark  
from pyspark.sql import SparkSession  
from pyspark.sql.functions import col
import datetime
from pyspark.sql import functions as f
from pyspark.conf import SparkConf
from sodapy import Socrata
from pyspark.sql.streaming import DataStreamReader
# Versions used to select the Kafka connector artifact for this Spark build.
scala_version = '2.12'
spark_version = '3.5.0'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.6.1',
]

# Build (or reuse) the local Spark session named "Chronos"; the Kafka packages
# are passed as a comma-separated list through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Chronos")
    .master("local")
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .config("spark.python.worker.reuse", "true")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "16")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

# Only log errors to keep the notebook output readable.
spark.sparkContext.setLogLevel("ERROR")

conf = SparkConf()

# Last expression of the cell: lets the notebook render the session summary.
spark

In [2]:
# Import các modules cần thiết
import pandas as pd
import numpy as np
from pyspark.ml.feature import StandardScaler
from bigdl.chronos.forecaster.tcn_forecaster import TCNForecaster
from bigdl.chronos.metric.forecast_metrics import Evaluator
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
from bigdl.chronos.data import TSDataset
from sklearn.preprocessing import StandardScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer, StringIndexerModel, IndexToString, PolynomialExpansion
from pymongo import MongoClient

In [3]:
# Training set: read Weather_train.csv, treating the first row as the header
# and letting Spark infer the column types.
train = spark.read.csv('Weather_train.csv', header=True, inferSchema=True)

### Tiền xử lí dữ liệu

##### Đổi tên cột của tập train sao cho giống tập test.

In [4]:
# Raw CSV header -> snake_case column name used by the rest of the notebook.
# Each header appears exactly once (the original listed
# 'Measurement Timestamp Label' twice; the second call was a no-op).
column_renames = {
    'Air Temperature': 'air_temperature',
    'Humidity': 'humidity',
    'Rain Intensity': 'rain_intensity',
    'Interval Rain': 'interval_rain',
    'Total Rain': 'total_rain',
    'Barometric Pressure': 'barometric_pressure',
    'Measurement Timestamp Label': 'measurement_timestamp_label',
    'Measurement Timestamp': 'measurement_timestamp',
    'Station Name': 'station_name',
    'Wet Bulb Temperature': 'wet_bulb_temperature',
    'Precipitation Type': 'precipitation_type',
    'Wind Direction': 'wind_direction',
    'Wind Speed': 'wind_speed',
    'Maximum Wind Speed': 'maximum_wind_speed',
    'Solar Radiation': 'solar_radiation',
    'Heading': 'heading',
    'Battery Life': 'battery_life',
    'Measurement ID': 'measurement_id',
}
# withColumnRenamed leaves the frame unchanged when the source column is
# absent, so a missing header is tolerated exactly as before.
for old_name, new_name in column_renames.items():
    train = train.withColumnRenamed(old_name, new_name)

##### Hàm chuyển đổi giờ trong bộ dữ liệu từ định dạng 12 giờ (AM/PM) sang định dạng 24 giờ (được dùng làm UDF để tạo cột mới).

In [5]:
def twelve_to_twentyfour(Hour,AorP):
    """Convert a 12-hour clock hour to a zero-padded 24-hour string.

    Args:
        Hour: Hour on a 12-hour clock, as a string or int
            (e.g. ``'3'``, ``'03'``, ``'12'``).
        AorP: Meridiem marker. ``'AM'`` selects the morning branch; any other
            non-null value is treated as PM, as before.

    Returns:
        The hour as a two-character string (``'3'`` PM -> ``'15'``), or
        ``None`` when either argument is ``None`` (for example a null cell).

    Raises:
        ValueError: If ``Hour`` cannot be parsed as an integer.
    """
    if Hour is None or AorP is None:
        return None
    hour = int(Hour)
    if AorP == 'AM':
        # 12 AM is midnight; every other AM hour keeps its value.
        if hour == 12:
            hour = 0
    elif hour != 12:
        # 12 PM stays noon; every other PM hour shifts by twelve.
        hour += 12
    # Always return a padded string so every branch yields the same type and
    # already padded inputs such as '09' are not padded twice.
    return f"{hour:02d}"
# Spark UDF wrapper around twelve_to_twentyfour, declared with a string return type.
Hour_Converter = udf(twelve_to_twentyfour, StringType())

##### Hàm trả về chuỗi thời gian đã được xử lí với giờ đã được chuyển thành giờ quốc tế.

In [6]:
def timestamp_merger(date, month, year, hour):
    """Assemble a ``yyyy-MM-dd HH:00:00`` timestamp string from its parts.

    Args:
        date: Day of the month.
        month: Month number.
        year: Four-digit year.
        hour: Hour on a 24-hour clock.

    Returns:
        The formatted string, with day, month and hour left-padded to two
        digits, or ``None`` when any part is ``None`` (for example a null cell).
    """
    if date is None or month is None or year is None or hour is None:
        return None
    # Pad to two digits so the result always has the fixed-width layout.
    day_part = str(date).zfill(2)
    month_part = str(month).zfill(2)
    hour_part = str(hour).zfill(2)
    return f"{year}-{month_part}-{day_part} {hour_part}:00:00"
# Spark UDF wrapper around timestamp_merger, declared with a string return type.
timestamp_merger_udf = udf(timestamp_merger, StringType())

##### Hàm trả về một dataframe mới đã được thêm cột timestamp và đã được sắp xếp tăng dần theo cột timestamp vừa thêm.

In [7]:
def timestamp_adder(df):
    """Return ``df`` with a parsed ``timestamp`` column, sorted by that column.

    The ``measurement_timestamp_label`` column is split on spaces into
    ``Date``, ``Hour`` and ``AMorPM``; ``Date`` is split on ``/`` into
    ``Month``, ``Day`` and ``Year``; ``Time`` holds the converted hour. These
    helper columns stay in the result. Rows containing any null are dropped.
    """
    label_parts = f.split(col('measurement_timestamp_label'), ' ')
    stamped = (
        df.alias('copy')
        .withColumn('Date', label_parts[0])
        .withColumn('Hour', label_parts[1])
        .withColumn('AMorPM', label_parts[2])
    )

    date_parts = f.split(col('Date'), '/')
    for part_name, position in (('Month', 0), ('Day', 1), ('Year', 2)):
        stamped = stamped.withColumn(part_name, date_parts[position].cast('string'))

    # Only the hour component (text before the first ':') is converted.
    hour_only = f.split(stamped.Hour, ':')[0]
    stamped = stamped.withColumn('Time', Hour_Converter(hour_only, stamped.AMorPM).cast('string'))

    merged = timestamp_merger_udf(stamped.Day, stamped.Month, stamped.Year, stamped.Time)
    stamped = stamped.withColumn('timestamp', merged)
    stamped = stamped.withColumn('timestamp', f.to_timestamp('timestamp', 'yyyy-MM-dd HH:mm:ss'))
    return stamped.orderBy('timestamp').dropna()

In [8]:
# Add the parsed `timestamp` column (and its helper columns) to the training data.
train = timestamp_adder(train)

#### Xử lí dữ liệu bị âm trong biến interval_rain và rain_intensity

In [9]:
def interval_outlier(x):
    """Return the absolute value of ``x`` as a float.

    Negative readings are treated as sign errors and flipped. ``None`` is
    passed through so a null cell does not raise, and the result is always a
    ``float`` so it matches a double-typed UDF even for integer input.
    """
    if x is None:
        return None
    return float(abs(x))
# Wrap interval_outlier as a double-returning UDF and overwrite interval_rain with its result.
interval_outliers_udf = udf(interval_outlier, DoubleType())
train = train.withColumn('interval_rain', interval_outliers_udf(train.interval_rain))
def intensity_outlier(x):
    """Return the absolute value of ``x`` as a float.

    Negative readings are treated as sign errors and flipped. ``None`` is
    passed through so a null cell does not raise, and the result is always a
    ``float`` so it matches a double-typed UDF even for integer input.
    """
    if x is None:
        return None
    return float(abs(x))
# Wrap intensity_outlier as a double-returning UDF and overwrite rain_intensity with its result.
intensity_outliers_udf = udf(intensity_outlier, DoubleType())
train = train.withColumn('rain_intensity', intensity_outliers_udf(train.rain_intensity))

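##### Chỉ lấy dữ liệu từ 15 giờ ngày 17/05/2021 trở về sau vì dữ liệu trong khoảng này ổn định và đảm bảo tính liên tục. (Lưu ý: bước lọc này hiện đang bị vô hiệu hóa vì dòng lệnh bên dưới đã được chuyển thành chú thích.)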

In [10]:
#train = train.filter(col('timestamp') >= "2021-05-17 15:00:00")

##### List chứa các biến mục tiêu để mô hình TCNForecaster dự đoán.

In [11]:
# Variables the forecaster is asked to predict.
target_cols = [
    'air_temperature',
    'interval_rain',
    'humidity',
    'barometric_pressure',
]

##### List chứa các biến độc lập sẽ được coi như là extra_feature_col khi tạo dữ liệu với TSDataset

In [12]:
# Additional input columns handed to TSDataset as extra_feature_col.
extra_feature_cols = [
    'rain_intensity',
    'total_rain',
]

### Huấn luyện mô hình TCNForecaster

In [13]:
# Convert the Spark training frame to pandas and build the time-series
# datasets with a 30% test ratio; the second returned dataset is discarded.
tsdata_train, _, tsdata_test = TSDataset.from_pandas(
    train.toPandas(),
    dt_col="timestamp",
    target_col=target_cols,
    extra_feature_col=extra_feature_cols,
    repair=True,
    with_split=True,
    test_ratio=0.3,
)

  PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


##### Set lookback = 12 (12 giờ trước) và dự đoán cho 1 giờ sau (horizon = 1).

In [14]:
# Use the previous 12 records to predict the next single record.
lookback = 12
horizon = 1

scaler = StandardScaler()
for tsdata in (tsdata_train, tsdata_test):
    # The scaler is fitted on the training split only and reused for the test split.
    (
        tsdata.deduplicate()
        .impute()
        .gen_dt_feature()
        .scale(scaler, fit=(tsdata is tsdata_train))
        .roll(lookback=lookback, horizon=horizon)
    )

In [15]:
# Rolled training samples as numpy arrays.
x, y = tsdata_train.to_numpy()
# Input/output feature counts are taken from the last axis of x and y, so the
# model always matches whatever columns the preprocessing produced.
forecaster = TCNForecaster(past_seq_len=lookback,  
                           future_seq_len=horizon,  
                           input_feature_num=x.shape[-1],
                           output_feature_num=y.shape[-1],
                           num_channels=[64,72,72],
                           kernel_size=5,
                           dropout=0,
                           seed=0)
# Train for 10 epochs on the training samples.
forecaster.fit((x, y), epochs=10)

Global seed set to 0
Global seed set to 0
GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name  | Type             | Params
-------------------------------------------
0 | model | NormalizeTSModel | 134 K 
1 | loss  | MSELoss          | 0     
-------------------------------------------
134 K     Trainable params
0         Non-trainable params
134 K     Total params
0.536     Total estimated model params size (MB)


Training: 0it [00:00, ?it/s]

In [16]:
# Forecast on the test split, map predictions and ground truth back to the
# original scale, then score them.
x_test, y_test = tsdata_test.to_numpy()
pred = forecaster.predict(x_test)
pred_unscale = tsdata_test.unscale_numpy(pred)
groundtruth_unscale = tsdata_test.unscale_numpy(y_test)

# Flatten everything after the sample axis so each sample is a single row.
pred_unscale = pred_unscale.reshape(pred_unscale.shape[0], -1)
groundtruth_unscale = groundtruth_unscale.reshape(groundtruth_unscale.shape[0], -1)

res = Evaluator.evaluate(
    ["smape", "mse", "r2"],
    y_true=groundtruth_unscale,
    y_pred=pred_unscale,
)

# Results come back in the same order as the requested metric names.
for message, score in zip(("sMAPE is", "mean_squared error is", "r2 is"), res):
    print(message, score)

sMAPE is 25.5754602156829
mean_squared error is 4.730735919024346
r2 is 0.9999732260186943


### Tạo một dataframe chứa các giá trị vừa dự đoán

In [17]:
# Spark schema for the forecast output: one nullable double column per
# predicted variable, in this order.
schema = StructType([
    StructField(column_name, DoubleType(), True)
    for column_name in (
        "air_temperature",
        "interval_rain",
        "humidity",
        "barometric_pressure",
    )
])

In [18]:
# Turn the unscaled prediction array into a Spark DataFrame using `schema`.
predict = spark.createDataFrame(pred_unscale, schema)

  PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [19]:
def rain_interval_standarder(interval):
    """Clamp small or negative interval-rain values to zero.

    Args:
        interval: Interval-rain value, or ``None``.

    Returns:
        ``0.0`` when ``interval`` is below 0.1, otherwise ``interval`` as a
        float. ``None`` is passed through so a null cell does not raise.
    """
    if interval is None:
        return None
    if interval < 0.1:
        return 0.0
    # float() keeps the result compatible with a double-typed UDF for int input.
    return float(interval)
# Spark UDF wrapper around rain_interval_standarder, declared with a double return type.
rain_interval_standarder_udf = udf(rain_interval_standarder, DoubleType())

In [20]:
# Normalise the values of the predicted variables.
def vars_standarder(df):
    """Return ``df`` with ``interval_rain`` passed through the rain-interval UDF."""
    cleaned_interval = rain_interval_standarder_udf(df.interval_rain)
    return df.alias('copy').withColumn('interval_rain', cleaned_interval)

In [21]:
# Return a DataFrame whose columns are cast to a fixed, predefined set of types.
def cast_dataframe(df):
    """Cast each known column of ``df`` to its expected Spark type.

    Every column listed below must exist in ``df``; columns are processed in
    the listed order and replaced in place by their cast version.
    """
    column_types = {
        "station_name": StringType(),
        "measurement_timestamp": StringType(),
        "air_temperature": DoubleType(),
        "wet_bulb_temperature": DoubleType(),
        "humidity": IntegerType(),
        "rain_intensity": DoubleType(),
        "interval_rain": DoubleType(),
        "total_rain": DoubleType(),
        "precipitation_type": IntegerType(),
        "wind_direction": IntegerType(),
        "wind_speed": DoubleType(),
        "maximum_wind_speed": DoubleType(),
        "barometric_pressure": DoubleType(),
        "solar_radiation": IntegerType(),
        "heading": IntegerType(),
        "battery_life": DoubleType(),
        "measurement_timestamp_label": StringType(),
        "measurement_id": StringType(),
        "Date": StringType(),
        "Hour": StringType(),
        "AMorPM": StringType(),
        "Month": StringType(),
        "Day": StringType(),
        "Year": StringType(),
        "Time": StringType(),
        "timestamp": TimestampType(),
    }
    for column_name, spark_type in column_types.items():
        df = df.withColumn(column_name, col(column_name).cast(spark_type))
    return df

In [22]:
# Clean the predicted interval_rain values via vars_standarder.
predict = vars_standarder(predict)

In [23]:
# Print the first 10 rows of the predictions.
predict.show(10)

+-------------------+-------------+------------------+-------------------+
|    air_temperature|interval_rain|          humidity|barometric_pressure|
+-------------------+-------------+------------------+-------------------+
| -4.964935302734375|          0.0|  63.6673583984375|  998.8425903320312|
| -4.658403396606445|          0.0|  63.4568977355957|  999.4796142578125|
| -3.322305679321289|          0.0| 62.15800094604492|  999.6537475585938|
|-1.2513647079467773|          0.0|57.960140228271484|  999.7672729492188|
|-0.6242895126342773|          0.0| 58.21668243408203|  999.8861083984375|
| 0.2501564025878906|          0.0| 58.17109680175781|      998.701171875|
| 1.0307893753051758|          0.0|56.286407470703125|   997.632080078125|
| 1.4825973510742188|          0.0| 57.76877975463867|    996.73193359375|
| 1.1421918869018555|          0.0| 60.03619384765625|  996.7979125976562|
| 1.0093050003051758|          0.0| 62.15092849731445|  996.7252807617188|
+-------------------+----

### Train một mô hình RandomForestClassifier để phân loại loại mưa.

In [24]:
# Encode precipitation_type as a numeric `label`; unseen values are kept rather than raising.
label_Indexer = StringIndexer(inputCol='precipitation_type', outputCol='label', handleInvalid='keep')
# Pack the four target columns into a single feature vector.
assembler = VectorAssembler(inputCols=target_cols,outputCol='features')
# Add degree-2 polynomial terms of the features.
poly = PolynomialExpansion(degree=2,inputCol='features',outputCol='poly_features')
# Fully qualified on purpose: the bare name StandardScaler was last imported
# from sklearn in this notebook, so it no longer refers to the Spark class.
scale = pyspark.ml.feature.StandardScaler(inputCol='poly_features',outputCol='scaled_features')
rf = RandomForestClassifier(featuresCol='scaled_features',labelCol='label',numTrees=100,maxDepth=10)
# Stages run in this order when the pipeline is fitted or applied.
pipeline = Pipeline(stages=[label_Indexer,assembler,poly,scale,rf])

In [25]:
# Fit every pipeline stage on the training DataFrame.
model = pipeline.fit(train)

In [26]:
# Classify the precipitation type for the forecasted values.
prediction = model.transform(predict)

In [27]:
def prediction_int_to_string(pred):
    """Translate a numeric class prediction into its display label.

    Args:
        pred: Predicted class index (0, 1 or 2; an equal float such as
            ``1.0`` is accepted).

    Returns:
        The label for the class, or the "unknown type" label for any other
        value, including ``None``.
    """
    # NOTE(review): the index comes from the classifier trained on the
    # StringIndexer `label` column; StringIndexer orders labels by frequency
    # by default, so confirm that 0/1/2 really map to these classes.
    labels = {
        0: "Không mưa",
        1: "Mưa dạng lỏng",
        2: "Mưa dạng rắn",
    }
    return labels.get(pred, "Loại mưa không xác định")
# Wrap the label lookup as a string-returning UDF and add the readable `rain_type` column.
prediction_to_string = udf(prediction_int_to_string, StringType())
prediction = prediction.withColumn('rain_type',prediction_to_string(prediction.prediction))

### Khai báo streaming query

In [28]:
# Kafka topic and broker the live readings are read from.
topic_name = 'TestDoAn'
kafka_server = 'localhost:9092'

# Streaming source: subscribe to the topic and read only messages that arrive
# after the query starts.
streamRawDf = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", topic_name)
    .option("startingOffsets", "latest")
    .load()
)

In [29]:
# Column names for the comma-separated fields of each streamed record, in field order.
headers = [
    'station_name',
    'measurement_timestamp',
    'air_temperature',
    'wet_bulb_temperature',
    'humidity',
    'rain_intensity',
    'interval_rain',
    'total_rain',
    'precipitation_type',
    'wind_direction',
    'wind_speed',
    'maximum_wind_speed',
    'barometric_pressure',
    'solar_radiation',
    'heading',
    'battery_life',
    'measurement_timestamp_label',
    'measurement_id',
]

In [30]:
# Decode the Kafka key and value bytes into strings.
df = streamRawDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Sink: append each micro-batch (every 5 seconds) to the in-memory table 'Item'.
stream_writer = (
    df.writeStream
    .queryName('Item')
    .trigger(processingTime="5 seconds")
    .outputMode("append")
    .format("memory")
)

In [31]:
# Start the streaming query; its name is later used as the table name in spark.sql.
query = stream_writer.start()

In [32]:
# Connect to the local MongoDB server and select the collection that stores
# the predictions (database `weather`, collection `weather_predictions`).
mongo_Client = MongoClient('localhost:27017')
collection = mongo_Client.weather.weather_predictions

In [33]:
def round_number_1(number):
    """Round ``number`` to one decimal place.

    ``None`` is passed through so a null cell does not raise when this
    function is used as a UDF.
    """
    if number is None:
        return None
    return round(number,1)
def round_number_2(number):
    """Round ``number`` to two decimal places.

    ``None`` is passed through so a null cell does not raise when this
    function is used as a UDF.
    """
    if number is None:
        return None
    return round(number,2)
def round_int_number(number):
    """Convert ``number`` to an int, truncating toward zero.

    ``None`` is passed through so a null cell does not raise when this
    function is used as a UDF.

    NOTE(review): despite the name, ``int()`` truncates rather than rounds
    (998.8 -> 998) — confirm that truncation is the intended behaviour.
    """
    if number is None:
        return None
    return int(number)
# Spark UDF wrappers: one- and two-decimal rounding return doubles, the
# integer conversion returns an integer column.
round_udf_1 = udf(round_number_1, DoubleType())
round_udf_2 = udf(round_number_2, DoubleType())
round_int = udf(round_int_number, IntegerType())

In [34]:
def output_rounder(df):
    """Return ``df`` with the four forecast columns reduced to display precision.

    ``interval_rain`` goes through the one-decimal UDF, ``air_temperature``
    through the two-decimal UDF, and ``barometric_pressure`` and ``humidity``
    through the integer UDF.
    """
    rounded = df.alias('copy')
    column_rounders = (
        ('interval_rain', round_udf_1),
        ('barometric_pressure', round_int),
        ('air_temperature', round_udf_2),
        ('humidity', round_int),
    )
    for column_name, rounder in column_rounders:
        rounded = rounded.withColumn(column_name, rounder(rounded[column_name]))
    return rounded

### Dự đoán real-time sau đó lưu vào MongoDB

In [None]:
from time import sleep
from IPython.display import display, clear_output

# Number of completed refresh cycles (each cycle sleeps 5 seconds). Not named
# `time`, so it cannot be confused with the standard-library module.
elapsed_ticks = 0
# Rows needed to build one forecasting sample: `lookback` inputs plus
# `horizon` target rows (13 with the notebook's settings).
window_size = lookback + horizon

# Poll the in-memory stream table every 5 seconds, forecast from the newest
# window of records, classify the rain type and store the latest prediction
# in MongoDB. Stop with a keyboard interrupt.
while True:
    try:
        print('Showing live view refreshed every 5 seconds')
        print(f'Seconds passed: {elapsed_ticks*5}')
        result = spark.sql(f"SELECT distinct value from {query.name}")
        # Strip every character outside the allowed set. Raw string: `\-` is
        # a regex escape, not a valid Python string escape.
        result = result.select(f.regexp_replace("value", r"[^0-9a-zA-Z_,.:/ \-]+", "").alias('replaced_st'))
        # Split the cleaned record on commas and name each field after its header.
        record_fields = f.split(col('replaced_st'), ",")
        for position, header in enumerate(headers):
            result = result.withColumn(header, record_fields[position])
        result = result.drop('replaced_st')
        result = timestamp_adder(result)
        result = cast_dataframe(result)
        result = result.select(train.columns)
        # Only forecast once enough records exist to build one sample.
        if result.count() >= window_size:
            # Keep only the newest `window_size` records from the stream.
            result = spark.createDataFrame(result.tail(window_size))
            result = result.withColumn('interval_rain', interval_outliers_udf(result.interval_rain))
            result = result.withColumn('rain_intensity', intensity_outliers_udf(result.rain_intensity))
            train_data = TSDataset.from_pandas(result.toPandas(), dt_col="timestamp", target_col=target_cols,extra_feature_col=extra_feature_cols,with_split=False)
            # Reuse the scaler fitted on the training split (fit=False).
            # Refitting it on this small live window would overwrite the
            # statistics the forecaster was trained with.
            # NOTE(review): with horizon=1 the last row of the window is the
            # target of the rolled sample — confirm which hour the forecast
            # is meant to describe.
            train_data.deduplicate().impute().gen_dt_feature().scale(scaler, fit=False).roll(lookback=lookback, horizon=horizon)
            X,Y = train_data.to_numpy()
            pre = forecaster.predict(X)
            # Map the prediction back to the original scale.
            output = train_data.unscale_numpy(pre)
            # Flatten to one row per sample so it can become a DataFrame.
            output = output.reshape(output.shape[0], -1)
            # Build a DataFrame from the reshaped prediction.
            result = spark.createDataFrame(output,schema)
            result = vars_standarder(result)
            result = output_rounder(result)
            result = model.transform(result).select('air_temperature','interval_rain','humidity','barometric_pressure','prediction')
            result = result.withColumn('rain_type',prediction_to_string(result.prediction))
            # Keep only the most recent prediction in the collection.
            collection.delete_many({})
            collection.insert_one(result.tail(1)[0].asDict())
            # show() prints the table itself and returns None, so it is not
            # wrapped in display().
            result.show(truncate=False)
        sleep(5)
        clear_output(wait=True)
        elapsed_ticks += 1
    except KeyboardInterrupt:
        print('break')
        break
print('Live view ended..')

Showing live view refreshed every 5 seconds
Seconds passed: 310


  PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
  PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
