In [76]:
import requests
import math
from lakehouse.utils.settings import spark
from lakehouse import bronze
from pyspark.sql import DataFrame, Row
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
from delta import DeltaTable
from delta.tables import DeltaMergeBuilder

# 1. Set Up and Data

In [77]:
class SWAPIDataSource(DataSource):
    """
    Implementing the SWAPI API as documented at https://swapi.dev/documentation

    Name: `SWAPI`

    Schema (depends on the called resource): 

        "people": 'birth_year string, created string, edited string, eye_color string, films array<string>, 
        gender string, hair_color string, height string, homeworld string, mass string, name string, skin_color string, 
        species array<string>, starships array<string>, url string, vehicles array<string>',

        "films": 'characters array<string>, created string, director string, edited string, episode_id bigint, 
        opening_crawl string, planets array<string>, producer string, release_date string, species array<string>, 
        starships array<string>, title string, url string, vehicles array<string>',

        "starships": 'MGLT string, cargo_capacity string, consumables string, cost_in_credits string, created string, 
        crew string, edited string, films array<string>, hyperdrive_rating string, length string, manufacturer string, 
        max_atmosphering_speed string, model string, name string, passengers string, pilots array<string>, starship_class string, url string',

        "vehicles": 'cargo_capacity string, consumables string, cost_in_credits string, created string, crew string, 
        edited string, films array<string>, length string, manufacturer string, max_atmosphering_speed string, model string, 
        name string, passengers string, pilots array<string>, url string, vehicle_class string',

        "species": 'average_height string, average_lifespan string, classification string, created string, designation string, 
        edited string, eye_colors string, films array<string>, hair_colors string, homeworld string, language string, name string, 
        people array<string>, skin_colors string, url string',

        "planets": 'climate string, created string, diameter string, edited string, films array<string>, gravity string, name string, 
        orbital_period string, population string, residents array<string>, rotation_period string, surface_water string, terrain string, url string'

    Raises
    ------
    ValueError
        From `schema()` when the `path` passed to `.load()` is missing or is not
        one of the supported resources.

    Examples
    --------
    Register the data source.

    >>> from pyspark_datasources import SWAPIDataSource
    >>> spark.dataSource.register(SWAPIDataSource)

    Load data from the available resources "people", "films", "starships", "vehicles", "species", "planets"

    >>> spark.read.format("SWAPI").load("planets").show()
 
    +---------+--------------------+--------+--------------------+--------------------+----------+--------+---+
    |  climate|             created|diameter|              edited|               films|   gravity|    name|...|
    +---------+--------------------+--------+--------------------+--------------------+----------+--------+---+
    |     arid|2014-12-09T13:50:...|   10465|2014-12-20T20:58:...|[https://swapi.de...|1 standard|Tatooine|...|
    |...      |...                 |...     |...                 |                 ...|...       |...     |...|
    +---------+--------------------+--------+--------------------+--------------------+----------+--------+---+

    """

    # DDL schema per supported SWAPI resource. The keys double as the list of
    # accepted `path` values, so validation and schema lookup cannot drift apart.
    _SCHEMAS = {
        "people": 'birth_year string, created string, edited string, eye_color string, films array<string>, gender string, hair_color string, height string, homeworld string, mass string, name string, skin_color string, species array<string>, starships array<string>, url string, vehicles array<string>',

        "films": 'characters array<string>, created string, director string, edited string, episode_id bigint, opening_crawl string, planets array<string>, producer string, release_date string, species array<string>, starships array<string>, title string, url string, vehicles array<string>',

        "starships": 'MGLT string, cargo_capacity string, consumables string, cost_in_credits string, created string, crew string, edited string, films array<string>, hyperdrive_rating string, length string, manufacturer string, max_atmosphering_speed string, model string, name string, passengers string, pilots array<string>, starship_class string, url string',

        "vehicles": 'cargo_capacity string, consumables string, cost_in_credits string, created string, crew string, edited string, films array<string>, length string, manufacturer string, max_atmosphering_speed string, model string, name string, passengers string, pilots array<string>, url string, vehicle_class string',

        "species": 'average_height string, average_lifespan string, classification string, created string, designation string, edited string, eye_colors string, films array<string>, hair_colors string, homeworld string, language string, name string, people array<string>, skin_colors string, url string',

        "planets": 'climate string, created string, diameter string, edited string, films array<string>, gravity string, name string, orbital_period string, population string, residents array<string>, rotation_period string, surface_water string, terrain string, url string'
    }

    @classmethod
    def name(cls):
        """Short name used with `spark.read.format(...)`."""
        return "SWAPI"

    def schema(self):
        """Return the DDL schema string for the resource given as the load path.

        Raises
        ------
        ValueError
            If no path was given or it is not a supported SWAPI resource.
        """
        # `.get` so a missing path yields the same clear error as an invalid one
        # instead of a bare KeyError.
        resource = self.options.get("path")
        if resource not in self._SCHEMAS:
            raise ValueError(
                f"Unsupported SWAPI resource {resource!r}. Assure that only values in "
                f"{list(self._SCHEMAS)} are provided"
            )
        return self._SCHEMAS[resource]

    def reader(self, schema):
        """Create the reader; the resource name is taken from the options' `path`."""
        return SWAPIDataSourceReader(self.options)


class SWAPIDataSourceReader(DataSourceReader):
    """Read one SWAPI resource, using one Spark partition per API result page."""

    # Seconds to wait for SWAPI before failing. Without a timeout, a stalled
    # connection would block the driver (partitions) or a task (read) forever.
    REQUEST_TIMEOUT = 30
    # Fallback page size, used only when the first page returns no results.
    DEFAULT_PAGE_SIZE = 10

    def __init__(self, options):
        self.resource = options["path"]
        self.base_url = f"https://swapi.dev/api/{self.resource}/"

    def _get_json(self, page=None):
        """GET the resource (optionally a specific page) and return the decoded JSON.

        Raises requests.HTTPError on a non-2xx response, so API failures surface
        as HTTP errors rather than as a KeyError on a missing "results"/"count".
        """
        params = None if page is None else {"page": page}
        response = requests.get(self.base_url, params=params, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json()

    def partitions(self):
        """Return one InputPartition per page, numbered from 1."""
        first_page = self._get_json()
        total_elements = int(first_page["count"])
        # Derive the page size from the API instead of assuming it. If the
        # assumption were wrong, requests for non-existent pages would fail.
        page_size = len(first_page.get("results") or []) or self.DEFAULT_PAGE_SIZE
        no_pages = math.ceil(total_elements / page_size)
        return [InputPartition(i) for i in range(1, no_pages + 1)]

    def read(self, partition):
        """Yield one Row per record on the page identified by `partition.value`."""
        data = self._get_json(page=partition.value)["results"]
        for record in data:
            yield Row(**record)

In [78]:
# Make the custom source addressable by its short name, then preview the people resource.
spark.dataSource.register(SWAPIDataSource)
(
    spark.read
    .format("SWAPI")
    .load("people")
    .show()
)

+----------+--------------------+--------------------+---------+--------------------+-------------+-------------+------+--------------------+-------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+
|birth_year|             created|              edited|eye_color|               films|       gender|   hair_color|height|           homeworld|   mass|                name|      skin_color|             species|           starships|                 url|            vehicles|
+----------+--------------------+--------------------+---------+--------------------+-------------+-------------+------+--------------------+-------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+
|     19BBY|2014-12-09T13:50:...|2014-12-20T21:17:...|     blue|[https://swapi.de...|         male|        blond|   172|https://swapi.dev...|     77|      Luke Skywalker|            fa

In [79]:
# Ensure the target bronze schema exists before any loader writes to it.
spark.sql("CREATE SCHEMA IF NOT EXISTS hive_metastore.bronze")

DataFrame[]

In [80]:
# Destination of every bronze loader below: <catalog>.<target_schema>.<table>.
options = dict(
    catalog="hive_metastore",
    target_schema="bronze",
)

# 2. Overwrite with `execute_one` and `execute_all`

In [81]:
class StarWarsBronze(bronze.BronzeOverwrite):
    """Overwrite-mode bronze loader whose source is the SWAPI data source."""

    def load(self, table):
        # Registering inside load() keeps this class independent of the
        # registration done in an earlier cell.
        spark.dataSource.register(SWAPIDataSource)
        # `table` is used as the SWAPI resource name (e.g. "people"); presumably
        # also the bronze target table name — confirm in `bronze.BronzeOverwrite`.
        swapi_reader = spark.read.format("SWAPI")
        return swapi_reader.load(table)


bronze_instance = StarWarsBronze(spark, **options)
bronze_instance.execute_one("people")

In [82]:
class StarWarsBronze(bronze.BronzeOverwrite):
    """Overwrite-mode bronze loader reading from SWAPI (redefined so this cell runs on its own)."""

    def load(self, table):
        spark.dataSource.register(SWAPIDataSource)
        return (
            spark.read
            .format("SWAPI")
            .load(table)
        )


bronze_instance = StarWarsBronze(spark, **options)
# Several resources in one call; each ends up in its own bronze table (see the inspections below).
bronze_instance.execute_all(["people", "planets"])

In [67]:
# Inspect bronze `people` after the overwrite run. The location comes from
# `options`, so it always matches where the loaders wrote.
sdf = spark.sql(f"SELECT * FROM {options['catalog']}.{options['target_schema']}.people")
print(f"No. Rows: {sdf.count()}")
sdf.show()

No. Rows: 82
+----------+--------------------+--------------------+---------+--------------------+-------------+-------------+------+--------------------+-------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|birth_year|             created|              edited|eye_color|               films|       gender|   hair_color|height|           homeworld|   mass|                name|      skin_color|             species|           starships|                 url|            vehicles|         LH_BronzeTS|
+----------+--------------------+--------------------+---------+--------------------+-------------+-------------+------+--------------------+-------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     19BBY|2014-12-09T13:50:...|2014-12-20T21:17:...|     blue|[https://swapi.de...|         male|        b

In [47]:
# Inspect bronze `planets` after the overwrite run. The location comes from
# `options`, so it always matches where the loaders wrote.
sdf = spark.sql(f"SELECT * FROM {options['catalog']}.{options['target_schema']}.planets")
print(f"No. Rows: {sdf.count()}")
sdf.show()

No. Rows: 60
+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------+--------------+-------------+--------------------+---------------+-------------+--------------------+--------------------+--------------------+
|             climate|             created|diameter|              edited|               films|             gravity|          name|orbital_period|   population|           residents|rotation_period|surface_water|             terrain|                 url|         LH_BronzeTS|
+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------+--------------+-------------+--------------------+---------------+-------------+--------------------+--------------------+--------------------+
|                arid|2014-12-09T13:50:...|   10465|2014-12-20T20:58:...|[https://swapi.de...|          1 standard|      Tatooine|           304|       200000|[https

# 3. Replace Where

In [68]:
class StarWarsBronze(bronze.BronzeReplaceWhere):
    """Replace-where bronze loader: rewrites only the rows matching REPLACE_CONDITION."""

    # Single source of truth for the replaced slice. load() must return exactly
    # the rows that get_replace_condition() declares as replaced. If the two
    # drift apart, a replace-where write would reject the data or drop rows
    # (assuming BronzeReplaceWhere uses Delta `replaceWhere` — TODO confirm).
    REPLACE_CONDITION = "edited == '2014-12-20T21:17:50.403000Z'"

    def load(self, table):
        spark.dataSource.register(SWAPIDataSource)
        sdf = spark.read.format("SWAPI").load(table)
        return sdf.where(self.REPLACE_CONDITION)

    def get_replace_condition(self, sdf: DataFrame, table: str) -> str:
        """Return the SQL predicate selecting the rows this load replaces."""
        return self.REPLACE_CONDITION


bronze_instance = StarWarsBronze(spark, **options)
bronze_instance.execute_one("people")

In [69]:
# Inspect bronze `people` after the replace-where run. The location comes from
# `options`, so it always matches where the loaders wrote.
sdf = spark.sql(f"SELECT * FROM {options['catalog']}.{options['target_schema']}.people")
print(f"No. Rows: {sdf.count()}")
sdf.show()

No. Rows: 82
+----------+--------------------+--------------------+---------+--------------------+-------------+-------------+------+--------------------+-------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|birth_year|             created|              edited|eye_color|               films|       gender|   hair_color|height|           homeworld|   mass|                name|      skin_color|             species|           starships|                 url|            vehicles|         LH_BronzeTS|
+----------+--------------------+--------------------+---------+--------------------+-------------+-------------+------+--------------------+-------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     19BBY|2014-12-09T13:50:...|2014-12-20T21:17:...|     blue|[https://swapi.de...|         male|        b

# 4. Append

In [70]:
class StarWarsBronze(bronze.BronzeAppend):
    """Append-mode bronze loader reading from SWAPI.

    Each run adds another full copy of the resource to the target table. In this
    demo, bronze `people` grows from 82 to 164 rows.
    """

    def load(self, table):
        spark.dataSource.register(SWAPIDataSource)
        swapi_reader = spark.read.format("SWAPI")
        return swapi_reader.load(table)


bronze_instance = StarWarsBronze(spark, **options)
bronze_instance.execute_one("people")

In [71]:
# Inspect bronze `people` after the append run. The location comes from
# `options`, so it always matches where the loaders wrote.
sdf = spark.sql(f"SELECT * FROM {options['catalog']}.{options['target_schema']}.people")
print(f"No. Rows: {sdf.count()}")
sdf.show()

No. Rows: 164
+----------+--------------------+--------------------+---------+--------------------+-------------+-------------+------+--------------------+-------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|birth_year|             created|              edited|eye_color|               films|       gender|   hair_color|height|           homeworld|   mass|                name|      skin_color|             species|           starships|                 url|            vehicles|         LH_BronzeTS|
+----------+--------------------+--------------------+---------+--------------------+-------------+-------------+------+--------------------+-------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     19BBY|2014-12-09T13:50:...|2014-12-20T21:17:...|     blue|[https://swapi.de...|         male|        

# 5. Merge

In [73]:
class StarWarsBronze(bronze.BronzeMerge):
    """Merge-mode bronze loader: upserts SWAPI records keyed on their `url`."""

    def load(self, table):
        # Register here, as the other loaders do, so this cell does not rely on
        # an earlier registration in the same Spark session.
        spark.dataSource.register(SWAPIDataSource)
        return spark.read.format("SWAPI").load(table)

    def get_delta_merge_builder(self, sdf: DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder:
        """Build the MERGE: update all columns on a `url` match, insert otherwise.

        Note: after the append demo the target holds each `url` twice. The MERGE
        updates both copies rather than de-duplicating them (the row count stays 164).
        """
        # `url` is each record's own API URL, assumed unique per SWAPI record.
        merge_condition = "target.url = source.url"
        return (
            delta_table.alias("target")
            .merge(sdf.alias("source"), merge_condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
        )


bronze_instance = StarWarsBronze(spark, **options)
bronze_instance.execute_one("people")

In [74]:
# Inspect bronze `people` after the merge run. The location comes from
# `options`, so it always matches where the loaders wrote.
sdf = spark.sql(f"SELECT * FROM {options['catalog']}.{options['target_schema']}.people")
print(f"No. Rows: {sdf.count()}")
sdf.show()

No. Rows: 164
+----------+--------------------+--------------------+---------+--------------------+------+-------------+------+--------------------+----+------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|birth_year|             created|              edited|eye_color|               films|gender|   hair_color|height|           homeworld|mass|              name| skin_color|             species|           starships|                 url|            vehicles|         LH_BronzeTS|
+----------+--------------------+--------------------+---------+--------------------+------+-------------+------+--------------------+----+------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     19BBY|2014-12-09T13:50:...|2014-12-20T21:17:...|     blue|[https://swapi.de...|  male|        blond|   172|https://swapi.dev...|  77|    Luke Skywalker|

# 6. Clean Up

In [75]:
# Drop the schema this demo created and wrote to. It is derived from `options`
# so clean-up targets the same catalog/schema as the bronze loaders
# (hive_metastore.bronze), not an unrelated catalog.
spark.sql(f"DROP SCHEMA IF EXISTS {options['catalog']}.{options['target_schema']} CASCADE")

DataFrame[]