# Setting up our Schema

Spark can automatically create a schema for CSV files, but ours don't have headings. Let's set this up here:

In [1]:
from pyspark.sql.types import StructType, StructField, FloatType, LongType, StringType

feats = []
f = open('features.txt')
for line_num, line in enumerate(f):
    if line_num == 0:
        # Timestamp
        feats.append(StructField(line.strip(), LongType(), True))
    elif line_num == 1:
        # Geohash
        feats.append(StructField(line.strip(), StringType(), True))
    else:
        # Other features
        feats.append(StructField(line.strip(), FloatType(), True))
    
schema = StructType(feats)

print(schema)


StructType(List(StructField(Timestamp,LongType,true),StructField(Geohash,StringType,true),StructField(geopotential_height_lltw,FloatType,true),StructField(water_equiv_of_accum_snow_depth_surface,FloatType,true),StructField(drag_coefficient_surface,FloatType,true),StructField(sensible_heat_net_flux_surface,FloatType,true),StructField(categorical_ice_pellets_yes1_no0_surface,FloatType,true),StructField(visibility_surface,FloatType,true),StructField(number_of_soil_layers_in_root_zone_surface,FloatType,true),StructField(categorical_freezing_rain_yes1_no0_surface,FloatType,true),StructField(pressure_reduced_to_msl_msl,FloatType,true),StructField(upward_short_wave_rad_flux_surface,FloatType,true),StructField(relative_humidity_zerodegc_isotherm,FloatType,true),StructField(categorical_snow_yes1_no0_surface,FloatType,true),StructField(u-component_of_wind_tropopause,FloatType,true),StructField(surface_wind_gust_surface,FloatType,true),StructField(total_cloud_cover_entire_atmosphere,FloatType,tru

# Creating a Dataframe

Let's load our CSV into a 'dataframe' - Spark's abstraction for working with tabular data (built on top of RDDs)

In [31]:
#df = spark.read.format('csv').option('sep', '\t').schema(schema).load('/Volumes/evo/Datasets/NAM_2015_S/*')
df = spark.read.format('csv').option('sep', '\t').schema(schema).load('hdfs://orion11:15000/nam_tiny.tdv')
# df = spark.read.format('csv').option('sep', '\t').schema(schema).load('hdfs://orion11:15000/nam_s/*')
df.take(1)

[Row(Timestamp=1426377600000, Geohash='953rtrfmww20', geopotential_height_lltw=3975.03125, water_equiv_of_accum_snow_depth_surface=0.0, drag_coefficient_surface=0.0, sensible_heat_net_flux_surface=10.42236328125, categorical_ice_pellets_yes1_no0_surface=0.0, visibility_surface=24221.587890625, number_of_soil_layers_in_root_zone_surface=0.0, categorical_freezing_rain_yes1_no0_surface=0.0, pressure_reduced_to_msl_msl=101366.0, upward_short_wave_rad_flux_surface=18.375, relative_humidity_zerodegc_isotherm=30.0, categorical_snow_yes1_no0_surface=0.0, u-component_of_wind_tropopause=5.4028778076171875, surface_wind_gust_surface=3.15878963470459, total_cloud_cover_entire_atmosphere=100.0, upward_long_wave_rad_flux_surface=424.9310302734375, land_cover_land1_sea0_surface=0.0, vegitation_type_as_in_sib_surface=0.0, v-component_of_wind_pblri=-1.61834716796875, albedo_surface=6.0, lightning_surface=0.0, ice_cover_ice1_no_ice0_surface=0.0, convective_inhibition_surface=-0.65234375, pressure_surfac

# Playtime

In [30]:
really_hot = df.filter(df.temperature_surface > 320).count()
print(really_hot)

hot_and_humid = df.filter(df.temperature_surface > 313).filter(df.relative_humidity_zerodegc_isotherm > .8).count()
print(hot_and_humid)

KeyboardInterrupt: 

In [5]:
df.filter(df.snow_cover_surface > .85).take(5)

[Row(Timestamp=1426377600000, Geohash='cf7ecr4h2ps0', geopotential_height_lltw=136.53125, water_equiv_of_accum_snow_depth_surface=77.0, drag_coefficient_surface=0.0, sensible_heat_net_flux_surface=-39.57763671875, categorical_ice_pellets_yes1_no0_surface=0.0, visibility_surface=24221.587890625, number_of_soil_layers_in_root_zone_surface=3.0, categorical_freezing_rain_yes1_no0_surface=0.0, pressure_reduced_to_msl_msl=99602.0, upward_short_wave_rad_flux_surface=6.625, relative_humidity_zerodegc_isotherm=34.0, categorical_snow_yes1_no0_surface=0.0, u-component_of_wind_tropopause=27.527877807617188, surface_wind_gust_surface=16.158788681030273, total_cloud_cover_entire_atmosphere=100.0, upward_long_wave_rad_flux_surface=314.0560302734375, land_cover_land1_sea0_surface=1.0, vegitation_type_as_in_sib_surface=18.0, v-component_of_wind_pblri=12.31915283203125, albedo_surface=38.75, lightning_surface=0.0, ice_cover_ice1_no_ice0_surface=0.0, convective_inhibition_surface=-0.65234375, pressure_su

# SQL

In [34]:
from pyspark.sql import functions as F

In [45]:
# Creating an SQL 'table'
df.createOrReplaceTempView("TEMP_DF")

#### Unknown Feature
I didn't know what albedo was, so I looked at its summary statistics. Still unsure, I looked up the definition: 'the proportion of the incident light or radiation that is reflected by a surface, typically that of a planet or moon.'

In [46]:
df.describe('albedo_surface').show()

+-------+----------------+
|summary|  albedo_surface|
+-------+----------------+
|  count|             100|
|   mean|           18.07|
| stddev|17.4802948221907|
|    min|             6.0|
|    max|            76.0|
+-------+----------------+



#### Hot Hot Hot
The hottest tempurature in the dataset is at {location} at {time}. Looking at the other highest tempuratures, it {does/not} appear to be an anomaly. 

In [40]:
tempminmax = spark.sql("SELECT MIN(temperature_surface) as mintemp, MAX(temperature_surface) as maxtemp FROM TEMP_DF").collect()

In [101]:
df.select(min(df.temperature_surface), max(df.temperature_surface)).first()[1]

306.4980163574219

In [48]:
hottest_record = df.orderBy("temperature_surface", ascending=False).first()
df.orderBy("temperature_surface", ascending=False).select("temperature_surface").head(10)

[Row(temperature_surface=306.4980163574219),
 Row(temperature_surface=301.3730163574219),
 Row(temperature_surface=299.9980163574219),
 Row(temperature_surface=299.9980163574219),
 Row(temperature_surface=299.7480163574219),
 Row(temperature_surface=299.6230163574219),
 Row(temperature_surface=299.4980163574219),
 Row(temperature_surface=299.1230163574219),
 Row(temperature_surface=298.9980163574219),
 Row(temperature_surface=298.8730163574219)]

In [99]:
df.describe('temperature_surface').show()

+-------+-------------------+
|summary|temperature_surface|
+-------+-------------------+
|  count|                100|
|   mean|  284.9017663574219|
| stddev| 13.002025568205239|
|    min|          247.49802|
|    max|          306.49802|
+-------+-------------------+



In [97]:
from datetime import datetime, timezone
timestamp = hottest_record['Timestamp'] / 1000
datetime.fromtimestamp(timestamp, timezone.utc)

datetime.datetime(2015, 3, 15, 0, 0, tzinfo=datetime.timezone.utc)

In [13]:
hottest_record['Geohash']

'9qd23ynghrrz'

In [194]:
rdd = df.select("temperature_surface").rdd

#rdd.map(lambda row : row[0]).collect()
#rdd.map(lambda row : row.temperature_surface).collect()
hist_rdd = df.rdd.map(lambda row : row.temperature_surface).histogram(5)
(start_vals, counts) = hist_rdd

zipped = zip(start_vals, counts)
type(zipped)
hist_df = spark.createDataFrame(zipped)
hist_df.createOrReplaceTempView("histogramTable")

# rdd = df.rdd.map(lambda row : row.temperature_surface)
# help(spark.createDataFrame)
# spark.createDataFrame(rdd, FloatType())
# spark.createDataFrame(hist_rdd)

DataFrame[_1: double, _2: bigint]

In [20]:
# Let's get all the snow cover values:
snow = spark.sql("SELECT snow_cover_surface FROM TEMP_DF").collect()
# .collect() gives us a list of rows. Let's grab the first 10:
for i in range(10):
    print(snow[i])


# What's the maximum value?
snowmax = spark.sql("SELECT MAX(snow_cover_surface) as maxval FROM TEMP_DF").collect()

print('Max val observed:', snowmax)


Row(snow_cover_surface=0.0)
Row(snow_cover_surface=0.0)
Row(snow_cover_surface=0.0)
Row(snow_cover_surface=100.0)
Row(snow_cover_surface=0.0)
Row(snow_cover_surface=0.0)
Row(snow_cover_surface=100.0)
Row(snow_cover_surface=0.0)
Row(snow_cover_surface=0.0)
Row(snow_cover_surface=0.0)
Max val observed: [Row(maxval=100.0)]


Help on GroupedData in module pyspark.sql.group object:

class GroupedData(builtins.object)
 |  A set of methods for aggregations on a :class:`DataFrame`,
 |  created by :func:`DataFrame.groupBy`.
 |  
 |  .. note:: Experimental
 |  
 |  .. versionadded:: 1.3
 |  
 |  Methods defined here:
 |  
 |  __init__(self, jgd, df)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  agg(self, *exprs)
 |      Compute aggregates and returns the result as a :class:`DataFrame`.
 |      
 |      The available aggregate functions are `avg`, `max`, `min`, `sum`, `count`.
 |      
 |      If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
 |      is the column to perform aggregation on, and the value is the aggregate function.
 |      
 |      Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
 |      
 |      :param exprs: a dict mapping from column name (string) to aggregate functions (string),
 |          o

#### So Snowy
One location where it is snowing all year, i.e. where it is snowing for every record in its timeseries is {location}.


In [63]:
# gdf = df.groupBy('Geohash')
# gdf.agg(sum('categorical_snow_yes1_no0_surface'), count('*')).orderBy('sum(categorical_snow_yes1_no0_surface)', ascending = False).collect()

spark.sql("SELECT geohash, sum, cnt, (sum / cnt) AS div \
            FROM ( \
                SELECT geohash, SUM(categorical_snow_yes1_no0_surface) AS sum, COUNT(*) as cnt \
                FROM TEMP_DF \
                GROUP BY(geohash) \
            ) ORDER BY div DESC").head(10)

[Row(geohash='c1nuq5290jup', sum=1.0, cnt=1, div=1.0),
 Row(geohash='f2w29r4werxb', sum=1.0, cnt=1, div=1.0),
 Row(geohash='f2d5v1jeyp7z', sum=1.0, cnt=1, div=1.0),
 Row(geohash='c6s64488ws80', sum=1.0, cnt=1, div=1.0),
 Row(geohash='f2fh6jpdgv5b', sum=1.0, cnt=1, div=1.0),
 Row(geohash='fccz22w4fytb', sum=1.0, cnt=1, div=1.0),
 Row(geohash='drmg1tprm22p', sum=0.0, cnt=1, div=0.0),
 Row(geohash='8gzfnvh85g0p', sum=0.0, cnt=1, div=0.0),
 Row(geohash='8up5c0e570pz', sum=0.0, cnt=1, div=0.0),
 Row(geohash='dkgtgw6v6y7z', sum=0.0, cnt=1, div=0.0)]

Help on method agg in module pyspark.sql.group:

agg(*exprs) method of pyspark.sql.group.GroupedData instance
    Compute aggregates and returns the result as a :class:`DataFrame`.
    
    The available aggregate functions are `avg`, `max`, `min`, `sum`, `count`.
    
    If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
    is the column to perform aggregation on, and the value is the aggregate function.
    
    Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
    
    :param exprs: a dict mapping from column name (string) to aggregate functions (string),
        or a list of :class:`Column`.
    
    >>> gdf = df.groupBy(df.name)
    >>> sorted(gdf.agg({"*": "count"}).collect())
    [Row(name='Alice', count(1)=1), Row(name='Bob', count(1)=1)]
    
    >>> from pyspark.sql import functions as F
    >>> sorted(gdf.agg(F.min(df.age)).collect())
    [Row(name='Alice', min(age)=2), Row(name='Bob', min(age)=5)]
    
  

In [21]:
from pyspark.sql.functions import avg

df.select(avg(df.wilting_point_surface)).show()

+--------------------------+
|avg(wilting_point_surface)|
+--------------------------+
|      0.029712499007582664|
+--------------------------+



# Sampling

We can even create a sample dataset with Spark! Let's create a 10% sample (without replacement)

In [None]:
samp = df.sample(False, .1)

# Write it out to a file
samp.write.format('csv').save('hdfs://orion12:50000/sampled_output.csv')