In [None]:
!apt-get update                                                                          # อัพเดท Package ทั้งหมดใน VM ตัวนี้
!apt-get install openjdk-8-jdk-headless -qq > /dev/null                                  # ติดตั้ง Java Development Kit (จำเป็นสำหรับการติดตั้ง Spark)
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz # ติดตั้ง Spark 3.1.2
!tar xzvf spark-3.1.2-bin-hadoop2.7.tgz                                                  # Unzip ไฟล์ Spark 3.1.2
!pip install -q findspark==1.3.0                                                         # ติดตั้ง Package Python สำหรับเชื่อมต่อกับ Spark 

In [None]:
# Set enviroment variable ให้ Python รู้จัก Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
# ติดตั้ง PySpark ลงใน Python
!pip install pyspark==3.1.2

In [None]:
# Server ของ Google Colab มีกี่ Core
!cat /proc/cpuinfo

In [None]:
ใช้ `local[*]` เพื่อเปิดการใช้งานการประมวลผลแบบ multi-core. Spark จะใช้ CPU ทุก core ที่อนุญาตให้ใช้งานในเครื่อง.

In [None]:
# สร้าง Spark Session เพิ้อใช้งาน Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# ดูเวอร์ชั่น Python
import sys
sys.version_info

In [None]:
# ดูเวอร์ชั่น Spark
spark.version

In [1]:
## Load  Data

In [None]:
# Download Data File
!wget -O data.zip https://file.designil.com/zdOfUE+
!unzip data.zip

In [None]:
dt = spark.read.csv('/content/ws2_data.csv', header = True, inferSchema = True, )

In [2]:
# Step 2) Data Profiling

In [None]:
# ดูว่ามีคอลัมน์อะไรบ้าง
dt

In [None]:
# ดูข้อมูล
dt.show()

In [None]:
# ดูข้อมูล 100 แถวแรก
dt.show(100)

In [None]:
# อีกคำสั่งในการดูข้อมูลแต่ละคอลัมน์ (Schema)
dt.printSchema()

In [None]:
# นับจำนวนแถวและ column
print((dt.count(), len(dt.columns)))

In [None]:
# สรุปข้อมูลสถิติ
dt.describe().show()

In [None]:
# อีกคำสั่งในการสรุปข้อมูลสถิติ
# ReferenceL: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.summary.html

dt.summary().show()

In [None]:
# สรุปข้อมูลสถิติเฉพาะ column ที่ระบุ
dt.select("price").describe().show()

In [None]:
# Solution - คอลัมน์ไหน? user_id
dt.summary("count").show()

In [None]:
# Solution - แสดงแถวข้อมูลที่มี Missing Value
dt.where( dt.user_id.isNull() ).show()

In [6]:
# Step 3) EDA - Exploratory Data Analysis

In [None]:
## Non-Graphical EDA

เราสามารถใช้คำสั่ง Spark ในการค้นหาข้อมูลที่ต้องการได้

In [None]:
# ข้อมูลที่เป็นตัวเลข
dt.where(dt.price >= 1).show()

In [None]:
# ข้อมูลที่เป็นตัวหนังสือ
dt.where(dt.country == 'Canada').show()

In [None]:
### Exercise 2: 
1. การซื้อทั้งหมดที่เกิดขึ้นในเดือนเมษายน มีกี่แถว
2. การซื้อทั้งหมดที่เกิดขึ้นในเดือนสิงหาคม มีกี่แถว

In [None]:
# Solution - ไม่มีสักแถว (ข้อมูลมีแค่เดือน 5 - 7)
dt.where( dt.timestamp.startswith("2021-04") ).count()
dt.where( dt.timestamp.startswith("2021-08") ).count()

In [None]:
dt.filter(dt.timestamp >= "2021-04-01").filter(dt.timestamp <= "2021-04-30").count()

In [7]:
## Graphical EDA



In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
# แปลง Spark Dataframe เป็น Pandas Dataframe - ใช้เวลาประมาณ 6 วินาที
dt_pd = dt.toPandas()

In [None]:
# ดูตัวอย่างข้อมูล
dt_pd.head()

In [None]:
# Boxplot - แสดงการกระจายตัวของข้อมูลตัวเลข
sns.boxplot(x = dt_pd['book_id'])

In [None]:
# Histogram - แสดงการกระจายตัวของข้อมูลตัวเลข
# bins = จำนวน bar ที่ต้องการแสดง
sns.histplot(dt_pd['price'], bins=10)

In [None]:
# Solution - ใช้ Scatterplot
sns.scatterplot(x=dt_pd.book_id, y=dt_pd.price)

In [None]:
####  สร้าง interactive chart

In [None]:
# Plotly - interactive chart
import plotly.express as px
fig = px.scatter(dt_pd, 'book_id', 'price')
fig.show()

In [None]:
# Step 4) Data Cleansing with Spark

In [None]:
### แปลง Data Type

ปัญหาที่เจอบ่อยที่สุดแบบหนึ่งในข้อมูล คือ **Data Type ไม่ตรงกับที่เราต้องการ**

In [None]:
dt.show(truncate=False)

In [None]:
# Show top 5 rows
dt.show(5)

In [None]:
# Show Schema
dt.printSchema()

In [None]:
dt.select("timestamp").show(10)

In [None]:
# แปลง string เป็น datetime
from pyspark.sql import functions as f

dt_clean = dt.withColumn("timestamp",
                        f.to_timestamp(dt.timestamp, 'yyyy-MM-dd HH:mm:ss')
                        )
dt_clean.show()

In [None]:
dt_clean.printSchema()

In [None]:
# นับยอด transaction ช่วงครึ่งเดือนแรก ของเดือนมิถุนายน
dt_clean.where( (f.dayofmonth(dt_clean.timestamp) <= 15) & ( f.month(dt_clean.timestamp) == 6 ) ).count()

In [None]:
## Anomalies Check

ใช้ Spark ตามหาสิ่งที่ผิดปกติในข้อมูล

In [None]:
###  Syntactical Anomalies

In [None]:
คุณเห็นชื่อประเทศที่สะกดผิดในคอลัมน์ Country มั้ย

In [None]:
# ใน Data set ชุดนี้ มีข้อมูลจากกี่ประเทศ
dt_clean.select("Country").distinct().count()

In [None]:
# แทนที่เลข 0 ด้วยจำนวนประเทศ เพื่อดูรายชื่อประเทศทั้งหมด
# sort = ทำให้ข้อมูลเรียงตามตัวอักษร อ่านง่ายขึ้น
# show() ถ้าไม่ใส่ตัวเลขจะขึ้นมาแค่ 20 อัน และใส่ False เพื่อให้แสดงข้อมูลในคอลัมน์แบบเต็ม ๆ (หากไม่ใส่ คอลัมน์ที่ยาวจะถูกตัดตัวหนังสือ)
dt_clean.select("Country").distinct().sort("Country").show( 58, False )

In [None]:
# สำคัญ: เปลี่ยน aaa เป็นชื่อประเทศที่คุณคิดว่าผิด
dt_clean.where(dt_clean['Country'] == 'Japane').show()

In [None]:
# สำคัญ: เปลี่ยน aaa เป็นชื่อประเทศที่คุณคิดว่าผิด และ bbb เป็นชื่อประเทศที่ถูกต้อง
from pyspark.sql.functions import when

dt_clean_country = dt_clean.withColumn("CountryUpdate", when(dt_clean['Country'] == 'Japane', 'Japan').otherwise(dt_clean['Country']))

In [None]:
# ตรวจสอบข้อมูลที่แก้ไขแล้ว
dt_clean_country.select("CountryUpdate").distinct().sort("CountryUpdate").show(58, False)

In [None]:
# ดูหน้าตาข้อมูลตอนนี้
dt_clean_country.show()

In [None]:
# เอาคอลัมน์ CountryUpdate ไปแทนที่คอลัมน์ Country
dt_clean = dt_clean_country.drop("Country").withColumnRenamed('CountryUpdate', 'Country')

In [None]:
# ดูหน้าตาข้อมูล
dt_clean.show()

In [None]:
### ความผิดปกติ 2) Semantic Anomalies

**Integrity constraints**: ค่าอยู่นอกเหนือขอบเขตของค่าที่รับได้ เช่น
- user_id: ค่าจะต้องเป็นตัวเลขหรือตัวหนังสือ 8 ตัวอักษร

In [None]:
# ดูว่าข้อมูล user_id ตอนนี้หน้าตาเป็นอย่างไร
dt_clean.select("user_id").show(10)

In [None]:
# นับจำนวน user_id ทั้งหมด
dt_clean.select("user_id").count()

In [None]:
ดูว่า user_id ตรงตามรูปแบบที่เราต้องการ มีกี่แถว

คำใบ้: ใช้เว็บไซต์ https://www.regex101.com เพื่อสร้าง Regular Expression ตามรูปแบบที่เราต้องการ

In [None]:
# แทนที่ ... ด้วย Regular Expression ของรูปแบบ user_id ที่เราต้องการ
dt_clean.where(dt_clean["user_id"].rlike("^[a-z0-9]{8}$")).count()

In [None]:
# แทนที่ ... ด้วย Regular Expression ของรูปแบบ user_id ที่เราต้องการ
dt_correct_userid = dt_clean.filter(dt_clean["user_id"].rlike("^[a-z0-9]{8}$"))
dt_incorrect_userid = dt_clean.subtract(dt_correct_userid)

dt_incorrect_userid.show(10)

In [None]:
# ลองเอาโค้ดำสหรับแทนค่า จาก Exercise 4 มาใช้
dt_clean_userid = dt_clean.withColumn("user_id_update", when(dt_clean['user_id'] == 'ca86d17200', 'ca86d172').otherwise(dt_clean['user_id']))

In [None]:
# ตรวจสอบผลลัพธ์
dt_correct_userid = dt_clean_userid.filter(dt_clean_userid["user_id"].rlike("^[a-z0-9]{8}$"))
dt_incorrect_userid = dt_clean_userid.subtract(dt_correct_userid)

dt_incorrect_userid.show(10)

In [None]:
# เอาคอลัมน์ user_id_update ไปแทนที่ user_id
dt_clean = dt_clean_userid.drop("user_id").withColumnRenamed('user_id_update', 'user_id')

In [None]:
###  Missing values


In [None]:
# วิธีที่ 1 ในการเช็ค Missing Value
# ใช้เทคนิค List Comparehension - ทบทวนได้ใน Pre-course Python https://school.datath.com/courses/road-to-data-engineer-2/contents/6129b780564a8
# เช่น [ print(i) for i in [1,2,3] ]

# col = คำสั่ง Spark ในการเลือกคอลัมน์
# sum = คำสั่ง Spark ในการคิดผลรวม
from pyspark.sql.functions import col, sum

dt_nulllist = dt_clean.select([ sum(col(colname).isNull().cast("int")).alias(colname) for colname in dt_clean.columns ])
dt_nulllist.show()

In [None]:
# วิธีที่ 2 ในการเช็ค Missing Value - จาก Exercise 1 โค้ดสะอาดกว่ามาก แต่ต้องมาบวกลบเอง
dt_clean.summary("count").show()

In [None]:
# ดูช้อมูลว่าแถวไหนมี user_id เป็นค่าว่างเปล่า (โค้ดเดียวกับ Exercise 1)

dt_clean.where( dt_clean.user_id.isNull() ).show()

In [None]:
#### Exercise 6:
ทางทีม Data Analyst แจ้งว่าอยากให้เราแทน user_id ที่เป็น NULL ด้วย 00000000 ไปเลย

In [None]:
# Solution

dt_clean_userid = dt_clean.withColumn("user_id_update", when(dt_clean['user_id'].isNull(), '00000000').otherwise(dt_clean['user_id']))

In [None]:
dt_clean = dt_clean_userid.drop("user_id").withColumnRenamed('user_id_update', 'user_id')

In [None]:
# เช็คว่า user ID ที่เป็น NULL หายไปแล้วจริงไหม
dt_clean.where( dt_clean.user_id.isNull() ).show()

In [None]:
### ความผิดปกติ 4) Outliers:

ข้อมูลที่สูงหรือต่ำผิดปกติจากข้อมูลส่วนใหญ่

มาลองใช้ Boxplot ในการหาค่า Outlier ของราคาหนังสือ

In [None]:
dt_clean_pd = dt_clean.toPandas()

In [None]:
sns.boxplot(x = dt_clean_pd['price'])

In [None]:
เห็นได้ว่ามีหนังสือบางเล่มที่ราคาสูงกว่าปกติไปเยอะมาก ลองมาดูกันว่าหนังสือ book_id อะไรบ้าง ที่ราคาเกิน $80

In [None]:
dt_clean.where( dt_clean.price > 80 ).select("book_id").distinct().show()

In [None]:
เราสามารถนำ Book_ID อันนี้ไปเช็คต่อกับแหล่งข้อมูลได้ ว่าเป็นหนังสืออะไร และราคาเกิน $80 ผิดปกติมั้ย

ถ้าเอาไปเช็คในข้อมูลจาก Workshop 1 ก็จะพบว่า Book_ID = 635 คือ หนังสือชื่อ "The Power Broker"
https://www.audible.com/pd/The-Power-Broker-Audiobook/B0051JH67K?ipRedirectOverride=true&overrideBaseCountry=true&pf_rd_p=2756bc30-e1e4-4174-bb22-bce00b971761&pf_rd_r=MF7KC1JQF3A6GK2ET8XM

In [None]:
# แปลงข้อมูลจาก Spark DataFrame ให้เป็น TempView ก่อน
dt.createOrReplaceTempView("data")
dt_sql = spark.sql("SELECT * FROM data")
dt_sql.show()

In [None]:
# ลองแปลงโค้ดสำหรับลิสต์ชื่อประเทศเป็น SQL
dt_sql_country = spark.sql("""
SELECT distinct country
FROM data
ORDER BY country
""")
dt_sql_country.show(100)

In [None]:
# ลองแปลงโค้ดสำหรับแทนที่ชื่อประเทศ เป็น SQL
dt_sql_result = spark.sql("""
SELECT timestamp, user_id, book_id,
  CASE WHEN country = 'Japane' THEN 'Japan' ELSE country END AS country,
price
FROM data
""")
dt_sql_result.show()

In [None]:
# เช็คผลลัพธ์ว่าถูกจริงมั้ย
dt_sql_result.select("country").distinct().sort("country").show(58, False)

In [None]:
# เช็คว่ามีข้อมูล user_id ที่ไม่เป็นตัวหนังสือหรือตัวเลข 8 หลักมั้ย
dt_check_userid = spark.sql("""
SELECT *
FROM data
WHERE user_id NOT RLIKE '^[a-z0-9]{8}$'
""").show()

In [None]:
# Solution: แทนค่า (ใช้ CASE WHEN ตามปกติ)
dt_sql_uid_result = spark.sql("""
SELECT timestamp,
CASE WHEN user_id = 'ca86d17200' THEN 'ca86d172' ELSE user_id END AS user_id,
book_id, country, price
FROM data
""")
dt_sql_uid_result.show()

In [None]:
# เช็คว่าข้อมูลหายไปหรือยัง
dt_sql_uid_result.where( dt_sql_uid_result.user_id == 'ca86d17200' ).show()

In [None]:
# Step 5) Save data เป็น CSV

โดยปกติแล้ว Spark จะทำการ Save ออกมาเป็นหลายไฟล์ เพราะใช้หลายเครื่องในการประมวลผล

In [None]:
# เซฟเป็น partitioned files (ใช้ multiple workers)
dt_clean.write.csv('Cleaned_data.csv', header = True)

In [None]:
# เซฟเป็น 1 ไฟล์ (ใช้ single worker)
dt_clean.coalesce(1).write.csv('Cleaned_Data_Single.csv', header = True)