In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.datasource import DataSource, DataSourceReader
from datetime import timedelta, date

class NPBReader(DataSourceReader):
    """Batch reader that fetches average exchange rates from the NBP public web API.

    Each yielded row is ``(date, code, price)``, matching the schema declared by
    ``NPBDataSource.schema``: the table's ``effectiveDate`` string, the currency
    ``code`` and the ``mid`` rate.

    Options (Spark passes option values as strings):
        table:      NBP table letter, "A" or "B" (case-insensitive).
        start_date: first day of the range, ``YYYY-MM-DD``.
        end_date:   last day of the range, ``YYYY-MM-DD``.

    Note: the class name says "NPB"; the bank is NBP. The name is kept so existing
    references keep working.
    """

    # Only tables whose rates carry a ``mid`` field are handled by ``read``.
    SUPPORTED_TABLES = ("A", "B")
    API_URL = "https://api.nbp.pl/api/exchangerates/tables/{table}/{start}/{end}/"
    # Without a timeout a stalled connection would block the Spark task indefinitely.
    TIMEOUT_SECONDS = 30

    def __init__(self, schema, options):
        """Store the schema/options and validate the options eagerly.

        Raises:
            ValueError: if ``table`` is not one of ``SUPPORTED_TABLES`` or a date
                option is missing.
        """
        self.schema = schema
        self.options = options
        table = self.options.get("table")
        # A real exception instead of ``assert``: asserts vanish under ``python -O``.
        if table is None or table.upper() not in self.SUPPORTED_TABLES:
            raise ValueError(
                f"Option 'table' must be one of {self.SUPPORTED_TABLES}, got {table!r}"
            )
        self.table: str = table.upper()
        self.start_date = self.options.get("start_date")
        self.end_date = self.options.get("end_date")
        if not self.start_date or not self.end_date:
            # Otherwise the request URL would literally contain ".../None/None/".
            raise ValueError("Options 'start_date' and 'end_date' are required (YYYY-MM-DD)")

    def read(self, partition):
        """Yield ``(date, code, price)`` tuples for every rate in the requested range.

        Args:
            partition: unused; this reader does not define ``partitions()``, so it
                presumably receives Spark's single default partition.

        Raises:
            requests.exceptions.HTTPError: if the API answers with an error status.
            requests.exceptions.RequestException: on connection errors or timeout.
        """
        import requests

        url = self.API_URL.format(table=self.table, start=self.start_date, end=self.end_date)
        try:
            response = requests.get(
                url,
                params={"format": "json"},
                headers={"Accept": "application/json"},
                timeout=self.TIMEOUT_SECONDS,
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            print("Could not get data from NBP API")
            raise  # bare raise keeps the original traceback

        for rate_table in response.json():
            effective_date: str = rate_table.get("effectiveDate")
            # ``or []`` tolerates a table entry without a "rates" list.
            for rate in rate_table.get("rates") or []:
                mid = rate.get("mid")
                # The schema declares DoubleType; coerce in case a value decodes as int.
                yield (effective_date, rate.get("code"), None if mid is None else float(mid))

class NPBDataSource(DataSource):
    """PySpark Python data source exposing NBP exchange rates under the format name "nbp".

    Typical use::

        spark.read.format("nbp").option("table", "A") \\
            .option("start_date", "...").option("end_date", "...").load()
    """

    @classmethod
    def name(cls):
        """Return the short format name accepted by ``spark.read.format``."""
        return "nbp"

    def schema(self):
        """Return the output schema: date string, currency code, rate as double (all nullable)."""
        column_specs = (
            ("date", StringType()),
            ("code", StringType()),
            ("price", DoubleType()),
        )
        fields = [StructField(col_name, col_type, True) for col_name, col_type in column_specs]
        return StructType(fields)

    def reader(self, schema: StructType):
        """Create a reader bound to the options supplied via ``spark.read.option``."""
        source_options = self.options
        return NPBReader(schema, source_options)
        

 

In [0]:
# Register the custom source so spark.read.format("nbp") resolves to NPBDataSource.
data_source_registry = spark.dataSource
data_source_registry.register(NPBDataSource)

In [0]:
# Fetch the last 30 days of NBP table A and persist them as a Delta table.
# (The NBP API limits a single date-range query to 93 days, so keep the window below that.)
table_a_end = date.today()  # evaluate "today" once so both bounds agree
table_a_start = table_a_end - timedelta(days=30)

table_a = (
    spark.read.format("nbp")
    .option("table", "A")
    .option("start_date", table_a_start.isoformat())
    .option("end_date", table_a_end.isoformat())
    .load()
)

table_a.write.mode("overwrite").format("delta").saveAsTable("nbp.source.table_a")
# Show the persisted table: displaying `table_a` would re-run the data source and hit the API again.
display(spark.table("nbp.source.table_a"))

In [0]:

# Table B is updated every Wednesday, so a 30-day window holds about four or five tables.
# (The NBP API limits a single date-range query to 93 days, so keep the window below that.)
table_b_end = date.today()  # evaluate "today" once so both bounds agree
table_b_start = table_b_end - timedelta(days=30)

table_b = (
    spark.read.format("nbp")
    .option("table", "B")
    .option("start_date", table_b_start.isoformat())
    .option("end_date", table_b_end.isoformat())
    .load()
)

table_b.write.mode("overwrite").format("delta").saveAsTable("nbp.source.table_b")

# Show the persisted table: displaying `table_b` would re-run the data source and hit the API again.
display(spark.table("nbp.source.table_b"))