In [1]:
import findspark

# Locate the local Spark installation and make its PySpark libraries importable
# before any `pyspark` import runs in the following cells.
findspark.init()
# Last expression of the cell: shows the Spark home directory that was detected.
findspark.find()

'/usr/local/opt/apache-spark/libexec'

In [2]:
from pyspark.sql import SparkSession
import pyspark
from pyspark.sql.functions import translate, col, month
from pyspark.sql.types import DoubleType, TimestampType, StringType
from pyspark.context import SparkContext

# Start the notebook-wide Spark session, or attach to one that is already running.
spark = (
    SparkSession.builder
    .appName("NV Energy Project")
    .getOrCreate()
)

# Grab the underlying context and switch Spark logging off so cell output stays readable.
sc = spark.sparkContext.getOrCreate()
sc.setLogLevel('OFF')

23/03/30 11:41:31 WARN Utils: Your hostname, Courtneys-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.6 instead (on interface en0)
23/03/30 11:41:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/30 11:41:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
## ENERGY CONSUMPTION MODEL VS ACTUAL CONSUMPTION
# Load the annual consumption file (header row present, column types inferred).
nv_ac_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("CleanData/nv_annual_consumption60-21.csv")
)

# Remove every "," from TotalConsumption (presumably thousands separators -- confirm
# against the CSV) and cast the cleaned text to an integer column.
nv_ac_df = nv_ac_df.withColumn(
    "TotalConsumption",
    translate(col("TotalConsumption"), ",", "").cast("integer"),
)


In [6]:
#Convert data to millions scale for visualization

import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType


def convertMill(reading):
    """Scale a raw reading down by one million.

    A null input is passed through as None so the UDF produces a null cell
    instead of raising TypeError inside the Spark worker.
    """
    if reading is None:
        return None
    new = reading/1000000
    return new


convertUDF = udf(convertMill, DoubleType())

# Alias the UDF *result* directly. Renaming Spark's auto-generated
# "<lambda>(... AS ...)" column name afterwards is fragile: withColumnRenamed
# silently does nothing when that generated text differs, and the
# "Actual Consumption" column would then be missing for the next cell.
nv_ac_df2 = nv_ac_df.select(
    "YEAR",
    "TotalConsumption",
    convertUDF(col("TotalConsumption")).alias("Actual Consumption"),
)

# Exclude the 1990 and 2010 rows (the reason is not recorded in this notebook).
nv_ac_df3 = nv_ac_df2.filter(nv_ac_df2.YEAR != 1990)

nv_ac_df3 = nv_ac_df3.filter(nv_ac_df3.YEAR != 2010)


In [7]:
#Get predictions from Consumption LR 

from pyspark.sql.functions import col, udf


def predict(x):
    """Evaluate the consumption regression line (slope * year + intercept) at year x."""
    return  x * 675250.9602239017 - 1325137830.201505


convertUDF = udf(predict, DoubleType())

# Name the prediction column by aliasing the UDF result; relying on Spark's
# auto-generated "<lambda>(YEAR AS ModelPred)" name for a later rename breaks
# silently whenever that generated text changes.
nv_ac_df3 = nv_ac_df3.select(
    "YEAR",
    "TotalConsumption",
    "Actual Consumption",
    convertUDF(col("YEAR")).alias("ModelPred"),
)

#Convert predictions to millions scale

def convertMill(reading):
    """Scale a raw reading down by one million; None is passed through as None."""
    if reading is None:
        return None
    new = reading/1000000
    return new


convertUDF2 = udf(convertMill, DoubleType())

nv_ac_df4 = nv_ac_df3.select(
    "YEAR",
    "TotalConsumption",
    "Actual Consumption",
    "ModelPred",
    convertUDF2(col("ModelPred")).alias("Model Predictions"),
)

nv_ac_df4 = nv_ac_df4.orderBy("YEAR")

In [8]:
#Create consumption panda frame, return to dashboard

def createNVCFrame():
    """Return the consumption-vs-model table (nv_ac_df4) converted to a pandas DataFrame."""
    return nv_ac_df4.toPandas()

In [9]:
## NEVADA ENERGY PRODUCTION BY PRIMARY SOURCE
# Load the generation-by-source file (header row present, column types inferred).
primary_source_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("CleanData/GenerationBySource.csv")
)


In [10]:
#Convert data to millions scale for visualization

import pyspark
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType, IntegerType


def convertMill(reading):
    """Scale a raw reading down by one million; None is passed through as None."""
    if reading is None:
        return None
    new = reading/1000000
    return new


convertUDF = udf(convertMill, DoubleType())

# Rows with EnergySource == "Total" are kept on purpose: the supply cell and
# getSupplyDemand both filter on them.
# The derived column is named by aliasing the UDF result rather than by renaming
# Spark's auto-generated "<lambda>(...)" name, which is a silent no-op if the
# generated text ever differs.
primary_source_df2 = primary_source_df.select(
    "YEAR",
    "EnergySource",
    convertUDF(col("GenerationMWh")).alias("GenerationMil"),
)

#Order by year
primary_source_df2 = primary_source_df2.orderBy("YEAR")


In [11]:
#Create primary source pandas frame for dashboard

def createPSFrame():
    """Return primary_source_df2 as a pandas frame grouped by its "EnergySource" column."""
    return primary_source_df2.toPandas().groupby("EnergySource")

In [12]:
##SUPPLY AND DEMAND
# Historical side of the interactive demand series: consumption rows from 1990 onward.
hist_demand_df = nv_ac_df.where(nv_ac_df.YEAR >= 1990)

In [13]:
#Create demand prediction frame

# Forecast years 2022 through 2049 (the upper bound of range() is exclusive).
pred_df = spark.range(2022,2050).withColumnRenamed("id","YEAR")


def predict(x):
    """Evaluate the consumption regression line (slope * year + intercept) at year x."""
    return  x * 675250.9602239017 - 1325137830.201505


convertUDF = udf(predict, DoubleType())

# Alias the UDF result directly. The previous rename targeted
# "<lambda>(YEAR AS ModelPred" (missing its closing parenthesis), so it never
# matched and the column silently kept its generated name.
pred_df2 = pred_df.select("YEAR", convertUDF(col("YEAR")).alias("Demand"))

#Merge actual and predictions

# union() pairs columns by position and keeps the left side's names, so pin the
# historical frame to the same (YEAR, value) layout as the predictions. The
# merged value column is therefore still called "TotalConsumption".
demand_df = hist_demand_df.select("YEAR", "TotalConsumption").union(pred_df2)


In [14]:
#Convert demand to millions scale

def convertMill(reading):
    """Scale a raw reading down by one million; None is passed through as None."""
    if reading is None:
        return None
    new = reading/1000000
    return new


convertUDF = udf(convertMill, DoubleType())

# Name the scaled column by aliasing the UDF result instead of renaming Spark's
# auto-generated "<lambda>(...)" column, which silently fails if that text differs.
demand_df2 = demand_df.select(
    "YEAR",
    convertUDF(col("TotalConsumption")).alias("Demand"),
)

In [17]:
from pyspark.sql.functions import avg, sum

#Dataframe for interactive supply

#Historical supply 1990-2021
supply_df = primary_source_df2.filter(primary_source_df2.EnergySource == "Total")

supply_df = supply_df.withColumnRenamed("GenerationMil", "Supply")

supply_df2 = supply_df.select("YEAR", "Supply")

#Solar generation predictions from weather data, 5 year period + 2006 actual

solar_preds_df = spark.read.option("header", "true").option("inferSchema", "true").csv("CleanData/Solar_Predictions2010-2014.csv")

#Average value from preds and actual per farm size

annual_100MW_df = solar_preds_df.select(avg("100MW"))

annual_150MW_df = solar_preds_df.select(avg("150MW"))

annual_200MW_df = solar_preds_df.select(avg("200MW"))

# Energy loss -- 
# 9.5% of electricity lost from preinverter derate (DC losses) 
# 2.0% of energy lost from inverter efficiency (AC losses)
DC_LOSS_FRACTION = 0.095
AC_LOSS_FRACTION = 0.02

# Share of the predicted output that remains after both losses. The averages
# were previously multiplied by .115 -- the *lost* share -- which kept only
# 11.5% of the output instead of removing 11.5% of it.
NET_OUTPUT_FRACTION = 1 - (DC_LOSS_FRACTION + AC_LOSS_FRACTION)

# Each single-row frame holds one average; read it by position so the lookup
# does not depend on Spark's generated "avg(...)" column name.
annual_100MW = annual_100MW_df.first()[0] * NET_OUTPUT_FRACTION

annual_150MW = annual_150MW_df.first()[0] * NET_OUTPUT_FRACTION

annual_200MW = annual_200MW_df.first()[0] * NET_OUTPUT_FRACTION
 

In [62]:
def getSupplyDemand(resource_list, reduction_list):
    """Project yearly supply for 2022-2049 and pair it with demand.

    The 2022 value is the 2021 "Total" generation. Every later year starts from
    the previous year's supply, adds the PV farms that come online that year
    and subtracts the 2021 sources retired that year.

    Args:
        resource_list: sequence of (label, year) entries for PV farms to add.
            Recognised labels are '100 MW', '150 MW' and '200 MW'; any other
            label is ignored. None is treated as an empty list.
        reduction_list: sequence of (label, year) entries for 2021 sources to
            retire. Recognised labels are 'Coal', 'Petroleum' and
            'Natural Gas 10%' / '25%' / '50%' / '75%' / '100%'; any other label
            is ignored. None is treated as an empty list.

    Returns:
        pandas DataFrame built from historical plus projected supply joined
        with demand_df2 on YEAR.
    """
    resource_list = resource_list or []
    reduction_list = reduction_list or []

    #2021 primary source levels
    source21_df = primary_source_df2.filter(primary_source_df2.YEAR == 2021)

    source21_total = source21_df.filter(source21_df.EnergySource == "Total")

    generation21 = source21_total.first()["GenerationMil"]

    # Hard-coded 2021 levels per source (presumably on the same millions scale
    # as GenerationMil -- confirm against the source data).
    coal21 = 2.752473

    petroleum21 = 0.008014

    naturalGas21 = 26.129918

    # Annual output added by one farm of each size (not yet on the millions scale).
    farm_output = {
        '100 MW': annual_100MW,
        '150 MW': annual_150MW,
        '200 MW': annual_200MW,
    }

    # Amount taken out of supply by each retirement option.
    removal_amount = {
        'Coal': coal21,
        'Petroleum': petroleum21,
        'Natural Gas 10%': naturalGas21 * .1,
        'Natural Gas 25%': naturalGas21 * .25,
        'Natural Gas 50%': naturalGas21 * .5,
        'Natural Gas 75%': naturalGas21 * .75,
        'Natural Gas 100%': naturalGas21,
    }

    # 2022 carries the 2021 generation forward unchanged.
    pred_data = [(2022, generation21)]

    previous_supply = generation21

    for year in range(2023, 2050):
        # Explicit accumulation loops: the module-level name `sum` is the
        # PySpark column function in this notebook, not the Python builtin.

        #Add selected PV farms
        new_resources = 0
        for resource_tup in resource_list:
            if resource_tup[1] == year:
                new_resources = new_resources + farm_output.get(resource_tup[0], 0)

        #Removed selected 2021 resources
        # Read from reduction_list (this loop used to walk resource_list, so
        # reductions were never applied) and keep the total positive so the
        # subtraction below lowers supply instead of raising it.
        amount_to_remove = 0
        for removal_tup in reduction_list:
            if removal_tup[1] == year:
                amount_to_remove = amount_to_remove + removal_amount.get(removal_tup[0], 0)

        #Add previous year supply, convert new resources to millions scale,
        #then remove the retired sources
        supply = previous_supply + (new_resources / 1000000) - amount_to_remove

        pred_data.append((year, supply))

        previous_supply = supply

    #Create dataframe for supply predictions
    columns = ['YEAR', 'Supply']

    supply_pred_df = spark.createDataFrame(pred_data, columns)

    #Join with historical
    supply_df_all = supply_df2.union(supply_pred_df)

    #Join with demand
    supply_demand_df = supply_df_all.join(demand_df2, supply_df_all.YEAR == demand_df2.YEAR).drop(demand_df2.YEAR)

    supply_demand_panda = supply_demand_df.toPandas()

    return supply_demand_panda
        

In [63]:
# Load the recorded readings for the 100MW PV farm (header row present, types inferred).
solar100MW_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("CleanData/solar_actual_100mw.csv")
)

# Keep the timestamp and power reading and add the calendar month for the visualizations.
solar100MW_df2 = solar100MW_df.select(
    "Timestamp",
    month(col("Timestamp")).alias("Month"),
    "Power100",
)


In [64]:
# Load the recorded readings for the 150MW PV farm (header row present, types inferred).
solar150MW_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("CleanData/solar_actual_150mw.csv")
)

# Keep the timestamp and power reading and add the calendar month for the visualizations.
solar150MW_df2 = solar150MW_df.select(
    "Timestamp",
    month(col("Timestamp")).alias("Month"),
    "Power150",
)


In [65]:
# Load the recorded readings for the 200MW PV farm (header row present, types inferred).
solar200MW_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("CleanData/solar_actual_200mw.csv")
)

# Keep the timestamp and power reading and add the calendar month for the visualizations.
solar200MW_df2 = solar200MW_df.select(
    "Timestamp",
    month(col("Timestamp")).alias("Month"),
    "Power200",
)


In [66]:
# Total the power readings per calendar month, one frame per farm size.
df_100 = (
    solar100MW_df2
    .groupBy("Month")
    .sum("Power100")
    .withColumnRenamed("sum(Power100)", "Sum100")
)

df_150 = (
    solar150MW_df2
    .groupBy("Month")
    .sum("Power150")
    .withColumnRenamed("sum(Power150)", "Sum150")
)

df_200 = (
    solar200MW_df2
    .groupBy("Month")
    .sum("Power200")
    .withColumnRenamed("sum(Power200)", "Sum200")
)

# Combine the three monthly totals into one frame. After each join the
# right-hand Month column is dropped so only a single Month column remains.
solar_readings_df = (
    df_100
    .join(df_150, df_100.Month == df_150.Month)
    .drop(df_150.Month)
)

solar_readings_df2 = (
    solar_readings_df
    .join(df_200, solar_readings_df.Month == df_200.Month)
    .drop(df_200.Month)
)

# Sort by month for the visualization.
solar_readings_df3 = solar_readings_df2.orderBy("Month")

In [67]:
#Convert reading to monthly MWh

from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType


def convertMWh(reading):
    """Divide a monthly summed reading by 730; None is passed through as None.

    NOTE(review): 730 is presumably the average number of hours in a month
    (8760 / 12) -- confirm the intended unit conversion.
    """
    if reading is None:
        return None
    MWh = reading/730
    return MWh


convertUDF = udf(convertMWh, DoubleType())

# Give each converted column its final name by aliasing the UDF result.
# The previous version renamed Spark's auto-generated "<lambda>(...)" names and
# only worked because a "1500MW" typo was repeated in both the inner alias and
# the rename string; any mismatch would have silently skipped the rename.
solar_readings_df4 = solar_readings_df3.select(
    "Month",
    convertUDF(col("Sum100")).alias("100 MW"),
    convertUDF(col("Sum150")).alias("150 MW"),
    convertUDF(col("Sum200")).alias("200 MW"),
)

In [68]:
#Method to return solar generation panda frame to dashboard

def createSGFrame():
    """Return the monthly solar generation table (solar_readings_df4) as a pandas DataFrame."""
    return solar_readings_df4.toPandas()

In [69]:
# Load the solstice & equinox data set (header row present, column types inferred).
se_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("CleanData/SolsticeAndEquinoxCategorical.csv")
)


In [70]:
#Return data to dashboard for seasonal GHI visualization

def createGHIFrame():
    """Return se_df as a pandas frame grouped by its "Event" column."""
    return se_df.toPandas().groupby("Event")

In [3]:
pip freeze


absl-py==1.4.0
anaconda-client==1.11.0
anaconda-navigator==2.1.4
anyio @ file:///opt/concourse/worker/volumes/live/485b0f52-1188-482a-6285-65a36c8fa8a6/volume/anyio_1644481714856/work/dist
appnope @ file:///opt/concourse/worker/volumes/live/6ca6f098-d773-4461-5c91-a24a17435bda/volume/appnope_1606859448531/work
argon2-cffi @ file:///opt/conda/conda-bld/argon2-cffi_1645000214183/work
argon2-cffi-bindings @ file:///opt/concourse/worker/volumes/live/42cf1b28-e71f-45ed-47b2-50f828088636/volume/argon2-cffi-bindings_1644569709119/work
asgiref==3.6.0
asttokens @ file:///opt/conda/conda-bld/asttokens_1646925590279/work
astunparse==1.6.3
attrs @ file:///private/var/folders/sy/f16zz6x50xz3113nwtb9bvq00000gp/T/abs_33k1uces4n/croot/attrs_1668696162258/work
Babel @ file:///private/var/folders/sy/f16zz6x50xz3113nwtb9bvq00000gp/T/abs_59c7q3smap/croot/babel_1671781946809/work
backcall @ file:///home/ktietz/src/ci/backcall_1611930011877/work
backports.functools-lru-cache @ file:///tmp/build

Note: you may need to restart the kernel to use updated packages.
