In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

In [4]:
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [5]:
# install findspark using pip
!pip install -q findspark

In [6]:
# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [7]:
import findspark

findspark.init()

In [8]:
findspark.find()

'/content/spark-3.0.0-bin-hadoop3.2'

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from datetime import datetime

spark = SparkSession.builder\
        .master("local")\
        .appName("Spark-Datium-ETL-App")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [10]:
spark

In [11]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

--2021-08-16 05:07:49--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 34.203.159.69, 35.171.130.195, 52.202.35.83, ...
Connecting to bin.equinox.io (bin.equinox.io)|34.203.159.69|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2021-08-16 05:07:49 (27.6 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
{"tunnels":[],"uri":"/api/tunnels"}


In [None]:
# Install SQL Server Python ODBC driver library to connect to Database later on
!pip install pyodbc

In [90]:
# Import Libraries

from datetime import datetime as dt
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import coalesce, col, to_timestamp, udf, substring_index, substring, concat, lit, explode_outer
from dateutil import parser, tz
import uuid
import pyodbc

from copy import deepcopy
from collections import Counter

import hashlib
import sys
sys.path.append('/content/drive/MyDrive/datium_etl')
from json_flatten import JsonFlatten

In [91]:
"""Global Functions"""
# Function to get shape of pyspark dataframe
def shape_of_sdf(sdf):
    rows = sdf.count()
    columns = len(sdf.columns)
    size = (rows, columns)
    return """Spark DF has {0} rows and {1} columns""".format(rows,columns)

# Function to replace a string with null
def replace(column, value):
    return F.when(column != value, column).otherwise(F.lit(None))

# Function to encrypt the sensitive column data
def encrypt_value(col):
  sha_value = hashlib.sha256(col.encode()).hexdigest()
  return sha_value

# Function to check null counts in dataframe
def nulls_count_check(df):
    df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

"""Global User Defined Functions"""
encrypt_udf = udf(encrypt_value, StringType())
uuid_udf = udf(lambda : str(uuid.uuid4()),StringType())
format_timestamp_udf = udf(lambda x: format_timestamp(x))

"""It takes the String, parse it to a timestamp, convert to UTC, then convert to String again"""
# Create UTC timezone
utc_zone =  tz.gettz('UTC')
func = udf(lambda x: parser.parse(x).astimezone(utc_zone).isoformat() if x is not None else None,  StringType())


In [92]:
# Data Sourcing

# 1. Upload the dataset from CSV file into Spark Dataframe
csv_sdf = csv_sdf = spark.read.option("header", "true")\
          .option("multiLine", "true")\
          .option("inferSchema","true")\
          .csv("/content/drive/MyDrive/datium_etl/data/test.csv")
print(shape_of_sdf(csv_sdf))

# Analyse the schema of the CSV File
csv_sdf.printSchema()

Spark DF has 4000000 rows and 8 columns
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- color: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- last_login: integer (nullable = true)
 |-- is_claimed: string (nullable = true)
 |-- paid_amount: string (nullable = true)



In [40]:
# Read Sample 10 records from CSV Dataset
csv_sdf.show(10)

+----+------------------+--------------------+------+--------------------+----------+----------+------------+
|  id|              name|             address| color|          created_at|last_login|is_claimed| paid_amount|
+----+------------------+--------------------+------+--------------------+----------+----------+------------+
|6311|    Jennifer Green|7593 Juan Through...|  lime|Monday, June 30th...|1202190735|      True| 5004.671532|
|3350|      Karen Grimes|60975 Jessica Squ...|  lime|Monday, June 30th...| 195884769|      True|893.40459503|
|9031|       Calvin Cook|PSC 3989, Box 471...|silver|      1986-06-23TEST| 623477862|      True|       266.6|
|1131|    Peter Mcdowell|PSC 1868, Box 483...|  aqua|      1998-07-17TEST|1244885561|      True| 674.5441267|
|1889|  Mr. Ryan Sanchez|352 Simmons Circl...| white| 2006-05-09 13:29:58|1293151276|     truee|        null|
|1212|       Mark Obrien|51090 Susan Ferry...|  navy| 1974-11-24 20:47:03|1389262412|     truee|        null|
|4932|Chri

In [41]:
# csv_sdf.select(F.countDistinct("id").alias("unique_id_count")).show()
# csv_sdf.select("is_claimed").distinct().show(10)
# csv_sdf.groupBy('created_at').count().where("count > 25").show(50)

# **Data Cleaning, Transformation and PII data Masking **

> 1. (For CSV Dataset)



The developed ETL framework should have the following features:

● clear data transformation and cleaning functions

● extendable to allow ingestion of new raw data sources

● extendable to allow loading into new data stores

    ○ e.g. storing the output in an object store instead of a SQL database

● consist of modular and reusable components

● incorporates unit tests

● logging



In [42]:
# Get an idea for range of CSV dataset
csv_sdf.describe().show()

+-------+----------------+------------+--------------------+-------+-------------------+-------------------+----------+------------------+
|summary|              id|        name|             address|  color|         created_at|         last_login|is_claimed|       paid_amount|
+-------+----------------+------------+--------------------+-------+-------------------+-------------------+----------+------------------+
|  count|         4000000|     4000000|             3750024|4000000|            4000000|            4000000|   3499498|           4000000|
|   mean|     5002.297927|        null|                null|   null|               null|8.055214408093956E8|      null|2499.9453875717863|
| stddev|2887.03672341837|        null|                null|   null|               null|4.651463472266512E8|      null| 2204.931224065069|
|    min|               0|Aaron Abbott|000 Aaron Corner ...|   aqua|1970-01-01 00:02:30|                 63|     False|               0.0|
|    max|            9999| 

In [93]:
# Check the count of nulls in each column
nulls_count_check(csv_sdf)

+---+----+-------+-----+----------+----------+----------+-----------+
| id|name|address|color|created_at|last_login|is_claimed|paid_amount|
+---+----+-------+-----+----------+----------+----------+-----------+
|  0|   0| 249976|    0|         0|         0|    500502|          0|
+---+----+-------+-----+----------+----------+----------+-----------+



In [94]:
# As all columns are of String Type, apply trim operation on all columns
for c in csv_sdf.columns:
    csv_sdf = csv_sdf.withColumn(c,F.trim(F.col(c)))

**As per the requirement, data from CSV file to be loaded in SQL database table must meet the
following requirements:**

- each row must have a unique identifier
- Columns created_at and last_login must be of type timestamp
- Column is_claimed must be of type boolean
- Column paid_amount must be of type numeric with 2 decimal places
- Personally Identifiable Information (PII) or sensitive data should be
masked where applicable


In [95]:
# Identify non-numeric values in paid_amount column 
csv_sdf.select("paid_amount").filter(F.col("paid_amount").rlike("^[a-zA-Z]*$")).groupBy("paid_amount").count().show(10)

+-----------+------+
|paid_amount| count|
+-----------+------+
|       None|249830|
|      pyint|250006|
|       null|249582|
+-----------+------+



In [96]:
# Convert these Non-Numberic values to null in paid_amount columns and valid values to decimal(30,2)
csv_sdf = csv_sdf.withColumn("paid_amount",F.when(F.col("paid_amount").rlike("^[a-zA-Z]*$"), None).otherwise(F.col("paid_amount").cast(DecimalType(30,2))))

In [47]:
# Identify Unique values in is_claimed column 
csv_sdf.select("is_claimed").groupBy("is_claimed").count().show(10)

+----------+-------+
|is_claimed|  count|
+----------+-------+
|     False|1251112|
|      null| 500502|
|      True|1248492|
|    fal_se| 500936|
|     truee| 498958|
+----------+-------+



In [97]:
# Clean is_claimed column and make sure type of column is Boolean
csv_sdf = csv_sdf.withColumn("is_claimed",
                                 F.when(F.soundex(F.col("is_claimed")) == 'T600','True')\
                                  .when(F.soundex(F.col("is_claimed")) == 'F420','False')\
                                  .otherwise(F.col("is_claimed")).cast(BooleanType()))

# Check Unique values in is_claimed column after cleaning 
csv_sdf.select("is_claimed").groupBy("is_claimed").count().show(10)

+----------+-------+
|is_claimed|  count|
+----------+-------+
|      null| 500502|
|      true|1747450|
|     false|1752048|
+----------+-------+



In [98]:
# Convert last_login column values to Timestamp Type
csv_sdf = csv_sdf.withColumn("last_login", F.from_unixtime(F.col("last_login"),"yyyy-MM-dd HH:mm:ss").cast(TimestampType()))

In [99]:
# Convert Color columns values with Initcap inbuilt Pyspark Function
csv_sdf = csv_sdf.withColumn("color",F.initcap(F.col("color")).cast(StringType()))

In [51]:
print(csv_sdf.dtypes)
csv_sdf.show(10)

[('id', 'string'), ('name', 'string'), ('address', 'string'), ('color', 'string'), ('created_at', 'string'), ('last_login', 'timestamp'), ('is_claimed', 'boolean'), ('paid_amount', 'decimal(30,2)')]
+----+------------------+--------------------+------+--------------------+-------------------+----------+-----------+
|  id|              name|             address| color|          created_at|         last_login|is_claimed|paid_amount|
+----+------------------+--------------------+------+--------------------+-------------------+----------+-----------+
|6311|    Jennifer Green|7593 Juan Through...|  Lime|Monday, June 30th...|2008-02-05 05:52:15|      true|    5004.67|
|3350|      Karen Grimes|60975 Jessica Squ...|  Lime|Monday, June 30th...|1976-03-17 04:26:09|      true|     893.40|
|9031|       Calvin Cook|PSC 3989, Box 471...|Silver|      1986-06-23TEST|1989-10-04 04:17:42|      true|     266.60|
|1131|    Peter Mcdowell|PSC 1868, Box 483...|  Aqua|      1998-07-17TEST|2009-06-13 09:32:41

In [52]:
# Analyse columns data in created_at column
csv_sdf.groupBy('created_at').count().where("count > 25").show(20)

+--------------------+------+
|          created_at| count|
+--------------------+------+
|      1985-04-09TEST|    26|
|Monday, June 30th...|250292|
|          not a date|249856|
|      1990-05-17TEST|    27|
|      2018-12-19TEST|    26|
|      2013-10-01TEST|    27|
|         20001-01-01|249104|
|      1981-04-15TEST|    26|
|      2018-11-10TEST|    30|
|      2007-09-18TEST|    29|
|      1973-02-01TEST|    26|
|      1973-08-05TEST|    27|
|      1981-07-22TEST|    26|
|      1985-08-06TEST|    31|
|      2012-12-20TEST|    26|
|      1991-02-27TEST|    27|
|      2001-11-25TEST|    27|
|      1997-06-20TEST|    27|
|      1987-03-20TEST|    26|
|      1999-06-13TEST|    26|
+--------------------+------+
only showing top 20 rows



> Insights : 
    As the data show quite a number of different values in created_at column. Need to apply some regex, clean few incorrect data values and set values having only alphabets as None

In [100]:
# Clean created_at column data

# 1. Replace values with not of any date type pattern with null 
csv_sdf = csv_sdf.withColumn("created_at", replace(F.col("created_at"), "not a date"))

# 2. Clean date values with random string embedded in it
csv_sdf = csv_sdf.withColumn("created_at", F.when(F.col('created_at').rlike("(TEST$)"), F.regexp_replace(F.col('created_at'),r'(TEST$)','')).otherwise(F.col('created_at')))\
                 .withColumn("created_at", F.when(F.col('created_at').rlike("(20001-01-01$)"), F.regexp_replace(F.col('created_at'),r'(20001-01-01$)','2001-01-01')).otherwise(F.col('created_at')))

In [101]:
# Parse different date formats and convert all values to Timestamp type
csv_sdf = csv_sdf.withColumn("created_at",func(F.col("created_at")).cast(TimestampType()))

In [55]:
# Get all column names and it's types
for col in csv_sdf.dtypes:
    print(col[0]+" "+col[1])

# Drop Duplicates before encrypting and masking the data
print("CSV Dataset with duplicates",shape_of_sdf(csv_sdf))
csv_sdf = csv_sdf.drop_duplicates()

print("CSV Dataset without duplicates",shape_of_sdf(csv_sdf))

id string
name string
address string
color string
created_at timestamp
last_login timestamp
is_claimed boolean
paid_amount decimal(30,2)
CSV Dataset with duplicates Spark DF has 4000000 rows and 8 columns
CSV Dataset without duplicates Spark DF has 4000000 rows and 8 columns


**Insights :** 
    There are no duplicate records in CSV dataset

In [102]:
# Mask PII Columns(name and address) data

# Function to mask the columns
def mask_func(colVal):
    if (colVal is None) | (len(colVal) >= 16):
        charList=list(colVal)
        charList[4:12]='x'*8
        return "".join(charList)
    else:
        return colVal

# Create the UDF
mask_func_udf = udf(mask_func, StringType())

In [103]:
# Remove newline character from address column values
csv_sdf = csv_sdf.withColumn("address", F.regexp_replace(F.col("address"), "[\n\r]", " "))

# Mask the address sensitive information
csv_sdf = csv_sdf.withColumn("address_l", F.substring_index(csv_sdf.address, ',', 1))\
                 .withColumn("address_r", F.substring_index(csv_sdf.address, ',', -1))\
                 .withColumn("address", F.when(F.col('address').isNotNull(),F.concat(F.lit("XXXXXXXX"), F.col("address_r"))).otherwise(F.col('address')))

csv_sdf = csv_sdf.drop('address_l','address_r')


csv_sdf.show(10)

+----+------------------+--------------------+------+-------------------+-------------------+----------+-----------+
|  id|              name|             address| color|         created_at|         last_login|is_claimed|paid_amount|
+----+------------------+--------------------+------+-------------------+-------------------+----------+-----------+
|6311|    Jennifer Green|   XXXXXXXX TX 43780|  Lime|2013-06-30 00:00:00|2008-02-05 05:52:15|      true|    5004.67|
|3350|      Karen Grimes|   XXXXXXXX FL 71671|  Lime|2013-06-30 00:00:00|1976-03-17 04:26:09|      true|     893.40|
|9031|       Calvin Cook|XXXXXXXX Box 4719...|Silver|1986-06-23 00:00:00|1989-10-04 04:17:42|      true|     266.60|
|1131|    Peter Mcdowell|XXXXXXXX Box 4833...|  Aqua|1998-07-17 00:00:00|2009-06-13 09:32:41|      true|     674.54|
|1889|  Mr. Ryan Sanchez|   XXXXXXXX OK 83627| White|2006-05-09 13:29:58|2010-12-24 00:41:16|      true|       null|
|1212|       Mark Obrien|   XXXXXXXX KS 09676|  Navy|1974-11-24 

In [104]:
# Encrypt the name column values
csv_sdf = csv_sdf.withColumn("name",encrypt_udf(F.col("name")))

In [105]:
# Create Unique_id column for each record
csv_sdf = csv_sdf.withColumn("unique_id",uuid_udf())

In [106]:
# Handling NaN and NaT values before uploading in database
csv_sdf=csv_sdf.na.replace('NaN',None)
csv_sdf=csv_sdf.na.replace('NaT',None)

csv_sdf.where("id='2939'").show(10)

+----+--------------------+--------------------+------+-------------------+-------------------+----------+-----------+--------------------+
|  id|                name|             address| color|         created_at|         last_login|is_claimed|paid_amount|           unique_id|
+----+--------------------+--------------------+------+-------------------+-------------------+----------+-----------+--------------------+
|2939|843c6bf8075574723...|   XXXXXXXX SD 99372|  Teal|1975-06-30 09:52:21|1993-06-14 22:35:20|      true|       null|1c97aced-4601-44a...|
|2939|db5f5184a4074d2d6...|   XXXXXXXX SC 03397|  Teal|2013-09-30 00:00:00|1973-04-22 03:52:21|      true|     987.59|5f36cdaf-fe63-42f...|
|2939|0eef1893531d7d37e...|   XXXXXXXX VT 98873|Purple|2019-03-16 07:08:54|1980-05-17 15:15:56|     false|    5389.20|a0f926f2-97d6-4d6...|
|2939|c4b61447c12c342be...|   XXXXXXXX TX 18484| Black|2009-12-13 06:49:22|1984-05-05 09:45:48|      true|    6016.00|be2886e0-7c8b-438...|
|2939|066645aa2b189f

In [107]:
# Final Shape of dataframe to be loaded in Table
print(shape_of_sdf(csv_sdf))

Spark DF has 4000000 rows and 9 columns


In [65]:
# Setup Database Connection
driver = '{ODBC Driver 17 for SQL Server}'
db_host = '<Enter the host name>'
db_port = 1433
database = 'Distributors'
user = '<Enter username>'
pwd = '<Enter Password>'
 
conn_string = 'driver={0}; server={1}; database={2}; uid={3}; pwd={4}'.format(driver,
                                                                              db_host,
                                                                              database,
                                                                              user,
                                                                              pwd
                                                                        )
 
conn = pyodbc.connect(conn_string)

Error: ignored

In [None]:
# Use the session to create a table in SQL Database
cursor = conn.cursor()

cursor.execute('''
CREATE TABLE dbo.test (
    id nvarchar(256) NOT NULL,
    name nvarchar(256) NULL,
    address nvarchar(256) NULL,
    color nvarchar(256) NULL,
    created_at datetime,
    last_login datetime,
    is_claimed bit NULL,
    paid_amount decimal(30,2),
    unique_id nvarchar(256) PRIMARY KEY
    );
''')
conn.commit()

In [None]:
# Write the data to SQL Server Database
# 1. CSV Data
# Insert data in [scf.test] table of SQL Server Database
csv_sdf.write.mode("append") \
        .format("jdbc") \
        .option("url", "jdbc:sqlserver://"+db_host+":1433;databaseName=Distributors") \
        .option("dbtable", 'dbo.test') \
        .option("user",user) \
        .option("password",pwd) \
        .save()

In [None]:
# Read the sample data from Loaded data
cursor.execute('SELECT top 10 * FROM dbo.test;')
for row in cursor:
    print(row)

In [108]:
# Drop the Dataframe to release the memory
del csv_sdf

In [109]:
# 2. Data Sourcing from JSON file type
json_sdf = spark.read.json("/content/drive/MyDrive/datium_etl/data/test.json")
print(shape_of_sdf(json_sdf))

# Analyse the schema of the JSON File
json_sdf.printSchema()

Spark DF has 100000 rows and 9 columns
root
 |-- car_brand: string (nullable = true)
 |-- car_license_plate: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- jobs_history: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- employer: string (nullable = true)
 |    |    |-- end: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- is_fulltime: boolean (nullable = true)
 |    |    |-- occupation: string (nullable = true)
 |    |    |-- start: string (nullable = true)
 |-- logged_at: long (nullable = true)
 |-- updated_at: string (nullable = true)
 |-- user_details: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- dob: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- national_id: string (nullable = true)
 |    |-- password: string (nullable = true)
 |    |-- telephone_numbers: array (nullable = true)
 | 

In [110]:
# Cleaning, Tranformation for JSON File Data
"""
JSON records are read from the file path, and the global schema is computed. 
This schema is then passed while creating an object of the JsonFlatten class that
initializes all class variables. When the compute function is called from the object of JsonFlatten class, 
the class variables are updated.
"""
json_schema = json_sdf.schema
af = JsonFlatten(json_schema)

"""
Compute function performs the required computation and gets all the resources
needed for further process of selecting and exploding fields
"""
af.compute()

df1 = json_sdf
df1.show(3)

+---------+-----------------+--------------------+---------+--------------------+----------+--------------------+--------------------+--------------------+
|car_brand|car_license_plate|          created_at|is_active|        jobs_history| logged_at|          updated_at|        user_details|             user_id|
+---------+-----------------+--------------------+---------+--------------------+----------+--------------------+--------------------+--------------------+
|     null|             null|2003-09-19T13:43:...|     null|[[, 1997-10-02, 8...| 881885008|1986-01-02T09:49:...|[208 Ewing Pine
N...|e9703a66-6556-4b4...|
|     null|             null|2002-10-08T09:14:...|     null|[[, 2015-08-04, b...|  77028400|1984-08-26T14:42:...|[17031 Justin Pla...|aa246388-104c-44f...|
|     null|             null|1993-11-22T08:32:...|     null|[[, 2018-03-30, 8...|1282838750|2010-12-31T12:18:...|[8328 Ryan Overpa...|86af3e4d-6c57-424...|
+---------+-----------------+--------------------+---------+----

In [111]:
"""
To open/explode, all first-level columns are selected with the columns in rest which haven’t appeared already. 
A counter is kept on the target names which counts the duplicate target column names. Any target column name 
having a count greater than 1 is renamed as <path_to_target_field> with each level separated by a > . 
All paths to those fields are added to the visited set of paths.
"""

visited = set([f'.{column}' for column in df1.columns])
duplicate_target_counter = Counter(af.all_fields.values())
cols_to_select = df1.columns
for rest_col in af.rest:
    if rest_col not in visited:
        cols_to_select += [rest_col[1:]] if (duplicate_target_counter[af.all_fields[rest_col]]==1 and af.all_fields[rest_col] not in df1.columns) else [col(rest_col[1:]).alias(f"{rest_col[1:].replace('.', '>')}")]
        visited.add(rest_col)

df1 = df1.select(cols_to_select)
df1.show(3)

+---------+-----------------+--------------------+---------+--------------------+----------+--------------------+--------------------+--------------------+-----------+----------+------------------+--------------------+--------------------+----------+
|car_brand|car_license_plate|          created_at|is_active|        jobs_history| logged_at|          updated_at|        user_details|             user_id|national_id|  password|              name|             address|            username|       dob|
+---------+-----------------+--------------------+---------+--------------------+----------+--------------------+--------------------+--------------------+-----------+----------+------------------+--------------------+--------------------+----------+
|     null|             null|2003-09-19T13:43:...|     null|[[, 1997-10-02, 8...| 881885008|1986-01-02T09:49:...|[208 Ewing Pine
N...|e9703a66-6556-4b4...|199-43-8140|M!*(W7SpoU|    Joshua Webster|208 Ewing Pine
No...|bboyle@garza-shel...|1983-05-

In [112]:
if af.order:
    for key in af.order:
        column = key.split('.')[-1]
        if af.bottom_to_top[key]:
            #########
            #values for the column in bottom_to_top dict exists if it is an array type
            #########
            df1 = df1.select('*', F.explode_outer(F.col(column)).alias(f"{column}_exploded")).drop(column)
            data_type = df1.select(f"{column}_exploded").schema.fields[0].dataType
            if not (isinstance(data_type, StructType) or isinstance(data_type, ArrayType)):
                df1 = df1.withColumnRenamed(f"{column}_exploded", column if duplicate_target_counter[af.all_fields[key]]<=1 else key[1:].replace('.', '>'))
                visited.add(key)
            else:
                #grabbing all paths to columns after explode
                cols_in_array_col = set(map(lambda x: f'{key}.{x}', df1.select(f'{column}_exploded.*').columns))
                #retrieving unvisited columns
                cols_to_select_set = cols_in_array_col.difference(visited)
                all_cols_to_select_set = set(af.bottom_to_top[key])
                #check done for duplicate column name & path
                cols_to_select_list = list(map(lambda x: f"{column}_exploded{'.'.join(x.split(key)[1:])}" if (duplicate_target_counter[af.all_fields[x]]<=1 and x.split('.')[-1] not in df1.columns) else col(f"{column}_exploded{'.'.join(x.split(key)[1:])}").alias(f"{x[1:].replace('.', '>')}"), list(all_cols_to_select_set)))
                #updating visited set
                visited.update(cols_to_select_set)
                rem = list(map(lambda x: f"{column}_exploded{'.'.join(x.split(key)[1:])}", list(cols_to_select_set.difference(all_cols_to_select_set))))
                df1 = df1.select(df1.columns + cols_to_select_list + rem).drop(f"{column}_exploded")        
        else:
            #########
            #values for the column in bottom_to_top dict do not exist if it is a struct type / array type containing a string type
            #########
            #grabbing all paths to columns after opening
            cols_in_array_col = set(map(lambda x: f'{key}.{x}', df1.selectExpr(f'{column}.*').columns))
            #retrieving unvisited columns
            cols_to_select_set = cols_in_array_col.difference(visited)
            #check done for duplicate column name & path
            cols_to_select_list = list(map(lambda x: f"{column}.{x.split('.')[-1]}" if (duplicate_target_counter[x.split('.')[-1]]<=1 and x.split('.')[-1] not in df1.columns) else col(f"{column}.{x.split('.')[-1]}").alias(f"{x[1:].replace('.', '>')}"), list(cols_to_select_set)))
            #updating visited set
            visited.update(cols_to_select_set)
            df1 = df1.select(df1.columns + cols_to_select_list).drop(f"{column}")

final_sdf = df1.select([field[1:].replace('.', '>') if duplicate_target_counter[af.all_fields[field]]>1 else af.all_fields[field] for field in af.all_fields])

final_sdf.show(5)

+---------+-----------------+--------------------+---------+--------+----------+--------------------+-----------+--------------------+----------+----------+--------------------+--------------------+----------+------------------+-----------+----------+--------------------+--------------------+--------------------+
|car_brand|car_license_plate|          created_at|is_active|employer|       end|                  id|is_fulltime|          occupation|     start| logged_at|          updated_at|             address|       dob|              name|national_id|  password|   telephone_numbers|            username|             user_id|
+---------+-----------------+--------------------+---------+--------+----------+--------------------+-----------+--------------------+----------+----------+--------------------+--------------------+----------+------------------+-----------+----------+--------------------+--------------------+--------------------+
|     null|             null|2003-09-19T13:43:...|     

In [71]:
# Check the count of nulls in each column
final_sdf.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in final_sdf.columns]).show(5)

+---------+-----------------+----------+---------+--------+------+---+-----------+----------+-----+---------+----------+-------+------+----+-----------+--------+-----------------+--------+-------+
|car_brand|car_license_plate|created_at|is_active|employer|   end| id|is_fulltime|occupation|start|logged_at|updated_at|address|   dob|name|national_id|password|telephone_numbers|username|user_id|
+---------+-----------------+----------+---------+--------+------+---+-----------+----------+-----+---------+----------+-------+------+----+-----------+--------+-----------------+--------+-------+
|   399768|           399768|         0|   410933|  266577|166535|  0|     255704|         0|    0|        0|         0| 366414|366414|   0|          0|       0|            33344|       0|      0|
+---------+-----------------+----------+---------+--------+------+---+-----------+----------+-----+---------+----------+-------+------+----+-----------+--------+-----------------+--------+-------+



In [113]:
# Remove newline character from address column values
final_sdf = final_sdf.withColumn("address", F.regexp_replace(F.col("address"), "[\n\r]", " "))

# Mask the address sensitive information
final_sdf = final_sdf.withColumn("address_l", F.substring_index(final_sdf.address, ',', 1))\
                 .withColumn("address_r", F.substring_index(final_sdf.address, ',', -1))\
                 .withColumn("address", F.when(F.col('address').isNotNull(),F.concat(F.lit("XXXXXXXX"), F.col("address_r"))).otherwise(F.col('address')))


In [114]:
# Split dataframes as per SQL Table Structure after complete JSON file explode
user_sdf = final_sdf.select('user_id','username','password','national_id','name','dob','address','created_at','logged_at','updated_at','is_active','car_brand','car_license_plate')
print(shape_of_sdf(user_sdf))

job_history_sdf = final_sdf.select('user_id','employer','end','id','is_fulltime','occupation','start')
print(shape_of_sdf(user_sdf))

tel_numbers_sdf = final_sdf.select('user_id','telephone_numbers')
print(shape_of_sdf(tel_numbers_sdf))

Spark DF has 433112 rows and 13 columns
Spark DF has 433112 rows and 13 columns
Spark DF has 433112 rows and 2 columns


In [74]:
# Check some records where telephone number is not null
tel_numbers_sdf.where("telephone_numbers is not null").show(10)

+--------------------+--------------------+
|             user_id|   telephone_numbers|
+--------------------+--------------------+
|e9703a66-6556-4b4...|001-640-924-3637x268|
|e9703a66-6556-4b4...|001-127-583-8338x...|
|aa246388-104c-44f...|        840.808.9845|
|aa246388-104c-44f...|001-558-896-0120x701|
|86af3e4d-6c57-424...|001-672-539-5662x...|
|86af3e4d-6c57-424...|        310-693-9384|
|a610bdb4-8d67-47b...|+1-098-016-5290x7...|
|a610bdb4-8d67-47b...| +1-348-828-2586x800|
|2bde2981-4bd8-4da...|   (168)847-2979x521|
|2bde2981-4bd8-4da...| (975)922-1242x44183|
+--------------------+--------------------+
only showing top 10 rows



In [115]:
# Clean, transform and mask tel_numbers_sdf
tel_numbers_sdf = tel_numbers_sdf.withColumn("telephone_numbers",F.regexp_replace(F.col("telephone_numbers"), "\\d{2,}", " * "))

In [116]:
# Drop Duplicates
print("Dataset with duplicates",shape_of_sdf(tel_numbers_sdf))
tel_numbers_sdf = tel_numbers_sdf.drop_duplicates()
print("Dataset without duplicates",shape_of_sdf(tel_numbers_sdf))

# Take a look at sample dataset
tel_numbers_sdf.show(10)

Dataset with duplicates Spark DF has 433112 rows and 2 columns
Dataset without duplicates Spark DF has 233158 rows and 2 columns
+--------------------+-------------------+
|             user_id|  telephone_numbers|
+--------------------+-------------------+
|f32637b5-07fc-408...|         * . * . * |
|abfac530-855f-4fb...|     * - * - * x * |
|c83be50d-56d4-4b3...| +1- * - * - * x * |
|54a1d47b-21cd-431...|               null|
|afa0357b-c42e-4bc...|     * . * . * x * |
|ce95d60b-3f2f-4e5...|     * . * . * x * |
|29c8c8c9-b1e5-4df...| * - * - * - * x * |
|5c3b239d-2795-4ea...|               null|
|a39ad087-0c89-4f6...|     * - * - * x * |
|31f9b130-becc-439...|         * . * . * |
+--------------------+-------------------+
only showing top 10 rows



In [117]:
# Get all column names and it's types
for col in job_history_sdf.dtypes:
    print(col[0]+" "+col[1])

job_history_sdf.show(10)

user_id string
employer string
end string
id string
is_fulltime boolean
occupation string
start string
+--------------------+--------+----------+--------------------+-----------+--------------------+----------+
|             user_id|employer|       end|                  id|is_fulltime|          occupation|     start|
+--------------------+--------+----------+--------------------+-----------+--------------------+----------+
|e9703a66-6556-4b4...|    null|1997-10-02|8c48a084-27d7-4f1...|      false|        Set designer|1996-12-26|
|e9703a66-6556-4b4...|    null|1997-10-02|8c48a084-27d7-4f1...|      false|        Set designer|1996-12-26|
|aa246388-104c-44f...|    null|2015-08-04|b9d4fc47-0e53-449...|       true|      Chief of Staff|1991-09-12|
|aa246388-104c-44f...|    null|2015-08-04|b9d4fc47-0e53-449...|       true|      Chief of Staff|1991-09-12|
|86af3e4d-6c57-424...|    null|2018-03-30|818289aa-0a0d-45a...|       true|Clinical cytogene...|1970-10-28|
|86af3e4d-6c57-424...|    null|20

In [118]:
# Analyse columns data in start and end columns
job_history_sdf.groupBy('start').count().where("count > 25").show(10)
job_history_sdf.groupBy('end').count().where("count > 25").show(10)

+----------+-----+
|     start|count|
+----------+-----+
|1982-12-10|   34|
|1984-03-08|   33|
|1993-11-21|   48|
|1983-01-16|   40|
|2014-02-16|   38|
|2008-10-26|   32|
|2003-02-22|   29|
|2015-05-01|   35|
|1981-05-11|   29|
|1994-03-01|   28|
+----------+-----+
only showing top 10 rows

+----------+-----+
|       end|count|
+----------+-----+
|1994-08-31|   34|
|1993-11-09|   31|
|2016-08-17|   26|
|2000-11-13|   26|
|1999-11-18|   28|
|1994-03-01|   38|
|1973-04-27|   27|
|2006-05-24|   28|
|2008-04-07|   26|
|1991-10-07|   28|
+----------+-----+
only showing top 10 rows



In [79]:
# Check Unique values in occupation column after cleaning 
job_history_sdf.select("occupation").groupBy("occupation").count().show(10)

+--------------------+-----+
|          occupation|count|
+--------------------+-----+
| Retail merchandiser|  651|
| Librarian, academic|  764|
|Designer, ceramic...|  602|
|    Catering manager|  683|
|Engineer, aeronau...|  720|
|Diplomatic Servic...|  747|
|Primary school te...|  652|
| Early years teacher|  524|
|     Patent examiner|  637|
|Occupational hygi...|  767|
+--------------------+-----+
only showing top 10 rows



In [119]:
# Convert jobs_history_sdf columns to required datatypes
job_history_sdf = job_history_sdf.withColumn("start",F.col('start').cast(DateType()))\
                                 .withColumn("end",F.col('start').cast(DateType()))

# Get all column names and it's types
for col in job_history_sdf.dtypes:
    print(col[0]+" "+col[1])

# Drop Duplicates
print("Dataset with duplicates",shape_of_sdf(job_history_sdf))
job_history_sdf = job_history_sdf.drop_duplicates()

print("Dataset without duplicates",shape_of_sdf(job_history_sdf))

job_history_sdf.show(10)

user_id string
employer string
end date
id string
is_fulltime boolean
occupation string
start date
Dataset with duplicates Spark DF has 433112 rows and 7 columns
Dataset without duplicates Spark DF has 133307 rows and 7 columns
+--------------------+------------+----------+--------------------+-----------+--------------------+----------+
|             user_id|    employer|       end|                  id|is_fulltime|          occupation|     start|
+--------------------+------------+----------+--------------------+-----------+--------------------+----------+
|8945242f-af27-40c...|        null|2011-12-23|57c9c1c1-0ffd-412...|       true|    Engineer, mining|2011-12-23|
|a48ba4e2-2f5b-4da...|        null|2000-10-01|21b0bf54-4e80-4f6...|       true|Loss adjuster, ch...|2000-10-01|
|57e7429d-e67e-446...|        null|1993-06-03|9b6c4729-abfb-426...|       null|Designer, blown g...|1993-06-03|
|8469d401-8a77-419...|        null|2012-12-27|7fe9f1d2-bbfc-4bd...|       true|Financial risk an...|

In [120]:
user_sdf = final_sdf.select('user_id','username','password','national_id','name','dob','address','created_at','logged_at','updated_at','is_active','car_brand','car_license_plate')
print(shape_of_sdf(user_sdf))

# Analyse user_sdf dataframe columns
for col in user_sdf.dtypes:
    print(col[0]+" "+col[1])

user_sdf.show(10)

Spark DF has 433112 rows and 13 columns
user_id string
username string
password string
national_id string
name string
dob string
address string
created_at string
logged_at bigint
updated_at string
is_active boolean
car_brand string
car_license_plate string
+--------------------+--------------------+----------+-----------+------------------+----------+-----------------+--------------------+----------+--------------------+---------+---------+-----------------+
|             user_id|            username|  password|national_id|              name|       dob|          address|          created_at| logged_at|          updated_at|is_active|car_brand|car_license_plate|
+--------------------+--------------------+----------+-----------+------------------+----------+-----------------+--------------------+----------+--------------------+---------+---------+-----------------+
|e9703a66-6556-4b4...|bboyle@garza-shel...|M!*(W7SpoU|199-43-8140|    Joshua Webster|1983-05-15|XXXXXXXX MN 35292|2003-09-19T

In [121]:
# Check the count of unique user_id
user_sdf.select(F.countDistinct("user_id").alias("unique_id_count")).show()

# Analyse date columns
user_sdf.groupBy('dob').count().where("count > 10").show(5)

+---------------+
|unique_id_count|
+---------------+
|         100000|
+---------------+

+----------+-----+
|       dob|count|
+----------+-----+
|2012-03-09|   14|
|2006-11-18|   12|
|2014-08-29|   12|
|1989-07-16|   14|
|1999-02-14|   12|
+----------+-----+
only showing top 5 rows



In [122]:
# Mask Sensitive data in Users dataset
"""
For National_ID :
where ever two consecutive digits are found, mask the digits
"""

user_sdf = user_sdf.withColumn("national_id",F.regexp_replace(F.col("national_id"), "\\d{2,}", " * "))

In [123]:
from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import concat, expr, length, lit, split, when

"""
For username :
(?<=.{2}): Positive lookbehind for 2 characters
(?<=.{1}): Positive lookbehind for 1 character
\w+: Any word characters
(?=.{2}@): Positive lookahead for 2 characters followed by a literal @
(?=.{1}@): Positive lookahead for 1 character followed by a literal @
"""

patA = "regexp_replace(username, concat('(?<=.{2})', pattern, '(?=.{2}@)'), replacement)"
patB = "regexp_replace(username, concat('(?<=.{1})', pattern, '(?=.{1}@)'), replacement)"

user_sdf = user_sdf.withColumn("username_first_part", split("username", "@").getItem(0))\
                   .withColumn("pattern", when(length("username_first_part") > 5, regexp_extract("username", r"(?<=.{2})\w+(?=.{2}@)", 0)).otherwise(regexp_extract("username", r"(?<=.{1})\w+(?=.{1}@)", 0)))\
                   .withColumn("replacement", regexp_replace("pattern", r"\w", "*"))\
                   .withColumn("username",when(length("username_first_part") > 5, expr(patA)).when(length("username_first_part") > 3, expr(patB)).otherwise(regexp_replace('username', '\w(?=@)', '*'))).drop("pattern", "replacement","username_first_part")

In [124]:
# Look at username sample data after masking
user_sdf.select('username').show(5)

+--------------------+
|            username|
+--------------------+
|bb**le@garza-shel...|
|bb**le@garza-shel...|
|ha******os@baker-...|
|ha******os@baker-...|
|se******ey@willia...|
+--------------------+
only showing top 5 rows



In [125]:
# Convert required columns to Timestamp Type
"""Convert to String Type and take out only first 19 characters and then cast columns to Timestamp"""
user_sdf = user_sdf.withColumn("created_at",F.col('created_at').cast(StringType()))

user_sdf = user_sdf.withColumn("created_at", F.substring_index(user_sdf.created_at, ':', -1).cast(LongType()))\
                   .withColumn("updated_at", F.substring_index(user_sdf.updated_at, ':', -1).cast(LongType()))

# Convert last_login column values to Timestamp Type
user_sdf = user_sdf.withColumn("logged_at", F.from_unixtime(F.col("logged_at"),"yyyy-MM-dd HH:mm:ss").cast(TimestampType()))\
                   .withColumn("created_at", F.from_unixtime(F.col("created_at"),"yyyy-MM-dd HH:mm:ss").cast(TimestampType()))\
                   .withColumn("updated_at", F.from_unixtime(F.col("updated_at"),"yyyy-MM-dd HH:mm:ss").cast(TimestampType()))

In [126]:
# Drop Duplicates before masking columns
print("Dataset with duplicates",shape_of_sdf(user_sdf))
user_sdf = user_sdf.drop_duplicates()
print("Dataset without duplicates",shape_of_sdf(user_sdf))

Dataset with duplicates Spark DF has 433112 rows and 13 columns
Dataset without duplicates Spark DF has 100000 rows and 13 columns


In [127]:
user_sdf = user_sdf.withColumn("address_l", F.substring_index(user_sdf.address, ',', 1))\
                   .withColumn("address_r", F.substring_index(user_sdf.address, ',', -1))\
                   .withColumn("address", F.when(F.col('address').isNotNull(),F.concat(F.lit("XXXXXXXX"), F.col("address_r"))).otherwise(F.col('address')))

user_sdf = user_sdf.drop('address_l','address_r')

# Encrypt the name column values
user_sdf = user_sdf.withColumn("name",encrypt_udf(F.col("name")))

user_sdf.show(10)

+--------------------+--------------------+----------+-----------+--------------------+----------+--------------------+-------------------+-------------------+-------------------+---------+---------+-----------------+
|             user_id|            username|  password|national_id|                name|       dob|             address|         created_at|          logged_at|         updated_at|is_active|car_brand|car_license_plate|
+--------------------+--------------------+----------+-----------+--------------------+----------+--------------------+-------------------+-------------------+-------------------+---------+---------+-----------------+
|9024c53d-71a1-46b...|sa**09@campbell-f...|*E3Jbdu1I3| * - * - * |1897378f7b3e4bde6...|      null|                null|2017-05-17 12:43:08|2005-11-11 23:14:00|1988-11-27 23:18:02|     null|     null|             null|
|f03010d0-fffd-43e...|je********yl@hotm...|E5nIuRYd@5| * - * - * |b623279c1b5db0732...|      null|                null|2014-02-1

In [128]:
# Handling NaN and NaT values before uploading in database
user_sdf = user_sdf.na.replace('NaN',None)
user_sdf = user_sdf.na.replace('NaT',None)

for col in user_sdf.dtypes:
    print(col[0]+" "+col[1])

user_sdf.show(10)

user_id string
username string
password string
national_id string
name string
dob string
address string
created_at timestamp
logged_at timestamp
updated_at timestamp
is_active boolean
car_brand string
car_license_plate string
+--------------------+--------------------+----------+-----------+--------------------+----------+--------------------+-------------------+-------------------+-------------------+---------+---------+-----------------+
|             user_id|            username|  password|national_id|                name|       dob|             address|         created_at|          logged_at|         updated_at|is_active|car_brand|car_license_plate|
+--------------------+--------------------+----------+-----------+--------------------+----------+--------------------+-------------------+-------------------+-------------------+---------+---------+-----------------+
|9024c53d-71a1-46b...|sa**09@campbell-f...|*E3Jbdu1I3| * - * - * |1897378f7b3e4bde6...|      null|                null|2

**Write JSON File data in DB Tables**

In [129]:
# Check the shape of final dataframes to be loaded
print(shape_of_sdf(user_sdf))
print(shape_of_sdf(tel_numbers_sdf))
print(shape_of_sdf(job_history_sdf))

Spark DF has 100000 rows and 13 columns
Spark DF has 233158 rows and 2 columns
Spark DF has 133307 rows and 7 columns


In [None]:
# Create Tables in SQL Server Database
cursor = conn.cursor()

cursor.execute('''
CREATE TABLE dbo.users (
user_id nvarchar(256) NOT NULL PRIMARY KEY,
username nvarchar(256) NOT NULL,
password nvarchar(256) NOT NULL,
national_id nvarchar(256) NULL,
name nvarchar(256) NOT NULL,
dob date,
address nvarchar(256) NULL,
created_at datetime,
logged_at datetime,
updated_at datetime,
is_active bit NULL,
car_brand nvarchar(256) NULL,
car_license_plate nvarchar(256) NULL
);
''')
conn.commit()

In [None]:
# Write the data to SQL Server Database
# 1. JSON Data - users
# Insert data in [Distributors.dbo.users] table of SQL Server Database
user_sdf.write.mode("append") \
        .format("jdbc") \
        .option("url", "jdbc:sqlserver://"+db_host+":1433;databaseName=Distributors") \
        .option("dbtable", 'dbo.users') \
        .option("user",user) \
        .option("password",pwd) \
        .save()

In [None]:
# Drop the dataframe to release the memory
del user_sdf

In [None]:
# Read the sample data from Loaded data
cursor.execute('SELECT top 10 * FROM dbo.users;')
for row in cursor:
    print(row)

In [None]:
# Create Tables in SQL Server Database
cursor = conn.cursor()

cursor.execute('''
CREATE TABLE dbo.job_history (
user_id nvarchar(256) NOT NULL,
employer nvarchar(256) NULL,
[end] date,
id nvarchar(256) NULL,
is_fulltime bit NULL,
occupation nvarchar(256) NULL,
[start] date
);
''')
conn.commit()

In [None]:
# Write the data to SQL Server Database
# 2. JSON Data - job_history
# Insert data in [Distributors.dbo.job_history] table of SQL Server Database
job_history_sdf.write.mode("append") \
        .format("jdbc") \
        .option("url", "jdbc:sqlserver://"+db_host+":1433;databaseName=Distributors") \
        .option("dbtable", 'dbo.job_history') \
        .option("user",user) \
        .option("password",pwd) \
        .save()

In [None]:
# Drop the dataframe to release the memory
del job_history_sdf

In [None]:
# Read the sample data from Loaded data
cursor.execute('SELECT top 10 * FROM dbo.job_history;')
for row in cursor:
    print(row)

In [None]:
# Create Tables in SQL Server Database
cursor = conn.cursor()

cursor.execute('''
CREATE TABLE dbo.telephone_numbers (
user_id nvarchar(256) NOT NULL,
telephone_numbers nvarchar(256) NULL
)
''')
conn.commit()

In [None]:
# Write the data to SQL Server Database
# 3. JSON Data - telephone_numbers
# Insert data in [Distributors.dbo.telephone_numbers] table of SQL Server Database
tel_numbers_sdf.write.mode("append") \
        .format("jdbc") \
        .option("url", "jdbc:sqlserver://"+db_host+":1433;databaseName=Distributors") \
        .option("dbtable", 'dbo.telephone_numbers') \
        .option("user",user) \
        .option("password",pwd) \
        .save()

In [None]:
# Drop the dataframe to release the memory
del tel_numbers_sdf

In [None]:
# Read the sample data from Loaded data
cursor.execute('SELECT top 10 * FROM dbo.telephone_numbers;')
for row in cursor:
    print(row)