In [0]:
%scala
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
import com.databricks.spark.xml._

// UDF that decodes a binary column value into a UTF-8 String
val toStrUDF = udf((bytes: Array[Byte]) => new String(bytes, "UTF-8"))

// Batch read of the source folder; used only to derive the XML schema below
val df_schema = spark.read
  .format("binaryFile")
  .load("/FileStore/DSR/Source/Stream")
  .select(toStrUDF($"content").alias("text"))

// Schema inference has to list and read every file, which becomes expensive
// with many files; a user-defined custom schema is the cheaper alternative.
val payloadSchema = schema_of_xml(df_schema.select("text").as[String])

// Streaming read of the same folder through Auto Loader ("cloudFiles"),
// decoding each file to text and parsing it with the inferred schema
val df = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.useNotifications", "false")
  .option("cloudFiles.format", "binaryFile")
  .option("cloudFiles.maxFilesPerTrigger", 2)
  .load("/FileStore/DSR/Source/Stream")
  .select(toStrUDF($"content").alias("text"))
  .select(from_xml($"text", payloadSchema).alias("structuredBody"))

display(df)

// Expose the parsed stream to the other cells under the view name "dfp"
df.createOrReplaceTempView("dfp")

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import xml.etree.ElementTree as ET

# Variables
# Checkpoint locations, one per streaming query started further down.
Checkpoint_directory_addr = "/FileStore/DSR/CheckPointing/Proc_Addr_Details"
Checkpoint_directory_proc = "/FileStore/DSR/CheckPointing/Proc_Details"
# CSV files consumed by the sample-data loading cell.
Proc_Sample_File = "/FileStore/DSR/Source/Sample/Procedure_List_Samples.csv"
Addr_Sample_File = "/FileStore/DSR/Source/Sample/Address_List_Samples.csv"
# Tables that the MERGE statements and the sample loader write into.
Proc_Target = "dsr.Proc_Details_managed"
Addr_Target = "dsr.Proc_Addr_Details_managed"
# Parsed XML stream registered as temp view "dfp" by the Scala cell.
# Read through `spark` (as every other cell does) rather than the legacy
# `sqlContext` entry point.
src_df = spark.table("dfp")

# XML File Handling
# One row per element of the nested `section` collection.
temp_df1 = src_df.select(
    explode(col("structuredBody.component.structuredBody.component.section")).alias("Source")
)

# Keep only the sections labelled "HISTORY OF PROCEDURES"
temp_df2 = temp_df1.filter(col("Source.code._displayName") == "HISTORY OF PROCEDURES")

# "proc_det" holds the section text (a string of XML); it is parsed later to
# extract the procedure details.
proc_det = temp_df2.select(col("Source.text").alias("proc_det"))

# XML Handling Continues
# One row per entry of the procedure sections, then the performer address of
# every entry that actually carries a procedure.
temp_df2 = temp_df2.select(explode(col("Source.entry")).alias("src"))
temp_df3 = temp_df2.filter(col("src.procedure").isNotNull()).select(
    col("src.procedure.performer.assignedEntity.Addr").alias("addr")
)
temp_df4 = temp_df3.select(explode(col("addr")).alias("addr"))

addr_columns = ["Hospital", "Street", "City", "State", "Country", "PostalCode"]

# `addr.id` is a comma-separated list: emit one row per id, each carrying the
# flattened address fields.
temp_df5 = temp_df4.select(
    explode(split(col("addr.id"), ",")).alias("PID"),
    col("addr.Hospital").alias("Hospital"),
    col("addr.streetAddressLine").alias("Street"),
    col("addr.city").alias("City"),
    col("addr.state").alias("State"),
    col("addr.country").alias("Country"),
    col("addr.postalCode").alias("PostalCode"),
)

# Same columns in the same order, with PID converted to a long.
final_add_df = temp_df5.withColumn("PID", col("PID").cast(LongType()))

# Processing Procedure Details Which is in String of XML
def process(df):
    """Parse the XML strings in the ``proc_det`` column into procedure tuples.

    For every row, the second child of the XML root (``doc[1]``) is walked;
    each of its children contributes one tuple built from the child's ``ID``
    attribute and the text of its first three sub-elements. The second text
    is cut to its first 11 characters (the width of a ``dd MMM yyyy`` date).

    Args:
        df: Object exposing ``collect()`` whose rows can be indexed with
            ``"proc_det"`` (a Spark DataFrame in this notebook).

    Returns:
        list[tuple]: ``(ID, first text, truncated second text, third text)``
        per parsed entry, in row order. Empty when nothing was parsed.

    Raises:
        xml.etree.ElementTree.ParseError: if a non-empty payload is not
            well-formed XML.
        KeyError / IndexError: if an entry lacks the ``ID`` attribute or has
            fewer than three sub-elements.
    """
    records = []
    # Collect exactly once. Calling count() and then collect() for every row
    # index re-evaluates the whole batch on each iteration.
    for row in df.collect():
        xml = row["proc_det"]
        if not xml:
            # A section without text has nothing to parse; skip it instead of
            # failing the whole batch.
            continue
        doc = ET.fromstring(xml)
        for entry in doc[1]:
            entry_id = entry.attrib["ID"]
            first, second, third = (entry[j].text for j in range(3))
            # Guard the slice: an empty element has text None.
            truncated = second[0:11] if second is not None else None
            records.append((entry_id, first, truncated, third))
    return records


def process_micro_batch(df, batch_id):
    """Merge the procedures parsed from one micro-batch into ``Proc_Target``.

    The batch is parsed by ``process``, converted to a DataFrame with a typed
    ``PID`` and a ``Proc_Date`` parsed with the ``dd MMM yyyy`` pattern, and
    merged into the target table, inserting only rows that do not yet match
    on PID, Name and Proc_Date.

    Args:
        df: Micro-batch DataFrame; passed to ``process``.
        batch_id: Batch identifier supplied by ``foreachBatch``; unused.
    """
    rows = process(df)
    if not rows:
        # Nothing parsed from this batch: there is nothing to merge, and
        # createDataFrame cannot infer a schema from an empty list.
        return

    # Explicit all-string schema so that no type inference is needed (it also
    # fails when a column contains only None values).
    proc_schema = StructType(
        [
            StructField(name, StringType(), True)
            for name in ("PID", "Name", "Date", "Status")
        ]
    )
    proc_temp = spark.createDataFrame(rows, proc_schema)

    new_streaming_df = proc_temp.select(
        proc_temp.PID.cast(LongType()).alias("PID"),
        "Name",
        to_date(col("Date"), "dd MMM yyyy").alias("Proc_Date"),
        "Status",
    )
    # The MERGE statement below reads the batch through this temp view.
    new_streaming_df.createOrReplaceTempView("proc")
    spark.sql(
        f"""MERGE INTO {Proc_Target} T USING proc P on T.PID = P.PID
              and T.Name = P.Name
              and T.Proc_Date = P.Proc_Date
              WHEN NOT MATCHED THEN
              INSERT
              *"""
    )

def addr_process_micro_batch(df, batch_id):
    """Merge one micro-batch of address rows into ``Addr_Target``.

    The batch is registered as temp view ``addr`` and merged through the
    session that owns the batch DataFrame; only rows without a match on PID
    and Hospital are inserted.

    Args:
        df: Micro-batch DataFrame of address rows.
        batch_id: Batch identifier supplied by ``foreachBatch``; unused.
    """
    df.createOrReplaceTempView("addr")
    batch_session = df._jdf.sparkSession()
    merge_statement = f'''MERGE INTO {Addr_Target} T USING addr A on T.PID=A.PID
                    and T.Hospital=A.Hospital
                    WHEN NOT MATCHED THEN
                    INSERT
                    * '''
    batch_session.sql(merge_statement)


In [0]:
# Start the procedure-details stream: each micro-batch of `proc_det` is handed
# to `process_micro_batch`, triggered every minute.
query_proc = (
    proc_det.writeStream
    .outputMode("append")
    .trigger(processingTime="1 minute")
    .option("checkpointLocation", Checkpoint_directory_proc)
    .foreachBatch(process_micro_batch)
    .start()
)

In [0]:
# Writing data into Delta Table
# Each micro-batch of `final_add_df` is handed to `addr_process_micro_batch`,
# triggered every minute.
query_addr = (
    final_add_df.writeStream
    .outputMode("append")
    .trigger(processingTime="1 minute")
    .option("checkpointLocation", Checkpoint_directory_addr)
    .foreachBatch(addr_process_micro_batch)
    .start()
)


In [0]:
%sql
-- Show every row of the procedure details table, ordered by PID
SELECT *
FROM dsr.Proc_Details_managed
ORDER BY PID;

In [0]:
%sql
-- Show every row of the procedure address details table, ordered by PID
SELECT *
FROM dsr.Proc_Addr_Details_managed
ORDER BY PID;

In [0]:
# Stop Streaming

# `spark.streams.active` already yields the query objects, so each one is
# stopped directly; looking it up again by id is redundant and can hand back
# an unusable handle if the query ended in between.
for stream in spark.streams.active:
    print(f"Stopping streaming query id={stream.id} name={stream.name}")
    stream.stop()

In [0]:
# Sample Data Loading BLOCK


def load_proc_sampl(loc, load_table):
    """Append the rows of a header-bearing CSV at ``loc`` to ``load_table``.

    Args:
        loc: Path of the CSV file (or folder) to load.
        load_table: Name of the table to insert into; the CSV columns are
            inserted positionally via ``select *``.

    Returns:
        int: Number of rows read from the CSV.

    Raises:
        FileNotFoundError: if the listing of ``loc`` is empty.
        Exception: whatever ``dbutils.fs.ls`` or Spark raise, e.g. for a
            missing path (presumably a java.io.FileNotFoundException, which is
            what the calling cell checks for; TODO confirm).
    """
    if not dbutils.fs.ls(loc):
        # Carry a message so the caller's `print(str(e))` is not an empty line.
        raise FileNotFoundError(f"No sample data files found at {loc}")

    sample = spark.read.option("header", True).csv(loc)
    length = sample.count()
    # The insert statement reads the CSV rows through this temp view.
    sample.createOrReplaceTempView("sample")
    spark.sql(f"""insert into {load_table} select * from sample""")
    return length


# Load Proc_Sample Data, then Addr_Sample Data.
# Both loads share the same reporting: the number of loaded rows on success, a
# "not found" notice when the error text mentions java.io.FileNotFoundException,
# and the raw error text otherwise.
for sample_file, target_table in (
    (Proc_Sample_File, Proc_Target),
    (Addr_Sample_File, Addr_Target),
):
    try:
        loaded_rows = load_proc_sampl(sample_file, target_table)
        print(f"{loaded_rows} Sample Data Loaded Successfully in {target_table} Table")
    except Exception as e:
        error_text = str(e)
        if "java.io.FileNotFoundException" in error_text:
            print(f"Sample Data File Not Found {target_table} Table")
        else:
            print(error_text)

410 Sample Data Loaded Successfully in dsr.Proc_Details_managed Table
1400 Sample Data Loaded Successfully in dsr.Proc_Addr_Details_managed Table
