In [1]:
#!pip install delta-spark
!pip install plotly



In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Explicit names instead of `from delta import *`; only
# configure_spark_with_delta_pip is used in this notebook, DeltaTable is kept
# importable for interactive use.
from delta import DeltaTable, configure_spark_with_delta_pip
import plotly.express as px
#from IPython.display import IFrame

import plotly.io as pio
# Use Plotly's "iframe" renderer for fig.show().
pio.renderers.default = "iframe"

# Delta table path prefixes; the table for run N is read from "<prefix>_N".
train_bronze_basepath = "/data/delta/ts-spark_ch9_bronze_train"
train_silver_basepath = "/data/delta/ts-spark_ch9_silver_train"
forecast_gold_basepath = "/data/delta/ts-spark_ch9_gold_forecast"
eval_bronze_basepath = "/data/delta/ts-spark_ch9_bronze_eval"
eval_silver_basepath = "/data/delta/ts-spark_ch9_silver_eval"

# Pipeline runs whose tables are loaded and compared below.
runids = [1, 2, 3, 4, 5]

In [3]:
# Session on the standalone Spark cluster with Delta Lake's SQL extension and
# catalog enabled; the builder is passed through delta-spark's
# configure_spark_with_delta_pip before the session is created.
builder = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("ts-spark_ch9_data-ml-ops_results")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
# Create SparkSession
# Alternative kept for reference: a local[*] session without the Delta
# extension/catalog settings configured on the cluster session above.
#spark = SparkSession.builder.master("local[*]") \
#                    .appName('ts-spark_ch9_data-ml-ops_results') \
#                    .getOrCreate()
#
#print(f'The PySpark {spark.version} version is running...')

In [5]:
# Keep a handle on the SparkContext; the parenthesised assignment expression
# also makes it this cell's displayed output.
(sc := spark.sparkContext)

In [6]:
# Load every layer's Delta table for each run in `runids` and stack the runs
# into one Spark DataFrame per layer, tagging each row with its `runid`.

def read_run_table(basepath, runid):
    """Read the Delta table of one run and add a constant ``runid`` column.

    Parameters
    ----------
    basepath : str
        Table path prefix; the run's table is loaded from ``f"{basepath}_{runid}"``.
    runid : int
        Run identifier, used in the path and stored in the ``runid`` column.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    path = f"{basepath}_{runid}"
    print(f"reading delta table {path}")
    return spark.read.format("delta").load(path).withColumn('runid', F.lit(runid))


run_table_basepaths = {
    'train_bronze': train_bronze_basepath,
    'train_silver': train_silver_basepath,
    'forecast_gold': forecast_gold_basepath,
    'eval_bronze': eval_bronze_basepath,
    'eval_silver': eval_silver_basepath,
}

assert runids, "runids must contain at least one run id"

run_tables = {}
for runid in runids:
    print(f"runid: {runid}")
    for table_name, basepath in run_table_basepaths.items():
        run_sdf = read_run_table(basepath, runid)
        # unionByName matches columns by name; plain union() matches by position
        # and would silently misalign columns if a run's column order differed.
        run_tables[table_name] = (
            run_sdf if table_name not in run_tables
            else run_tables[table_name].unionByName(run_sdf)
        )

train_bronze_sdf = run_tables['train_bronze']
train_silver_sdf = run_tables['train_silver']
forecast_gold_sdf = run_tables['forecast_gold']
eval_bronze_sdf = run_tables['eval_bronze']
eval_silver_sdf = run_tables['eval_silver']

runid: 1
reading delta table /data/delta/ts-spark_ch9_bronze_train_1
reading delta table /data/delta/ts-spark_ch9_silver_train_1
reading delta table /data/delta/ts-spark_ch9_gold_forecast_1
reading delta table /data/delta/ts-spark_ch9_bronze_eval_1
reading delta table /data/delta/ts-spark_ch9_silver_eval_1
runid: 2
reading delta table /data/delta/ts-spark_ch9_bronze_train_2
reading delta table /data/delta/ts-spark_ch9_silver_train_2
reading delta table /data/delta/ts-spark_ch9_gold_forecast_2
reading delta table /data/delta/ts-spark_ch9_bronze_eval_2
reading delta table /data/delta/ts-spark_ch9_silver_eval_2
runid: 3
reading delta table /data/delta/ts-spark_ch9_bronze_train_3
reading delta table /data/delta/ts-spark_ch9_silver_train_3
reading delta table /data/delta/ts-spark_ch9_gold_forecast_3
reading delta table /data/delta/ts-spark_ch9_bronze_eval_3
reading delta table /data/delta/ts-spark_ch9_silver_eval_3
runid: 4
reading delta table /data/delta/ts-spark_ch9_bronze_train_4
reading

In [7]:
# Bronze training data across all runs: schema, sample rows, row count.
train_bronze_sdf.printSchema()
train_bronze_sdf.show(10)
print(f"count: {train_bronze_sdf.count()}")

# daily_min_temperature is stored as a string in bronze; cast a separate copy
# for plotting rather than overwriting train_bronze_sdf, so re-running this
# cell always prints the original schema.
train_bronze_plot_sdf = train_bronze_sdf.withColumn(
    'daily_min_temperature', F.col('daily_min_temperature').cast("float")
)

fig = px.scatter(train_bronze_plot_sdf, x='date', y='daily_min_temperature')
fig.write_html('train_bronze_sdf.html', auto_open=True)
#IFrame(src='train_bronze_sdf.html', width=900, height=600)
fig.show()

root
 |-- date: date (nullable = true)
 |-- daily_min_temperature: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- runid: integer (nullable = false)

+----------+---------------------+----+-----+
|      date|daily_min_temperature| _c2|runid|
+----------+---------------------+----+-----+
|1981-01-01|                 20.7|NULL|    1|
|1981-01-02|                 17.9|NULL|    1|
|1981-01-03|                 18.8|NULL|    1|
|1981-01-04|                 14.6|NULL|    1|
|1981-01-05|                 15.8|NULL|    1|
|1981-01-06|                 15.8|NULL|    1|
|1981-01-07|                 15.8|NULL|    1|
|1981-01-08|                 17.4|NULL|    1|
|1981-01-09|                 21.8|NULL|    1|
|1981-01-10|                 20.0|NULL|    1|
+----------+---------------------+----+-----+
only showing top 10 rows

count: 9125


In [8]:
# Silver training data across all runs: schema, sample rows, row count.
train_silver_sdf.printSchema()
train_silver_sdf.show(n=10)
print("count: {}".format(train_silver_sdf.count()))

# y against ds, saved as standalone HTML and rendered inline.
fig = px.scatter(data_frame=train_silver_sdf, x='ds', y='y')
fig.write_html(file='train_silver_sdf.html', auto_open=True)
#IFrame(src='train_silver_sdf.html', width=900, height=600)
fig.show()

root
 |-- ds: date (nullable = true)
 |-- y: double (nullable = true)
 |-- runid: integer (nullable = false)

+----------+----+-----+
|        ds|   y|runid|
+----------+----+-----+
|1981-01-01|20.7|    1|
|1981-01-02|17.9|    1|
|1981-01-03|18.8|    1|
|1981-01-04|14.6|    1|
|1981-01-05|15.8|    1|
|1981-01-06|15.8|    1|
|1981-01-07|15.8|    1|
|1981-01-08|17.4|    1|
|1981-01-09|21.8|    1|
|1981-01-10|20.0|    1|
+----------+----+-----+
only showing top 10 rows

count: 9117


In [9]:
# Gold forecasts across all runs (yhat with lower/upper bounds): schema,
# sample rows, row count.
forecast_gold_sdf.printSchema()
forecast_gold_sdf.show(n=10)
print("count: {}".format(forecast_gold_sdf.count()))

# Point forecast over time, coloured by runid.
fig = px.scatter(data_frame=forecast_gold_sdf, x='ds', y='yhat', color='runid')
fig.write_html(file='forecast_gold_sdf.html', auto_open=True)
#IFrame(src='forecast_gold_sdf.html', width=900, height=600)
fig.show()

root
 |-- ds: timestamp (nullable = true)
 |-- yhat: double (nullable = true)
 |-- yhat_lower: double (nullable = true)
 |-- yhat_upper: double (nullable = true)
 |-- runid: integer (nullable = false)

+-------------------+------------------+------------------+------------------+-----+
|                 ds|              yhat|        yhat_lower|        yhat_upper|runid|
+-------------------+------------------+------------------+------------------+-----+
|1986-07-02 00:00:00|7.0502054294413465| 3.307358146371069|10.465938828776352|    1|
|1986-07-03 00:00:00| 6.587560735891971|  2.97254994764213|10.167342777711973|    1|
|1986-07-04 00:00:00| 6.824346417179263| 3.520005151593687|10.381008117982446|    1|
|1986-07-05 00:00:00| 6.665679404797449|3.2559132714407397|10.206536118631064|    1|
|1986-07-06 00:00:00| 6.499936967527267|3.2156487525403317| 10.40039431933827|    1|
|1986-07-07 00:00:00| 6.975215084511728|3.3075225241951824|10.432759147392009|    1|
|1986-07-08 00:00:00| 7.041646186

In [10]:
# Bronze evaluation data across all runs: schema, sample rows, row count.
eval_bronze_sdf.printSchema()
eval_bronze_sdf.show(10)
print(f"count: {eval_bronze_sdf.count()}")

# daily_min_temperature is stored as a string in bronze; cast a separate copy
# for plotting rather than overwriting eval_bronze_sdf, so re-running this
# cell always prints the original schema.
eval_bronze_plot_sdf = eval_bronze_sdf.withColumn(
    'daily_min_temperature', F.col('daily_min_temperature').cast("float")
)

fig = px.scatter(eval_bronze_plot_sdf, x='date', y='daily_min_temperature')
fig.write_html('eval_bronze_sdf.html', auto_open=True)
#IFrame(src='eval_bronze_sdf.html', width=900, height=600)
fig.show()

root
 |-- date: date (nullable = true)
 |-- daily_min_temperature: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- runid: integer (nullable = false)

+----------+---------------------+----+-----+
|      date|daily_min_temperature| _c2|runid|
+----------+---------------------+----+-----+
|1981-01-01|                 20.7|NULL|    1|
|1981-01-02|                 17.9|NULL|    1|
|1981-01-03|                 18.8|NULL|    1|
|1981-01-04|                 14.6|NULL|    1|
|1981-01-05|                 15.8|NULL|    1|
|1981-01-06|                 15.8|NULL|    1|
|1981-01-07|                 15.8|NULL|    1|
|1981-01-08|                 17.4|NULL|    1|
|1981-01-09|                 21.8|NULL|    1|
|1981-01-10|                 20.0|NULL|    1|
+----------+---------------------+----+-----+
only showing top 10 rows

count: 10950


In [11]:
# Silver evaluation data across all runs: schema, sample rows, row count.
eval_silver_sdf.printSchema()
eval_silver_sdf.show(n=10)
print("count: {}".format(eval_silver_sdf.count()))

# y against ds, saved as standalone HTML and rendered inline.
fig = px.scatter(data_frame=eval_silver_sdf, x='ds', y='y')
fig.write_html(file='eval_silver_sdf.html', auto_open=True)
#IFrame(src='eval_silver_sdf.html', width=900, height=600)
fig.show()

root
 |-- ds: date (nullable = true)
 |-- y: double (nullable = true)
 |-- runid: integer (nullable = false)

+----------+----+-----+
|        ds|   y|runid|
+----------+----+-----+
|1981-01-01|20.7|    1|
|1981-01-02|17.9|    1|
|1981-01-03|18.8|    1|
|1981-01-04|14.6|    1|
|1981-01-05|15.8|    1|
|1981-01-06|15.8|    1|
|1981-01-07|15.8|    1|
|1981-01-08|17.4|    1|
|1981-01-09|21.8|    1|
|1981-01-10|20.0|    1|
+----------+----+-----+
only showing top 10 rows

count: 10942


In [12]:
# Pair each run's forecast with the observed values of the same run: forecast
# timestamps are cast to dates, eval 'ds' is renamed to 'date', and the two
# are inner-joined on (date, runid), then ordered by run and date.
eval_silver_sdf_forjoin = eval_silver_sdf.withColumnRenamed('ds', 'date')
forecast_gold_sdf_join = (
    forecast_gold_sdf
    .withColumn('date', F.col('ds').cast('date'))
    .join(eval_silver_sdf_forjoin, on=['date', 'runid'], how="inner")
    .sort('runid', 'date')
)

forecast_gold_sdf_join.printSchema()
forecast_gold_sdf_join.show(n=10)
print("count: {}".format(forecast_gold_sdf_join.count()))

root
 |-- date: date (nullable = true)
 |-- runid: integer (nullable = false)
 |-- ds: timestamp (nullable = true)
 |-- yhat: double (nullable = true)
 |-- yhat_lower: double (nullable = true)
 |-- yhat_upper: double (nullable = true)
 |-- y: double (nullable = true)

+----------+-----+-------------------+------------------+------------------+------------------+----+
|      date|runid|                 ds|              yhat|        yhat_lower|        yhat_upper|   y|
+----------+-----+-------------------+------------------+------------------+------------------+----+
|1986-01-01|    1|1986-01-01 00:00:00|15.304088658556083|11.762421919032162|18.681151694847234|12.9|
|1986-01-02|    1|1986-01-02 00:00:00|14.912985035062876| 11.35717035259453|18.553112375066544|13.8|
|1986-01-03|    1|1986-01-03 00:00:00|15.218134049807485| 11.83353839921466|18.931096197141773|10.6|
|1986-01-04|    1|1986-01-04 00:00:00|15.124372509163033|11.502590066069416|18.716316207313586|12.6|
|1986-01-05|    1|1986-0

In [13]:
# Overlay observed values and forecast bands for every run in one figure.
# Collect the joined Spark DataFrame to pandas once and reuse it for both
# Plotly Express calls instead of letting each call collect it separately.
forecast_gold_pdf_join = forecast_gold_sdf_join.toPandas()

# yhat_lower / yhat / yhat_upper as faint grey lines. The data is sorted by
# (runid, date); line_group='runid' keeps each run's lines separate so no
# segment is drawn from the last date of one run to the first date of the next.
fig1 = px.line(forecast_gold_pdf_join, x='date', y=['yhat_lower', 'yhat', 'yhat_upper'],
               line_group='runid', color_discrete_sequence=['rgba(10,10,10,0.2)'])

# Observed y coloured by runid, with every forecast line trace added on top.
# add_traces(fig1.data) handles any number of traces; indexing fig1.data[0..2]
# would drop runs now that lines are split per run.
fig2 = px.scatter(forecast_gold_pdf_join, x='date', y=['y'], color='runid')
fig2.add_traces(fig1.data)
fig2.update_layout(showlegend=False)
fig2.write_html('forecast_gold_sdf_join.html', auto_open=True)
#IFrame(src='forecast_gold_sdf_join.html', width=900, height=600)
fig2.show()

In [14]:
# Uncomment to stop the SparkContext once finished with this notebook.
#spark.sparkContext.stop()