# Loading Data

In [None]:
df = spark.read.load(
    'hdfs://orion12:9001/nam/2019/11/namanl_218_20191127*.grb2.tdv.gz',
    format='csv',
    sep='\t',
    inferSchema=True,
    header=True)

I loaded a smaller dataset than usual because training models can take a *LOT* of time. In practice, you want as much data as you can get your hands on, but we don't want to sit and watch Spark run for half the class.

Let's take a look at our schema...

In [None]:
df.columns

Looks good... (well, we probably don't need that last column, `_c18`. It's blank because the header line in our .tdv files ends with a tab character. I removed it in the sampled dataset.)
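
To see why a trailing tab produces an extra blank column, here is a minimal pure-Python sketch. The header string is made up for illustration, and the `_c<index>` fallback naming is an assumption that mirrors the `_c18` name Spark gave the nineteenth column above.

```python
# A made-up header line that ends with a tab, mimicking our .tdv files.
header = "temperature_surface\tpressure_surface\t"

# Splitting on the separator leaves an empty string after the final tab.
fields = header.split("\t")
print(fields)

# Assumption: unnamed columns fall back to a positional name, _c<index>.
named = [name if name else f"_c{i}" for i, name in enumerate(fields)]
print(named)

# On the real DataFrame, the extra column could be removed with:
# df = df.drop('_c18')
```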

However, what would you need to do if you **didn't** have a workable schema autogenerated for you?

## Custom Schema Example (don't run on our NAM dataset!)

In [None]:

# NOTE: this is just an example. It's not needed for our dataset!

from pyspark.sql.types import StructType, StructField, LongType, StringType, FloatType

feats = []
# features.txt holds one column name per line; 'with' closes the file for us
with open('features.txt') as f:
    for line_num, line in enumerate(f):
        if line_num == 0:
            # The first field is a long integer
            feats.append(StructField(line.strip(), LongType(), True))
        elif line_num == 1:
            # The second field is a string
            feats.append(StructField(line.strip(), StringType(), True))
        else:
            # All the other features are floats
            feats.append(StructField(line.strip(), FloatType(), True))

# I hard-coded some of the data types above. You could list them all out or use a loop if you have
# several columns all with the same type.

schema = StructType(feats)
print(schema)

df = spark.read.format('csv').option('sep', '\t').schema(schema).load('hdfs://path/to/your/custom/dataset')
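
If you would rather list every column out explicitly, `schema()` also accepts a DDL-formatted string instead of a `StructType`. The sketch below builds one in plain Python. The column names and types are hypothetical, chosen only for illustration, and names containing spaces or other special characters would need backtick quoting.

```python
# Hypothetical (column name, Spark SQL type) pairs -- not our NAM columns.
column_types = [
    ("observation_id", "BIGINT"),
    ("station_name", "STRING"),
    ("temperature_surface", "FLOAT"),
    ("pressure_surface", "FLOAT"),
]

# Join the pairs into a DDL string such as "col0 BIGINT, col1 STRING, ...".
ddl_schema = ", ".join(f"{name} {sql_type}" for name, sql_type in column_types)
print(ddl_schema)

# With a live SparkSession, the string could stand in for the StructType:
# df = spark.read.format('csv').option('sep', '\t').schema(ddl_schema).load('hdfs://path/to/your/custom/dataset')
```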


## And now... back to Machine Learning!

In [None]:
from pyspark.ml.feature import VectorAssembler

def prepare_data(dframe, predictors, target):
    assembler = VectorAssembler(inputCols=predictors, outputCol="features")
    output = assembler.transform(dframe)
    return output.select("features", target).withColumnRenamed(target, "label")

# Choose our dependent and independent variables:
prepped = prepare_data(df,
    ["precipitable_water_entire_atmosphere_single_layer",
     "pressure_surface",
     "relative_humidity_zerodegc_isotherm",
     "snow_depth_surface",
     "albedo_surface"],
    "temperature_surface")

prepped.show()
(trainingData, testData) = prepped.randomSplit([0.9, 0.1])

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor(numTrees=100, maxDepth=5, maxBins=32)
model = rf.fit(trainingData)
predictions = model.transform(testData)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
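
For reference, RMSE is the square root of the mean squared difference between predictions and labels, so it comes out in the same units as the label (kelvin here). A minimal pure-Python sketch of the calculation, using made-up values rather than our actual predictions:

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(p - y) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up surface temperatures in kelvin, for illustration only.
labels = [280.0, 285.0, 290.0]
predictions = [283.0, 285.0, 286.0]

# Errors are 3, 0, and -4 K, so the mean squared error is 25 / 3.
value = rmse(labels, predictions)
print("RMSE = %g" % value)
```

Because the errors are squared before averaging, a few large misses raise the RMSE more than many small ones.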

I got an RMSE of 3.82 K, which is not particularly great... We should also plot the predictions against the actual values. Pandas does a decent job of doing this automatically, but we can also customize the plot with matplotlib directly.

In [None]:
import matplotlib.pyplot as plt

%matplotlib notebook

p_df = predictions.select("label", "prediction").toPandas()

plt.suptitle('Random Forest Regressor', fontsize=16)

minval = p_df[['label', 'prediction']].min().min()
maxval = p_df[['label', 'prediction']].max().max()
plt.axis([minval, maxval, minval, maxval])

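plt.plot(p_df['label'], p_df['prediction'], '.', color='#2ba5f1', label='Predictions')
# Draw the diagonal between the exact axis limits so it spans the whole plot
plt.plot([minval, maxval], [minval, maxval], lw=3, dashes=(10, 3), color='#000000', alpha=0.25, label='Ideal Predictor')
plt.xlabel('Actual temperature_surface (K)')
plt.ylabel('Predicted temperature_surface (K)')
plt.legend()  # without this call the 'label' arguments are never displayed
plt.show()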