# PySpark custom data sources

[DataBricks docs](https://docs.databricks.com/aws/en/pyspark/datasources)

PySpark Custom Data Sources คือ feature ที่ช่วยให้เราสามารถอ่านข้อมูลจากแหล่งข้อมูลแบบ custom ได้เอง และยังสามารถเขียนไปยังปลายทางได้แบบ custom เองใน Apache Spark ได้ โดยใช้ Python

ซึ่งในไฟล์นี้ จะมีด้วยกัน 3 ตัวอย่าง (อ้างอิงตาม DataBricks docs) ดังนี้
- Batch Query
- GitHub DataSource
- Streaming

### Implement the data source subclass

| Property / Method | Description |
|------------------------------------|---------------------------------------------------------------------------|
| **name** | **Required** เป็นการระบุชื่อของ Data Source |
| **schema** | **Required** เป็นการระบุโครงสร้างข้อมูล (Schema) ของ Data Source ที่จะอ่านหรือเขียน |
| reader() | ต้องคืนค่า `DataSourceReader` เพื่อให้ Data Source อ่านข้อมูลแบบ Batch |
| writer() | ต้องคืนค่า `DataSourceWriter` เพื่อให้ Data Sink เขียนข้อมูลแบบ Batch |
| streamReader() / simpleStreamReader() | ต้องคืนค่า `DataSourceStreamReader` เพื่อให้ Data Stream อ่านข้อมูลแบบ Streaming |
| streamWriter() | ต้องคืนค่า `DataSourceStreamWriter` เพื่อให้ Data Stream เขียนข้อมูลแบบ Streaming |


# Example 1: Create a PySpark DataSource for batch query

ติดตั้ง Library ที่จำเป็น ในที่นี้ เราจะใช้ faker มาช่วยในการจำลองข้อมูลตัวอย่างกัน

In [0]:
%pip install faker

หลังจาก install จะมี Note บอกให้ restart kernel โดยใช้ `%restart_python` หรือ `dbutils.library.restartPython()`

(ในที่นี้เลยเลือกใช้ `%restart_python`) แล้วกดรันใหม่อีกรอบ จะขึ้นแจ้งเตือนเหมือนเดิม แต่สามารถใช้งาน faker lib ได้แล้ว

In [0]:
%restart_python

## Step 1: Define the example DataSource

ขั้นแรก : Define Class หลักสำหรับแหล่งข้อมูลขึ้นมาก่อน โดยจะมี 3 ส่วนใน Class คือ
- name : ชื่อที่จะใช้ตอนเราเอามาใช้กับ spark
- schema : โครงสร้างข้อมูลที่จะอ่าน ว่ามี column อะไรบ้าง, data type เป็นยังไง 
- reader : วิธีการส่งต่องานไปยังตัวที่ใช้ในการอ่านข้อมูลจริง

In [0]:
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

## Step 2: Implement the reader for a batch query

ขั้นที่ 2 : สร้างตัวอ่านข้อมูล

Method ที่สำคัญคือ `read()`


In [0]:
class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

จากโค้ด ใน method `read()` มีจุดให้สังเกตเพิ่มเติม 2 ส่วน
1. ข้อมูลตัวอย่างถูกสร้างจาก lib faker ที่เราติดตั้งเมื่อก่อนหน้านี้นั่นเอง
2. เราจะเห็นว่า method นี้ใช้คำสั่ง `yield` ในการส่งข้อมูลกลับออกมา ซึ่งคำสั่งนี้มันจะทำการส่งค่าออกมาทีละค่า แล้ว pause ไว้ พอถูกเรียกครั้งต่อไป มันก็จะทำงานต่อจากจุดเดิม ซึ่งจะมีผลดีเมื่อเจอข้อมูลที่มันใหญ่มากๆ (อันนี้สรุปเท่าที่เข้าใจในแง่ของ python จะมีกล่าวถึงเพิ่มเติมในตัวอย่างหลังๆ)

## Step 3: Register and use the example data source

ขั้นสุดท้ายสำหรับ e.g. นี้ : การ register & use

- พอสร้าง class ที่จำเป็นในขั้น 1 & 2 เสร็จแล้ว ก็ต้องบอกให้ spark รู้ก่อน โดยใช้คำสั่ง `register`
- เรียกใช้งาน โดยใช้ name ที่เราตั้งไว้ เติมใน `.format()`

In [0]:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()

In [0]:
# ถ้าต้องการเพียงแค่ 2 คอลัมน์ก็ทำได้ ให้ระบุ schema ได้เลย
spark.read.format("fake").schema("name string, company string").load().show()

In [0]:
# ถ้าต้องการจำนวนแถวมากกว่า 3 สามารถระบุได้ด้วย
spark.read.format("fake").option("numRows", 5).load().show()

# Example 2: Create a PySpark GitHub DataSource

ในส่วนนี้ เป็นการดึงข้อมูลจาก GitHub เพื่อมาแสดงเป็นตารางใน spark
- ***เดิม*** ตาม docs จะมีการนำเข้าข้อมูลแบบ [**Variant**](https://docs.databricks.com/aws/en/sql/language-manual/data-types/variant-type) เข้ามาโดยใช้ python ด้วย แต่จากการทดลองหลายครั้ง พบว่าติดปัญหาผ่านการใช้ python และยังหาวิธีแก้ไขไม่สำเร็จ 
- หากใช้ SQL ปกติอาจจะนำเสนอเกี่ยวกับรายละเอียดของ Variant Data ได้ ดังตัวอย่างด้านล่าง ผลลัพธ์จะแสดงให้เห็นว่า ตรงส่วน col `my_variant_data` เป็น object ไม่ใช่ str

In [0]:
%sql
SELECT 
  parse_json('{"name": "Variant", "stamina": 100, "active": true}') AS my_variant_data,
  parse_json('{"name": "Variant", "stamina": 100}'):stamina AS extracted_score

ซึ่งเดิมที การจะใช้ข้อมูลประเภท Variant ได้ จะต้องใช้ runtime version 17.1 ขึ้นไป และสิ่งที่เรามีควรจะสามารถใช้งานได้บน python ด้วย ทั้งนี้ เพื่อให้ตัวอย่างนี้แสดงเนื้อหาในการนำข้อมูลจากแหล่งต่างๆแบบกำหนดเองมาใช้งานต่อ จึงจะปรับประเภท data ใน schema ให้เป็นประเภทแบบที่เราๆรู้จักกันก่อน

In [0]:
%sql
SELECT current_version();

## Step 1: Define the GitHub DataSource

ขั้นแรก : Define Class หลักสำหรับแหล่งข้อมูลขึ้นมา โดยจะมี 3 ส่วนใน Class คือ
- name : ชื่อที่จะใช้ตอนเราเอามาใช้กับ spark
- schema : โครงสร้างข้อมูลที่จะอ่าน ว่ามี column อะไรบ้าง, data type เป็นยังไง 
- reader : วิธีการส่งต่องานไปยังตัวที่ใช้ในการอ่านข้อมูลจริง

ซึ่งส่วนนี้จะเหมือนกันกับตัวอย่างแรก

In [0]:
import json
import requests

from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal

class GithubVariantDataSource(DataSource):
    @classmethod
    def name(self):
        return "githubVariant"
    def schema(self):
        return "id int, title string, user string, created_at string, updated_at string"
    def reader(self, schema):
        return GithubVariantPullRequestReader(self.options)


## Step 2: Implement the reader to retrieve pull requests

ขั้นที่ 2 : ส่วนนี้คือส่วนของการสร้างตัวอ่านข้อมูล แต่จะมีความซับซ้อนกว่าตัวอย่างแรก เนื่องจากเป็นการดึงข้อมูลจาก GitHub

มี 2 ส่วน คือ
1. `__init__` ส่วนนี้จะเป็นส่วนที่รับ path ของ repo, token(ถ้ามี) หาก repo ว่าง จะไม่สามารถทำงานต่อได้
2. `read` ส่วนนี้จะมีการทำงานย่อยลงไปอีก กล่าวคือ
> - เตรียม HTTP Header
> - Request & Response
> - แปลงเป็น row ส่งกลับ (yield rows)

In [0]:
class GithubVariantPullRequestReader(DataSourceReader):
    def __init__(self, options):
        self.token = options.get("token")
        self.repo = options.get("path")
        if self.repo is None:
            raise Exception(f"Must specify a repo in `.load()` method.")

    def read(self, partition):
        header = {
            "Accept": "application/vnd.github+json", # ต้องการข้อมูลแบบ JSON
        }
        if self.token is not None: # ถ้ามี token ให้แนบไปด้วย
            header["Authorization"] = f"Bearer {self.token}"
        url = f"https://api.github.com/repos/{self.repo}/pulls" # เตรียม address ปลายทาง
        response = requests.get(url, headers=header) # สร้างตัวแปร response เพื่อไปรับผลจากการ request
        response.raise_for_status() # ตรวจสอบ status
        prs = response.json() # แปลงเป็น json
        for pr in prs: # loop ส่งข้อมูลกลับออกมาเป็น row
            yield Row(
                id = pr.get("number"),
                title = pr.get("title"),
                user = pr.get("user"),
                created_at = pr.get("created_at"),
                updated_at = pr.get("updated_at")
            )

## Step 3: Register and use the data source

เหมือนเดิมจากที่เราเคยทำในตัวอย่างแรก เมื่อเรา custom data source ขึ้นมา ต้องมีการ `register` ให้ spark ทราบ แล้วใช้งานต่อไปได้

In [0]:
spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()

# Example 3: Create PySpark DataSource for streaming read and write

ตัวอย่างนี้จะเป็นเชิง concept เท่านั้น เนื่องจากข้อจำกัดของ free version บน databricks

## Step 1: Define the example DataSource

ขั้นแรก : Define Class หลักสำหรับแหล่งข้อมูลขึ้นมา โดยจะมี 4 ส่วนใน Class คือ
- name : ชื่อที่จะใช้ตอนเราเอามาใช้กับ spark
- schema : โครงสร้างข้อมูล ว่ามี column อะไรบ้าง, data type เป็นยังไง
- streamReader : อ่านข้อมูลแบบ Streaming
- streamWriter : เขียนข้อมูลแบบ Streaming

In [0]:
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

## Step 2: Implement the stream reader

จาก step แรก จะมีส่วนของ comment ที่บอกว่า
> If you don't need partitioning, you can implement the `simpleStreamReader` method instead of `streamReader`.

ซึ่งจะเกี่ยวกับประเภทข้อมูลที่เราจะอ่านด้วย ดังนั้น ใน step นี้จะมีตัวอย่างของทั้ง 2 แบบ

### DataSourceStreamReader implementation

ในส่วนของตัวอย่างนี้ จะเหมาะกับข้อมูลที่มีขนาดใหญ่ มีการแบ่ง partition ในการอ่านข้อมูล และมีการรับ partition เป็น input โดยจะแบ่งออกเป็น 2 part คือ
1. RangePartition ส่วนแบ่งงาน ซึ่งจะช่วยอำนวยความสะดวกให้ executor หลายตัวช่วยกันอ่านกรณีข้อมูลมีจำนวนมาก
2. FakeStreamReader ส่วนอ่านข้อมูล จะมีรายละเอียดปลีกย่อยในแต่ละ method เพิ่มเติม ดังนี้
- `initialOffset` จุดเริ่มต้นของ stream , เป็น dict โดยเริ่มจาก 0
- `latestOffset` จุดล่าสุดของข้อมูล จะเรียกทุกครั้งที่มีการ trigger ซึ่งจะบอกว่าข้อมูลล่าสุดถึงไหนแล้ว โดยในตัวอย่างนี้ จะเป็นการจำลองว่า ข้อมูลที่เข้ามาใหม่เพิ่มขึ้นทีละ 2 หน่วย นั่นแปลว่า offset ใหม่ก็จะเพิ่มขึ้นไปเรื่อยๆ
- `partitions` วางแผนกระจายงาน โดยรับค่า start offset, end offset และคืนค่าออกมาเป็น**ลำดับ**ของ `InputPartition` แต่ในกรณีเฉพาะตัวอย่างนี้ จะเป็นการคืนค่าออกมาเพียง `InputPartition` เดียว สังเกตจากลำดับที่มีเพียงลำดับเดียว
- `commit` แจ้งสถานะว่าสำเร็จ, clean up resources
- `read` ขั้นนี้คือตัวที่อ่านข้อมูลจริง โดยจะรับ partition ที่เข้ามาจากขั้นตอน partitions ก่อนหน้านี้ และส่ง record กลับไปแบบ Iterator ซึ่งภายใน Iterator จะเป็น tuple โดยแต่ละ tuple = 1 row 
> สังเกตได้ว่า ในขั้นนี้ เราไม่ได้ใช้ return แต่ใช้ `yield` ซึ่งคุณสมบัติของ yield จะเป็นดังตัวอย่างแรกๆที่เราเคยกล่าวไปในเชิงของ python พอกล่าวมาถึงจุดนี้ จะมีส่วนที่เรียกว่า `Iterator` แล้ว
> - ดังนั้น `yield` จะใช้ในการคืนข้อมูลออกมาเป็น Iterator ซึ่ง executor สามารถอ่านและส่ง record ออกมาทีละ row โดยไม่ต้อง load ข้อมูลทั้งหมดของ partition ขึ้นมาใน memory พร้อมกัน จึงเหมาะกับข้อมูลที่มีขนาดใหญ่มากๆ

In [0]:
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

# 1 ส่วนแบ่งงาน
class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

# 2 ส่วนอ่านข้อมูล
class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0 # offset ปัจจุบัน

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

### SimpleDataSourceStreamReader implementation

ในส่วนของตัวอย่างนี้ จะเหมาะกับข้อมูลที่ไม่ได้มีขนาดใหญ่มาก เพราะ**ไม่ได้มีการแบ่ง** partition ในการอ่านข้อมูล

โดยจะมี method ภายใน ดังนี้
- `initialOffset` กำหนดจุดเริ่มต้น
- `read` รับค่า start offset เข้ามา แล้วคืนค่า Iterator, จุด start offset ถัดไป โดยในตัวอย่างคือการอ่านข้อมูลเพิ่ม 2 แถวจากจุดที่ start และคำนวณ start offset ถัดไป
- `readBetweenOffsets` ส่วนนี้มีไว้ในกรณีที่เกิด failed ซึ่งจะรับ offset ทั้ง start และ end แล้วอ่านข้อมูลตรง offset ช่วงนั้น
- `commit` หน้าที่แบบเดียวกันกับตัวอย่างก่อนหน้า

In [0]:
class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

## Step 3: Implement the stream writer

มาสู่ขั้นตอนของการเขียนกันบ้าง ในส่วนนี้มี 2 class คือ
1. SimpleCommitMessage
2. FakeStreamWriter

---
**SimpleCommitMessage**
- คลาสนี้ไว้ใช้สำหรับการรายงานผลจาก executor

---
**FakeStreamWriter**

- คลาสนี้ไว้ใช้สำหรับการเขียนหลักๆ มี method 3 ส่วน คือ
    - `write` : ทำงานบน executor ซึ่งจะรับข้อมูลจริงมาประมวลผล และข้อมูลที่รับมาจะเป็น iterator หลังจากเขียนเสร็จก็จะมี commit message กลับมา
    - `commit` : รับ commit message จากทุก executor ที่ทำงานสำเร็จ จนครบ แล้วตัดสินใจว่าจะทำอะไรกับมันต่อไป ซึ่งในที่นี้ เราเขียน metadata ของแต่ละ micro-batch (จำนวนแถว และ จำนวน partition) ลงใน json file
    - `abort` : ในกรณีที่มีจุดที่เขียนไม่ได้บางส่วน จะเรียกใช้ตัวนี้แทน commit และรับ commit message จากส่วนที่สำเร็จมาด้วย จากนั้นก็ตัดสินใจต่อว่าจะทำอะไรกับมันต่อไป ซึ่งในกรณีของตัวอย่างนี้ เราจะเขียนข้อความการ fail ลงใน textfile

In [0]:
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage

class SimpleCommitMessage(WriterCommitMessage):
   def __init__(self, partition_id: int, count: int):
       self.partition_id = partition_id
       self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data and then returns the commit message for that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

## Step 4: Register and use the example data source

เดิมตาม docs จะเป็นโค้ดตามด้านล่าง แต่ในส่วนของ query นั้น เนื่องด้วย cluster แบบฟรีที่เราใช้ไม่ support micro-batch trigger สำหรับการ streaming ซึ่ง databricks แนะนำให้ใช้ `availableNow` หรือ `once` แทน และยังรวมไปถึงเรื่องการใช้ python กับ sink ที่ตัวฟรีอย่างเราไม่ support เช่นกัน

In [0]:
spark.dataSource.register(FakeStreamDataSource)
# query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

ดังนั้น การแก้ไขปัญหา(เฉพาะหน้า) จากสิ่งที่เกิดขึ้น จึงต้องแก้ปัญหาด้วยการสร้าง folder ใหม่ในทุกครั้งที่เราจะ(ลอง)รันสิ่งนี้แทน โดยการใช้ uuid เข้ามาช่วย

!! trigger = once ไม่ใช่ batch !!
> trigger = once เป็น 1 ใน streaming query เช่นกัน

และในการทำงานแบบ streaming นั้น **ต้องมี** checkpoint เสมอ เพื่อใช้ในการ recovery (แต่ตัวฟรีของเราไม่รองรับ T-T)

นี่เป็นสาเหตุที่เราต้องแก้ไขเฉพาะหน้าด้วยวิธีข้างต้น เบื้องต้นตัวอย่างนี้จึงเกิดขึ้นเพื่อให้เข้าใจว่าถ้าจะ custom data sources แบบ streaming ควรจะเป็นประมาณไหน นั่นเองค่ะ

In [0]:
import uuid

checkpoint = f"/Volumes/workspace/default/tutorial/_checkpoint/{uuid.uuid4()}"
# สร้าง path ของ checkpoint เพื่อไม่ให้ซ้ำ จะใช้ uuid มาช่วยสร้าง folder

query = (
    spark.readStream
        .format("fakestream") # เรียกใช้ data source ที่เราสร้างมา
        .load()
        .writeStream # เริ่มเขียน
        .format("memory") # เขียนลง memory แทน
        .queryName("test_stream") # ตั้งชื่อตารางปลายทาง
        .option("checkpointLocation", checkpoint) # ต้องมี checkpoint เสมอ โดยเราระบุ path ของ checkpoint ตาม path ข้างบน
        .trigger(once=True) # รันรอบเดียวหยุด
        .start()
)

# รอประมวลผลให้เสร็จ
query.awaitTermination()

และเพื่อที่เราจะดูว่าหน้าตาข้อมูลตัวอย่างเป็นเช่นไร คำตอบคือ ด้านล่างนี้ เราใช้ SQL มาช่วยในการ query ออกมาดู

In [0]:
spark.sql("SELECT * FROM test_stream").show(truncate=False)