In [83]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('lab4').getOrCreate()
df = spark.read.csv('DS_2019_public.csv', inferSchema=True, header=True)

In [84]:
from typing import Literal
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType

@udf(IntegerType())
def climate_code_to_binary(code: Literal[1, 2, 3, 4, 5]) -> Literal[0, 1]:
    match code:
        case 3:
            return 0
        case 1 | 2 | 4 | 5:
            return 1

@udf(StringType())
def climate_code_to_climate_name(code: Literal[1, 2, 3, 4, 5]) -> str:
    match code:
        case 1:
            return 'Very cold / Cold'
        case 2:
            return 'Hot dry / Mixed dry'
        case 3:
            return 'Hot humid'
        case 4:
            return 'Mixed humid'
        case 5:
            return 'Marine'
    return ''

df = df.withColumn(
    'Climate',
    climate_code_to_climate_name(df['Climate_Region_Pub']),
).withColumn(
    'Class',
    climate_code_to_binary(df['Climate_Region_Pub'])
)

In [85]:
explain = {
    'Climate': 'Climate',
    'Class': 'Class',
    'TEMPHOME':	'Temperature when someone is home during the day (winter)',
    'TEMPGONE':	'Temperature when no on is home during the day (winter)',
    'KWHSPH': 'Electricity usage for space heating, in kilowatt-hours, 2009',
    'KWHCOL': 'Electricity usage for air-conditioning, central and window/wall (room), in kilowatt-hours, 2009',
    'CUFEETNGSPH': 'Natural Gas usage for space heating, in hundred cubic feet, 2009',
    'GALLONFOSPH': 'LPG/Propane usage for space heating, in gallons, 2009',
}

input_features = list(explain.keys())
df = df.select(input_features)
input_features.remove('Climate')
input_features.remove('Class')
print(f'Input features: {input_features}')

Input features: ['TEMPHOME', 'TEMPGONE', 'KWHSPH', 'KWHCOL', 'CUFEETNGSPH', 'GALLONFOSPH']


In [86]:
from pyspark.sql.functions import round

grouped = df.groupBy('Class').mean()
grouped.select('Class', *[round(f'avg({c})', 2).alias(c) for c in input_features]).show()

+-----+--------+--------+-------+-------+-----------+-----------+
|Class|TEMPHOME|TEMPGONE| KWHSPH| KWHCOL|CUFEETNGSPH|GALLONFOSPH|
+-----+--------+--------+-------+-------+-----------+-----------+
|    1|   67.47|   64.21|  991.4|1124.72|     291.16|      45.81|
|    0|   65.42|    62.7|1071.41|4225.86|      54.69|       0.22|
+-----+--------+--------+-------+-------+-----------+-----------+



In [87]:
from pyspark.ml.feature import VectorAssembler
df_assembler = VectorAssembler(
    inputCols=input_features,
    outputCol='features',

)
df = df_assembler.transform(df)
df.printSchema()

root
 |-- Climate: string (nullable = true)
 |-- Class: integer (nullable = true)
 |-- TEMPHOME: integer (nullable = true)
 |-- TEMPGONE: integer (nullable = true)
 |-- KWHSPH: double (nullable = true)
 |-- KWHCOL: double (nullable = true)
 |-- CUFEETNGSPH: double (nullable = true)
 |-- GALLONFOSPH: double (nullable = true)
 |-- features: vector (nullable = true)



In [88]:
df.describe().show()

+-------+-------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|            Climate|              Class|          TEMPHOME|          TEMPGONE|            KWHSPH|            KWHCOL|       CUFEETNGSPH|       GALLONFOSPH|
+-------+-------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|              10875|              10875|             10875|             10875|             10875|             10875|             10875|             10875|
|   mean|               null| 0.8200459770114943|  67.0968275862069|63.940781609195405|1005.7949160459785|1682.7826961839055|248.61066510344867| 37.60788570114944|
| stddev|               null|0.38416681735215996|13.589724722693937|13.719448689129953|1591.1841533271422| 2480.831034235845|334.31380003202287|154.43127656769101|
|    min|Hot dry

                                                                                

In [89]:
df.groupBy('Climate').count().show()
df.groupBy('Class').count().show()

+-------------------+-----+
|            Climate|count|
+-------------------+-----+
|             Marine|  612|
|        Mixed humid| 3169|
|   Very cold / Cold| 3593|
|Hot dry / Mixed dry| 1544|
|          Hot humid| 1957|
+-------------------+-----+

+-----+-----+
|Class|count|
+-----+-----+
|    1| 8918|
|    0| 1957|
+-----+-----+



In [90]:
df_train, df_test = df.select('Class', 'features').randomSplit([0.7, 0.3])
print(f'Records for training: {df_train.count()}')
df_train.printSchema()
df_train.show(5)
df_train.groupBy('Class').count().show(5)

print(f'Records for testing: {df_test.count()}')
df_test.printSchema()
df_test.show(5)
df_test.groupBy('Class').count().show(5)

Records for training: 7719
root
 |-- Class: integer (nullable = true)
 |-- features: vector (nullable = true)

+-----+-------------------+
|Class|           features|
+-----+-------------------+
|    0|(6,[0,1],[2.0,2.0])|
|    0|(6,[0,1],[2.0,2.0])|
|    0|(6,[0,1],[2.0,2.0])|
|    0|(6,[0,1],[2.0,2.0])|
|    0|(6,[0,1],[2.0,2.0])|
+-----+-------------------+
only showing top 5 rows

+-----+-----+
|Class|count|
+-----+-----+
|    1| 6322|
|    0| 1397|
+-----+-----+

Records for testing: 3156
root
 |-- Class: integer (nullable = true)
 |-- features: vector (nullable = true)

+-----+-------------------+
|Class|           features|
+-----+-------------------+
|    0|(6,[0,1],[2.0,2.0])|
|    0|(6,[0,1],[2.0,2.0])|
|    0|(6,[0,1],[2.0,2.0])|
|    0|(6,[0,1],[2.0,2.0])|
|    0|(6,[0,1],[2.0,2.0])|
+-----+-------------------+
only showing top 5 rows

+-----+-----+
|Class|count|
+-----+-----+
|    1| 2596|
|    0|  560|
+-----+-----+



In [91]:
from pyspark.ml.classification import LogisticRegression

logistic_reg = LogisticRegression(labelCol='Class').fit(df_train)

In [92]:
train_results = logistic_reg.evaluate(df_train).predictions

TP = train_results[
    (train_results['Class'] == 1) & (train_results['prediction'] == 1)
].count()
TN = train_results[
    (train_results['Class'] == 0) & (train_results['prediction'] == 0)
].count()
FP = train_results[
    (train_results['Class'] == 0) & (train_results['prediction'] == 1)
].count()
FN = train_results[
    (train_results['Class'] == 1) & (train_results['prediction'] == 0)
].count()

print('Train results')
print(f'Total records: {df_train.count()}')
print(f'Linear regression true positives: {TP}')
print(f'Linear regression true negatives: {TN}')
print(f'Linear regression false positives: {FP}')
print(f'Linear regression false negatives: {FN}')

Train results
Total records: 7719
Linear regression true positives: 6062
Linear regression true negatives: 652
Linear regression false positives: 745
Linear regression false negatives: 260


In [93]:
test_results = logistic_reg.evaluate(df_test).predictions

TP = test_results[
    (test_results['Class'] == 1) & (test_results['prediction'] == 1)
].count()
TN = test_results[
    (test_results['Class'] == 0) & (test_results['prediction'] == 0)
].count()
FP = test_results[
    (test_results['Class'] == 0) & (test_results['prediction'] == 1)
].count()
FN = test_results[
    (test_results['Class'] == 1) & (test_results['prediction'] == 0)
].count()

print('Test results')
print(f'Total records: {df_test.count()}')
print(f'True positives: {TP}')
print(f'True negatives: {TN}')
print(f'False positives: {FP}')
print(f'False negatives: {FN}')
print(f'Accuracy: {(TP + TN) / df_test.count()}')
print(f'Recall: {TP / (TP + FN)}')
print(f'Precision: {TP / (TP + FP)}')

Test results
Total records: 3156
True positives: 2512
True negatives: 270
False positives: 290
False negatives: 84
Accuracy: 0.8814955640050697
Recall: 0.9676425269645609
Precision: 0.8965024982155603


In [94]:
from pyspark.ml.classification import DecisionTreeClassifier

dt_classifier = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='Class',
).fit(df_train)
dt_predictions = dt_classifier.transform(df_test)
dt_predictions.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  561|
|       1.0| 2595|
+----------+-----+



In [95]:
_idx_to_name = {
    feature['idx']: feature['name']
    for feature in df_train.schema['features'].metadata['ml_attr']['attrs']['numeric']
}

In [96]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

dt_accuracy = MulticlassClassificationEvaluator(
    labelCol='Class',
    metricName='accuracy',
).evaluate(dt_predictions)
dt_precision = MulticlassClassificationEvaluator(
    labelCol='Class',
    metricName='weightedPrecision',
).evaluate(dt_predictions)

dt_auc = BinaryClassificationEvaluator(labelCol='Class').evaluate(dt_predictions)

print('Decision tree results:\n')
print(f'Accuracy: {dt_accuracy}')
print(f'Precision: {dt_precision}')
print(f'AUC: {dt_auc}')
print(
    '\nFeature importances:',
    '\n'.join(
        f'{_idx_to_name[idx]}: {value}'
        for idx, value in enumerate(dt_classifier.featureImportances.values)
    ),
    sep='\n',
)

Decision tree results:

Accuracy: 0.8894169835234474
Precision: 0.8894945788770217
AUC: 0.8602637299141536

Feature importances:
TEMPHOME: 0.02036302389014466
TEMPGONE: 0.1552729067044929
KWHSPH: 0.6274715397172465
KWHCOL: 0.1968925296881161


In [97]:
from pyspark.ml.classification import RandomForestClassifier
rf_classifier = RandomForestClassifier(
    labelCol='Class',
    featuresCol='features',
    numTrees=50,
).fit(df_train)
rf_predictions = rf_classifier.transform(df_test)
rf_predictions.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  444|
|       1.0| 2712|
+----------+-----+



In [98]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

rf_accuracy = MulticlassClassificationEvaluator(
    labelCol='Class',
    metricName='accuracy',
).evaluate(rf_predictions)
rf_precision = MulticlassClassificationEvaluator(
    labelCol='Class',
    metricName='weightedPrecision',
).evaluate(rf_predictions)
rf_auc = BinaryClassificationEvaluator(labelCol='Class').evaluate(rf_predictions)

print('Random forest results:\n')
print(f'Accuracy: {rf_accuracy}')
print(f'Precision: {rf_precision}')
print(f'AUC: {rf_auc}')
print(
    '\nFeature importances:',
    '\n'.join(
        f'{_idx_to_name[idx]}: {value}'
        for idx, value in enumerate(rf_classifier.featureImportances.values)
    ),
    sep='\n',
)

Random forest results:

Accuracy: 0.8910012674271229
Precision: 0.8846812339443316
AUC: 0.9308259272507164

Feature importances:
TEMPHOME: 0.029660475734828502
TEMPGONE: 0.012901548484879762
KWHSPH: 0.11396458805798489
KWHCOL: 0.6380275739073855
CUFEETNGSPH: 0.18831213191450782
GALLONFOSPH: 0.017133681900413558
