In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
import xml.etree.ElementTree as ET
import hashlib
import os

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
def xml_to_df_table_pers_auto_policy_mod_rq(xml_string):
    root = ET.fromstring(xml_string)
    
    requ_id = root.find(".//RequId").text
    transaction_request_dt = root.find(".//TransactionRequestDt").text
    transaction_effective_dt = root.find(".//TransactionEffectiveDt").text
    
    pk = hashlib.sha256(ET.tostring(root)).hexdigest()
    
    schema = StructType([
        StructField("PK_PersAutoPolicyModRq", StringType(), True),
        StructField("RequId", StringType(), True),
        StructField("TransactionRequestDt", StringType(), True),
        StructField("TransactionEffectiveDt", StringType(), True)
    ])
    
    data = [(pk, requ_id, transaction_request_dt, transaction_effective_dt)]
    return spark.createDataFrame(data, schema), pk

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
def xml_to_df_table_pers_auto_policy_mod_rq_producer(xml_string, fk_pers_auto_policy_mod_rq):
    root = ET.fromstring(xml_string)
    
    producer_id = root.find(".//Producer").get('id')
    
    pk = hashlib.sha256(ET.tostring(root.find(".//Producer"))).hexdigest()
    
    schema = StructType([
        StructField("FK_PersAutoPolicyModRq", StringType(), True),
        StructField("PK_PersAutoPolicyModRq_Producer", StringType(), True),
        StructField("Producer_id", StringType(), True)
    ])
    
    data = [(fk_pers_auto_policy_mod_rq, pk, producer_id)]
    return spark.createDataFrame(data, schema), pk

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
def xml_to_df_table_pers_auto_policy_mod_rq_producer_itemidinfo(xml_string, fk_pers_auto_policy_mod_rq, fk_pers_auto_policy_mod_rq_producer):
    root = ET.fromstring(xml_string)
    
    # Extract ItemIdInfo fields
    itemidinfo_id = root.find(".//ItemIdInfo").get('id')
    agency_id = root.find(".//AgencyId").text
    insurer_id = root.find(".//InsurerId").text
    
    # Create a unique primary key by hashing the ItemIdInfo structure
    pk = hashlib.sha256(ET.tostring(root.find(".//ItemIdInfo"))).hexdigest()
    
    # Schema for ItemIdInfo table
    schema = StructType([
        StructField("FK_PersAutoPolicyModRq", StringType(), True),
        StructField("FK_PersAutoPolicyModRq_Producer", StringType(), True),
        StructField("PK_PersAutoPolicyModRq_Producer_ItemIdInfo", StringType(), True),
        StructField("ItemIdInfo_id", StringType(), True),
        StructField("AgencyId", StringType(), True),
        StructField("InsurerId", StringType(), True)
    ])
    
    data = [(fk_pers_auto_policy_mod_rq, fk_pers_auto_policy_mod_rq_producer, pk, itemidinfo_id, agency_id, insurer_id)]
    return spark.createDataFrame(data, schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
xml_file_path = "jupyter_workspace/DataEngineer_Test/Source File/sample_data.xml"

if os.path.exists(xml_file_path):
    with open(xml_file_path, 'r') as file:
        xml_data = file.read()

    df_pers_auto_policy_mod_rq, pk_pers_auto_policy_mod_rq = xml_to_df_table_pers_auto_policy_mod_rq(xml_data)
    df_pers_auto_policy_mod_rq.show()

    df_pers_auto_policy_mod_rq_producer, pk_pers_auto_policy_mod_rq_producer = xml_to_df_table_pers_auto_policy_mod_rq_producer(xml_data, pk_pers_auto_policy_mod_rq)
    df_pers_auto_policy_mod_rq_producer.show()

    df_pers_auto_policy_mod_rq_producer_itemidinfo = xml_to_df_table_pers_auto_policy_mod_rq_producer_itemidinfo(xml_data, pk_pers_auto_policy_mod_rq, pk_pers_auto_policy_mod_rq_producer)
    df_pers_auto_policy_mod_rq_producer_itemidinfo.show()
    
else:
    print(f"File not found: {xml_file_path}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------------+--------------------+--------------------+----------------------+
|PK_PersAutoPolicyModRq|              RequId|TransactionRequestDt|TransactionEffectiveDt|
+----------------------+--------------------+--------------------+----------------------+
|  29cdd4318090c742e...|AAAAAA-BBBBBB-CCC...|          2024-07-16|            2024-07-18|
+----------------------+--------------------+--------------------+----------------------+

+----------------------+-------------------------------+-----------+
|FK_PersAutoPolicyModRq|PK_PersAutoPolicyModRq_Producer|Producer_id|
+----------------------+-------------------------------+-----------+
|  29cdd4318090c742e...|           11265e3b017ee6991...|         N1|
+----------------------+-------------------------------+-----------+

+----------------------+-------------------------------+------------------------------------------+-------------+--------+---------+
|FK_PersAutoPolicyModRq|FK_PersAutoPolicyModRq_Producer|PK_PersAutoPol

In [17]:
output_dir = "jupyter_workspace/DataEngineer_Test/Target File/"

os.makedirs(output_dir, exist_ok=True)


df_pers_auto_policy_mod_rq.write.mode('overwrite').parquet(f"{output_dir}/TABLE_PersAutoPolicyModRq", compression='snappy')
df_pers_auto_policy_mod_rq_producer.write.mode('overwrite').parquet(f"{output_dir}/TABLE_PersAutoPolicyModRq_Producer", compression='snappy')
df_pers_auto_policy_mod_rq_producer_itemidinfo.write.mode('overwrite').parquet(f"{output_dir}/TABLE_PersAutoPolicyModRq_Producer_ItemIdInfo", compression='snappy')

print("DataFrames have been saved as Snappy-compressed Parquet files.")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrames have been saved as Snappy-compressed Parquet files.