In [2]:
import pyspark
import dxpy
import dxdata

In [3]:
# Reuse the kernel's SparkContext if one already exists: constructing a second
# SparkContext in the same process raises, which would make this cell fail
# whenever it is re-run without restarting the kernel.
sc = pyspark.SparkContext.getOrCreate()
# Session wrapper around that context, used for the DataFrame work below.
spark = pyspark.sql.SparkSession(sc)

In [4]:
# Name of the single database object in the project root whose name matches "app*".
# describe=True is required so that the result carries a "describe" mapping.
dispensed_database_name = dxpy.find_one_data_object(
    classname="database",
    name="app*",
    name_mode="glob",
    folder="/",
    describe=True,
)["describe"]["name"]

# ID of the single object of type "Dataset" in the project root matching "app*.dataset".
dispensed_dataset_id = dxpy.find_one_data_object(
    typename="Dataset",
    name="app*.dataset",
    name_mode="glob",
    folder="/",
)["id"]

In [19]:
# Open the dataset found above and take its "gp_scripts" entry.
dataset = dxdata.load_dataset(id=dispensed_dataset_id)
scripts = dataset["gp_scripts"]

# Request every field that "gp_scripts" declares.
field_names = [field.name for field in scripts.fields]
engine = dxdata.connect()
df = scripts.retrieve_fields(names=field_names, engine=engine)

# Preview the first rows with full (untruncated) cell contents.
df.show(truncate=False)

+-------+-------------+----------+-------+---------------+----------------+------------------------------------------------------------------------------+-------------------------------------------------------+
|eid    |data_provider|issue_date|read_2 |bnf_code       |dmd_code        |drug_name                                                                     |quantity                                               |
+-------+-------------+----------+-------+---------------+----------------+------------------------------------------------------------------------------+-------------------------------------------------------+
|1825437|3            |2009-02-05|null   |02.08.02.00.00 |null            |Warfarin 1mg tablets                                                          |28 tablet(s) - 1 mg                                    |
|4161024|3            |2015-03-23|null   |03.01.01.03.00 |null            |Bricanyl 500micrograms/dose Turbohaler (AstraZeneca UK Ltd)                   |10

In [20]:
# Keep one row per distinct combination of the code / name / quantity columns,
# then discard the per-record columns that are not part of that combination.
code_columns = ["read_2", "bnf_code", "dmd_code", "drug_name", "quantity"]
record_columns = ("eid", "data_provider", "issue_date")
df = df.dropDuplicates(code_columns).drop(*record_columns)

In [21]:
# Add one boolean flag per code column saying whether the row has a value for it.
# withColumn replaces a column of the same name instead of appending another one,
# so re-running this cell does not leave duplicate has_* columns behind (which
# would make the partitioned write further down fail).
df = (
    df.withColumn("has_bnf", df.bnf_code.isNotNull())
    .withColumn("has_read_2", df.read_2.isNotNull())
    .withColumn("has_dmd", df.dmd_code.isNotNull())
)

In [22]:
# Number of rows left after de-duplicating on the code / name / quantity columns.
df.count()

727778

In [23]:
# Preview the de-duplicated rows together with their has_* flags, untruncated.
df.show(truncate=False)

+-------+---------------+------------------+--------------------------------------------------------------------------------------------+----------------------+-------+----------+-------+
|read_2 |bnf_code       |dmd_code          |drug_name                                                                                   |quantity              |has_bnf|has_read_2|has_dmd|
+-------+---------------+------------------+--------------------------------------------------------------------------------------------+----------------------+-------+----------+-------+
|null   |0402010ABAAACAC|null              |Quetiapine Fumarate  Tablets  100 mg                                                        |28.000                |true   |false     |false  |
|null   |01.03.05.00.00 |null              |Omeprazole 10mg gastro-resistant capsules                                                   |56 capsule            |true   |false     |false  |
|null   |13.04.02.00.00 |null              |Clobetasone 0.05

In [25]:
# Write the table as Parquet, split into one directory level per has_* flag.
# mode("overwrite") replaces the output of a previous run; with the default
# save mode the write raises when the target path already exists.
(
    df.write
    .mode("overwrite")
    .partitionBy("has_bnf", "has_read_2", "has_dmd")
    .parquet('/tmp/all_codes.parquet')
)

In [26]:
# Copy the Parquet directory written above out of the cluster file system into
# the notebook's current working directory.
!hadoop fs -get /tmp/all_codes.parquet

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/cluster/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/cluster/dnax/jars/dnanexus-api-0.1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
23/01/19 12:21:31 WARN metrics.MetricsReporter: Unable to initialize metrics scraping configurations from hive-site.xml. Message:InputStream cannot be null
23/01/19 12:21:31 WARN service.DNAxApiSvc: Using default configurations. Unable to find dnanexus.conf.location=null
23/01/19 12:21:31 INFO service.DNAxApiSvc: apiserver connection-pool config. MaxPoolSize=10, MaxPoolPerRoute=10,MaxWaitTimeout=60000
23/01/19 12:21:31 INFO service.DNAxApiSvc: initializing http connection manager pools
23/01/19 12:21:31

In [27]:
# Total on-disk size of the local copy, as a sanity check before uploading.
!du -sh all_codes.parquet

13M	all_codes.parquet


In [None]:
# Recursively upload the local Parquet directory into the /Prescriptions/ folder.
!dx upload -r --path /Prescriptions/ all_codes.parquet 

In [1]:
# Inspect the exported codes table, from which the eid column has been removed.
# Requires the polars package.
import polars as pl

In [21]:
# Lazily scan every Parquet part file of the uploaded export. The three "*"
# directory levels correspond to the three partitionBy columns used when the
# table was written (has_bnf / has_read_2 / has_dmd).
# NOTE(review): the name `df` is reused here for a polars LazyFrame; earlier
# cells bind it to a Spark DataFrame.
df = pl.scan_parquet("/mnt/project/Prescriptions/all_codes.parquet/*/*/*/*.snappy.parquet")

In [38]:
# Distinct drug names among rows that have no code in any of the three code
# columns (read_2, bnf_code and dmd_code are all null).
(
    df.filter(
        pl.col('read_2').is_null()
        & pl.col('bnf_code').is_null()
        & pl.col('dmd_code').is_null()
    )
    .select(pl.col('drug_name').unique())
    .collect()
)

drug_name
str
"""ACTIVE GLUCOSE..."
"""GLUCOTREND PLU..."
"""Triptorelin P..."
"""Biatain Non-Ad..."
"""Accucheck Soft..."
"""Hydrocolloid D..."
"""Freestyle Lanc..."
"""Yentreve Gast..."
"""Rivaroxaban T..."
"""NOVOPEN 3 PEN ..."


In [31]:
# Column names exposed by the scanned Parquet files.
df.columns

['read_2', 'bnf_code', 'dmd_code', 'drug_name', 'quantity']