# Parsing XML files in PySpark

This sample notebook shows how to read a nested XML file with the Databricks `spark-xml` library and flatten it into a tabular DataFrame.

**Pre-requisites**
1. The Jupyter AWS Glue Container as shown on [AWS](https://aws.amazon.com/blogs/big-data/developing-aws-glue-etl-jobs-locally-using-a-container/)
2. The [Databricks library](https://github.com/databricks/spark-xml) - Note that in the container, version **0.5** must be used
3. To add the `jar` file, run the following from a terminal in the Container's Jupyter environment:
    * `wget https://repo1.maven.org/maven2/com/databricks/spark-xml_2.11/0.5.0/spark-xml_2.11-0.5.0.jar`
    * `cp spark-xml_2.11-0.5.0.jar $SPARK_HOME/jars/`
4. Sample data was taken from [this website](https://www.service-architecture.com/articles/object-oriented-databases/xml-file-for-complex-data.html)
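
The download URL in step 3 follows the standard Maven Central layout, so the same pattern can be used if a different build of the library is needed. The sketch below is only an illustration: `maven_jar_url` is a hypothetical helper, not part of any library. The `_2.11` suffix in the artifact name is the Scala version the jar was built for, which should match the Scala version of the Spark installation.

```python
def maven_jar_url(group_id, artifact_id, version,
                  repo="https://repo1.maven.org/maven2"):
    """Build the download URL of a jar from its Maven coordinates.

    Hypothetical helper for illustration; it only formats a string.
    """
    # Maven repositories store the group id as a directory path
    group_path = group_id.replace(".", "/")
    return f"{repo}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


# Coordinates of the jar used in this notebook
SPARK_XML_URL = maven_jar_url("com.databricks", "spark-xml_2.11", "0.5.0")
print(SPARK_XML_URL)
```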

### Import libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.dataframe import DataFrame
import os

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,,pyspark,idle,,,✔


SparkSession available as 'spark'.


### Constants and Parameters

In [2]:
FILE_PATH = '/home/jupyter/jupyter_default_dir/complex_file.xml'
FORMAT = "com.databricks.spark.xml"

### Support Functions

In [3]:
def select_struct(self, struct_col_name):
    """
    Support function that flattens a struct column in place by promoting
    its fields to top-level columns.
    `explode` only works on array and map columns, so it cannot be applied to a struct.
    To circumvent this, we select `<struct_col_name>.*` alongside the remaining columns.
    Args:
        struct_col_name(str): The struct column to be flattened
    Returns:
        dataframe(DataFrame): The dataframe with the struct's fields as top-level columns
    """
    # Keep every column except the struct being flattened
    cols = [x for x in self.columns if x != struct_col_name]

    # If the struct is the only column, select just its fields
    if len(cols) < 1:
        return self.select(f"{struct_col_name}.*")
    else:
        return self.select(cols + [f"{struct_col_name}.*"])

# Attach the helper to DataFrame so it can be called as df.select_struct(...)
setattr(DataFrame, 'select_struct', select_struct)
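
To make the column handling in `select_struct` easier to follow without a Spark session, here is a minimal plain-Python sketch of the select list it builds. `build_select_exprs` is a hypothetical name used only for this illustration, and it works on plain lists of column names rather than on a DataFrame.

```python
def build_select_exprs(columns, struct_col_name):
    """Return the select expressions that flatten one struct column.

    Hypothetical helper mirroring the list logic of select_struct.
    """
    # Every column except the struct keeps its place at the front
    others = [c for c in columns if c != struct_col_name]
    # "<struct>.*" expands to all fields of the struct
    return others + [f"{struct_col_name}.*"]


# The struct is the only column, as with the top-level `product` column
only_struct = build_select_exprs(["product"], "product")

# The struct sits among other columns
mixed = build_select_exprs(["_product_image", "size_exploded", "price"], "size_exploded")
print(only_struct, mixed)
```

When no other columns exist, the list is just the single `"<struct>.*"` expression, so both branches of `select_struct` select the same kind of list and differ only in whether other columns precede it.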

### Spark Session

In [4]:
spark = SparkSession.builder.appName("XML - Parser").getOrCreate()

### Read in the Data

In [5]:
df = spark.read.format(FORMAT).option("rowTag","catalog").load(FILE_PATH)
df.printSchema()

root
 |-- product: struct (nullable = true)
 |    |-- _description: string (nullable = true)
 |    |-- _product_image: string (nullable = true)
 |    |-- catalog_item: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- _gender: string (nullable = true)
 |    |    |    |-- item_number: string (nullable = true)
 |    |    |    |-- price: double (nullable = true)
 |    |    |    |-- size: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- _description: string (nullable = true)
 |    |    |    |    |    |-- color_swatch: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |    |    |    |    |    |-- _image: string (nullable = true)

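The top-level `product` column is of type `struct`, so it cannot be passed to `explode`; `select_struct` promotes its fields to top-level columns instead. Fields prefixed with `_` come from XML attributes. The product's `_description` is renamed so that it does not collide with the `_description` of `size` in a later step.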

In [6]:
df_flat = df.select_struct("product")
df_flat = df_flat.withColumnRenamed("_description","_product_description")

In [7]:
df_flat_cat = df_flat.withColumn("catalog_item_exploded",f.explode("catalog_item")).drop("catalog_item")
df_flat_cat.printSchema()

root
 |-- _product_description: string (nullable = true)
 |-- _product_image: string (nullable = true)
 |-- catalog_item_exploded: struct (nullable = true)
 |    |-- _gender: string (nullable = true)
 |    |-- item_number: string (nullable = true)
 |    |-- price: double (nullable = true)
 |    |-- size: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- _description: string (nullable = true)
 |    |    |    |-- color_swatch: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |    |    |    |-- _image: string (nullable = true)
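
The schema above shows what `f.explode` did: the `catalog_item` array became one row per element, with each element held in the new `catalog_item_exploded` struct. A plain-Python sketch of the same semantics follows, using dictionaries as stand-ins for rows. `explode_rows` and the `empty.jpg` row are hypothetical and exist only to illustrate the behaviour, including the fact that `explode` produces no output row when the array is null or empty.

```python
def explode_rows(rows, array_col, out_col):
    """Emit one output row per element of row[array_col].

    Hypothetical stand-in for withColumn(out_col, explode(array_col)).drop(array_col).
    """
    exploded = []
    for row in rows:
        # A null or empty array yields no output rows, as with Spark's explode
        elements = row.get(array_col) or []
        rest = {k: v for k, v in row.items() if k != array_col}
        for element in elements:
            # The new column is appended after the existing ones
            exploded.append({**rest, out_col: element})
    return exploded


rows = [
    {"_product_image": "cardigan.jpg",
     "catalog_item": [{"_gender": "Men's"}, {"_gender": "Women's"}]},
    {"_product_image": "empty.jpg", "catalog_item": []},  # made-up row
]
result = explode_rows(rows, "catalog_item", "catalog_item_exploded")
for r in result:
    print(r)
```

If rows with empty arrays need to be kept, Spark offers `explode_outer`, which emits a single row with a null value instead of dropping the row.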

In [8]:
df_flat_cat_items = df_flat_cat.select_struct("catalog_item_exploded")
df_flat_cat_items.printSchema()

root
 |-- _product_description: string (nullable = true)
 |-- _product_image: string (nullable = true)
 |-- _gender: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- price: double (nullable = true)
 |-- size: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _description: string (nullable = true)
 |    |    |-- color_swatch: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |    |    |-- _image: string (nullable = true)

In [9]:
df_flat_size = df_flat_cat_items.withColumn("size_exploded",f.explode("size")).drop("size")
df_flat_size.printSchema()

root
 |-- _product_description: string (nullable = true)
 |-- _product_image: string (nullable = true)
 |-- _gender: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- price: double (nullable = true)
 |-- size_exploded: struct (nullable = true)
 |    |-- _description: string (nullable = true)
 |    |-- color_swatch: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |    |-- _image: string (nullable = true)

In [10]:
df_flat_size_items = df_flat_size.select_struct("size_exploded")
df_flat_size_items = df_flat_size_items.withColumnRenamed("_description","_item_description")
df_flat_size_items.printSchema()

root
 |-- _product_description: string (nullable = true)
 |-- _product_image: string (nullable = true)
 |-- _gender: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- price: double (nullable = true)
 |-- _item_description: string (nullable = true)
 |-- color_swatch: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _image: string (nullable = true)

In [11]:
df_flat_color = df_flat_size_items.withColumn("color_swatch_exploded",f.explode("color_swatch")).drop("color_swatch")
df_flat_color.printSchema()

root
 |-- _product_description: string (nullable = true)
 |-- _product_image: string (nullable = true)
 |-- _gender: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- price: double (nullable = true)
 |-- _item_description: string (nullable = true)
 |-- color_swatch_exploded: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _image: string (nullable = true)

In [12]:
df_flat_color_items = df_flat_color.select_struct("color_swatch_exploded")
df_flat_color_items.printSchema()

root
 |-- _product_description: string (nullable = true)
 |-- _product_image: string (nullable = true)
 |-- _gender: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- price: double (nullable = true)
 |-- _item_description: string (nullable = true)
 |-- _VALUE: string (nullable = true)
 |-- _image: string (nullable = true)
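
The alternating `explode` and `select_struct` steps amount to a nested loop over `product`, `catalog_item`, `size`, and `color_swatch`. As a cross-check that needs no Spark, the sketch below does the same flattening with Python's standard `xml.etree.ElementTree`. The XML fragment is an abbreviated reconstruction based on the schema and values shown in this notebook, not the full sample file, and `flatten_catalog` is a hypothetical helper. The column names follow the ones used in the notebook.

```python
import xml.etree.ElementTree as ET

# Abbreviated fragment reconstructed from the schema; not the full sample file
SAMPLE_XML = """<catalog>
  <product description="Cardigan Sweater" product_image="cardigan.jpg">
    <catalog_item gender="Men's">
      <item_number>QWZ5671</item_number>
      <price>39.95</price>
      <size description="Medium">
        <color_swatch image="red_cardigan.jpg">Red</color_swatch>
        <color_swatch image="burgundy_cardigan.jpg">Burgundy</color_swatch>
      </size>
    </catalog_item>
  </product>
</catalog>"""


def flatten_catalog(xml_text):
    """Return one flat dict per color_swatch, like the final DataFrame."""
    root = ET.fromstring(xml_text)
    rows = []
    # Each nested loop corresponds to one explode in the Spark version
    for product in root.iter("product"):
        for item in product.findall("catalog_item"):
            for size in item.findall("size"):
                for swatch in size.findall("color_swatch"):
                    rows.append({
                        "_product_description": product.get("description"),
                        "_product_image": product.get("product_image"),
                        "_gender": item.get("gender"),
                        "item_number": item.findtext("item_number"),
                        "price": float(item.findtext("price")),
                        "_item_description": size.get("description"),
                        "_VALUE": swatch.text,      # element text
                        "_image": swatch.get("image"),
                    })
    return rows


flat_rows = flatten_catalog(SAMPLE_XML)
for row in flat_rows:
    print(row)
```

This approach is only practical for files that fit in memory on one machine; the Spark version is the one that scales to large inputs.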

In [13]:
df_flat_color_items.show(truncate=False)

+--------------------+--------------+-------+-----------+-----+-----------------+--------+---------------------+
|_product_description|_product_image|_gender|item_number|price|_item_description|_VALUE  |_image               |
+--------------------+--------------+-------+-----------+-----+-----------------+--------+---------------------+
|Cardigan Sweater    |cardigan.jpg  |Men's  |QWZ5671    |39.95|Medium           |Red     |red_cardigan.jpg     |
|Cardigan Sweater    |cardigan.jpg  |Men's  |QWZ5671    |39.95|Medium           |Burgundy|burgundy_cardigan.jpg|
|Cardigan Sweater    |cardigan.jpg  |Men's  |QWZ5671    |39.95|Large            |Red     |red_cardigan.jpg     |
|Cardigan Sweater    |cardigan.jpg  |Men's  |QWZ5671    |39.95|Large            |Burgundy|burgundy_cardigan.jpg|
|Cardigan Sweater    |cardigan.jpg  |Women's|RRX9856    |42.5 |Small            |Red     |red_cardigan.jpg     |
|Cardigan Sweater    |cardigan.jpg  |Women's|RRX9856    |42.5 |Small            |Navy    |navy_c