In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=cbbeb685cdb5c78db9193df5165f22b511914e2644d3ad8684ff90446e12dbb5
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [7]:
# Create a Spark session
spark = SparkSession.builder.appName("ProjectWindmill").getOrCreate()
#loading dataset
data = spark.read.csv("/content/sample_data/wind_mill_dataset.csv", header=True, inferSchema=True)
# Display all attributes in the dataset
data.printSchema()

root
 |-- Air_temperature: double (nullable = true)
 |-- Pressure: double (nullable = true)
 |-- Wind_speed: double (nullable = true)
 |-- Wind_direction: integer (nullable = true)
 |-- Power_generated: double (nullable = true)



In [9]:
# describing target attribute
data.select('Power_generated').describe().show()

+-------+-----------------+
|summary|  Power_generated|
+-------+-----------------+
|  count|             2499|
|   mean|19881.97538495402|
| stddev|17998.58950119579|
|    min|              0.0|
|    max|          59918.3|
+-------+-----------------+



In [10]:
import pandas as pd
pd.DataFrame(data.take(5), columns=data.columns)

Unnamed: 0,Air_temperature,Pressure,Wind_speed,Wind_direction,Power_generated
0,10.926,0.979103,9.014,229,33688.1
1,9.919,0.979566,9.428,232,37261.9
2,8.567,0.979937,8.7,236,30502.9
3,7.877,0.980053,8.481,247,28419.2
4,7.259,0.979867,8.383,256,27370.3


In [11]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
#indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep") for column in [ "Status"]]
#encoder = [OneHotEncoder(inputCols=[column+"_index"], outputCols=[column+"_encoded"]) for column in [ "Status"]]
#indexers_and_encoders = indexers + encoder
#indexer_model = [indexer.fit(data) for indexer in indexers]
encoded_data = data
#for i in range(len(indexers)):
 #   encoded_data = indexer_model[i].transform(encoded_data)
print(encoded_data)

DataFrame[Air_temperature: double, Pressure: double, Wind_speed: double, Wind_direction: int, Power_generated: double]


In [12]:
# Assemble features
feature_columns = encoded_data.columns
print(feature_columns)
feature_columns.remove("Power_generated")
print(feature_columns)
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
print(assembler)
assembled_data = assembler.transform(encoded_data)
print(assembled_data)
assembled_data.show()

['Air_temperature', 'Pressure', 'Wind_speed', 'Wind_direction', 'Power_generated']
['Air_temperature', 'Pressure', 'Wind_speed', 'Wind_direction']
VectorAssembler_5cd4d384fb30
DataFrame[Air_temperature: double, Pressure: double, Wind_speed: double, Wind_direction: int, Power_generated: double, features: vector]
+---------------+--------+----------+--------------+---------------+--------------------+
|Air_temperature|Pressure|Wind_speed|Wind_direction|Power_generated|            features|
+---------------+--------+----------+--------------+---------------+--------------------+
|         10.926|0.979103|     9.014|           229|        33688.1|[10.926,0.979103,...|
|          9.919|0.979566|     9.428|           232|        37261.9|[9.919,0.979566,9...|
|          8.567|0.979937|       8.7|           236|        30502.9|[8.567,0.979937,8...|
|          7.877|0.980053|     8.481|           247|        28419.2|[7.877,0.980053,8...|
|          7.259|0.979867|     8.383|           256|     

In [13]:
assembled_data.show(5)
display(feature_columns)

+---------------+--------+----------+--------------+---------------+--------------------+
|Air_temperature|Pressure|Wind_speed|Wind_direction|Power_generated|            features|
+---------------+--------+----------+--------------+---------------+--------------------+
|         10.926|0.979103|     9.014|           229|        33688.1|[10.926,0.979103,...|
|          9.919|0.979566|     9.428|           232|        37261.9|[9.919,0.979566,9...|
|          8.567|0.979937|       8.7|           236|        30502.9|[8.567,0.979937,8...|
|          7.877|0.980053|     8.481|           247|        28419.2|[7.877,0.980053,8...|
|          7.259|0.979867|     8.383|           256|        27370.3|[7.259,0.979867,8...|
+---------------+--------+----------+--------------+---------------+--------------------+
only showing top 5 rows



['Air_temperature', 'Pressure', 'Wind_speed', 'Wind_direction']

In [30]:
train_data, test_data = assembled_data.randomSplit([0.7, 0.3], seed=125)
# Hypothetical data for the pie chart
train_power_predection= train_data.select("Power_generated").rdd.flatMap(lambda x: x).collect()
test_power_predection = test_data.select("Power_generated").rdd.flatMap(lambda x: x).collect()
labels = ['Training Data', 'Test Data']
sizes = [sum(train_power_predection), sum(test_power_predection )]
explode = (0.1, 0)  # Explode the first slice (Training Data)

In [31]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'Power_generated', maxBins=180)
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)
dt_results=dt_predictions.select("Power_generated","prediction")
dt_results.show(5)

+---------------+------------------+
|Power_generated|        prediction|
+---------------+------------------+
|        2550.07|2520.2361016949158|
|        6455.35| 6512.504745762712|
|        6819.52| 6512.504745762712|
|        8873.32|           8201.24|
|        12727.9|11743.892929292928|
+---------------+------------------+
only showing top 5 rows



In [32]:
evaluator = RegressionEvaluator(labelCol="Power_generated", predictionCol="prediction", metricName="rmse")
rmse3 = evaluator.evaluate(dt_predictions)
print("Root-mean-square error = " + str(rmse3))

evaluator = RegressionEvaluator(labelCol="Power_generated", predictionCol="prediction", metricName="r2")
r23 = evaluator.evaluate(dt_predictions)
print("R-squared = " + str(r23))

Root-mean-square error = 1016.0111463131537
R-squared = 0.9969883325258467


In [34]:
input_at=input("Enter the Air Temperature: ")
input_p=input("Enter the Preassure: ")
input_ws=input("Enter the windspeed: ")
input_wd=input("Enter the wind direction: ")
Air_temperature=float(input_at)
Pressure=float(input_p)
Wind_speed=float(input_ws)
Wind_direction=float(input_wd)
spark.createDataFrame(new_data,[input_at,input_p,input_ws,input_wd])
new_data_df=assembler.transform(new_data)
new_prediction=dt_model.transform(new_data_df)

prediction=new_prediction.select("prediction").collect()[0]["prediction"]
print("Power Generation is: ",prediction)

Enter the Air Temperature: 10.926
Enter the Preassure: 0.9726
Enter the windspeed: 9.014
Enter the wind direction: 229


AttributeError: 'list' object has no attribute '_jdf'

In [35]:
input_at = input("Enter the Air Temperature: ")
input_p = input("Enter the Preassure: ")
input_ws = input("Enter the windspeed: ")
input_wd = input("Enter the wind direction: ")

Air_temperature = float(input_at)
Pressure = float(input_p)
Wind_speed = float(input_ws)
Wind_direction = float(input_wd)

new_data = [[Air_temperature, Pressure, Wind_speed, Wind_direction]]

new_data_df = spark.createDataFrame(new_data, ["Air_temperature", "Pressure", "Wind_speed", "Wind_direction"])

assembler = VectorAssembler(inputCols=["Air_temperature", "Pressure", "Wind_speed", "Wind_direction"], outputCol="features")
new_data_df = assembler.transform(new_data_df)

new_prediction = dt_model.transform(new_data_df)

prediction = new_prediction.select("prediction").collect()[0]["prediction"]

print("Power Generation is:", prediction)


Enter the Air Temperature: 10.926
Enter the Preassure: 0.979103
Enter the windspeed: 9.014
Enter the wind direction: 229
Power Generation is: 33720.25714285715
