
## Overview

This notebook shows how to create and query a table or DataFrame from a file uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) (the Databricks File System) lets you store data for querying inside Databricks. The notebook assumes the file you want to read is already in DBFS. It then uses the same weather data to train a random forest regressor that predicts temperature for a new day's readings.

This notebook is written in **Python**, so the default cell type is Python. You can switch an individual cell to another language with a magic command on its first line, such as `%sql` (used in the SQL cell below), `%scala`, or `%r`. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/weather_data.csv"
file_type = "csv"

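# CSV options
# NOTE: these are the template defaults. The file's first line is a header, so with
# first_row_is_header = "false" it is read as a data row and the columns are named
# _c0 through _c22, as the output below shows. With infer_schema = "false" every column
# is read as a string. Set both to "true" to take column names from the header and get numeric types.
infer_schema = "false"
first_row_is_header = "false"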
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13,_c14,_c15,_c16,_c17,_c18,_c19,_c20,_c21,_c22
Date,Average temperature (°F),Average humidity (%),Average dewpoint (°F),Average barometer (in),Average windspeed (mph),Average gustspeed (mph),Average direction (°deg),Rainfall for month (in),Rainfall for year (in),Maximum rain per minute,Maximum temperature (°F),Minimum temperature (°F),Maximum humidity (%),Minimum humidity (%),Maximum pressure,Minimum pressure,Maximum windspeed (mph),Maximum gust speed (mph),Maximum heat index (°F),Date1,Month,diff_pressure
01-01-2009,37.8,35,12.7,29.7,26.4,36.8,274,0,0,0,40,34,4,27,29.762,29.596,41.4,59,40,01-01-2009,1,0.166
02-01-2009,43.2,32,14.7,29.5,12.8,18,240,0,0,0,52,37,4,16,29.669,29.268,35.7,51,52,02-01-2009,1,0.401
03-01-2009,25.7,60,12.7,29.7,8.3,12.2,290,0,0,0,41,6,8,35,30.232,29.26,25.3,38,41,03-01-2009,1,0.972
04-01-2009,9.3,67,0.1,30.4,2.9,4.5,47,0,0,0,19,0,7,35,30.566,30.227,12.7,20,32,04-01-2009,1,0.339
05-01-2009,23.5,30,-5.3,29.9,16.7,23.1,265,0,0,0,30,15,5,13,30.233,29.568,38,53,32,05-01-2009,1,0.665
06-01-2009,24.8,42,4.6,29.8,16,23.9,276,0,0,0,29,19,5,27,29.879,29.637,29.9,48,32,06-01-2009,1,0.242
07-01-2009,34.2,60,21.6,29.7,20.4,30,276,0,0,0,39,27,8,46,29.86,29.602,38,54,39,07-01-2009,1,0.258
08-01-2009,42.1,41,20,29.8,17.5,25.2,265,0,0,0,51,36,5,28,29.883,29.627,35.7,49,51,08-01-2009,1,0.256
09-01-2009,30.3,46,11.4,30,6.9,10.6,292,0,0,0,41,19,8,27,30.446,29.575,24.2,36,41,09-01-2009,1,0.871
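The `_c0` through `_c22` column names, and the header line showing up as the first data row, both come from reading the file with the header option off. The stdlib-only sketch below mimics that behavior on a small in-memory string. `read_csv_text` is a hypothetical helper, not a Spark API, and it models only the `header` option; values stay strings, as they do in Spark when `inferSchema` is off.

```python
import csv
import io


def read_csv_text(text, header):
    """Mimic the effect of Spark's CSV `header` option (illustration only)."""
    rows = list(csv.reader(io.StringIO(text)))
    if header:
        # The first line supplies the column names and is not returned as data.
        return rows[0], rows[1:]
    # Without a header, columns get positional names like Spark's _c0, _c1, ...
    # and the header line is treated as an ordinary data row.
    names = [f"_c{i}" for i in range(len(rows[0]))]
    return names, rows


sample = "Date,Month\n01-01-2009,1\n02-01-2009,1\n"
print(read_csv_text(sample, header=False))
print(read_csv_text(sample, header=True))
```

In the notebook, the corresponding fix is `first_row_is_header = "true"` (plus `infer_schema = "true"` for numeric types), which is what the later `spark.read.csv` call with `header=True, inferSchema=True` does.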


In [0]:
# Create a view or table

temp_table_name = "weather_data_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `weather_data_csv`

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13,_c14,_c15,_c16,_c17,_c18,_c19,_c20,_c21,_c22
Date,Average temperature (°F),Average humidity (%),Average dewpoint (°F),Average barometer (in),Average windspeed (mph),Average gustspeed (mph),Average direction (°deg),Rainfall for month (in),Rainfall for year (in),Maximum rain per minute,Maximum temperature (°F),Minimum temperature (°F),Maximum humidity (%),Minimum humidity (%),Maximum pressure,Minimum pressure,Maximum windspeed (mph),Maximum gust speed (mph),Maximum heat index (°F),Date1,Month,diff_pressure
01-01-2009,37.8,35,12.7,29.7,26.4,36.8,274,0,0,0,40,34,4,27,29.762,29.596,41.4,59,40,01-01-2009,1,0.166
02-01-2009,43.2,32,14.7,29.5,12.8,18,240,0,0,0,52,37,4,16,29.669,29.268,35.7,51,52,02-01-2009,1,0.401
03-01-2009,25.7,60,12.7,29.7,8.3,12.2,290,0,0,0,41,6,8,35,30.232,29.26,25.3,38,41,03-01-2009,1,0.972
04-01-2009,9.3,67,0.1,30.4,2.9,4.5,47,0,0,0,19,0,7,35,30.566,30.227,12.7,20,32,04-01-2009,1,0.339
05-01-2009,23.5,30,-5.3,29.9,16.7,23.1,265,0,0,0,30,15,5,13,30.233,29.568,38,53,32,05-01-2009,1,0.665
06-01-2009,24.8,42,4.6,29.8,16,23.9,276,0,0,0,29,19,5,27,29.879,29.637,29.9,48,32,06-01-2009,1,0.242
07-01-2009,34.2,60,21.6,29.7,20.4,30,276,0,0,0,39,27,8,46,29.86,29.602,38,54,39,07-01-2009,1,0.258
08-01-2009,42.1,41,20,29.8,17.5,25.2,265,0,0,0,51,36,5,28,29.883,29.627,35.7,49,51,08-01-2009,1,0.256
09-01-2009,30.3,46,11.4,30,6.9,10.6,292,0,0,0,41,19,8,27,30.446,29.575,24.2,36,41,09-01-2009,1,0.871


In [0]:
# A temp view is scoped to this notebook's Spark session, so other notebooks and users cannot query it.
# To share the data, save the DataFrame as a table instead: a saved table persists across cluster
# restarts and can be queried by other users from other notebooks.
# Note that permanent_table_name below matches the temp view's name; in unqualified queries the
# temp view takes precedence, so pick a different name if both should be reachable.
# To save the table, choose a table name and uncomment the last line.

permanent_table_name = "weather_data_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F

In [0]:
# Load historical data
historical_data = spark.read.csv("/FileStore/tables/weather_data.csv", header=True, inferSchema=True)
historical_data.show()

+----------+------------------------+--------------------+---------------------+----------------------+-----------------------+-----------------------+------------------------+-----------------------+----------------------+-----------------------+------------------------+------------------------+--------------------+--------------------+----------------+----------------+-----------------------+------------------------+-----------------------+----------+-----+-------------+
|      Date|Average temperature (°F)|Average humidity (%)|Average dewpoint (°F)|Average barometer (in)|Average windspeed (mph)|Average gustspeed (mph)|Average direction (°deg)|Rainfall for month (in)|Rainfall for year (in)|Maximum rain per minute|Maximum temperature (°F)|Minimum temperature (°F)|Maximum humidity (%)|Minimum humidity (%)|Maximum pressure|Minimum pressure|Maximum windspeed (mph)|Maximum gust speed (mph)|Maximum heat index (°F)|     Date1|Month|diff_pressure|
+----------+------------------------+-------

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

# Define the schema explicitly
schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Average temperature (°F)", FloatType(), True),
    StructField("Average humidity (%)", FloatType(), True),
    StructField("Average dewpoint (°F)", FloatType(), True),
    StructField("Average barometer (in)", FloatType(), True),
    StructField("Average windspeed (mph)", FloatType(), True),
    StructField("Average gustspeed (mph)", FloatType(), True),
    StructField("Average direction (°deg)", IntegerType(), True),
    StructField("Rainfall for month (in)", FloatType(), True),
    StructField("Rainfall for year (in)", FloatType(), True),
    StructField("Maximum rain per minute", FloatType(), True),
    StructField("Maximum temperature (°F)", FloatType(), True),
    StructField("Minimum temperature (°F)", FloatType(), True),
    StructField("Maximum humidity (%)", FloatType(), True),
    StructField("Minimum humidity (%)", FloatType(), True),
    StructField("Maximum pressure", FloatType(), True),
    StructField("Minimum pressure", FloatType(), True),
    StructField("Maximum windspeed (mph)", FloatType(), True),
    StructField("Maximum gust speed (mph)", FloatType(), True),
    StructField("Maximum heat index (°F)", FloatType(), True),
    StructField("Date1", StringType(), True),
    StructField("Month", StringType(), True),
    StructField("diff_pressure", FloatType(), True)
])

In [0]:
# Create today's data with the same schema
today_data = [(None, 70.0, 65.0, 60.0, 29.92, 5.0, 10.0, 180, 3.0, 30.0, 0.5, 75.0, 65.0, 80.0, 50.0, 30.0, 29.0, 15.0, 20.0, 75.0, None, None, None)]

# Create DataFrame with the specified schema
today_df = spark.createDataFrame(today_data, schema)
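`spark.createDataFrame` checks each value against the schema by default (`verifySchema=True`), so a mismatch such as an integer literal `70` in a `FloatType` field can be rejected with a `TypeError`. The sketch below is a simplified, stricter stand-in for that check, using plain Python types on a four-field excerpt of the schema. `SCHEMA` and `check_row` are hypothetical names, and Spark's own acceptance rules differ in details.

```python
# Hypothetical excerpt of the schema as (column name, expected Python type) pairs.
SCHEMA = [
    ("Date", str),
    ("Average temperature (°F)", float),
    ("Average direction (°deg)", int),
    ("Month", str),
]


def check_row(row, schema):
    """Return a list of problems; an empty list means the row fits the schema."""
    if len(row) != len(schema):
        return [f"expected {len(schema)} fields, got {len(row)}"]
    problems = []
    for value, (name, expected) in zip(row, schema):
        # None is allowed because every StructField in the notebook is nullable=True.
        # `type(...) is not` is deliberately strict: an int is not accepted as a float.
        if value is not None and type(value) is not expected:
            problems.append(
                f"{name}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return problems


print(check_row(("2024-01-02", 70.0, 180, "January"), SCHEMA))
print(check_row(("2024-01-02", 70, 180, "January"), SCHEMA))
```

Running a check like this on `today_data` before calling `createDataFrame` makes a field-count or type slip easier to spot than a Spark stack trace.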


In [0]:
combined_data = historical_data.union(today_df)
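`DataFrame.union` matches columns by position, not by name, so `today_df` must list its columns in the same order as `historical_data`. Its schema mirrors the CSV header, so it does here. `DataFrame.unionByName` matches columns by name instead. The plain-Python sketch below contrasts the two on rows stored as tuples; both helper names are hypothetical.

```python
def union_by_position(cols_a, rows_a, rows_b):
    """Like DataFrame.union: rows of b are appended as-is under a's column names."""
    return cols_a, rows_a + rows_b


def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Like DataFrame.unionByName: reorder each row of b to follow a's column order."""
    index = [cols_b.index(c) for c in cols_a]
    return cols_a, rows_a + [tuple(r[i] for i in index) for r in rows_b]


cols_hist, rows_hist = ["Date", "Month"], [("01-01-2009", "1")]
# Today's row with the same columns in a different order.
cols_today, rows_today = ["Month", "Date"], [("January", "2024-01-02")]

print(union_by_position(cols_hist, rows_hist, rows_today))  # values land under the wrong names
print(union_by_name(cols_hist, rows_hist, cols_today, rows_today))
```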

In [0]:
features = ["Average_temperature", "Average_humidity", "Average_dewpoint", 
            "Average_barometer", "Average_windspeed", "Average_gustspeed", 
            "Average_direction", "Rainfall_for_month", "Rainfall_for_year", 
            "Max_rain_per_minute", "Max_temperature", "Min_temperature", 
            "Max_humidity", "Min_humidity", "Max_pressure", "Min_pressure", 
            "Max_windspeed", "Max_gustspeed", "Max_heat_index"]

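# NOTE: these underscore-style names do not match the DataFrame's columns (for example,
# "Average temperature (°F)"), and Average_temperature is the label itself. Applying this
# assembler would fail on the missing columns; the feature list is redefined with the
# actual column names in a later cell.
# Assemble features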
assembler = VectorAssembler(inputCols=features, outputCol="features")
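`VectorAssembler` packs the listed input columns of each row into a single feature vector, so every name in `inputCols` must exist in the DataFrame. The sketch below models one row as a dict to show that requirement. `assemble` is a hypothetical helper, not the Spark implementation, and the sample values are the humidity and dewpoint from the first data row shown earlier.

```python
def assemble(row, input_cols):
    """Concatenate the named numeric columns of one row into a feature vector.

    Raises KeyError listing any requested column names that are not present,
    which is the kind of mismatch the underscore-style names above cause.
    """
    missing = [c for c in input_cols if c not in row]
    if missing:
        raise KeyError(f"columns not found: {missing}")
    return [float(row[c]) for c in input_cols]


row = {"Average humidity (%)": 35, "Average dewpoint (°F)": 12.7}
print(assemble(row, ["Average humidity (%)", "Average dewpoint (°F)"]))  # [35.0, 12.7]
```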

In [0]:
rf = RandomForestRegressor(featuresCol="features", labelCol="Average temperature (°F)")

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Assemble the features
features = [
    "Average humidity (%)", 
    "Average dewpoint (°F)", 
    "Average barometer (in)", 
    "Average windspeed (mph)", 
    "Average gustspeed (mph)", 
    "Average direction (°deg)", 
    "Rainfall for month (in)", 
    "Rainfall for year (in)", 
    "Maximum rain per minute", 
    "Maximum temperature (°F)", 
    "Minimum temperature (°F)", 
    "Maximum humidity (%)", 
    "Minimum humidity (%)", 
    "Maximum pressure", 
    "Minimum pressure", 
    "Maximum windspeed (mph)", 
    "Maximum gust speed (mph)", 
    "Maximum heat index (°F)"
]

# VectorAssembler to create feature vectors
assembler = VectorAssembler(inputCols=features, outputCol="features")

# Create the pipeline
pipeline = Pipeline(stages=[assembler, rf])

# Fit the model on historical data
model = pipeline.fit(historical_data)

In [0]:
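# Normalize column names by stripping leading/trailing whitespace. Run this before
# pipeline.fit if a column lookup fails; running it after fitting does not change the trained model.
historical_data = historical_data.toDF(*[c.strip() for c in historical_data.columns])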

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

# Define the schema for today's weather data
schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Average temperature (°F)", FloatType(), True),
    StructField("Average humidity (%)", FloatType(), True),
    StructField("Average dewpoint (°F)", FloatType(), True),
    StructField("Average barometer (in)", FloatType(), True),
    StructField("Average windspeed (mph)", FloatType(), True),
    StructField("Average gustspeed (mph)", FloatType(), True),
    StructField("Average direction (°deg)", IntegerType(), True),
    StructField("Rainfall for month (in)", FloatType(), True),
    StructField("Rainfall for year (in)", FloatType(), True),
    StructField("Maximum rain per minute", FloatType(), True),
    StructField("Maximum temperature (°F)", FloatType(), True),
    StructField("Minimum temperature (°F)", FloatType(), True),
    StructField("Maximum humidity (%)", FloatType(), True),
    StructField("Minimum humidity (%)", FloatType(), True),
    StructField("Maximum pressure", FloatType(), True),
    StructField("Minimum pressure", FloatType(), True),
    StructField("Maximum windspeed (mph)", FloatType(), True),
    StructField("Maximum gust speed (mph)", FloatType(), True),
    StructField("Maximum heat index (°F)", FloatType(), True),
    StructField("Date1", StringType(), True),
    StructField("Month", StringType(), True),
    StructField("diff_pressure", FloatType(), True)
])
# Create today's data (ensure all values match the expected types)
today_data = [
    ("2024-01-02", 70.0, 65.0, 60.0, 29.92, 5.0, 10.0, 180, 3.0, 30.0, 0.5, 75.0, 65.0, 80.0, 50.0, 30.0, 29.0, 15.0, 20.0, 75.0, None, "January", None)
]

# Create DataFrame with the specified schema
today_df = spark.createDataFrame(today_data, schema)

# Show today's DataFrame to verify
today_df.show()
today_df.printSchema()

+----------+------------------------+--------------------+---------------------+----------------------+-----------------------+-----------------------+------------------------+-----------------------+----------------------+-----------------------+------------------------+------------------------+--------------------+--------------------+----------------+----------------+-----------------------+------------------------+-----------------------+-----+-------+-------------+
|      Date|Average temperature (°F)|Average humidity (%)|Average dewpoint (°F)|Average barometer (in)|Average windspeed (mph)|Average gustspeed (mph)|Average direction (°deg)|Rainfall for month (in)|Rainfall for year (in)|Maximum rain per minute|Maximum temperature (°F)|Minimum temperature (°F)|Maximum humidity (%)|Minimum humidity (%)|Maximum pressure|Minimum pressure|Maximum windspeed (mph)|Maximum gust speed (mph)|Maximum heat index (°F)|Date1|  Month|diff_pressure|
+----------+------------------------+-------------
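The historical `Date` values appear to be day-month-year (`01-01-2009` through `09-01-2009` all have `Month` 1), while today's row uses ISO `2024-01-02`. `Date` is not a model feature, so this does not affect the prediction, but the two formats would clash if the data were sorted or filtered by date. A stdlib sketch of normalizing both follows, assuming these two formats are the only ones present; in PySpark the analogous step would be `F.to_date(column, "dd-MM-yyyy")`.

```python
from datetime import date, datetime

# Assumed input formats: day-month-year in the CSV, ISO year-month-day for today's row.
FORMATS = ("%d-%m-%Y", "%Y-%m-%d")


def parse_date(text: str) -> date:
    """Parse a date string written in any of the assumed FORMATS."""
    for fmt in FORMATS:
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue  # try the next format
    raise ValueError(f"unrecognized date: {text!r}")


print(parse_date("09-01-2009"), parse_date("2024-01-02"))
```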

In [0]:
predictions = model.transform(today_df)

In [0]:
predictions.select("Date", "prediction").show()

+----------+------------------+
|      Date|        prediction|
+----------+------------------+
|2024-01-02|59.503985215678995|
+----------+------------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

# Create a Spark session (if not already created)
spark = SparkSession.builder.appName("RandomForestRegressorExample").getOrCreate()

# Define the schema for today's weather data
schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Average temperature (°F)", FloatType(), True),
    StructField("Average humidity (%)", FloatType(), True),
    StructField("Average dewpoint (°F)", FloatType(), True),
    StructField("Average barometer (in)", FloatType(), True),
    StructField("Average windspeed (mph)", FloatType(), True),
    StructField("Average gustspeed (mph)", FloatType(), True),
    StructField("Average direction (°deg)", IntegerType(), True),
    StructField("Rainfall for month (in)", FloatType(), True),
    StructField("Rainfall for year (in)", FloatType(), True),
    StructField("Maximum rain per minute", FloatType(), True),
    StructField("Maximum temperature (°F)", FloatType(), True),
    StructField("Minimum temperature (°F)", FloatType(), True),
    StructField("Maximum humidity (%)", FloatType(), True),
    StructField("Minimum humidity (%)", FloatType(), True),
    StructField("Maximum pressure", FloatType(), True),
    StructField("Minimum pressure", FloatType(), True),
    StructField("Maximum windspeed (mph)", FloatType(), True),
    StructField("Maximum gust speed (mph)", FloatType(), True),
    StructField("Maximum heat index (°F)", FloatType(), True),
    StructField("Date1", StringType(), True),
    StructField("Month", StringType(), True),
    StructField("diff_pressure", FloatType(), True)
])

# Create today's data (ensure all values match the expected types)
today_data = [
    ("2024-01-02", 70.0, 65.0, 60.0, 29.92, 5.0, 10.0, 180, 3.0, 30.0, 0.5, 75.0, 65.0, 80.0, 50.0, 30.0, 29.0, 15.0, 20.0, 75.0, None, "January", None)
]

# Create DataFrame with the specified schema
today_df = spark.createDataFrame(today_data, schema)

# 1. Prepare the features (no train/test split is made; everything below uses today_df)
# Feature columns: the numeric columns except the target and diff_pressure
feature_columns = ["Average temperature (°F)", "Average humidity (%)", "Average dewpoint (°F)", 
                   "Average barometer (in)", "Average windspeed (mph)", "Average gustspeed (mph)", 
                   "Average direction (°deg)", "Rainfall for month (in)", "Rainfall for year (in)", 
                   "Maximum rain per minute", "Minimum temperature (°F)", "Maximum humidity (%)", 
                   "Minimum humidity (%)", "Maximum pressure", "Minimum pressure", 
                   "Maximum windspeed (mph)", "Maximum gust speed (mph)", "Maximum heat index (°F)"]

# Use VectorAssembler to combine feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_with_features = assembler.transform(today_df)

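# 2. Train RandomForestRegressor
# NOTE: df_with_features comes from today_df, so the model is trained on a single row
# (today's data) rather than on historical_data.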
regressor = RandomForestRegressor(featuresCol="features", labelCol="Maximum temperature (°F)")
model = regressor.fit(df_with_features)

# 3. Apply the model to get predictions on today's data
predictions = model.transform(df_with_features)

# Show predictions
predictions.select("Date", "prediction").show()

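# 4. Evaluate with R² (coefficient of determination). R² is not an accuracy percentage.
# Evaluating on the single training row leaves zero label variance, so R² is undefined
# and Spark reports it as -inf (see the output below).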
evaluator = RegressionEvaluator(labelCol="Maximum temperature (°F)", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

# Scale R² by 100 for display (the printed label says "Accuracy", but the value is R²)
accuracy_percentage = r2 * 100

print(f"Accuracy (R²) of the Random Forest Regressor: {accuracy_percentage:.2f}%")

# 5. Optionally, calculate RMSE (Root Mean Squared Error)
rmse_evaluator = RegressionEvaluator(labelCol="Maximum temperature (°F)", predictionCol="prediction", metricName="rmse")
rmse = rmse_evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {rmse:.2f}")


+----------+----------+
|      Date|prediction|
+----------+----------+
|2024-01-02|      52.5|
+----------+----------+

Accuracy (R²) of the Random Forest Regressor: -inf%
Root Mean Squared Error (RMSE): 22.50
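Recomputing R² and RMSE by hand shows where `-inf` and `22.50` come from. R² is 1 − SS_res / SS_tot, and with one evaluation row the label equals its own mean, so SS_tot is 0 and the ratio is undefined. The stdlib sketch below follows that textbook formula; it is not Spark's implementation, and it maps the zero-variance case to `-inf` or `nan` the way floating-point division would. The label 75.0 is today's `Maximum temperature (°F)` and 52.5 is the prediction printed above.

```python
import math


def rmse(labels, preds):
    """Root mean squared error."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / len(labels))


def r2(labels, preds):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    if ss_tot == 0:
        # Zero label variance (e.g. a single row): R² is undefined.
        # Mimic float division: x/0 -> inf for x > 0, 0/0 -> nan.
        return float("-inf") if ss_res > 0 else float("nan")
    return 1 - ss_res / ss_tot


print(r2([75.0], [52.5]))    # -inf
print(rmse([75.0], [52.5]))  # 22.5
```

With several distinct labels the formula behaves as expected: a perfect fit scores 1.0 and always predicting the mean scores 0.0.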


In [0]:
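import math

# R² is not an accuracy percentage, and a fixed value must not be substituted for it.
# A negative R² means the model does worse than predicting the mean label; -inf here
# means R² is undefined because the single evaluation row has no label variance.
if math.isinf(r2) or math.isnan(r2):
    print("R² is undefined for this evaluation set; evaluate on held-out historical data instead.")
else:
    print(f"R² of the Random Forest Regressor: {r2:.4f}")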
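A meaningful score needs rows the model did not see during training, drawn from `historical_data`. In the notebook that would mean something like `train, test = historical_data.randomSplit([0.8, 0.2], seed=42)`, fitting `pipeline` on `train`, and passing `model.transform(test)` to `RegressionEvaluator`. Because that requires a Spark cluster, the stdlib sketch below shows the same holdout pattern on the nine rows printed earlier, with a one-feature least-squares line standing in for the random forest. The split fraction, the seed, the stand-in model, and all helper names are assumptions for illustration.

```python
import random

# (Maximum temperature, Average temperature) pairs in °F from the nine rows shown earlier.
ROWS = [(40, 37.8), (52, 43.2), (41, 25.7), (19, 9.3), (30, 23.5),
        (29, 24.8), (39, 34.2), (51, 42.1), (41, 30.3)]


def split(rows, test_fraction, seed):
    """Shuffle a copy of rows with a fixed seed and split it into (train, test)."""
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)
    # Keep at least two test rows so the test labels can have nonzero variance.
    n_test = max(2, round(len(shuffled) * test_fraction))
    return shuffled[n_test:], shuffled[:n_test]


def fit_line(rows):
    """Least-squares fit y = a + b*x (a stand-in model, not a random forest)."""
    n = len(rows)
    mx = sum(x for x, _ in rows) / n
    my = sum(y for _, y in rows) / n
    sxx = sum((x - mx) ** 2 for x, _ in rows)
    sxy = sum((x - mx) * (y - my) for x, y in rows)
    b = sxy / sxx
    return my - b * mx, b


def r2(labels, preds):
    """1 - SS_res / SS_tot; SS_tot > 0 here because all labels in ROWS are distinct."""
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


train, test = split(ROWS, test_fraction=0.3, seed=42)
a, b = fit_line(train)                 # fit on training rows only
preds = [a + b * x for x, _ in test]   # predict rows the model has not seen
score = r2([y for _, y in test], preds)
print(f"train={len(train)} test={len(test)} R²={score:.3f}")
```

On a test set this small the score is noisy and can even be negative; either way it is the value to report.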
