In [None]:
#------------------------------------------------------------------------------#
# Importing packages                                                           #
#------------------------------------------------------------------------------#
from pyspark.sql import Window
from pyspark.sql.functions import arrays_zip, explode
from pyspark.sql import functions as F
from pyspark.sql.functions import col
# added to replace and cast column
from pyspark.sql.functions import *
# added to replace and cast column
import os
import json
import pyspark
import glob
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
from pyspark.sql.types import (
    StructType,
    StructField,
    TimestampType,
    StringType,
    BooleanType,
    IntegerType,
    DateType,
    LongType
)

from pyspark.sql.types import (
    TimestampType
)
from pyspark.sql.functions import (

    col, when, size, split, unix_timestamp,
    from_unixtime

)

import pyspark.sql.functions as func
#added by Nasa to convert to timestamp
from pyspark.sql.functions import to_timestamp


In [None]:
#drop SQL tables
# Drop every cx_pilot table this notebook writes, so the saveAsTable calls in
# the later cells recreate them from scratch. Order matches the original list.
_tables_to_drop = [
    "airside_users_table",
    "airside_Customer_Addresses",
    "airside_Customer_Phones",
    "airside_Customer_Emails",
    "airside_Customer_AccountDetails",
    "airside_Customer_Citizenships",
    "airside_users_main",
    "airside_users_migration",
]
for _table in _tables_to_drop:
    spark.sql("DROP TABLE IF EXISTS cx_pilot." + _table)

In [None]:
#------------------------------------------------------------------------------#
# Command to read RAW user files + read content                                #
# Table with User information                                                  #
#------------------------------------------------------------------------------#

# Raw user events: JSON documents stored as *.blob files three directory
# levels below Users/v1/ in the `raw` container.
url = 'abfss://raw@datalakeprd.dfs.core.windows.net/Users/v1/*/*/*.blob'
df_users = spark.read.format('json').load(url)


In [None]:
# Curated users snapshot, stored as parquet files under users/.
users_main_link = "abfss://curated@datalakeprd.dfs.core.windows.net/users/*.parquet"
users_main = spark.read.format("parquet").load(users_main_link)


In [None]:
# User-migration mapping (from_user_id -> to_user_id, as used by the merge
# cell below). Exact duplicate rows are removed, and the result is exposed to
# SQL as the temp view "users_migration".
users_mmigration_link = "abfss://curated@datalakeprd.dfs.core.windows.net/user_migrations"
users_mmigration = (
    spark.read.format("parquet")
    .load(users_mmigration_link)
    .distinct()
)
users_mmigration.createOrReplaceTempView("users_migration")

In [None]:
# Collapse users_main to one row per (user_id, identity, handle, email),
# keeping the latest joindate and last_modified of each group as timestamps.
# The result replaces both the `users_main` variable and the "users_main" view.
# NOTE(review): this cell overwrites its own input, so re-running it without
# first re-running the parquet read cell aggregates already-aggregated data.


def _epoch_prefix_to_timestamp(column_name):
    """Timestamp built from the first 10 characters of `column_name`, read as epoch seconds.

    NOTE(review): presumably the raw values are epoch milliseconds, and taking
    10 characters drops the millisecond part - confirm against the source data.
    """
    return F.expr(
        "timestamp(from_unixtime(left({0},10),'yyyy-MM-dd HH:mm:ss'))".format(column_name)
    )


users_main = (
    users_main
    .groupBy('user_id', 'identity', 'handle', 'email')
    .agg(
        F.max(_epoch_prefix_to_timestamp('joindate')).alias('joindate'),
        F.max(_epoch_prefix_to_timestamp('last_modified')).alias('last_modified'),
    )
    .select('user_id', 'joindate', 'last_modified', 'identity', 'handle', 'email')
    .distinct()
)

users_main.createOrReplaceTempView("users_main")

In [None]:
# Re-key migrated users onto their target account and merge them:
#   * a user_id with a row in users_migration (matched on from_user_id) is
#     replaced by that row's to_user_id; all other users keep their own user_id;
#   * rows are then merged per resulting user_id: earliest joindate, latest
#     last_modified, and MAX() as the tie-break for identity/handle/email.
# Columns are qualified with their source alias (u / m) so the join stays
# unambiguous even if users_migration gains a column with one of these names.
# NOTE(review): only one migration hop is resolved (A -> B -> C ends at B), and a
# from_user_id mapped to several to_user_ids contributes to each of them - confirm.
users_main = spark.sql("""
    SELECT user_id,
           MIN(joindate)      AS joindate,
           MAX(last_modified) AS last_modified,
           MAX(identity)      AS identity,
           MAX(handle)        AS handle,
           MAX(email)         AS email
    FROM (
        SELECT COALESCE(m.to_user_id, u.user_id) AS user_id,
               u.joindate,
               u.last_modified,
               u.identity,
               u.handle,
               u.email
        FROM users_main AS u
        LEFT JOIN users_migration AS m
          ON u.user_id = m.from_user_id
    ) x
    GROUP BY user_id
""")

In [None]:
#------------------------------------------------------------------------------#
# Table with Users information                                                 #
# with nested content                                                          #
#------------------------------------------------------------------------------#
# One flat row per distinct user event. Most Payload fields are copied as-is
# and exposed as user_<Field>. For the B2C identity fields, only element 0 of
# the B2CIdentities array is kept.

# (source path, output column) for the B2C identity fields.
_b2c_identity_fields = [
    ('Payload.B2CIdentities.id', 'user_B2C_Id'),
    ('Payload.B2CIdentities.UserPrincipalName', 'user_B2C_UserPrincipalName'),
    ('Payload.B2CIdentities.Type.Name', 'user_B2C_Type_Name'),
    ('Payload.B2CIdentities.Type.value', 'user_B2C_Type_Value'),
]

# Payload fields copied unchanged and renamed to user_<Field>.
_plain_payload_fields = [
    'Citizenship', 'CountryOfResidency', 'Email', 'GeminiId', 'GivenName',
    'Identification', 'IsVisitor', 'LastName', 'MarketingCommunicationsConsent',
    'MiddleName', 'Name', 'PhoneNumber', 'SalesForceId', 'Suffix', 'Title',
]

users_info_columns = [
    col('EventEnqueuedUtcTime').alias("EventEnqueuedUtcTime"),
    col('EventProcessedUtcTime').cast('timestamp').alias('EventProcessedUtcTime'),
]
users_info_columns += [col(path).getItem(0).alias(name) for path, name in _b2c_identity_fields]
users_info_columns.append(func.to_date('Payload.Birthdate').alias('user_Birthdate'))
users_info_columns += [col('Payload.' + field).alias('user_' + field) for field in _plain_payload_fields]
users_info_columns += [
    col('Payload.UserType').alias('user_Type'),
    col('Payload.id').alias('user_id'),
]

users_info = df_users.select(*users_info_columns).distinct()


In [None]:
# cleaning wrong dates for birthdays
# 0001-01-01 is treated as a placeholder birthdate and replaced with NULL.
# The comparison stays on the DATE type: no string round-trip and no re-parse
# with a date pattern. As a result, no empty string is ever parsed, which
# would raise instead of returning NULL under ANSI mode.
# NULL birthdates stay NULL (the condition evaluates to NULL, so when() yields NULL).
users_info = users_info.withColumn(
    'user_Birthdate',
    F.when(
        col('user_Birthdate') != F.lit('0001-01-01').cast('date'),
        col('user_Birthdate'),
    ),
)


In [None]:
#------------------------------------------------------------------------------#
# Table Users Addresses information                                            #
#------------------------------------------------------------------------------#
# explode() emits one row per element of Payload.PersonalDetails.Addresses.
# Users with a null or empty array produce no rows (explode, not explode_outer).

customer_Addresses = df_users.select(
    'EventEnqueuedUtcTime',
    'EventProcessedUtcTime',
    col('Payload.id').alias('user_id'),
    explode('Payload.PersonalDetails.Addresses').alias('Addresses'),
)

# Address struct fields, flattened under their own names.
_address_fields = ['City', 'Country', 'PostalCode', 'State', 'Street', 'Unit']

customer_AddressesDetails = customer_Addresses.select(
    'EventEnqueuedUtcTime',
    'EventProcessedUtcTime',
    'user_id',
    *[col('Addresses.' + field).alias(field) for field in _address_fields]
).distinct()


In [None]:
#------------------------------------------------------------------------------#
# Table customer accounts information                                          #
#------------------------------------------------------------------------------#
# One row per element of Payload.CustomerAccounts. Each struct field is
# exposed as user_CA_<Field>.

customer_accounts = df_users.select(
    'EventEnqueuedUtcTime',
    'EventProcessedUtcTime',
    col('Payload.id').alias('user_id'),
    explode('Payload.CustomerAccounts').alias('CustomerAccounts'),
)

_customer_account_fields = [
    'ContactInformation', 'CustomerId', 'CustomerName', 'CustomerSalesforceId',
    'IsActive', 'RoleName', 'SalesforceRoleId',
]

customer_AccountDetails = customer_accounts.select(
    'EventEnqueuedUtcTime',
    'EventProcessedUtcTime',
    'user_id',
    *[col('CustomerAccounts.' + field).alias('user_CA_' + field) for field in _customer_account_fields]
).distinct()

# Row count shown as the cell output (triggers a full Spark job).
customer_AccountDetails.count()

In [None]:
#------------------------------------------------------------------------------#
# Table customer Citizenships information                                      #
#------------------------------------------------------------------------------#
# One row per element of Payload.PersonalDetails.Citizenships. The element is
# kept as-is (not flattened) in the `Citizenships` column.

customer_Citizenships = (
    df_users
    .select(
        'EventEnqueuedUtcTime',
        'EventProcessedUtcTime',
        col('Payload.id').alias('user_id'),
        explode('Payload.PersonalDetails.Citizenships').alias('Citizenships'),
    )
    .distinct()
)


In [None]:
#------------------------------------------------------------------------------#
# Table customer Emails information                                            #
#------------------------------------------------------------------------------#
# One row per element of Payload.PersonalDetails.Contact.Emails.

customer_emails = df_users.select(
    'EventEnqueuedUtcTime',
    'EventProcessedUtcTime',
    col('Payload.id').alias('user_id'),
    explode('Payload.PersonalDetails.Contact.Emails').alias('Emails'),
)

# (struct field, output column) pairs pulled out of each Emails element.
_email_fields = [
    ('Emails.Address', 'user_CustomerEmail'),
    ('Emails.IsPrimary', 'user_CustomerEmailIsPrimary'),
]

customer_EmailsDetails = customer_emails.select(
    'EventEnqueuedUtcTime',
    'EventProcessedUtcTime',
    'user_id',
    *[col(source).alias(target) for source, target in _email_fields]
).distinct()


In [None]:
#------------------------------------------------------------------------------#
# Table customer phones information                                            #
#------------------------------------------------------------------------------#
# One row per element of Payload.PersonalDetails.Contact.Phones.

customer_phones = df_users.select(
    'EventEnqueuedUtcTime',
    'EventProcessedUtcTime',
    col('Payload.id').alias('user_id'),
    explode('Payload.PersonalDetails.Contact.Phones').alias('Phones'),
)

# (struct field, output column) pairs pulled out of each Phones element.
_phone_fields = [
    ('Phones.CountryCode', 'user_PhoneCountryCode'),
    ('Phones.Number', 'user_PhoneNumber'),
]

customer_PhonesDetails = customer_phones.select(
    'EventEnqueuedUtcTime',
    'EventProcessedUtcTime',
    'user_id',
    *[col(source).alias(target) for source, target in _phone_fields]
).distinct()


In [None]:
#------------------------------------------------------------------------------#
# Table customer SalesForce Account information                                #
#------------------------------------------------------------------------------#
# One row per element of Payload.SalesForceUserAccounts.
# NOTE(review): no later cell in this notebook curates or saves
# customer_SFDetails - confirm whether it is still needed.

customer_SF = df_users.select(
    'EventEnqueuedUtcTime',
    'EventProcessedUtcTime',
    col('Payload.id').alias('user_id'),
    explode('Payload.SalesForceUserAccounts').alias('SalesForceUserAccounts'),
)

# (struct field, output column) pairs pulled out of each account element.
_salesforce_fields = [
    ('SalesForceUserAccounts.Email', 'user_SalesForceEmail'),
    ('SalesForceUserAccounts.Id', 'user_SalesForceID'),
]

customer_SFDetails = customer_SF.select(
    'EventEnqueuedUtcTime',
    'EventProcessedUtcTime',
    'user_id',
    *[col(source).alias(target) for source, target in _salesforce_fields]
).distinct()

# Row count shown as the cell output (triggers a full Spark job).
customer_SFDetails.count()

In [None]:
#Get the latest iteration of every record, per user
# For each user_id, keep only the rows whose EventProcessedUtcTime equals that
# user's maximum. All rows tied at the maximum are kept.
# The helper boolean column `max_date` (always True after the filter) stays in
# the output, so it is also written to the saved tables.
# NOTE(review): EventProcessedUtcTime is cast to timestamp only in users_info.
# In the child tables it keeps the type inferred by the JSON reader (presumably
# string), so the max is lexicographic - confirm the format is fixed-width ISO-8601.


def _latest_per_user(df, view_name):
    """Register `df` as temp view `view_name` and return its latest rows per user.

    Args:
        df: DataFrame with `user_id` and `EventProcessedUtcTime` columns.
        view_name: temp view name under which `df` is (re)registered.

    Returns:
        The distinct rows of `df` plus a boolean `max_date` column, filtered to
        rows whose EventProcessedUtcTime equals the max for their user_id.
    """
    df.createOrReplaceTempView(view_name)
    user_max = F.max('EventProcessedUtcTime').over(Window.partitionBy('user_id'))
    return (
        df.withColumn('max_date', col('EventProcessedUtcTime') == user_max)
        .distinct()
        .where(col('max_date'))
    )


df_ui_curated = _latest_per_user(users_info, "users_info")
customer_AddressesDetails_curated = _latest_per_user(customer_AddressesDetails, "customer_AddressesDetails")
customer_AccountDetails_curated = _latest_per_user(customer_AccountDetails, "customer_AccountDetails")
customer_Citizenships_curated = _latest_per_user(customer_Citizenships, "customer_Citizenships")
customer_EmailsDetails_curated = _latest_per_user(customer_EmailsDetails, "customer_EmailsDetails")
customer_PhonesDetails_curated = _latest_per_user(customer_PhonesDetails, "customer_PhonesDetails")

In [None]:
#------------------------------------------------------------------------------#
#  SAVE the curated child tables                                               #
#------------------------------------------------------------------------------#
# mode("overwrite") keeps this cell re-runnable on its own. The default save
# mode ("errorifexists") fails when the table already exists, e.g. when this
# cell is re-run without re-running the DROP cell at the top.
# NOTE(review): overwriting an existing Delta table with a different schema
# still fails unless overwriteSchema is set; a full run drops the tables first.
_child_tables = [
    (customer_AddressesDetails_curated, "cx_pilot.airside_Customer_Addresses"),
    (customer_AccountDetails_curated, "cx_pilot.airside_Customer_AccountDetails"),
    (customer_Citizenships_curated, "cx_pilot.airside_Customer_Citizenships"),
    (customer_EmailsDetails_curated, "cx_pilot.airside_Customer_Emails"),
    (customer_PhonesDetails_curated, "cx_pilot.airside_Customer_Phones"),
]
for _curated_df, _table_name in _child_tables:
    _curated_df.write.format("delta").mode("overwrite").saveAsTable(_table_name)


In [None]:
# Add the "Has...DL" columns used by the Airside KPI main dashboard.
# Each flag is True when the user has at least one row in the corresponding
# cx_pilot table.
#
# The query reads the temp view "users_table". Register it from the curated
# users DataFrame here; otherwise the query depends on a view that no cell
# creates and fails on a fresh kernel.
# NOTE(review): this temp view shadows any permanent table named users_table in
# the current database - confirm df_ui_curated is the intended source.
_has_filled_flags = [
    "HasFlightLicensesDL",
    "HasAccountDetailDL",
    "HasCustomerAddressesDL",
    "HasEducationDetailsDL",
    "HasFlightExperiencesDetailsDL",
    "HasWorkExperiencesDetailsDL",
]
# Drop flags left over from a previous run of this cell so a re-run does not
# produce duplicate columns (dropping absent columns is a no-op).
df_ui_curated.drop(*_has_filled_flags).createOrReplaceTempView("users_table")

# Every joined subquery gets its own alias. The original reused `ed` for three
# different relations, which only worked because their columns never overlapped.
df_ui_curated = spark.sql("""
select
  u.*,
  fl.fl_rows is not null as HasFlightLicensesDL,
  ac.ac_rows is not null as HasAccountDetailDL,
  ca.ca_rows is not null as HasCustomerAddressesDL,
  ed.ed_rows is not null as HasEducationDetailsDL,
  fe.fe_rows is not null as HasFlightExperiencesDetailsDL,
  we.we_rows is not null as HasWorkExperiencesDetailsDL
from
  users_table as u
  left join (select userid, count(userid) as fl_rows from cx_pilot.airside_users_flightlicenses group by userid) as fl on u.user_id = fl.userid
  left join (select user_id, count(user_id) as ac_rows from cx_pilot.airside_customer_accountdetails group by user_id) as ac on u.user_id = ac.user_id
  left join (select user_id, count(user_id) as ca_rows from cx_pilot.airside_customer_addresses group by user_id) as ca on u.user_id = ca.user_id
  left join (select UE_UserId, count(UE_UserId) as ed_rows from cx_pilot.airside_users_educationdetails group by UE_UserId) as ed on u.user_id = ed.UE_UserId
  left join (select FE_UserId, count(FE_UserId) as fe_rows from cx_pilot.airside_users_flightsexperiencesdetails group by FE_UserId) as fe on u.user_id = fe.FE_UserId
  left join (select WE_UserId, count(WE_UserId) as we_rows from cx_pilot.airside_users_workexperiencesdetails group by WE_UserId) as we on u.user_id = we.WE_UserId
""")

In [None]:
# Saved separately from the child tables because the "Has...DL" flag columns
# are added in the previous cell.
#------------------------------------------------------------------------------#
#  SAVE the users table                                                        #
#------------------------------------------------------------------------------#
# mode("overwrite") lets this cell be re-run without first re-running the DROP
# cell. The default mode ("errorifexists") fails if the table already exists.
df_ui_curated.write.format("delta").mode("overwrite").saveAsTable("cx_pilot.airside_users_table")

In [None]:
#user main (user id and identity first login)
# Persist the migration-merged users_main. mode("overwrite") keeps the cell
# re-runnable when the table already exists (default mode is "errorifexists").
users_main.write.format("delta").mode("overwrite").saveAsTable("cx_pilot.airside_users_main")

In [None]:
# Persist the de-duplicated migration mapping; mode("overwrite") keeps the cell re-runnable.
users_mmigration.write.format("delta").mode("overwrite").saveAsTable("cx_pilot.airside_users_migration")

In [None]:
%sql
-- Invalidate Spark's cached data/metadata for every table written above.
refresh table cx_pilot.airside_users_table;
refresh table cx_pilot.airside_Customer_Addresses;
refresh table cx_pilot.airside_Customer_AccountDetails;
refresh table cx_pilot.airside_Customer_Citizenships;
refresh table cx_pilot.airside_Customer_Emails;
refresh table cx_pilot.airside_Customer_Phones;
refresh table cx_pilot.airside_users_main;
refresh table cx_pilot.airside_users_migration;

In [None]:
%sql
-- Compact the Delta files of every table written above (Databricks OPTIMIZE).
OPTIMIZE cx_pilot.airside_users_table;
OPTIMIZE cx_pilot.airside_Customer_Addresses;
OPTIMIZE cx_pilot.airside_Customer_AccountDetails;
OPTIMIZE cx_pilot.airside_Customer_Citizenships;
OPTIMIZE cx_pilot.airside_Customer_Emails;
OPTIMIZE cx_pilot.airside_Customer_Phones;
OPTIMIZE cx_pilot.airside_users_main;
OPTIMIZE cx_pilot.airside_users_migration;
