# Для подсчёта статистики по новым витринам

In [1]:
# parameters coming from airflow (papermill overrides these)
# NOTE: the mart must first be loaded in "full" mode; afterwards it can be switched to "increment".
capture_type = 'full'
log_fl = None
ui_enable = 'false'

# project parameters
project_name = 'data_completeness'
# production root: root_path = '/project'
root_path = '/user/mivanovskij'
project_path = f'{root_path}/{project_name}'

In [2]:
# import modules
import pyspark
from pyspark.sql import SQLContext, SparkSession, HiveContext
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from datetime import datetime as dttm
import math
import sys
import os
import pandas as pd
from datetime import datetime, timedelta

# config processing
# Stop a SparkContext left over from an earlier run of this cell so a new one can be created below.
try:
    sc.stop()
except NameError:
    # fresh kernel: `sc` has not been defined yet
    print('Spark context is not running')
except Exception as exc:
    # best-effort cleanup: report the failure but still go on to create the new context
    print('Could not stop the previous Spark context: ' + str(exc))

# context config
jar_paths = ['/etc/jdbc/*']  # JDBC driver jars, put on driver/executor classpath and into spark.jars
conf = (pyspark.SparkConf()
            .set('spark.dynamicAllocation.maxExecutors','200')
            .set('spark.driver.extraClassPath', ':'.join(jar_paths))
            .set('spark.executor.extraClassPath', ':'.join(jar_paths))
            .set('spark.jars', ','.join(jar_paths))
            .set('spark.ui.enabled', ui_enable) # turn the UI off when deploying to the scheduled (production) run
            .set('spark.executor.memory','8g')
            .set('spark.driver.memory', '8g')
            .set('spark.scheduler.mode','FAIR')
            .set('spark.driver.maxResultSize', '8g')
            .set('spark.rpc.message.maxSize','256')
            .set('spark.sql.execution.arrow.enabled', 'true')
           )

# Config initialization
sc = pyspark.SparkContext(appName=project_name, conf=conf)
sqlContext = SQLContext(sc)
sql = sqlContext.sql

Spark context is not running


In [3]:
# for logging
def log_msg(path, msg, msg_type = 'INFO', process = 'PapermillOperator'):
    """Emit one log line in the format ``[YYYY-MM-DD HH:MM:SS,mmm] {process} msg_type - msg``.

    Args:
        path: file to append the line to; when falsy (e.g. None) the line is
            printed to stdout instead.
        msg: message to log (anything print() accepts).
        msg_type: level label, 'INFO' by default.
        process: name shown in curly braces, 'PapermillOperator' by default.
    """
    # strftime always emits the fractional part. Slicing str(now()) instead
    # dropped the seconds whenever microsecond == 0, because str(datetime)
    # omits the ".ffffff" suffix in that case.
    msg_dttm = '[' + dttm.now().strftime('%Y-%m-%d %H:%M:%S,%f')[:-3] + ']'
    line_parts = (msg_dttm, '{' + process + '}', msg_type, '-', msg)
    if path:
        # explicit encoding so non-ASCII messages do not depend on the locale
        with open(path, 'a', encoding='utf-8') as file:
            print(*line_parts, file=file)
    else:
        print(*line_parts)

In [4]:
#Function that extracts positional indices from XPaths
def xpath_index(s):
    """Build an index key from the positional predicates ([n]) of an XPath.

    Every "/"-separated step contributes the text between its first "[" and
    first "]" as an integer zero-padded to 3 digits. A step without both
    brackets, or whose bracket content is not an integer, contributes "000".
    Trailing "000" components are dropped and the rest joined with "_".

    Examples:
        "a[1]/b[2]/c"    -> "001_002"
        "/a[1]/b/c[12]"  -> "000_001_000_012"   (leading "/" yields an empty first step)
        None or ""       -> ""

    Args:
        s: XPath string, or None (treated as "").

    Returns:
        The joined index string (possibly empty).
    """
    steps = ("" if s is None else s).split("/")
    parts = []
    for step in steps:
        open_pos, close_pos = step.find("["), step.find("]")
        raw = step[open_pos + 1 : close_pos] if open_pos >= 0 and close_pos >= 0 else "0"
        try:
            number = int(raw)
        except ValueError:
            # non-numeric predicate such as [last()] or brackets in the wrong order
            number = 0
        parts.append(f"{number:03d}")
    # drop trailing zero components so "a[1]/b" and "a[1]" share the same key
    while parts and parts[-1] == "000":
        parts.pop()
    return "_".join(parts)


# Register xpath_index as a Spark UDF (string result) so it can be applied to DataFrame columns.
udf_xpath_index = F.udf(lambda xpath: xpath_index(xpath), T.StringType())

In [5]:
#Hadoop configuration: JVM handles reached through the Spark py4j gateway
hadoop = sqlContext._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
# FileSystem bound to the Spark context's Hadoop configuration
hdfs = fs.get(sqlContext._jsc.hadoopConfiguration())
# a separate, freshly constructed Hadoop Configuration
hadoop_conf = hadoop.conf.Configuration()

#Check directory size
def get_hdfs_directory_size(input_path):
    """Return the total size in bytes of the data stored under ``input_path``.

    Returns 0 when the path does not exist. The size comes from the path's
    HDFS content summary, which is computed recursively. Files inside nested
    sub-directories (e.g. partition folders of a partitioned parquet dataset)
    are therefore included. For a plain file, the result is the file's length.
    The existence check and the size query both go through the module-level
    ``hdfs`` FileSystem (bound to the Spark context's Hadoop configuration).
    """
    path = hadoop.fs.Path(input_path)
    if not hdfs.exists(path):
        return 0
    # Summing listStatus() lengths covered only direct children (a directory
    # entry reports length 0), so partitioned data came out as 0 bytes.
    return hdfs.getContentSummary(path).getLength()

#Calculate repartition size
def get_repartition_factor(dir_size, compression_rate, block_size=None):
    """Estimate how many partitions are needed so that each holds roughly one HDFS block.

    Computes ceil(dir_size / compression_rate / block_size), clamped to at least 1.

    Args:
        dir_size: input size in bytes (e.g. from get_hdfs_directory_size).
        compression_rate: divisor applied to dir_size first; presumably the
            expected size ratio between the input and the written output.
            TODO: confirm with callers.
        block_size: block size in bytes. When None (the default), it is read
            from the "dfs.blocksize" setting of the Spark context's Hadoop
            configuration, falling back to 128 MiB if the key is unset.

    Returns:
        The partition count, or None if the computation fails (for example
        compression_rate == 0). The error is printed rather than raised.
    """
    try:
        if block_size is None:
            # getLongBytes understands size suffixes such as "128m"; a plain
            # int() on the raw setting fails on them (and on an unset key).
            block_size = sqlContext._jsc.hadoopConfiguration().getLongBytes("dfs.blocksize", 128 * 1024 * 1024)
        # repartition() rejects 0 partitions, so empty input still maps to 1.
        return max(1, math.ceil(dir_size / compression_rate / block_size))
    except Exception as e:
        print('ERROR: ' + str(e))
        return None

In [6]:
#Consul utils
# The module is aliased so that `consul` can name the client below without
# shadowing the module it was created from.
import consul as consul_lib
consul_host='srv-bigdata-etl01.mosgorzdrav.local'
consul_port='8500'
consul=consul_lib.Consul(host=consul_host,port=consul_port)
service='vault'  # default service name used by the helper functions below

# Fail fast if the agent reports no cluster members.
if not consul.agent.members():
    raise ValueError('Consul is not found on {consul_host}:{consul_port}'.format(consul_host=consul_host,consul_port=consul_port))
        
def list_consul_services(consul=consul):
    """Return the names of all services registered in the Consul catalog."""
    services = consul.catalog.services()[1]
    return [service_name for service_name in services]

def get_service_params(service=service,consul=consul):
    """Return (address, port) of the first passing instance of a Consul service.

    If the instance's service address is empty, the address of the node it
    runs on is used instead.

    Raises:
        ValueError: if the service is not registered, has no passing nodes,
            or its address/port is missing or empty.
    """
    _, service_info = consul.catalog.service(service)
    if len(service_info)==0:
        raise ValueError('service {service} is not found'.format(service=service))
    _, nodes = consul.health.service(service, passing=True)
    if len(nodes)==0:
        raise ValueError('No active nodes for service {service}'.format(service=service))
    entry = nodes[0]['Service']
    # Consul leaves Service.Address empty when the service uses its node's address.
    address = entry.get('Address') or (nodes[0].get('Node') or {}).get('Address')
    port = entry.get('Port')
    for name, value in (('address', address), ('port', port)):
        if value is None or len(str(value))==0:
            raise ValueError('Parameter {x} for service {service} not found in Consul'.format(x=name,service=service))
    return address, port

def get_db_params(service=service,consul=consul):
    """Return (address, port, database, conn_string) for a database service registered in Consul.

    All values come from the first passing instance:
    - address and port come from its service entry; an empty service address
      falls back to the node address.
    - database and conn_string come from the service Meta fields
      'service_database_name' and
      'service_rfc1738_with_password_connection_string_template'.

    Raises:
        ValueError: if the service is not registered, has no passing nodes,
            or any of the four values is missing or empty.
    """
    _, service_info = consul.catalog.service(service)
    if len(service_info)==0:
        raise ValueError('service {service} is not found'.format(service=service))
    _, nodes = consul.health.service(service, passing=True)
    if len(nodes)==0:
        raise ValueError('No active nodes for service {service}'.format(service=service))
    entry = nodes[0]['Service']
    # .get() lets an absent Meta field surface as the ValueError below instead of a KeyError.
    meta = entry.get('Meta') or {}
    params = (
        # Consul leaves Service.Address empty when the service uses its node's address.
        ('address', entry.get('Address') or (nodes[0].get('Node') or {}).get('Address')),
        ('port', entry.get('Port')),
        ('database', meta.get('service_database_name')),
        ('conn_string', meta.get('service_rfc1738_with_password_connection_string_template')),
    )
    for name, value in params:
        if value is None or len(str(value))==0:
            raise ValueError('Parameter {x} for service {service} not found in Consul'.format(x=name,service=service))
    return tuple(value for _, value in params)

In [7]:
#vault utils
import hvac
from hvac.exceptions import *
vault_service='vault'
vault_address, vault_port = get_service_params(vault_service)
# Both clients talk to the same Vault endpoint (plain http) discovered through Consul.
vault_url = "http://{host}:{port}".format(host=vault_address, port=vault_port)
vault_preauth = hvac.Client(url=vault_url, token='')

# Prefer a ready-made token from the environment; otherwise log in with AppRole credentials.
# NOTE(review): Client.auth_approle is deprecated in newer hvac releases in favour of
# client.auth.approle.login; confirm against the installed hvac version.
if 'SECRET_TOKEN' in os.environ:
    app_vault_token = os.environ['SECRET_TOKEN']
else:
    secret_id = os.environ['SECRET_ID']
    role_id = os.environ['ROLE_ID']
    approle_login = vault_preauth.auth_approle(secret_id=secret_id, role_id=role_id)
    app_vault_token = approle_login['auth']['client_token']

vault = hvac.Client(url=vault_url, token=app_vault_token)

def getSecrets(path,mount_point='datalab',vault=vault):
    """Read a KV secret from Vault and return its (username, password) pair.

    Args:
        path: secret path inside the mount, e.g. 'source/<service>/prod/reader'.
        mount_point: secrets engine mount, 'datalab' by default.
        vault: authenticated hvac client (the module-level ``vault`` by default).

    Raises:
        KeyError: if the secret has no 'username' or 'password' field.
    """
    response = vault.secrets.kv.read_secret_version(path=path, mount_point=mount_point)
    payload = response['data']['data']
    user_value = payload['username']
    password_value = payload['password']
    return user_value, password_value

def listVaultServices(path='source/',vault=vault,mount_point='datalab'):
    """List the entries directly under ``path`` in the KV v2 mount, with any trailing '/' removed."""
    listing = vault.secrets.kv.v2.list_secrets(path=path,mount_point=mount_point)
    entry_names = []
    for key in listing['data']['keys']:
        entry_names.append(key.rstrip('/'))
    return entry_names

In [8]:
#parameters: record the effective run parameters in the log
log_msg(log_fl, '<<< Loading params >>>')
for param_name, param_value in (
    ('capture_type', capture_type),
    ('project_path', project_path),
    ('log_fl', str(log_fl)),
    ('ui_enable', ui_enable),
):
    log_msg(log_fl, param_name + ': ' + param_value)

[2023-07-19 17:11:20,059] {PapermillOperator} INFO - <<< Loading params >>>
[2023-07-19 17:11:20,059] {PapermillOperator} INFO - capture_type: full
[2023-07-19 17:11:20,059] {PapermillOperator} INFO - project_path: /user/mivanovskij/data_completeness
[2023-07-19 17:11:20,059] {PapermillOperator} INFO - log_fl: None
[2023-07-19 17:11:20,059] {PapermillOperator} INFO - ui_enable: false


In [9]:
#connect to GP: resolve the Greenplum service in Consul, then read its reader credentials from Vault
service = 'gp_etl'
address, port, database, conn_string = get_db_params(service=service)
path = f'source/{service}/prod/reader'
username, password = getSecrets(path=path)

In [None]:
# Load the mart_final parquet dataset stored under project_path
proc_df = sqlContext.read.parquet(f'{project_path}/mart_final')

In [None]:
#Функции для подсчёта статистики по витрине

from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import col

def cnt_ids(x):
    """Print the total row count of ``x`` and its number of distinct "ID" values.

    Both numbers come from a single aggregation, so the DataFrame is scanned
    once instead of twice. Returns None; the result is only printed.
    """
    total, distinct = x.agg(F.count(F.lit(1)), F.countDistinct("ID")).first()
    print("Total: {}\nDistinct: {}".format(total, distinct))

def cnt_cols(y):
    """Print, for every column of ``y``, how many rows hold a non-null value.

    Output is one ``<column>: <count>`` line per column, in column order.
    All columns are counted in a single aggregation (F.count skips nulls),
    so the DataFrame is scanned once rather than once per column.
    Uses F.col rather than the bare ``col`` name, which a top-level loop
    elsewhere in the notebook rebinds to a string.
    """
    columns = y.columns
    if not columns:
        return
    counts = y.agg(*[F.count(F.col(c)) for c in columns]).first()
    for column, non_null in zip(columns, counts):
        print(column + ": " + str(non_null))

In [None]:
#Functions for the ratio "number of distinct IDs per document class"
def cnt_table(x):
    """Show (via DataFrame.show) the count of distinct "ID" values per document_class_id.

    Returns None, as DataFrame.show() does.
    """
    per_class = x.groupBy("document_class_id").agg(countDistinct("ID"))
    per_class.show()

def cnt_meta(y):
    """Show the number of distinct "ID" values per document_class_id (same computation as cnt_table).

    Returns None, as DataFrame.show() does.
    """
    return (
        y.groupBy("document_class_id")
         .agg(countDistinct("ID"))
         .show()
    )

In [None]:
# Distinct IDs per document class in the loaded mart
cnt_table(proc_df)

In [None]:
# Same breakdown via cnt_meta (its body is currently identical to cnt_table)
cnt_meta(proc_df)

In [None]:
# Total rows vs. distinct IDs in the mart
cnt_ids(proc_df)

In [None]:
# Non-null value count for every column of the mart
cnt_cols(proc_df)

In [None]:
#min and max DOCUMENT_CREATED_DATE in the parquet mart
import pyspark.sql.functions as F

date_range = proc_df.select(
    F.min("DOCUMENT_CREATED_DATE"),
    F.max("DOCUMENT_CREATED_DATE"),
)
date_range.show()

In [4]:
# project parameters
ui_enable = 'false'
project_name = 'statistics'
# production root: root_path = '/project'
root_path = '/user/mivanovskij'
project_path = '/'.join([root_path, project_name])

In [5]:
# import modules
import pyspark
from pyspark.sql import SQLContext, SparkSession, HiveContext
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from datetime import datetime as dttm
import math
import sys
import os
import pandas as pd
from datetime import datetime, timedelta

# config processing
# Stop a SparkContext left over from an earlier run so a new one can be created below.
try:
    sc.stop()
except NameError:
    # fresh kernel: `sc` has not been defined yet
    print('Spark context is not running')
except Exception as exc:
    # best-effort cleanup: report the failure but still go on to create the new context
    print('Could not stop the previous Spark context: ' + str(exc))

# context config
jar_paths = ['/etc/jdbc/*']  # JDBC driver jars, put on driver/executor classpath and into spark.jars
conf = (pyspark.SparkConf()
            .set('spark.dynamicAllocation.maxExecutors','200')
            .set('spark.driver.extraClassPath', ':'.join(jar_paths))
            .set('spark.executor.extraClassPath', ':'.join(jar_paths))
            .set('spark.jars', ','.join(jar_paths))
            .set('spark.ui.enabled', ui_enable) # turn the UI off when deploying to the scheduled (production) run
            .set('spark.executor.memory','8g')
            .set('spark.driver.memory', '8g')
            .set('spark.scheduler.mode','FAIR')
            .set('spark.driver.maxResultSize', '8g')
            .set('spark.rpc.message.maxSize','256')
            .set('spark.sql.execution.arrow.enabled', 'true')
           )

# Config initialization
sc = pyspark.SparkContext(appName=project_name, conf=conf)
sqlContext = SQLContext(sc)
sql = sqlContext.sql

Spark context is not running


## Скрипт для юниона датафреймов с разным кол-вом столбцов

In [7]:
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

df1 = sqlContext.createDataFrame([[0, 1, 2, 3], [None, 4, 5, 6]], ["col0", "col1", "col2", "col3"])
df2 = sqlContext.createDataFrame([[4, 5, 6], [5, 6, 7]], ["col1", "col2", "col3"])

# Pad each frame with the columns only the other one has. Each padding column is
# all-null and cast to the type it has in the frame that owns it, so unionByName
# can match by name without widening it to string.
# - The loop variable is deliberately not called `col`: at top level that would
#   overwrite pyspark's `col` function imported earlier in the notebook.
# - Lists in column order (instead of set differences) keep the result's column
#   order identical across Python sessions.
df1_present = set(df1.columns)
not_in_df1 = [name for name in df2.columns if name not in df1_present]
for column_name in not_in_df1:
    df1 = df1.withColumn(column_name, lit(None).cast(df2.schema[column_name].dataType))

df2_present = set(df2.columns)
not_in_df2 = [name for name in df1.columns if name not in df2_present]
for column_name in not_in_df2:
    df2 = df2.withColumn(column_name, lit(None).cast(df1.schema[column_name].dataType))

df1.unionByName(df2).distinct().show()

+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|null|   5|   6|   7|
|null|   4|   5|   6|
|   0|   1|   2|   3|
+----+----+----+----+



In [11]:
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

def UnionFunc(a,b):
    """Union two DataFrames by column name, padding columns missing on either side.

    Each column present in only one frame is added to the other as an
    all-null column. It is cast to the type it has in the frame that owns it,
    so padded columns keep their type instead of being widened to string.
    The frames are then combined with ``unionByName`` and de-duplicated with
    ``distinct()`` (SQL UNION semantics).
    NOTE: on Spark >= 3.1, ``a.unionByName(b, allowMissingColumns=True)`` does the
    padding natively.

    Args:
        a: first DataFrame. Its column order, followed by the b-only columns in
           b's order, is the column order of the result.
        b: second DataFrame.

    Returns:
        The de-duplicated union as a new DataFrame.
    """
    # iterate in column order (not over set differences) so the output column
    # order is deterministic across Python sessions
    a_columns = set(a.columns)
    for column_name in b.columns:
        if column_name not in a_columns:
            a = a.withColumn(column_name, lit(None).cast(b.schema[column_name].dataType))

    b_columns = set(b.columns)
    for column_name in a.columns:
        if column_name not in b_columns:
            b = b.withColumn(column_name, lit(None).cast(a.schema[column_name].dataType))

    return a.unionByName(b).distinct()

In [12]:
# Demo frames: df1 has an extra leading column col0 that df2 lacks.
df1 = sqlContext.createDataFrame(
    data=[[0, 1, 2, 3], [None, 4, 5, 6]],
    schema=["col0", "col1", "col2", "col3"],
)
df2 = sqlContext.createDataFrame(
    data=[[4, 5, 6], [5, 6, 7]],
    schema=["col1", "col2", "col3"],
)

In [13]:
# Union the demo frames; col0 exists only in df1, so df2's rows get null there
union_df = UnionFunc(df1,df2)

In [15]:
# Display the combined, de-duplicated frame
union_df.show()

+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|null|   5|   6|   7|
|null|   4|   5|   6|
|   0|   1|   2|   3|
+----+----+----+----+



In [19]:
# Demo frames with identical schemas; the row [4, 5, 6] appears in both.
df12 = sqlContext.createDataFrame(
    data=[[1, 2, 3], [4, 5, 6]],
    schema=["col1", "col2", "col3"],
)
df22 = sqlContext.createDataFrame(
    data=[[4, 5, 6], [5, 6, 7]],
    schema=["col1", "col2", "col3"],
)

In [20]:
# Same-schema case: nothing to pad; distinct() drops the row shared by both frames
union_df_1 = UnionFunc(df12,df22)

In [21]:
# Display the combined, de-duplicated frame
union_df_1.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   5|   6|   7|
|   4|   5|   6|
+----+----+----+



## Скрипт для проверки полноты данных в витрине

In [1]:
# project parameters
ui_enable = 'false'
project_name = 'check_completeness'
log_fl = None
# production root: root_path = '/project'
root_path = '/user/mivanovskij'
project_path = f'{root_path}/{project_name}'

In [None]:

# read table-config: load the xpath dictionary from Greenplum over JDBC
# (username/password/address/port/database come from the GP credential cell above)
gp_dictionary_query = '(select distinct xpath_clear, max_nn_cnt, attr, document_class_id, part_num from common_analytics2.dict_to_dispensary_observation_refuse) as foo'
params_greenplum1 = dict(
    user=username,
    password=password,
    url=f'jdbc:postgresql://{address}:{port}/{database}',
    driver='org.postgresql.Driver',
    dbtable=gp_dictionary_query,
)

log_msg(log_fl, '<<< Getting dictionary from greenplum >>>')
xpath_dict = sqlContext.read.format('jdbc').options(**params_greenplum1).load()
log_msg(log_fl, '<<< Getting dictionary from greenplum ends >>>')

In [4]:
#Load tagvalue (schema merged across parquet files)
log_msg(log_fl, '<<< Tagvalue loading starts >>>')
tv = (
    sqlContext.read
    .format('parquet')
    .option("mergeSchema", "true")
    .load('/ods/simi_docs/xml/docs_tagvalue')
)
log_msg(log_fl, '<<< Tagvalue loading done >>>')

[2023-06-01 12:44:59,976] {PapermillOperator} INFO - <<< Tagvalue loading starts >>>
[2023-06-01 12:45:34,661] {PapermillOperator} INFO - <<< Tagvalue loading done >>>


In [18]:
#Load the mart
# TODO: change this hard-coded path. NOTE(review): it differs in case from root_path
# ('/user/mivanovskij'); HDFS paths are case-sensitive, so confirm which one is right.
df_main = sqlContext.read.parquet('/user/MIvanovskij/do_refuse/mart_final')
# Distinct non-null document class ids present in the mart, as a sorted tuple.
# collect() on the already-distinct frame avoids the RDD round-trip and the
# redundant set(). Sorting keeps the tuple (and anything built from it)
# identical between runs.
class_id_rows = (
    df_main.select("document_class_id")
    .where("document_class_id is not null")
    .distinct()
    .collect()
)
df_ccts = tuple(sorted(row.document_class_id for row in class_id_rows))

In [19]:
# Inspect the collected document class ids
df_ccts

('85066', '41062', '96511')

In [12]:
# Leftover from an earlier version of the mart-loading cell, which now already
# turns df_ccts into a tuple of class ids. Collect only while df_ccts is still a
# DataFrame; on the tuple, `.rdd` would raise AttributeError and stop Run All.
if hasattr(df_ccts, 'rdd'):
    df_ccts = df_ccts.rdd.map(lambda x: x.document_class_id).collect()

In [20]:
#Drop tagvalue rows whose cct (document class) does not occur in the mart
# isin() builds the IN-list from Python values. Formatting the tuple into SQL
# text breaks for a single id ("in ('85066',)") and for no ids ("in ()").
tv_cut = tv.where(F.col("document_class_id").isin(list(df_ccts)))

In [21]:
# List the mart's columns
df_main.columns

['DOCUMENT_CREATED_DATE',
 'ID',
 'DOCUMENT_CLASS_ID',
 'root_tag',
 'nn1',
 'patient_id',
 'event_id',
 'start_time',
 'uid',
 'mo_type',
 'mo_parent_id',
 'mo_id',
 'mo_name',
 'mu_id',
 'mu_name',
 'mo_municipality',
 'mo_street',
 'mo_house',
 'mo_corp',
 'mo_building',
 'medic_mo',
 'func_value',
 'medic_position',
 'medic_division',
 'composer_name',
 'diagnos_code',
 'diagnos_mkb',
 'refuse_date',
 'refuse_reason',
 'dispensary_group']

In [22]:
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import col

def cnt_cols(y):
    """Print, for every column of ``y``, how many rows hold a non-null value.

    Output is one ``<column>: <count>`` line per column, in column order.
    All columns are counted in a single aggregation (F.count skips nulls),
    so the DataFrame is scanned once rather than once per column.
    Uses F.col rather than the bare ``col`` name, which a top-level loop
    elsewhere in the notebook rebinds to a string.
    """
    columns = y.columns
    if not columns:
        return
    counts = y.agg(*[F.count(F.col(c)) for c in columns]).first()
    for column, non_null in zip(columns, counts):
        print(column + ": " + str(non_null))

In [25]:
# Non-null value count for every column of the mart
cnt_cols(df_main)

DOCUMENT_CREATED_DATE: 5860761
ID: 5860761
DOCUMENT_CLASS_ID: 5860761
root_tag: 5860761
nn1: 5860761
patient_id: 5860761
event_id: 5860761
start_time: 5860761
uid: 5860749
mo_type: 5860761
mo_parent_id: 2404761
mo_id: 2404761
mo_name: 2404761
mu_id: 3456000
mu_name: 3456000
mo_municipality: 0
mo_street: 0
mo_house: 0
mo_corp: 0
mo_building: 0
medic_mo: 5860761
func_value: 5860761
medic_position: 5860761
medic_division: 5860761
composer_name: 5860761
diagnos_code: 5860761
diagnos_mkb: 5860761
refuse_date: 5860759
refuse_reason: 5860759
dispensary_group: 5851064


## Pandas analysis

In [None]:
# Source CSV for the pandas analysis. The default is an absolute path on a local
# Windows machine, which is probably absent on the Linux host this notebook runs
# on. Set the BLOCK3_DATA_CSV environment variable to point at the file instead.
block3_csv_path = os.environ.get("BLOCK3_DATA_CSV", "C:/Users/User/Downloads/Блок3_Данные.csv")
df = pd.read_csv(block3_csv_path)

In [13]:
import os

# Show the current working directory and the names of its entries
cwd = os.getcwd()
files = os.listdir(cwd)
print(f"Files in {cwd!r}: {files}")

Files in '/home/MIvanovskij': ['patronazh_mart', 'despensary', 'ii', 'airflow_process.py', '.profile', 'tumor_monitoring', '.config', 'antropo_mart', 'dispensary_new', 'row_count_onco.ipynb', 'maltur', 'malignant_tumors', 'drugs_mart', '.virtual_documents', 'DN_refuse.ipynb', 'ii.ipynb', '.jupyter', 'diag_new.ipynb', 'lin_reg.ipynb', 'onco_consilium', 'drugs_mart_1', 'write_vault_token (1).ipynb', 'Untitled.ipynb', 'bigdata1381', 'Procedure_end.ipynb', 'oncology.ipynb', 'e-prescriptions', 'tap', '.fonts', '.emacs', 'health_group', 'spark-warehouse', '.cache', 'e-prescriptions_1', 'write_vault_token(2).ipynb', 'tv_eris.ipynb', 'tumor_mon', 'stat.ipynb', 'bad_habits.ipynb', 'main.ipynb', '.viminfo', 'dispanser_adult', 'lab_research', 'refer_hospitalization', 'dispensary_observation_refuse', 'allergic_anamnesis (2).ipynb', 'codes_classifier', 'seaborn-data', 'onco_consilium_1', 'Dn_new.ipynb', 'vaccination', 'tumor_mon_1', 'tap_mart', 'patronazh_qa.ipynb', 'disp_observ.ipynb', 'Untitled2.