<a href="https://colab.research.google.com/github/aekanun2020/2022-PUB_COC-Data-Science-for-Tourism/blob/main/E_Null_Detection_and_Clean_using_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
   .appName("Neural Network Model") \
   .config("spark.executor.memory", "3gb") \
   .getOrCreate()
   
sc = spark.sparkContext

sc

In [None]:
from pyspark.sql import functions as sparkf

raw_df = spark.createDataFrame(
 [
  ('Store 1',1,448),
  ('Store 1',2,None),
  ('Store 1',3,499),
  ('Store 1',44,432),
  (None,None,None),
  ('Store 2',1,355),
  ('Store 2',1,355),
  ('Store 2',None,345),
  ('Store 2',3,387),
  ('Store 2',4,312)
],
 ['Store','WeekInMonth','Revenue']
)


In [None]:
raw_df.printSchema()

root
 |-- Store: string (nullable = true)
 |-- WeekInMonth: long (nullable = true)
 |-- Revenue: long (nullable = true)



In [None]:
raw_df.count()

10

In [None]:
raw_df.show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|   null|
|Store 1|          3|    499|
|Store 1|         44|    432|
|   null|       null|   null|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|       null|    345|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+



In [None]:
raw_df.describe().show()

+-------+-------+------------------+------------------+
|summary|  Store|       WeekInMonth|           Revenue|
+-------+-------+------------------+------------------+
|  count|      9|                 8|                 8|
|   mean|   null|             7.375|           391.625|
| stddev|   null|14.841423689890979|62.741960213469355|
|    min|Store 1|                 1|               312|
|    max|Store 2|                44|               499|
+-------+-------+------------------+------------------+



In [None]:
raw_df.select(
  [sparkf.count(sparkf.when(sparkf.isnull(c), c)).alias(c) for c in raw_df.columns]
).show()


+-----+-----------+-------+
|Store|WeekInMonth|Revenue|
+-----+-----------+-------+
|    1|          2|      2|
+-----+-----------+-------+



In [None]:
#ระบุว่า Nulls อยู่ที่ row ไหน - เพื่อดูว่าเป็น MCAR / MAR / MNAR / Structural Missing

from functools import reduce

raw_df.filter(reduce(lambda a1, a2: a1 | a2,\
                     (sparkf.col(c).isNull() \
                      for c in raw_df.columns))).show()


+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          2|   null|
|   null|       null|   null|
|Store 2|       null|    345|
+-------+-----------+-------+



In [None]:
#ซ่อน/ลบ เฉพาะ row ที่มีค่า nulls ในทุก attributes แล้วซ่อนค่า nulls จากนั้นส่งให้กับตัวแปรใหม่ [III]

noNullRow_Df = raw_df.dropna('all')

noNullRow_Df.show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|   null|
|Store 1|          3|    499|
|Store 1|         44|    432|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|       null|    345|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+



In [None]:
#ซ่อน/ลบ row ที่มีค่า nulls ใน attributes ใด attribute หนึ่ง (หรือทุก attributes) จาก subset [IV]


raw_df.dropna(how='any', \
              subset=['Store','WeekInMonth']).show()


+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|   null|
|Store 1|          3|    499|
|Store 1|         44|    432|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+



In [None]:
#ซ่อน/ลบ row ที่มีค่า nulls ใน attributes ใด attribute หนึ่ง (หรือทุก attributes) แล้วส่งให้กับตัวแปรใหม่ [V]

raw_df.dropna(how='any').show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          3|    499|
|Store 1|         44|    432|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+



In [None]:
#แทนที่ค่า nulls ด้วยเลข 9999 เฉพาะใน “Revenue” [VI]

raw_df.fillna(9999,['Revenue']).show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|   9999|
|Store 1|          3|    499|
|Store 1|         44|    432|
|   null|       null|   9999|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|       null|    345|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+



In [None]:
#แทนที่ค่า nulls ด้วยเลข 0 ในทุก Attributes ซึ่งเป็น numeric และด้วยค่าอื่นๆ สำหรับ string [VII]
raw_df.fillna({'Store':'Assume_Store 1',\
              'WeekInMonth':'2','Revenue':3}).show()

+--------------+-----------+-------+
|         Store|WeekInMonth|Revenue|
+--------------+-----------+-------+
|       Store 1|          1|    448|
|       Store 1|          2|      3|
|       Store 1|          3|    499|
|       Store 1|         44|    432|
|Assume_Store 1|          2|      3|
|       Store 2|          1|    355|
|       Store 2|          1|    355|
|       Store 2|          2|    345|
|       Store 2|          3|    387|
|       Store 2|          4|    312|
+--------------+-----------+-------+



In [None]:
# [VIII]
from pyspark.sql import functions as sparkf

In [None]:
raw_df.show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|   null|
|Store 1|          3|    499|
|Store 1|         44|    432|
|   null|       null|   null|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|       null|    345|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+



In [None]:
raw_df.printSchema()

root
 |-- Store: string (nullable = true)
 |-- WeekInMonth: long (nullable = true)
 |-- Revenue: long (nullable = true)



In [None]:
magic_percentile = sparkf.expr('percentile_approx(Revenue, 0.5)')

In [None]:
raw_df.na.drop().orderBy('Revenue').show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 2|          4|    312|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|          3|    387|
|Store 1|         44|    432|
|Store 1|          1|    448|
|Store 1|          3|    499|
+-------+-----------+-------+



In [None]:
raw_df.na.drop().groupBy('Store').agg(magic_percentile.alias('med_val'))\
.filter(sparkf.col('Store') == 'Store 1').show()

+-------+-------+
|  Store|med_val|
+-------+-------+
|Store 1|    448|
+-------+-------+



In [None]:
imputed_value = raw_df.na.drop().groupBy('Store').agg(magic_percentile.alias('med_val'))\
.filter(sparkf.col('Store') == 'Store 1').collect()[0][1]

In [None]:
imputed_value

448

In [None]:
raw_df.withColumn('noNull_Revenue'\
                  ,sparkf.when((sparkf.col('Store')=='Store 1')\
                &(sparkf.col('Revenue').isNull()),imputed_value).otherwise(sparkf.col('Revenue'))).show()

+-------+-----------+-------+--------------+
|  Store|WeekInMonth|Revenue|noNull_Revenue|
+-------+-----------+-------+--------------+
|Store 1|          1|    448|           448|
|Store 1|          2|   null|           448|
|Store 1|          3|    499|           499|
|Store 1|         44|    432|           432|
|   null|       null|   null|          null|
|Store 2|          1|    355|           355|
|Store 2|          1|    355|           355|
|Store 2|       null|    345|           345|
|Store 2|          3|    387|           387|
|Store 2|          4|    312|           312|
+-------+-----------+-------+--------------+



In [None]:
raw_df.show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|   null|
|Store 1|          3|    499|
|Store 1|         44|    432|
|   null|       null|   null|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|       null|    345|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+

