### Calculating batch statistics


In this notebook we calculate statistics from the data received from sensors. These statistics will later be used to build predictive models.

In [1]:
# First, we need to import some libraries and initialize Spark:

from os.path import join

import findspark
findspark.init('/home/alex/spark-2.4.3-bin-hadoop2.7')

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('calc-stats').getOrCreate()

from pyspark.sql.functions import concat, col, lit

Let's import the data from all the files in the _processed_ directory. Since all the files share the same format, we ask Spark to infer the data types and to treat the first line of each file as a header:

In [2]:
data = spark.read.csv(join('..','data','processed'), inferSchema=True, header=True)
data.printSchema()

root
 |-- Equipment: string (nullable = true)
 |-- Sensor: string (nullable = true)
 |-- DateTime: timestamp (nullable = true)
 |-- Batch: string (nullable = true)
 |-- Status: integer (nullable = true)
 |-- Value: double (nullable = true)
 |-- StringValue: string (nullable = true)
 |-- Delta: integer (nullable = true)



Sometimes it's convenient to use SQL syntax. For example, with SQL we can calculate statistics such as the minimum, maximum, average and standard deviation of the signal for every combination of batch, equipment, status and sensor:

In [3]:
# Register the DataFrame as a temporary view so it can be queried with SQL
data.createOrReplaceTempView("data")

In [4]:
spark.sql('''
    SELECT Batch, Equipment, Status, Sensor,
           min(Value) min_value, max(Value) max_value,
           avg(Value) avg_value, std(Value) std_value FROM data
    WHERE Sensor <> 'BATCH' AND Sensor <> 'STATUS'
    GROUP BY Batch, Equipment, Status, Sensor
    ORDER BY Batch, Equipment, Status, Sensor
''').show()

+------+---------+------+------+-------------------+------------------+------------------+------------------+
| Batch|Equipment|Status|Sensor|          min_value|         max_value|         avg_value|         std_value|
+------+---------+------+------+-------------------+------------------+------------------+------------------+
|B10001|     EQ01|     0| POWER| 0.8166018352206666|234.52915671993958|118.44601031257261| 67.97115667939978|
|B10001|     EQ01|     0|  TEMP|0.13629972554137326| 7.204119708946687| 3.695704560097333|1.8595244777380584|
|B10001|     EQ02|     0| POWER| 0.5490865080468417| 196.7394043401826| 94.57460402295054|58.499911873289314|
|B10001|     EQ02|     0|  TEMP| 1.3258145430902477|231.63468740045474|117.05393107947519|  67.7050913745392|
|B10001|     EQ02|     1| POWER| 0.6868167974953926|145.83674862401526| 71.56339880605131|41.278839209071364|
|B10001|     EQ02|     1|  TEMP|0.40395794914323024| 65.23380018446827| 33.79116209768979| 18.59236338906267|
|B10001|  
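
To make the four aggregates concrete, here is a minimal plain-Python sketch that computes them for one group of readings. The sample values are made up for illustration and are not taken from the dataset. Note that `statistics.stdev` is the sample standard deviation (it divides by n - 1), which corresponds to Spark SQL's `std`, an alias of `stddev_samp`.

```python
import statistics

# Hypothetical readings for a single (Batch, Equipment, Status, Sensor) group.
values = [2.0, 4.0, 4.0, 6.0]

group_stats = {
    "min": min(values),
    "max": max(values),
    "avg": statistics.mean(values),
    # Sample standard deviation (divides by n - 1), like Spark SQL's std().
    "std": statistics.stdev(values),
}
print(group_stats)
```

If the population standard deviation is wanted instead, `statistics.pstdev` in Python and `stddev_pop` in Spark SQL are the counterparts.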

We will store all the calculations in a single long format: batch, statistic description (feature), value. That way, newly calculated statistics can simply be appended as extra rows.

In [5]:
spark.sql('''
    SELECT Batch, concat(Equipment,'_ST',Status,'_',Sensor,'_min') as Feature,
           min(Value) Value FROM data
    WHERE Sensor <> 'BATCH' AND Sensor <> 'STATUS'
    GROUP BY Batch, Equipment, Status, Sensor
    ORDER BY Batch, Feature
''').show()

+------+------------------+-------------------+
| Batch|           Feature|              Value|
+------+------------------+-------------------+
|B10001|EQ01_ST0_POWER_min| 0.8166018352206666|
|B10001| EQ01_ST0_TEMP_min|0.13629972554137326|
|B10001|EQ02_ST0_POWER_min| 0.5490865080468417|
|B10001| EQ02_ST0_TEMP_min| 1.3258145430902477|
|B10001|EQ02_ST1_POWER_min| 0.6868167974953926|
|B10001| EQ02_ST1_TEMP_min|0.40395794914323024|
|B10001|EQ02_ST2_POWER_min|  0.560560721831199|
|B10001| EQ02_ST2_TEMP_min| 0.8535380540261546|
|B10001|EQ03_ST0_POWER_min| 0.2060085592214812|
|B10001| EQ03_ST0_TEMP_min| 0.7627633485921386|
|B10002|EQ01_ST0_POWER_min|0.12791689439997234|
|B10002| EQ01_ST0_TEMP_min|0.15875894773020682|
|B10002|EQ02_ST0_POWER_min|0.01986560635990497|
|B10002| EQ02_ST0_TEMP_min| 2.6715467536769917|
|B10002|EQ02_ST1_POWER_min| 0.5512370532104673|
|B10002| EQ02_ST1_TEMP_min| 0.7480056729028342|
|B10002|EQ02_ST2_POWER_min| 1.1799732516937331|
|B10002| EQ02_ST2_TEMP_min| 0.3487601421

In [6]:
# calculate min, max, avg and std for every batch, equipment, status, sensor

tables = []

for stat in ['min', 'max', 'avg', 'std']:
    tables.append(spark.sql(
    '''
        SELECT Batch, concat(Equipment,'_ST',Status,'_',Sensor,'_{}') as Feature,
               {}(Value) Value FROM data
        WHERE Sensor <> 'BATCH' AND Sensor <> 'STATUS'
        GROUP BY Batch, Equipment, Status, Sensor
        ORDER BY Batch, Feature
    '''.format(stat,stat)))
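
The (Batch, Feature, Value) layout produced by these queries can be illustrated without Spark. The sketch below is only an illustration: `feature_name` is a hypothetical helper that mirrors the `concat(...)` expression in the SQL, and the aggregated rows are made-up, rounded values.

```python
def feature_name(equipment, status, sensor, stat):
    """Build a feature label such as 'EQ01_ST0_POWER_min'."""
    return "{}_ST{}_{}_{}".format(equipment, status, sensor, stat)


# Hypothetical aggregated rows: (Batch, Equipment, Status, Sensor, min, max).
aggregated = [
    ("B10001", "EQ01", 0, "POWER", 0.8, 234.5),
    ("B10001", "EQ01", 0, "TEMP", 0.1, 7.2),
]

# Long format: one (Batch, Feature, Value) row per statistic.
long_rows = []
for batch, equipment, status, sensor, lo, hi in aggregated:
    long_rows.append((batch, feature_name(equipment, status, sensor, "min"), lo))
    long_rows.append((batch, feature_name(equipment, status, sensor, "max"), hi))

for row in long_rows:
    print(row)
```

Because every statistic lands in the same three columns, adding a new one only means appending more rows; no existing column has to change.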

It is also possible to aggregate without SQL, using the DataFrame API. We will calculate a weighted average that way:

In [7]:
weighted_avg = (
    data.filter("Sensor <> 'BATCH' and Sensor <> 'STATUS'")
        # Weight each reading by its Delta
        .withColumn('WeightedValue', data.Value * data.Delta)
        .groupBy(['Batch', 'Equipment', 'Status', 'Sensor'])
        .agg({'Delta': 'sum', 'WeightedValue': 'sum'})
        # Build the feature label, e.g. EQ01_ST0_TEMP_weighted_avg
        .withColumn('Feature', concat(col('Equipment'),
                                      lit('_ST'), col('Status'),
                                      lit('_'), col('Sensor'),
                                      lit('_weighted_avg')))
        # Weighted average = sum(Value * Delta) / sum(Delta)
        .withColumn('Value', col('sum(WeightedValue)') / col('sum(Delta)'))
        .select(['Batch', 'Feature', 'Value'])
)

In [8]:
weighted_avg.show()

+------+--------------------+------------------+
| Batch|             Feature|             Value|
+------+--------------------+------------------+
|B10054|EQ01_ST0_TEMP_wei...| 34.99804995898578|
|B10343|EQ02_ST1_TEMP_wei...| 73.91405720706193|
|B10351|EQ03_ST0_POWER_we...| 31.97015157990611|
|B10245|EQ03_ST0_TEMP_wei...| 43.37907305008837|
|B10221|EQ02_ST1_TEMP_wei...| 41.29693787053319|
|B10463|EQ03_ST0_POWER_we...|44.597730362606804|
|B10450|EQ02_ST0_TEMP_wei...| 105.4038662090142|
|B10119|EQ01_ST0_POWER_we...| 18.58378329510701|
|B10372|EQ02_ST1_TEMP_wei...|124.37761547134666|
|B10272|EQ02_ST2_TEMP_wei...| 44.61103544930486|
|B10178|EQ01_ST0_POWER_we...| 9.578686596026547|
|B10150|EQ03_ST0_TEMP_wei...|17.705416541740906|
|B10430|EQ02_ST0_POWER_we...|127.91370735573052|
|B10396|EQ03_ST0_POWER_we...| 5.485610202610873|
|B10403|EQ03_ST0_POWER_we...|30.579866432372835|
|B10306|EQ03_ST0_POWER_we...| 24.88337710662018|
|B10086|EQ02_ST0_POWER_we...|142.56863785675816|
|B10091|EQ02_ST0_TEM
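
The weighted average computed above is sum(Value × Delta) / sum(Delta) within each group. Here is a minimal plain-Python sketch of that formula with made-up readings. Treating `Delta` as a weight follows the cell above; what `Delta` physically measures (for example, how long a reading was held) is an assumption here. The `weighted_average` helper is hypothetical.

```python
def weighted_average(readings):
    """Return sum(value * delta) / sum(delta) for (value, delta) pairs."""
    total_weight = sum(delta for _, delta in readings)
    if total_weight == 0:
        return None  # the weighted average is undefined when all weights are zero
    return sum(value * delta for value, delta in readings) / total_weight


# Hypothetical (Value, Delta) pairs for one group.
readings = [(10.0, 1), (20.0, 3)]

print(weighted_average(readings))                     # weighted by Delta
print(sum(v for v, _ in readings) / len(readings))    # plain average, for comparison
```

The second reading carries three times the weight of the first, so the weighted result is pulled towards it, whereas the plain average treats both readings equally.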

In [9]:
# Stack all statistics into one long table (union matches columns by position)
features = tables[0].union(tables[1]).union(tables[2]).union(tables[3]).union(weighted_avg)

Next, the feature data is pivoted so that each batch becomes one row and each feature one column. After that, we convert the result to a pandas DataFrame.

In [10]:
features_pd = features.groupBy('Batch').pivot('Feature').avg('Value').toPandas()
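
As an illustration of what the pivot does, the following plain-Python sketch turns made-up long-format rows into a wide table. Since each (Batch, Feature) pair should occur only once, the `avg` in the cell above simply passes the single value through. In this sketch a combination that never occurs becomes `None`; in the Spark and pandas result it would be expected to show up as a missing value.

```python
from collections import defaultdict

# Hypothetical long-format rows: (Batch, Feature, Value).
long_rows = [
    ("B10001", "EQ01_ST0_POWER_min", 0.8),
    ("B10001", "EQ01_ST0_POWER_max", 234.5),
    ("B10002", "EQ01_ST0_POWER_min", 0.1),
]

# Pivot: one dict per batch, one key per feature.
wide = defaultdict(dict)
for batch, feature, value in long_rows:
    wide[batch][feature] = value

# Every batch gets every column; missing combinations become None.
columns = sorted({feature for _, feature, _ in long_rows})
table = [
    [batch] + [wide[batch].get(column) for column in columns]
    for batch in sorted(wide)
]

print(["Batch"] + columns)
for row in table:
    print(row)
```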

Finally, we save it as a CSV file:

In [11]:
features_pd.to_csv(join('..','data','features.csv'), index=False)