In [1]:
%%bash
pip install kafka-python
pip install deltalake
# python meta.py --filepath metastore.yml --run patient_pseudonymiser



# Patient Pseudonymiser

Load the main pipeline (`patient_pseudonymiser`) from the metastore and run it.

In [2]:
import meta

# Read the metastore definition and look up the pipeline entry by its id.
metastore = meta.Store.load('./metastore.yml')
meta_instance = metastore.getPipeline('patient_pseudonymiser')

# The pipeline entry names its own infrastructure class; that class builds the
# runnable pipeline from the entry, the metastore and an empty options dict.
infraclass = meta_instance.infraclass
pipeline = infraclass.bootstrap(meta_instance, metastore, {})

# Kept as the last expression so its return value is shown as the cell output.
pipeline.run()

<transformation.TransformerPipeline at 0xffff2b7b0d40>

# Examine the data

Read the source data fixture, query the Delta Lake table of pseudonymised patients, and read the audit log files.

In [6]:
import pandas as pd
from IPython.display import Markdown
from pyspark.sql import SparkSession
import deltalake

pd.set_option('max_colwidth', 400)


def _show_sample(heading, frame):
    """Render a markdown heading, then at most 30 rows of `frame` in shuffled order.

    `sample(frac=1)` shuffles the whole frame and `head(30)` trims it, so
    frames with fewer than 30 rows are displayed in full.
    """
    display(Markdown(heading))
    display(frame.sample(frac=1).head(30))


display(Markdown('# Data from pipeline_id "patient_pseudonymiser"'))

# Input fixture: newline-delimited JSON, one record per line.
source_df = pd.read_json('/fixtures/sample_data.json', orient='records', lines=True)
_show_sample(
    f'## Source Patient Data (30 random records from {source_df.shape[0]})',
    source_df,
)

# Pipeline result: the Delta table loaded into pandas.
pseudo_df = deltalake.DeltaTable('/results/patient_pseudo_deltatable').to_pandas()
_show_sample(
    f'## Pseudonymised Patients (30 random records from {pseudo_df.shape[0]})',
    pseudo_df,
)

# Audit trail: every JSON file in the audit directory, read through Spark and
# collected into a single pandas frame.
spark = SparkSession.builder.getOrCreate()
audit_df = spark.read.json('/results/audit_patient_pseudonymiser/*.json').toPandas()
_show_sample(
    f'## Patient Pseudonymiser Audit (30 random records from {audit_df.shape[0]})',
    audit_df,
)

# Data from pipeline_id "patient_pseudonymiser"

## Source Patient Data (30 random records from 1000)

Unnamed: 0,patient_id,patient_name,city,postcode,disease
81,81,Georgiann Watchmaker-Azathioprine,Barnsley,S70 6RE,Postherpetic neuralgia
824,824,Amalie Downpour-Needlebush,Bristol,BS31 2EB,Blepharoconjunctivitis
567,567,Bynum Nonconformist-Directory,Leeds,LS5 3LZ,Wedge fracture of lumbar vertebra
970,970,Salomon Miconazole-Ibis,Brighton,BN42 4FY,Pernicious anemia
335,335,Evander Wingspread-Executant,Slough and Heathrow,TW7 7AJ,Closed fracture of distal end of radius
146,146,Roderic Spokeshave-Interrupt,Worksop and Retford,S81 8JJ,Sprain of ankle
749,749,Carlota Dropseed-Pinhole,Milton Keynes,MK15 9AQ,Medulloblastoma
28,28,Fremont Lepton-Nuclease,Norwich,NR1 1JW,Prolapsing internal haemorrhoids requiring manual reduction
595,595,Shep Lamplight-Hyalinization,Reading,RG12 7GJ,Cardiac arrest
100,100,Amado Loophole-Postscript,Chesterfield,DE45 1LJ,Lower respiratory tract infection


## Pseudonymised Patients (30 random records from 255)

Unnamed: 0,batch_id,inserted_at,patient_id,patient_name,city,postcode,disease,region
129,3,2025-03-13 11:19:00.916498+00:00,184,XXXXX,Bradford,BD6,Iron deficiency,Yorkshire and The Humber
253,1,2025-03-13 11:17:01.323054+00:00,57,XXXXX,Liverpool,L27,Eruption of skin,North West
172,2,2025-03-13 11:18:00.878752+00:00,108,XXXXX,Slough and Heathrow,TW7,Lumbar disc prolapse with radiculopathy,London
16,4,2025-03-13 11:20:01.082404+00:00,286,XXXXX,Swansea,SA10,Viral disease,Wales
177,2,2025-03-13 11:18:00.878752+00:00,158,XXXXX,Coventry,CV10,Adenocarcinoma of rectum,West Midlands
149,2,2025-03-13 11:18:00.878752+00:00,118,XXXXX,Hereford,HR1,Disorder of the genitourinary system,West Midlands
47,4,2025-03-13 11:20:01.082404+00:00,280,XXXXX,Eastbourne,BN27,Insect bite - wound,South East
239,1,2025-03-13 11:17:01.323054+00:00,58,XXXXX,London,WC2E,Verruca vulgaris,London
150,2,2025-03-13 11:18:00.878752+00:00,121,XXXXX,Grimsby,DN36,Fracture at wrist and/or hand level,Yorkshire and The Humber
168,2,2025-03-13 11:18:00.878752+00:00,95,XXXXX,Newbury,RG14,Basal cell carcinoma of skin,South East


## Patient Pseudonymiser Audit (30 random records from 558)

Unnamed: 0,new_value,old_value,patient_id,transform_tag
364,TS8,TS8 9XT,133,"postcode#3 = patient_pseudonymiser.split_pick(postcode#2, split_by=' ', pick_index=0)"
378,LS28,LS28 8AT,152,"postcode#3 = patient_pseudonymiser.split_pick(postcode#2, split_by=' ', pick_index=0)"
472,XXXXX,Nobie Sourdine-Band,78,"patient_name#2 = patient_pseudonymiser.redact(patient_name#1, replace_with='XXXXX')"
497,XXXXX,Noemie Japan-Cheep,70,"patient_name#2 = patient_pseudonymiser.redact(patient_name#1, replace_with='XXXXX')"
375,N9,N9 9QD,111,"postcode#3 = patient_pseudonymiser.split_pick(postcode#2, split_by=' ', pick_index=0)"
125,XXXXX,Dann Electrolyte-Spoonerism,166,"patient_name#2 = patient_pseudonymiser.redact(patient_name#1, replace_with='XXXXX')"
265,East of England,,136,"region#1 = patient_pseudonymiser.translate(city#0, fixture='city_to_region', key_column='city', value_column='region')"
524,W1D,W1D 3PU,96,"postcode#3 = patient_pseudonymiser.split_pick(postcode#2, split_by=' ', pick_index=0)"
404,KT21,KT21 2JG,145,"postcode#3 = patient_pseudonymiser.split_pick(postcode#2, split_by=' ', pick_index=0)"
286,London,,135,"region#1 = patient_pseudonymiser.translate(city#0, fixture='city_to_region', key_column='city', value_column='region')"


# A Pipeline with Memory Sinks and Async Generators

The metastore defines a second pipeline, but instead of sinks to Delta Lake and the filestore, it has memory sinks. Each memory sink exposes a Python async generator, which we can consume with `async for` inside an awaited coroutine.

In [1]:
import meta
import pandas as pd

# Widen string columns so long values are not truncated when frames are displayed.
pd.set_option('max_colwidth', 400)

# Read the metastore definition and look up the memory-sink pipeline by its id.
metastore = meta.Store.load('./metastore.yml')
meta_instance = metastore.getPipeline('memory_patient_pseudonymiser')

# The pipeline entry names its own infrastructure class; that class builds the
# runnable pipeline from the entry, the metastore and an empty options dict.
infraclass = meta_instance.infraclass
pipeline = infraclass.bootstrap(meta_instance, metastore, {})
pipeline.run()
# pipeline.explain()

async def show(gen):
    """Display every item produced by the async iterable `gen`, in arrival order.

    Returns once `gen` is exhausted; each item is handed to the notebook's
    `display` as soon as it is yielded.
    """
    async for frame in gen:
        display(frame)

# Per the notebook text, the memory sinks expose async generators; pass the
# query one to show(), which displays every item it yields.
await show(pipeline.query)
# Left disabled: the same call for what is presumably the audit sink's
# generator - TODO confirm before enabling.
# await show(pipeline.audit_query)


Unnamed: 0,city,patient_id,postcode,disease,region,patient_name
0,London,26,CR8,Stress-related problem,London,XXXXX
1,Dudley,27,B71,Ulcer of lower extremity,West Midlands,XXXXX
2,Norwich,28,NR1,Prolapsing internal haemorrhoids requiring manual reduction,East of England,XXXXX
3,Medway,29,ME17,Prolapsed lumbar intervertebral disc,South East,XXXXX
4,Exeter,30,EX17,Nail-patella syndrome,South West,XXXXX
5,Dudley,31,DY2,Varicose eczema,West Midlands,XXXXX
6,Warrington and Wigan,32,WN5,Paronychia of finger,North West,XXXXX
7,Preston,33,PR1,Candidal vulvovaginitis,North West,XXXXX
8,London,34,W3,Uveitis,London,XXXXX
9,Cardiff,35,CF14,Non-diabetic hyperglycemia,Wales,XXXXX


Unnamed: 0,city,patient_id,postcode,disease,region,patient_name
0,London,26,CR8,Stress-related problem,London,XXXXX
1,Dudley,27,B71,Ulcer of lower extremity,West Midlands,XXXXX
2,Norwich,28,NR1,Prolapsing internal haemorrhoids requiring manual reduction,East of England,XXXXX
3,Medway,29,ME17,Prolapsed lumbar intervertebral disc,South East,XXXXX
4,Exeter,30,EX17,Nail-patella syndrome,South West,XXXXX
5,Dudley,31,DY2,Varicose eczema,West Midlands,XXXXX
6,Warrington and Wigan,32,WN5,Paronychia of finger,North West,XXXXX
7,Preston,33,PR1,Candidal vulvovaginitis,North West,XXXXX
8,London,34,W3,Uveitis,London,XXXXX
9,Cardiff,35,CF14,Non-diabetic hyperglycemia,Wales,XXXXX


Unnamed: 0,city,patient_id,postcode,disease,region,patient_name
0,London,26,CR8,Stress-related problem,London,XXXXX
1,Dudley,27,B71,Ulcer of lower extremity,West Midlands,XXXXX
2,Norwich,28,NR1,Prolapsing internal haemorrhoids requiring manual reduction,East of England,XXXXX
3,Medway,29,ME17,Prolapsed lumbar intervertebral disc,South East,XXXXX
4,Exeter,30,EX17,Nail-patella syndrome,South West,XXXXX
5,Dudley,31,DY2,Varicose eczema,West Midlands,XXXXX
6,Warrington and Wigan,32,WN5,Paronychia of finger,North West,XXXXX
7,Preston,33,PR1,Candidal vulvovaginitis,North West,XXXXX
8,London,34,W3,Uveitis,London,XXXXX
9,Cardiff,35,CF14,Non-diabetic hyperglycemia,Wales,XXXXX


Unnamed: 0,city,patient_id,postcode,disease,region,patient_name
0,London,26,CR8,Stress-related problem,London,XXXXX
1,Dudley,27,B71,Ulcer of lower extremity,West Midlands,XXXXX
2,Norwich,28,NR1,Prolapsing internal haemorrhoids requiring manual reduction,East of England,XXXXX
3,Medway,29,ME17,Prolapsed lumbar intervertebral disc,South East,XXXXX
4,Exeter,30,EX17,Nail-patella syndrome,South West,XXXXX
5,Dudley,31,DY2,Varicose eczema,West Midlands,XXXXX
6,Warrington and Wigan,32,WN5,Paronychia of finger,North West,XXXXX
7,Preston,33,PR1,Candidal vulvovaginitis,North West,XXXXX
8,London,34,W3,Uveitis,London,XXXXX
9,Cardiff,35,CF14,Non-diabetic hyperglycemia,Wales,XXXXX


Unnamed: 0,city,patient_id,postcode,disease,region,patient_name
0,London,26,CR8,Stress-related problem,London,XXXXX
1,Dudley,27,B71,Ulcer of lower extremity,West Midlands,XXXXX
2,Norwich,28,NR1,Prolapsing internal haemorrhoids requiring manual reduction,East of England,XXXXX
3,Medway,29,ME17,Prolapsed lumbar intervertebral disc,South East,XXXXX
4,Exeter,30,EX17,Nail-patella syndrome,South West,XXXXX
5,Dudley,31,DY2,Varicose eczema,West Midlands,XXXXX
6,Warrington and Wigan,32,WN5,Paronychia of finger,North West,XXXXX
7,Preston,33,PR1,Candidal vulvovaginitis,North West,XXXXX
8,London,34,W3,Uveitis,London,XXXXX
9,Cardiff,35,CF14,Non-diabetic hyperglycemia,Wales,XXXXX


: 

: 

: 