In [1]:
from pyspark.sql.functions import *
from  pyspark.sql.functions import abs
import time
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [2]:
## Reading the data

In [3]:
df = spark.read.csv("hdfs://orion11:37001/3hr_sample/*", sep=r'\t', header=True, inferSchema="true")

In [4]:
df.limit(5).toPandas()

Unnamed: 0,1_time,2_lat,3_lon,albedo_surface,precipitable_water_entire_atmosphere_single_layer,pressure_maximum_wind,pressure_surface,pressure_tropopause,relative_humidity_zerodegc_isotherm,snow_depth_surface,temperature_surface,temperature_tropopause,total_cloud_cover_entire_atmosphere_single_layer,total_precipitation_surface_3_hour_accumulation,vegetation_surface,visibility_surface,wilting_point_surface,wind_speed_gust_surface
0,1455440400000,28.862712,-80.155704,6.0,,17126.896,102390.0,23103.373,35.0,0.0,295.9439,219.42467,,0.0,0.0,24223.668,0.0,
1,1455440400000,57.697196,-79.334581,65.0,,11126.896,101925.0,30303.373,76.0,0.049999997,243.69392,213.04967,,0.0,0.0,24023.668,0.0,
2,1455440400000,36.639956,-120.499569,16.0,,19326.896,100150.0,19703.373,26.0,0.0,282.5689,210.17467,,0.0,20.5,24223.668,0.1025,
3,1455440400000,33.129387,-82.595627,15.0,,14526.896,101385.0,20703.373,52.0,0.0,272.4439,211.42467,,0.0,41.0,24223.668,0.0275,
4,1455440400000,47.168122,-119.719137,16.25,,25126.896,97359.0,21103.373,70.0,0.0,272.8189,214.67467,,0.0,6.0,23.668581,0.022499999,


In [4]:
from math import log10

#  Note: the alphabet in geohash differs from the common base32 alphabet described in IETF's RFC 4648
# (http://tools.ietf.org/html/rfc4648)

__base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
__decodemap = dict()
for i in range(len(__base32)):
    __decodemap[__base32[i]] = i


def decode_exactly(geohash):
    """
    Decode the geohash to its exact values, including the error
    margins of the result.  Returns four float values: latitude,
    longitude, the plus/minus error for latitude (as a positive
    number) and the plus/minus error for longitude (as a positive
    number).
    """
    lat_interval, lon_interval = (-90.0, 90.0), (-180.0, 180.0)
    lat_err, lon_err = 90.0, 180.0
    is_even = True
    for c in geohash:
        cd = __decodemap[c]
        for mask in [16, 8, 4, 2, 1]:
            if is_even: # adds longitude info
                lon_err /= 2
                if cd & mask:
                    lon_interval = ((lon_interval[0]+lon_interval[1])/2, lon_interval[1])
                else:
                    lon_interval = (lon_interval[0], (lon_interval[0]+lon_interval[1])/2)
            else:      # adds latitude info
                lat_err /= 2
                if cd & mask:
                    lat_interval = ((lat_interval[0]+lat_interval[1])/2, lat_interval[1])
                else:
                    lat_interval = (lat_interval[0], (lat_interval[0]+lat_interval[1])/2)
            is_even = not is_even
    lat = (lat_interval[0] + lat_interval[1]) / 2
    lon = (lon_interval[0] + lon_interval[1]) / 2
    return lat, lon, lat_err, lon_err


def decode(geohash):
    """
    Decode geohash, returning two float with latitude and longitude
    containing only relevant digits and with trailing zeroes removed.
    """
    lat, lon, lat_err, lon_err = decode_exactly(geohash)
    # Format to the number of decimals that are known
    lats = "%.*f" % (max(1, int(round(-log10(lat_err)))) - 1, lat)
    lons = "%.*f" % (max(1, int(round(-log10(lon_err)))) - 1, lon)
    if '.' in lats: lats = lats.rstrip('0')
    if '.' in lons: lons = lons.rstrip('0')
    return float(lats), float(lons)


def encode(latitude, longitude, precision=12):
    """
    Encode a position given in float arguments latitude, longitude to
    a geohash which will have the character count precision.
    """
    lat_interval = (-90.0, 90.0)
    lon_interval = (-180.0, 180.0)
    geohash = []
    bits = [16, 8, 4, 2, 1]
    bit = 0
    ch = 0
    even = True
    while len(geohash) < precision:
        if even:
            mid = (lon_interval[0] + lon_interval[1]) / 2
            if longitude > mid:
                ch |= bits[bit]
                lon_interval = (mid, lon_interval[1])
            else:
                lon_interval = (lon_interval[0], mid)
        else:
            mid = (lat_interval[0] + lat_interval[1]) / 2
            if latitude > mid:
                ch |= bits[bit]
                lat_interval = (mid, lat_interval[1])
            else:
                lat_interval = (lat_interval[0], mid)
        even = not even
        if bit < 4:
            bit += 1
        else:
            geohash += __base32[ch]
            bit = 0
            ch = 0
    return ''.join(geohash)

In [5]:
def convertToGeoHash(lat, lon):
    return encode(lat, lon,4)

udfConvertToGeoHash = F.udf(convertToGeoHash, StringType())

geohashDf = df.withColumn("5_hash", udfConvertToGeoHash("2_lat", "3_lon"))

geohashDf.limit(5).toPandas()

Unnamed: 0,1_time,2_lat,3_lon,albedo_surface,precipitable_water_entire_atmosphere_single_layer,pressure_maximum_wind,pressure_surface,pressure_tropopause,relative_humidity_zerodegc_isotherm,snow_depth_surface,temperature_surface,temperature_tropopause,total_cloud_cover_entire_atmosphere_single_layer,total_precipitation_surface_3_hour_accumulation,vegetation_surface,visibility_surface,wilting_point_surface,wind_speed_gust_surface,5_hash
0,1455440400000,28.862712,-80.155704,6.0,,17126.896,102390.0,23103.373,35.0,0.0,295.9439,219.42467,,0.0,0.0,24223.668,0.0,,djph
1,1455440400000,57.697196,-79.334581,65.0,,11126.896,101925.0,30303.373,76.0,0.049999997,243.69392,213.04967,,0.0,0.0,24023.668,0.0,,f4r8
2,1455440400000,36.639956,-120.499569,16.0,,19326.896,100150.0,19703.373,26.0,0.0,282.5689,210.17467,,0.0,20.5,24223.668,0.1025,,9qd2
3,1455440400000,33.129387,-82.595627,15.0,,14526.896,101385.0,20703.373,52.0,0.0,272.4439,211.42467,,0.0,41.0,24223.668,0.0275,,djvk
4,1455440400000,47.168122,-119.719137,16.25,,25126.896,97359.0,21103.373,70.0,0.0,272.8189,214.67467,,0.0,6.0,23.668581,0.022499999,,c26u


In [6]:
def convertToMonth(value):
    return time.strftime('%m', time.localtime(int(value)/1000))

udfConvertToMonth = F.udf(convertToMonth, StringType())

monthDf = geohashDf.withColumn("1_time", udfConvertToMonth("1_time"))
monthDf.limit(5).toPandas()

Unnamed: 0,1_time,2_lat,3_lon,albedo_surface,precipitable_water_entire_atmosphere_single_layer,pressure_maximum_wind,pressure_surface,pressure_tropopause,relative_humidity_zerodegc_isotherm,snow_depth_surface,temperature_surface,temperature_tropopause,total_cloud_cover_entire_atmosphere_single_layer,total_precipitation_surface_3_hour_accumulation,vegetation_surface,visibility_surface,wilting_point_surface,wind_speed_gust_surface,5_hash
0,2,28.862712,-80.155704,6.0,,17126.896,102390.0,23103.373,35.0,0.0,295.9439,219.42467,,0.0,0.0,24223.668,0.0,,djph
1,2,57.697196,-79.334581,65.0,,11126.896,101925.0,30303.373,76.0,0.049999997,243.69392,213.04967,,0.0,0.0,24023.668,0.0,,f4r8
2,2,36.639956,-120.499569,16.0,,19326.896,100150.0,19703.373,26.0,0.0,282.5689,210.17467,,0.0,20.5,24223.668,0.1025,,9qd2
3,2,33.129387,-82.595627,15.0,,14526.896,101385.0,20703.373,52.0,0.0,272.4439,211.42467,,0.0,41.0,24223.668,0.0275,,djvk
4,2,47.168122,-119.719137,16.25,,25126.896,97359.0,21103.373,70.0,0.0,272.8189,214.67467,,0.0,6.0,23.668581,0.022499999,,c26u


In [7]:
groupedDf = monthDf.groupBy("5_hash","1_time").agg(F.avg("pressure_maximum_wind").alias("pressure_maximum_wind"),F.avg("temperature_surface").alias("temperature_surface"))

In [8]:
groupedDf.limit(5).toPandas()

Unnamed: 0,5_hash,1_time,pressure_maximum_wind,temperature_surface
0,9h04,4,18703.129957,295.459039
1,9zce,4,23359.047237,280.81205
2,cfze,4,30639.888644,259.917391
3,fcg4,4,30567.781457,274.922854
4,9shc,4,18683.579753,302.166044


In [9]:
# min solar temp reqd is 35 C
# solar panels can get as hot as 65 °C (149 °F) at which point solar cell efficiency will be hindered.
# Ref : https://www.quora.com/What-is-the-minimum-temperature-required-to-produce-electricity-using-solar-panels
solarTemp = groupedDf.filter(groupedDf.temperature_surface > 308).filter(groupedDf.temperature_surface < 338)
solarTemp.limit(5).toPandas()

Unnamed: 0,5_hash,1_time,pressure_maximum_wind,temperature_surface
0,9tbm,6,22156.888804,308.744271
1,9mys,8,20457.995921,308.093449
2,9t1j,7,20534.163047,308.091683
3,9tbk,7,21782.259891,308.277503
4,9myc,8,20994.637579,308.871502


In [10]:
solarTempDesc = solarTemp.orderBy(col("temperature_surface").desc())
solarTempDesc.limit(5).toPandas()                  

Unnamed: 0,5_hash,1_time,pressure_maximum_wind,temperature_surface
0,9tbq,7,21111.943557,311.928496
1,9tbm,7,21408.059874,311.491012
2,9tbq,8,20832.068599,311.478068
3,9tbr,7,22095.834519,311.228877
4,9tbn,7,21420.241543,311.176214


In [11]:
windPressure = groupedDf.filter(groupedDf.pressure_maximum_wind > 20000)
windPressure.limit(3).toPandas()

Unnamed: 0,5_hash,1_time,pressure_maximum_wind,temperature_surface
0,9xbg,2,22196.405828,269.161803
1,9wq1,2,20559.007883,279.838123
2,9enw,2,20581.099954,300.487112


In [12]:
windPressureDesc = windPressure.orderBy(col("pressure_maximum_wind").desc())
windPressureDesc.limit(5).toPandas()  

Unnamed: 0,5_hash,1_time,pressure_maximum_wind,temperature_surface
0,f6b6,5,33926.435958,270.89306
1,cdyh,4,33784.9386,256.736114
2,f4fu,4,33710.226613,258.950405
3,b9e9,4,33650.899185,277.925527
4,f4gh,4,33563.199071,259.891043


In [13]:
windSolar = solarTempDesc.filter(solarTemp.pressure_maximum_wind > 20000)
windSolar.limit(5).toPandas()

Unnamed: 0,5_hash,1_time,pressure_maximum_wind,temperature_surface
0,9tbq,7,21111.943557,311.928496
1,9tbm,7,21408.059874,311.491012
2,9tbq,8,20832.068599,311.478068
3,9tbr,7,22095.834519,311.228877
4,9tbn,7,21420.241543,311.176214


In [14]:
windSolarDesc = windSolar.orderBy(col("pressure_maximum_wind").desc(), col("temperature_surface").desc())
windSolarDesc.limit(5).toPandas()  

Unnamed: 0,5_hash,1_time,pressure_maximum_wind,temperature_surface
0,9se5,6,24738.138428,308.946569
1,9sdu,6,24571.477879,308.757447
2,9se3,6,24040.346235,308.524853
3,9se1,6,23963.258743,308.526291
4,9se8,6,23714.081177,308.405512


In [20]:
## Top 3 locations for solar farms, based on high temp

In [15]:
#Top 3 locations for solar farms, based on high temp
solarTempDescTop3 = solarTempDesc.limit(3)
solarTempDescTop3.toPandas()


Unnamed: 0,5_hash,1_time,pressure_maximum_wind,temperature_surface
0,9tbq,7,21111.943557,311.928496
1,9tbm,7,21408.059874,311.491012
2,9tbq,8,20832.068599,311.478068


In [21]:
## Top 3 locations for wind farms, based on high wind pressure

In [16]:
#Top 3 locations for wind farms, based on high wind pressure
windPressureDescTop3 = windPressureDesc.limit(3)
windPressureDescTop3.toPandas()

Unnamed: 0,5_hash,1_time,pressure_maximum_wind,temperature_surface
0,f6b6,5,33926.435958,270.89306
1,cdyh,4,33784.9386,256.736114
2,f4fu,4,33710.226613,258.950405


In [23]:
## Top 3 locations for solar and wind farms, based on high wind pressure and high temp

In [17]:
#Top 3 locations for solar and wind farms, based on high wind pressure and high temp
windSolarDescTop3 = windSolarDesc.limit(3)
windSolarDescTop3.toPandas()

Unnamed: 0,5_hash,1_time,pressure_maximum_wind,temperature_surface
0,9se5,6,24738.138428,308.946569
1,9sdu,6,24571.477879,308.757447
2,9se3,6,24040.346235,308.524853
