In [2]:
from dateutil.tz import gettz
import datetime
from typing import Tuple
from pyspark.sql import SparkSession, DataFrame, functions, Window
from pyspark.sql.types import StructType

In [3]:
class StockETL:
    """Extract, transform and load xetra data into DeltaLake.

    Typical usage is ``extract_xetra_data`` -> ``get_opening_and_closing_prices`` ->
    ``get_intra_day_performance`` -> ``load_delta_table_stock_performance``.
    """

    # IANA name of the time zone used to decide what "today" and "yesterday" are.
    # dateutil's gettz() returns None for names it cannot resolve (for example the bare
    # "Berlin"), and datetime.now(None) then silently uses the machine's local time.
    XETRA_TIMEZONE = "Europe/Berlin"

    _EXTRACTION_MODES = ("full", "incremental", "month")

    def __init__(self, extraction_mode: str, month_to_extract: str = None):
        """Instantiate StockETL.

        Args:
            extraction_mode: data extracting mode, "full", "incremental", or "month"
              (case-insensitive). "full" mode extracts all the CSV files from the bucket,
              "incremental" extracts CSVs from yesterday, "month" extracts CSV from a given month.
            month_to_extract: the target month from which the CSV files should be extracted.
              Required when extraction_mode is "month". The value should be a given month, in format
              "YYYY-MM", for example 2022 January would be "2022-01".

        Raises:
            ValueError: if extraction_mode is not one of the supported modes, or if the mode is
              "month" and month_to_extract is missing.
        """
        # Normalise once so that validation and every later comparison see the same value;
        # comparing the raw argument would let e.g. "Month" skip the month_to_extract check.
        mode = extraction_mode.lower()
        if mode not in self._EXTRACTION_MODES:
            raise ValueError('Unrecognized extraction mode. '
                             'The extraction mode should be "full", "month" or "incremental".')
        if mode == "month" and month_to_extract is None:
            raise ValueError('Parameter "month_to_extract" is required '
                             'when using extracting mode "month".')

        self.extraction_mode = mode
        self.month_to_extract = month_to_extract
        self.spark = self.configure_spark_session()

    @classmethod
    def _xetra_now(cls) -> datetime.datetime:
        """Return the current time as an aware datetime in ``XETRA_TIMEZONE``."""
        return datetime.datetime.now(gettz(cls.XETRA_TIMEZONE))

    @staticmethod
    def configure_spark_session() -> SparkSession:
        """Create a SparkSession.

        The session is configured to access public S3 bucket anonymously, read more
        https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Anonymous_Login_with_AnonymousAWSCredentialsProvider,
        and use DeltaLake engine. The log level is set to ERROR to reduce noise.

        Returns:
            A configured SparkSession
        """
        spark = (SparkSession.builder.appName("xetra")
                 .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                 .config("spark.sql.catalog.spark_catalog",
                         "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                 .config("spark.hadoop.fs.s3a.path.style.access", True)
                 .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                         "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
                 .getOrCreate())
        spark.sparkContext.setLogLevel("ERROR")
        return spark

    def extract_xetra_data(self) -> DataFrame:
        """Extract xetra CSVs from S3 bucket deutsche-boerse-xetra-pds into a Spark DataFrame.

        The S3 location read depends on ``self.extraction_mode``: every folder for "full",
        the folder named after yesterday's date for "incremental", and every folder whose
        name starts with ``self.month_to_extract`` for "month".

        xetra data schema can be found here
         https://github.com/Deutsche-Boerse/dbg-pds/blob/master/API_README.md#xetra.

        Returns:
            A Spark DataFrame of xetra stock data.
        """
        schema = (StructType()
                  .add("isin", "string")
                  .add("mnemonic", "string")
                  .add("securityDesc", "string")
                  .add("securityType", "string")
                  .add("currency", "string")
                  .add("securityID", "long")
                  .add("date", "date")
                  .add("time", "string")
                  .add("startPrice", "float")
                  .add("maxPrice", "float")
                  .add("minPrice", "float")
                  .add("endPrice", "float")
                  .add("tradedVolume", "float")
                  .add("numberOfTrades", "integer"))

        s3_base_uri = "s3a://deutsche-boerse-xetra-pds"
        if self.extraction_mode == "full":
            s3_uri = f"{s3_base_uri}/**/"
        elif self.extraction_mode == "incremental":
            yesterday = (self._xetra_now() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
            s3_uri = f"{s3_base_uri}/{yesterday}/"
        else:
            s3_uri = f"{s3_base_uri}/{self.month_to_extract}*/"

        return self.spark.read.csv(s3_uri, schema=schema, header=True)

    @staticmethod
    def _create_datetime_column(df: DataFrame) -> DataFrame:
        """Return a new dataframe with column "datetime" added.

        Column "datetime" is of data type "timestamp", built from the "date" and "time" columns
        joined by a space. The column is required to sort xetra trading records.

        Args:
            df: xetra Spark DataFrame.

        Returns:
            A Spark DataFrame
        """
        return (df.withColumn("datetime",
                              functions.to_timestamp(functions.concat_ws(" ", df.date, df.time))))

    def get_opening_and_closing_prices(self, df: DataFrame) -> Tuple[DataFrame, DataFrame]:
        """Return two DataFrames with daily opening and closing prices for all trading securities.

        For every (isin, date) pair exactly one record is kept in each result: the earliest
        record of the day for the opening DataFrame, the latest for the closing DataFrame.

        Args:
            df: A xetra Spark DataFrame

        Returns:
            A tuple of two DataFrames, first hold opening price info, the second holds the closing
              price info.
        """
        df_i = self._create_datetime_column(df)

        # row_number() rather than rank(): rank() gives every tied row the value 1, so records
        # sharing a timestamp would all survive the filter and multiply rows in the later join.
        asc_window = Window.partitionBy(["isin", "date"]).orderBy(df_i.datetime.asc())
        start_df = (df_i.withColumn("asc_rank", functions.row_number().over(asc_window))
                    .where("asc_rank = 1")
                    .drop("asc_rank"))

        desc_window = Window.partitionBy(["isin", "date"]).orderBy(df_i.datetime.desc())
        end_df = (df_i.withColumn("desc_rank", functions.row_number().over(desc_window))
                  .where("desc_rank = 1")
                  .drop("desc_rank"))

        return start_df, end_df

    def get_intra_day_performance(self, opening: DataFrame, closing: DataFrame) -> DataFrame:
        """Get daily performance for each security.

        Determine security performance by joining opening price with closing price on
        "isin" and "date". Performance is ``(endPrice - startPrice) / startPrice``.

        Note that in "full" and "month" modes the current day data are dropped, because the
        complete CSVs will only be available after the current day.

        Args:
            opening: A DataFrame with security daily opening price info
            closing: A DataFrame with security daily closing price info

        Returns:
            A DataFrame of security daily performance with columns "isin", "date", "startPrice",
              "endPrice" and "performance". Each security has one row per trading day.
        """
        joined = (opening.alias("start")
                  .join(closing.alias("end"), on=["isin", "date"])
                  .select("isin", "date", "start.startPrice", "end.endPrice"))
        if self.extraction_mode in ("full", "month"):
            today = self._xetra_now().strftime("%Y-%m-%d")
            joined = joined.where(f"date != '{today}'")
        return joined.withColumn(
            "performance", (joined.endPrice - joined.startPrice) / joined.startPrice)

    @staticmethod
    def _create_partition_column(df: DataFrame) -> DataFrame:
        """Return a new dataframe with column "partition" added.

        Column "partition" is of data type "string" and holds the "date" column formatted as
        "yyyy-MM". The column is used to write the xetra dataframe to a Delta table out in
        partition.

        Args:
            df: xetra Spark DataFrame.

        Returns:
            A Spark DataFrame
        """
        return df.withColumn("partition", functions.date_format("date", "yyyy-MM"))

    def load_delta_table_stock_performance(self, df: DataFrame) -> None:
        """Load dataframe into Delta table "stock_performance".

        The write mode is "append" in "incremental" mode and "overwrite" otherwise.

        Args:
            df: A spark dataframe to write out
        """
        df_i = self._create_partition_column(df)

        # NOTE(review): "month" mode also uses "overwrite" without any partition filter, which
        # looks like it replaces the whole table rather than just that month - confirm this is
        # intended before reloading a single month into a populated table.
        if self.extraction_mode == "incremental":
            output_mode = "append"
        else:
            output_mode = "overwrite"

        (df_i.coalesce(1)  # to reduce number of parquet files
         .write
         .format("delta")
         .partitionBy("partition")
         .mode(output_mode)
         .saveAsTable("stock_performance"))

# Initial Full ETL
Historical xetra data are extracted from the S3 bucket, transformed, and loaded into the Delta table
`stock_performance`.

Initial full ETL only needs to be run once.

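The full ETL processes data up to and including `today - 1 day`; today's data will be processed
tomorrow by the incremental ETL. Note that the example cell below runs in `month` mode for
February 2022 rather than in `full` mode.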

In [3]:
# Build the ETL in "month" mode, restricted to the folders whose name starts with "2022-02".
etl = StockETL(extraction_mode="month", month_to_extract="2022-02")
# Define the raw xetra DataFrame over the matching CSV files in the S3 bucket.
raw_df = etl.extract_xetra_data()
# Optional sanity check of the raw rows: raw_df.show(5)

22/02/15 00:48:51 WARN Utils: Your hostname, LMAC-XIATONGZHENG.local resolves to a loopback address: 127.0.0.1; using 192.168.178.34 instead (on interface en0)
22/02/15 00:48:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/02/15 00:48:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
# Reduce the raw records to each security's opening and closing record per day.
opening_price_df, closing_price_df = etl.get_opening_and_closing_prices(df=raw_df)
# Join opening with closing prices and derive the intra-day performance.
performance_df = etl.get_intra_day_performance(opening=opening_price_df, closing=closing_price_df)
# Spot-check a single security (ISIN DE0005493365), ordered by trading day.
performance_df.where("isin = 'DE0005493365'").sort("date").show()



+------------+----------+----------+--------+--------------------+
|        isin|      date|startPrice|endPrice|         performance|
+------------+----------+----------+--------+--------------------+
|DE0005493365|2022-02-01|     394.0|   408.4|  0.0365482078590974|
|DE0005493365|2022-02-02|     411.6|   398.8|-0.03109819757225107|
|DE0005493365|2022-02-03|     396.0|   386.0|-0.02525252525252...|
|DE0005493365|2022-02-04|     386.0|   382.8|-0.00829018706484...|
|DE0005493365|2022-02-07|     385.0|   374.2|-0.02805191634537...|
|DE0005493365|2022-02-08|     385.0|   370.8|-0.03688314858969156|
|DE0005493365|2022-02-09|     373.2|   383.2|0.026795283153561472|
|DE0005493365|2022-02-10|     381.8|   376.0|-0.01519116809431067|
|DE0005493365|2022-02-11|     371.2|   372.8|0.004310278915198789|
|DE0005493365|2022-02-14|     358.0|   365.2| 0.02011176594142807|
+------------+----------+----------+--------+--------------------+



                                                                                

In [None]:
# Write the performance DataFrame to the Delta table "stock_performance".
# Because `etl` is not in "incremental" mode, the write uses mode "overwrite".
etl.load_delta_table_stock_performance(performance_df)

# Daily Incremental ETL
After the full ETL has been run once, the incremental ETL should be scheduled to run every day to
process the previous day's data.

In [1]:
# Requires the imports cell and the StockETL class cell to have been run in this kernel first;
# on a fresh kernel this cell fails with "NameError: name 'StockETL' is not defined".
incremental_etl = StockETL(extraction_mode="incremental")
# Extract yesterday's CSV files only.
yesterday_raw_df = incremental_etl.extract_xetra_data()
start_df, end_df = incremental_etl.get_opening_and_closing_prices(yesterday_raw_df)
yesterday_performance_df = incremental_etl.get_intra_day_performance(start_df, end_df)
# In "incremental" mode the rows are appended to the table "stock_performance".
incremental_etl.load_delta_table_stock_performance(yesterday_performance_df)

NameError: name 'StockETL' is not defined

In [6]:
# configure_spark_session() is a static method, so no StockETL instance (and therefore no
# extraction mode) is needed just to obtain a session.
spark = StockETL.configure_spark_session()
# Read the Delta table back by path. The path is relative to the current working directory.
# NOTE(review): presumably this is where saveAsTable("stock_performance") wrote the data - confirm.
df = spark.read.format("delta").load("spark-warehouse/stock_performance")

In [7]:
# Preview the rows that were read back from the Delta table.
df.show()

+------------+----------+----------+--------+--------------------+---------+
|        isin|      date|startPrice|endPrice|         performance|partition|
+------------+----------+----------+--------+--------------------+---------+
|DE000A0BVU28|2022-02-14|      23.6|    23.4|-0.00847460846214604|  2022-02|
|DE000A0HGQS8|2022-02-14|      2.24|     2.2|-0.01785712575121809|  2022-02|
|DK0060336014|2022-02-14|     51.66|   51.66|                 0.0|  2022-02|
|IE00B0M62S72|2022-02-14|    20.995|   21.05| 0.00261959493349354|  2022-02|
|IE00B4K48X80|2022-02-14|     65.27|   65.28|1.532424811535710...|  2022-02|
|IE00BGHQ0G80|2022-02-14|    29.625|   29.66|0.001181429448510...|  2022-02|
|IE00BJP26D89|2022-02-14|    4.9972|  5.0044|0.001440759672240772|  2022-02|
|LU1598690169|2022-02-14|    120.18|  120.38|0.001664145013599...|  2022-02|
|LU1834988864|2022-02-14|     59.59|   59.24|-0.00587344308147...|  2022-02|
|LU1861136247|2022-02-14|     88.83|   89.01|0.002026345845608...|  2022-02|