### Configurações Pyspark

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

import pandas as pd

def write_to_postgresl(df, tb_name=None, write_mode='None'):
    df.count()
    if tb_name is None:
        raise Exception('Informe o nome da tabela')
    if write_mode is None:
        raise Exception('Informe o mode de escrita: append ou overwrite')
        
    from datetime import datetime
    start_time = datetime.now()
    
    try:
        df.write.jdbc(url, table=tb_name, mode=write_mode, properties=properties)
    except Exception as e:
        print(f'Erro: {e}')
    end_time = datetime.now()
    
    total_time = str(end_time - start_time)
    
    return f'Total time: {total_time} and - Total rows: {df.count()} - Total columns: {len(df.columns)}'



    # Criando a sessão do Spark
spark  = SparkSession.builder \
    .appName("Data Analysis") \
    .config('spark.jars', '/data/IDAF/DATABASECONNECTOR_JAR_FOLDER/postgresql-42.2.18.jar')\
    .config("spark.executor.memory", "8g")\
    .config("spark.driver.memory", "8g")\
    .config("spark.executor.cores", "4")\
    .config("spark.executor.instances","8")\
    .config("spark.sql.shuffle.partitions","96")\
    .config("spark.default.parallelism","96")\
    .getOrCreate()


url = "jdbc:postgresql://localhost:5432/postgres"

properties = {
    "user" : "postgres",
    "password" : "cidacs",
    "driver" : "org.postgresql.Driver"
}

### Lendo dados enriquecidos

In [None]:
df_input = (spark
            .read
            .parquet('/data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/basefinal_gest_limp_enriched', header=True)
           )

### Location

In [None]:
observation_concept_list = ['escmae_sinasc_concept', 'estcivmae_sinasc_concept', 'qtdfilvivo_sinasc_aux', 
                            'qtdfilmort_sinasc_aux', 'consprenat_sinasc_concept', 'qtdgestant_sinasc_concept',
                            'marc_pbf_eq_v4_concept'
                           ]

df_observation = (df_input
                  .withColumn('marc_pbf_eq_v4_concept', F.when(F.col('marc_pbf_eq_v4').isNotNull(), 44804295).otherwise(None).cast('int'))
                  .withColumn('qtdgestant_sinasc_concept', F.when(F.col('qtdgestant_sinasc').isNotNull(), 4132561).otherwise(None).cast('int'))
                  .withColumn('escmae_sinasc_concept', F.when(F.col('escmae_sinasc').isNotNull(), 4144287).otherwise(None).cast('int'))
                  .withColumn('estcivmae_sinasc_concept', F.when(F.col('estcivmae_sinasc').isNotNull(), 4053609).otherwise(None).cast('int'))
                  .withColumn('qtdfilvivo_sinasc_aux', F.when(F.col('qtdfilvivo_sinasc').isNotNull(), 45770046).otherwise(None).cast('int'))
                  .withColumn('qtdfilmort_sinasc_aux', F.when(F.col('qtdfilmort_sinasc').isNotNull(), 4090055).otherwise(None).cast('int'))
                  .withColumn('consprenat_sinasc_concept', F.when(F.col('consprenat_sinasc').isNotNull(), 4313474).otherwise(None).cast('int'))
                  .withColumn('observation_concept_id_list', F.split(F.concat_ws(',', *observation_concept_list), ','))
                  .withColumn('observation_concept_id', F.explode('observation_concept_id_list'))
                  .withColumn('observation_date', F.when(F.col('mesprenat_sinasc')<=9, F.add_months(F.col('dtnasc_sinasc'), -(9 - F.col('mesprenat_sinasc').cast('int'))))
                                     .when((F.col('mesprenat_sinasc')==99)|(F.col('mesprenat_sinasc').isNull()), F.lit('2099-12-31'))
                                     .when(F.col('mesprenat_sinasc')>9, F.trunc(F.col('dtnasc_sinasc'), 'month')))
                  .withColumn('observation_datetime', F.lit(None).cast('timestamp'))
                  .withColumn('qtdfilvivo_sinasc_calc', F.col('qtdfilvivo_sinasc').cast('int') + F.col('qtdfilmort_sinasc').cast('int'))
                  .withColumn('qtdfilmort_sinasc_calc', F.col('qtdfilmort_sinasc').cast('int'))
                  .withColumn('consprenat_sinasc_calc', F.col('consprenat_sinasc').cast('int'))
                  .withColumn('qtdgestant_sinasc_calc', F.col('consprenat_sinasc').cast('int'))
                  .withColumn('value_as_number_list', F.split(F.concat_ws(',', 'qtdgestant_sinasc_calc', 'qtdfilvivo_sinasc_calc', 'qtdfilmort_sinasc_calc', 'consprenat_sinasc_calc'), ','))
                  .withColumn('value_as_number', F.explode(F.col('value_as_number_list')))
                  .withColumn('value_as_number', 
                             F.when(F.col('observation_concept_id')==45770046, F.col('qtdfilvivo_sinasc_calc'))
                              .when(F.col('observation_concept_id')==4090055, F.col('qtdfilmort_sinasc_calc'))
                              .when(F.col('observation_concept_id')==4313474, F.col('consprenat_sinasc_calc'))
                              .when(F.col('observation_concept_id')==4132561, F.col('qtdgestant_sinasc_calc'))
                              .otherwise(None))
                  .withColumn('escmae_sinasc_calc', 
                              F.when((F.col('escmae_sinasc_concept')==4144287)&(F.col('escmae_sinasc').isin(0,88,99)), 0)
                              .when((F.col('escmae_sinasc_concept')==4144287)&(F.col('escmae_sinasc')==1), 45877495)
                              .when((F.col('escmae_sinasc_concept')==4144287)&(F.col('escmae_sinasc')==2), 37172863)
                              .when((F.col('escmae_sinasc_concept')==4144287)&(F.col('escmae_sinasc').isin(3,4)), 37172863)
                              .when((F.col('escmae_sinasc_concept')==4144287)&(F.col('escmae_sinasc')==5), 45884461)
                              .otherwise(0)
                             )
                  .withColumn('estcivmae_sinasc_calc', F.when(F.col('estcivmae_sinasc').isin(0,99), 0)
                             .when((F.col('escmae_sinasc_concept')==4053609)&(F.col('estcivmae_sinasc')==1), 45879879)
                              .when((F.col('escmae_sinasc_concept')==4053609)&(F.col('estcivmae_sinasc')==2), 45876756)
                              .when((F.col('escmae_sinasc_concept')==4053609)&(F.col('estcivmae_sinasc')==3), 45883711)
                              .when((F.col('escmae_sinasc_concept')==4053609)&(F.col('estcivmae_sinasc')==4), 1620675)
                              .when((F.col('escmae_sinasc_concept')==4053609)&(F.col('estcivmae_sinasc')==5), 1620470)
                              .when((F.col('escmae_sinasc_concept')==4053609)&(F.col('estcivmae_sinasc')==88), 4052929)
                              .otherwise(0))
                  .withColumn('value_as_concept_id_list', F.split(F.concat_ws(',', 'escmae_sinasc_calc', 'estcivmae_sinasc_calc'), ','))
                  .withColumn('value_as_concept_id', F.explode('value_as_concept_id_list'))
                  .withColumn('observation_id', F.row_number().over(Window.orderBy('person_id', 'dtnasc_sinasc')))
                  .withColumn('observation_type_concept_id', F.lit(32879))
                  .withColumn('value_as_string', F.when((F.col('observation_concept_id')==44804295)&(F.col('marc_pbf_eq_v4')==0), F.lit(None))
                                                  .when((F.col('observation_concept_id')==44804295)&(F.col('marc_pbf_eq_v4')==1), F.lit(4188539))
                                                  .when((F.col('observation_concept_id')==44804295)&(F.col('marc_pbf_eq_v4')==2), F.lit(4188540))
                                                  .when((F.col('observation_concept_id')==44804295)&(F.col('marc_pbf_eq_v4')==88), F.lit(4118638))
                                                  .when((F.col('observation_concept_id')==44804295)&(F.col('marc_pbf_eq_v4')==99), F.lit(4127662))
                                                  .otherwise(None)
                             )
                  .withColumn('qualifier_concept_id', F.lit(None))
                  .withColumn('unit_concept_id', F.lit(None))
                  .withColumn('provider_id', F.lit(None))
                  .withColumn('visit_occurrence_id', F.col('visit_occurrence_id'))
                  .withColumn('visit_detail_id', F.lit(None))
                  .withColumn('observation_source_value', F.lit(None))
                  .withColumn('observation_source_concept_id', F.lit(None))
                  .withColumn('unit_source_value', F.lit(None))
                  .withColumn('qualifier_source_value', F.lit(None))
                  .withColumn('value_source_value', F.lit(None))
                  .withColumn('observation_event_id', F.lit(None))
                  .withColumn('obs_event_field_concept_id', F.lit(None))
                  .filter(F.col('observation_concept_id').isNotNull())
                             ).select(
                                 F.col('observation_id').cast('integer'),
                                 F.col('person_id').cast('integer'),
                                 F.col('observation_concept_id').cast('integer'),
                                 F.col('observation_date').cast('date'),
                                 F.col('observation_datetime').cast('timestamp'),
                                 F.col('observation_type_concept_id').cast('integer'),
                                 F.col('value_as_number').cast('float'),
                                 F.col('value_as_string').cast('string'),
                                 F.col('value_as_concept_id').cast('integer'),
                                 F.col('qualifier_concept_id').cast('integer'),
                                 F.col('unit_concept_id').cast('integer'),
                                 F.col('provider_id').cast('integer'),
                                 F.col('visit_occurrence_id').cast('integer'),
                                 F.col('visit_detail_id').cast('integer'),
                                 F.col('observation_source_value').cast('string'),
                                 F.col('observation_source_concept_id').cast('integer'),
                                 F.col('unit_source_value').cast('string'),
                                 F.col('qualifier_source_value').cast('string'),
                                 F.col('value_source_value').cast('string'),
                                 F.col('observation_event_id').cast('integer'),
                                 F.col('obs_event_field_concept_id').cast('integer')
                            ).distinct()
           
# df_observation.count()

### Salvando no Postgresql

In [None]:
# Ler parquet e salvar em 6 arquivos
# Escrever no banco um de cada vez

In [None]:
write_to_postgresl(df_observation, tb_name='omop.observation', write_mode='append')

In [None]:
# Total de rows no df_observation: 759872794
df_observation.write.parquet('/data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/omop_scripts_base_16mi_karine/csv/observation')

## Deprecated

## SQL de insertion

In [None]:
# CREATE TABLE public.drug_exposure (
# 			drug_exposure_id integer NOT NULL,
# 			person_id integer NOT NULL,
# 			drug_concept_id integer NOT NULL,
# 			drug_exposure_start_date date NOT NULL,
# 			drug_exposure_start_datetime TIMESTAMP NULL,
# 			drug_exposure_end_date date NOT NULL,
# 			drug_exposure_end_datetime TIMESTAMP NULL,
# 			verbatim_end_date date NULL,
# 			drug_type_concept_id integer NOT NULL,
# 			stop_reason varchar(20) NULL,
# 			refills integer NULL,
# 			quantity NUMERIC NULL,
# 			days_supply integer NULL,
# 			sig TEXT NULL,
# 			route_concept_id integer NULL,
# 			lot_number varchar(50) NULL,
# 			provider_id integer NULL,
# 			visit_occurrence_id integer NULL,
# 			visit_detail_id integer NULL,
# 			drug_source_value varchar(50) NULL,
# 			drug_source_concept_id integer NULL,
# 			route_source_value varchar(50) NULL,
# 			dose_unit_source_value varchar(50) NULL );



# CREATE TABLE public.drug_exposure_pyspark (
# 			drug_exposure_id varchar,
# 			person_id varchar,
# 			drug_concept_id varchar,
# 			drug_exposure_start_date varchar,
# 			drug_exposure_start_datetime varchar,
# 			drug_exposure_end_date varchar,
# 			drug_exposure_end_datetime varchar,
# 			verbatim_end_date varchar,
# 			drug_type_concept_id varchar,
# 			stop_reason varchar,
# 			refills varchar,
# 			quantity varchar,
# 			days_supply varchar,
# 			sig varchar,
# 			route_concept_id varchar,
# 			lot_number varchar,
# 			provider_id varchar,
# 			visit_occurrence_id varchar,
# 			visit_detail_id varchar,
# 			drug_source_value varchar,
# 			drug_source_concept_id varchar,
# 			route_source_value varchar,
# 			dose_unit_source_value varchar)


# insert into public.drug_exposure (
# drug_exposure_id,
# person_id,
# drug_concept_id,
# drug_exposure_start_date,
# drug_exposure_start_datetime,
# drug_exposure_end_date,
# drug_exposure_end_datetime,
# verbatim_end_date,
# drug_type_concept_id,
# stop_reason,
# refills,
# quantity,
# days_supply,
# sig,
# route_concept_id,
# lot_number,
# provider_id,
# visit_occurrence_id,
# visit_detail_id,
# drug_source_value,
# drug_source_concept_id,
# route_source_value,
# dose_unit_source_value
# )
# SELECT 
# cast(case when drug_exposure_id='' then null else drug_exposure_id end as integer),
# cast(case when person_id='' then null else person_id end as integer),
# cast(case when drug_concept_id='' then null else drug_concept_id end as integer),
# cast(case when drug_exposure_start_date='' then null else drug_exposure_start_date end as date),
# cast(case when drug_exposure_start_datetime='' then null else drug_exposure_start_datetime end as timestamp),
# cast(case when drug_exposure_end_date='' then null else drug_exposure_end_date end as date),
# cast(case when drug_exposure_end_datetime='' then null else drug_exposure_end_datetime end as timestamp),
# cast(case when verbatim_end_date='' then null else verbatim_end_date end as timestamp),
# cast(case when drug_type_concept_id='' then null else drug_type_concept_id end as integer),
# stop_reason,
# cast(case when refills='' then null else refills end as integer),
# cast(case when quantity='' then null else quantity end as decimal),
# cast(case when days_supply='' then null else days_supply end as integer),
# sig,
# cast(case when route_concept_id='' then null else route_concept_id end as integer),
# cast(case when lot_number='' then null else lot_number end as integer),
# cast(case when provider_id='' then null else provider_id end as integer),
# cast(case when visit_occurrence_id='' then null else visit_occurrence_id end as integer),
# cast(case when visit_detail_id='' then null else visit_detail_id end as integer),
# drug_source_value,
# cast(case when drug_source_concept_id='' then null else drug_source_concept_id end as integer),
# route_source_value,
# dose_unit_source_value
# 	FROM public.drug_exposure_pyspark;


In [4]:
df = spark.read.parquet('/data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/omop_scripts_base_16mi_karine/csv/observation')

                                                                                

In [7]:
df.coalesce(8).write.parquet('/data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/omop_scripts_base_16mi_karine/csv/observation_batch')

                                                                                

In [9]:
import os 

In [12]:
path = '/data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/omop_scripts_base_16mi_karine/csv/observation_batch'

files = os.listdir(path)

In [18]:
files_to_upload = [f'{path}/{f}' for f in files if f.endswith('.parquet')]
files_to_upload

['/data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/omop_scripts_base_16mi_karine/csv/observation_batch/part-00006-26dd4e87-eb42-4761-83b5-0f24c2481101-c000.snappy.parquet',
 '/data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/omop_scripts_base_16mi_karine/csv/observation_batch/part-00005-26dd4e87-eb42-4761-83b5-0f24c2481101-c000.snappy.parquet',
 '/data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/omop_scripts_base_16mi_karine/csv/observation_batch/part-00003-26dd4e87-eb42-4761-83b5-0f24c2481101-c000.snappy.parquet',
 '/data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/omop_scripts_base_16mi_karine/csv/observation_batch/part-00002-26dd4e87-eb42-4761-83b5-0f24c2481101-c000.snappy.parquet',
 '/data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/omop_scripts_base_16mi_karine/csv/observation_batch/part-00007-26dd4e87-eb42-4761-83b5-0f24c2481101-c000.snappy.parquet',
 '/data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/omop_scripts_base_16mi_karine/csv/observation_batch/part-00000-26dd4e87-eb42-4761-83b5-0f24c2481101-c000.snappy.parquet',
 '/data/ID

In [None]:
for f in files_to_upload:
    print(f'Writing {f}')
    write_to_postgresl(spark.read.parquet(f), tb_name='omop.observation', write_mode='append')
    print(f'Written {f}')

Writing /data/IDAF/PROJETOS/PARCERIA_CIDACS_PHDC/omop_scripts_base_16mi_karine/csv/observation_batch/part-00006-26dd4e87-eb42-4761-83b5-0f24c2481101-c000.snappy.parquet


25/06/05 09:26:34 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.