# Analyse the Data Quality from historical data

To use the notebook, the following Python modules must be installed from the command line.

`pip install pandas`

`pip install tqdm`

In [None]:
import findspark
findspark.init() #necessary to find the local spark
import pandas as pd
import datetime as dt
from tqdm import tqdm_notebook as tqdm
from pyspark.sql import Row, SparkSession
from pyspark import SparkContext
from pyspark.sql.types import IntegerType, StringType, DoubleType, BooleanType, TimestampType, StructField, StructType
from pyspark.sql.functions import col, sum, udf

In [None]:
sc = SparkContext()
spark = SparkSession.builder\
        .getOrCreate()

**Note:** Before the files can be read, they must be transferred to the HDFS. (For procedure, see documentation / installation instructions)

In [None]:
# df = spark.read.json('hdfs://192.168.0.10:9000/user/hadoop/IotTestbed_Dateien/festo_sensor_data_couchdb.json') #mock data
df = spark.read.json('hdfs://192.168.0.10:9000/user/hadoop/IotTestbed_Dateien/festo_sensor_data.json')

In [None]:
df.createOrReplaceTempView("iotData")

In [None]:
df.printSchema()

In [None]:
def get_date_utc(date):
    return dt.datetime.utcfromtimestamp(int(date)/1000).strftime('%Y-%m-%d %H:%M:%S.%f')

## general analysis

#### number of records in the data set

In [None]:
print("no. rows: {0}, no. columns: {1}, no. columns nested: {2}".format(
    df.count(), len(df.columns), len(df.select("doc.*").columns)))

#### get time period
from `1512746433260 - 2017-12-08 | 15:20:33`

to `1520587748768 - 2018-03-09 | 09:29:08`

In [None]:
spark.sql("SELECT doc.timestamp FROM iotData ORDER BY timestamp").show(50, False)
spark.sql("SELECT doc.timestamp FROM iotData ORDER BY timestamp DESC").show(2)

In [None]:
# get_date_utc(1520587748768)

#### modules in the data set

In [None]:
# number of modules
spark.sql("SELECT DISTINCT doc.modul FROM iotData WHERE doc.modul IS NOT NULL ORDER BY modul").count()

In [None]:
# list all modules
spark.sql("SELECT DISTINCT doc.modul FROM iotData ORDER BY modul").show(1000, False)

#### sensors in the data set

In [None]:
# list all sensor names
spark.sql("SELECT DISTINCT doc.sensor FROM iotData ORDER BY sensor").show(1000, False)

In [None]:
# list all sensors per module
spark.sql("SELECT DISTINCT doc.modul, doc.sensor FROM iotData ORDER BY sensor").show(1000, False)

In [None]:
sensor_names = spark.sql("SELECT DISTINCT doc.sensor FROM iotData ORDER BY sensor")
sensors_num = sensor_names.select("sensor").rdd.flatMap(lambda x: x).collect()

## check valid timestamp

In [None]:
def check_valid_month(date):
    valid_months = list(range(1,13,1))
    month = dt.datetime.utcfromtimestamp(int(date)/1000).strftime('%m')
    if int(month) not in valid_months:
        return False
    return True

In [None]:
# valid years are from 2010 to 2025
def check_valid_year(date):
    valid_years = list(range(2010,2026,1))
    year = dt.datetime.utcfromtimestamp(int(date)/1000).strftime('%Y')
    if int(year) not in valid_years:
        return False
    return True

In [None]:
def check_valid_time(date):
    valid_hours = list(range(0,25,1))
    valid_minutes = list(range(0,61,1))
    valid_seconds = list(range(0,61,1))
    hour = dt.datetime.utcfromtimestamp(int(date)/1000).strftime('%H')
    minute = dt.datetime.utcfromtimestamp(int(date)/1000).strftime('%M')
    second = dt.datetime.utcfromtimestamp(int(date)/1000).strftime('%S')
    if int(hour) not in valid_hours:
        return False
    if int(minute) not in valid_minutes:
        return False
    if int(second) not in valid_seconds:
        return False
    return True

In [None]:
timestamps = spark.sql("SELECT doc.timestamp FROM iotData ORDER BY timestamp")
timestamps_list = timestamps.select("timestamp").rdd.flatMap(lambda x: x).collect()

In [None]:
unvalid_timestamps = []
for timestamp in timestamps_list:
    if timestamp != None and timestamp != '0':
        if not check_valid_month(timestamp):
            unvalid_timestamps.append(timestamp)
        if not check_valid_year(timestamp):
            unvalid_timestamps.append(timestamp)
        if not check_valid_time(timestamp):
            unvalid_timestamps.append(timestamp)

In [None]:
unvalid_timestamps

## check duplicate sensor names

In [None]:
df_sensors = spark.sql("SELECT DISTINCT doc.sensor FROM iotData WHERE doc.sensor IS NOT NULL ORDER BY sensor").collect()
sensors = []
for i in range(len(df_sensors)):
    s = df_sensors[i].asDict()['sensor'].split('.')[::-1][0]
    sensors.append(df_sensors[i].asDict()['sensor'].split('.')[::-1])
    
s_name_1 = [i for i in sensors if len(i) == 1]
s_name_2 = [i for i in sensors if len(i) != 1]
duplicate_sensor_names = []

for i in range(len(s_name_2)):
    for j in range(len(s_name_1)):
        if s_name_2[i][0] == s_name_1[j][0]:
            duplicate_sensor_names.append(['.'.join(s_name_2[i][::-1]),''.join(s_name_1[j])])

In [None]:
len(duplicate_sensor_names)

#### analysis for every duplicated sensor name

In [None]:
for i in range(len(duplicate_sensor_names)):
    dp_df = spark.sql("SELECT DISTINCT doc.sensor, doc.modul FROM iotData\
                           WHERE doc.sensor='{}'\
                           OR doc.sensor='{}' ORDER BY modul"\
                          .format(duplicate_sensor_names[i][0], duplicate_sensor_names[i][1])).collect()
    
    for d in dp_df:    
        df_timestamp = spark.sql("SELECT MIN(doc.timestamp) AS Min, MAX(doc.timestamp) AS Max, doc.modul, doc.sensor\
            FROM iotData GROUP BY doc.modul, doc.sensor\
            HAVING modul = '{}' AND sensor = '{}'"\
                                .format(d.asDict()['modul'], d.asDict()['sensor']))
        lambda_get_date_utc = udf(lambda x: get_date_utc(x), returnType=StringType())

        df_timestamp = df_timestamp.withColumn('Min_utc', lambda_get_date_utc(df_timestamp.Min))
        df_timestamp = df_timestamp.withColumn('Max_utc', lambda_get_date_utc(df_timestamp.Max))
        df_timestamp.select("Min_utc", "Max_utc", "modul", "sensor").show(2, False)

#### analysis for only one sensor name

In [None]:
a = 'PressPneu'
b = 'App.xHL_BG1'
df_timestamp = spark.sql("SELECT MIN(doc.timestamp) AS Min, MAX(doc.timestamp) AS Max, doc.modul, doc.sensor\
            FROM iotData GROUP BY doc.modul, doc.sensor\
            HAVING modul = '{}' AND sensor = '{}'"\
                                .format(a, b))
lambda_get_date_utc = udf(lambda x: get_date_utc(x), returnType=StringType())

df_timestamp = df_timestamp.withColumn('Min_utc', lambda_get_date_utc(df_timestamp.Min))
df_timestamp = df_timestamp.withColumn('Max_utc', lambda_get_date_utc(df_timestamp.Max))
df_timestamp.select("Min_utc", "Max_utc", "modul", "sensor").show(2, False)

b = 'xHL_BG1'
df_timestamp = spark.sql("SELECT MIN(doc.timestamp) AS Min, MAX(doc.timestamp) AS Max, doc.modul, doc.sensor\
            FROM iotData GROUP BY doc.modul, doc.sensor\
            HAVING modul = '{}' AND sensor = '{}'"\
                                .format(a, b))
lambda_get_date_utc = udf(lambda x: get_date_utc(x), returnType=StringType())

df_timestamp = df_timestamp.withColumn('Min_utc', lambda_get_date_utc(df_timestamp.Min))
df_timestamp = df_timestamp.withColumn('Max_utc', lambda_get_date_utc(df_timestamp.Max))
df_timestamp.select("Min_utc", "Max_utc", "modul", "sensor").show(2, False)

## statistical analysis for sensors with numerical values

#### create a stats dataframe for sensor with number values

In [None]:
moduls = spark.sql("SELECT DISTINCT doc.modul, doc.sensor \
FROM iotData ORDER BY modul").collect()

schemaStats = StructType([
  StructField('count',IntegerType(), True),
  StructField('mean',DoubleType(), True),
  StructField('stddev',DoubleType(), True),
  StructField('min',DoubleType(), True),
  StructField('max',DoubleType(), True),
  StructField('modul',StringType(), True),
  StructField('sensor',StringType(), True)
])

In [None]:
# only sensors with numbers
exclude = ['','']

df_sensors_num = df.select('doc.sensor','doc.modul')\
    .filter(~col('doc.value')\
    .isin(['true','false']))\
    .createOrReplaceTempView("sensors_num")
df_sens = spark.sql("SELECT DISTINCT sensor FROM sensors_num ORDER BY sensor")
sensors_num = df_sens.select("sensor").rdd.flatMap(lambda x: x).collect()
for e in exclude:
    sensors_num.remove(e)

#### modules with sensors that have numarical values

In [None]:
spark.sql("SELECT DISTINCT modul FROM sensors_num ORDER BY modul").show()

#### statistics per column

In [None]:
df_stats = spark.createDataFrame([(0, 0.0, 0.0, 0.0, 0.0,"","")], schema=schemaStats)

for item in tqdm(moduls):
    if item.asDict()['sensor'] == 'Temp':
#     if item.asDict()['sensor'] in sensors_num:
        col_stats = df.filter(df.doc.modul == item.asDict()['modul'])\
            .filter(df.doc.sensor == item.asDict()['sensor'])\
            .filter(~col('doc.value').isin(['true','false']))\
            .describe('doc.value')

        df_pd = col_stats.toPandas().set_index("summary").transpose()
        df_pd['modul'] = item.asDict()['modul']
        df_pd['sensor'] = item.asDict()['sensor']        

        newRow = spark.createDataFrame(df_pd)
        df_stats = df_stats.union(newRow)

In [None]:
df_stats.write.csv('hdfs://192.168.0.10:9000/user/hadoop/Iot_Analytics_Results/sensors_with_numbers_statistics.csv')

In [None]:
df_stats.show(1000)

### find outliers in the numerical data frame

In [None]:
# calculate the outlier treshold with the 1.5 times interquartile range
def calculate_outlier_thresholds(q25, q75):
    iqr = q75 - q25
    cut_off = iqr * 1.5
    lower, upper = q25 - cut_off, q75 + cut_off
    return cut_off, lower, upper

In [None]:
schemaStats = StructType([
  StructField('count',IntegerType(), True),
  StructField('mean',DoubleType(), True),
  StructField('stddev',DoubleType(), True),
  StructField('min',DoubleType(), True),
  StructField('max',DoubleType(), True),
  StructField('quantil_25',DoubleType(), True),
  StructField('quantil_75',DoubleType(), True),
  StructField('threshold_lower',DoubleType(), True),
  StructField('threshold_upper',DoubleType(), True),
  StructField('outliers_num',IntegerType(), True),
  StructField('data_len',IntegerType(), True),
  StructField('outliers_percentage',DoubleType(), True),
  StructField('modul',StringType(), True),
  StructField('sensor',StringType(), True)
])

#### find outliers for a single column

In [None]:
df_sum = df.filter(df.doc.modul == 'Temp')\
    .filter(df.doc.sensor == 'Temp').select('doc.modul','doc.sensor','doc.value')
df_sum = df_sum.withColumn("value", df_sum["value"].cast(DoubleType()))

q75 = df_sum.approxQuantile("value", [0.75], 0)[0]
q25 = df_sum.approxQuantile("value", [0.25], 0)[0]

cut_off, lower, upper = calculate_outlier_thresholds(q25, q75)
outliers_num = df_sum.filter((df_sum.value < lower) | (df_sum.value > upper)).count()
data_len = df_sum.count()
outliers_percentage = outliers_num/data_len

outliers_num

#### find outliers for all columns

In [None]:
df_stats = spark.createDataFrame([(0, 0.0, 0.0, 0.0, 0.0,0.0,0.0,0.0,0.0,0,0,0.0,"","")], schema=schemaStats)

for item in tqdm(moduls):
    if item.asDict()['sensor'] == 'Temp':
#     if item.asDict()['sensor'] in sensors_num:
        col_stats = df.filter(df.doc.modul == item.asDict()['modul'])\
            .filter(df.doc.sensor == item.asDict()['sensor'])\
            .filter(~col('doc.value').isin(['true','false']))\
            .describe('doc.value')

        df_sum = df.filter(df.doc.modul == item.asDict()['modul'])\
                .filter(df.doc.sensor == item.asDict()['sensor']).select('doc.value')
        df_sum = df_sum.withColumn("value", df_sum["value"].cast(DoubleType()))
        q75 = df_sum.approxQuantile("value", [0.75], 0)[0]
        q25 = df_sum.approxQuantile("value", [0.25], 0)[0]
        cut_off, lower, upper = calculate_outlier_thresholds(q25, q75)
        outliers_num = df_sum.filter((df_sum.value < lower) | (df_sum.value > upper)).count()
        data_len = df_sum.count()
        outliers_percentage = outliers_num/data_len

        df_pd = col_stats.toPandas().set_index("summary").transpose()
        df_pd['quantil_25'] = q25
        df_pd['quantil_75'] = q75
        df_pd['threshold_lower'] = lower
        df_pd['threshold_upper'] = upper
        df_pd['outliers_num'] = outliers_num
        df_pd['data_len'] = data_len
        df_pd['outliers_percentage'] = outliers_percentage
        df_pd['modul'] = item.asDict()['modul']
        df_pd['sensor'] = item.asDict()['sensor']        

        newRow = spark.createDataFrame(df_pd)
        df_stats = df_stats.union(newRow)

## Number of Null-Values per Column

In [None]:
spark.sql("SELECT * FROM df WHERE (iCarrierID IS NOT NULL) AND value IS NOT NULL").show(1000, False)

In [None]:
for col in df.columns:
    sql_string = "SELECT COUNT(*) - COUNT({}) AS NULL_VALUES, COUNT({}) FROM df".format(col, col)
    spark.sql(sql_string).show(20, False)