In [20]:
from pyspark.sql import *
from pyspark.sql.types import *
from time import time
from GetPath import *
import shutil
# Build (or reuse) the SparkSession shared by every cell below.
# One option per line so each setting can be reviewed and diffed on its own.
spark = (
    SparkSession.builder
    .appName('B2_Create')
    .config('spark.dynamicAllocation.enabled', 'true')
    .config('spark.debug.maxToStringFields', '100')
    # The Arrow switch is spelled 'enabled'; the previous key
    # 'spark.sql.execution.arrow.enable' is not a Spark option and was ignored.
    .config('spark.sql.execution.arrow.enabled', 'true')
    .config('spark.executor.memory', '48g')
    .config('spark.driver.memory', '48g')
    # Very generous timeouts; presumably chosen so the long-running joins in
    # this notebook are not aborted -- TODO confirm these values are still needed.
    .config('spark.core.connection.ack.wait.timeout', '36000s')
    .config('spark.executor.heartbeatInterval', '36000s')
    .config('spark.network.timeout', '50000s')
    .config('spark.rpc.lookupTimeout', '5000s')
    .config('spark.shuffle.io.connectionTimeout', '50000s')
    .getOrCreate()
)

In [21]:
# Config
# All three locations share one root directory, so it is spelled out once.
_OUTPUT_ROOT = 'D:/Output/chenbingying/'
# Directory that is searched for the visiting-record input files.
input_path = _OUTPUT_ROOT + 'B1/'
# Directory that receives one result CSV per input file.
output_path = _OUTPUT_ROOT + 'B2/'
# Patient-information (A2) CSV that the visiting records are joined against.
A2_path = _OUTPUT_ROOT + 'A2.csv'

In [22]:
# UDF: decide whether a free-text area string refers to Guangzhou
def Is_GuangZhou(x):
    """Return '广州市' if ``x`` mentions Guangzhou, otherwise None.

    ``x`` is matched by substring against the city name, the phrase for
    "this city", and a fixed list of current and former district names.
    A ``None`` input yields ``None``.
    """
    if x is None:
        return None
    keywords = ('广州', '本市', '越秀', '海珠', '荔湾', '天河', '白云', '黄埔',
                '南沙', '番禺', '花都', '增城', '从化', '东山', '芳村', '萝岗')
    # A single hit on any keyword is enough to classify the row.
    return '广州市' if any(keyword in x for keyword in keywords) else None

In [23]:
# UDF: derive the current Guangzhou district from a free-text area string
def DISTRICT(x):
    """Map a free-text area string to a current Guangzhou district name.

    Former district names are first rewritten to their current successor,
    then the first district keyword found in ``x`` (checked in a fixed
    priority order) is returned with the '区' suffix appended.

    If nothing matches, the legacy names that Is_GuangZhou also recognises
    without a suffix ('芳村', '东山', '萝岗') are tried as a last resort, so
    that a row classified as Guangzhou by one of those names also receives
    a district. This fallback only runs when the primary lookup finds
    nothing, so every previously non-None result is unchanged.

    Returns None when ``x`` is None or no district can be identified.
    """
    if x is None:
        return None
    # Rewrite abolished/renamed districts to the district that replaced them.
    for old_name, new_name in [['芳村区', '荔湾区'], ['花县', '花都区'],
                               ['东山区', '越秀区'], ['萝岗区', '黄埔区']]:
        x = x.replace(old_name, new_name)
    # The list order is the match priority when several names occur in one string.
    for name in ['荔湾', '越秀', '海珠', '天河', '白云', '黄埔', '番禺', '花都',
                 '南沙', '从化', '增城']:
        if name in x:
            return name + '区'
    # Fallback for legacy names written without the '区' suffix.
    for legacy_name, current in [['芳村', '荔湾区'], ['东山', '越秀区'],
                                 ['萝岗', '黄埔区']]:
        if legacy_name in x:
            return current
    return None

In [24]:
# UDF: pick the most frequent value (mode) of a collected list
def return_mode(list1):
    """Return the most frequent element of ``list1``.

    Ties are resolved in favour of the value that appears first in
    ``list1``. Returns None when ``list1`` is None, empty, not a sized
    iterable, or contains unhashable elements.

    Only ``TypeError`` is handled here; anything else (for example
    KeyboardInterrupt) propagates instead of being silently turned into None.
    """
    if list1 is None:
        return None
    try:
        # A single observation is trivially the mode.
        if len(list1) == 1:
            return list1[0]
        counts = {}
        for value in list1:
            counts[value] = counts.get(value, 0) + 1
    except TypeError:
        # Not a sized iterable, or an element cannot be used as a dict key.
        return None
    if not counts:
        return None
    # max() returns the first maximal key in insertion order, which keeps
    # the "first seen wins" tie-break.
    return max(counts, key=counts.get)

In [25]:
def Create_B2(input_path,output_path,A2_path):
    """Build the B2 tables: visiting records enriched with A2 patient data.

    Steps:
      1. Register the helper UDFs and load the A2 CSV.
      2. Collapse A2 to one row per (MEDICAL_INSTITUTION_CODE, FILE_NUMBER)
         and to one row per ID_NUMBER, taking the mode of every attribute.
      3. For each visiting-record file: fill AGE / GENDER / CITY / DISTRICT
         from the record itself, then from A2 matched on institution + file
         number, and for the still-unmatched rows from A2 matched on
         ID_NUMBER.
      4. Write the result as one CSV file named after the input file.

    Args:
        input_path: location passed to GetPath to find the input files.
        output_path: directory prefix for the result CSVs (concatenated
            directly with the file name, so it should end with '/').
        A2_path: path of the A2 patient-information CSV (with header).

    Relies on the module-level ``spark`` session and on the project helper
    ``GetPath``, which is not visible here -- it is presumably a file
    finder; verify its return type against its definition.
    """
    t_start=time()
    filelist=GetPath(input_path,'tb_cis_op_visiting_record')
    # Register the Python helpers so they can be called from Spark SQL
    spark.udf.register("Is_GuangZhou",Is_GuangZhou,StringType())
    spark.udf.register("DISTRICT",DISTRICT,StringType())
    spark.udf.register("return_mode",return_mode,StringType()) 
    # A2 preprocessing
    print('开始处理：A2表')
    spark.read.option("inferSchema","true").option("header","true").csv(A2_path).createOrReplaceTempView("A2_Raw")
    # Take the mode so that institution code + file number is unique in A2
    spark.sql("""
    select MEDICAL_INSTITUTION_CODE
    ,FILE_NUMBER 
    ,return_mode(collect_list(CARD_NUMBER)) as CARD_NUMBER
    ,return_mode(collect_list(CARD_TYPE)) as CARD_TYPE
    ,return_mode(collect_list(cast(BIRTH_DATE as string))) as BIRTH_DATE
    ,return_mode(collect_list(GENDER)) as GENDER
    ,return_mode(collect_list(CITY)) as CITY
    ,return_mode(collect_list(DISTRICT)) as DISTRICT
    from A2_Raw group by MEDICAL_INSTITUTION_CODE,FILE_NUMBER
    """).createOrReplaceTempView("A2_result")
    # Take the mode so that the ID number is unique in A2
    spark.sql("""
    select ID_NUMBER
    ,return_mode(collect_list(CARD_NUMBER)) as CARD_NUMBER
    ,return_mode(collect_list(CARD_TYPE)) as CARD_TYPE
    ,return_mode(collect_list(cast(BIRTH_DATE as string))) as BIRTH_DATE
    ,return_mode(collect_list(GENDER)) as GENDER
    ,return_mode(collect_list(CITY)) as CITY
    ,return_mode(collect_list(DISTRICT)) as DISTRICT
    from A2_Raw group by ID_NUMBER
    """).createOrReplaceTempView("A2_result_sfz")
    print('A2表预处理完毕\n')
    print('开始处理：B2表')
    # Match every B2 input file against A2
    for file in filelist:
        start=time()
        # Input file name without directory and '.csv'; reused for all outputs
        base_name=file.split('/')[-1].split('.csv')[0]
        out_dir=output_path+base_name
        out_csv=out_dir+'.csv'
        print('开始处理',base_name)
        spark.read.option("inferSchema","true").option("header","true").csv(file).createOrReplaceTempView("B2_Raw")
        print('读入数据耗时：%.2f秒'%(time()-start))
        start=time()
        # B2_0: fall back to DATE_OF_DIAGNOSIS when VISITING_DATE is missing,
        # so the age calculation below can rely on a single date column
        spark.sql("""
        select
        MEDICAL_INSTITUTION_CODE
        ,FILE_NUMBER
        ,ID_NUMBER
        ,CARD_NUMBER
        ,CARD_TYPE
        ,AGE
        ,GENDER 
        ,BIRTH_DATE
        ,case when VISITING_DATE is null then DATE_OF_DIAGNOSIS else VISITING_DATE end as VISITING_DATE
        ,DISEASE_DIAGNOSIS_CODE
        ,VISITING_TYPE_CODE
        ,VISITING_TYPE_NAME
        ,ICD_CODE
        ,AREA
        ,id_birth_date
        ,id_gender
        from B2_Raw
        """).createOrReplaceTempView("B2_0")
        # B2_1: complete age, gender and city/district from the record itself
        spark.sql("""
        select 
        MEDICAL_INSTITUTION_CODE
        ,FILE_NUMBER
        ,ID_NUMBER
        ,CARD_NUMBER
        ,CARD_TYPE
        ,case when id_birth_date is not null and VISITING_DATE is not null then year(VISITING_DATE)-year(id_birth_date) 
              when id_birth_date is null and AGE is not null then AGE
              when AGE is null and id_birth_date is null and BIRTH_DATE is not null and VISITING_DATE is not null then year(VISITING_DATE)-year(BIRTH_DATE)
             else AGE end as AGE
        ,case when id_gender is null and GENDER is not null then GENDER else id_gender end as GENDER
        ,VISITING_DATE
        ,DISEASE_DIAGNOSIS_CODE
        ,VISITING_TYPE_CODE
        ,VISITING_TYPE_NAME
        ,ICD_CODE
        ,Is_GuangZhou(AREA) as CITY
        ,DISTRICT(AREA) as DISTRICT 
        from B2_0
        """).createOrReplaceTempView("B2_1")
        # match_01: join B2 to A2 on institution code + file number;
        # flag is 1 when an A2 row matched and 0 otherwise
        spark.sql("""
        select b.MEDICAL_INSTITUTION_CODE
        ,b.FILE_NUMBER
        ,b.ID_NUMBER
        ,b.CARD_NUMBER
        ,b.CARD_TYPE
        ,case when b.AGE is null and a.BIRTH_DATE is not null and b.VISITING_DATE is not null then year(b.VISITING_DATE)-year(a.BIRTH_DATE) else b.AGE end as AGE
        ,case when b.GENDER is null then a.GENDER else b.GENDER end as GENDER
        ,b.VISITING_DATE
        ,b.DISEASE_DIAGNOSIS_CODE
        ,b.VISITING_TYPE_CODE
        ,b.VISITING_TYPE_NAME 
        ,b.ICD_CODE
        ,case when b.CITY is not null then b.CITY else a.CITY end as CITY
        ,case when b.DISTRICT is not null then b.DISTRICT else a.DISTRICT end as DISTRICT
        ,case when a.MEDICAL_INSTITUTION_CODE is not null and a.FILE_NUMBER is not null then 1 else 0 end as flag
        from B2_1 b left join A2_result a 
        on b.MEDICAL_INSTITUTION_CODE=a.MEDICAL_INSTITUTION_CODE and b.FILE_NUMBER=a.FILE_NUMBER
        """).createOrReplaceTempView("match_01")
        # match_02: second chance for the unmatched rows, joined on ID number
        spark.sql("""
        select 
        b.MEDICAL_INSTITUTION_CODE
        ,b.FILE_NUMBER
        ,b.ID_NUMBER
        ,b.CARD_NUMBER
        ,b.CARD_TYPE
        ,case when b.AGE is null and a.BIRTH_DATE is not null and b.VISITING_DATE is not null then year(b.VISITING_DATE)-year(a.BIRTH_DATE) else b.AGE end as AGE
        ,case when b.GENDER is null then a.GENDER else b.GENDER end as GENDER 
        ,b.VISITING_DATE
        ,b.DISEASE_DIAGNOSIS_CODE
        ,b.VISITING_TYPE_CODE
        ,b.VISITING_TYPE_NAME 
        ,b.ICD_CODE
        ,case when b.CITY is not null then b.CITY else a.CITY end as CITY
        ,case when b.DISTRICT is not null then b.DISTRICT else a.DISTRICT end as DISTRICT
        from (select * from match_01 where flag=0) b left join A2_result_sfz a on 
        b.ID_NUMBER=a.ID_NUMBER
        """).createOrReplaceTempView("match_02")
        # Union both match stages and write them as a single partition
        spark.sql("""
        select 
        MEDICAL_INSTITUTION_CODE
        ,FILE_NUMBER
        ,ID_NUMBER
        ,CARD_NUMBER
        ,CARD_TYPE
        ,AGE
        ,GENDER
        ,VISITING_DATE
        ,DISEASE_DIAGNOSIS_CODE
        ,VISITING_TYPE_CODE
        ,VISITING_TYPE_NAME
        ,ICD_CODE
        ,CITY
        ,DISTRICT
        from match_01 where flag=1
        union all
        select * from match_02
        """).coalesce(1).write.csv(out_dir,mode='overwrite',header=True)
        # Spark writes a directory; move the CSV found inside it next to the
        # directory under the input file's name, then delete the directory
        shutil.move(GetPath(out_dir+'/','csv'),out_csv)
        shutil.rmtree(out_dir)
        print('处理数据耗时：%.2f秒'%(time()-start))
        print('已输出至：'+out_csv)
        # Release the per-file views. These are the views created above; the
        # names dropped previously ("visiting_B2", "visiting_B2_1") were never
        # registered, so the real views were left behind.
        spark.catalog.dropTempView("B2_Raw")
        spark.catalog.dropTempView("B2_0")
        spark.catalog.dropTempView("B2_1")
        spark.catalog.dropTempView("match_01")
        spark.catalog.dropTempView("match_02")
    # Release the A2 views shared by all files
    spark.catalog.dropTempView("A2_Raw")
    spark.catalog.dropTempView("A2_result")
    spark.catalog.dropTempView("A2_result_sfz")
    print('总耗时：%.2f秒'%(time()-t_start))

In [26]:
# Entry point: run the whole B2 build with the paths from the config cell.
if __name__ == "__main__":
    Create_B2(
        input_path=input_path,
        output_path=output_path,
        A2_path=A2_path,
    )

开始处理：A2表
A2表预处理完毕

开始处理：B2表
开始处理 tb_cis_op_visiting_record_201401_201806
读入数据耗时：94.11秒
处理数据耗时：1824.66秒
已输出至：D:/Output/zhuhao/B2/tb_cis_op_visiting_record_201401_201806.csv
开始处理 tb_cis_op_visiting_record_201807_201912
读入数据耗时：66.66秒
处理数据耗时：1490.82秒
已输出至：D:/Output/zhuhao/B2/tb_cis_op_visiting_record_201807_201912.csv
开始处理 tb_cis_op_visiting_record_202001_202012
读入数据耗时：88.35秒
处理数据耗时：1702.06秒
已输出至：D:/Output/zhuhao/B2/tb_cis_op_visiting_record_202001_202012.csv
开始处理 tb_cis_op_visiting_record_202101_202112
读入数据耗时：82.84秒
处理数据耗时：1590.23秒
已输出至：D:/Output/zhuhao/B2/tb_cis_op_visiting_record_202101_202112.csv
开始处理 tb_cis_op_visiting_record_202201_202205
读入数据耗时：44.64秒
处理数据耗时：1113.94秒
已输出至：D:/Output/zhuhao/B2/tb_cis_op_visiting_record_202201_202205.csv
总耗时：8251.29秒
