In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import json
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Create (or reuse) the Spark session for this notebook with 8 GB of driver memory.
# NOTE: spark.driver.memory is a JVM-launch setting; if a Spark JVM is already
# running in this kernel, getOrCreate() reuses that session and this value is ignored.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)
spark

In [2]:
# Load the raw survey export (presumably a Qualtrics JSON export — the schema
# has a top-level `responses` array). The path is relative to the notebook's
# working directory.
SURVEY_JSON_PATH = "./data/SV_3TPNSvgX6GtUpuJ-Unzip/DowDirectRelationshipSurvey.json"
df = spark.read.json(SURVEY_JSON_PATH)
df.printSchema()

root
 |-- responses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- displayedFields: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- displayedValues: struct (nullable = true)
 |    |    |    |-- QID10_1: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- QID10_2: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- QID10_3: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- QID10_4: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- QID10_5: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- QID12: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- QID128: array (nullable = true)
 |

In [3]:
# Trim every element of the `responses` array down to the three fields used
# downstream; all other fields of each response struct are dropped.
RESPONSE_FIELDS = ["responseId", "values", "labels"]
_struct_fields_sql = ", ".join(f"r.{name} as {name}" for name in RESPONSE_FIELDS)
df = df.withColumn(
    "responses",
    F.expr(f"transform(responses, r -> struct({_struct_fields_sql}))"),
)

# Confirm each response struct now holds only those fields
df.printSchema()


root
 |-- responses: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- responseId: string (nullable = true)
 |    |    |-- values: struct (nullable = true)
 |    |    |    |-- AccountName: string (nullable = true)
 |    |    |    |-- AccountName_5e02b1554bfe4b0dac61f801SenPol: long (nullable = true)
 |    |    |    |-- AccountName_5e02b1554bfe4b0dac61f801SenScore: long (nullable = true)
 |    |    |    |-- BPCode: string (nullable = true)
 |    |    |    |-- COVIDComments: string (nullable = true)
 |    |    |    |-- CRMLink: string (nullable = true)
 |    |    |    |-- CRMOwner: string (nullable = true)
 |    |    |    |-- CRMOwnerEmail: string (nullable = true)
 |    |    |    |-- ContactArea: string (nullable = true)
 |    |    |    |-- ContactAreaCode: string (nullable = true)
 |    |    |    |-- ContactCountry: string (nullable = true)
 |    |    |    |-- ContactCountryCode: string (nullable = true)
 |    |    |    |-- ContactEmail: string (nu

In [4]:
# Move responseId, values and labels to the top level of the schema.
# explode() emits one output row per element of the `responses` array
# (rows whose array is null or empty produce no output rows).
df_flattened = (
    df
    .select(F.explode("responses").alias("response"))
    .select(
        "response.responseId",  # top-level string key
        "response.values",      # kept as a nested struct
        "response.labels",      # kept as a nested struct
    )
)

# Show the updated schema
df_flattened.printSchema()

# Spot-check a few rows by key only: the full frame is very wide and its value
# columns include contact names / e-mails, which should not land in saved output.
df_flattened.select("responseId").show(5, truncate=False)


root
 |-- responseId: string (nullable = true)
 |-- values: struct (nullable = true)
 |    |-- AccountName: string (nullable = true)
 |    |-- AccountName_5e02b1554bfe4b0dac61f801SenPol: long (nullable = true)
 |    |-- AccountName_5e02b1554bfe4b0dac61f801SenScore: long (nullable = true)
 |    |-- BPCode: string (nullable = true)
 |    |-- COVIDComments: string (nullable = true)
 |    |-- CRMLink: string (nullable = true)
 |    |-- CRMOwner: string (nullable = true)
 |    |-- CRMOwnerEmail: string (nullable = true)
 |    |-- ContactArea: string (nullable = true)
 |    |-- ContactAreaCode: string (nullable = true)
 |    |-- ContactCountry: string (nullable = true)
 |    |-- ContactCountryCode: string (nullable = true)
 |    |-- ContactEmail: string (nullable = true)
 |    |-- ContactName: string (nullable = true)
 |    |-- CustomerType: string (nullable = true)
 |    |-- Digital: string (nullable = true)
 |    |-- Distinction: string (nullable = true)
 |    |-- GlobalCode: string (nulla

In [9]:
# Convert the keys of the `values` struct into top-level columns.
# "values.*" expands every struct field under its own name, in schema order.
# Unlike building F.col(f"values.{name}") strings, it does not re-parse field
# names as dotted paths, so names containing "." or other special characters
# are handled correctly.
df_values = df_flattened.select("responseId", "values.*")
df_values.printSchema()

root
 |-- responseId: string (nullable = true)
 |-- AccountName: string (nullable = true)
 |-- AccountName_5e02b1554bfe4b0dac61f801SenPol: long (nullable = true)
 |-- AccountName_5e02b1554bfe4b0dac61f801SenScore: long (nullable = true)
 |-- BPCode: string (nullable = true)
 |-- COVIDComments: string (nullable = true)
 |-- CRMLink: string (nullable = true)
 |-- CRMOwner: string (nullable = true)
 |-- CRMOwnerEmail: string (nullable = true)
 |-- ContactArea: string (nullable = true)
 |-- ContactAreaCode: string (nullable = true)
 |-- ContactCountry: string (nullable = true)
 |-- ContactCountryCode: string (nullable = true)
 |-- ContactEmail: string (nullable = true)
 |-- ContactName: string (nullable = true)
 |-- CustomerType: string (nullable = true)
 |-- Digital: string (nullable = true)
 |-- Distinction: string (nullable = true)
 |-- GlobalCode: string (nullable = true)
 |-- GlobalName: string (nullable = true)
 |-- Guide: string (nullable = true)
 |-- LST: string (nullable = true)
 |

In [10]:
# Respondent attributes are every column that is *not* a question (QID*) column;
# the original column order is preserved.
cols_to_select = list(filter(lambda name: not name.startswith("QID"), df_values.columns))
df_respondent = df_values.select(cols_to_select)
df_respondent.printSchema()

root
 |-- responseId: string (nullable = true)
 |-- AccountName: string (nullable = true)
 |-- AccountName_5e02b1554bfe4b0dac61f801SenPol: long (nullable = true)
 |-- AccountName_5e02b1554bfe4b0dac61f801SenScore: long (nullable = true)
 |-- BPCode: string (nullable = true)
 |-- COVIDComments: string (nullable = true)
 |-- CRMLink: string (nullable = true)
 |-- CRMOwner: string (nullable = true)
 |-- CRMOwnerEmail: string (nullable = true)
 |-- ContactArea: string (nullable = true)
 |-- ContactAreaCode: string (nullable = true)
 |-- ContactCountry: string (nullable = true)
 |-- ContactCountryCode: string (nullable = true)
 |-- ContactEmail: string (nullable = true)
 |-- ContactName: string (nullable = true)
 |-- CustomerType: string (nullable = true)
 |-- Digital: string (nullable = true)
 |-- Distinction: string (nullable = true)
 |-- GlobalCode: string (nullable = true)
 |-- GlobalName: string (nullable = true)
 |-- Guide: string (nullable = true)
 |-- LST: string (nullable = true)
 |

# Transform the labels column

In [12]:
# Pair each responseId with its nested `labels` struct
df_labels = df_flattened.select("responseId", "labels")
df_labels.printSchema()

root
 |-- responseId: string (nullable = true)
 |-- labels: struct (nullable = true)
 |    |-- QID101: string (nullable = true)
 |    |-- QID103_1: string (nullable = true)
 |    |-- QID103_2: string (nullable = true)
 |    |-- QID103_3: string (nullable = true)
 |    |-- QID103_4: string (nullable = true)
 |    |-- QID103_5: string (nullable = true)
 |    |-- QID103_6: string (nullable = true)
 |    |-- QID103_7: string (nullable = true)
 |    |-- QID103_8: string (nullable = true)
 |    |-- QID103_9: string (nullable = true)
 |    |-- QID106: string (nullable = true)
 |    |-- QID107: string (nullable = true)
 |    |-- QID108: string (nullable = true)
 |    |-- QID10_1: string (nullable = true)
 |    |-- QID10_2: string (nullable = true)
 |    |-- QID10_3: string (nullable = true)
 |    |-- QID10_4: string (nullable = true)
 |    |-- QID10_5: string (nullable = true)
 |    |-- QID10_6: string (nullable = true)
 |    |-- QID119: string (nullable = true)
 |    |-- QID12: string (nullab

In [13]:
# Flatten the `labels` struct into top-level columns.
# "labels.*" expands every struct field under its own name, in schema order.
# It avoids re-parsing field names as dotted paths, which would break on names
# containing "." or other special characters.
# NOTE(review): this reassigns df_labels from itself, so re-running this cell
# on its own after later cells have run will fail (no `labels` column left).
df_labels = df_labels.select("responseId", "labels.*")
df_labels.printSchema()

root
 |-- responseId: string (nullable = true)
 |-- QID101: string (nullable = true)
 |-- QID103_1: string (nullable = true)
 |-- QID103_2: string (nullable = true)
 |-- QID103_3: string (nullable = true)
 |-- QID103_4: string (nullable = true)
 |-- QID103_5: string (nullable = true)
 |-- QID103_6: string (nullable = true)
 |-- QID103_7: string (nullable = true)
 |-- QID103_8: string (nullable = true)
 |-- QID103_9: string (nullable = true)
 |-- QID106: string (nullable = true)
 |-- QID107: string (nullable = true)
 |-- QID108: string (nullable = true)
 |-- QID10_1: string (nullable = true)
 |-- QID10_2: string (nullable = true)
 |-- QID10_3: string (nullable = true)
 |-- QID10_4: string (nullable = true)
 |-- QID10_5: string (nullable = true)
 |-- QID10_6: string (nullable = true)
 |-- QID119: string (nullable = true)
 |-- QID12: string (nullable = true)
 |-- QID120_1: string (nullable = true)
 |-- QID120_2: string (nullable = true)
 |-- QID120_3: string (nullable = true)
 |-- QID120_

In [15]:
# Drop the per-question (QID*) label columns and keep only the response-level
# labels; the original column order is preserved.
label_cols_to_select = list(filter(lambda name: not name.startswith("QID"), df_labels.columns))
df_labels = df_labels.select(label_cols_to_select)
df_labels.printSchema()

root
 |-- responseId: string (nullable = true)
 |-- finished: string (nullable = true)
 |-- status: string (nullable = true)



In [18]:
# Append "_label" to every label column, leaving the responseId join key as-is
# (presumably to avoid clashes with same-named value columns in the join below).
# NOTE(review): not idempotent — re-running this cell appends the suffix again.
df_labels = df_labels.select(
    *[
        F.col(name) if name == "responseId" else F.col(name).alias(name + "_label")
        for name in df_labels.columns
    ]
)
df_labels.printSchema()

root
 |-- responseId: string (nullable = true)
 |-- finished_label: string (nullable = true)
 |-- status_label: string (nullable = true)



# Build the final table to load into dim_respondent

In [19]:
# Left-join the respondent attributes with the "_label" columns on responseId to
# build the final dim_respondent table, then order the columns alphabetically
# (Python's case-sensitive sort) for a stable layout.
df_dim_respondent = df_respondent.join(df_labels, on="responseId", how="left")
df_dim_respondent = df_dim_respondent.select(*sorted(df_dim_respondent.columns))
df_dim_respondent.printSchema()

root
 |-- AccountName: string (nullable = true)
 |-- AccountName_5e02b1554bfe4b0dac61f801SenPol: long (nullable = true)
 |-- AccountName_5e02b1554bfe4b0dac61f801SenScore: long (nullable = true)
 |-- BPCode: string (nullable = true)
 |-- COVIDComments: string (nullable = true)
 |-- CRMLink: string (nullable = true)
 |-- CRMOwner: string (nullable = true)
 |-- CRMOwnerEmail: string (nullable = true)
 |-- ContactArea: string (nullable = true)
 |-- ContactAreaCode: string (nullable = true)
 |-- ContactCountry: string (nullable = true)
 |-- ContactCountryCode: string (nullable = true)
 |-- ContactEmail: string (nullable = true)
 |-- ContactName: string (nullable = true)
 |-- CustomerType: string (nullable = true)
 |-- Digital: string (nullable = true)
 |-- Distinction: string (nullable = true)
 |-- GlobalCode: string (nullable = true)
 |-- GlobalName: string (nullable = true)
 |-- Guide: string (nullable = true)
 |-- LST: string (nullable = true)
 |-- OwnerArea: string (nullable = true)
 |-