# Workshop 2: Data Cleansing with Spark

ภาพรวมของคอร์สนี้

![alt text](https://cdn-std.droplr.net/files/acc_513973/z7Gqhs)

Workshop 2 นี้เราจะทำอะไรกันบ้าง

![alt text](https://cdn-std.droplr.net/files/acc_513973/SCN8wh)

## Spark Cheatsheet by DataCamp

แนะนำให้โหลดเก็บไว้ อุ่นใจกว่า <3

![alt text](https://cdn-std.droplr.net/files/acc_513973/PglivG)

**RDD:**
https://www.datacamp.com/community/blog/pyspark-cheat-sheet-python 

**DataFrame:**
https://www.datacamp.com/community/blog/pyspark-sql-cheat-sheet

## ข้อมูลขายของออนไลน์
### Data Dictionary
https://archive.ics.uci.edu/ml/datasets/Online+Retail

This is a transactional data set which contains all the transactions occurring between 01/12/2018 and 09/12/2019 for a UK-based and registered non-store online retail.

The company mainly sells unique all-occasion gifts. Many customers of the company are wholesalers.

- InvoiceNo: Invoice number. Nominal, a 6-digit integral number uniquely assigned to each transaction. If this code starts with letter 'c', it indicates a cancellation.
- StockCode: Product (item) code. Nominal, a 5-digit integral number uniquely assigned to each distinct product.
- Description: Product (item) name. Nominal.
- Quantity: The quantities of each product (item) per transaction. Numeric.
- InvoiceDate: Invice Date and time. Numeric, the day and time when each transaction was generated.
- UnitPrice: Unit price. Numeric, Product price per unit in sterling.
- CustomerID: Customer number. Nominal, a 5-digit integral number uniquely assigned to each customer.
- Country: Country name. Nominal, the name of the country where each customer resides.


## ลง Pyspark และเชื่อมต่อ Google Colab กับ Google Drive

In [None]:
# ลง Spark ใน Google Colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xzvf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark==1.3.0

In [None]:
# Set enviroment variable ให้รู้จัก Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [None]:
# ลง pyspark ผ่านคำสั่ง pip
!pip install pyspark==2.4.5

#### ใช้งาน Spark

ใช้ `local[*]` เพื่อเปิดการใช้งานการประมวลผลแบบ multi-core. Spark จะใช้ CPU ทุก core ที่อนุญาตให้ใช้งานในเครื่อง.

In [None]:
# Server ของ Google Colab มีกี่ Core
!cat /proc/cpuinfo

In [None]:
# สร้าง Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# Get Python version
import sys
sys.version_info

In [None]:
# Get Spark version
spark.version

#### เชื่อมต่อ Google Drive

In [None]:
# เชื่อมต่อ Google colab กับ Google Drive
from google.colab import drive
drive.mount('/content/drive')

## Load data


ใช้คำสั่ง `spark.read.csv` เพื่ออ่านข้อมูลจากไฟล์ CSV

Arguments:

Header = True << บอกให้ Spark รู้ว่าบรรทัดแรกในไฟล์ CSV เป็น Header


Inferschema = True << บอกให้ Spark พยายามเดาว่าแต่ละ column มี type เป็นอะไร ถ้าตั้งเป็น False, ทุก column จะถูกอ่านเป็น string

In [None]:
dt = spark.read.csv('/content/drive/My Drive/Data Science Chill/Online 2020: Road to Data Engineer/Workshop Files/WS2/Data Files/Online Retail WS2.csv', header = True, inferSchema = True, )

### Data Profiling

Data Profiling is a process of analysing summary of the data.

Example: max, min, average, sum, how many missing values etc.

#### Data

> Columns
- InvoiceNo
- StockCode
- Description
- Quantity
- InvoiceDate
- UnitPrice
- CustomerID
- Country

In [None]:
dt

In [None]:
dt.show()

In [None]:
dt.show(100)

In [None]:
# Show Schema
dt.dtypes

In [None]:
# Show Schema (อีกแบบ)
dt.printSchema()

In [None]:
# นับจำนวนแถวและ column
print((dt.count(), len(dt.columns)))

In [None]:
# สรุปข้อมูลสถิติ
dt.describe().show()

In [None]:
# สรุปข้อมูลสถิติ
dt.summary().show()

In [None]:
# สรุปข้อมูลสถิติเฉพาะ column ที่ระบุ
dt.select("Quantity", "UnitPrice").describe().show()

### Exercise: ลองเช็ค Median ของ ค่า Quantity

In [None]:
# Write Answer here

### ดู Summary แล้วเห็นอะไรบ้าง?
- Missing values?
- Mean, Min, Max

## EDA - Exploratory Data Analysis

### Non-Graphical EDA

In [None]:
# Select text-based information
dt.where(dt['Quantity'] < 0).show()

### Exercise: 
1. ลองเลือก Quantity ระหว่าง 50 - 120
2. ลองเลือก UnitPrice ระหว่าง 0.1 - 0.5
3. Quantity ระหว่าง 50 - 120 และ UnitPrice ระหว่าง 0.1 - 0.5

In [None]:
# TODO: 1. Quantity 50 - 120

In [None]:
# TODO: 2. UnitPrice 0.1 - 0.5

In [None]:
# TODO: 3. Quantity 50 - 120 and UnitPrice 0.1 - 0.5

### Graphical EDA


Spark ไม่ได้ถูกพัฒนามาเพื่องาน plot ข้อมูล เพราะฉะนั้นเราจะใช้ package `seaborn` `matplotlib` และ `pandas` ในการ plot ข้อมูลแทน

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
# แปลง Spark Dataframe เป็น Pandas Dataframe
dt_pd = dt.toPandas()

In [None]:
dt_pd.head()

In [None]:
# เลือกข้อมูล 500 แถวแรกเพื่อความรวดเร็วและความเรียบง่ายในการ visualize ข้อมูล
dt_pd_subset = dt_pd[0:500]

In [None]:
# Boxplot
sns.boxplot(dt_pd_subset['UnitPrice'])

In [None]:
# Histogram
sns.distplot(dt_pd_subset['UnitPrice']) 
plt.show()

In [None]:
# Scatterplot
dt_pd_subset.plot.scatter('UnitPrice', 'Quantity')

#### Bonus: สร้าง interactive chart

In [None]:
# Plotly - interactive chart
import plotly.express as px
fig = px.scatter(dt_pd_subset, 'UnitPrice', 'Quantity')
fig.show()

### Type Conversion

แปลง `InvoiceDate` จาก string -> date

In [None]:
# Show top 5 rows
dt.show(5)

In [None]:
# Show Schema
dt.printSchema()

Is the date DD/MM/YYYY or MM/DD/YYYY? Let's find out



In [None]:
# Show unique Invoice Date
dt.select("InvoiceDate").distinct().show()

In [None]:
# แปลง string เป็น date
from pyspark.sql import functions as f

# dt_temp = dt.withColumn('InvoiceDateTime', functions.to_date(
#     functions.unix_timestamp('InvoiceDate', 'dd/MM/yyyy HH:mm').cast('timestamp')
# ))

dt_temp = dt.withColumn('InvoiceDateTime', 
    f.unix_timestamp('InvoiceDate', 'dd/MM/yyyy HH:mm').cast('timestamp')
)
dt_temp.show()

In [None]:
dt_temp.printSchema()

In [None]:
dt_final = dt_temp.drop('InvoiceDate')
dt_final.show()

In [None]:
dt_final.printSchema()

## Data Cleansing with Spark

### Anomalies Check

#### Syntactical Anomalies
**Lexical errors** เช่น พิมพ์ผิด

In [None]:
# Check country distinct values. Find something interesting?
# ลองมาดูชื่อประเทศกัน เจออะไรบ้าง ?
dt_final.select("Country").distinct().show()

In [None]:
dt_final.where(dt_final['Country'] == 'EIREs').show()

In [None]:
# เปลี่ยน EIREs เป็น EIRE
from pyspark.sql.functions import when

dt_temp_eire = dt_final.withColumn("CountryUpdate", when(dt_final['Country'] == 'EIREs', 'EIRE').otherwise(dt_final['Country']))

In [None]:
# Check the result
dt_temp_eire.select("CountryUpdate").distinct().show()

In [None]:
# Create final Dataframe
dt_final_eire = dt_temp_eire.drop("Country").withColumnRenamed('CountryUpdate', 'Country')

In [None]:
dt_final_eire.show()

#### Semantic Anomalies

**Integrity constraints**: ค่าอยู่นอกเหนือขอบเขตของค่าที่รับได้ เช่น
- Stockcode: ค่าจะต้องเป็นตัวเลข 5 ตัว

In [None]:
dt_final_eire.select("Stockcode").show(100)

In [None]:
dt_final_eire.count()

In [None]:
dt_final_eire.filter(dt_final_eire["Stockcode"].rlike("^[0-9]{5}$")).count()

In [None]:
# ลองดูข้อมูลที่ถูกต้อง
dt_final_eire.filter(dt_final_eire["Stockcode"].rlike("^[0-9]{5}$")).show(5)

In [None]:
# ลองดูข้อมูลที่ไม่ถูกต้อง
dt_correct_stockcode = dt_final_eire.filter(dt_final_eire["Stockcode"].rlike("^[0-9]{5}$"))
dt_incorrect_stockcode = dt_final_eire.subtract(dt_correct_stockcode)

dt_incorrect_stockcode.show(10)

> คุณเห็น Pattern ของ Stock Code ที่ไม่ถูกต้องหรือยัง?

In [None]:
# ลบตัวอักษรตัวสุดท้ายออกจาก stock code
from pyspark.sql.functions import regexp_replace

dt_temp_stockcode = dt_final_eire.withColumn("StockcodeUpdate", regexp_replace(dt_final_eire['Stockcode'], r'[A-Z]', ''))

In [None]:
# Check the result
dt_temp_stockcode.show()

In [None]:
# Create final Dataframe
dt_final_stockcode = dt_temp_stockcode.drop("Stockcode").withColumnRenamed('StockcodeUpdate', 'StockCode')

In [None]:
dt_final_stockcode.show(4)

#### Missing values

การเช็คและแก้ไข Missing Values (หากจำเป็น)

In [None]:
# Check จำนวน missing values ในแต่ละ column
from pyspark.sql.functions import col,sum

dt_final_stockcode.select(*[sum(col(c).isNull().cast("int")).alias(c) for c in dt_final_stockcode.columns]).show()

In [None]:
# Check ว่ามีแถวไหนที่ description เป็น null บ้าง

dt_final_stockcode.where( dt_final_stockcode['Description'].isNull() ).show()

In [None]:
# Check ว่ามีแถวไหนที่ customerID เป็น null บ้าง

dt_final_stockcode.where( dt_final_stockcode['customerID'].isNull() ).show()

### Exercise:
ทางทีม Data Analyst แจ้งว่าอยากให้เราแทน Customer ID ที่เป็น NULL ด้วย -1

In [None]:
# Write code here

### Clean ข้อมูลด้วย Spark SQL

![alt text](https://cdn-std.droplr.net/files/acc_513973/881iHw)

เลือกเฉพาะข้อมูลที่ `unitPrice` กับ `Quantity` ถูกต้อง (มากกว่า 0)

In [None]:
dt_final_stockcode.createOrReplaceTempView("sales")
dt_sql = spark.sql("SELECT * FROM sales")
dt_sql.show()

In [None]:
dt_sql_count = spark.sql("SELECT count(*) as cnt_row FROM sales")
dt_sql_count.show()

In [None]:
dt_sql_count = spark.sql("SELECT count(*) as cnt_row, country FROM sales GROUP BY Country ORDER BY cnt_row DESC")
dt_sql_count.show()

In [None]:
dt_sql_valid_price = spark.sql("SELECT count(*) as cnt_row FROM sales WHERE UnitPrice > 0 AND Quantity > 0")
dt_sql_valid_price.show()

In [None]:
dt_sql_valid_price = spark.sql("SELECT * FROM sales WHERE UnitPrice > 0 AND Quantity > 0")
dt_sql_valid_price.show()

### Exercise: 
1. ลองเลือก Country USA ที่มี InvoiceDateTime ตังแต่วันที่ 2010-12-01 เป็นต้นไป และ UnitPrice เกิน 3.5 
2. ลองเลือก Country France ที่มี InvoiceDateTime ตังแต่วันที่ 2010-12-05 เป็นต้นไป และ UnitPrice เกิน 5.5 และ Description มีคำว่า Box

In [None]:
# TODO: Country USA ที่มี InvoiceDateTime ตั้งแต่วันที่ 2010-12-01 เป็นต้นไป และ UnitPrice เกิน 3.5
dt_sql_usa = spark.sql("""
SELECT * FROM sales
  WHERE InvoiceDateTime >= '2010-12-01'
  AND UnitPrice > 3.5
  AND Country='USA'
""").show()

In [None]:
# TODO: Country France ที่มี InvoiceDateTime ตังแต่วันที่ 2010-12-05 เป็นต้นไป และ UnitPrice เกิน 5.5 และ Description มีคำว่า Box
dt_sql_france = spark.sql("""
SELECT * FROM sales
  WHERE UnitPrice > 5.5
  AND InvoiceDateTime >= '2010-12-05'
  AND Country = 'France'
  AND LOWER(Description) LIKE '%box%'
""")

## Save cleaned data เป็น CSV

โดยปกติแล้ว Spark จะทำการ Save ออกมาเป็นหลายไฟล์ เพราะใช้หลายเครื่องในการประมวลผล

In [None]:
# Write as partitioned files (use multiple workers)
dt_sql_valid_price.write.csv('Cleaned_Data_Now_Final.csv')

เราสามารถบังคับให้ Spark ใช้เครื่องเดียวได้

In [None]:
# Write as 1 file (use single worker)
dt_sql_valid_price.coalesce(1).write.csv('Cleaned_Data_Now_Final_Single.csv')

### Bonus Exercise: อ่านไฟล์ที่มีหลาย Part
เช่น
- /content/Cleaned_Data.csv/part-00000-25a1e27a-a2b1-4553-b8ae-e05a6c574b59-c000.csv
- /content/Cleaned_Data.csv/part-00001-25a1e27a-a2b1-4553-b8ae-e05a6c574b59-c000.csv

ป.ล. เครื่องคอมพิวเตอร์แต่ละท่านจะสร้างชื่อไฟล์ไม่เหมือนกัน กรุณาเช็คชื่อไฟล์ในแท็บ Files ด้านซ้าย

In [None]:
# อ่าน CSV ไฟล์ที่ 1
part1 = spark.read.csv('/content/Cleaned_Data_Now_Final.csv/part-00000-a156ec9a-ff0a-4baf-933e-3f7888196855-c000.csv', header = True, inferSchema = True, )
part1.count()

In [None]:
# อ่าน CSV ไฟล์ที่ 2
part2 = spark.read.csv('/content/Cleaned_Data_Now_Final.csv/part-00001-a156ec9a-ff0a-4baf-933e-3f7888196855-c000.csv', header = True, inferSchema = True, )
part2.count()

In [None]:
# วิธีอ่าน CSV ทุกไฟล์ในโฟลเดอร์นี้
# Write Code Here