In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
spark = SparkSession.builder \
  .appName('Jupyter BigQuery Storage')\
  .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar') \
  .getOrCreate()

In [3]:
table = " publicdata.samples.natality"
df = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .load()

In [4]:
### preprocessing 
df_natality_select = df \
.select("weight_pounds", "mother_age", "father_age", "gestation_weeks", "weight_gain_pounds", "apgar_5min") \
.where("""
weight_pounds IS NOT NULL 
AND mother_age IS NOT NULL
AND father_age IS NOT NULL
AND gestation_weeks IS NOT NULL
AND weight_gain_pounds IS NOT NULL
AND apgar_5min IS NOT NULL
""")

In [7]:
%%time
df_natality_select.count()

CPU times: user 2.38 ms, sys: 0 ns, total: 2.38 ms
Wall time: 1.38 s


76555742

In [6]:
assembler = VectorAssembler(
    inputCols=["mother_age", "father_age", "gestation_weeks", "weight_gain_pounds", "apgar_5min"],
    outputCol="features")

df_assembler_output = assembler.transform(df_natality_select)

df_training_data = df_assembler_output \
        .select("features", "weight_pounds") \
        .withColumnRenamed("weight_pounds","label")

(df_training, df_test) = df_training_data.randomSplit([0.7, 0.3])

randomForest = RandomForestRegressor(labelCol="label", featuresCol="features", numTrees=20)

In [None]:
%%time
model = randomForest.fit(df_training)

CPU times: user 90.6 ms, sys: 15 ms, total: 106 ms
Wall time: 7min 32s


In [9]:
predictions = model.transform(df_test)

In [10]:
predictions.show(5,False)

+--------------------------+------------------+------------------+
|features                  |label             |prediction        |
+--------------------------+------------------+------------------+
|[11.0,28.0,39.0,99.0,9.0] |7.68751907594     |7.540676262429969 |
|[11.0,31.0,38.0,30.0,9.0] |6.56316153974     |7.2561316307974195|
|[11.0,99.0,36.0,99.0,99.0]|4.2108292041999995|6.402358562675441 |
|[11.0,99.0,41.0,40.0,9.0] |7.25100379718     |7.454228404931898 |
|[12.0,16.0,37.0,99.0,9.0] |6.56316153974     |6.8728955066668815|
+--------------------------+------------------+------------------+
only showing top 5 rows



In [11]:
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 1.07372


In [12]:
spark.stop()