In [1]:
spark.stop()

25/05/18 03:52:06 INFO  SparkContext:60 SparkContext is stopping with exitCode 0.
25/05/18 03:52:06 INFO  SparkUI:60 Stopped Spark web UI at http://7db080dfe9d2:4040
25/05/18 03:52:06 INFO  StandaloneSchedulerBackend:60 Shutting down all executors
25/05/18 03:52:06 INFO  StandaloneSchedulerBackend$StandaloneDriverEndpoint:60 Asking each executor to shut down
25/05/18 03:52:06 INFO  MapOutputTrackerMasterEndpoint:60 MapOutputTrackerMasterEndpoint stopped!
25/05/18 03:52:06 INFO  MemoryStore:60 MemoryStore cleared
25/05/18 03:52:06 INFO  BlockManager:60 BlockManager stopped
25/05/18 03:52:06 INFO  BlockManagerMaster:60 BlockManagerMaster stopped
25/05/18 03:52:06 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:60 OutputCommitCoordinator stopped!
25/05/18 03:52:06 INFO  SparkContext:60 Successfully stopped SparkContext


In [1]:
from pyspark.sql.functions import explode, split, trim, lower, expr
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark import SparkContext
import logging
from os.path import abspath
from pathlib import Path
import shutil
from pathlib import Path
from typing import Optional, Union, List, Tuple, Any, Literal

In [2]:
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%y-%m-%d %H:%M:%S",
    level=logging.DEBUG
)

In [3]:
class InvoiceStreamBronze:
    def __init__(
        self,
        spark: SparkSession
    ):
        self.spark = spark

    def read_invoices(
        self,
        format: str,
        path: Union[str, Path],
        schema: Union[str, Any],
        clean_source: Literal["delete", "archive"],
        archive_dir: Optional[str],
    ) -> DataFrame:
        if isinstance(path, str):
            path = Path(path).as_posix()
        if clean_source == "archive":
            return (
                self.spark.readStream
                .format(format)
                .schema(schema)
                .option("cleanSource", "archive")
                .option("sourceArchiveDir", archive_dir)
                .load(path)
            )
        return (
            self.spark.readStream
            .format(format)
            .schema(schema)
            .option("cleanSource", clean_source)
            .load(path)
        )

    def write_invoices(
        self,
        df: DataFrame,
        format: str,
        checkpoint_location: str,
        output_mode: str,
        table: str,
        query_name: str
    ):
        return (
            df.writeStream
            .queryName(query_name)
            .format(format)
            .option("checkpointLocation", checkpoint_location)
            .outputMode(output_mode)
            .toTable(table)
        )

In [4]:
class InvoiceStreamSilver:
    def __init__(
        self,
        spark: SparkSession
    ):
        self.spark = spark

    def read_invoices(self, table_name: str) -> DataFrame:
        return (
            self.spark.readStream
            .table(table_name)
        )

    def explode_invoices(self, df: DataFrame) -> DataFrame:
        return (
            df.selectExpr(
                "InvoiceNumber",
                "CreatedTime",
                "StoreID",
                "PosID",
                "CustomerType",
                "PaymentMethod",
                "DeliveryType",
                "DeliveryAddress.City",
                "DeliveryAddress.PinCode",
                "DeliveryAddress.State",
                "explode(InvoiceLineItems) AS LineItem"
            )
        )

    def flatten_invoices(self, df: DataFrame) -> DataFrame:
        return (
            df.withColumn("ItemCode", expr("LineItem.ItemCode"))
            .withColumn("ItemDescription", expr("LineItem.ItemDescription"))
            .withColumn("ItemPrice", expr("LineItem.ItemPrice"))
            .withColumn("ItemQty", expr("LineItem.ItemQty"))
            .withColumn("TotalValue", expr("LineItem.TotalValue"))
            .drop("LineItem")
        )

    def write_invoices(
        self,
        df: DataFrame,
        format: str,
        checkpoint_location: str,
        output_mode: str,
        table: str,
        query_name: str,
    ):
        return (
            df.writeStream
            .queryName(query_name)
            .format(format)
            .option("checkpointLocation", checkpoint_location)
            .outputMode(output_mode)
            .toTable(table)
        )

In [5]:
if __name__ == "__main__":
    table_name = "invoice_line_items"
    schema = """
        InvoiceNumber string,
        CreatedTime bigint,
        StoreID string,
        PosID string,
        CashierID string,
        CustomerType string,
        CustomerCardNo string,
        TotalAmount double,
        NumberOfItems bigint,
        PaymentMethod string,
        TaxableAmount double,
        CGST double,
        SGST double,
        CESS double,
        DeliveryType string,
        DeliveryAddress struct<
            AddressLine string,
            City string,
            ContactNumber string,
            PinCode string,
            State string
        >,
        InvoiceLineItems array<
            struct<
                ItemCode string,
                ItemDescription string,
                ItemPrice double,
                ItemQty bigint,
                TotalValue double
            >
        >
    """
    spark = (
        SparkSession.builder
        .appName("InvoicesStreamMedallion")
        .enableHiveSupport()
        .getOrCreate()
    )
    invoices_bronze = InvoiceStreamBronze(spark)
    invoices_df = invoices_bronze.read_invoices(
        format="json",
        path="/opt/spark/datasets/invoices/*.json",
        schema=schema,
        clean_source="archive",
        archive_dir="/opt/spark/datasets/archive/invoices"
    )
    squery_bronze = invoices_bronze.write_invoices(
        df=invoices_df,
        format="delta",
        checkpoint_location="/opt/spark/datasets/checkpoint/invoices/bronze",
        output_mode="append",
        table="invoices_bronze",
        query_name="ingestion_bronze"
    )

    invoices_silver = InvoiceStreamSilver(spark)
    exploded_df = invoices_silver.explode_invoices(invoices_df)
    flatten_df = invoices_silver.flatten_invoices(exploded_df)
    squery_silver = invoices_silver.write_invoices(
        df=flatten_df,
        format="delta",
        checkpoint_location="/opt/spark/datasets/checkpoint/invoices/silver",
        output_mode="append",
        table="invoices_silver",
        query_name="ingestion_silver",
    )

25-05-18 03:59:36 - DEBUG - Command to send: r
u
SparkSession$
rj
e

25-05-18 03:59:36 - DEBUG - Answer received: !ycorg.apache.spark.sql.SparkSession$
25-05-18 03:59:36 - DEBUG - Command to send: r
m
org.apache.spark.sql.SparkSession$
MODULE$
e

25-05-18 03:59:36 - DEBUG - Answer received: !yro52
25-05-18 03:59:36 - DEBUG - Command to send: i
java.util.HashMap
e

25-05-18 03:59:36 - DEBUG - Answer received: !yao53
25-05-18 03:59:36 - DEBUG - Command to send: c
o53
put
sspark.app.name
sInvoicesStreamMedallion
e

25-05-18 03:59:36 - DEBUG - Answer received: !yn
25-05-18 03:59:36 - DEBUG - Command to send: c
o53
put
sspark.sql.catalogImplementation
shive
e

25-05-18 03:59:36 - DEBUG - Answer received: !yn
25-05-18 03:59:36 - DEBUG - Command to send: c
o52
applyModifiableSettings
ro42
ro53
e

25/05/18 03:59:36 INFO  SharedState:60 Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
25/05/18 03:59:36 INFO  SharedState:60 Warehouse path is 'file:/opt/spark

In [None]:
spark.read.table("invoices_bronze").show()

25-05-18 03:56:13 - DEBUG - Command to send: c
o93
read
e

25-05-18 03:56:13 - DEBUG - Answer received: !yro141
25-05-18 03:56:13 - DEBUG - Command to send: c
o141
table
sinvoices_bronze
e

25/05/18 03:56:13 INFO  HiveMetaStore:781 0: get_database: default
25/05/18 03:56:13 INFO  audit:309 ugi=root	ip=unknown-ip-addr	cmd=get_database: default	
25/05/18 03:56:13 INFO  HiveMetaStore:781 0: get_table : db=default tbl=invoices_bronze
25/05/18 03:56:13 INFO  audit:309 ugi=root	ip=unknown-ip-addr	cmd=get_table : db=default tbl=invoices_bronze	
25/05/18 03:56:13 INFO  HiveMetaStore:781 0: get_table : db=default tbl=invoices_bronze
25/05/18 03:56:13 INFO  audit:309 ugi=root	ip=unknown-ip-addr	cmd=get_table : db=default tbl=invoices_bronze	
25-05-18 03:56:13 - DEBUG - Answer received: !yro142
25-05-18 03:56:13 - DEBUG - Command to send: c
o142
showString
i20
i20
bFalse
e

25/05/18 03:56:13 INFO  PrepareDeltaScan:95 DELTA: Filtering files for query
25/05/18 03:56:21 WARN  TaskSchedulerImpl:72 In