<a href="https://colab.research.google.com/github/Utlak88/NarrativeWave-App/blob/main/NarrativeWave_App.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Narrative Wave App**

This notebook documents the steps to load and process CSV data in preparation for plotting in a Django application. The data will ultimately be stored in Parquet format.

PySpark and pandas will be needed to analyze and manipulate the provided dataset.

PySpark requires several installation steps. First, install a Java runtime (OpenJDK 8) and Apache Spark prebuilt for Hadoop.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

Next, install findspark, which locates Spark on the system and makes PySpark importable as a regular library.

In [None]:
!pip install -q findspark

Set environment paths to allow PySpark to run.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

Locate Spark within the system.

In [None]:
import findspark
findspark.init()

Next import PySpark and pandas.

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *
import pandas

Initiate a Spark session.

In [None]:
spark = SparkSession.builder.getOrCreate()

The data is initially provided in CSV format. The files can be read into a single Spark DataFrame with the following line, which requires that all CSV files be saved in the same directory.

In [None]:
csv_data = spark.read.option("header", True).csv('/content/csv_files/*.csv')

Converting the dataset to a pandas DataFrame makes it easy to inspect. The preview shows that the dataset is structured in long format.

The initial data structure consists of three columns labeled 'timestamp', 'tag', and 'value'.

In [None]:
csv_df = csv_data.toPandas()
csv_df.head()

Each tag is provided in the format '/narrativewave/WTG13/Amb_WindSpeed_Avg', where 'WTG13' is the asset and 'Amb_WindSpeed_Avg' is the column. These elements need to be extracted from the tag, which is accomplished by the two functions below.

In [None]:
def asset_extract(tag):
    return tag.split('/')[2]

def column_extract(tag):
    return tag.split('/')[3]
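
To see what the two helpers return, the sketch below applies the same split to the sample tag in plain Python. The `parse_tag` helper and its length check are illustrative assumptions, not part of the original pipeline.

```python
def parse_tag(tag):
    """Split a tag such as '/narrativewave/WTG13/Amb_WindSpeed_Avg' into (asset, column)."""
    parts = tag.split('/')
    # The leading '/' yields an empty first element, so a well-formed tag has four parts:
    # ['', 'narrativewave', 'WTG13', 'Amb_WindSpeed_Avg']
    if len(parts) != 4:
        raise ValueError(f"unexpected tag format: {tag!r}")
    return parts[2], parts[3]


asset, column = parse_tag('/narrativewave/WTG13/Amb_WindSpeed_Avg')
print(asset, column)
```

Indexes 2 and 3 are correct only because the tag starts with a slash. A tag without the leading slash would shift every element by one position, which is why the sketch checks the number of parts.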

Spark requires that the two functions above be wrapped as user-defined functions (UDFs) before they can be applied to DataFrame columns.

In [None]:
data_asset_extract_udf = f.udf(asset_extract, StringType())
data_column_extract_udf = f.udf(column_extract, StringType())

Now assets and columns can be extracted.

In [None]:
data_asset_extracted = csv_data.withColumn('asset', data_asset_extract_udf('tag'))
data_asset_column_extracted = data_asset_extracted.withColumn('column', data_column_extract_udf('tag'))

With assets and columns extracted, the tag column is no longer needed.

In [None]:
data_only_extracted = data_asset_column_extracted.drop('tag')

Visualize the processed dataset as a dataframe.

In [None]:
data_only_extracted_df = data_only_extracted.toPandas()
data_only_extracted_df.head()

Rows whose 'column' entry is 'TimeStamp' hold non-numeric entries in the 'value' column. As the goal is to plot data values, these rows need to be removed.

This loses no information, as timestamps are also included in the 'timestamp' column.

In [None]:
data_no_time_stamp_column = data_only_extracted.filter(f.col('column') != 'TimeStamp')

By default, all dataset entries are read as strings, which need to be converted to their intended types.

Entries in the 'value' column, for instance, need to be converted to a numeric type. It will be helpful to first view a section from 'value'.

In [None]:
data_no_time_stamp_column_df = data_no_time_stamp_column.toPandas()
data_no_time_stamp_column_df['value'][:20]

This sample indicates that both integers and floats exist as values.

As an additional validation, a regex search can confirm that no alphabetic characters exist in the 'value' column. Note that this search excludes uppercase 'E' so that numeric values reported in scientific notation are not flagged.

In [None]:
data_no_time_stamp_column_df['value'].str.contains(r'[a-zA-DF-Z]').unique()
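
As a rough illustration of what this character class flags, the sketch below runs the same pattern over a few hypothetical strings using the standard `re` module. The sample values are assumptions chosen for the example, not entries from the dataset.

```python
import re

# Same character class as the check above: any ASCII letter except uppercase 'E'.
pattern = re.compile(r'[a-zA-DF-Z]')

# Hypothetical values: integer, decimal, both spellings of scientific notation, and text.
samples = ['12', '3.75', '1.2E-05', '1.2e-05', 'NaN']
flagged = {s: bool(pattern.search(s)) for s in samples}
print(flagged)
```

Because `a-z` covers lowercase 'e', a value written as '1.2e-05' would be flagged even though it is numeric. Only the uppercase exponent marker passes the check.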

The regex search indicates that all values are numeric. These values can therefore be converted to float type, which accommodates all number formats in the 'value' column.

In [None]:
data_values_float_type = data_no_time_stamp_column.withColumn('value', data_no_time_stamp_column.value.cast(FloatType()))
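
The sketch below is a plain-Python analogy for this conversion, not Spark code: it parses hypothetical integer, decimal, and scientific-notation strings into floats. It also round-trips a value through 32-bit storage, since Spark's `FloatType` is single precision; `DoubleType` would be the 64-bit alternative if that precision matters. The `to_float32` helper is a hypothetical name introduced for illustration.

```python
import struct

# Hypothetical integer, decimal, and scientific-notation strings.
samples = ['12', '3.75', '1.2E-05']
parsed = [float(s) for s in samples]
print(parsed)


def to_float32(x):
    """Round-trip a Python float through 32-bit (single-precision) storage."""
    return struct.unpack('f', struct.pack('f', x))[0]


# 3.75 is exactly representable in 32 bits; 0.1 is not, so it changes slightly.
print(to_float32(3.75) == 3.75, to_float32(0.1) == 0.1)
```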

Similarly, timestamp strings need to be converted to timestamp format.

In [None]:
data_timestamp_type = data_values_float_type.withColumn('timestamp', f.to_timestamp('timestamp'))

Month and year can now be extracted from the timestamps.

In [None]:
data_month_year_columns = data_timestamp_type.withColumn('month', f.month('timestamp')).withColumn('year', f.year('timestamp'))
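
For intuition, the same parse-then-extract idea can be sketched with the standard library. The timestamp string below is a hypothetical example in ISO-like form; the dataset's actual format may differ.

```python
from datetime import datetime

# Hypothetical timestamp string; the real data may use another layout.
raw = '2019-03-05 12:10:00'
ts = datetime.fromisoformat(raw)

# Month and year are plain integers, so they work well as partition keys later on.
month, year = ts.month, ts.year
print(month, year)
```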

It is useful at this point to visualize the updated dataset.

In [None]:
data_month_year_columns_df = data_month_year_columns.toPandas()
data_month_year_columns_df.head()

Converted data types can also be confirmed.

In [None]:
data_month_year_columns_df.dtypes

Data types have been defined as intended.

The dataset now needs to be converted to wide format in preparation for storing in parquet format.

In [None]:
data_values_wide_format = data_month_year_columns.groupby('asset', 'timestamp', 'month', 'year').pivot('column').agg(f.sum('value'))
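
The long-to-wide reshaping can be pictured with a small plain-Python sketch. The rows below are hypothetical, and 'Amb_Temp_Avg' is an invented column name used only to provide a second signal. It is not the Spark implementation, just the same grouping logic on a toy scale.

```python
from collections import defaultdict

# Hypothetical long-format rows: (asset, timestamp, column, value)
long_rows = [
    ('WTG13', '2019-03-05 12:10:00', 'Amb_WindSpeed_Avg', 7.5),
    ('WTG13', '2019-03-05 12:10:00', 'Amb_Temp_Avg', 11.0),
    ('WTG13', '2019-03-05 12:20:00', 'Amb_WindSpeed_Avg', 8.0),
]

# One output row per (asset, timestamp); each distinct 'column' value becomes a field.
wide = defaultdict(dict)
for asset, timestamp, column, value in long_rows:
    key = (asset, timestamp)
    # Summing mirrors agg(f.sum('value')): repeated readings for a key and column are added.
    wide[key][column] = wide[key].get(column, 0.0) + value

columns = sorted({column for _, _, column, _ in long_rows})
for key, values in wide.items():
    # A column with no reading for this key stays missing (None), like a null after a pivot.
    print(key, [values.get(c) for c in columns])
```

The second timestamp has no temperature reading, so that field is empty in the wide row. This is how missing values can appear after pivoting even when the long-format data has no nulls.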

Visualize the wide-format dataset.

In [None]:
data_values_wide_format_df = data_values_wide_format.toPandas()
data_values_wide_format_df.head()

Some column values may be missing. Let's verify whether that is the case and, if so, where the missing values are located.

In [None]:
pandas.set_option('display.max_rows', None)
data_values_wide_format_df.isnull().sum().sort_values(ascending=False)

The above table indicates that missing values exist in 14 columns, all but one of which are prediction categories. These could be addressed by removing them from the dataset or by interpolating where a gap is bracketed by values at nearby timestamps.

The current effort leaves the missing values unaltered, as the end goal is to demonstrate that a REST API service can be consumed; the task does not request that the data be plotted. The missing data has no impact as long as the dataset is not graphically visualized.
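
Should interpolation be wanted in a later iteration, a minimal sketch for gaps bracketed by known values might look like the following. It is plain Python, assumes evenly spaced timestamps, and uses a hypothetical function name, `interpolate_gaps`; it is not applied anywhere in this notebook.

```python
def interpolate_gaps(values):
    """Linearly fill runs of None that have a known value on both sides."""
    filled = list(values)
    known = [i for i, v in enumerate(filled) if v is not None]
    for left, right in zip(known, known[1:]):
        gap = right - left
        for i in range(left + 1, right):
            # Blend the two bracketing readings, assuming evenly spaced timestamps.
            weight = (i - left) / gap
            filled[i] = filled[left] + weight * (filled[right] - filled[left])
    return filled


series = [None, 2.0, None, 4.0, None, None, None, 8.0, None]
print(interpolate_gaps(series))
```

Leading and trailing gaps are left as `None` because they are not bracketed on both sides.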

The dataset can now be stored in a directory in parquet format partitioned by asset, year, and month.

In [None]:
data_values_wide_format.write.partitionBy('asset', 'year', 'month').parquet('/content/parquet_output/')
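
Partitioned Parquet output is laid out as nested directories of the form `column=value`, one level per partition column. The sketch below only builds the expected directory name for one combination of values; `partition_dir` is a hypothetical helper and the asset, year, and month values are assumptions.

```python
from pathlib import PurePosixPath


def partition_dir(root, asset, year, month):
    """Build the directory that rows with these partition values would be written to."""
    return PurePosixPath(root) / f"asset={asset}" / f"year={year}" / f"month={month}"


print(partition_dir('/content/parquet_output', 'WTG13', 2019, 3))
```

A reader that filters on asset, year, or month can then skip whole directories instead of scanning every file.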