## *DISCLAIMER*
<p style="font-size:16px; color:#117d30;">
 By accessing this code, you acknowledge the code is made available for presentation and demonstration purposes only and that the code: (1) is not subject to SOC 1 and SOC 2 compliance audits; (2) is not designed or intended to be a substitute for the professional advice, diagnosis, treatment, or judgment of a certified financial services professional; (3) is not designed, intended or made available as a medical device; and (4) is not designed or intended to be a substitute for professional medical advice, diagnosis, treatment or judgement. Do not use this code to replace, substitute, or provide professional financial advice or judgment, or to replace, substitute or provide medical advice, diagnosis, treatment or judgement. You are solely responsible for ensuring the regulatory, legal, and/or contractual compliance of any use of the code, including obtaining any authorizations or consents, and any solution you choose to build that incorporates this code in whole or in part.
</p>

## Important – Do not use in production, for demonstration purposes only – please review the legal notices before continuing
 License agreement: https://github.com/microsoft/Azure-Analytics-and-AI-Engagement/blob/main/HealthCare/License.md 


## Legal Notices
This presentation, demonstration, and demonstration model are for informational purposes only. Microsoft makes no warranties, express or implied, in this presentation demonstration, and demonstration model. Nothing in this presentation, demonstration, or demonstration model modifies any of the terms and conditions of Microsoft’s written and signed agreements. This is not an offer and applicable terms and the information provided is subject to revision and may be changed at any time by Microsoft.

This presentation, demonstration, and/or demonstration model do not give you or your organization any license to any patents, trademarks, copyrights, or other intellectual property covering the subject matter in this presentation, demonstration, and demonstration model.

The information contained in this presentation, demonstration and demonstration model represent the current view of Microsoft on the issues discussed as of the date of presentation and/or demonstration, and the duration of your access to the demonstration model. Because Microsoft must respond to changing market conditions, it should not be interpreted to be a commitment on the part of Microsoft, and Microsoft cannot guarantee the accuracy of any information presented after the date of presentation and/or demonstration and for the duration of your access to the demonstration model.

No Microsoft technology, nor any of its component technologies, including the demonstration model, is intended or made available: (1) as a medical device; (2) for the diagnosis of disease or other conditions, or in the cure, mitigation, treatment or prevention of a disease or other conditions; or (3) as a substitute for the professional clinical advice, opinion, or judgment of a treating healthcare professional. Partners or customers are responsible for ensuring the regulatory compliance of any solution they build using Microsoft technologies.

© 2020 Microsoft Corporation. All rights reserved


# Data Preparation for Predicting Risk Reduction

## Overview
*Dataset:* FHIR data, stored in a SQL database, for 5 million patients across 51 states of the US. Relevant features are extracted to calculate each patient's risk of developing Coronary Heart Disease and, in turn, how that risk changes with changes in lifestyle.


### Data Ingestion using SparkSQL


In [1]:
%%spark
val patient_df = spark.read.sqlanalytics("HealthCareDW.dbo.SynPatient") 
patient_df.createOrReplaceTempView( "patient_table" )

// val encounters_df = spark.read.sqlanalytics("HealthCareDW.dbo.SynEncounter") 
// encounters_df.createOrReplaceTempView( "encounters_table" )

val conditions_df = spark.read.sqlanalytics("HealthCareDW.dbo.SynCondition") 
conditions_df.createOrReplaceTempView( "conditions_table" )

val observations_df = spark.read.sqlanalytics("HealthCareDW.dbo.SynObservation") 
observations_df.createOrReplaceTempView( "observations_table" )

patient_df: org.apache.spark.sql.DataFrame = [Id: string, BIRTHDATE: string ... 23 more fields]
conditions_df: org.apache.spark.sql.DataFrame = [START: string, STOP: string ... 4 more fields]
observations_df: org.apache.spark.sql.DataFrame = [DATE: string, PATIENT: string ... 6 more fields]

In [2]:
%%pyspark

patient_df = spark.sql("SELECT * FROM patient_table")
# encounters_df = spark.sql("SELECT DISTINCT Id, START FROM encounters_table")
conditions_df = spark.sql("SELECT * FROM conditions_table")
observations_df = spark.sql("SELECT * FROM observations_table")

In [3]:
patient_df.show()

+--------------------+----------+---------+-----------+-------+--------+------+--------------+-------------+------+------+-------+-----+-----------+------+--------------------+--------------------+----------------+------------+-------------------+----+------------------+------------------+-------------------+-------------------+
|                  Id| BIRTHDATE|DEATHDATE|        SSN|DRIVERS|PASSPORT|PREFIX|         FIRST|         LAST|SUFFIX|MAIDEN|MARITAL| RACE|  ETHNICITY|GENDER|          BIRTHPLACE|             ADDRESS|            CITY|       STATE|             COUNTY| ZIP|               LAT|               LON|HEALTHCARE_EXPENSES|HEALTHCARE_COVERAGE|
+--------------------+----------+---------+-----------+-------+--------+------+--------------+-------------+------+------+-------+-----+-----------+------+--------------------+--------------------+----------------+------------+-------------------+----+------------------+------------------+-------------------+-------------------+
|7b57d7

In [4]:
conditions_df.show()

+----------+----+--------------------+--------------------+---------+--------------------+
|     START|STOP|             PATIENT|           ENCOUNTER|     CODE|         DESCRIPTION|
+----------+----+--------------------+--------------------+---------+--------------------+
|1995-08-08|null|ff1fa893-f4b9-f96...|3e07db6c-f97a-aea...|162864005|Body mass index 3...|
|2010-01-01|null|9a0fd19c-35d5-444...|37039f78-5dbc-4f7...|162864005|Body mass index 3...|
|2015-02-21|null|3d3f9760-a9c5-da0...|dd381a5f-88b5-8e7...|162864005|Body mass index 3...|
|2004-04-01|null|38595fb9-c92f-70e...|377c3146-6afe-293...|162864005|Body mass index 3...|
|1972-03-21|null|bad31fa5-0a79-4bd...|8f236826-34cf-ab9...|162864005|Body mass index 3...|
|2008-10-05|null|0d587adf-ae8a-ed4...|d1afdb62-f657-4a1...|162864005|Body mass index 3...|
|1989-09-12|null|018dd4cc-6171-a2d...|ce1ff9db-a195-842...|162864005|Body mass index 3...|
|1971-01-06|null|410a1f86-bc38-f20...|ed0d48d1-64a6-817...|162864005|Body mass index 3...|

In [5]:
# encounters_df.show()

In [6]:
observations_df.show()

+--------------------+--------------------+---------+----+-----------+-----+-----+-------+
|                DATE|             PATIENT|ENCOUNTER|CODE|DESCRIPTION|VALUE|UNITS|   TYPE|
+--------------------+--------------------+---------+----+-----------+-----+-----+-------+
|2011-10-24T07:10:05Z|64b7fad6-38db-453...|     null|QALY|       QALY|  0.0|    a|numeric|
|2012-10-24T07:10:05Z|64b7fad6-38db-453...|     null|QALY|       QALY|  0.0|    a|numeric|
|2018-10-03T00:27:27Z|e47be2d4-af1f-583...|     null|QALY|       QALY|  0.0|    a|numeric|
|2019-10-03T00:27:27Z|e47be2d4-af1f-583...|     null|QALY|       QALY|  0.0|    a|numeric|
|2019-10-18T07:12:46Z|0f120bfd-db99-e0a...|     null|QALY|       QALY|  0.0|    a|numeric|
|2020-10-18T07:12:46Z|0f120bfd-db99-e0a...|     null|QALY|       QALY|  0.0|    a|numeric|
|2016-09-28T04:39:59Z|f8179c7c-148a-a68...|     null|QALY|       QALY|  0.0|    a|numeric|
|2017-09-28T04:39:59Z|f8179c7c-148a-a68...|     null|QALY|       QALY|  0.0|    a|numeric|

### Filter and Select relevant data


In [8]:
# Keep only the condition records that describe Coronary Heart Disease
conditions_df = conditions_df.filter(conditions_df.DESCRIPTION=='Coronary Heart Disease')
conditions_df.show()

+----------+----+--------------------+--------------------+--------+--------------------+
|     START|STOP|             PATIENT|           ENCOUNTER|    CODE|         DESCRIPTION|
+----------+----+--------------------+--------------------+--------+--------------------+
|2004-09-21|null|bad31fa5-0a79-4bd...|78fe9c8e-b8e9-8c8...|53741008|Coronary Heart Di...|
|1989-02-20|null|b8ca6e01-ce1c-ae7...|6da1fb03-60c4-ce1...|53741008|Coronary Heart Di...|
|1930-10-06|null|587a9527-f3ac-b8a...|e62f0243-a4ef-ee2...|53741008|Coronary Heart Di...|
|1983-12-15|null|d38bfd76-7da2-fe3...|5afc9951-81ed-170...|53741008|Coronary Heart Di...|
|1984-07-11|null|f71159df-1eb6-2be...|bced9044-2f02-3e1...|53741008|Coronary Heart Di...|
|2003-07-09|null|1742424f-4603-fe2...|ad02ff53-1fd5-222...|53741008|Coronary Heart Di...|
|1969-02-01|null|23df27ac-4c50-db0...|65ce1c75-50ed-987...|53741008|Coronary Heart Di...|
|2018-04-29|null|ca0b3b41-1fac-a21...|f2d738f2-da65-429...|53741008|Coronary Heart Di...|
|1999-03-1

In [9]:
# Get only the relevant columns for patients
patient_df = patient_df.select('Id', 'BIRTHDATE', 'DEATHDATE', 'GENDER', 'CITY', 'STATE')
patient_df.show()

+--------------------+----------+---------+------+---------------+------------+
|                  Id| BIRTHDATE|DEATHDATE|GENDER|           CITY|       STATE|
+--------------------+----------+---------+------+---------------+------------+
|8e184653-8e76-7c1...|2015-11-07|     null|     F| West Hempfield|Pennsylvania|
|a2a17a93-0b2e-590...|2005-10-31|     null|     F|         Peters|Pennsylvania|
|25f2a04b-ff55-4e2...|2016-11-27|     null|     F|  Cowanshannock|Pennsylvania|
|8adaad71-bd30-259...|2019-07-12|     null|     F|  West Lampeter|Pennsylvania|
|8528640d-8435-151...|2009-04-29|     null|     F|     Washington|Pennsylvania|
|d7857f37-d43a-47a...|2018-08-18|     null|     M|           Erie|Pennsylvania|
|c701c2a0-d713-384...|2013-06-03|     null|     M|   Lower Paxton|Pennsylvania|
|45ec9c7c-733c-1ac...|2013-12-02|     null|     F|  West Rockhill|Pennsylvania|
|8e2efa1c-ed6e-e40...|2005-06-23|     null|     F| West Hempfield|Pennsylvania|
|1b1951ee-0e96-b8c...|2017-07-13|     nu

In [11]:
observations_df = observations_df.select("DATE", "PATIENT", "ENCOUNTER", "DESCRIPTION", "VALUE")
observations_df.show()

+--------------------+--------------------+---------+-----------+-----+
|                DATE|             PATIENT|ENCOUNTER|DESCRIPTION|VALUE|
+--------------------+--------------------+---------+-----------+-----+
|2010-12-07T08:53:59Z|76dc93a1-d6c1-4ce...|     null|       DALY|  0.0|
|2011-12-07T08:53:59Z|76dc93a1-d6c1-4ce...|     null|       DALY|  0.0|
|2012-12-07T08:53:59Z|76dc93a1-d6c1-4ce...|     null|       DALY|  0.0|
|2013-12-07T08:53:59Z|76dc93a1-d6c1-4ce...|     null|       DALY|  0.0|
|2014-12-07T08:53:59Z|76dc93a1-d6c1-4ce...|     null|       DALY|  0.0|
|2015-12-07T08:53:59Z|76dc93a1-d6c1-4ce...|     null|       DALY|  0.0|
|2016-12-07T08:53:59Z|76dc93a1-d6c1-4ce...|     null|       DALY|  0.0|
|2017-12-07T08:53:59Z|76dc93a1-d6c1-4ce...|     null|       DALY|  0.0|
|2018-12-07T08:53:59Z|76dc93a1-d6c1-4ce...|     null|       DALY|  0.0|
|2019-12-07T08:53:59Z|76dc93a1-d6c1-4ce...|     null|       DALY|  0.0|
|2010-02-20T17:00:05Z|d6142743-66a7-f39...|     null|       DALY

In [12]:
# Only get the Body Weight, Body Height and Respiratory Rate from observations
observations_df = observations_df.filter(observations_df.DESCRIPTION.isin('Body Weight', 'Body Height', 'Respiratory rate'))
observations_df.show()

+--------------------+--------------------+--------------------+----------------+-----+
|                DATE|             PATIENT|           ENCOUNTER|     DESCRIPTION|VALUE|
+--------------------+--------------------+--------------------+----------------+-----+
|2011-11-08T16:25:02Z|f7173fbd-5a57-f00...|1b574366-860a-164...|     Body Height|159.2|
|2011-11-08T16:25:02Z|f7173fbd-5a57-f00...|1b574366-860a-164...|     Body Weight| 55.9|
|2011-11-08T16:25:02Z|f7173fbd-5a57-f00...|1b574366-860a-164...|Respiratory rate| 16.0|
|2012-11-13T16:25:02Z|f7173fbd-5a57-f00...|7f4a6e3d-f3d4-125...|     Body Height|159.3|
|2012-11-13T16:25:02Z|f7173fbd-5a57-f00...|7f4a6e3d-f3d4-125...|     Body Weight| 53.7|
|2020-03-08T04:20:15Z|2226d257-25b3-bb6...|d5a8f286-913b-05c...|Respiratory rate| 16.9|
|2012-11-13T16:25:02Z|f7173fbd-5a57-f00...|7f4a6e3d-f3d4-125...|Respiratory rate| 15.0|
|2020-03-08T04:20:15Z|2226d257-25b3-bb6...|d5a8f286-913b-05c...|     Body Weight| 85.3|
|2015-11-17T16:25:02Z|f7173fbd-5

### Convert Data Types

In [18]:
from pyspark.sql.functions import col, unix_timestamp, to_date
from pyspark.sql.types import DateType

In [19]:
observations_df.printSchema()

root
 |-- DATE: date (nullable = true)
 |-- PATIENT: string (nullable = true)
 |-- ENCOUNTER: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- VALUE: string (nullable = true)

In [20]:
observations_df = observations_df.withColumn('DATE', col("DATE").cast(DateType()))

In [21]:
observations_df.printSchema()

root
 |-- DATE: date (nullable = true)
 |-- PATIENT: string (nullable = true)
 |-- ENCOUNTER: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- VALUE: string (nullable = true)
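
The cast above is meant to turn the ISO 8601 timestamp strings seen earlier in the `DATE` column (for example `2011-11-08T16:25:02Z`) into calendar dates. The plain-Python sketch below is not Spark code; it only illustrates the intended effect, assuming the cast keeps the calendar date and discards the time of day. The helper name `to_calendar_date` is hypothetical.

```python
from datetime import date, datetime


def to_calendar_date(timestamp: str) -> date:
    """Parse a UTC timestamp like '2011-11-08T16:25:02Z' and keep only the date part."""
    return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ").date()


# Sample timestamps copied from the observations output shown earlier.
sample = ["2011-11-08T16:25:02Z", "2012-11-13T16:25:02Z", "2020-03-08T04:20:15Z"]
converted = [to_calendar_date(ts) for ts in sample]

for original, day in zip(sample, converted):
    print(original, "->", day.isoformat())
```

Reducing the timestamps to dates means that measurements taken on the same day share one `DATE` value, which matters once the observations are grouped by patient and date.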

In [22]:
conditions_df.printSchema()

root
 |-- START: string (nullable = true)
 |-- STOP: string (nullable = true)
 |-- PATIENT: string (nullable = true)
 |-- ENCOUNTER: string (nullable = true)
 |-- CODE: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)

In [23]:
conditions_df = conditions_df.withColumn('START', col("START").cast(DateType()))

In [24]:
conditions_df.printSchema()

root
 |-- START: date (nullable = true)
 |-- STOP: string (nullable = true)
 |-- PATIENT: string (nullable = true)
 |-- ENCOUNTER: string (nullable = true)
 |-- CODE: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)

### Transform Data to get relevant features


In [26]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, first

In [27]:
# Pivot the observations table so that each DESCRIPTION value becomes its own column

In [28]:
observations_df = observations_df.select("PATIENT", "DATE", "DESCRIPTION", "VALUE")
observations_df.printSchema()

root
 |-- PATIENT: string (nullable = true)
 |-- DATE: date (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- VALUE: string (nullable = true)

In [25]:
observations_df.filter(observations_df.DESCRIPTION=="Respiratory rate").show()

Py4JJavaError: An error occurred while calling o359.showString.
: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:88)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.createCETASResources(SQLAnalyticsReader.scala:257)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.extractDataAndGetLocation(SQLAnalyticsReader.scala:206)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader.planBatchInputPartitions(SQLAnalyticsReader.scala:110)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions$lzycompute(DataSourceV2ScanExec.scala:83)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions(DataSourceV2ScanExec.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:59)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:149)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:148)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:148)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:312)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:304)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:37)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:80)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1535)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:467)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:409)
	at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:219)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:199)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:356)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply$mcI$sp(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at scala.util.Try$.apply(Try.scala:192)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdate(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:83)
	... 57 more


In [26]:
observations_pivoted = observations_df.groupBy("PATIENT", "DATE").pivot("DESCRIPTION", ["Body Weight", "Body Height", "Respiratory rate"]).agg(first("VALUE"))
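
The pivot in the cell above reshapes the long observations table (one row per measurement) into a wide one (one row per patient and date, one column per measurement type). The sketch below reproduces that logic in plain Python, not Spark, to make the reshaping concrete. The function `pivot_observations` and the patient labels `patient-a` and `patient-b` are hypothetical; the values are taken from the observations output shown earlier.

```python
PIVOT_COLUMNS = ["Body Weight", "Body Height", "Respiratory rate"]


def pivot_observations(rows):
    """Group (patient, date, description, value) tuples by (patient, date).

    For each group, keep the first value seen per description, which mirrors
    the role of first("VALUE"); descriptions never seen are filled with None.
    """
    grouped = {}
    for patient, day, description, value in rows:
        cells = grouped.setdefault((patient, day), {})
        cells.setdefault(description, value)  # later duplicates are ignored
    return {
        key: {column: cells.get(column) for column in PIVOT_COLUMNS}
        for key, cells in grouped.items()
    }


rows = [
    ("patient-a", "2011-11-08", "Body Height", "159.2"),
    ("patient-a", "2011-11-08", "Body Weight", "55.9"),
    ("patient-a", "2011-11-08", "Respiratory rate", "16.0"),
    ("patient-b", "2020-03-08", "Respiratory rate", "16.9"),
    ("patient-b", "2020-03-08", "Body Weight", "85.3"),
]

pivoted = pivot_observations(rows)
for key, cells in pivoted.items():
    print(key, cells)
```

In this sample the second patient has no `Body Height` entry for that date, so that cell stays empty (`None`).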

In [27]:
observations_pivoted = observations_pivoted.na.drop()
observations_pivoted.show()

Py4JJavaError: An error occurred while calling o436.showString.
: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.


In [28]:
observations_pivoted = observations_pivoted.orderBy('PATIENT', 'DATE')
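
The two preceding steps drop every pivoted row that is missing a measurement and then order the remaining rows by patient and date. A plain-Python sketch of the same idea follows; it is not Spark code, and it assumes `na.drop()` is used with its default behaviour of removing a row when any column is null. The helper `drop_incomplete` and the patient labels are hypothetical.

```python
def drop_incomplete(rows):
    """Keep only the rows in which no field is None."""
    return [row for row in rows if all(value is not None for value in row.values())]


rows = [
    {"PATIENT": "patient-b", "DATE": "2020-03-08", "Body Weight": "85.3",
     "Body Height": None, "Respiratory rate": "16.9"},
    {"PATIENT": "patient-a", "DATE": "2012-11-13", "Body Weight": "53.7",
     "Body Height": "159.3", "Respiratory rate": "15.0"},
    {"PATIENT": "patient-a", "DATE": "2011-11-08", "Body Weight": "55.9",
     "Body Height": "159.2", "Respiratory rate": "16.0"},
]

complete = drop_incomplete(rows)

# ISO-formatted date strings sort chronologically, so a tuple key is enough here.
ordered = sorted(complete, key=lambda row: (row["PATIENT"], row["DATE"]))

for row in ordered:
    print(row["PATIENT"], row["DATE"], row["Body Weight"], row["Body Height"], row["Respiratory rate"])
```

Ordering by patient and then by date places each patient's measurements in chronological sequence, so consecutive rows can be compared directly.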

In [29]:
observations_pivoted.show(20, False)

Py4JJavaError: An error occurred while calling o525.showString.
: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:88)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.createCETASResources(SQLAnalyticsReader.scala:257)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.extractDataAndGetLocation(SQLAnalyticsReader.scala:206)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader.planBatchInputPartitions(SQLAnalyticsReader.scala:110)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions$lzycompute(DataSourceV2ScanExec.scala:83)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions(DataSourceV2ScanExec.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:59)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:149)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:148)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:148)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:312)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:304)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:37)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:80)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1535)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:467)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:409)
	at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:219)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:199)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:356)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply$mcI$sp(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at scala.util.Try$.apply(Try.scala:192)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdate(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:83)
	... 75 more
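
The trace above shows the failure surfacing inside `createCETASResources`, which is reached from `planBatchInputPartitions` whenever an action such as `show` or `count` runs. The following is a toy model of that pattern in plain Python, not the connector's code: `ToyCatalog`, `LazyFrame`, and the name `toy_data_source` are hypothetical, and the assumption is only that a lazily planned source tries to create the same named object each time an action runs.

```python
class ToyCatalog:
    """Stands in for a database that rejects duplicate object names."""

    def __init__(self):
        self.names = set()

    def create(self, name):
        if name in self.names:
            raise RuntimeError(f"Type with name '{name}' already exists.")
        self.names.add(name)


class LazyFrame:
    """Toy lazy dataset: the source is re-planned on every action."""

    def __init__(self, catalog, resource_name, rows):
        self.catalog = catalog
        self.resource_name = resource_name
        self.rows = rows

    def _plan(self):
        # Planning registers a named resource before any rows are read.
        self.catalog.create(self.resource_name)
        return list(self.rows)

    def collect(self):
        return self._plan()

    def count(self):
        return len(self._plan())


# Two actions on the same lazy frame: the second one re-plans and collides.
lazy = LazyFrame(ToyCatalog(), "toy_data_source", [1, 2, 3])
first_count = lazy.count()
try:
    lazy.count()
    second_error = None
except RuntimeError as exc:
    second_error = str(exc)

# Planning once and reusing the materialized rows avoids the second plan.
rows = LazyFrame(ToyCatalog(), "toy_data_source", [1, 2, 3]).collect()
reused_counts = [len(rows), len(rows)]
```

In this sketch the collision disappears once the rows are materialized a single time and reused. Whether the same holds for the real connector depends on how it names and cleans up its resources, which this notebook does not show.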


In [30]:
# Preprocess conditions to keep only the patient and the condition start date

In [31]:
conditions_df = conditions_df.select("PATIENT", "START")
conditions_df = conditions_df.withColumnRenamed("START", "TARGET_DATE")
conditions_df.printSchema()

root
 |-- PATIENT: string (nullable = true)
 |-- TARGET_DATE: date (nullable = true)

In [32]:
# Split patients into two groups: those who have the condition and those who do not

In [33]:
patients_with_disease = patient_df.join(conditions_df, patient_df.Id == conditions_df.PATIENT, how='inner')
patients_with_disease = patients_with_disease.select("Id", "BIRTHDATE", "GENDER", "CITY", "STATE")
patients_with_disease.printSchema()

root
 |-- Id: string (nullable = true)
 |-- BIRTHDATE: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)

In [34]:
patients_without_disease = patient_df.join(conditions_df, patient_df.Id == conditions_df.PATIENT, how='left_anti')
patients_without_disease.printSchema()

root
 |-- Id: string (nullable = true)
 |-- BIRTHDATE: string (nullable = true)
 |-- DEATHDATE: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
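
A minimal plain-Python sketch of what the two joins above compute, using made-up patient IDs and dates. The helpers `inner_join` and `left_anti_join` are hypothetical stand-ins for Spark's `how='inner'` and `how='left_anti'`, written only to show the row-level behavior.

```python
# Made-up sample data: p1 has two condition rows, p2 has none, p3 has one.
patients = [{"Id": "p1"}, {"Id": "p2"}, {"Id": "p3"}]
conditions = [
    {"PATIENT": "p1", "TARGET_DATE": "2019-01-05"},
    {"PATIENT": "p1", "TARGET_DATE": "2020-03-11"},
    {"PATIENT": "p3", "TARGET_DATE": "2018-07-30"},
]


def inner_join(left, right):
    """One output row per matching (patient, condition) pair."""
    return [{**l, **r} for l in left for r in right if l["Id"] == r["PATIENT"]]


def left_anti_join(left, right):
    """Left rows with no match on the right; only left columns are kept."""
    matched = {r["PATIENT"] for r in right}
    return [l for l in left if l["Id"] not in matched]


with_disease_ids = [row["Id"] for row in inner_join(patients, conditions)]
without_disease_ids = [row["Id"] for row in left_anti_join(patients, conditions)]
unique_with_disease_ids = sorted(set(with_disease_ids))
```

Here `with_disease_ids` contains `p1` twice because that patient has two condition rows. If the real `conditions_df` holds several rows per patient, `patients_with_disease` would repeat patients in the same way, and a call such as `dropDuplicates()` after the `select` may be worth considering. The anti join returns only the patient columns, which matches the schema printed above.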

In [35]:
patients_without_disease.count()

Py4JJavaError: An error occurred while calling o629.count.
: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.


In [36]:
# Split observations into two sets: patients who have the condition and patients who do not

In [37]:
observations_with_disease = observations_pivoted.join(patients_with_disease, observations_pivoted.PATIENT == patients_with_disease.Id, how="inner")
observations_with_disease = observations_with_disease.select("PATIENT", "DATE", "Body Weight", "Body Height", "Respiratory rate")
observations_with_disease.printSchema()

root
 |-- PATIENT: string (nullable = true)
 |-- DATE: date (nullable = true)
 |-- Body Weight: string (nullable = true)
 |-- Body Height: string (nullable = true)
 |-- Respiratory rate: string (nullable = true)

In [38]:
observations_without_disease = observations_pivoted.join(patients_without_disease, observations_pivoted.PATIENT == patients_without_disease.Id, how="inner")
observations_without_disease = observations_without_disease.select("PATIENT", "DATE", "Body Weight", "Body Height", "Respiratory rate")
observations_without_disease.printSchema()

root
 |-- PATIENT: string (nullable = true)
 |-- DATE: date (nullable = true)
 |-- Body Weight: string (nullable = true)
 |-- Body Height: string (nullable = true)
 |-- Respiratory rate: string (nullable = true)

In [39]:
# For observations with disease, join with conditions_df to add the start date of the disease as a column

In [40]:
observations_with_disease_target = observations_with_disease.join(conditions_df, ['PATIENT'], how="inner")
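# Assign the result: select() returns a new DataFrame and does not modify the original
observations_with_disease_target = observations_with_disease_target.select("PATIENT", "DATE", "Body Weight", "Body Height", "Respiratory rate", "TARGET_DATE")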
observations_with_disease_target.show()

Py4JJavaError: An error occurred while calling o731.showString.
: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.


In [42]:
# For observations without disease, use a window over each patient's observations to pick the latest observation date

In [43]:
window = Window.partitionBy(observations_without_disease['PATIENT']).orderBy(observations_without_disease['DATE'].desc())

In [44]:
observations_without_disease_latest = observations_without_disease.select('*', rank().over(window).alias('rank')) \
  .filter(col('rank') == 1)
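
One detail of the `rank()` filter above: rows that tie on `DATE` within a patient all receive rank 1, so the filter can keep more than one row per patient. The plain-Python sketch below illustrates this on made-up data. `latest_by_rank` and `latest_by_row_number` are hypothetical helpers that mimic `rank()` and `row_number()` from `pyspark.sql.functions`, and they are not part of the notebook.

```python
# Made-up observations: p1 has two rows on its latest date.
observations = [
    ("p1", "2020-01-10"),
    ("p1", "2020-06-02"),
    ("p1", "2020-06-02"),
    ("p2", "2019-11-23"),
]


def latest_by_rank(rows):
    """Keep every row tied for the latest date per patient (rank == 1)."""
    latest = {}
    for patient, date in rows:
        # ISO dates compare correctly as strings.
        if patient not in latest or date > latest[patient]:
            latest[patient] = date
    return [(p, d) for p, d in rows if d == latest[p]]


def latest_by_row_number(rows):
    """Keep exactly one row per patient (row_number == 1)."""
    # Sort by date descending, then stably by patient.
    by_date = sorted(rows, key=lambda r: r[1], reverse=True)
    ordered = sorted(by_date, key=lambda r: r[0])
    seen, result = set(), []
    for patient, date in ordered:
        if patient not in seen:
            seen.add(patient)
            result.append((patient, date))
    return result


rank_rows = latest_by_rank(observations)
row_number_rows = latest_by_row_number(observations)
```

Since only `PATIENT` and `TARGET_DATE` are kept afterwards, tied rows become exact duplicates. Swapping `rank()` for `row_number()`, or calling `dropDuplicates()` after the final `select`, would be two ways to get a single row per patient.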

In [45]:
observations_without_disease_latest = observations_without_disease_latest.withColumnRenamed("DATE", "TARGET_DATE")
observations_without_disease_latest = observations_without_disease_latest.select("PATIENT", "TARGET_DATE")
observations_without_disease_latest.show(20, False)

Py4JJavaError: An error occurred while calling o873.showString.
: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:88)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.createCETASResources(SQLAnalyticsReader.scala:257)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.extractDataAndGetLocation(SQLAnalyticsReader.scala:206)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader.planBatchInputPartitions(SQLAnalyticsReader.scala:110)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions$lzycompute(DataSourceV2ScanExec.scala:83)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions(DataSourceV2ScanExec.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:59)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:149)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:148)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:148)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:312)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:304)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:37)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:80)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1535)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:467)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:409)
	at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:219)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:199)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:356)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply$mcI$sp(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at scala.util.Try$.apply(Try.scala:192)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdate(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:83)
	... 117 more


In [46]:
observations_without_disease_target = observations_without_disease.join(observations_without_disease_latest, ['PATIENT'], how="inner")
observations_without_disease_target.show()

Py4JJavaError: An error occurred while calling o996.showString.
: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:88)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.createCETASResources(SQLAnalyticsReader.scala:257)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.extractDataAndGetLocation(SQLAnalyticsReader.scala:206)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader.planBatchInputPartitions(SQLAnalyticsReader.scala:110)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions$lzycompute(DataSourceV2ScanExec.scala:83)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions(DataSourceV2ScanExec.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:59)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:149)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:148)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:148)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:312)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:304)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:37)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:80)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1535)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:467)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:409)
	at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:219)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:199)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:356)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply$mcI$sp(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at scala.util.Try$.apply(Try.scala:192)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdate(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:83)
	... 105 more


In [47]:
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf, struct

In [48]:
# For both observation sets, add a True/False column that tells whether the observation falls within the relevant date window
def validate_observation(observation_date, target_date):
    THRESHOLD = 1095  # window length in days (3 * 365)
    # Null dates cannot be subtracted; treat such rows as invalid
    if observation_date is None or target_date is None:
        return False
    time_difference = (target_date - observation_date).days
    # Valid when the observation is on the target date or fewer than THRESHOLD days before it
    return 0 <= time_difference < THRESHOLD

validate_observation_udf = udf(lambda x: validate_observation(x[0], x[1]), BooleanType())
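
The boundaries of the date check are easy to misread, so here is a minimal stand-alone sketch of the same comparison using only the standard library. The name `within_window` and the sample dates are hypothetical. The threshold of 1095 days is taken from the cell above.

```python
from datetime import date, timedelta

THRESHOLD = 1095  # days, same value as in the cell above

def within_window(observation_date, target_date, threshold=THRESHOLD):
    """True when the observation is 0 to threshold - 1 days before the target date."""
    days = (target_date - observation_date).days
    return 0 <= days < threshold

target = date(2020, 1, 1)  # hypothetical target date
checks = {
    "same_day": within_window(target, target),
    "last_valid_day": within_window(target - timedelta(days=1094), target),
    "too_old": within_window(target - timedelta(days=1095), target),
    "after_target": within_window(target + timedelta(days=1), target),
}
print(checks)
```

An observation exactly 1095 days before the target date is rejected, and so is any observation dated after the target.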

In [49]:
# Flag the observations that fall within the relevant date window

In [50]:
observations_with_disease_filtered = observations_with_disease_target.withColumn("VALID_OBSERVATION", validate_observation_udf(struct('DATE', 'TARGET_DATE')))
observations_with_disease_filtered.show()

Py4JJavaError: An error occurred while calling o1119.showString.
: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:88)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.createCETASResources(SQLAnalyticsReader.scala:257)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.extractDataAndGetLocation(SQLAnalyticsReader.scala:206)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader.planBatchInputPartitions(SQLAnalyticsReader.scala:110)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions$lzycompute(DataSourceV2ScanExec.scala:83)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions(DataSourceV2ScanExec.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:59)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:149)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:148)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:148)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:312)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:304)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:37)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:80)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1535)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:467)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:409)
	at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:219)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:199)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:356)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply$mcI$sp(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at scala.util.Try$.apply(Try.scala:192)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdate(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:83)
	... 117 more


In [51]:
valid_observations_with_disease = observations_with_disease_filtered.filter(observations_with_disease_filtered.VALID_OBSERVATION == True)
valid_observations_with_disease.show()

Py4JJavaError: An error occurred while calling o1243.showString.
: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:88)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.createCETASResources(SQLAnalyticsReader.scala:257)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.extractDataAndGetLocation(SQLAnalyticsReader.scala:206)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader.planBatchInputPartitions(SQLAnalyticsReader.scala:110)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions$lzycompute(DataSourceV2ScanExec.scala:83)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions(DataSourceV2ScanExec.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:59)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:149)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:148)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:148)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:312)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:304)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:37)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:80)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1535)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:467)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:409)
	at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:219)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:199)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:356)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply$mcI$sp(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at scala.util.Try$.apply(Try.scala:192)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdate(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:83)
	... 135 more


In [52]:
valid_observations_with_disease.count()

Py4JJavaError: An error occurred while calling o1243.count.
: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:88)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.createCETASResources(SQLAnalyticsReader.scala:257)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader$PlanInputPartitionsUtilities$.extractDataAndGetLocation(SQLAnalyticsReader.scala:206)
	at com.microsoft.spark.sqlanalytics.read.SQLAnalyticsReader.planBatchInputPartitions(SQLAnalyticsReader.scala:110)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions$lzycompute(DataSourceV2ScanExec.scala:83)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.batchPartitions(DataSourceV2ScanExec.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:59)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:149)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:148)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:148)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:312)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:304)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:37)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:96)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:96)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:80)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Type with name 'SQLAnalyticsConnectorDataSourcef01d9128bee642b6bda4d97d8bede080' already exists.
	at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1535)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:467)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:409)
	at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:219)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:199)
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:356)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply$mcI$sp(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper$$anonfun$1.apply(SQLAnalyticsJDBCWrapper.scala:97)
	at scala.util.Try$.apply(Try.scala:192)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdate(SQLAnalyticsJDBCWrapper.scala:97)
	at com.microsoft.spark.sqlanalytics.utils.SQLAnalyticsJDBCWrapper.executeUpdateStatement(SQLAnalyticsJDBCWrapper.scala:83)
	... 132 more


In [None]:
observations_without_disease_filtered = observations_without_disease_target.withColumn("VALID_OBSERVATION", validate_observation_udf(struct('DATE', 'TARGET_DATE')))
observations_without_disease_filtered.show()

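## OLD METHOD - CANNOT WORK

The cells below are kept for reference only. They pass Spark DataFrames (`conditions_df`, `observations_df`) into a row-level function, and a DataFrame cannot be used from inside a UDF running on the executors.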


In [None]:
conditions_df.select('PATIENT').show(10, False)

In [None]:
random_condition_row = conditions_df.filter(conditions_df.PATIENT=="3beb5796-622e-f500-d30b-686639081553").collect()[0]

In [None]:
random_condition_row.asDict()['START']

In [None]:
observations_df.select('PATIENT').show(10, False)

In [None]:
random_observations = observations_df.filter(observations_df.PATIENT=="f7173fbd-5a57-f00a-6a0d-0cd48b2e15fe").collect()
random_observations

In [None]:
random_observation_dates = sorted([row.asDict()['DATE'] for row in random_observations])
random_observation_dates    

In [None]:
latest_date = random_observation_dates[-1]
latest_date

In [None]:
random_date = random_observation_dates[0]
random_time_difference = random_date - latest_date
random_time_difference.days

In [None]:
def within_time_span(patient_id, observation_date, conditions_df, observations_df):
    # Size (in days) of the window in which an observation counts as valid
    THRESHOLD = 365
    patient_row = conditions_df.filter(conditions_df.PATIENT == patient_id).collect()
    if len(patient_row):
        start_date = patient_row[0].asDict()['START']
        # Days by which the observation precedes the diagnosis
        time_difference = (start_date - observation_date).days
        # Reject the observation if it was taken after the diagnosis (negative gap)
        # or more than a year before it
        return 0 <= time_difference <= THRESHOLD
    else:
        # This patient does not have the relevant disease, so use the patient's
        # latest observation as the reference date and keep the year before it
        patient_observations = observations_df.filter(observations_df.PATIENT == patient_id).collect()
        # Use this patient's own observations (not the exploratory random_observations)
        patient_observation_dates = sorted([row.asDict()['DATE'] for row in patient_observations])
        latest_date = patient_observation_dates[-1]
        time_difference = (latest_date - observation_date).days
        return time_difference <= THRESHOLD
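
The window check described in the comments above (keep an observation only if it falls within a year before the reference date) can be isolated from Spark and tried on plain `datetime.date` values. The following is a minimal sketch, not the notebook's method: the helper names `days_before` and `is_within_window` and the sample dates are hypothetical, and the reference date stands for either the diagnosis date or the patient's latest observation.

```python
from datetime import date

THRESHOLD_DAYS = 365  # assumed to match the one-year THRESHOLD used above


def days_before(reference_date, observation_date):
    """Days by which the observation precedes the reference date (negative if later)."""
    return (reference_date - observation_date).days


def is_within_window(observation_date, reference_date, threshold_days=THRESHOLD_DAYS):
    """True when the observation is on or before the reference date and at most
    threshold_days earlier than it."""
    gap = days_before(reference_date, observation_date)
    return 0 <= gap <= threshold_days


# Hypothetical sample dates, for illustration only
diagnosis = date(2020, 6, 1)
print(is_within_window(date(2020, 1, 15), diagnosis))  # inside the one-year window
print(is_within_window(date(2018, 1, 15), diagnosis))  # more than a year earlier
print(is_within_window(date(2020, 7, 1), diagnosis))   # after the reference date
```

Because the check only needs two dates per row, it does not require a DataFrame lookup inside the function once the reference date is available as a column.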

In [None]:
from functools import partial

from pyspark.sql.types import BooleanType, StringType
from pyspark.sql.functions import udf, struct


In [None]:
def random_func1(patient_id, date):
    return date.isoformat()

random_udf1 = udf(lambda x: random_func1(x[0], x[1]), StringType())

In [None]:
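def random_func2(column_struct, conditions_df):
    patient = column_struct[0]
    date = column_struct[1]
    return date.isoformat()

# Bind conditions_df by keyword so that column_struct stays the first (free) argument.
# Note: capturing a Spark DataFrame in a UDF is still expected to fail, because a
# DataFrame cannot be serialized and shipped to the executors.
partial_random_func2 = partial(random_func2, conditions_df=conditions_df)
random_udf2 = udf(partial_random_func2, StringType())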
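
A note on `functools.partial`, since the cell above relies on it: positional arguments passed to `partial` fill the leading parameters, so extra context meant for a trailing parameter has to be bound by keyword. The sketch below uses plain Python values only; `describe` and `lookup` are hypothetical stand-ins, and the dict plays the role of the extra context. This illustrates argument binding and does not make a Spark DataFrame usable inside a UDF.

```python
from functools import partial


def describe(column_struct, lookup):
    """Hypothetical stand-in for a UDF body that needs extra context."""
    patient, day = column_struct
    return f"{patient} on {day}: {lookup.get(patient, 'unknown')}"


lookup = {"p1": "diagnosed"}  # hypothetical context, a plain picklable dict

# Keyword binding leaves column_struct free for the caller
bound = partial(describe, lookup=lookup)
result = bound(("p1", "2020-01-15"))
print(result)

# Positional binding places the dict in the first slot (column_struct) instead
misbound = partial(describe, lookup)
print(misbound.args)
```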

In [None]:
observations_df_func = observations_df.withColumn("VALID_OBSERVATION", random_udf2(struct('PATIENT', 'DATE')))

In [None]:
observations_df_func.show(10, False)

In [None]:
%%pyspark

def filter_patients(patient_df):
    '''
        This filters the relevant columns from the patients data
    '''

    relevant_columns = [
        'Id',
        'BIRTHDATE',
        'DEATHDATE',
        'GENDER',
        'CITY',
        'STATE'
    ]

    patients_df = patient_df[relevant_columns]

    return patients_df

In [None]:
%%pyspark

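def get_disease_patients(patients_df, conditions_df, DISEASE):
    '''
        Add an additional column to specify which patients have the target disease.
        Note: the column assignment below is pandas-style and assumes pandas DataFrames.
    '''
    diseased_ids = conditions_df[conditions_df['DESCRIPTION'] == DISEASE]['PATIENT']
    # Use the patients_df argument (the original referenced an undefined name, patients)
    patients_df['HAVE_DISEASE'] = patients_df['Id'].isin(diseased_ids)

    return patients_df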