In [30]:
import pandas as pd
import numpy as np
import pyarrow as pa
import re
import findspark
findspark.init('/home/dave/spark-2.4.1-bin-hadoop2.7/')
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import *
import sys
import os
from IPython.display import display, HTML
from pyspark.sql.functions import pandas_udf, PandasUDFType
#from pyspark.sql.functions import unix_timestamp
#from pyspark.sql.functions import from_unixtime

# One SparkSession for this notebook; getOrCreate() hands back an existing
# session if one is already running, otherwise it builds a new one.
spark = (
    SparkSession.builder
    .appName("proj_exp")
    .getOrCreate()
)

def process_risk(datafile):
    """Compute a per-date avalanche-risk level (1-5) for one SNOTEL station file.

    Uses the module-level ``spark`` session. The file is read as plain text.
    Any line containing ``#`` or ``Date`` is dropped (comment and header
    lines; note this would also drop a data line containing either
    substring). The remaining lines are split on commas.

    Three indicators are computed per observation row and summed:

    * ``swe_av``: sum of |change in snow water equivalent| over the current
      row and the 24 preceding rows.
    * ``dsd_av``: sum of the ``snowfall`` score of the change in snow depth
      over the current row and the 48 preceding rows.
    * ``wl_av``: 1 when air temperature (cast to int) is above 40, else 0
      (wet-loose risk).

    The sum is bucketed to 1..5 and the maximum bucket per date is returned.
    The 24/48-row windows correspond to 1 and 2 days only if observations are
    hourly; that is assumed, not checked.

    Args:
        datafile: path of the station export, as accepted by
            ``spark.read.text``.

    Returns:
        pandas.DataFrame with columns ``date_only`` and
        ``max(total_av_corrected)``, one row per date.
    """
    df = spark.read.text(datafile)
    # Drop comment lines and the column-header line.
    df = df.filter(~df.value.rlike('#'))
    df = df.filter(~df.value.rlike('Date'))

    # Raw columns in file order: Date, Station Name, Station Id,
    # Snow Water Equivalent (in), Change In Snow Water Equivalent (in),
    # Snow Depth (in), Change In Snow Depth (in),
    # Air Temperature Observed (degF), Barometric Pressure (inch_Hg),
    # Dew Point Temperature (degF), Wind Direction Observed (degree),
    # Wind Speed Observed (mph).
    # Only the first eight are kept, under these short names:
    #   sid - station id, swe - snow water equivalent, dswe - delta swe,
    #   sd - snow depth, dsd - delta snow depth, at - air temperature.
    split_col = F.split(df['value'], ',')
    raw_names = ['Date', 'Station Name', 'sid', 'swe', 'dswe', 'sd', 'dsd', 'at']
    for idx, name in enumerate(raw_names):
        df = df.withColumn(name, split_col.getItem(idx))

    # dat - change in air temperature from the previous row (ordered by the
    # Date string); 0 when the difference is null (e.g. the first row).
    by_date = Window.orderBy(F.col("Date"))
    df = df.select("*", F.lag("at").over(by_date).alias("prev_at"))
    df = df.withColumn("dat", F.coalesce(F.col("at") - F.col("prev_at"), F.lit(0)))
    df = df.select('Date', 'Station Name', 'sid', 'swe', 'dswe', 'sd', 'dsd', 'at', 'dat')
    cr = df.select('Date', 'sid')

    # Split Date into date and time parts (assumes "<date> <time>" values).
    split_col = F.split(df['Date'], ' ')
    df = df.withColumn('date_only', split_col.getItem(0))
    df = df.withColumn('time_only', split_col.getItem(1))
    df = df.select('Date', 'date_only', 'time_only', 'Station Name',
                   'swe', 'dswe', 'sd', 'dsd', 'at', 'dat')

    # TODO: per-aspect risk (compass rose: west, northwest, north, northeast,
    # east, southeast, south, southwest) and treeline bands are not
    # implemented yet.

    def time_ordered(frame, ids):
        """Join on Date, add a timestamp and a 1-based row_num in time order."""
        # Inner join on Date; note it duplicates rows if a Date value repeats.
        rows = frame.join(ids, ["Date"])
        rows = rows.withColumn('timestamp', F.col('Date').cast("timestamp"))
        rows = rows.select('timestamp', 'date_only', 'time_only',
                           'swe', 'dswe', 'dsd').orderBy("timestamp")
        # Consecutive row ids drive the row-count windows below and line up
        # the dsd and swe branches in the final row_num join.
        return rows.withColumn("row_num", F.row_number().over(Window.orderBy("timestamp")))

    # ---- change-in-snow-depth (dsd) branch: score each row via pandas UDF ----
    sc = time_ordered(df, cr).select("date_only", 'time_only', 'dsd', 'row_num')
    schema = sc.schema

    def snowfall(x):
        """Map a change-in-snow-depth value to a risk score string.

        Returns "1" for >= 4, "0.5" for >= 2, and "0" otherwise, including
        when the value is missing or not an integer string.
        """
        try:
            depth_change = int(x)
        except (TypeError, ValueError, OverflowError):
            return "0"
        if depth_change >= 4:
            return "1"
        if depth_change >= 2:
            return "0.5"
        return "0"

    @pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
    def dsd_calc_udf(pdf):
        # Replace each row's dsd with its snowfall score; schema is unchanged.
        pdf['dsd'] = pdf['dsd'].apply(snowfall)
        return pdf

    av_dsd = sc.groupby("date_only").apply(dsd_calc_udf)

    # ---- snow-water-equivalent (swe) branch: absolute change per row ----
    sc = time_ordered(df, cr)
    sc = sc.withColumn("dswe", F.abs(sc["dswe"].cast(DoubleType())))

    # Sum |dswe| over the current row and the 24 preceding rows.
    swe_window = Window.orderBy("row_num").rangeBetween(-24, 0)
    sc = sc.withColumn('cum_sum_dswe', F.sum('dswe').over(swe_window))

    # Sum the dsd scores over the current row and the 48 preceding rows.
    dsd_window = Window.orderBy("row_num").rangeBetween(-48, 0)
    av_dsd = av_dsd.withColumn('cum_sum_dsd', F.sum('dsd').over(dsd_window))
    av_dsd = av_dsd.select('row_num', 'cum_sum_dsd', 'dsd')

    sc = sc.select('timestamp', 'date_only', 'row_num', 'cum_sum_dswe')
    sc = sc.join(av_dsd, ["row_num"])

    # ---- wet-loose branch: 1 when air temperature exceeds 40 ----
    df = df.withColumn('at', F.col('at').cast(IntegerType()))
    df = df.withColumn('timestamp', F.col('Date').cast("timestamp"))
    df = df.withColumn('wl_av', F.when(F.col('at') > 40, 1).otherwise(0))
    wet_loose = df.select('timestamp', 'wl_av')

    sc = wet_loose.join(sc, ['timestamp'])
    sc = sc.select('row_num', 'timestamp', 'date_only', 'dsd',
                   F.col('cum_sum_dswe').alias('swe_av'),
                   F.col('cum_sum_dsd').alias('dsd_av'),
                   'wl_av')

    # Bucket the combined score: <=2 -> 1, (2,3] -> 2, (3,4] -> 3, (4,5] -> 4,
    # >5 -> 5; anything else (null) passes through unchanged.
    sc = sc.withColumn('total_av', F.col('swe_av') + F.col('dsd_av') + F.col('wl_av'))
    total = F.col('total_av')
    sc = sc.withColumn(
        'total_av_corrected',
        F.when(total <= 2, 1)
        .when(total <= 3, 2)
        .when(total <= 4, 3)
        .when(total <= 5, 4)
        .when(total > 5, 5)
        .otherwise(total),
    )
    sc = sc.select('date_only', 'total_av_corrected')

    # Daily risk level = worst hourly bucket on that date.
    sc = sc.groupBy('date_only').max('total_av_corrected')
    return sc.toPandas()



# Directory holding the downloaded SNOTEL station exports.
datapath = "/home/dave/Documents/CS691/proj/data/snotel/"

# Station file names (the number is the SNOTEL station id).
s1049 = "SNOTEL_1049_FORESTDALE_CREEK"
s539 = "SNOTEL_539_INDEPENDENCE_CAMP"
s1050 = "SNOTEL_1050_HORSE_MEADOW"
s540 = "SNOTEL_540_INDEPENDENCE_CREEK"
s1051 = "SNOTEL_1051_BURNSIDE_LAKE"
s541 = "SNOTEL_541_INDEPENDENCE_LAKE"
s1067 = "SNOTEL_1067_CARSON_PASS"
s615 = "SNOTEL_615_MARLETTE_LAKE"
s340 = "SNOTEL_340_BIG_MEADOW"
s652 = "SNOTEL_652_MT_ROSE"
s356 = "SNOTEL_356_BLUE_LAKES"
s428 = "SNOTEL_428_CSS_LAB"
s724 = "SNOTEL_724_RUBICON"
s809 = "SNOTEL_809_TAHOE_CITY"
s463 = "SNOTEL_463_ECHO_PEAK"
s834 = "SNOTEL_834_TRUCKEE"
s508 = "SNOTEL_508_HAGANS_MEADOW"
s848 = "SNOTEL_848_WARD_CREEK"
s518 = "SNOTEL_518_HEAVENLY_VALLEY"

# Heavenly Valley (518): only the path is printed; the run is disabled.
print(os.path.join(datapath, s518))
# s_518 = process_risk(os.path.join(datapath, s518))

print(os.path.join(datapath, s340))
s_340 = process_risk(os.path.join(datapath, s340))
# `s_518` stays bound in case later cells still refer to it, but it holds the
# Big Meadow (340) result, not Heavenly Valley (518).
s_518 = s_340
# A bare expression is only rendered when it is the last line of a notebook
# cell, so show the result explicitly.
display(s_340)

# Station groupings by region (not yet used in this cell).
truckee = [s541, s540, s428, s834]
west_shore = [s809, s848, s724]
south_shore = [s518, s508, s1050, s463]
mt_rose = [s340, s652, s615]
hope_valley = [s1051, s1049, s1067, s356]

spark.stop()

/home/dave/Documents/CS691/proj/data/snotel/SNOTEL_518_HEAVENLY_VALLEY
/home/dave/Documents/CS691/proj/data/snotel/SNOTEL_340_BIG_MEADOW


In [None]:
r