In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, size
from pyspark.sql.functions import first, lit, explode
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DoubleType, StructType, StructField, BooleanType, LongType, IntegerType, DateType, TimestampType
from pyspark.sql.functions import explode
import json
import xmlschema
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql.functions import explode
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
import uuid

In [2]:
from xsd_functions import generate_schema_from_xpaths

In [11]:
jarpath = r"spark-xml_2.11-0.13.0.jar"
xml_path = r"data/TM_PLTFI000000_PLTFI000000_202401_0.xml"

xpaths_mappings = r"tm_xpaths_maping.json"
with open(xpaths_mappings, 'r') as json_file:
    mapping = json.load(json_file)

# showing 5 exaple entries in maping dictionary
list(mapping.items())[:5]

[('Naglowek.IdentyfikatorZarzadzajacego', 'xs:string'),
 ('Naglowek.NazwaZarzadzajacego', 'xs:string'),
 ('Naglowek.IdentyfikatorDostawcyDanych', 'xs:string'),
 ('Naglowek.WalutaSprawozdania', 'xs:string'),
 ('Naglowek.OkresSprawozdawczy.DataOd', 'xs:date')]

### Based on generated xpaths from JSON file the function is able to create pyspark schema that can be used to read in full xml structure based on XSD file

In [4]:
schema = generate_schema_from_xpaths(mapping)
schema

StructType(List(StructField(Naglowek,StructType(List(StructField(IdentyfikatorZarzadzajacego,StringType,true),StructField(NazwaZarzadzajacego,StringType,true),StructField(IdentyfikatorDostawcyDanych,StringType,true),StructField(WalutaSprawozdania,StringType,true),StructField(OkresSprawozdawczy,StructType(List(StructField(DataOd,DateType,true),StructField(DataDo,DateType,true),StructField(Rok,StringType,true),StructField(Miesiac,StringType,true))),true),StructField(CzyKorekta,BooleanType,true),StructField(_CzyFunduszWLikwidacji,BooleanType,true))),true),StructField(Dane,StructType(List(StructField(Aktywa,StructType(List(StructField(SrodkiPieniezne,DoubleType,true),StructField(PapieryWartoscioweSkarbPanstwaNBP,DoubleType,true),StructField(Naleznosci,StructType(List(StructField(FunduszeInwestycyjne,StructType(List(StructField(Nieprzeterminowane,DoubleType,true),StructField(PrzeterminowaneDo30Dni,DoubleType,true),StructField(PrzeterminowaneOd31Do90Dni,DoubleType,true),StructField(Przetermi

In [5]:
spark = SparkSession.builder \
    .appName("SparkApp") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.yarn.driver.memoryOverhead", "4g") \
    .config("spark.jars",jarpath) \
    .enableHiveSupport().getOrCreate()

### First simple reading of XML file into semi structured pyspark dataframe

In [12]:
df = spark.read.format("com.databricks.spark.xml").option("rowTag", "Sprawozdanie").schema(schema).load(xml_path)
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+--------------------+
|            Naglowek|                Dane|
+--------------------+--------------------+
|[PLTFI000000, Naz...|[[1.6456286432E8,...|
+--------------------+--------------------+



                                                                                

In [13]:
# schema of the dataframe
df.printSchema()

root
 |-- Naglowek: struct (nullable = true)
 |    |-- IdentyfikatorZarzadzajacego: string (nullable = true)
 |    |-- NazwaZarzadzajacego: string (nullable = true)
 |    |-- IdentyfikatorDostawcyDanych: string (nullable = true)
 |    |-- WalutaSprawozdania: string (nullable = true)
 |    |-- OkresSprawozdawczy: struct (nullable = true)
 |    |    |-- DataOd: date (nullable = true)
 |    |    |-- DataDo: date (nullable = true)
 |    |    |-- Rok: string (nullable = true)
 |    |    |-- Miesiac: string (nullable = true)
 |    |-- CzyKorekta: boolean (nullable = true)
 |    |-- _CzyFunduszWLikwidacji: boolean (nullable = true)
 |-- Dane: struct (nullable = true)
 |    |-- Aktywa: struct (nullable = true)
 |    |    |-- SrodkiPieniezne: double (nullable = true)
 |    |    |-- PapieryWartoscioweSkarbPanstwaNBP: double (nullable = true)
 |    |    |-- Naleznosci: struct (nullable = true)
 |    |    |    |-- FunduszeInwestycyjne: struct (nullable = true)
 |    |    |    |    |-- Nieprzetermi

In [17]:
# keys of mapping dict are xpaths that enable selection of particular data points in xml file
# Pandas dataframe is shown here for convinience
df.select(list(mapping)).toPandas()

Unnamed: 0,IdentyfikatorZarzadzajacego,NazwaZarzadzajacego,IdentyfikatorDostawcyDanych,WalutaSprawozdania,DataOd,DataDo,Rok,Miesiac,CzyKorekta,_CzyFunduszWLikwidacji,...,AktywaZarzadzanychFunduszy,AktywaZarzadzanychFunduszyZagranicznych,AktywaZarzadzanychUnijnychAFI,AktywaZarzadzanychZbiorczychPortfeliPapierowWartosciowych,InwestycjeWlasneJUCI,_AktywaZarzadzanychPortfeliInstrumentyFinansoweOgolem,AktywaFunduszyWLikwidacji,DataPortfeliZarzadzanychAFI,_PortfeleZarzadzanychAFIOgolem,_ZarzadzaneAktywa
0,PLTFI000000,Nazwa zarządzającego,PLTFI000000,PLN,2024-01-01,2024-01-31,2024,1,False,,...,4068231000.0,0.0,0.0,0.0,0.0,0.0,0.0,2023-06-30,100000.0,4069231027.26


### We can freely shuffle the xpaths list and create multiple tables that are easier to use
- UUID will act as a key between the tables
- we can split the data between main tags Naglowek and Dane into two separate tables

In [23]:
new_uuid = str(uuid.uuid4())
naglowek_xpaths = [key for key in mapping if "Naglowek" in key]
dane_xpaths = [key for key in mapping if "Dane" in key]


In [26]:
df_naglowek = df.select(naglowek_xpaths).toPandas()
df_naglowek["key"] = new_uuid
df_naglowek

Unnamed: 0,IdentyfikatorZarzadzajacego,NazwaZarzadzajacego,IdentyfikatorDostawcyDanych,WalutaSprawozdania,DataOd,DataDo,Rok,Miesiac,CzyKorekta,_CzyFunduszWLikwidacji,key
0,PLTFI000000,Nazwa zarządzającego,PLTFI000000,PLN,2024-01-01,2024-01-31,2024,1,False,,7cae0036-620a-4313-908d-a7ab959f5af2


In [29]:
df_dane = df.select(dane_xpaths).toPandas()
df_dane["key"] = new_uuid
df_dane

Unnamed: 0,SrodkiPieniezne,PapieryWartoscioweSkarbPanstwaNBP,Nieprzeterminowane,PrzeterminowaneDo30Dni,PrzeterminowaneOd31Do90Dni,PrzeterminowaneOd91Do180Dni,PrzeterminowaneOd181Do360Dni,PrzeterminowanePowyzej360Dni,_FunduszeInwestycyjneOgolem,Nieprzeterminowane.1,...,AktywaZarzadzanychFunduszyZagranicznych,AktywaZarzadzanychUnijnychAFI,AktywaZarzadzanychZbiorczychPortfeliPapierowWartosciowych,InwestycjeWlasneJUCI,_AktywaZarzadzanychPortfeliInstrumentyFinansoweOgolem,AktywaFunduszyWLikwidacji,DataPortfeliZarzadzanychAFI,_PortfeleZarzadzanychAFIOgolem,_ZarzadzaneAktywa,key
0,164562900.0,167417620.2,30297248.11,0.0,0.0,0.0,0.0,0.0,30297248.1,1890892.08,...,0.0,0.0,0.0,0.0,0.0,0.0,2023-06-30,100000.0,4069231027.26,7cae0036-620a-4313-908d-a7ab959f5af2
