In [1]:
import numpy as np 
import pandas as pd
import json
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql.functions import expr
import findspark
from pyspark.sql import SparkSession, SQLContext, Window
from pandas import json_normalize
import argparse
from typing import Optional
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

In [2]:
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.feature import VectorAssembler

In [3]:
findspark.init()

In [4]:
spark = (
    SparkSession.builder
    .appName("HDFS")
    .getOrCreate()
)
spark

In [5]:
sc = spark.sparkContext
sqlContext = SQLContext(sc)

df_weather = (
    sqlContext.read.parquet("/user/bdaa/weather/2022-12-17-05-12-13fc975232-44b9-4c24-a9ac-7dd026527564")
)

df_weather.printSchema()

root
 |-- coord_lon: double (nullable = true)
 |-- coord_lat: double (nullable = true)
 |-- weather_0__id: integer (nullable = true)
 |-- weather_0__main: string (nullable = true)
 |-- weather_0__description: string (nullable = true)
 |-- weather_0__icon: string (nullable = true)
 |-- base: string (nullable = true)
 |-- main_temp: double (nullable = true)
 |-- main_feels_like: double (nullable = true)
 |-- main_temp_min: double (nullable = true)
 |-- main_temp_max: double (nullable = true)
 |-- main_pressure: integer (nullable = true)
 |-- main_humidity: integer (nullable = true)
 |-- visibility: integer (nullable = true)
 |-- wind_speed: double (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- dt: integer (nullable = true)
 |-- sys_type: integer (nullable = true)
 |-- sys_id: integer (nullable = true)
 |-- sys_country: string (nullable = true)
 |-- sys_sunrise: integer (nullable = true)
 |-- sys_sunset: integer (nullable = true)

In [6]:
def get_joined_data(weather_file_name='/user/bdaa/weather/2022-12-17-05-12-13fc975232-44b9-4c24-a9ac-7dd026527564', energy_file_name='/user/bdaa/energy_prices/2022-12-17-05-13-179225cf8b-1f0c-498c-a32b-dbcb0d8e4b45'):
    findspark.init()
    spark = (
        SparkSession.builder
        .appName('HDFS')
        .getOrCreate()
    )
    sc = spark.sparkContext
    sqlContext = SQLContext(sc)
    
    df_weather = (
        sqlContext.read.parquet(weather_file_name)  
        .withColumn('dt', F.expr('to_timestamp(int(dt / 300) * 300)'))
        .dropDuplicates()
        .drop('weather_0__icon', 'base', 'sys_type', 'sys_id', 'sys_country', 'timezone', 'id', 
              'name', 'cod', 'weather_0__main', 'weather_0__description', 'coord_lon', 'coord_lat', 
              'main_temp_min', 'main_temp_max', 'sys_sunrise', 'sys_sunset', 'weather_0__id')
    )
    
    weather = df_weather.groupBy('dt') \
        .agg(F.avg('main_temp').alias('avg_temp'),
             F.avg('main_feels_like').alias('avg_feels_like_temp'),
             F.avg('main_pressure').alias('avg_pressure'),
             F.avg('main_humidity').alias('avg_humidity'),
             F.avg('visibility').alias('avg_visibility'),
             F.avg('wind_speed').alias('avg_wind_speed'),
             F.avg('wind_deg').alias('avg_wind_deg'),
             F.avg('clouds_all').alias('avg_clouds'),
         )
    
    df_prices = (
        sqlContext.read.parquet(energy_file_name)
            .withColumn('Datetime__UTC_', F.unix_timestamp('Datetime__UTC_'))
            .withColumn('Datetime__UTC_', F.expr('to_timestamp(int((Datetime__UTC_ + 251129400) / 300) * 300)'))
            .drop('Country', 'ISO3_Code', 'Datetime__Local_')
            .withColumnRenamed('Datetime__UTC_', 'dt_prices')
            .withColumnRenamed('Price__EUR_MWhe_', 'price')
            .groupBy('dt_prices').agg(F.avg('price').alias('avg_price'))
    )
    
    data = weather.join(df_prices, weather.dt == df_prices.dt_prices).drop('dt_prices')
    return data

In [7]:
data = get_joined_data()
data.show()

+-------------------+------------------+-------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|                 dt|          avg_temp|avg_feels_like_temp|      avg_pressure|     avg_humidity|    avg_visibility|    avg_wind_speed|      avg_wind_deg|        avg_clouds|         avg_price|
+-------------------+------------------+-------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|2022-12-16 18:05:00|           271.425|           269.3625|           1018.25|             95.5|            3500.0|              1.54|             175.0|             58.25| 33.97022710270928|
|2022-12-16 16:55:00|271.83676274944577|  270.3342572062086|1018.4567627494457| 90.7450110864745| 6445.232815964523|1.5302217294900204|185.05543237250555| 48.51441241685144|28.031349965259228|
|2022-12-16 16:15:00|272.3496661101

In [8]:
data.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- avg_temp: double (nullable = true)
 |-- avg_feels_like_temp: double (nullable = true)
 |-- avg_pressure: double (nullable = true)
 |-- avg_humidity: double (nullable = true)
 |-- avg_visibility: double (nullable = true)
 |-- avg_wind_speed: double (nullable = true)
 |-- avg_wind_deg: double (nullable = true)
 |-- avg_clouds: double (nullable = true)
 |-- avg_price: double (nullable = true)



In [9]:
train_data = data.withColumnRenamed("avg_temp","main_temp") \
    .withColumnRenamed("avg_feels_like_temp","main_feels_like") \
    .withColumnRenamed("avg_pressure","main_pressure") \
    .withColumnRenamed("avg_humidity","main_humidity") \
    .withColumnRenamed("avg_visibility","visibility") \
    .withColumnRenamed("avg_wind_speed","wind_speed") \
    .withColumnRenamed("avg_wind_deg","wind_deg") \
    .withColumnRenamed("avg_clouds","clouds_all") \
    .withColumnRenamed("avg_price","price")

In [10]:
train_data.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- main_temp: double (nullable = true)
 |-- main_feels_like: double (nullable = true)
 |-- main_pressure: double (nullable = true)
 |-- main_humidity: double (nullable = true)
 |-- visibility: double (nullable = true)
 |-- wind_speed: double (nullable = true)
 |-- wind_deg: double (nullable = true)
 |-- clouds_all: double (nullable = true)
 |-- price: double (nullable = true)



In [11]:
train_data.count()

39

In [12]:
vectorAssembler = VectorAssembler(inputCols = ['main_temp', 'main_feels_like', 'main_pressure', 'main_humidity', 'visibility', 'wind_speed', 'wind_deg', 'clouds_all'], outputCol = 'features')
df_weather_train = vectorAssembler.transform(train_data)
df_weather_train = df_weather_train.select(['features', 'price'])

In [13]:
df_weather_train.show()

+--------------------+------------------+
|            features|             price|
+--------------------+------------------+
|[271.425,269.3624...| 33.97022710270928|
|[271.836762749445...|28.031349965259228|
|[272.349666110183...| 27.83741654692217|
|[271.274545454545...|30.885670970193846|
|[270.981380846325...| 38.80183312399668|
|[271.208666666666...|30.544822057531256|
|[272.422968749999...| 28.15346578005265|
|[271.123685152057...|35.545279918791856|
|[270.799905362776...|42.345201859412064|
|[271.198691860465...|31.884012986140394|
|[271.115466666666...| 33.01107540635505|
|[270.847155963302...|44.622789367830855|
|[271.712302158273...|28.979714278087364|
|[271.209189189189...|34.567584484174795|
|[272.105825242718...| 28.01822750887067|
|[271.502648648648...|29.238047875287798|
|[270.671616161616...| 45.10807661504458|
|[272.268725761772...| 27.98963638138911|
|[272.101276223776...|27.798741510259095|
|[271.367574257425...|31.374272391294273|
+--------------------+------------

In [14]:
lr = LinearRegression(featuresCol='features', labelCol='price',
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(df_weather_train)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [-1.5758695900435304,-0.010224654675196529,6.405304558390184,0.0,-0.0015385040686054022,0.0,-0.008579052885349823,-0.1377630528711601]
Intercept: -6043.587466967722
numIterations: 11
objectiveHistory: [0.5, 0.4244576403826503, 0.1495625045822526, 0.14142503656090274, 0.12775617827931662, 0.1260652990590617, 0.12452146291768977, 0.12296969878286086, 0.12231100630849541, 0.12209982596711064, 0.12182376539667478]
+-------------------+
|          residuals|
+-------------------+
| 0.7516658369406954|
| -2.576454915502932|
|  2.032748431093708|
| -3.751201188844984|
| 0.5079137687665067|
|-1.2477147506684005|
| 3.5069418199269435|
|-0.6085510177113989|
| 1.9578089310548066|
|-1.3965786079403664|
| -1.498356540095557|
| 3.4824136573791193|
|-2.4004574606076616|
| 2.9069137462915293|
|-0.2573166129225761|
| -2.445022270715249|
|  4.015193416484351|
| 2.8596358923618865|
| 0.1393710965751218|
|-1.3160384406605026|
+-------------------+
only showing top 20 rows

RMSE: 2.228147
r2:

In [15]:
lrModel.transform(df_weather_train).select(F.col("prediction")).show()

+------------------+
|        prediction|
+------------------+
|33.218561265768585|
| 30.60780488076216|
|25.804668115828463|
| 34.63687215903883|
| 38.29391935523017|
|31.792536808199657|
|24.646523960124796|
|36.153830936503255|
| 40.38739292835817|
| 33.28059159408076|
|34.509431946450604|
|41.140375710451735|
|31.380171738695026|
|31.660670737883265|
|28.275544121793246|
|31.683070146003956|
| 41.09288319856023|
|25.130000489027225|
|27.659370413683973|
|32.690310831954775|
+------------------+
only showing top 20 rows



In [16]:
def receive_messages(
    project_id: str, subscription_id: str, timeout: Optional[float] = None
) -> None:
    """Receives messages from a pull subscription."""
    # [START pubsub_subscriber_async_pull]
    # [START pubsub_quickstart_subscriber]

    subscriber = pubsub_v1.SubscriberClient()
    # The `subscription_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/subscriptions/{subscription_id}`
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        json_message = json.loads(message.data.decode("UTF-8"))
        json_message_norm = json_normalize(json_message, sep='_')
        sparkDF_message = spark.createDataFrame(json_message_norm)
        answer = lrModel.transform(vectorAssembler.transform(sparkDF_message)).select(F.col("prediction"))
        print(answer.collect()[0])
        message.ack()

    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    print(f"Listening for messages on {subscription_path}..\n")

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=timeout)
        except TimeoutError:
            streaming_pull_future.cancel()  # Trigger the shutdown.
            streaming_pull_future.result()  # Block until the shutdown is complete.
    # [END pubsub_subscriber_async_pull]
    # [END pubsub_quickstart_subscriber]


In [17]:
receive_messages(
    "bda-m3", "spark_weather", 3
)

Listening for messages on projects/bda-m3/subscriptions/spark_weather..



### Train - Test evaluation

In [18]:
splits = df_weather_train.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [19]:
lr = LinearRegression(featuresCol = 'features', labelCol='price',
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel_batch = lr.fit(train_df)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel_batch.coefficients))
print("Intercept: %s" % str(lrModel_batch.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel_batch.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [-1.874271362083798,0.0,6.441441964073649,-0.006792285776684618,-0.0012015934244489718,0.0,-0.010769546296398997,-0.06992861168803052]
Intercept: -6006.539095762855
numIterations: 11
objectiveHistory: [0.5, 0.4299851567079309, 0.17656402075727465, 0.16515427336643101, 0.15177132095995408, 0.15012039962621604, 0.1493706172335528, 0.14834615012021773, 0.14725681271932592, 0.14675169006555427, 0.14571997901646774]
+--------------------+
|           residuals|
+--------------------+
|  1.0473999111084353|
|  -2.517400885591183|
|  2.2745960270139705|
|  0.6145076313362594|
| -1.5812201602252927|
|    3.70310078963994|
|  2.2049471514087244|
| -0.7182080357548344|
| -1.3062912130323205|
|  3.4736759110794893|
|  -2.468833038717797|
|  2.6598291267304646|
|-0.23066284933366532|
| -2.1832898155823344|
|   4.545463613352865|
|   3.034792591447051|
| 0.12672073432189634|
| -1.3528901413950898|
| 0.36318493923190687|
| -1.8872035384531891|
+--------------------+
only showing top 20

In [20]:
test_result = lrModel_batch.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 2.09229


In [21]:
print("r2 on train data = %f" % trainingSummary.r2)
print("Root Mean Squared Error (RMSE) on train data = %f" % trainingSummary.rootMeanSquaredError)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

r2 on train data = 0.808917
Root Mean Squared Error (RMSE) on train data = 2.320003
Root Mean Squared Error (RMSE) on test data = 2.09229
