In [1]:
!pip install pyspark



In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName('covid-19').master('local[*]').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/16 09:16:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/11/16 09:16:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [6]:
spark

In [35]:
df = spark.read.csv('covid-19.csv', header = True, inferSchema = True)

In [36]:
df.printSchema()

root
 |-- txn_date: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age_number: double (nullable = true)
 |-- age_range: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- job: string (nullable = true)
 |-- risk: string (nullable = true)
 |-- patient_type: string (nullable = true)
 |-- province: string (nullable = true)
 |-- time: string (nullable = true)



In [37]:
print((df.count(), len(df.columns)))

(5947, 11)


In [39]:
df.show(20, False)

+----------+-------+------+----------+---------+-----------+----+---------------------------------------------+-------------------+---------+-------------------+
|txn_date  |user_id|gender|age_number|age_range|nationality|job |risk                                         |patient_type       |province |time               |
+----------+-------+------+----------+---------+-----------+----+---------------------------------------------+-------------------+---------+-------------------+
|2021-11-16|2010632|หญิง  |18.0      |10-19 ปี |null       |null|สัมผัสใกล้ชิดกับผู้ป่วยยืนยันรายก่อนหน้านี้  |2.สัมผัสผู้ติดเชื้อ|เชียงใหม่|2021-11-16 07:31:38|
|2021-11-16|2010633|ชาย   |37.0      |30-39 ปี |null       |null|ไปสถานที่ชุมชน เช่น ตลาดนัด สถานที่ท่องเที่ยว|10.อื่นๆ           |เชียงใหม่|2021-11-16 07:31:38|
|2021-11-16|2010634|หญิง  |50.0      |50-59 ปี |null       |null|อยู่ระหว่างการสอบสวน                         |10.อื่นๆ           |เชียงใหม่|2021-11-16 07:31:38|
|2021-11-16|2010635|ชาย   |6

In [44]:
df = df.withColumnRenamed("txn_date","timestamp")

In [45]:
df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age_number: double (nullable = true)
 |-- age_range: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- job: string (nullable = true)
 |-- risk: string (nullable = true)
 |-- patient_type: string (nullable = true)
 |-- province: string (nullable = true)
 |-- time: string (nullable = true)



In [61]:
df_clean = df.select('time','user_id','gender', 'age_number', 'nationality', 'job', 'risk', 'province')

In [62]:
df_clean.where(df_clean.nationality == 'Thai').count()

5132

In [64]:
df_clean.select('nationality').count()

5947

In [66]:
df_clean = df_clean.withColumnRenamed("time","timestamp")

In [68]:
df_clean.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age_number: double (nullable = true)
 |-- nationality: string (nullable = true)
 |-- job: string (nullable = true)
 |-- risk: string (nullable = true)
 |-- province: string (nullable = true)



In [71]:
#เปลี่ยน timestamp type จาก string เป็น timestamp
from pyspark.sql import functions as f

df_clean = df_clean.withColumn('timestamp', f.to_timestamp(df_clean.timestamp,'yyyy-MM-dd HH:mm:ss'))

In [74]:
df_clean.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age_number: double (nullable = true)
 |-- nationality: string (nullable = true)
 |-- job: string (nullable = true)
 |-- risk: string (nullable = true)
 |-- province: string (nullable = true)



In [76]:
#เช็คค่า missing value
from pyspark.sql.functions import col, sum

df_clean2 = df_clean.select([ sum(col(colname).isNull().cast("int")).alias(colname) for colname in df_clean.columns ])
df_clean2.show()

[Stage 38:>                                                         (0 + 1) / 1]

+---------+-------+------+----------+-----------+----+----+--------+
|timestamp|user_id|gender|age_number|nationality| job|risk|province|
+---------+-------+------+----------+-----------+----+----+--------+
|        0|      0|     0|       555|        579|5837|   0|       0|
+---------+-------+------+----------+-----------+----+----+--------+



                                                                                

In [77]:
df_clean.where(df_clean.age_number.isNull()).show()


+-------------------+-------+-------+----------+-----------+----+--------------------+---------+
|          timestamp|user_id| gender|age_number|nationality| job|                risk| province|
+-------------------+-------+-------+----------+-----------+----+--------------------+---------+
|2021-11-16 07:31:38|2014927|   หญิง|      null|       Thai|null|               อื่นๆ|  ปัตตานี|
|2021-11-16 07:31:38|2010865|    ชาย|      null|       null|null|               อื่นๆ|เชียงใหม่|
|2021-11-16 07:31:38|2010903|    ชาย|      null|       null|null|สัมผัสใกล้ชิดกับผ...|เชียงใหม่|
|2021-11-16 07:31:38|2015234|    ชาย|      null|       Thai|null|               อื่นๆ|     สตูล|
|2021-11-16 07:31:38|2015253|   หญิง|      null|       Thai|null|สัมผัสใกล้ชิดกับผ...|     สตูล|
|2021-11-16 07:31:38|2011211|ไม่ระบุ|      null|       Thai|null|          ระบุไม่ได้| พิษณุโลก|
|2021-11-16 07:31:38|2011212|ไม่ระบุ|      null|       Thai|null|          ระบุไม่ได้| พิษณุโลก|
|2021-11-16 07:31:38|2011213|ไ

In [84]:
#กำหนดให้ค่า missing value ใน age_number เป็น 0
from pyspark.sql.functions import when

df_clean_age_number = df_clean.withColumn('age_number_update', when(df_clean['age_number'].isNull(), '0').otherwise(df_clean['age_number']))

In [90]:
df_clean_age_number.show()

+-------------------+-------+------+----------+-----------+----+--------------------+---------+-----------------+
|          timestamp|user_id|gender|age_number|nationality| job|                risk| province|age_number_update|
+-------------------+-------+------+----------+-----------+----+--------------------+---------+-----------------+
|2021-11-16 07:31:38|2010632|  หญิง|      18.0|       null|null|สัมผัสใกล้ชิดกับผ...|เชียงใหม่|             18.0|
|2021-11-16 07:31:38|2010633|   ชาย|      37.0|       null|null|ไปสถานที่ชุมชน เช...|เชียงใหม่|             37.0|
|2021-11-16 07:31:38|2010634|  หญิง|      50.0|       null|null|อยู่ระหว่างการสอบสวน|เชียงใหม่|             50.0|
|2021-11-16 07:31:38|2010635|   ชาย|      63.0|       null|null|อยู่ระหว่างการสอบสวน|เชียงใหม่|             63.0|
|2021-11-16 07:31:38|2010636|   ชาย|      19.0|       null|null|อยู่ระหว่างการสอบสวน|เชียงใหม่|             19.0|
|2021-11-16 07:31:38|2010637|   ชาย|      50.0|       null|null|               อื่นๆ|เชี

In [92]:
df_clean = df_clean_age_number.drop('age_number').withColumnRenamed('age_number_update', 'age_number')

In [93]:
df_clean.show()

+-------------------+-------+------+-----------+----+--------------------+---------+----------+
|          timestamp|user_id|gender|nationality| job|                risk| province|age_number|
+-------------------+-------+------+-----------+----+--------------------+---------+----------+
|2021-11-16 07:31:38|2010632|  หญิง|       null|null|สัมผัสใกล้ชิดกับผ...|เชียงใหม่|      18.0|
|2021-11-16 07:31:38|2010633|   ชาย|       null|null|ไปสถานที่ชุมชน เช...|เชียงใหม่|      37.0|
|2021-11-16 07:31:38|2010634|  หญิง|       null|null|อยู่ระหว่างการสอบสวน|เชียงใหม่|      50.0|
|2021-11-16 07:31:38|2010635|   ชาย|       null|null|อยู่ระหว่างการสอบสวน|เชียงใหม่|      63.0|
|2021-11-16 07:31:38|2010636|   ชาย|       null|null|อยู่ระหว่างการสอบสวน|เชียงใหม่|      19.0|
|2021-11-16 07:31:38|2010637|   ชาย|       null|null|               อื่นๆ|เชียงใหม่|      50.0|
|2021-11-16 07:31:38|2010638|  หญิง|       null|null|สัมผัสใกล้ชิดกับผ...|เชียงใหม่|      18.0|
|2021-11-16 07:31:38|2010639|  หญิง|    

In [94]:
#กำหนดให้ค่า missing value ใน nationality เป็น Unspecified 

df_clean_nationality = df_clean.withColumn('nationality_update', when(df_clean['nationality'].isNull(), 'Unspecified').otherwise(df_clean['nationality']))

In [95]:
df_clean = df_clean_nationality.drop('nationality').withColumnRenamed('nationality_update', 'nationality')

In [97]:
#กำหนดให้ค่า missing value ใน job เป็น Unspecified

df_clean_job = df_clean.withColumn('job_update', when(df_clean['job'].isNull(), 'Unspecified').otherwise(df_clean['job']))

In [98]:
df_clean = df_clean_job.drop('job').withColumnRenamed('job_update', 'job')

In [102]:
df_clean = df_clean.select([ sum(col(colname).isNull().cast("int")).alias(colname) for colname in df_clean.columns ])
df_clean.show()

+---------+-------+------+----+--------+----------+-----------+---+
|timestamp|user_id|gender|risk|province|age_number|nationality|job|
+---------+-------+------+----+--------+----------+-----------+---+
|        0|      0|     0|   0|       0|         0|          0|  0|
+---------+-------+------+----+--------+----------+-----------+---+



In [101]:
# เซฟเป็น 1 ไฟล์ (ใช้ single worker)
df_clean.coalesce(1).write.csv('Cleaned_covid-19.csv', header = True)

                                                                                

In [104]:
df_clean.write.csv('Cleaned_covid-19-2.csv', header = True)

                                                                                