# Visualization of annual weather measurements

In [1]:
import cols
import pandas as pd
import json
import transform_in_spark
from hdfs import InsecureClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType, DateType
import aggregate
import plotly.graph_objects as go

local_data_path = '../Data/'

pivot_start_year = 1900
pivot_end_year = 1929

# HDFS
hdfs_url = 'http://localhost:9870'
hdfs_user = 'my_username'
hdfs_dir = '/bigdata/'
datalake_dir = hdfs_dir + 'datalake/'
data_temp_prepared = 'data_temp_prepared.csv'
data_stations_prepared = 'data_stations_prepared.csv'

In [2]:
hdfs_client = InsecureClient(hdfs_url, user=hdfs_user)

In [None]:
spark = SparkSession.builder \
      .master("yarn") \
      .appName("weather_data") \
      .getOrCreate()

## Acquire data

The data are put into a Hadoop datalake. For simplicity, we downloaded the data beforehand and stored it on a local drive. From ther, we copy the data into the datalake.

- Measurements of temperature, rainfall and data on stations: Annual means provided by the Deutscher Wetterdienst, https://cdc.dwd.de/portal/
- GeoJSONs: GeoJSONs in lowest resolution, taken from https://github.com/isellsoap/deutschlandGeoJSON/

In [None]:
from pathlib import Path
datapath = Path(local_data_path).absolute()

def put_to_hdfs(local_path, hdfs_path):
    try:
        hdfs_client.upload(hdfs_path, local_path)
        print(f"Copied file to HDFS successfully: {Path(local_path).name}")
    except Exception as e:
        print("Error while copying the file to HDFS: ", e)


print('starting data acquisition')
for path in datapath.iterdir():
    if path.is_file():
        if path.suffix == '.csv':
            name = path.name
            if name.startswith('temperatures_') or name.startswith('stations_') or name.startswith('rainfall_'):
                put_to_hdfs( local_data_path + name, datalake_dir + name )
print('data acquisition complete')

## Read data

The measurements of temperature and rainfall contain one measurement per line, a timestamp and an ID of the station. Alongside, there are some meta data of the station, too.

As main data, the station data contain an ID and the geolocation of each station.

The GeoJSONs are JSONs in a special format. They list data on each area listed in a file. Main infos for our needs are the name of the areas and polygons representing the area boundaries. The GeoJSON of the counties also include the names of the larger areas the county is located in, i.e. state and nation.

The choropleth map requires these GeoJSONs for displaying the areas. As a handy plus, we can use the GeoJSON of the counties for assigning the stations to the areas they are located in. The geolocations of the stations and the polygons in the GeoJSON are used for this automated assignment.

In [None]:
# Read measurements of temperatues and restrict to the columns we need
schema_temps = StructType([
    StructField( 'Produkt_Code', StringType(), True),
    StructField( cols.sdo_id, StringType(), True),
    StructField( cols.timestamp, DateType(), True),
    StructField( cols.temperature, FloatType(), True),
    StructField( 'Qualitaet_Byte', IntegerType(), True),
    StructField( 'Qualitaet_Niveau', IntegerType(), True),
])

sdf_temps = spark.read.option('header', True) \
        .schema(schema_temps) \
        .csv('hdfs://'+datalake_dir+'temperatures_*.csv') \
        .select(cols.sdo_id, cols.timestamp, cols.temperature) \
        .drop_duplicates()



# Read measurements of rainfall and restrict to the columns we need
schema_rainfall = StructType([
    StructField( 'Produkt_Code', StringType(), True),
    StructField( cols.sdo_id, StringType(), True),
    StructField( cols.timestamp, DateType(), True),
    StructField( cols.rainfall, FloatType(), True),
    StructField( 'Qualitaet_Byte', IntegerType(), True),
    StructField( 'Qualitaet_Niveau', IntegerType(), True),
])

sdf_rainfall = spark.read.option('header', True) \
        .schema(schema_rainfall) \
        .csv('hdfs://'+datalake_dir+'rainfall_*.csv') \
        .select(cols.sdo_id, cols.timestamp, cols.rainfall) \
        .drop_duplicates()



# Read data of stations and restrict to the columns we need
# CAUTION: cols.longitude and cols.latitude use ',' as period, while cols.altitude uses '.'
schema_stations = StructType([
    StructField( cols.sdo_id, StringType(), True ),
    StructField( 'SDO_Name', StringType(), True ),
    StructField( cols.longitude, StringType(), True ),
    StructField( cols.latitude, StringType(), True ),
    StructField( cols.altitude, FloatType(), True ),
    StructField( 'Metadata_Link', StringType(), True ),
])

sdf_stations_0 = spark.read.option('header', True) \
        .schema(schema_stations).csv('hdfs://'+datalake_dir+'stations_*.csv') \
        .select(cols.sdo_id, 'SDO_Name', cols.longitude, cols.latitude) \
        .dropDuplicates([cols.sdo_id]) \
        .withColumn( cols.longitude, regexp_replace( col(cols.longitude), ',', '.').cast('float') ) \
        .withColumn( cols.latitude, regexp_replace( col(cols.latitude), ',', '.').cast('float') )

sdf_temps.printSchema()
sdf_temps.show()
sdf_rainfall.printSchema()
sdf_rainfall.show()
sdf_stations_0.printSchema()
sdf_stations_0.show()

# GeoJSONs
# highest level
with open( local_data_path + '4_niedrig_bund.geo.json' ) as file:
    national_json = json.load( file )

# first level of subdivisions
with open( local_data_path + '4_niedrig_laender.geo.json' ) as file:
    state_json = json.load( file )

# second level of subdivisions
with open( local_data_path + '4_niedrig_kreise.geo.json' ) as file:
    county_json = json.load( file )

## Preprocess data

The measurements contain a timestamp. We extract the year and add this as a new column.

Also, we compute an individual reference temperature for each station by determining the mean temperature in a certain, userdefined time interval. Note that this ends up in *NULL* for stations without measurements in this interval.

In [None]:
# Add numeric column for the year; Add numeric column for reference temperature
sdf_temps = sdf_temps.withColumn( cols.year, year(cols.timestamp) )
sdf_temps = sdf_temps.join(
            aggregate.get_pivotal_mean( sdf_temps, pivot_start_year, pivot_end_year ),
            on = cols.sdo_id,
            how = 'left'
        )

# Add numeric column for the year
sdf_rainfall = sdf_rainfall.withColumn( cols.year, year(cols.timestamp) )

# just for the show() we shuffle the lines. Otherwise, we would see just the first 20 years of the same station.
sdf_temps.printSchema()
sdf_temps.withColumn( 'rand', (rand(seed=123456789)*100000).cast('int') ).sort( 'rand' ).drop('rand').show()

We expand the station data by the nations, the states, and the counties each station is located in.

In [None]:
# Register required modules on the nodes of the cluster
# Make sure the module 'shapely' is available on each node of the cluster
spark.sparkContext.addPyFile('./cols.py')
spark.sparkContext.addPyFile('./transform_in_spark.py')

aux_col_areas = 'areas' # name of auxilliary column
udf_find_area = udf(
    lambda lon, lat: transform_in_spark.find_area( lon, lat, county_json ),
    StructType([
        StructField( 'Nation', StringType(), True ),
        StructField( 'State', StringType(), True ),
        StructField( 'County', StringType(), True ),
        StructField( 'Assignment', StringType(), True ),
    ])
)

# flatten the auxillary, nested column and drop it
sdf_stations = sdf_stations_0.withColumn( aux_col_areas, udf_find_area( col(cols.longitude), col(cols.latitude) ) ) \
        .withColumn( cols.nation, col('areas.Nation') ) \
        .withColumn( cols.state, col('areas.State') ) \
        .withColumn( cols.county, col('areas.County') ) \
        .withColumn( cols.assignment, col('areas.Assignment') ) \
        .drop( aux_col_areas )

sdf_stations.printSchema()
sdf_stations.withColumn( 'rand', (rand(seed=5)*10000000).cast('int') ).sort( 'rand' ).drop('rand').show()

**Caution!** The polygons in the GeoJSONs just approximate the boundaries of the political areas. Especially stations close to the borders have a fair chance to be located outside of the approximating polygon. These may not be assigned to any area, or might be even assigned to a wrong area. If we do not correct the processed data for these effects, we have to expects *NULLs* at least.

In [None]:
# For stations where the automated assignment to the areas has failed, this can be done manually.
# As an example, we do this here for few stations, outlining the way

sdf_stations.where( col(cols.nation).isNull() | col(cols.state).isNull() | col(cols.county).isNull() ).show()

def set_area(sdo_id: str, county: str, state: str, nation: str = 'Deutschland'):
    aux = sdf_stations.withColumn( cols.nation, when( col(cols.sdo_id) == sdo_id, nation ).otherwise( col(cols.nation) ) ) \
            .withColumn( cols.state, when( col(cols.sdo_id) == sdo_id, state).otherwise( col(cols.state )) ) \
            .withColumn( cols.county, when( col(cols.sdo_id) == sdo_id, county).otherwise( col(cols.county) ) ) \
            .withColumn( cols.assignment, when( col(cols.sdo_id) == sdo_id, cols.manually).otherwise( col(cols.assignment) ) )
    return aux

sdf_stations = set_area( '1014', 'Leer', 'Niedersachsen' )
sdf_stations = set_area( '1260', 'Bernkastel-Wittlich', 'Rheinland-Pfalz')
sdf_stations = set_area( '13714', 'Neuwied', 'Rheinland-Pfalz')

sdf_stations.where( (col(cols.sdo_id) == '1014') | (col(cols.sdo_id) == '1260') | (col(cols.sdo_id) == '13714') ).show()
print('area not assigned for', sdf_stations.filter( col(cols.nation).isNull() | col(cols.state).isNull() | col(cols.county).isNull() ).count(), 'out of', sdf_stations.count() )

Create a complete list with the name of all areas. There will be areas without data. These would be dropped on the way when alysing the data. But we want these areas to be included in the analysis results, with *NULLs* as values, or with 0 for the number of stations, respectively.

In [9]:
# extract names of all areas for each level of detail
national_names = [national_json['features'][idx]['properties']['NAME_LOCAL'] for idx in range(len(national_json['features']))]
state_names = [state_json['features'][idx]['properties']['name'] for idx in range(len(state_json['features']))]
county_names = [county_json['features'][idx]['properties']['NAME_3'] for idx in range(len(county_json['features']))]

# convert python arrays to spark DataFrames
schema = StructType([ StructField( cols.area, StringType(), False ) ])
sdf_national_names = spark.createDataFrame( [(area,) for area in national_names], schema )

schema = StructType([ StructField( cols.area, StringType(), False ) ])
sdf_state_names = spark.createDataFrame( [(area,) for area in state_names], schema )

schema = StructType([ StructField( cols.area, StringType(), False ) ])
sdf_county_names = spark.createDataFrame( [(area,) for area in county_names], schema )



## Create spark data frame for data analysis

In [None]:
sdf = sdf_temps.join( sdf_rainfall, on = [cols.sdo_id, cols.year], how = 'outer' )
sdf = sdf.join( sdf_stations, on = cols.sdo_id, how = 'left' ).drop( cols.timestamp )
sdf.printSchema()
sdf.select([cols.sdo_id, 'SDO_Name', cols.year, cols.pivot_temp,cols.temperature,cols.rainfall,cols.nation,cols.state,cols.county]) \
        .withColumn( 'rand', (rand(seed=5647382910)*10000000).cast('int') ).sort( 'rand' ).drop('rand').show()

print(
    'stations with temperatures recorded but no area assigned:',
    sdf.where( col(cols.temperature).isNotNull() ).filter( col(cols.nation).isNull() | col(cols.state).isNull() | col(cols.county).isNull() ).dropDuplicates([cols.sdo_id]).count(),
    'out of',
    sdf.where( col(cols.temperature).isNotNull() ).dropDuplicates([cols.sdo_id]).count()
)

# esingle colums data frame with all years of measurements. This is used for the x axis of our plots later on.
year_min = sdf.agg( min(cols.year) ).collect()[0][0]
year_max = sdf.agg( max(cols.year) ).collect()[0][0]

schema = StructType([ StructField( cols.year, IntegerType(), False ) ])
sdf_years = spark.createDataFrame( [(_,) for _ in range(year_min, year_max+1)], schema )

## Visualizing with Plotly Dash

In [None]:
from dash import Dash, html, dcc, callback, Input, Output, ctx
from typing import List
import plotly.express as px
import plotly.graph_objects as go


# --------  create choropleth figures ----------

def make_choropleth_figure(df: pd.DataFrame, use_json, key_id: str, min_col: int, max_col: int):
    """
    Creates the plotly express choropleth map.

    ---
    df: pandas data frame with columns 'name' (str) and 'funniness' (int).

    use_json: GeoJSON to be used.

    key_id: where the name of an area can be found in the GeoJSON.

    min_col:
    Bottom of colorscale corresponds to this vale.

    max_col: int
    Top of color scale corresponds to this value.
    ---
    returns: figure
    """
    fig = px.choropleth(df,
                    geojson=use_json,
                    locations=cols.area,
                    featureidkey=key_id,
                    color=cols.mean,
                    color_continuous_scale='Viridis',
                    range_color=(min_col, max_col),
                    labels={cols.mean,'Value'},
                    hover_name=cols.area,
                    hover_data=[cols.mean, cols.stddev, cols.stations]
    )
    # Disable underlying outline of global land masses and fit view to the bounds of the displayed area
    fig.update_geos(
        visible=False,
        fitbounds='locations'
    )
    fig.update_layout(margin={'r':0,'t':0,'l':0,'b':0})

    return fig



# ---------  create plots of temperature over timer  ----------

def make_temperature_plot(pdf_annual: pd.DataFrame, pdf_running: pd.DataFrame, title: str, y_label: str):
    """
    Makes a plot with two curves.

    ---
    pdf_annual: pd.DataFrame
    DataFrame with actual annual values.

    pdf_running: pd.DataFrame
    DataFrame containing running means for each year.

    title: str
    Title of the plot
    """
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x = pdf_annual[cols.year],
            y = pdf_annual[cols.mean],
            name = 'Annual mean'
        )
    )
    fig.add_trace(
        go.Scatter(
            x = pdf_running[cols.year],
            y = pdf_running[cols.running],
            name = '7-year running mean'
        )
    )


    # add labels
    fig.update_layout(
        title = {'text': title},
        xaxis = { 'title': {'text': 'Year'}},
        yaxis = { 'title': {'text': y_label}},
    )
    fig.update_traces(mode='lines')

    return fig



def make_temp_diff_plot(area_col_name, area):
    pdf = sdf.where( col(area_col_name) == area ) \
            .where( col(cols.temperature).isNotNull() ) \
            .where( col(cols.pivot_temp).isNotNull() ) \
            .withColumn( 'delta', col(cols.temperature) - col(cols.pivot_temp) ) \
            .groupBy( cols.year ) \
            .agg(
                mean( 'delta' ).alias( cols.mean ),
                stddev( 'delta' ).alias( cols.stddev ),
                count( 'delta' ).alias( cols.stations )
            ) \
            .join( sdf_years, on=cols.year, how='outer' ) \
            .fillna( 0, cols.stations ) \
            .sort( cols.year, ascending=True ) \
            .toPandas()

    fig = px.scatter( pdf, x=cols.year, y=cols.mean, error_y=cols.stddev, hover_name=cols.year, hover_data=[cols.stations])

    # add labels
    fig.update_layout(
        title = {'text': f'Annual temperature anomaly in {area} compared to the station mean temperatures in years {pivot_start_year} through {pivot_end_year}'},
        xaxis = { 'title': {'text': 'Year'}},
        yaxis = { 'title': {'text': 'Temperature difference [°C]'}},
    )

    return fig



# ---------  UI elements with simple settings  ----------

level_options = ['Nation', 'State', 'Counties']
level_select_id = 'level_select'
level_select = dcc.Dropdown(level_options, level_options[0], id=level_select_id)
level_select_label = html.Label( 'Select level of detail', htmlFor=level_select_id )

measurement_options = ['Temperature', 'Rainfall']
measurement_select_id = 'measurement_select'
measurement_select = dcc.Dropdown( measurement_options, measurement_options[0], id = measurement_select_id )
measurement_select_label = html.Label( 'Select measurement', htmlFor=measurement_select_id )

cmap = dcc.Graph()
plot_label = html.Div()
year_text = html.Div()



# ---------  slider for selecting the year  ----------

year_slider_id = 'year_slider'
year_ticks_spacing = 10

year_slider_label = html.Label('Select year', htmlFor=year_slider_id)
year_slider = dcc.Slider(
            min = year_min,
            max = year_max,
            marks={ year: str(year) for year in range(year_min, year_max+1, year_ticks_spacing) },
            value=year_max,
            step=1,
            id=year_slider_id
        )



# ----------  plot of temperature  ----------

plot = dcc.Graph()
diff_plot = dcc.Graph()



# ----------  layout dahsboard  ----------

app = Dash()
app.layout = html.Div([
    level_select_label,
    level_select,
    measurement_select_label,
    measurement_select,
    cmap,
    year_slider_label,
    year_slider,
    year_text,
    plot,
    diff_plot
])



# --------  callbacks  ---------

@callback(
    Output( cmap, 'figure' ),
    Output( year_text, 'children' ),
    Input( level_select, 'value' ),
    Input( measurement_select, 'value' ),
    Input( year_slider, 'value' ),
    running=[
            (Output(level_select_id, "disabled"), True, False),
            (Output(measurement_select_id, "disabled"), True, False),
            (Output(year_slider_id, "disabled"), True, False)
        ]
)
def set_level_of_detail(value_lod: str, value_meas: str, year: int) -> str:
    # get GeoJSON-specific values
    if value_lod == level_options[2]:
        use_geojson = county_json
        key_id = 'properties.NAME_3'
        area_col_name = cols.county
        full_list = sdf_county_names
    elif value_lod == level_options[1]:
        use_geojson = state_json
        key_id = 'properties.name'
        area_col_name = cols.state
        full_list = sdf_state_names
    else:
        use_geojson = national_json
        key_id = 'properties.NAME_LOCAL'
        area_col_name = cols.nation
        full_list = sdf_national_names

    # get name of column with measurements of interest
    if value_meas == measurement_options[0]:
        value_col_name = cols.temperature
        min_col = 7
        max_col = 12
    else:
        value_col_name = cols.rainfall
        min_col = 400
        max_col = 2000

    # aggregate and plot data
    df = aggregate.aggregate_spatially( sdf, year, value_col_name, area_col_name, full_list )
    fig = make_choropleth_figure( df, use_geojson, key_id, min_col, max_col )

    text = f'Selected year: {year}'
    return fig, text



@callback(
    Output(plot, 'figure'),
    Output(diff_plot, 'figure'),
    Input( cmap, 'clickData' ),
    Input( level_select, 'value' ),
    Input( measurement_select, 'value' )
)
def show_additional_data(clickData, value_lod, value_meas):
    caused_by = ctx.triggered_id
    if caused_by == level_select_id or caused_by == measurement_select_id:
        # clear additional output
        return {}, {}

    if clickData is not None:
        area_name = clickData['points'][0]['location']

        # translate values of drop downs to data-frame column-names
        dic = {level_options[0]: cols.nation, level_options[1]: cols.state, level_options[2]: cols.county}
        area_col_name = dic[value_lod]

        # set values specific to the measurements
        if value_meas == measurement_options[0]:
            value_col_name = cols.temperature
            title = f'Annual mean temperatures in {area_name}'
            y_label = 'Temperature [°C]'
        else:
            value_col_name = cols.rainfall
            title = f'Annual total rainfall in {area_name}'
            y_label = 'Rainfall [l/m²]'

        # aggregate and plot data
        pdf_annual = aggregate.aggregate_temporarly( sdf, value_col_name, area_name, area_col_name, sdf_years )
        pdf_running = aggregate.aggregate_running_mean( sdf, value_col_name, area_name, area_col_name, sdf_years )

        temp_plot = make_temperature_plot( pdf_annual, pdf_running, title, y_label )
        if value_meas == measurement_options[0]:
            diff_plot = make_temp_diff_plot( area_col_name, area_name )
        else:
            diff_plot = {}

        output = temp_plot, diff_plot
    else:
        output = {}, {}

    return output



if __name__ == '__main__':
    app.run(debug=False, port=8054)

In [11]:
spark.stop()