In [None]:
import pyspark as ps

In [1]:
## *** run 1

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [2]:
## *** run 2

from pyspark.sql import SparkSession

if (sc is None):
    sc = SparkContext(master="local[*]")
spark = SparkSession(sparkContext=sc)\
        .builder\
        .appName("PySpark")\
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

# ต้อง Run ส่วนนี้ทุกครั้งเพื่อให้ข้อมูลถูกเก็บที่ SparkSession

In [None]:

json_a = spark.read.json("customer.json")
json_b = spark.read.load("people.json", format="json")

parquet_a = spark.read.load("users.parquet")

txt_a = spark.read.text("people.txt")

csv_a = spark.read.csv('mtcars.csv', header=True, inferSchema=True)

In [3]:
## header=True, inferSchema=True
csv_a = spark.read.csv('HR01.csv', header=True, inferSchema=True)

#  ข้อมูลจะถูกเก็บไว้ในหน่วยความจำของ Spark ในขณะประมวลผล 
#  หากต้องการเก็บข้อมูลไว้ถาวร สามารถบันทึกข้อมูลลงดิสก์ได้โดยใช้คำสั่ง csv_a.write.csv("path/to/output.csv") ( ข้อมูลจะถูกบันทึกลงไฟล์ CSV ที่ระบุ ใน path/to/output.csv )

In [6]:
csv_a.show()

+-----------+----------+-----------+-----------------+------+-----+----------+----------+--------------------+
|Employee ID|      Name| Department|         Position|Salary|Bonus|Start Date|  End Date|  Resignation Reason|
+-----------+----------+-----------+-----------------+------+-----+----------+----------+--------------------+
|       7780|qiqhdlnhjf|      Sales|    Sales Manager| 95195|44918|2023-01-01|2023-12-31|Taking a break fr...|
|       7484|gjfermnzzi|    Finance|       Accountant| 74028|26978|2023-01-01|2023-12-31|Moving to a new city|
|       1922|pvaarwbvtu|Engineering|    Sales Manager| 94516| 5975|2023-01-01|2023-12-31|Moving to a new city|
|       6859|qsiyfoytqq|    Finance|Marketing Manager| 57882|49312|2023-01-01|2023-12-31|Moving to a new city|
|       6211|dsvjwwhzyw|  Marketing|       Accountant| 89673|27118|2023-01-01|2023-12-31|     Found a new job|
|       9226|dbcldodcnv|Engineering|    Sales Manager| 97405|15871|2023-01-01|2023-12-31|     Found a new job|
|

In [5]:
## เรียกดูข้อมูลเฉพาะคอลัมน์ Employee ID และ Name
csv_a.select("Employee ID", "Name").show()

+-----------+----------+
|Employee ID|      Name|
+-----------+----------+
|       7780|qiqhdlnhjf|
|       7484|gjfermnzzi|
|       1922|pvaarwbvtu|
|       6859|qsiyfoytqq|
|       6211|dsvjwwhzyw|
|       9226|dbcldodcnv|
|        755|ulmyvioojm|
|       2819|eyhxfsliiz|
|       8329|vnyrnuzgtd|
|       8567|rzvwjnvraj|
|       7105|ridlzjwdpe|
|       7829|evgsgmgnil|
|        482|vgbmdcfglz|
|       9552|qbbptozpnb|
|       5830|mmxsiycxvv|
|       5178|uhddrsgpex|
|       5695|obvvfthjax|
|       9852|cqamtaxqae|
|       9331|pnjmmqjcow|
|       1576|hkumvqvynx|
+-----------+----------+



In [6]:
## เรียกดูข้อมูลของพนักงานที่มีเงินเดือนมากกว่า 100,000 บาท
csv_a.filter(csv_a["Salary"] > 70000).show()

+-----------+----------+-----------+-----------------+------+-----+----------+----------+--------------------+
|Employee ID|      Name| Department|         Position|Salary|Bonus|Start Date|  End Date|  Resignation Reason|
+-----------+----------+-----------+-----------------+------+-----+----------+----------+--------------------+
|       7780|qiqhdlnhjf|      Sales|    Sales Manager| 95195|44918|2023-01-01|2023-12-31|Taking a break fr...|
|       7484|gjfermnzzi|    Finance|       Accountant| 74028|26978|2023-01-01|2023-12-31|Moving to a new city|
|       1922|pvaarwbvtu|Engineering|    Sales Manager| 94516| 5975|2023-01-01|2023-12-31|Moving to a new city|
|       6211|dsvjwwhzyw|  Marketing|       Accountant| 89673|27118|2023-01-01|2023-12-31|     Found a new job|
|       9226|dbcldodcnv|Engineering|    Sales Manager| 97405|15871|2023-01-01|2023-12-31|     Found a new job|
|        755|ulmyvioojm|    Finance|Software Engineer| 79951|35278|2023-01-01|2023-12-31|     Found a new job|
|

In [None]:
csv_a = spark.read.csv('HR01.csv', header=False, inferSchema=False)

## กำหนด schema ของ DataFrame ด้วยตัวเอง (กรณีไฟล์ไม่มี header row อยู่ด้านบน)
schema = StructType([
    StructField("Employee ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Department", StringType(), True),
    StructField("Position", StringType(), True),
    StructField("Salary", DoubleType(), True),
    StructField("Bonus", DoubleType(), True),
    StructField("Start Date", DateType(), True),
    StructField("End Date", DateType(), True),
    StructField("Resignation Reason", StringType(), True),
])

csv_a.schema = schema

In [None]:
## เช็ค Data type
csv_a.dtypes 


In [None]:
## แสดงเนื้อหา
csv_a.show() 

## แสดง n แถวแรก
csv_a.head(n)

## แสดงแถวแรกเท่านั้น
csv_a.first()

## แสดง n แถวแรกเท่านั้น โดยแสดงออกมาเป็น list ของแถว
csv_a.take(2) 

## แสดง statistics (count, mean, stddev, min, max)
csv_a.describe().show()
csv_a.describe(['age']).show()

## แสดง column  -  อยากรู้ว่าคอลัมน์มีอะไรบ้าง โดยที่จะแสดงชื่อคอลัมน์ออกมาเป็น list
csv_a.columns 

## นับจำนวนแถว  -  อยากรู้ว่าดาต้าเฟรมของเรามีจำนวนแถวทั้งหมดกี่แถว
csv_a.count() 

## นับจำนวนแถวที่ unique
csv_a.distinct().count() 

## การดรอป ค่าซ้ำ  -  เมื่อมีการใส่ค่าผิด อย่างเช่น ใส่อายุผิด โดยที่ชื่อและ ความสูงถูก ทำให้มีแถวที่ซ้ำกันสองแถว เราสามารถดรอปแถวที่ซ้ำได้
csv_a.dropDuplicates()
csv_a.dropDuplicates().show()
csv_a.dropDuplicates(['name', 'height']).show()
# ปล. dropDuplicates() ให้ผลเหมือนกับ drop_duplicates()

## การดรอป NA  -  การดรอป null
csv_a.dropna()         # Method 1
csv_a.na.drop()        # Method 2






### การเลือกข้อมูลนั้นทำได้หลายวิธี โดยใช้ Queries ได้ดังต่อไปนี้ ###

## การเลือกข้อมูลโดยใช้ Select
# เรียกดาต้าจากดาต้าเฟรมแบบเฉพาะเจาะจงสามารถทำได้โดยใช้คำสั่ง select() ซึ่งเป็นคำสั่งที่สามารถเลือกชื่อคอลัมน์ที่เราต้องการจะดูได้ นอกจากนี้ยังจัดการดาต้าเพิ่มได้
csv_a.select("name").show()                            # เลือกคอลัมน์ name อย่างเดียว
csv_a.select(csv_a['name'], csv_a['age'] + 1).show()         # เลือกทั้งสองคอลัมน์ โดยบวกอายุเพิ่มให้ทุกคน 1 ปี

## การเลือกข้อมูลโดยใช้ When
# ไว้ใช้เลือกในกรณีที่มี condition เข้ามาด้วย
# แต่ถ้าเราไม่ใส่ otherwise ผลจะแสดงแค่คนที่มีอายุมากกว่า 20 คือแถวแรกแถวเดียว
# เรายังสามารถใช้ when ซ้อน when ในกรณีที่มีหลาย conditions
from pyspark.sql import functions as F
csv_a.select(csv_a.name, F.when(csv_a.age > 20, 1).otherwise(0)).show()      # อยากรู้ว่าใครอายุมากกว่า 20 บ้าง ให้แสดงเป็น 1 และให้แสดงเป็น 0 โดยใช้ otherwise
csv_a.select(csv_a.name, F.when(csv_a.age > 18, 1).when(df.age < 40, -1).otherwise(0)).show()

## การเลือกข้อมูลโดยใช้ Like
# Like เป็นคำสั่งเมื่อเราต้องการเทียบ หรือจับคู่เหมือน ยกตัวอย่างเช่นการหาคนชื่อซ้ำ หรือหาคนนามสกุลเดียวกัน
csv_a.select("firstName", csv_a.lastName.like("Smith")).show()        # ให้แสดง ชื่อ แล้วก็นามสกุล สำหรับคนที่มีนามสกุล Smith

## การเลือกข้อมูลโดยใช้ Startswith หรือ Endwith
# ใช้ในกรณีที่ต้องการจับคู่คำขึ้นต้นหรือ คำลงท้าย
csv_a.select("firstName", csv_a.lastName.startswith("Sm")).show()       # การหาคนที่มีนามสกุลขึ้นต้นด้วย ‘Sm’
csv_a.select(csv_a.lastName.endswith("th")).show()                      # หาคนที่มีนามสกุลลงท้ายด้วย ‘th’ 

## การใช้ Substring
# เป็นคำสั่งที่ไว้ใช้เลือกดาต้าในส่วนย่อยเข้าไปอีก โดยเลือกคอลัมน์ที่อยากให้แสดง ตามด้วยฟังชั่น substr แล้วระบุจำนวนตัวอักษรว่าให้เริ่มจากตัวที่เท่าไหร่ จบที่ตัวที่เท่าไหร่ โดยนับจากตัวแรก
csv_a.select(csv_a.firstName.substr(1, 3).alias("name")).collect()[Row(name='And'), Row(name='Mic')]        # จากตัวที่หนึ่งถึงตัวที่สาม
# alias = มันสามารถเอาไว้เปลี่ยนชื่อคอลัมน์ตอนแสดงออกมาได้ จาก first name เป็น name 

## การใช้ Between
# เป็นการเลือกข้อมูลในช่วงนับตั้งแต่ขอบเขตด้านล่าง ขึ้นไปจนถึงขอบเขตด้านบน และจะแสดงผลออกมาเป็น boolean หรือว่า true/false
csv_a.select(csv_a.name, csv_a.age.between(2, 4)).show()







### วิธีการ เพิ่ม, ลบ, อัพเดท คอลัมน์ใน DataFrame ###
# สามารถจัดการ เพิ่ม ลด เปลี่ยนชื่อ คอลัมน์ได้ดังต่อไปนี้

## การเพิ่มคอลัมน์
# ใช้ withColumn ตามด้วยชื่อคอลัมน์ที่เราจะตั้ง แล้วตามด้วยค่าในคอลัมน์นั้น, สามารถใช้ค่าจากคอลัมน์ที่มีอยู่แล้วได้ ขึ้นอยู่กับลักษณะข้อมูล
csv_a.withColumn('age2', csv_a.age + 2)                     # เป็นการสร้างคอลัมน์ชื่อ age2 โดยการเพิ่มอายุจากคอลัมน์ age ไปอีก 2 ปี
csv_a.withColumn('city',csv_a.address.city) \
 .withColumn('postalCode',csv_a.address.postalCode) \
 .withColumn('state',csv_a.address.state)                   # จะเป็นการสร้างคอลัมน์ใหม่ให้กับ city, postal code และ state โดยเลือกมาจากแต่ละส่วนของ address

## การลบคอลัมน์
# เมื่อต้องการลบคอลัมน์ที่ไม่ต้องการ ใช้ drop แล้วตามด้วยชื่อคอลัมน์
csv_a.drop("address", "phoneNumber")                    # การดรอป 2 คอลัมน์ address และ phone number
csv_a.drop(csv_a.address).drop(csv_a.phoneNumber)       # เราสามารถทำการ ดรอปซ้อนดรอปได้ดังนี้

## การอัพเดทคอลัมน์
# อยากเปลี่ยนชื่อคอลัมน์ สามารถทำได้โดยใช้ withColumnRenamed ระบุชื่อคอลัมน์ที่ต้องการจะเปลี่ยนตามด้วยชื่อใหม่
csv_a = csv_a.withColumnRenamed('telePhoneNumber', 'phoneNumber')








### การใช้ Operation จัดกลุ่ม กรอง เรียง ข้อมูล
# วิธีการใช้ operation ต่างๆโดยเราสามารถจัดกลุ่ม กรองดาต้าตามเงื่อนไข หรือแม้แต่เรียงลำดับได้

## การจัดกลุ่มโดย GroupBy
# เราสามารถจัดกลุ่มดาต้า แล้วหาจำนวนในแต่ละกลุ่ม หรือค่าเฉลี่ยได้
csv_a.groupBy("age").count().show()         # เราอยากรู้ว่าคนอายุเท่านี้ มีทั้งหมดกี่คน ก็ทำได้โดยให้ groupBy อายุ แล้วนับจำนวนด้วย count

## การจัดกลุ่มโดย GroupBy
# เราสามารถกำหนด condition ได้ 
csv_a.filter(csv_a['age'] > 21).show()      # การเลือกคนทั้งหมดที่มีอายุมากกว่า น้อยกว่า หรือเท่ากับ ตัวเลขหนึ่ง โดยจะแสดงดาต้าเฟรมออกมาเฉพาะแถวที่เข้า condition นั้น

## การเรียงลำดับข้อมูลโดยใช้ Sort
# การเรียงลำดับข้อมูล จากมากไปน้อย (descending) หรือ จากน้อยไปมาก (ascending)
csv_a.sort(csv_a.age.desc())                               # วิธีที่ 1 จากมากไปน้อย
csv_a.sort("age", ascending=False)                         # วิธีที่ 2 จากมากไปน้อย โดยให้น้อยไปมาก เป็น false
csv_a.orderBy(["age","city"],ascending=[0,1])              # วิธีที่ 3 จากมากไปน้อยโดย age, จากน้อยไปมากโดย city








### การใช้ Operation จัดกลุ่ม กรอง เรียง ข้อมูล

## การเปลี่ยนชนิดของข้อมูล
# เราสามารถเล่นกับข้อมูลโดยเปลี่ยน ชนิดของดาต้าไปมาได้ ตั้งแต่เปลี่ยนดาต้าเฟรมเป็น RDD หรือ ให้อยู่ในรูปแบบ Pandas นอกจากเปลี่ยนไปแล้วก็ยังเปลี่ยนกลับได้ด้วย
rdd1 = csv_a.rdd                                   # เปลี่ยน dataframe เป็น RDD
csv_a.toJSON().first()                             # เปลี่ยน dataframe เป็น string RDD
csv_a.toPandas()                                   # ทำให้ spark dataframe อยู่ในรูปแบบ pandas dataframe
csv_a = spark.createDataFrame(pandas_df)           # ทำให้ pandas dataframe อยู่ในรูปแบบ spark dataframe

## การเซฟไฟล์เป็น Parquet files
# ถ้าเราอยากเซฟไฟล์เป็น parquet สามารถทำได้โดยใช้ write.save แล้วตามด้วยชื่อ
csv_a.write.save("newFile.parquet")

## การเซฟไฟล์เป็น JSON
# ในกรณีของ json สามารถทำได้โดยใช้ write.save แล้วตามด้วยชื่อ จากนั้นระบุ format เข้าไปด้วย
csv_a.write.save("newFile.json",format="json")

## หยุดใช้งาน Spark
# ในกรณีที่อยากหยุดการทำงานของ Spark
spark.stop()




In [17]:
## แสดง schema
csv_a.printSchema()

# แสดง schema ดูว่าคอลัมน์มีอะไรบ้าง ชนิดอะไร 

root
 |-- Employee ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Bonus: integer (nullable = true)
 |-- Start Date: date (nullable = true)
 |-- End Date: date (nullable = true)
 |-- Resignation Reason: string (nullable = true)



In [18]:
## แสดง statistics (count, mean, stddev, min, max)
csv_a.describe().show() 

## ในกรณีที่ต้องการให้แสดงเฉพาะคอลัมน์ก็ใส่ชื่อเข้าไปได้
csv_a.describe(['age']).show()

# อยากรู้สถิติของข้อมูลว่าแต่ละคอลัมน์เป็นอย่างไร ทำได้โดยใช้ describe() ตามด้วย show() เพื่อแสดงข้อมูล

In [None]:
## การดรอป ค่าซ้ำ  -  เมื่อมีการใส่ค่าผิด อย่างเช่น ใส่อายุผิด โดยที่ชื่อและ ความสูงถูก ทำให้มีแถวที่ซ้ำกันสองแถว เราสามารถดรอปแถวที่ซ้ำได้
df.dropDuplicates()

## พอเราดรอปค่าซ้ำ
df.dropDuplicates().show()

## ถ้สต้องการดรอปโดบเจาะจงคอลัมน์
df.dropDuplicates(['name', 'height']).show()