# Spark SQL StructType & StructField

In [76]:
#Import Required Python & Spark Libraries

from itertools import islice

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType

# Local-mode configuration for the SparkContext.
conf = SparkConf().setAppName("Simple Masking App").setMaster("local")
# getOrCreate() reuses a SparkContext that is already running, so this cell
# can be re-executed; calling SparkContext(conf=conf) a second time in the
# same kernel raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

# The session is obtained through the builder's getOrCreate().
spark = (
    SparkSession.builder
    .appName("DataMasking on Nested Strcuture")
    .getOrCreate()
)

In [212]:
#Read the file into a Spark RDD, one element per text line
empRDD = sc.textFile("EmployData.csv")
#Remove the header: skip the first line of partition 0 only, pass every
#other partition through unchanged (islice comes from itertools)
empNoHeaderRDD = empRDD.mapPartitionsWithIndex(
    lambda part_idx, lines: islice(lines, 1, None) if part_idx == 0 else lines
)
#Split each line into columns on the comma (',') separator
empRDD1 = empNoHeaderRDD.map(lambda line: line.split(","))

In [213]:
# Turn every split row into a 3-tuple of its first three columns;
# surrounding whitespace is stripped from the third column only.
emp = empRDD1.map(
    lambda cols: (cols[0], cols[1], cols[2].strip())
)

In [214]:
# Print up to the first 10 tuples, one per line.
for record in emp.take(10):
    print(record)

('James', 'james@gmail.com', 'Sr. Developer')
('Smith', 'Smith@gmail.com', 'Project Lead')


In [215]:
# The schema is encoded in a string: whitespace-separated column names,
# listed in the same order as the fields of each tuple.
schemaString = "employee_name email job_profile"

In [216]:
# One nullable string StructField per column name in schemaString.
fields = []
for field_name in schemaString.split():
    fields.append(StructField(field_name, StringType(), True))

# Wrap the fields in a StructType to form the DataFrame schema.
schema = StructType(fields)

In [89]:
#Apply the custom schema to the RDD and create a DataFrame.
#The StructType built from schemaString is named `schema`; no variable
#called `newSchema` is defined anywhere in this notebook.
newDF = spark.createDataFrame(emp, schema)

In [90]:
# Print the DataFrame schema as a tree: column name, type and nullability.
newDF.printSchema()

root
 |-- employee_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- job_profile: string (nullable = true)



In [93]:
#Display the StructType (and the StructFields it contains) behind the DataFrame schema
newDF.schema

StructType(List(StructField(employee_name,StringType,true),StructField(email,StringType,true),StructField(job_profile,StringType,true)))

In [236]:
#Display data from the newDF DataFrame
newDF.show()

+-------------+---------------+-------------+
|employee_name|          email|  job_profile|
+-------------+---------------+-------------+
|        James|james@gmail.com|Sr. Developer|
|        Smith|Smith@gmail.com| Project Lead|
+-------------+---------------+-------------+



#### Nested Schema

In [218]:
#Introduce a new struct-typed column "Info" holding one boolean flag per source column
from pyspark.sql.functions import struct, col
# Each flag is the result of comparing the column with the literal string "null"
# and is named <column>_exists.
nestedDF_old = newDF.withColumn(
    "Info",
    struct(*[
        (col(name) != "null").alias(name + "_exists")
        for name in ("email", "job_profile")
    ]),
)

In [118]:
#Nested columns with boolean structs
# The boolean struct lives in nestedDF_old; nestedDF is only assigned in a
# later cell (and holds string structs), so it cannot be shown here.
nestedDF_old.show()

+-------------+---------------+-------------+------------+
|employee_name|          email|  job_profile|        Info|
+-------------+---------------+-------------+------------+
|        James|james@gmail.com|Sr. Developer|[true, true]|
|        Smith|Smith@gmail.com| Project Lead|[true, true]|
+-------------+---------------+-------------+------------+



In [226]:
#Nested columns with string structs
# "Info" carries copies of email and job_profile, renamed to <column>_nested.
nestedDF = newDF.withColumn(
    "Info",
    struct(*[
        col(name).alias(name + "_nested")
        for name in ("email", "job_profile")
    ]),
)

In [227]:
# Print the schema; Info is listed as a struct with its two nested string fields.
nestedDF.printSchema()

root
 |-- employee_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- job_profile: string (nullable = true)
 |-- Info: struct (nullable = false)
 |    |-- email_nested: string (nullable = true)
 |    |-- job_profile_nested: string (nullable = true)



In [229]:
# Select the top-level columns together with the two fields inside the
# Info struct (addressed with dot notation) and display the result.
nestedDF.select(
    "employee_name",
    "email",
    "job_profile",
    "info.email_nested",
    "info.job_profile_nested",
).show()

+-------------+---------------+-------------+---------------+------------------+
|employee_name|          email|  job_profile|   email_nested|job_profile_nested|
+-------------+---------------+-------------+---------------+------------------+
|        James|james@gmail.com|Sr. Developer|james@gmail.com|     Sr. Developer|
|        Smith|Smith@gmail.com| Project Lead|Smith@gmail.com|      Project Lead|
+-------------+---------------+-------------+---------------+------------------+



In [230]:
# Register the same flattened selection as the temporary view "emp_nested"
# so it can be queried with spark.sql().
nestedDF.select(
    "employee_name",
    "email",
    "job_profile",
    "info.email_nested",
    "info.job_profile_nested",
).createOrReplaceTempView("emp_nested")

In [231]:
# Query all five flattened columns from the emp_nested temporary view.
results = spark.sql("SELECT employee_name,email,job_profile,email_nested,job_profile_nested FROM emp_nested")

In [232]:
# Display the rows returned by the SQL query.
results.show()

+-------------+---------------+-------------+---------------+------------------+
|employee_name|          email|  job_profile|   email_nested|job_profile_nested|
+-------------+---------------+-------------+---------------+------------------+
|        James|james@gmail.com|Sr. Developer|james@gmail.com|     Sr. Developer|
|        Smith|Smith@gmail.com| Project Lead|Smith@gmail.com|      Project Lead|
+-------------+---------------+-------------+---------------+------------------+



In [233]:
#Define the Python function used to mask a column
def maskColumn(colName):
    """Return a fixed placeholder string.

    The argument is accepted so the function can be applied to a column
    value, but it is never read: every input yields the same placeholder.
    """
    masked_placeholder = '*****Masked Data*****'
    return masked_placeholder
    
# Register maskColumn as a SQL-callable UDF named "maskColumn" with a string return type.
spark.udf.register("maskColumn", maskColumn, StringType())


<function __main__.maskColumn(colName)>

In [234]:
# Apply the maskColumn UDF to the two nested fields; employee_name is selected unmasked.
results = spark.sql("SELECT employee_name,maskColumn(email_nested),maskColumn(job_profile_nested) FROM emp_nested")

In [235]:
# Display the masked rows; show() truncates values longer than 20 characters
# by default, which is why the placeholder appears cut off with "...".
results.show()

+-------------+------------------------+------------------------------+
|employee_name|maskColumn(email_nested)|maskColumn(job_profile_nested)|
+-------------+------------------------+------------------------------+
|        James|    *****Masked Data*...|          *****Masked Data*...|
|        Smith|    *****Masked Data*...|          *****Masked Data*...|
+-------------+------------------------+------------------------------+

