# Setting up our Schema

Spark can automatically create a schema for CSV files, but ours don't have headings. Let's set this up here:

In [7]:
from pyspark.sql.types import StructType, StructField, FloatType, LongType, StringType

# Build the DataFrame schema from features.txt, which lists one column name per
# line in column order. The first two lines are typed by position (long, then
# string); every remaining line becomes a nullable float column.
feats = []
# `with` closes the file handle as soon as the loop finishes (or fails);
# a bare open() would keep it open for the lifetime of the kernel.
with open('features.txt') as f:
    for line_num, line in enumerate(f):
        if line_num == 0:
            # Timestamp
            feats.append(StructField(line.strip(), LongType(), True))
        elif line_num == 1:
            # Geohash
            feats.append(StructField(line.strip(), StringType(), True))
        else:
            # Other features
            feats.append(StructField(line.strip(), FloatType(), True))

schema = StructType(feats)

print(schema)


StructType(List(StructField(Timestamp,LongType,true),StructField(Geohash,StringType,true),StructField(geopotential_height_lltw,FloatType,true),StructField(water_equiv_of_accum_snow_depth_surface,FloatType,true),StructField(drag_coefficient_surface,FloatType,true),StructField(sensible_heat_net_flux_surface,FloatType,true),StructField(categorical_ice_pellets_yes1_no0_surface,FloatType,true),StructField(visibility_surface,FloatType,true),StructField(number_of_soil_layers_in_root_zone_surface,FloatType,true),StructField(categorical_freezing_rain_yes1_no0_surface,FloatType,true),StructField(pressure_reduced_to_msl_msl,FloatType,true),StructField(upward_short_wave_rad_flux_surface,FloatType,true),StructField(relative_humidity_zerodegc_isotherm,FloatType,true),StructField(categorical_snow_yes1_no0_surface,FloatType,true),StructField(u-component_of_wind_tropopause,FloatType,true),StructField(surface_wind_gust_surface,FloatType,true),StructField(total_cloud_cover_entire_atmosphere,FloatType,tru

# Creating a Dataframe

Let's load our CSV into a 'dataframe' - Spark's abstraction for working with tabular data (built on top of RDDs)

In [15]:
# Both inputs are read as tab-separated CSV with the explicit `schema`
# (the files carry no header row to infer column names from).
nam_t = (
    spark.read
    .format('csv')
    .option('sep', '\t')
    .schema(schema)
    .load('hdfs://orion11:15000/nam_tiny.tdv')
)
nam_s = (
    spark.read
    .format('csv')
    .option('sep', '\t')
    .schema(schema)
    .load('hdfs://orion11:15000/nam_s/*')
)

In [16]:
# Mark both DataFrames for caching. The second call is the cell's last
# expression, so its DataFrame repr is what the notebook displays.
nam_t.cache()
nam_s.cache()

DataFrame[Timestamp: bigint, Geohash: string, geopotential_height_lltw: float, water_equiv_of_accum_snow_depth_surface: float, drag_coefficient_surface: float, sensible_heat_net_flux_surface: float, categorical_ice_pellets_yes1_no0_surface: float, visibility_surface: float, number_of_soil_layers_in_root_zone_surface: float, categorical_freezing_rain_yes1_no0_surface: float, pressure_reduced_to_msl_msl: float, upward_short_wave_rad_flux_surface: float, relative_humidity_zerodegc_isotherm: float, categorical_snow_yes1_no0_surface: float, u-component_of_wind_tropopause: float, surface_wind_gust_surface: float, total_cloud_cover_entire_atmosphere: float, upward_long_wave_rad_flux_surface: float, land_cover_land1_sea0_surface: float, vegitation_type_as_in_sib_surface: float, v-component_of_wind_pblri: float, albedo_surface: float, lightning_surface: float, ice_cover_ice1_no_ice0_surface: float, convective_inhibition_surface: float, pressure_surface: float, transpiration_stress-onset_soil_mo

# Playtime

# SQL

In [3]:
from pyspark.sql import functions as F

In [13]:
# Register each DataFrame as a temp view named after its variable, so that
# spark.sql() queries can refer to it as a table.
for view_name, frame in (("nam_t", nam_t), ("nam_s", nam_s)):
    frame.createOrReplaceTempView(view_name)

In [18]:
# Ask the catalog to cache both registered views.
for cached_view in ("nam_t", "nam_s"):
    spark.catalog.cacheTable(cached_view)

#### Unknown Feature
I didn't know what albedo was, so I looked at its summary statistics. Still unsure, I looked up the definition: 'the proportion of the incident light or radiation that is reflected by a surface, typically that of a planet or moon.'

In [19]:
# Summary statistics for the albedo column of the tiny dataset.
albedo_summary = nam_t.describe('albedo_surface')
albedo_summary.show()

+-------+----------------+
|summary|  albedo_surface|
+-------+----------------+
|  count|             100|
|   mean|           18.07|
| stddev|17.4802948221907|
|    min|             6.0|
|    max|            76.0|
+-------+----------------+



#### Hot Hot Hot
The hottest temperature in the dataset is 330.67431640625 at location d5f0jqerq27b (21.13070154, -86.9520505; Benito Juárez, Quintana Roo, Mexico) at time 2015-08-23T18:00Z. This record is not surprising, as it is near the equator in the summer. The next-highest temperatures in the dataset are all close to this value, so it does not appear to be an anomaly.

In [23]:
# Order the small dataset from hottest to coldest surface temperature and
# cache the ordered frame, since several later cells read from it.
temp_desc = nam_s.orderBy(F.col("temperature_surface").desc())
temp_desc.cache()

DataFrame[Timestamp: bigint, Geohash: string, geopotential_height_lltw: float, water_equiv_of_accum_snow_depth_surface: float, drag_coefficient_surface: float, sensible_heat_net_flux_surface: float, categorical_ice_pellets_yes1_no0_surface: float, visibility_surface: float, number_of_soil_layers_in_root_zone_surface: float, categorical_freezing_rain_yes1_no0_surface: float, pressure_reduced_to_msl_msl: float, upward_short_wave_rad_flux_surface: float, relative_humidity_zerodegc_isotherm: float, categorical_snow_yes1_no0_surface: float, u-component_of_wind_tropopause: float, surface_wind_gust_surface: float, total_cloud_cover_entire_atmosphere: float, upward_long_wave_rad_flux_surface: float, land_cover_land1_sea0_surface: float, vegitation_type_as_in_sib_surface: float, v-component_of_wind_pblri: float, albedo_surface: float, lightning_surface: float, ice_cover_ice1_no_ice0_surface: float, convective_inhibition_surface: float, pressure_surface: float, transpiration_stress-onset_soil_mo

In [26]:
# Top row of the descending sort, i.e. the single hottest record.
hottest_record = temp_desc.head()

In [30]:
# Surface temperature of the hottest record.
hottest_record.temperature_surface

330.67431640625

In [27]:
from datetime import datetime, timezone

# Timestamp is scaled down by 1000 (milliseconds -> seconds) before being
# converted to a timezone-aware UTC datetime.
timestamp = hottest_record['Timestamp'] / 1000
datetime.fromtimestamp(timestamp, tz=timezone.utc)

datetime.datetime(2015, 8, 23, 18, 0, tzinfo=datetime.timezone.utc)

In [28]:
# Geohash of the hottest record.
hottest_record.Geohash

'd5f0jqerq27b'

In [33]:
# Ten hottest records, restricted to the columns needed to judge whether the
# maximum is an outlier.
hottest_columns = ("temperature_surface", "Timestamp", "Geohash")
temp_desc.select(*hottest_columns).head(10)

[Row(temperature_surface=330.67431640625, Timestamp=1440352800000, Geohash='d5f0jqerq27b'),
 Row(temperature_surface=330.640625, Timestamp=1440266400000, Geohash='d5f0vd8eb80p'),
 Row(temperature_surface=330.6044921875, Timestamp=1430157600000, Geohash='9g77js659k20'),
 Row(temperature_surface=330.53662109375, Timestamp=1439056800000, Geohash='d5f0jqerq27b'),
 Row(temperature_surface=330.48193359375, Timestamp=1440612000000, Geohash='d59d5yttuc5b'),
 Row(temperature_surface=330.35693359375, Timestamp=1440612000000, Geohash='d59eqv7e03pb'),
 Row(temperature_surface=330.23193359375, Timestamp=1440612000000, Geohash='d59dntd726gz'),
 Row(temperature_surface=330.220703125, Timestamp=1440698400000, Geohash='d59eqv7e03pb'),
 Row(temperature_surface=330.179931640625, Timestamp=1438279200000, Geohash='d5f04xyhucez'),
 Row(temperature_surface=330.14990234375, Timestamp=1439488800000, Geohash='d5dpds10m55b')]

#### So Snowy
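One location where it is snowing all year, i.e. where it is snowing in every record of its time series, is {location}. (**TODO:** fill in the location once the query below has been run on a dataset with more than one record per geohash.)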


In [8]:
# Per-geohash share of records flagged as snow. A `div` of 1.0 means every
# record for that geohash has categorical_snow_yes1_no0_surface set.
#
# The query reads the `nam_t` view because that is the name the tiny dataset
# is registered under via createOrReplaceTempView; no view called `nam_tiny`
# is registered anywhere in this notebook, so the query would fail after a
# kernel restart. The output column names (geohash, sum, cnt, div) are kept.
so_snowy = spark.sql("""
    SELECT geohash, sum, cnt, (sum / cnt) AS div
    FROM (
        SELECT geohash,
               SUM(categorical_snow_yes1_no0_surface) AS sum,
               COUNT(*) AS cnt
        FROM nam_t
        GROUP BY geohash
    )
    ORDER BY div DESC
""")
so_snowy.head(10)

[Row(geohash='c1nuq5290jup', sum=1.0, cnt=1, div=1.0),
 Row(geohash='f2w29r4werxb', sum=1.0, cnt=1, div=1.0),
 Row(geohash='f2d5v1jeyp7z', sum=1.0, cnt=1, div=1.0),
 Row(geohash='c6s64488ws80', sum=1.0, cnt=1, div=1.0),
 Row(geohash='f2fh6jpdgv5b', sum=1.0, cnt=1, div=1.0),
 Row(geohash='fccz22w4fytb', sum=1.0, cnt=1, div=1.0),
 Row(geohash='drmg1tprm22p', sum=0.0, cnt=1, div=0.0),
 Row(geohash='8gzfnvh85g0p', sum=0.0, cnt=1, div=0.0),
 Row(geohash='8up5c0e570pz', sum=0.0, cnt=1, div=0.0),
 Row(geohash='dse39p1jkszz', sum=0.0, cnt=1, div=0.0)]

# Analysis

### Strangely Snowy 
Find a location that contains snow while its surroundings do not. Why does this occur? Is it a high mountain peak in a desert?

### Lightning rod
Where are you most likely to be struck by lightning? Use a precision of at least 4 Geohash characters and provide the top 3 locations.

In [38]:
# Fraction of records with lightning per 4-character geohash prefix, most
# lightning-prone first.
#
# The query reads the `nam_t` view because that is the name the tiny dataset
# is registered under via createOrReplaceTempView; no view called `nam_tiny`
# is registered anywhere in this notebook.
lightning_rod = spark.sql("""
    SELECT LEFT(geohash, 4) AS geo4,
           SUM(lightning_surface) / COUNT(*) AS prob_lightning
    FROM nam_t
    GROUP BY geo4
    ORDER BY prob_lightning DESC
""")

In [42]:
# Three prefixes with the highest lightning fraction.
lightning_rod.take(3)

[Row(geo4='dmbz', prob_lightning=1.0),
 Row(geo4='c4xg', prob_lightning=0.0),
 Row(geo4='d5x3', prob_lightning=0.0)]

### Drying out
Choose a region in North America (defined by one or more Geohashes) and determine when its driest month is. This should include a histogram with data from each month.

In [95]:
# Distinct calendar months present in the data: the Timestamp column is
# integer-divided by 1000 before from_unixtime(), then truncated to the
# first day of its month.
#
# The query reads the `nam_t` view because that is the name the tiny dataset
# is registered under via createOrReplaceTempView; no view called `nam_tiny`
# is registered anywhere in this notebook.
drying_out = spark.sql("""
    SELECT
        trunc(from_unixtime(timestamp DIV 1000), 'MM') AS truncated
    FROM nam_t
    GROUP BY truncated
""")

In [96]:
# Print the distinct month-start dates produced by the drying_out query.
drying_out.show()

+----------+
| truncated|
+----------+
|2015-03-01|
+----------+



### Travel Startup
After graduating from USF, you found a startup that aims to provide personalized travel itineraries using big data analysis. Given your own personal preferences, build a plan for a year of travel across 5 locations. Or, in other words: pick 5 regions. What is the best time of year to visit them based on the dataset?

- One avenue here could be determining the comfort index for a region. You could incorporate several features: not too hot, not too cold, dry, humid, windy, etc. There are several different ways of calculating this available online, and you could also analyze how well your own metrics do.

### Escaping the fog
After becoming rich from your startup, you are looking for the perfect location to build your Bay Area mansion with unobstructed views. Find the locations that are the least foggy and show them on a map.


In [None]:
# Mean surface visibility per 4-character geohash prefix, highest first
# (higher visibility is taken here to mean less fog).
#
# The draft query could not run: LEFT() had no length argument, the division
# had no right-hand operand, and GROUP BY named a column (`truncated`) that
# this SELECT does not produce. It is completed along the same lines as the
# lightning query (SUM / COUNT(*) per geo4) and reads `nam_t`, the view name
# actually registered in this notebook.
# TODO: restrict to Bay Area geohashes before drawing conclusions for the map.
spark.sql("""
    SELECT LEFT(geohash, 4) AS geo4,
           SUM(visibility_surface) / COUNT(*) AS avg_visibility
    FROM nam_t
    GROUP BY geo4
    ORDER BY avg_visibility DESC
""")

### SolarWind, Inc.
You get bored enjoying the amazing views from your mansion, so you start a new company; here, you want to help power companies plan out the locations of solar and wind farms across North America. Locate the top 3 places for solar and wind farms, as well as a combination of both (solar + wind farm). You will report a total of 9 Geohashes as well as their relevant attributes (for example, cloud cover and wind speeds).

# Sampling

We can even create a sample dataset with Spark! Let's create a 10% sample (without replacement)

In [None]:
# Draw a sample with fraction 0.1; the first positional argument (False)
# selects sampling without replacement.
# NOTE(review): `df` is not defined in any cell visible in this notebook, and
# no seed is passed, so the sample is presumably different on every run.
# Confirm which DataFrame was intended.
samp = df.sample(False, .1)

# Write the sample out in CSV format.
# NOTE(review): this targets hdfs://orion12:50000, whereas the reads in this
# notebook use hdfs://orion11:15000. Confirm the destination.
samp.write.format('csv').save('hdfs://orion12:50000/sampled_output.csv')

In [27]:
# Two geographic subsets of the tiny dataset, selected by geohash prefix.
# Both read the `nam_t` view because that is the name the tiny dataset is
# registered under via createOrReplaceTempView; no view called `nam_tiny` is
# registered anywhere in this notebook.

# Rows whose 3-character geohash prefix is one of the four Bay Area cells.
bayarea_tiny = spark.sql("""
    SELECT *
    FROM nam_t
    WHERE LEFT(geohash, 3) IN ('9qb', '9qc', '9q9', '9q8')
""")

# Rows whose geohash starts with '9' or '8'.
nine_tiny = spark.sql("""
    SELECT *
    FROM nam_t
    WHERE LEFT(geohash, 1) IN ('9', '8')
""")

In [29]:
# Bay Area subset of the small dataset, selected by 3-character geohash prefix.
# Reads the `nam_s` view because that is the name the small dataset is
# registered under via createOrReplaceTempView; no view called `nam_small` is
# registered anywhere in this notebook.
bayarea_small = spark.sql("""
    SELECT *
    FROM nam_s
    WHERE LEFT(geohash, 3) IN ('9qb', '9qc', '9q9', '9q8')
""")

In [8]:
# Bay Area subset of the full dataset, selected by an explicit list of
# 4-character geohash prefixes.
# NOTE(review): no `nam_full` view is registered (and no full dataset is
# loaded) in the cells visible in this notebook — it must exist in the
# session before this cell can run.
# NOTE(review): '9q95' appears twice in the list (once among the 9q9* cells,
# once among the 9qb* cells); the second occurrence may be a typo for another
# 9qb* prefix — confirm against the intended map coverage.
bayarea_full = spark.sql("\
SELECT * \
FROM nam_full \
WHERE LEFT(geohash, 4) IN ( \
 '9q8x', '9q8z', '9q8y', '9q8v', '9q8u', '9q8g', \
 '9q9p', '9q9n', '9q9j', '9q9h', '9q95', '9q9r', '9q9q', '9q9m', '9q9k', '9q97', '9q9e', \
 '9qb3', '9q95', '9qb6', '9qb7', '9qb8', '9qb9', '9qbb', '9qbc', '9qbd', '9qbe', '9qbf', '9qbg', '9qbh', '9qbk', '9qbs', \
 '9qc0', '9qc1', '9qc2', '9qc3', '9qc4', '9qc5', '9qc6', '9qch' \
)")

In [None]:
# Persist the Bay Area subset to HDFS in CSV format.
# NOTE(review): no 'sep' option is set here, while the inputs were read with
# sep='\t' — confirm the output delimiter is what downstream readers expect.
bayarea_full.write.format('csv').save('hdfs://orion11:15000/nam_bayarea')

In [None]:
# Register the Bay Area subset as the temp view "nam_bayarea" so spark.sql()
# queries can reference it by name.
bayarea_full.createOrReplaceTempView("nam_bayarea")