In [1]:
# `display` and `HTML` belong to the public `IPython.display` module; importing
# them from `IPython.core.display` is deprecated.
from IPython.display import display, HTML

# Stretch the notebook container to the full browser width so wide outputs fit.
display(HTML("<style>.container { width:100% !important; }</style>"))
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd
from pyspark.sql.window import Window
import time
from pyspark.sql import SparkSession

In [2]:
# Local Spark session used by every cell below. The same directory serves as
# Spark's scratch space (spark.local.dir) and as the RDD checkpoint directory.
checkpoint_dir = '/dados10t/datalake/raw/checkpoint_dir'

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .config("spark.driver.memory", "80g")
    .config("spark.executor.memory", "50g")
    # The key was previously misspelled as "spark.dirver.maxResultSize", which
    # is not a Spark setting, so the intended 50g limit was never applied.
    .config("spark.driver.maxResultSize", "50g")
    .config('spark.local.dir', checkpoint_dir)
    .getOrCreate()
)
sc = spark.sparkContext
sc.setCheckpointDir(checkpoint_dir)

# Criando condition_occurrence

Todos os testes de covid com resultados positivos resultam em um registro na tabela condition_occurrence.

Este script lê os dados da tabela de medidas (measurement) para identificar os testes positivos.
Os dados da tabela condition_occurrence são lidos para obter o maior condition_occurrence_id já existente.

Os novos registros extraídos da tabela de medidas precisam ter o id maior que o maior condition_occurrence_id.

#### measurement

In [3]:
# Load the OMOP measurement table from the standard zone of the data lake.
measurement_path = "/dados10t/datalake/standard/omop/tabelas_omop_cmd_5.4_v1/tb_measurement_data.parquet"
measurement_df = spark.read.parquet(measurement_path)

In [14]:
# Show the column names and types of the measurement table.
measurement_df.printSchema()

root
 |-- visit_occurrence_id: integer (nullable = true)
 |-- measurement_date: date (nullable = true)
 |-- measurement_type_concept_id: integer (nullable = true)
 |-- measurement_concept_id: integer (nullable = true)
 |-- value_as_concept_id: integer (nullable = true)
 |-- person_id: integer (nullable = true)
 |-- measurement_id: integer (nullable = true)



#### condition

In [4]:
# Load the existing OMOP condition_occurrence table; it is read here only to
# look up the ids that are already in use.
condition_path = "/dados10t/datalake/standard/omop/tabelas_omop_cmd_5.4_v1/tb_condition_occurence_data.parquet/"
condition_df = spark.read.parquet(condition_path)

In [5]:
# Highest condition_occurrence_id already present; new records must be
# numbered above this value.
max_id_row = condition_df.select(F.max('condition_occurrence_id')).collect()[0]
last_id_condition = max_id_row[0]

In [13]:
# Show the column names and types of the existing condition_occurrence table;
# this is the layout the new records are derived to resemble.
condition_df.printSchema()

root
 |-- visit_occurrence_id: integer (nullable = true)
 |-- condition_start_date: date (nullable = true)
 |-- condition_concept_id: integer (nullable = true)
 |-- person_id: integer (nullable = true)
 |-- condition_type_concept_id: integer (nullable = true)
 |-- condition_occurrence_id: integer (nullable = true)



In [25]:
# Peek at the concept ids currently stored (show() prints the first 20 rows).
condition_df.select('condition_concept_id').show()

+--------------------+
|condition_concept_id|
+--------------------+
|              201820|
|              254761|
|              378253|
|              201820|
|              254761|
|              259153|
|              259153|
|              378253|
|             4215968|
|             4215968|
|              321588|
|             4276172|
|             4276172|
|              259153|
|             4215968|
|             4276172|
|             4215968|
|             4215968|
|              321588|
|             4276172|
+--------------------+
only showing top 20 rows



### filter positive tests

In [6]:
# Keep only measurements whose result concept is 45884084
# (presumably the OMOP "Positive" concept — confirm against the vocabulary).
is_positive_result = F.col('value_as_concept_id') == 45884084
positive_measurement = measurement_df.filter(is_positive_result)

In [7]:
# The measurement date becomes the start date of the derived condition.
positive_measurement = positive_measurement.withColumnRenamed(
    'measurement_date',
    'condition_start_date',
)

In [8]:
# Remove the measurement-specific columns that have no counterpart in
# condition_occurrence.
measurement_only_columns = (
    'value_as_concept_id',
    'measurement_id',
    'measurement_type_concept_id',
    'measurement_concept_id',
)
positive_measurement = positive_measurement.drop(*measurement_only_columns)

In [9]:
# Every positive test is recorded under the same condition concept
# (37311061 — presumably the OMOP COVID-19 concept; confirm against the vocabulary).
covid_condition_concept = F.lit(37311061)
positive_measurement = positive_measurement.withColumn('condition_concept_id', covid_condition_concept)

In [10]:
# Condition type concept 32817 = EHR.
ehr_type_concept = F.lit(32817)
positive_measurement = positive_measurement.withColumn('condition_type_concept_id', ehr_type_concept)

In [12]:
# Check the reshaped frame: every condition_occurrence column except the id
# should now be present.
positive_measurement.printSchema()

root
 |-- visit_occurrence_id: integer (nullable = true)
 |-- condition_start_date: date (nullable = true)
 |-- person_id: integer (nullable = true)
 |-- condition_concept_id: integer (nullable = false)
 |-- condition_type_concept_id: integer (nullable = false)



In [11]:
# Pair each row with a sequential 0-based index, then flatten the
# (row, index) pairs back into columns. The index becomes the provisional
# condition_occurrence_id.
# NOTE: toDF() re-infers the schema from the RDD, so the integer columns come
# back as long.
indexed_rows = positive_measurement.rdd.zipWithIndex().toDF()
new_measurement = indexed_rows.select(
    F.col("_1.*"),
    F.col("_2").alias('condition_occurrence_id'),
)

In [13]:
# Check the schema after the RDD round trip and the added id column.
new_measurement.printSchema()

root
 |-- visit_occurrence_id: long (nullable = true)
 |-- condition_start_date: date (nullable = true)
 |-- person_id: long (nullable = true)
 |-- condition_concept_id: long (nullable = true)
 |-- condition_type_concept_id: long (nullable = true)
 |-- condition_occurrence_id: long (nullable = true)



In [31]:
# Show the smallest condition_occurrence_id currently assigned to the new rows.
lowest_new_id = F.min('condition_occurrence_id')
new_measurement.select(lowest_new_id).show()

+----------------------------+
|min(condition_occurrence_id)|
+----------------------------+
|                           0|
+----------------------------+



#### criando novo id para as condições decorrentes de teste de covid


In [13]:
# Shift the provisional ids so they start just above the highest id that
# already exists in condition_occurrence.
# NOTE: this cell is not idempotent; re-running it shifts the ids again.
shifted_id = F.col('condition_occurrence_id') + last_id_condition + 1
new_measurement = new_measurement.withColumn('condition_occurrence_id', shifted_id)

##### making sure there's no overlap

In [14]:
# Count the ids shared between the existing table and the new rows;
# the expected result is 0.
existing_ids = condition_df.select('condition_occurrence_id')
generated_ids = new_measurement.select('condition_occurrence_id')
existing_ids.join(generated_ids, 'condition_occurrence_id', 'inner').count()

0

##### escrita no arquivo

In [16]:
# Persist the condition records derived from positive tests.
# This cell previously wrote `condition_df` (the unchanged existing table) to
# this "from_measurement" path, so the new rows were never saved.
new_measurement.write.parquet('/dados10t/datalake/standard/omop/tabelas_omop_cmd_5.4_v1/tb_condition_occurence_from_measurement.parquet/')