Install PySpark

In [4]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812371 sha256=cebc9651d344b3a5fa6ffa7ed3b123aa909f0fba6ef2f6c878289a03a9c090d9
  Stored in directory: /Users/walkermartin/Library/Caches/pip/wheels/11/67/ea/33c283e520b775aa7a7a0d404447e287be841a711d074d4d91
Successfully built pyspark
Installing collected 

In [5]:
!pip show pyspark

Name: pyspark
Version: 3.5.2
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages
Requires: py4j
Required-by: 


In [159]:
from pyspark.sql import SparkSession

# Start (or attach to) the Spark session used by every later cell.
# getOrCreate() hands back an already-running session if one exists; in that
# case only runtime SQL configs take effect and this appName is not applied.
spark = (
    SparkSession.builder
    .appName("Predictive Consumption Model")
    .getOrCreate()
)

24/09/06 00:28:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [160]:
# Sanity-check the session: Spark version, application name, master URL.
context = spark.sparkContext
for setting in (spark.version, context.appName, context.master):
    print(setting)

3.5.2
Predictive Consumption Models
local[*]


In [161]:
file_path = "Converted_Energy_Consumption.csv"

# Explicit column types (the same ones inferSchema produced for this file).
# Declaring them skips the extra inference pass over the CSV and pins the
# types so the model cells below always see numeric Year/Consumption.
ENERGY_SCHEMA = "Id INT, Year INT, Fuel_Type_Id INT, Consumption DOUBLE, Units STRING"

df = spark.read.csv(file_path, header=True, schema=ENERGY_SCHEMA)

# Report the size and preview a slice instead of dumping every row.
print(f"Rows: {df.count()}")
df.show(20, truncate=False)

+---+----+------------+-----------+------------+
|Id |Year|Fuel_Type_Id|Consumption|Units       |
+---+----+------------+-----------+------------+
|1  |1950|1           |5974.61    |Trillion BTU|
|2  |1955|1           |9006.98    |Trillion BTU|
|3  |1960|1           |12397.81   |Trillion BTU|
|4  |1965|1           |15830.08   |Trillion BTU|
|5  |1970|1           |21900.0    |Trillion BTU|
|6  |1975|1           |20241.37   |Trillion BTU|
|7  |1980|1           |20592.57   |Trillion BTU|
|8  |1985|1           |17903.12   |Trillion BTU|
|9  |1990|1           |19864.26   |Trillion BTU|
|10 |1995|1           |23006.45   |Trillion BTU|
|11 |2000|1           |24172.99   |Trillion BTU|
|12 |2005|1           |22806.5    |Trillion BTU|
|13 |2010|1           |24954.13   |Trillion BTU|
|14 |2011|1           |25358.17   |Trillion BTU|
|15 |2012|1           |26457.37   |Trillion BTU|
|16 |2013|1           |27096.58   |Trillion BTU|
|17 |2014|1           |27550.35   |Trillion BTU|
|18 |2015|1         

In [162]:
# Column names/types, then count/mean/stddev/min/max for every column.
df.printSchema()
column_stats = df.describe()
column_stats.show()

root
 |-- Id: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Fuel_Type_Id: integer (nullable = true)
 |-- Consumption: double (nullable = true)
 |-- Units: string (nullable = true)

+-------+------------------+------------------+------------------+------------------+------------+
|summary|                Id|              Year|      Fuel_Type_Id|       Consumption|       Units|
+-------+------------------+------------------+------------------+------------------+------------+
|  count|               218|               218|               218|               218|         218|
|   mean|             109.5|2003.1376146788991| 5.068807339449541|  9448.71871559633|        NULL|
| stddev|63.075351762792415|19.639595052423868|2.7153496858835537|13111.752175908192|        NULL|
|    min|                 1|              1950|                 1|              10.0|Trillion BTU|
|    max|               218|              2023|                 9|          43157.08|Trillion BTU|
+----

In [171]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

# One seed shared by the train/test split and the forest, so Restart & Run All
# reproduces the same split, the same trees, and the same RMSE.
SEED = 1234

# Fuel_Type_Id is an integer code: StringIndexer maps each distinct value to a
# category index, and one-hot encoding turns that index into separate binary
# features so the code is not treated as an ordered magnitude.
indexer = StringIndexer(inputCol="Fuel_Type_Id", outputCol="Fuel_Type_Index")
encoder = OneHotEncoder(inputCols=["Fuel_Type_Index"], outputCols=["Fuel_Type_OHE"])

# Model inputs: one-hot fuel type plus the raw Year.
assembler = VectorAssembler(
    inputCols=["Fuel_Type_OHE", "Year"],
    outputCol="features",
)

# Random forest regressor predicting Consumption.
# NOTE: tree ensembles cannot extrapolate past the Year range seen in training
# (the data ends at 2023), so forecasts for later years come out flat.
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="Consumption",
    numTrees=10,
    seed=SEED,
)

pipeline = Pipeline(stages=[indexer, encoder, assembler, rf])

# 80/20 train/test split.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=SEED)

# Fit the whole pipeline on the training split only.
model = pipeline.fit(train_data)
# Also bind the alternate name in case later cells refer to it.
new_model = model

# Score the held-out split with the model fitted in THIS cell, so the result
# never depends on a stale `model` left over in the kernel.
predictions = model.transform(test_data)

evaluator = RegressionEvaluator(
    labelCol="Consumption",
    predictionCol="prediction",
    metricName="rmse",  # Root Mean Squared Error
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")



In [139]:
# Prediction inputs: every year from 2025 through 2050 (inclusive) for fuel type 9.
# NOTE(review): fuel type 9 must appear in the training split; otherwise the
# fitted StringIndexer (default handleInvalid="error") rejects it — confirm.
FORECAST_YEARS = range(2025, 2050 + 1)
FORECAST_FUEL_TYPE_ID = 9

# Plain tuples plus a column-name list build the same two columns without
# depending on pyspark.sql.Row being in scope.
new_data9 = spark.createDataFrame(
    [(year, FORECAST_FUEL_TYPE_ID) for year in FORECAST_YEARS],
    ["Year", "Fuel_Type_Id"],
)





    Year  Fuel_Type_Id  Predicted_Consumption
0   2025             9               871.1415
1   2026             9               871.1415
2   2027             9               871.1415
3   2028             9               871.1415
4   2029             9               871.1415
5   2030             9               871.1415
6   2031             9               871.1415
7   2032             9               871.1415
8   2033             9               871.1415
9   2034             9               871.1415
10  2035             9               871.1415
11  2036             9               871.1415
12  2037             9               871.1415
13  2038             9               871.1415
14  2039             9               871.1415
15  2040             9               871.1415
16  2041             9               871.1415
17  2042             9               871.1415
18  2043             9               871.1415
19  2044             9               871.1415
20  2045             9            