In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=59bd5b3f7e69b81d7b7fcb89d8fe6f0af1819bce115ab314d9d7dc9c1732dce2
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indi

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

In [None]:

# Spark configuration: serve the Spark UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Build (or reuse) the session from that config. getOrCreate() returns the
# already-running session on re-run, whereas constructing pyspark.SparkContext
# directly raises ValueError when a context already exists, which made this cell
# fail on a second execution.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Underlying SparkContext, kept under the original name for later cells.
sc = spark.sparkContext

In [None]:

# Load the training CSV: the first row supplies column names, and inferSchema
# makes an extra pass over the data to assign column types.
# NOTE(review): relative path resolves against the kernel's working directory,
# and this cell runs before Google Drive is mounted, so the file must already be
# present locally (e.g. uploaded to /content) — confirm.
historical_data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("datatraining.csv")
)

In [None]:
from google.colab import drive

# Where Colab exposes Google Drive in the local filesystem.
# NOTE(review): the CSV is read in an earlier cell from a relative path, so that
# read does not use this mount; if the data lives on Drive, move this cell above
# the read and point the path into the mount.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
# Modelling DataFrame. The original `spark.createDataFrame(df)` referenced a
# pandas `df` that is not defined anywhere in this notebook, so it raised
# NameError on a fresh kernel. The training data is already loaded as a Spark
# DataFrame, so use it directly (no pandas round-trip needed).
sdf = historical_data

In [None]:
# Spark simple-type names treated as numeric. inferSchema types whole-number
# columns as int/bigint, so filtering on 'double' alone silently dropped them.
NUMERIC_DTYPES = {'double', 'float', 'int', 'bigint', 'smallint', 'tinyint'}
numeric_features = [name for name, dtype in historical_data.dtypes if dtype in NUMERIC_DTYPES]

# describe() returns one row per statistic, labelled in its 'summary' column.
summary_df = historical_data.select(numeric_features).describe()

# Index by statistic name before transposing so count/mean/stddev/min/max become
# the column headers instead of a data row; one row per feature.
summary_df_pd = summary_df.toPandas().set_index('summary').transpose()

# Display the summary statistics (rich display instead of print)
summary_df_pd

             0         1                   2     3      4
summary  count      mean              stddev   min    max
weight   70000  74.20569  14.395756678511347  10.0  200.0


In [None]:
# 70/30 random split; the fixed seed makes the split repeatable across runs.
train, test = sdf.randomSplit([0.7, 0.3], seed=2018)

# Each count() triggers a Spark job over its split.
print(f"Training Dataset Count: {train.count()}")
print(f"Test Dataset Count: {test.count()}")

Training Dataset Count: 48937
Test Dataset Count: 21063


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer

# Predictor columns packed into a single vector column.
# - 'occupancy' is the target; it was previously also listed as a feature, which
#   leaks the label into the model and makes any evaluation meaningless.
# - 'date' is excluded: with inferSchema it is presumably a timestamp/string
#   column, and VectorAssembler accepts only numeric, boolean and vector inputs.
feature_cols = ['temperature', 'humidity', 'light', 'CO2', 'humidity_ratio']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Index the target into a double label column. StringIndexer's default
# ordering is by descending frequency, so the most common class becomes 0.0.
labelIndexer = StringIndexer(inputCol='occupancy', outputCol='labelIndex')

# Random forest on the assembled features; a fixed seed makes retraining on the
# same split reproducible.
rf = RandomForestClassifier(featuresCol='features', labelCol='labelIndex', seed=2018)

# Chain feature assembly, label indexing and the classifier.
pipeline = Pipeline(stages=[assembler, labelIndexer, rf])

# Fit on the training split, then score the held-out test split.
model = pipeline.fit(train)
predictions = model.transform(test)

In [None]:
# Side-by-side view of the indexed true label and the model's prediction.
predictions.select(["labelIndex", "prediction"]).show(n=10)

+----------+----------+
|labelIndex|prediction|
+----------+----------+
|       1.0|       1.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       1.0|       0.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       0.0|       0.0|
|       1.0|       1.0|
|       0.0|       0.0|
+----------+----------+
only showing top 10 rows



In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# MulticlassClassificationEvaluator defaults to metricName="f1" (weighted F1),
# so the value previously printed as "Accuracy" was actually F1. Request
# accuracy explicitly so the number matches its label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="labelIndex",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy = {accuracy:.4f}")

Accuracy: 81.35436789%


In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

# MulticlassMetrics (RDD-based API) consumes (prediction, label) pairs; cast both
# to double so the pair types agree. No orderBy: the metrics do not depend on
# row order, so sorting only added a full shuffle.
preds_and_labels = predictions.select(
    F.col('prediction').cast('double'),
    F.col('labelIndex').cast('double'),
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

# Rows are true labels and columns are predicted labels, both in ascending
# label order (0.0, 1.0, ...).
print(metrics.confusionMatrix().toArray())



[[7020. 3403.]
 [2342. 8298.]]
