# **Setting Up Apache Spark on Google Colab**

---



In [None]:
# Environment setup: install a headless Java 8 JDK (JAVA_HOME in the next cell points at it),
# download and unpack the Spark 3.1.2 / Hadoop 2.7 binary distribution (SPARK_HOME in the next
# cell expects it under /content — assumes the notebook's working directory is /content), and
# install a pinned findspark.
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
# NOTE(review): the `v` flag lists every extracted file, which floods the cell output.
!tar xzvf spark-3.1.2-bin-hadoop2.7.tgz
# NOTE(review): findspark is installed but not imported in any of the cells shown here.
!pip install -q findspark==1.3.0

In [None]:
import os

# Point the Python side at the JDK and the Spark distribution unpacked in the setup cell.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop2.7",
})

In [None]:
# Install PySpark pinned to 3.1.2, the same version as the Spark distribution downloaded above.
# NOTE(review): the pip package presumably ships its own Spark as well; with SPARK_HOME set in the
# previous cell, confirm which installation is actually used. `%pip` is generally preferred over
# `!pip` so the install targets the running kernel's environment.
!pip install pyspark==3.1.2

In [None]:
from pyspark.sql import SparkSession
# NOTE(review): later cells rely on col, lit, when, count, concat, isnan, udf, regexp_replace and
# input_file_name from this star import; it likely also shadows builtins such as sum/max/round.
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType

# Local-mode session; "[*]" asks Spark for one worker thread per available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# **Load, Clean & Process Data**

---



## Read Data

In [None]:
# Read every CSV in the input folder with header=False and inferSchema=False: the files use
# different header layouts, so header rows are kept as ordinary (string) rows and separated later.
input_glob = '/content/data_test/*.csv'
data = spark.read.csv(input_glob, header=False, inferSchema=False)

# Tag each row with the file it came from, keeping only the base name (e.g. 'sample_data_004.csv').
# Stripping everything up to the last '/' does not depend on repeating the input path or on the
# exact URI prefix Spark reports.
data = data.withColumn("filename", regexp_replace(input_file_name(), r'^.*/', ''))

# Number of distinct input files that were read.
number_files = data.select('filename').distinct().count()
print("Number of all files : {}".format(number_files))

Number of all files : 4


In [None]:
# First 20 raw rows, untruncated; each file's header row appears inline with its data rows.
data.show(n=20, truncate=False)

+-----+----------------+-----------+-----------+-----------+-------------------+
|_c0  |_c1             |_c2        |_c3        |_c4        |filename           |
+-----+----------------+-----------+-----------+-----------+-------------------+
|ID   |First name      |Last name  |Score Part1|Score Part2|sample_data_004.csv|
|30485|Bokim           |Enjivoko   |42         |21         |sample_data_004.csv|
|42443|Zine            |Honolim    |34         |33         |sample_data_004.csv|
|89923|Boxinz          |Fulamaranko|23         |34         |sample_data_004.csv|
|ID   |First name      |Last name  |Score Part1|Score Part2|sample_data_003.csv|
|08067|Lin             |Enjivoko   |35         |42         |sample_data_003.csv|
|09443|Dome            |Fulkivol   |23         |37         |sample_data_003.csv|
|08963|Valki           |Domtomkiz  |12         |25         |sample_data_003.csv|
|ID   |Name            |Score      |null       |null       |sample_data_002.csv|
|07012|Bim Sarimarai   |38  

## Separate header rows from body (data) rows

In [None]:
# Header rows carry the literal 'ID' in the first column; every other row is body data.
# eqNullSafe treats null as an ordinary value. With plain `!= "ID"` / `== "ID"`, a row whose first
# column is null evaluates to null in BOTH filters and silently vanishes. Here it stays in
# body_data, so the missing-value audit further down can report it.
is_header = data['_c0'].eqNullSafe("ID")
body_data = data.filter(~is_header)
col_data = data.filter(is_header)

In [None]:
# Distinct header rows; `filename` is part of each row, so identical headers from different files
# still count separately.
number_type_col_1 = col_data.distinct().count()
print(f"Number of data that contrain header : {number_type_col_1}")

Number of data that contrain header : 4


In [None]:
# Sanity check: the number of distinct header rows should equal the number of files read.
headers_match_files = number_type_col_1 == number_files
print("PASS :)" if headers_match_files else "TYPE COLUMN NOT MACTH WITH NUMBER FILE!!!")

PASS :)


In [None]:
# Distinct header layouts across files: compare only the header columns, ignoring `filename`.
header_columns = ['_c0', '_c1', '_c2', '_c3', '_c4']
number_type_col_2 = col_data.dropDuplicates(header_columns).count()
print(f"Number of type columns : {number_type_col_2}")

Number of type columns : 3


In [None]:
# One header row per input file, with the file it came from.
col_data.show(n=20, truncate=True)

+---+----------+---------+-----------+-----------+-------------------+
|_c0|       _c1|      _c2|        _c3|        _c4|           filename|
+---+----------+---------+-----------+-----------+-------------------+
| ID|First name|Last name|Score Part1|Score Part2|sample_data_004.csv|
| ID|First name|Last name|Score Part1|Score Part2|sample_data_003.csv|
| ID|      Name|    Score|       null|       null|sample_data_002.csv|
| ID|First name|Last name|      Score|       null|sample_data_001.csv|
+---+----------+---------+-----------+-----------+-------------------+



In [None]:
from functools import reduce

# Header layout -> type label. Each rule recognises one layout by a distinguishing header cell.
type_rules = [
    ("type1", col_data['_c3'] == "Score"),        # ID | First name | Last name | Score
    ("type2", col_data['_c1'] == "Name"),         # ID | Name | Score
    ("type3", col_data['_c4'] == "Score Part2"),  # ID | First name | Last name | Score Part1 | Score Part2
]

# One (filename, type) frame per rule, unioned in rule order.
typed_files = [
    col_data.filter(condition).withColumn('type', lit(label)).select("filename", "type")
    for label, condition in type_rules
]
table_type = reduce(lambda left, right: left.unionByName(right), typed_files)

# Guard: a file matching no rule would be silently dropped by the inner join below, and a file
# matching several rules (or having several header rows) would get its body rows duplicated.
n_header_files = col_data.select('filename').distinct().count()
n_typed_rows = table_type.count()
n_typed_files = table_type.select('filename').distinct().count()
assert n_typed_rows == n_typed_files == n_header_files, (
    "every file must match exactly one header type: "
    f"{n_header_files} files, {n_typed_files} typed files, {n_typed_rows} type rows"
)

table_type.show()  # table mapping each file to its header type

+-------------------+-----+
|           filename| type|
+-------------------+-----+
|sample_data_001.csv|type1|
|sample_data_002.csv|type2|
|sample_data_004.csv|type3|
|sample_data_003.csv|type3|
+-------------------+-----+



## Process body (non-header) rows into a common ID / Name / Score layout

In [None]:
# Attach each body row's header type, then drop the file name, which is no longer needed.
# Joining on the column name yields a single `filename` column rather than two identically
# named ones.
data_added_type = body_data.join(table_type, on="filename", how="inner").drop("filename")

# Split the body rows by header type so each layout can be mapped to (ID, Name, Score).
data_type1 = data_added_type.filter(data_added_type['type'] == "type1")
data_type2 = data_added_type.filter(data_added_type['type'] == "type2")
data_type3 = data_added_type.filter(data_added_type['type'] == "type3")

# type1: ID | First name | Last name | Score  ->  Name = "First Last".
# (Previously built from data_type3's columns by copy-paste; it must use its own frame.)
data_type1 = (
    data_type1
    .withColumn("Name", concat(data_type1["_c1"], lit(' '), data_type1["_c2"]))
    .withColumnRenamed("_c0", "ID")
    .withColumnRenamed("_c3", "Score")
    .select('ID', 'Name', 'Score')
)

# type2: ID | Name | Score  ->  already in the target layout, only renamed.
data_type2 = (
    data_type2
    .withColumnRenamed("_c0", "ID")
    .withColumnRenamed("_c1", "Name")
    .withColumnRenamed("_c2", "Score")
    .select('ID', 'Name', 'Score')
)

# type3: ID | First name | Last name | Score Part1 | Score Part2  ->  Score = Part1 + Part2
# (null if either part is null or not an integer).
data_type3 = (
    data_type3
    .withColumn("Score", data_type3["_c3"].cast(IntegerType()) + data_type3["_c4"].cast(IntegerType()))
    .withColumn("Name", concat(data_type3["_c1"], lit(' '), data_type3["_c2"]))
    .withColumnRenamed("_c0", "ID")
    .select('ID', 'Name', 'Score')
)

# All three frames now share the columns ID, Name, Score; union them by name.
data_set_column = data_type1.unionByName(data_type2).unionByName(data_type3)

In [None]:
def is_missing(c):
    """Return a boolean Column that is true where column `c` holds a missing-looking value.

    A value counts as missing when it is null, NaN, the empty string, or contains the text
    'None' or 'NULL' (substring match).
    """
    value = col(c)
    return (
        value.contains('None')
        | value.contains('NULL')
        | (value == '')
        | value.isNull()
        | isnan(c)
    )


# Per-column count of missing-looking values: when() yields 1 for a missing value and null
# otherwise, and count() counts only non-nulls. `.show()` just prints and returns None, so the
# summary DataFrame is kept in `table_data_missing` and displayed separately.
table_data_missing = data_set_column.select([
    count(when(is_missing(c), lit(1))).alias(c)
    for c in data_set_column.columns
])
table_data_missing.show()

+---+----+-----+
| ID|Name|Score|
+---+----+-----+
|  0|   0|    1|
+---+----+-----+



In [None]:
col_will_see_null = "Score"
# Show the rows whose chosen column looks missing (same test as the per-column audit above).
score_value = col(col_will_see_null)
score_missing = (
    score_value.contains('None')
    | score_value.contains('NULL')
    | (score_value == '')
    | score_value.isNull()
    | isnan(col_will_see_null)
)
data_set_column.filter(score_missing).show()

+-----+----------------+-----+
|   ID|            Name|Score|
+-----+----------------+-----+
|10876|Nomoji Boomhabim| null|
+-----+----------------+-----+



In [None]:
# Drop every row that has a null in any column.
# NOTE(review): only true nulls are removed here; values the audit above also counts as missing
# ('' or text containing 'None' / 'NULL') survive this step.
data_set_column = data_set_column.na.drop()

## Assign student grades using a UDF

In [None]:
# Score is still text for the type1/type2 files (CSV read without schema inference); make it an
# integer column. Under Spark's default (non-ANSI) casting, non-numeric text becomes null.
data_set_column = data_set_column.withColumn("Score", col("Score").cast(IntegerType()))

def define_grade(score):
    """Map a numeric score to a letter grade.

    Bands: >= 90 -> 'A', >= 80 -> 'B', >= 70 -> 'C', >= 60 -> 'D', otherwise 'F'.

    Args:
        score: A number, or None. When used as a Spark UDF, a null cell (for example a Score
            that failed the integer cast) arrives as None.

    Returns:
        The letter grade as a string, or None when `score` is None. A null score thus gives a
        null grade instead of raising TypeError (`None >= 90`) inside the UDF.
    """
    if score is None:
        return None
    for threshold, grade in ((90, "A"), (80, "B"), (70, "C"), (60, "D")):
        if score >= threshold:
            return grade
    return "F"

# Register the grading function as a string-returning Spark UDF; no lambda wrapper is needed.
define_grade_UDF = udf(define_grade, StringType())

# Add the letter grade next to each score and display the result.
data_set_column = data_set_column.withColumn('Grade', define_grade_UDF(col('Score')))
data_set_column.show()

+-----+------------------+-----+-----+
|   ID|              Name|Score|Grade|
+-----+------------------+-----+-----+
|07019|      John Jinmano|   89|    B|
|07023|    Ampino Nattima|   45|    F|
|07540|     Pina Aimomani|   54|    F|
|07012|     Bim Sarimarai|   38|    F|
|07885|    Fern Chimokoli|   86|    B|
|08459|     Non Koomjicus|   78|    C|
|30485|    Bokim Enjivoko|   63|    D|
|42443|      Zine Honolim|   67|    D|
|89923|Boxinz Fulamaranko|   57|    F|
|08067|      Lin Enjivoko|   77|    C|
|09443|     Dome Fulkivol|   60|    D|
|08963|   Valki Domtomkiz|   37|    F|
+-----+------------------+-----+-----+



# **Write Files**

---



In [None]:
# Spark writes a *directory* named Cleaned_data.csv containing part-*.csv files.
# coalesce(1) keeps this small result in a single part file. mode("overwrite") lets the notebook
# be re-run, since the default save mode fails when the path already exists.
data_set_column.coalesce(1).write.mode("overwrite").csv('/content/result/Cleaned_data.csv', header=True)