In [1]:
%%pyspark
#load CIExport Customer Profiles

customers = spark.read.load('abfss://filesystestmtcvj@datalaketestmtcvj.dfs.core.windows.net/sourcedata/CustomerProfileExport/Customer/2022/08/03/1346/*.csv', format='csv'
## If header exists uncomment line below
, header=True,
inferSchema = True
)
display(customers.limit(10))

StatementMeta(, , , SessionStarting, )

SynapseWidget(Synapse.DataFrame, 6485c72a-9613-4599-9d4d-08e678e4aa33)

In [2]:
customers.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- CustomerId: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Telephone: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- PostCode: integer (nullable = true)
 |-- StreetAddress: string (nullable = true)
 |-- DateOfBirth: timestamp (nullable = true)
 |-- Created_date: timestamp (nullable = true)
 |-- Source: string (nullable = true)
 |-- pid: integer (nullable = true)
 |-- surveytype: string (nullable = true)
 |-- surverydate: timestamp (nullable = true)
 |-- question: string (nullable = true)
 |-- answer: double (nullable = true)
 |-- SurveyEmail: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- AzureDataLake_residents_source1_cid: integer (nullable = true)
 |-- AzureDataLake_residents_source1_cid_Alternate: integer (nullable = true)


need CID to identify the customer in the other files. So lets merge the two ID columns.

In [3]:
#we're working with dates and it's easier using sql function libraries in spark

from pyspark.sql.functions import to_date,col,when, datediff

StatementMeta(, , , Waiting, )

In [4]:
customerCIDs = customers.withColumn("CID",when(customers.AzureDataLake_residents_source1_cid.isNull(),customers.AzureDataLake_residents_source2_cid).otherwise(customers.AzureDataLake_residents_source1_cid))
display(customerCIDs.limit(10)) 

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 62ccad10-f5fc-4d3b-ac78-b7833db70be9)

In [5]:
#selecting a few columns
mycustomers = customerCIDs.select("CustomerId","cid","Name","Gender","Email","City","State","DateOfBirth")
display(mycustomers.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 3e611858-6d26-4e17-b539-4f0c449f5e0f)

Let's combine the survey data into this

In [6]:
#load survey data
surveys = spark.read.load('abfss://filesystestmtcvj@datalaketestmtcvj.dfs.core.windows.net/sourcedata/surveys.csv', format='csv'
, header=True
, inferSchema=True
)
display(surveys.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 323030c0-76a3-4e6c-9fe4-ca384560109e)

lets use the email field to combine the two datasets

In [7]:
surveys.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- Email: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- pid: integer (nullable = true)
 |-- surveytype: string (nullable = true)
 |-- surverydate: timestamp (nullable = true)
 |-- question: string (nullable = true)
 |-- answer: integer (nullable = true)
 |-- sid: integer (nullable = true)



In [8]:
#renaming one of the EMail columns to prevent confusion later
surveys = surveys.withColumnRenamed("Email","SurveyEmail")

StatementMeta(, , , Waiting, )

In [9]:
surveys.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- SurveyEmail: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- pid: integer (nullable = true)
 |-- surveytype: string (nullable = true)
 |-- surverydate: timestamp (nullable = true)
 |-- question: string (nullable = true)
 |-- answer: integer (nullable = true)
 |-- sid: integer (nullable = true)



In [10]:
customerAndSurvey = mycustomers.join(surveys,mycustomers.Email == surveys.SurveyEmail,"inner")
display(customerAndSurvey.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 4c5c2559-4fdd-44ec-af44-10f0f588d8ff)

Now that we have the Customer IDs and surveys for each property, lets see what the average satisfaction results were from the survey. 

In [11]:
#group by propery and customer id and whether the survey was for moving in or for renewing a lease.
surveyavg = customerAndSurvey.groupBy("cid","pid","surveytype").agg({'answer':'avg'}).fillna(0)
display(surveyavg.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 21584d63-bff6-4d65-9c86-5ea461f0ef23)

In [12]:
#lets pivot this to get more columns to understand underlying drivers
surveyAvgPivot = customerAndSurvey.groupBy("cid","pid","surveytype").pivot("question").agg({'answer':'avg'})

StatementMeta(, , , Waiting, )

Select the chart view and setup the chart so that
***Key***: pid, cid
***Values***: (Select all the Columns, except surveytype)
***Series Group***: surveytype
***Aggregation***: avg
***Stacked***: (checked)


In [13]:
display(surveyAvgPivot.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 51ed11b3-f4c4-47f4-acdd-3b32edaf6c05)

lets get it in a format that we can join later to other data frames


In [14]:
customerAndSurvey.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- CustomerId: string (nullable = true)
 |-- cid: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- DateOfBirth: timestamp (nullable = true)
 |-- SurveyEmail: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- pid: integer (nullable = true)
 |-- surveytype: string (nullable = true)
 |-- surverydate: timestamp (nullable = true)
 |-- question: string (nullable = true)
 |-- answer: integer (nullable = true)
 |-- sid: integer (nullable = true)



In [15]:
#selecting the columns we want and pivoting it
customerAndSurveyAvgPivot = customerAndSurvey.groupBy( "CustomerId","Name", "Gender","Email","City","State","DateOfBirth","cid","pid","surveytype").pivot("question").agg({'answer':'avg'})
display(customerAndSurveyAvgPivot.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, cb4ec0eb-7d9b-46f5-a954-dea743c0e1b9)

Lets tackle the Workorders file in a similar way. 

In [16]:
#load the work orders data
workorders = spark.read.load('abfss://filesystestmtcvj@datalaketestmtcvj.dfs.core.windows.net/sourcedata/workorders.csv', format='csv'
, header=True
, inferSchema=True
)
display(workorders.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 4b668d26-f30d-4b88-b320-d34ccbab3100)

In [17]:
workorders.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- cid: integer (nullable = true)
 |-- Source: string (nullable = true)
 |-- pid: integer (nullable = true)
 |-- uid: integer (nullable = true)
 |-- workorder_type: string (nullable = true)
 |-- ServiceRequestDate: timestamp (nullable = true)
 |-- ServiceCompleteDate: timestamp (nullable = true)
 |-- wid: integer (nullable = true)



In [18]:
#lets add a column that also takes into consideration effort.
workorders = workorders.withColumn("Effort",datediff(workorders.ServiceCompleteDate,workorders.ServiceRequestDate))
display(workorders.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, d9bb1a5c-54ed-4913-a25d-de347ae7498f)

In [19]:
#aggregate the number of work orders
workOrdersTotal = workorders.groupBy("cid","pid","uid").agg({'workorder_type':'count','Effort':'sum'})
display(workOrdersTotal.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, d221a248-ab7c-4c16-9f8a-c2399f13fb60)

In [20]:
#pivot the workorders by type and aggregate the effort for each property and unit
workOrdersPivot = workorders.groupBy("cid","pid","uid").pivot("workorder_type").agg({'Effort':'sum'})
display(workOrdersPivot.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 4795a7a1-2a87-442b-887f-c331df15c11c)

In [21]:
#rename to avoid join confusions
workOrdersPivot = workOrdersPivot.withColumnRenamed("cid","woCid") \
.withColumnRenamed("pid","woPid") \
.withColumnRenamed("uid","woUid")

StatementMeta(, , , Waiting, )

In [22]:
#print the schemas for both dataframes
workOrdersPivot.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- woCid: integer (nullable = true)
 |-- woPid: integer (nullable = true)
 |-- woUid: integer (nullable = true)
 |-- Air Conditioning: long (nullable = true)
 |-- Air Ducts or Vents: long (nullable = true)
 |-- Cabinet Doors: long (nullable = true)
 |-- Cabinets: long (nullable = true)
 |-- Carpet: long (nullable = true)
 |-- Ceiling: long (nullable = true)
 |-- Ceiling Fan: long (nullable = true)
 |-- Clean Unit: long (nullable = true)
 |-- Cleaning: long (nullable = true)
 |-- Dishwasher: long (nullable = true)
 |-- Door Repair or Replace: long (nullable = true)
 |-- Doorbell: long (nullable = true)
 |-- Drain: long (nullable = true)
 |-- Dryer: long (nullable = true)
 |-- Faucet: long (nullable = true)
 |-- Filter Replacement: long (nullable = true)
 |-- Garage Remote: long (nullable = true)
 |-- Garbage Disposal: long (nullable = true)
 |-- Heating: long (nullable = true)
 |-- Light Bulbs: long (nullable = true)
 |-- Light Fixture: long (nullable = true)
 |-- Lock or Exterio

In [23]:
customerAndSurveyAvgPivot.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- CustomerId: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- DateOfBirth: timestamp (nullable = true)
 |-- cid: integer (nullable = true)
 |-- pid: integer (nullable = true)
 |-- surveytype: string (nullable = true)
 |-- Condition of Property: double (nullable = true)
 |-- Ease of process: double (nullable = true)
 |-- Maintenance Overall Satisfaction: double (nullable = true)
 |-- Management Communication: double (nullable = true)
 |-- Management Courtesy: double (nullable = true)
 |-- Management Problem Resolution: double (nullable = true)
 |-- Management Professionalism: double (nullable = true)
 |-- Overall Satisfaction: double (nullable = true)
 |-- Staff Communication: double (nullable = true)
 |-- Staff Courtesy: double (nullable = true)
 |-- Staff Professionalism: double (nullable = true)



In [24]:
#combine this workorder info to the customer plus survey table. We'll use both cid and pid together for this
customerAndSurveyAndWOs = customerAndSurveyAvgPivot.join(workOrdersPivot,( \
(customerAndSurveyAvgPivot.cid == workOrdersPivot.woCid) & \
 (customerAndSurveyAvgPivot.pid == workOrdersPivot.woPid)
 )
 )
display(customerAndSurveyAndWOs.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 43588d6a-ae38-4b76-87a0-b6ede1ffd587)

In [25]:
customerAndSurveyAndWOs.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- CustomerId: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- DateOfBirth: timestamp (nullable = true)
 |-- cid: integer (nullable = true)
 |-- pid: integer (nullable = true)
 |-- surveytype: string (nullable = true)
 |-- Condition of Property: double (nullable = true)
 |-- Ease of process: double (nullable = true)
 |-- Maintenance Overall Satisfaction: double (nullable = true)
 |-- Management Communication: double (nullable = true)
 |-- Management Courtesy: double (nullable = true)
 |-- Management Problem Resolution: double (nullable = true)
 |-- Management Professionalism: double (nullable = true)
 |-- Overall Satisfaction: double (nullable = true)
 |-- Staff Communication: double (nullable = true)
 |-- Staff Courtesy: double (nullable = true)
 |-- Staff Professionalism: double (nullable = true)
 |-- woCid: integ

Let's now tackle the last dataset, the Lease information. Lets use the available dates to create some additional features:
- People who moved out
- People who are still living there. these have a null moveout date
- People who renewed the Lease
- Intial Lease Term
- Average Length of Renewals,

In [26]:
#load lease data
leases = spark.read.load('abfss://filesystestmtcvj@datalaketestmtcvj.dfs.core.windows.net/sourcedata/leases.csv', format='csv'
, header=True
, inferSchema=True
)
display(surveys.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 6e7a8733-f26f-4858-8b05-8bbee6c34841)

In [27]:
leases.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- cid: integer (nullable = true)
 |-- Source: string (nullable = true)
 |-- pid: integer (nullable = true)
 |-- utid: integer (nullable = true)
 |-- uid: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- ApplicationDate: timestamp (nullable = true)
 |-- StartDate: timestamp (nullable = true)
 |-- EndDate: timestamp (nullable = true)
 |-- LeaseTerm: integer (nullable = true)
 |-- SignedDate: timestamp (nullable = true)
 |-- MoveInDate: timestamp (nullable = true)
 |-- MoveOutDate: timestamp (nullable = true)
 |-- lid: integer (nullable = true)



In [28]:
#renaming cid,pid,utid,uid columns to prevent confusion later
leases = leases.withColumnRenamed("cid","leaseCid")
leases = leases.withColumnRenamed("pid","leasePid")
leases = leases.withColumnRenamed("utid","leaseUtid")
leases = leases.withColumnRenamed("uid","leaseUid")
leases.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- leaseCid: integer (nullable = true)
 |-- Source: string (nullable = true)
 |-- leasePid: integer (nullable = true)
 |-- leaseUtid: integer (nullable = true)
 |-- leaseUid: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- ApplicationDate: timestamp (nullable = true)
 |-- StartDate: timestamp (nullable = true)
 |-- EndDate: timestamp (nullable = true)
 |-- LeaseTerm: integer (nullable = true)
 |-- SignedDate: timestamp (nullable = true)
 |-- MoveInDate: timestamp (nullable = true)
 |-- MoveOutDate: timestamp (nullable = true)
 |-- lid: integer (nullable = true)



Since the dates have been infered as strings, lets correctly cast these to date types, which will allow us to use date based operations


In [29]:
leases=leases.withColumn("ApplicationDate",to_date(col("ApplicationDate"),"yyyy-MM-dd")) \
.withColumn("StartDate", to_date(col("StartDate"),"yyyy-MM-dd")) \
.withColumn("EndDate",to_date(col("EndDate"),"yyyy-MM-dd")) \
.withColumn("SignedDate",to_date(col("SignedDate"),"yyyy-MM-dd")) \
.withColumn("MoveInDate",to_date(col("MoveInDate"),"yyyy-MM-dd")) \
.withColumn("MoveOutDate",to_date(col("MoveOutDate"),"yyyy-MM-dd"))


StatementMeta(, , , Waiting, )

In [30]:
#check the schema
leases.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- leaseCid: integer (nullable = true)
 |-- Source: string (nullable = true)
 |-- leasePid: integer (nullable = true)
 |-- leaseUtid: integer (nullable = true)
 |-- leaseUid: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- ApplicationDate: date (nullable = true)
 |-- StartDate: date (nullable = true)
 |-- EndDate: date (nullable = true)
 |-- LeaseTerm: integer (nullable = true)
 |-- SignedDate: date (nullable = true)
 |-- MoveInDate: date (nullable = true)
 |-- MoveOutDate: date (nullable = true)
 |-- lid: integer (nullable = true)



In [31]:
#add a column to indicate movedout or not
leases=leases.withColumn("IsMovedOut",when(leases.MoveOutDate.isNull(),"N").otherwise("Y"))
display(leases.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 8aa873e2-e98c-442c-bd4a-caa81a78061c)

We'll pivot this so that we can work with both renewal and initial lease agreements

In [32]:
allleases = leases.groupby("leaseCid","leasePid","leaseUid").pivot("Type").agg({'LeaseTerm':'sum','ApplicationDate':'count'})
display(allleases.limit(10))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 9e19b95a-abd5-4681-8822-ef7f90280bc1)

In [33]:
allleases.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- leaseCid: integer (nullable = true)
 |-- leasePid: integer (nullable = true)
 |-- leaseUid: integer (nullable = true)
 |-- Application_count(ApplicationDate): long (nullable = true)
 |-- Application_sum(CAST(LeaseTerm AS BIGINT)): long (nullable = true)
 |-- Renewal_count(ApplicationDate): long (nullable = true)
 |-- Renewal_sum(CAST(LeaseTerm AS BIGINT)): long (nullable = true)



In [34]:
#rename annoying columns
allleases = allleases.withColumnRenamed("Application_count(ApplicationDate)","InitialApplicationCount") \
.withColumnRenamed("Application_sum(CAST(LeaseTerm AS BIGINT))","InitialLeaseTerm") \
.withColumnRenamed("Renewal_count(ApplicationDate)","RenewalApplicationCount") \
.withColumnRenamed("Renewal_sum(CAST(LeaseTerm AS BIGINT))","TotalRenewalTerm")

allleases.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- leaseCid: integer (nullable = true)
 |-- leasePid: integer (nullable = true)
 |-- leaseUid: integer (nullable = true)
 |-- InitialApplicationCount: long (nullable = true)
 |-- InitialLeaseTerm: long (nullable = true)
 |-- RenewalApplicationCount: long (nullable = true)
 |-- TotalRenewalTerm: long (nullable = true)



Calculate the average length of renewals



In [35]:
allleases = allleases.withColumn("averageRenewalLength",(allleases.TotalRenewalTerm/allleases.RenewalApplicationCount))
allleases.printSchema()
display(allleases.limit(2))

StatementMeta(, , , Waiting, )

root
 |-- leaseCid: integer (nullable = true)
 |-- leasePid: integer (nullable = true)
 |-- leaseUid: integer (nullable = true)
 |-- InitialApplicationCount: long (nullable = true)
 |-- InitialLeaseTerm: long (nullable = true)
 |-- RenewalApplicationCount: long (nullable = true)
 |-- TotalRenewalTerm: long (nullable = true)
 |-- averageRenewalLength: double (nullable = true)



SynapseWidget(Synapse.DataFrame, 49500725-c223-49a5-8754-418cab934895)

add a column to say if a renewal occured

In [36]:
allleases = allleases.withColumn("Renewed", when(allleases.RenewalApplicationCount > 1,"Y").otherwise("N"))
display(allleases.limit(2))

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, a381d2bf-3ad2-4c8b-b30c-6d32c100736e)

Lets combine this into the main table we want to use

In [37]:
customerAndSurveyAndWOs.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- CustomerId: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- DateOfBirth: timestamp (nullable = true)
 |-- cid: integer (nullable = true)
 |-- pid: integer (nullable = true)
 |-- surveytype: string (nullable = true)
 |-- Condition of Property: double (nullable = true)
 |-- Ease of process: double (nullable = true)
 |-- Maintenance Overall Satisfaction: double (nullable = true)
 |-- Management Communication: double (nullable = true)
 |-- Management Courtesy: double (nullable = true)
 |-- Management Problem Resolution: double (nullable = true)
 |-- Management Professionalism: double (nullable = true)
 |-- Overall Satisfaction: double (nullable = true)
 |-- Staff Communication: double (nullable = true)
 |-- Staff Courtesy: double (nullable = true)
 |-- Staff Professionalism: double (nullable = true)
 |-- woCid: integ

In [38]:
customerAndSurveyAndWOsAndLeases = customerAndSurveyAndWOs.join(allleases, ( \
(customerAndSurveyAndWOs.cid == allleases.leaseCid) & \
(customerAndSurveyAndWOs.pid == allleases.leasePid) & \
(customerAndSurveyAndWOs.woUid == allleases.leaseUid) 
))
display(customerAndSurveyAndWOsAndLeases.limit(10))
customerAndSurveyAndWOsAndLeases.printSchema()

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 0a4ae07e-aed1-488f-905d-31d39e55d616)

root
 |-- CustomerId: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- DateOfBirth: timestamp (nullable = true)
 |-- cid: integer (nullable = true)
 |-- pid: integer (nullable = true)
 |-- surveytype: string (nullable = true)
 |-- Condition of Property: double (nullable = true)
 |-- Ease of process: double (nullable = true)
 |-- Maintenance Overall Satisfaction: double (nullable = true)
 |-- Management Communication: double (nullable = true)
 |-- Management Courtesy: double (nullable = true)
 |-- Management Problem Resolution: double (nullable = true)
 |-- Management Professionalism: double (nullable = true)
 |-- Overall Satisfaction: double (nullable = true)
 |-- Staff Communication: double (nullable = true)
 |-- Staff Courtesy: double (nullable = true)
 |-- Staff Professionalism: double (nullable = true)
 |-- woCid: integ

We need to remove spaces in the column names, otherwise we wont be able to save it as a table. to do this we use a regular expression to look for spaces and remove them

In [39]:
#import the Regular Expressions library
import re

StatementMeta(, , , Waiting, )

In [40]:
for each in customerAndSurveyAndWOsAndLeases.schema.names:
    customerAndSurveyAndWOsAndLeases = customerAndSurveyAndWOsAndLeases.withColumnRenamed(each,  re.sub(r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*','',each.replace(' ', '')))

StatementMeta(, , , Waiting, )

In [41]:
customerAndSurveyAndWOsAndLeases.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- CustomerId: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- DateOfBirth: timestamp (nullable = true)
 |-- cid: integer (nullable = true)
 |-- pid: integer (nullable = true)
 |-- surveytype: string (nullable = true)
 |-- ConditionofProperty: double (nullable = true)
 |-- Easeofprocess: double (nullable = true)
 |-- MaintenanceOverallSatisfaction: double (nullable = true)
 |-- ManagementCommunication: double (nullable = true)
 |-- ManagementCourtesy: double (nullable = true)
 |-- ManagementProblemResolution: double (nullable = true)
 |-- ManagementProfessionalism: double (nullable = true)
 |-- OverallSatisfaction: double (nullable = true)
 |-- StaffCommunication: double (nullable = true)
 |-- StaffCourtesy: double (nullable = true)
 |-- StaffProfessionalism: double (nullable = true)
 |-- woCid: integer (nullable = 

Now save this as a table for reference

In [42]:
customerAndSurveyAndWOsAndLeases.write.mode("overwrite").saveAsTable("default.combinedcustomerdata")

StatementMeta(, , , Waiting, )

lets now create tables to use with Automated Machine Learning. We want to split this data into two sets. One for training, one for testing.

In [43]:
combinedCustomerTraining, combinedCustomerTest = customerAndSurveyAndWOsAndLeases.randomSplit([0.8,0.2], 223)


StatementMeta(, , , Waiting, )

In [44]:
#save these as tables
combinedCustomerTraining.write.mode("overwrite").saveAsTable("default.CombinedCustomerTrainingData")

StatementMeta(, , , Waiting, )

In [46]:
#also create a temporary table to work with scala
#combinedCustomerTest_df = combinedCustomerTest.to_spark_dataframe()

print('Register the DataFrame as a SQL temporary view: source')
combinedCustomerTest.createOrReplaceTempView('testingdata')


StatementMeta(sparktestmtcvj2, 11, 48, Finished, Available)

Register the DataFrame as a SQL temporary view: source


In [None]:
combinedCustomerTest.write.mode("overwrite").saveAsTable("default.CombinedCustomerTestingData")

StatementMeta(, , , Waiting, )

Lets write this to a SQL Dedicated Pool table so that we can score against it later


In [50]:
#read the data from the tables we just created in spark into scala variables. 
mytable = spark.sql("SELECT * FROM testingdata")
display(mytable.limit(10))

StatementMeta(sparktestmtcvj2, 11, 52, Finished, Available)

SynapseWidget(Synapse.DataFrame, 31ce7cbb-d346-4563-87d9-ab19a02b5652)

In [54]:
mytable.write.saveAsTable("mytable")

StatementMeta(sparktestmtcvj2, 11, 56, Finished, Available)

In [57]:
combinedCustomerTest.write.mode("overwrite").option("header", "true").csv("abfss://filesystestmtcvj@datalaketestmtcvj.dfs.core.windows.net/sourcedata/mytestdata.csv")

StatementMeta(sparktestmtcvj2, 11, 59, Finished, Available)

In [None]:
%%scala
// Write the dataframe into your sql pool
import org.apache.spark.sql.SqlAnalyticsConnector._
import com.microsoft.spark.sqlanalytics.utils.Constants

val sql_pool_name = "Your sql pool name" //fill in your sql pool name

holiday_nodate.write
    .sqlanalytics("synapsetest".dbo.PublicHoliday", Constants.INTERNAL)

StatementMeta(, , , Waiting, )