<a href="https://colab.research.google.com/github/AndreyBuyanov/HPC.Lab2/blob/main/HPC_Lab2_V3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
# Attach Google Drive to the Colab VM; the dataset is read from this mount later on.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [2]:
# Install the headless OpenJDK 11 package (the JDK that JAVA_HOME is pointed at further down).
# -qq keeps apt quiet and stdout is discarded so the cell output stays clean.
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

In [3]:
import os
# Name of the Spark distribution directory/tarball; the shell cell below reads both variables.
os.environ['APACHE_SPARK_DISTR_NAME'] = 'spark-3.0.2-bin-hadoop2.7'
# Superseded releases such as 3.0.2 are served from archive.apache.org; downloads.apache.org
# only carries current releases, and `wget -q` would fail silently on the stale URL.
os.environ['APACHE_SPARK_URL'] = 'https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz'

In [4]:
# Download and unpack the Spark distribution only when its directory does not already exist
# in the current working directory; the tarball is deleted after extraction.
# The && chain stops at the first failing step, and `wget -q` prints nothing on failure.
![ ! -d $APACHE_SPARK_DISTR_NAME ] \
   && wget -q $APACHE_SPARK_URL \
   && tar xf $APACHE_SPARK_DISTR_NAME.tgz \
   && rm -f $APACHE_SPARK_DISTR_NAME.tgz

In [5]:
# Location of the OpenJDK 11 installation used by Spark.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'
# Derive SPARK_HOME from the distribution name configured earlier so the two cannot drift
# apart; fall back to the original literal if that variable is not set.
os.environ['SPARK_HOME'] = '/content/' + os.environ.get(
    'APACHE_SPARK_DISTR_NAME', 'spark-3.0.2-bin-hadoop2.7'
)

In [6]:
!pip install findspark > /dev/null

In [7]:
import findspark
# Initialise findspark against the SPARK_HOME configured above so that the
# pyspark import in the next cell resolves.
findspark.init(spark_home=os.environ.get('SPARK_HOME'))

In [8]:
from pyspark.sql import SparkSession
# Create the Spark session for this lab, or reuse one if it already exists.
spark = (
    SparkSession.builder
    .master('local')
    .appName("HPC.Lab2")
    .getOrCreate()
)

In [9]:
# Load the lab dataset from the mounted Drive. The first row supplies the column names
# and Spark infers the column types from the data.
csv_path = r"./drive/My Drive/TPU/HPC/Lab2/DS_2019_public.csv"
df = spark.read.csv(
    csv_path,
    header=True,
    inferSchema=True,
)
df.show()

+------------------+--------+-----------------+--------+-----------+--------+---------+-----------+-----------+-----------+------+-----------+--------+--------+-----+--------+--------+-----------+---------+--------+--------+--------+--------+-------+------+-----------+--------+-----------+-----+--------+--------+--------+--------+---------+------+-----+--------+--------+--------+--------+--------+--------+------+-----------+--------+--------+-------+-----------+--------+--------+------------+---------+---------+---------+-------+----------+--------+--------+-------+--------+--------+--------+---------+------+-------+-------+--------+--------+--------+---------+--------+--------+--------+---------+--------+-------+------------+---------+-----------+-----------+--------+--------+----+------+-------+--------+-----+----+-------+-------+-----------+--------+-------+--------+-----------+---------+--------+---------+-----------+--------+-----+--------+---------+-----+-----+--------+----------

In [10]:
# Feature columns that are assembled into the model input vector.
priznak = [
    'DIVISION',    # Census Division
    'KWHCOL',      # Electricity usage for air-conditioning, central and window/wall (room), in kilowatt-hours
    'NWEIGHT',     # Final sample weight
    'DOLLAREL',    # Total Electricity cost, in whole dollars
    'AIRCOND',     # Air conditioning equipment used
    'TOTSQFT_EN',  # Total square footage (includes heated/cooled garages, all basements, and finished/heated/cooled attics). Used for EIA data tables.
]

# Label column the classifiers are trained to predict.
priz_pred = 'Climate_Region_Pub'

In [11]:
from pyspark.ml.feature import VectorAssembler

def make_df(dataframe, inputCols, outputCol, weights=(0.7, 0.3), seed=42):
    """Assemble feature columns into one vector column and split the rows.

    Args:
        dataframe: Spark DataFrame containing every column named in ``inputCols``.
        inputCols: Names of the columns to combine into the feature vector.
        outputCol: Name of the vector column that is added to the DataFrame.
        weights: Relative sizes of the resulting splits; the default keeps the
            original 70/30 train/test split.
        seed: Seed passed to ``randomSplit``. A fixed default makes the split
            (and therefore the reported metrics) repeatable across runs on the
            same input; pass ``None`` to get a different random split each time.

    Returns:
        A list of DataFrames, one per entry in ``weights``.
    """
    assembler = VectorAssembler(inputCols=inputCols, outputCol=outputCol)
    assembled_df = assembler.transform(dataframe)
    return assembled_df.randomSplit(list(weights), seed=seed)


In [12]:
# Keep only the feature columns plus the label column, then preview the result.
columns = [*priznak, priz_pred]
df_neww = df.select(*columns)
df_neww.show()


+--------+--------+-----------+--------+-------+----------+------------------+
|DIVISION|  KWHCOL|    NWEIGHT|DOLLAREL|AIRCOND|TOTSQFT_EN|Climate_Region_Pub|
+--------+--------+-----------+--------+-------+----------+------------------+
|      10| 181.998| 8599.17201|     475|      1|      2736|                 5|
|       1| 184.459|8969.915921|     588|      1|       528|                 1|
|       3|1063.022| 18003.6396|     952|      1|      1623|                 1|
|       1|     0.0|5999.605242|     705|      0|      1912|                 1|
|       4|  274.53|4232.486778|    1050|      1|      3485|                 1|
|       2|1224.714|7862.341967|    2444|      1|      2654|                 4|
|       5|6129.163|6297.038285|    2622|      1|      1952|                 3|
|       2| 401.146|12156.72151|    1240|      1|      1635|                 4|
|       4|1080.527|3242.224473|     655|      1|      2390|                 4|
|      10|     0.0|8812.434127|     373|      0|    

In [13]:
# Assemble the features into the 'priznak' vector column and split into train/test sets.
train_df, test_df = make_df(
    dataframe=df_neww,
    inputCols=priznak,
    outputCol='priznak',
)

In [14]:
from pyspark.ml.classification import (LogisticRegression, RandomForestClassifier)
# Both classifiers read the assembled 'priznak' vector and are trained on the same label column.
shared_cols = dict(featuresCol='priznak', labelCol=priz_pred)
log_reg = LogisticRegression(**shared_cols)
ran_for = RandomForestClassifier(**shared_cols)

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def evaluate(dataFrame, priz_pred, metricName='f1'):
    """Score a DataFrame of predictions with a multiclass metric.

    Args:
        dataFrame: DataFrame produced by a fitted model's ``transform``; it must
            contain the label column and the evaluator's prediction column.
        priz_pred: Name of the label column.
        metricName: Metric passed to ``MulticlassClassificationEvaluator``.
            Defaults to ``'f1'``, which is also the evaluator's own default,
            so existing two-argument calls return the same value as before.

    Returns:
        The metric value as a float.
    """
    evaluator = MulticlassClassificationEvaluator(labelCol=priz_pred, metricName=metricName)
    return evaluator.evaluate(dataFrame)

In [16]:
# Fit logistic regression on the training split, predict on the test split,
# and display the resulting evaluation score.
fitted_model_log_reg = log_reg.fit(train_df)
log_reg_df = fitted_model_log_reg.transform(test_df)
log_reg_evaluation = evaluate(dataFrame=log_reg_df, priz_pred=priz_pred)
log_reg_evaluation

0.7320127056028894

In [17]:
# Fit the random forest on the training split, predict on the test split,
# and display the resulting evaluation score.
fitted_model_ran_for = ran_for.fit(train_df)
ran_for_df = fitted_model_ran_for.transform(test_df)
ran_for_evaluation = evaluate(dataFrame=ran_for_df, priz_pred=priz_pred)
ran_for_evaluation

0.7884606511841918

In [18]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType

# Map the label to a binary indicator: 1 when the value equals 3, otherwise 0.
udf = UserDefinedFunction(lambda x: 1 if x == 3 else 0, IntegerType())

# Rebuild the projection column by column so that only the label column is
# rewritten; every other column of the prediction frame is passed through as is.
selected_columns = []
for column in log_reg_df.columns:
    if column == priz_pred:
        selected_columns.append(udf(column).alias(column))
    else:
        selected_columns.append(column)

df_var = log_reg_df.select(*selected_columns)
df_var.show()

+--------+------+-----------+--------+-------+----------+------------------+--------------------+--------------------+--------------------+----------+
|DIVISION|KWHCOL|    NWEIGHT|DOLLAREL|AIRCOND|TOTSQFT_EN|Climate_Region_Pub|             priznak|       rawPrediction|         probability|prediction|
+--------+------+-----------+--------+-------+----------+------------------+--------------------+--------------------+--------------------+----------+
|       1|   0.0|2886.607847|     387|      0|      2152|                 0|[1.0,0.0,2886.607...|[-6.3435326439015...|[3.03682577748754...|       1.0|
|       1|   0.0|3912.874323|     759|      0|      2355|                 0|[1.0,0.0,3912.874...|[-6.4124509307609...|[2.14133081231692...|       1.0|
|       1|   0.0|3912.874443|     814|      0|      2096|                 0|[1.0,0.0,3912.874...|[-6.4018916539035...|[2.68766993398764...|       1.0|
|       1|   0.0| 3991.10201|    2291|      0|      2351|                 0|[1.0,0.0,3991.102.