In [1]:
import findspark
findspark.init()  # must run before any pyspark import so that pyspark is importable
import datetime

import pandas as pd
from pyspark import SparkConf
from pyspark.sql import HiveContext, SparkSession, Window
from pyspark.sql import functions as F


appname = "Test Application"
master = "spark://master:7077"

# spark.driver.maxResultSize caps the total size of the results that the
# workers may send back to the driver for a single action.
conf = SparkConf()
conf.setAppName(appname)
conf.setMaster(master)
conf.set('spark.driver.maxResultSize', '8g')

# Hive-enabled session shared by every cell below.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)
hive_cont = HiveContext(spark)

# Turn on Arrow-based transfer, which speeds up DataFrame.toPandas().
spark.sql("set spark.sql.execution.arrow.enabled=true")

DataFrame[key: string, value: string]

## SQL获取HIVE数据库

### 需从头遍历一次数据，以获取 overpass_abute 积水点属性表
### 广播变量不支持遍历修改，累加器无法生成对应的表格

#### 从所有数据中 去重生成 S_NO 列表 (为方便，将数据写入 Hive rcxljjs.t_jishui_sno 表中)

In [2]:
# Build the de-duplicated S_NO list from the full data set and, for convenience,
# persist it to the Hive table rcxljjs.t_jishui_sno.
start_time = '2013-01-01 00:00:00'
end_time = '2022-01-01 00:00:00'

# Read the SQL template; the context manager closes the file even if reading fails.
with open(file='./sql/overpass.sql') as overpass_sql:
    list_text = overpass_sql.readlines()
sql_text = " ".join(list_text)
sql_text = sql_text.format(start_time=start_time, end_time=end_time)

# Fetch the data from Hive.
hive_data = spark.sql(sql_text)

# Unique S_NO values: one row per underpass flood-monitoring point.
list_no = hive_data.select('S_NO').dropDuplicates(['S_NO'])
list_no.write.format("hive").mode("overwrite").saveAsTable('rcxljjs.t_jishui_sno')

# Attribute table with exactly one row per S_NO: the row with the latest T_SYSTIME.
overpass_abute = hive_data.select('S_NO', 'T_SYSTIME', 'S_HASMONITOR', 'S_STATENAME', 'S_ADDR', 'S_BUILDDATE',
                                  'S_PROUNIT', 'S_MANAGE_UNIT', 'S_MAINTAIN_UNIT', 'S_STATIONID', 'S_STATIONNAME')

# orderBy(...).dropDuplicates(...) does not guarantee WHICH duplicate survives,
# because the sort order is not preserved through the de-duplication shuffle.
# Ranking the rows inside a per-S_NO window makes "keep the newest" explicit.
latest_first = Window.partitionBy('S_NO').orderBy(F.col('T_SYSTIME').desc())
overpass_abute = (
    overpass_abute
    .withColumn('_row_rank', F.row_number().over(latest_first))
    .filter(F.col('_row_rank') == 1)
    .drop('_row_rank')
    .orderBy('S_NO', ascending=False)
)

# NOTE(review): a later cell reads rcxljjs.t_overpass_abute; the write is left
# disabled here as in the original - confirm the table is populated elsewhere.
# overpass_abute.write.format("hive").mode("overwrite").saveAsTable('rcxljjs.t_overpass_abute')

In [None]:
# Collect the per-S_NO attribute table to the driver as a pandas DataFrame.
# The name is rebound from a Spark DataFrame to a pandas one, so the guard keeps
# a second run of this cell from failing on the (non-existent) pandas `.cache()`.
if not isinstance(overpass_abute, pd.DataFrame):
    overpass_abute = overpass_abute.cache().toPandas()

#### 按时间、S_NO遍历积水监测数据，生成 hydrops_data 数据表

In [None]:
# Step 1: the flood-event table `hydrops_data`.
# It records the start time, end time, depth and rank of every flood event.
# The source rows come from several Hive tables and are fetched one year at a time.

# Query windows: each key is a start year, each value the (exclusive-looking) end year.
years = {str(year): str(year + 1) for year in range(2013, 2022)}

# The S_NO values that are available for scanning.
list_no = spark.sql('select * from rcxljjs.t_jishui_sno').cache().toPandas()

# Final flood-event table (can be stored in Hive) and the next row label to use.
hydrops_data = pd.DataFrame(columns=['S_NO', 'START_TIME', 'END_TIME', 'DEEP', 'JSRANK'])
hydrops_data_index = 0

# Scan state.
log = 0  # flag: 1 means a flood event is currently in progress
start_time = end_time = '2010-12-22 00:00:00'  # placeholder timestamps
water_deep, jsrank = 0.0, 0


# Distributed conversion of a Spark DataFrame to a pandas DataFrame: every
# partition is converted to a pandas frame where it lives, and the pieces are
# concatenated after being collected.
def _map_to_pandas(rdds):
    """Turn one partition's rows into a single-element list holding a pandas DataFrame.

    Args:
        rdds: iterator over the rows of one partition.

    Returns:
        A list with exactly one DataFrame (as required by ``mapPartitions``,
        which flattens the iterable returned for each partition).
    """
    return [pd.DataFrame(list(rdds))]


def toPandas(df, n_partitions=None):
    """Convert a Spark DataFrame to pandas by converting each partition separately.

    Args:
        df: Spark DataFrame to convert.
        n_partitions: when given, ``df`` is repartitioned to this many
            partitions before conversion.

    Returns:
        A pandas DataFrame with the columns of ``df`` and a fresh 0..n-1 index.
        When ``df`` has no rows, an empty DataFrame with those columns.
    """
    if n_partitions is not None:
        df = df.repartition(n_partitions)
    columns = list(df.columns)

    frames = df.rdd.mapPartitions(_map_to_pandas).collect()
    # Empty partitions yield column-less frames; drop them so they can neither
    # distort the concatenation nor break the column assignment below.
    frames = [frame for frame in frames if not frame.empty]
    if not frames:
        return pd.DataFrame(columns=columns)

    # ignore_index avoids repeating each partition's 0..k labels in the result.
    df_pand = pd.concat(frames, ignore_index=True)
    df_pand.columns = columns
    return df_pand

# A reading with N_VALUE at or above this threshold counts as "flooded".
FLOOD_THRESHOLD = 10
# (lower bound of peak depth, rank), checked from the deepest band downwards;
# anything below the last bound is rank 1.
RANK_BOUNDS = ((50, 4), (25, 3), (15, 2))


def _js_rank(peak_depth):
    """Return the flood rank (1-4) for the peak depth of one flood event."""
    for lower_bound, rank in RANK_BOUNDS:
        if peak_depth >= lower_bound:
            return rank
    return 1


# The SQL template does not change between years, so read it once.
with open('./sql/overpass.sql') as sql_file:
    overpass_sql_template = " ".join(sql_file.readlines())

# Flood state is tracked PER STATION: S_NO -> [event start time, peak depth].
# Keeping it in a dict (instead of one shared flag) stops an event that is still
# open at the end of one station's data from being closed by the next station's
# first reading, and lets an event continue across a year boundary.
open_events = {}
# Finished events; the DataFrame is built once at the end rather than row by row.
event_records = []
station_ids = list_no['S_NO'].drop_duplicates().tolist()

for start_year, end_year in years.items():
    # Fetch one year at a time: toPandas should not return too much data at once.
    # Dedicated names are used for the query window so that it can never be
    # mistaken for (or overwrite) an event's start/end time.
    query_start = start_year + '-01-01 00:00:00'
    query_end = end_year + '-01-01 00:00:00'

    # Fetch the data from Hive. It is consumed by a single toPandas() call, so
    # it is not cached (a cache per year would pile up without unpersist()).
    hive_data = spark.sql(overpass_sql_template.format(start_time=query_start, end_time=query_end))
    overpass_data = hive_data.toPandas()

    # Parse and sort the timestamps once per year, then split by station.
    readings = overpass_data[['S_NO', 'T_SYSTIME', 'N_VALUE']].copy()
    readings['T_SYSTIME'] = pd.to_datetime(readings['T_SYSTIME'])
    readings = readings.sort_values('T_SYSTIME')
    readings_by_station = {key: group for key, group in readings.groupby('S_NO', sort=False)}

    for s_no in station_ids:
        JSD_value = readings_by_station.get(s_no)
        if JSD_value is None:  # no readings for this station in this year
            continue

        # Walk the readings in time order and track the station's flood state.
        for value in JSD_value.itertuples(index=False):
            if value.N_VALUE >= FLOOD_THRESHOLD:
                if s_no in open_events:
                    # Already flooded and still flooded: update the peak depth.
                    open_events[s_no][1] = max(open_events[s_no][1], value.N_VALUE)
                else:
                    # Not flooded before: a new event starts at this reading.
                    open_events[s_no] = [value.T_SYSTIME, value.N_VALUE]
            elif value.N_VALUE < FLOOD_THRESHOLD and s_no in open_events:
                # Flooded before, below the threshold now: the event ends here.
                event_start, peak_depth = open_events.pop(s_no)
                event_records.append(
                    [s_no, event_start, value.T_SYSTIME, peak_depth, _js_rank(peak_depth)]
                )
            # Readings that compare False both ways (e.g. NaN) are ignored.

# NOTE: events still open after the last reading are not recorded.
hydrops_data = pd.DataFrame(event_records, columns=['S_NO', 'START_TIME', 'END_TIME', 'DEEP', 'JSRANK'])
hydrops_data_index = len(hydrops_data)
    
# Save hydrops_data to the Hive table rcxljjs.t_hydrops_data.

hydrops_data_columns = list(hydrops_data.columns)
hydrops_data_value = hydrops_data.values.tolist()

# pandas stores the times as Timestamp objects; convert the START_TIME and
# END_TIME entries (positions 1 and 2) to plain datetime objects for Spark.
for row in hydrops_data_value:
    row[1], row[2] = row[1].to_pydatetime(), row[2].to_pydatetime()

hydrops_spark = spark.createDataFrame(hydrops_data_value, hydrops_data_columns)
hydrops_spark.write.format("hive").mode("overwrite").saveAsTable('rcxljjs.t_hydrops_data')

#### 创建属性表 overpass_abute，记录最大深度，平均深度与积水次数

In [21]:
# Attach per-station flood statistics (mean depth, max depth, number of events)
# to the attribute table.
# NOTE(review): rcxljjs.t_overpass_abute must already exist in Hive; the only
# write to it visible in this notebook is commented out - confirm its source.
overpass_abute = spark.sql('select * from rcxljjs.t_overpass_abute').toPandas()

# DEEP is cast to float so that the aggregations work even when the column was
# built with object dtype.
hydrops_deep = hydrops_data[['S_NO', 'DEEP']].assign(DEEP=hydrops_data['DEEP'].astype(float))
# Mean flood depth per station.
deep_mean = hydrops_deep.groupby('S_NO').mean()
# Maximum flood depth per station.
deep_max = hydrops_deep.groupby('S_NO').max()
# Number of flood events per station.
deep_freq = hydrops_deep.groupby('S_NO').size()

deep_stats = (
    pd.DataFrame({
        'MEAN_DEEP': deep_mean['DEEP'],
        'MAX_DEEP': deep_max['DEEP'],
        'FREQU': deep_freq,
    })
    .rename_axis('S_NO')
    .reset_index()
)

# overpass_abute is indexed 0..n-1, so `.loc[<S_NO value>, ...]` would append new
# rows instead of updating the matching station. Join on the S_NO column instead.
# Dropping any existing stat columns first keeps this cell safe to re-run.
overpass_abute = (
    overpass_abute
    .drop(columns=['MEAN_DEEP', 'MAX_DEEP', 'FREQU'], errors='ignore')
    .merge(deep_stats, on='S_NO', how='left')
)
# Stations without any recorded event: 0 events, depths stay missing (NaN).
overpass_abute['FREQU'] = overpass_abute['FREQU'].fillna(0).astype(int)