-
Notifications
You must be signed in to change notification settings - Fork 0
/
death data linkage prod.sql
553 lines (453 loc) · 19.5 KB
/
death data linkage prod.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
-- Databricks notebook source
-- MAGIC %md
-- MAGIC ## Data Loading
-- COMMAND ----------
-- Register each raw parquet extract as a session-scoped temporary view.
-- NOTE(review): `[path]` appears to be a redacted placeholder; fill in real locations before running.

-- EHR patient demographics
create or replace temporary view patient
using parquet
options (path [path]);

-- state death files, one view per file (each has its own column layout)
create or replace temporary view death_2010
using parquet
options (path [path]);

create or replace temporary view death_2014
using parquet
options (path [path]);

create or replace temporary view death_2020
using parquet
options (path [path]);
-- COMMAND ----------
-- MAGIC %md
-- MAGIC ## Data Cleaning
-- COMMAND ----------
-- Standardize the 2010-layout state death file into the common death-record shape.
-- Keep the output columns aligned with clean_2014 / clean_2020; the three are unioned into all_death.
-- Every row from this file is flagged as final ('Y').
-- NOTE(review): dates here are DATE values, while clean_2014 / clean_2020 build strings;
-- the union in all_death coerces them to a common type.
drop view if exists clean_2010;
create temporary view clean_2010 as
select distinct
    filename,
    'Y' as state_final,
    to_date(death_date, 'MMddyyyy') as death_date,
    -- numeric sex code: 1 -> 'M', 2 -> 'F', anything else -> null
    case when sex = 1 then 'M'
         when sex = 2 then 'F' end as sex,
    to_date(date_of_birth, 'MMddyyyy') as dob,
    -- first token of first/last name, first letter of middle name
    split(decedents_firstname, ' ')[0] as fname,
    substring(decedents_middlename, 1, 1) as mname,
    split(decedents_lastname, ' ')[0] as lname,
    -- the all-zero SSN is treated as missing
    case when social_security_number = '000000000' then null
         when social_security_number is null then null
         else social_security_number end as ssn,
    -- house number = first address token, but only when it is all digits.
    -- '+' (not '*') so an empty first token (blank address or leading space) yields null
    -- instead of '', which would otherwise earn house-number match points in the linkage
    -- whenever the other side is also ''.
    case when split(residence_address, ' ')[0] rlike '^[0-9]+$' then split(residence_address, ' ')[0]
         else null end as house_number,
    split(residence_address, ' ')[1] as street,
    residence_zip_code as zipcode,
    manual_underlying_causeofdeathcode as underlying_cause_of_death,
    first_mention_cause_of_death as cod1,
    second_mention_cause_of_death as cod2,
    third_mention_cause_of_death as cod3,
    fourth_mention_cause_of_death as cod4,
    fifth_mention_cause_of_death as cod5,
    sixth_mention_cause_of_death as cod6,
    seventh_mention_cause_of_death as cod7,
    eighth_mention_cause_of_death as cod8,
    ninth_mention_cause_of_death as cod9,
    tenth_mention_cause_of_death as cod10,
    eleventh_mention_cause_of_death as cod11,
    twelfth_mention_cause_of_death as cod12,
    thirteenth_mention_cause_of_death as cod13,
    fourteenth_mention_cause_of_death as cod14,
    fifteenth_mention_cause_of_death as cod15,
    sixteenth_mention_cause_of_death as cod16,
    seventeenth_mention_cause_of_death as cod17,
    eighteenth_mention_cause_of_death as cod18,
    nineteenth_mention_cause_of_death as cod19,
    twentieth_mention_cause_of_death as cod20
from death_2010
-- COMMAND ----------
-- Standardize the 2014-layout state death file into the common death-record shape.
-- Keep the output columns aligned with clean_2010 / clean_2020; the three are unioned into all_death.
-- Every row from this file is flagged as final ('Y').
-- NOTE(review): dates are assembled as '<yr>-<mo>-<dy>' strings from the raw parts; if month/day
-- are not zero-padded they will not equal the EHR's yyyy-MM-dd dates — confirm the source format.
-- NOTE(review): sex is passed through unchanged — presumably already coded 'M'/'F'; confirm.
drop view if exists clean_2014;
create temporary view clean_2014 as
select distinct
    filename,
    'Y'                                      as state_final,
    concat(DOD_YR, '-', DOD_MO, '-', DOD_DY) as death_date,
    sex,
    concat(DOB_YR, '-', DOB_MO, '-', DOB_DY) as dob,
    -- first token of given/last name, first letter of middle name
    split(GNAME, ' ')[0]                     as fname,
    substring(MNAME, 1, 1)                   as mname,
    split(LNAME, ' ')[0]                     as lname,
    -- the all-zero SSN is treated as missing
    nullif(ssn, '000000000')                 as ssn,
    addrnum                                  as house_number,
    split(addrname, ' ')[0]                  as street,
    zipcode,
    acmecod                                  as underlying_cause_of_death,
    cod1, cod2, cod3, cod4, cod5, cod6, cod7, cod8, cod9, cod10,
    cod11, cod12, cod13, cod14, cod15, cod16, cod17, cod18, cod19, cod20
from death_2014
-- COMMAND ----------
-- Standardize the 2020-layout state death file into the common death-record shape.
-- Keep the output columns aligned with clean_2010 / clean_2014; the three are unioned into all_death.
-- Rows from files whose name starts with 'FINAL' are flagged final ('Y'); all others 'N'.
-- The multiple-cause codes arrive as a single space-separated string (rac) and are spread
-- positionally across cod1..cod20.
-- NOTE(review): indexing past the end of split(rac, ' ') yields null only while
-- spark.sql.ansi.enabled is false; with ANSI mode on it raises an error.
drop view if exists clean_2020;
create temporary view clean_2020 as
select distinct
    filename,
    case
        when filename like 'FINAL%' then 'Y'
        else 'N'
    end                                      as state_final,
    concat(DOD_YR, '-', DOD_MO, '-', DOD_DY) as death_date,
    sex,
    concat(DOB_YR, '-', DOB_MO, '-', DOB_DY) as dob,
    split(GNAME, ' ')[0]                     as fname,
    substring(MNAME, 1, 1)                   as mname,
    split(LNAME, ' ')[0]                     as lname,
    -- the all-zero SSN is treated as missing
    nullif(ssn, '000000000')                 as ssn,
    stnum_d                                  as house_number,
    split(stname_r, ' ')[0]                  as street,
    -- first five characters only (zip9_d is presumably ZIP+4)
    substring(zip9_d, 1, 5)                  as zipcode,
    acme_uc                                  as underlying_cause_of_death,
    split(rac, ' ')[0]  as cod1,
    split(rac, ' ')[1]  as cod2,
    split(rac, ' ')[2]  as cod3,
    split(rac, ' ')[3]  as cod4,
    split(rac, ' ')[4]  as cod5,
    split(rac, ' ')[5]  as cod6,
    split(rac, ' ')[6]  as cod7,
    split(rac, ' ')[7]  as cod8,
    split(rac, ' ')[8]  as cod9,
    split(rac, ' ')[9]  as cod10,
    split(rac, ' ')[10] as cod11,
    split(rac, ' ')[11] as cod12,
    split(rac, ' ')[12] as cod13,
    split(rac, ' ')[13] as cod14,
    split(rac, ' ')[14] as cod15,
    split(rac, ' ')[15] as cod16,
    split(rac, ' ')[16] as cod17,
    split(rac, ' ')[17] as cod18,
    split(rac, ' ')[18] as cod19,
    split(rac, ' ')[19] as cod20
from death_2020
-- COMMAND ----------
-- Stack the three standardized death files into one pool of death records for linkage.
-- Columns are listed explicitly so each branch lines up by name rather than by position.
-- UNION (rather than UNION ALL) also removes exact duplicate rows.
drop view if exists all_death;
create temporary view all_death as
select
    filename, state_final, death_date, sex, dob, fname, mname, lname, ssn,
    house_number, street, zipcode, underlying_cause_of_death,
    cod1, cod2, cod3, cod4, cod5, cod6, cod7, cod8, cod9, cod10,
    cod11, cod12, cod13, cod14, cod15, cod16, cod17, cod18, cod19, cod20
from clean_2010
union
select
    filename, state_final, death_date, sex, dob, fname, mname, lname, ssn,
    house_number, street, zipcode, underlying_cause_of_death,
    cod1, cod2, cod3, cod4, cod5, cod6, cod7, cod8, cod9, cod10,
    cod11, cod12, cod13, cod14, cod15, cod16, cod17, cod18, cod19, cod20
from clean_2014
union
select
    filename, state_final, death_date, sex, dob, fname, mname, lname, ssn,
    house_number, street, zipcode, underlying_cause_of_death,
    cod1, cod2, cod3, cod4, cod5, cod6, cod7, cod8, cod9, cod10,
    cod11, cod12, cod13, cod14, cod15, cod16, cod17, cod18, cod19, cod20
from clean_2020
-- COMMAND ----------
-- Standardize the EHR patient table into the same shape as the death records.
-- Also exposes pat_mrn_id / pat_id, the identifier names selected by the linkage views
-- (total_join_14 / _17 / _18); without these aliases those views reference missing columns.
drop view if exists patient_clean;
create temporary view patient_clean as
select distinct
    mrn,
    id,
    last_name,
    first_name,
    -- first token of the upper-cased last/first name, first letter of middle name
    split(upper(last_name), ' ')[0] as ehr_lname,
    split(upper(first_name), ' ')[0] as ehr_fname,
    substring(middle_name, 1, 1) as ehr_mname,
    -- EHR sex codes: 1 -> 'F', 2 -> 'M' (the opposite numeric mapping from the 2010 death file);
    -- anything else becomes '', which never equals a death-record sex value
    case when sex = 1 then 'F'
         when sex = 2 then 'M'
         else '' end as ehr_sex,
    birth_date,
    to_date(birth_date) as ehr_dob,
    add_line_1,
    -- house number = first address token, but only when it is all digits.
    -- '+' (not '*') so an empty first token yields null instead of '', which would
    -- otherwise earn house-number match points whenever the death record is also ''.
    case when split(add_line_1, ' ')[0] rlike '^[0-9]+$' then split(add_line_1, ' ')[0]
         else null end as ehr_house_number,
    upper(split(add_line_1, ' ')[1]) as ehr_street,
    substring(zip, 1, 5) as ehr_zip,
    ssn,
    -- strip hyphens, then null out known placeholder / invalid SSN values
    case when regexp_replace(ssn, '-', '') in (
              '000000000', '999999999', '777777777', '444444444', '888888888', '222222222',
              '999990999', '000009999', '111111111', '123456789', '555555555', '444556666',
              '099999999', '999999998', '999990099', '333333333', '999999990', '999999991',
              '000000001', '666666666', '999991111')
         then null
         else regexp_replace(ssn, '-', '') end as ehr_ssn,
    death_date as ehr_death_date,
    max_contact_dt,
    enc_type,
    -- identifier aliases consumed by the linkage views
    mrn as pat_mrn_id,
    id as pat_id
from patient
-- COMMAND ----------
-- MAGIC %md
-- MAGIC ## Data Linkage
-- COMMAND ----------
-- Linkage approach 1: identical SSN, but first names differ by more than two edits
-- (levenshtein > 2). EHR last names beginning with 'INITIAL' are skipped.
-- Each candidate pair carries per-field edit distances and scores; wms is the score sum.
-- NOTE(review): the scoring rules are repeated verbatim in total_join_17 / total_join_18, and the
-- *_dist / *_score columns are the ML classifier's features — change all three views together
-- and re-check against the trained model.
drop view if exists total_join_14;
create temporary view total_join_14 as
with candidate_pairs as (
    select
        p.pat_mrn_id,
        p.pat_id,
        d.state_final,
        d.death_date,
        d.underlying_cause_of_death,
        d.cod1, d.cod2, d.cod3, d.cod4, d.cod5, d.cod6, d.cod7, d.cod8, d.cod9, d.cod10,
        d.cod11, d.cod12, d.cod13, d.cod14, d.cod15, d.cod16, d.cod17, d.cod18, d.cod19, d.cod20,
        -- EHR value, death-record value, and edit distance side by side
        p.ehr_lname, d.lname, levenshtein(p.ehr_lname, d.lname) as lname_dist,
        p.ehr_fname, d.fname, levenshtein(p.ehr_fname, d.fname) as fname_dist,
        p.ehr_mname, d.mname as mname,
        p.ehr_sex, d.sex,
        p.ehr_dob, d.dob, levenshtein(p.ehr_dob, d.dob) as dob_dist,
        p.ehr_ssn, d.ssn as ssn, levenshtein(p.ehr_ssn, d.ssn) as ssn_dist,
        p.ehr_zip, d.zipcode, levenshtein(p.ehr_zip, d.zipcode) as zip_dist,
        p.ehr_house_number, d.house_number, levenshtein(p.ehr_house_number, d.house_number) as house_dist,
        p.ehr_street, d.street,
        -- last name: 10 exact, 7 for 1-3 edits, 3 for 4-6 edits, else -5.
        -- Only a missing EHR name scores 0; a missing death-record name falls through to -5.
        case
            when p.ehr_lname is null then 0
            when p.ehr_lname = d.lname then 10
            when levenshtein(p.ehr_lname, d.lname) between 4 and 6 then 3
            when levenshtein(p.ehr_lname, d.lname) between 1 and 3 then 7
            else -5
        end as lname_score,
        -- first name: same scale as last name
        case
            when p.ehr_fname is null then 0
            when p.ehr_fname = d.fname then 10
            when levenshtein(p.ehr_fname, d.fname) between 4 and 6 then 3
            when levenshtein(p.ehr_fname, d.fname) between 1 and 3 then 7
            else -5
        end as fname_score,
        -- middle initial: 0 if either side missing, 5 exact, else -5
        case
            when p.ehr_mname is null or d.mname is null then 0
            when p.ehr_mname = d.mname then 5
            else -5
        end as mname_score,
        -- date of birth: 15 exact, 9 for 1-2 edits, 3 for 3-4 edits, else -5
        case
            when p.ehr_dob is null or d.dob is null then 0
            when p.ehr_dob = d.dob then 15
            when levenshtein(p.ehr_dob, d.dob) between 1 and 2 then 9
            when levenshtein(p.ehr_dob, d.dob) between 3 and 4 then 3
            else -5
        end as dob_score,
        -- SSN: 25 exact, 10 for 1 edit, 5 for 2 edits, else -5
        case
            when p.ehr_ssn is null or d.ssn is null then 0
            when p.ehr_ssn = d.ssn then 25
            when levenshtein(p.ehr_ssn, d.ssn) = 2 then 5
            when levenshtein(p.ehr_ssn, d.ssn) = 1 then 10
            else -5
        end as ssn_score,
        -- zip: 5 exact, otherwise 0 (no penalty)
        case
            when p.ehr_zip is null or d.zipcode is null then 0
            when p.ehr_zip = d.zipcode then 5
            else 0
        end as zip_score,
        -- house number: 10 exact, otherwise 0 (no penalty)
        case
            when p.ehr_house_number is null or d.house_number is null then 0
            when p.ehr_house_number = d.house_number then 10
            else 0
        end as house_score
    from patient_clean as p
    inner join all_death as d
        on p.ehr_ssn = d.ssn
        and p.ehr_lname not like 'INITIAL%'
        and levenshtein(p.ehr_fname, d.fname) > 2
)
select
    cp.*,
    (cp.lname_score + cp.fname_score + cp.mname_score + cp.dob_score
        + cp.ssn_score + cp.zip_score + cp.house_score) as wms
from candidate_pairs as cp
-- COMMAND ----------
-- Linkage approach 2: same sex and same last four SSN digits, but the first four letters
-- of the last name differ (presumably to catch surname changes — confirm).
-- substr(ssn, 6, 9) starts at character 6 and runs to the end (length 9 overshoots),
-- i.e. the last four digits of a 9-character SSN.
-- NOTE(review): the scoring rules are repeated verbatim in total_join_14 / total_join_18, and the
-- *_dist / *_score columns are the ML classifier's features — change all three views together.
drop view if exists total_join_17;
create temporary view total_join_17 as
with candidate_pairs as (
    select
        p.pat_mrn_id,
        p.pat_id,
        d.state_final,
        d.death_date,
        d.underlying_cause_of_death,
        d.cod1, d.cod2, d.cod3, d.cod4, d.cod5, d.cod6, d.cod7, d.cod8, d.cod9, d.cod10,
        d.cod11, d.cod12, d.cod13, d.cod14, d.cod15, d.cod16, d.cod17, d.cod18, d.cod19, d.cod20,
        -- EHR value, death-record value, and edit distance side by side
        p.ehr_lname, d.lname, levenshtein(p.ehr_lname, d.lname) as lname_dist,
        p.ehr_fname, d.fname, levenshtein(p.ehr_fname, d.fname) as fname_dist,
        p.ehr_mname, d.mname as mname,
        p.ehr_sex, d.sex,
        p.ehr_dob, d.dob, levenshtein(p.ehr_dob, d.dob) as dob_dist,
        p.ehr_ssn, d.ssn as ssn, levenshtein(p.ehr_ssn, d.ssn) as ssn_dist,
        p.ehr_zip, d.zipcode, levenshtein(p.ehr_zip, d.zipcode) as zip_dist,
        p.ehr_house_number, d.house_number, levenshtein(p.ehr_house_number, d.house_number) as house_dist,
        p.ehr_street, d.street,
        -- last name: 10 exact, 7 for 1-3 edits, 3 for 4-6 edits, else -5 (0 if EHR name missing)
        case
            when p.ehr_lname is null then 0
            when p.ehr_lname = d.lname then 10
            when levenshtein(p.ehr_lname, d.lname) between 4 and 6 then 3
            when levenshtein(p.ehr_lname, d.lname) between 1 and 3 then 7
            else -5
        end as lname_score,
        -- first name: same scale as last name
        case
            when p.ehr_fname is null then 0
            when p.ehr_fname = d.fname then 10
            when levenshtein(p.ehr_fname, d.fname) between 4 and 6 then 3
            when levenshtein(p.ehr_fname, d.fname) between 1 and 3 then 7
            else -5
        end as fname_score,
        -- middle initial: 0 if either side missing, 5 exact, else -5
        case
            when p.ehr_mname is null or d.mname is null then 0
            when p.ehr_mname = d.mname then 5
            else -5
        end as mname_score,
        -- date of birth: 15 exact, 9 for 1-2 edits, 3 for 3-4 edits, else -5
        case
            when p.ehr_dob is null or d.dob is null then 0
            when p.ehr_dob = d.dob then 15
            when levenshtein(p.ehr_dob, d.dob) between 1 and 2 then 9
            when levenshtein(p.ehr_dob, d.dob) between 3 and 4 then 3
            else -5
        end as dob_score,
        -- SSN: 25 exact, 10 for 1 edit, 5 for 2 edits, else -5
        case
            when p.ehr_ssn is null or d.ssn is null then 0
            when p.ehr_ssn = d.ssn then 25
            when levenshtein(p.ehr_ssn, d.ssn) = 2 then 5
            when levenshtein(p.ehr_ssn, d.ssn) = 1 then 10
            else -5
        end as ssn_score,
        -- zip: 5 exact, otherwise 0 (no penalty)
        case
            when p.ehr_zip is null or d.zipcode is null then 0
            when p.ehr_zip = d.zipcode then 5
            else 0
        end as zip_score,
        -- house number: 10 exact, otherwise 0 (no penalty)
        case
            when p.ehr_house_number is null or d.house_number is null then 0
            when p.ehr_house_number = d.house_number then 10
            else 0
        end as house_score
    from patient_clean as p
    inner join all_death as d
        on p.ehr_sex = d.sex
        and substr(p.ehr_ssn, 6, 9) = substr(d.ssn, 6, 9)
        and substr(p.ehr_lname, 1, 4) <> substr(d.lname, 1, 4)
)
select
    cp.*,
    (cp.lname_score + cp.fname_score + cp.mname_score + cp.dob_score
        + cp.ssn_score + cp.zip_score + cp.house_score) as wms
from candidate_pairs as cp
-- COMMAND ----------
-- Linkage approach 3: same sex and same last four SSN digits, and the first four letters
-- of the last name agree (the complement of approach 2's last-name condition).
-- substr(ssn, 6, 9) starts at character 6 and runs to the end (length 9 overshoots),
-- i.e. the last four digits of a 9-character SSN.
-- NOTE(review): the scoring rules are repeated verbatim in total_join_14 / total_join_17, and the
-- *_dist / *_score columns are the ML classifier's features — change all three views together.
drop view if exists total_join_18;
create temporary view total_join_18 as
with candidate_pairs as (
    select
        p.pat_mrn_id,
        p.pat_id,
        d.state_final,
        d.death_date,
        d.underlying_cause_of_death,
        d.cod1, d.cod2, d.cod3, d.cod4, d.cod5, d.cod6, d.cod7, d.cod8, d.cod9, d.cod10,
        d.cod11, d.cod12, d.cod13, d.cod14, d.cod15, d.cod16, d.cod17, d.cod18, d.cod19, d.cod20,
        -- EHR value, death-record value, and edit distance side by side
        p.ehr_lname, d.lname, levenshtein(p.ehr_lname, d.lname) as lname_dist,
        p.ehr_fname, d.fname, levenshtein(p.ehr_fname, d.fname) as fname_dist,
        p.ehr_mname, d.mname as mname,
        p.ehr_sex, d.sex,
        p.ehr_dob, d.dob, levenshtein(p.ehr_dob, d.dob) as dob_dist,
        p.ehr_ssn, d.ssn as ssn, levenshtein(p.ehr_ssn, d.ssn) as ssn_dist,
        p.ehr_zip, d.zipcode, levenshtein(p.ehr_zip, d.zipcode) as zip_dist,
        p.ehr_house_number, d.house_number, levenshtein(p.ehr_house_number, d.house_number) as house_dist,
        p.ehr_street, d.street,
        -- last name: 10 exact, 7 for 1-3 edits, 3 for 4-6 edits, else -5 (0 if EHR name missing)
        case
            when p.ehr_lname is null then 0
            when p.ehr_lname = d.lname then 10
            when levenshtein(p.ehr_lname, d.lname) between 4 and 6 then 3
            when levenshtein(p.ehr_lname, d.lname) between 1 and 3 then 7
            else -5
        end as lname_score,
        -- first name: same scale as last name
        case
            when p.ehr_fname is null then 0
            when p.ehr_fname = d.fname then 10
            when levenshtein(p.ehr_fname, d.fname) between 4 and 6 then 3
            when levenshtein(p.ehr_fname, d.fname) between 1 and 3 then 7
            else -5
        end as fname_score,
        -- middle initial: 0 if either side missing, 5 exact, else -5
        case
            when p.ehr_mname is null or d.mname is null then 0
            when p.ehr_mname = d.mname then 5
            else -5
        end as mname_score,
        -- date of birth: 15 exact, 9 for 1-2 edits, 3 for 3-4 edits, else -5
        case
            when p.ehr_dob is null or d.dob is null then 0
            when p.ehr_dob = d.dob then 15
            when levenshtein(p.ehr_dob, d.dob) between 1 and 2 then 9
            when levenshtein(p.ehr_dob, d.dob) between 3 and 4 then 3
            else -5
        end as dob_score,
        -- SSN: 25 exact, 10 for 1 edit, 5 for 2 edits, else -5
        case
            when p.ehr_ssn is null or d.ssn is null then 0
            when p.ehr_ssn = d.ssn then 25
            when levenshtein(p.ehr_ssn, d.ssn) = 2 then 5
            when levenshtein(p.ehr_ssn, d.ssn) = 1 then 10
            else -5
        end as ssn_score,
        -- zip: 5 exact, otherwise 0 (no penalty)
        case
            when p.ehr_zip is null or d.zipcode is null then 0
            when p.ehr_zip = d.zipcode then 5
            else 0
        end as zip_score,
        -- house number: 10 exact, otherwise 0 (no penalty)
        case
            when p.ehr_house_number is null or d.house_number is null then 0
            when p.ehr_house_number = d.house_number then 10
            else 0
        end as house_score
    from patient_clean as p
    inner join all_death as d
        on p.ehr_sex = d.sex
        and substr(p.ehr_ssn, 6, 9) = substr(d.ssn, 6, 9)
        and substr(p.ehr_lname, 1, 4) = substr(d.lname, 1, 4)
)
select
    cp.*,
    (cp.lname_score + cp.fname_score + cp.mname_score + cp.dob_score
        + cp.ssn_score + cp.zip_score + cp.house_score) as wms
from candidate_pairs as cp
-- COMMAND ----------
-- Combine the candidate pairs from all three linkage approaches.
-- UNION (not UNION ALL) collapses a pair found by more than one approach into a single row;
-- every approach computes the same columns, so such rows are identical.
-- Pairs with a weighted match score (wms) below 30 are treated as definite non-matches
-- and are not passed on to the ML classifier.
drop view if exists total_linkage_results;
create temporary view total_linkage_results as
with all_candidates as (
    select * from total_join_14
    union
    select * from total_join_17
    union
    select * from total_join_18
)
select *
from all_candidates
where wms >= 30;
-- COMMAND ----------
-- MAGIC %md
-- MAGIC ## ML Classifier
-- COMMAND ----------
-- MAGIC %md
-- MAGIC #### Load Code
-- COMMAND ----------
-- MAGIC %python
-- MAGIC
-- MAGIC # import code
-- MAGIC import pandas as pd
-- MAGIC import numpy as np
-- MAGIC import joblib
-- MAGIC
-- MAGIC # PySpark
-- MAGIC import pyspark.sql.functions as F
-- MAGIC from pyspark.sql.types import IntegerType
-- MAGIC from pyspark.sql import Window
-- MAGIC
-- MAGIC # modeling
-- MAGIC from sklearn.pipeline import make_pipeline
-- MAGIC from sklearn.ensemble import HistGradientBoostingClassifier
-- COMMAND ----------
-- MAGIC %md
-- MAGIC #### Deploy Model
-- COMMAND ----------
-- MAGIC %python
-- MAGIC
-- MAGIC # unpickle saved, trained model
-- MAGIC pipeline = joblib.load('[path]/death_linkage_classifier.joblib')
-- MAGIC
-- MAGIC # broadcast model to all nodes
-- MAGIC model = sc.broadcast(pipeline)
-- COMMAND ----------
-- MAGIC %python
-- MAGIC
-- MAGIC # convert linkage results to spark df
-- MAGIC dataset = spark.sql('SELECT * FROM total_linkage_results')
-- COMMAND ----------
-- MAGIC %python
-- MAGIC
-- MAGIC # define function to run model's predict method on spark df
-- MAGIC @F.pandas_udf(IntegerType())
-- MAGIC def predict(*cols: pd.Series) -> pd.Series:
-- MAGIC
-- MAGIC # columns are passed as a tuple of pandas series.
-- MAGIC # combine into a pandas df
-- MAGIC X = pd.concat(cols, axis=1)
-- MAGIC
-- MAGIC # make predictions
-- MAGIC predictions = model.value.predict(X)
-- MAGIC
-- MAGIC # return pandas series of predictions
-- MAGIC return pd.Series(predictions)
-- COMMAND ----------
-- MAGIC %python
-- MAGIC
-- MAGIC feature_cols = ['lname_dist', 'fname_dist', 'dob_dist', 'ssn_dist', 'zip_dist', 'house_dist',
-- MAGIC 'lname_score', 'fname_score', 'mname_score', 'dob_score', 'ssn_score', 'zip_score',
-- MAGIC 'house_score']
-- MAGIC
-- MAGIC # run the model on the dataset
-- MAGIC dataset = dataset.withColumn('match', predict(*feature_cols))
-- MAGIC
-- MAGIC # filter results to only positive matches
-- MAGIC matches = dataset.where(F.col('match') == 1)
-- MAGIC matches = matches.drop('match')
-- COMMAND ----------
-- MAGIC %python
-- MAGIC
-- MAGIC ## in rare cases, the above pipeline yields multiple positive matches for a single patient
-- MAGIC
-- MAGIC # when wms differs, only retain the match with highest wms
-- MAGIC # manual inspection revealed the difference in wms was often substantial
-- MAGIC # and the higher-wms match was clearly more likely the true match
-- MAGIC w = Window.partitionBy('mrn')
-- MAGIC matches = (
-- MAGIC matches
-- MAGIC .withColumn('max_wms', F.max('wms').over(w))
-- MAGIC .where(F.col('wms') == F.col('max_wms'))
-- MAGIC .drop('max_wms')
-- MAGIC )
-- MAGIC
-- MAGIC # of the remaining multiple match cases, these generally seem to result from 2-3 death records for
-- MAGIC # what appears to be the same individual (resulting in wms ties)
-- MAGIC # but usually only one of the records includes cod data
-- MAGIC # this code uses an indirect method to drop the match(es) without cod data when another match has cod data
-- MAGIC matches = matches.withColumn('cod_len', F.length(F.col('underlying_cause_of_death')))
-- MAGIC matches = (
-- MAGIC matches
-- MAGIC .withColumn('sum_cod_len', F.sum('cod_len').over(w))
-- MAGIC .where((F.col('sum_cod_len') > 5) | (F.col('cod_len') == F.col('sum_cod_len')))
-- MAGIC .drop('cod_len', 'sum_cod_len')
-- MAGIC )
-- MAGIC
-- MAGIC # any remaining multiple matches generally seem to result from slightly different data
-- MAGIC # in multiple death records for what appears to be the same individual
-- MAGIC # we cannot resolve which is the better match in these cases, so all such matches will remain in the output table
-- COMMAND ----------
-- MAGIC %python
-- MAGIC
-- MAGIC state_death_matches = matches
-- MAGIC
-- MAGIC spark.sql('drop table if exists state_death_matches')
-- MAGIC
-- MAGIC state_death_matches.write.saveAsTable("STATE_DEATH_MATCHES")