In [25]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pandas as pd

spark1 = SparkSession.builder.appName("hw1").getOrCreate()
filePath = 'household_power_consumption.csv'
# df = spark.read.format("csv").option("header", "true").option("sep", ";").load(filePath)
df = spark1.read.csv(filePath, inferSchema=True, header=True, sep=';')

In [26]:
column_list = ['Global_active_power', 'Global_reactive_power', 'Voltage', 'Global_intensity']
# column_list = ['Global_active_power']
df = df[column_list]
df.show(5)

+-------------------+---------------------+-------+----------------+
|Global_active_power|Global_reactive_power|Voltage|Global_intensity|
+-------------------+---------------------+-------+----------------+
|              4.216|                0.418| 234.84|            18.4|
|               5.36|                0.436| 233.63|            23.0|
|              5.374|                0.498| 233.29|            23.0|
|              5.388|                0.502| 233.74|            23.0|
|              3.666|                0.528| 235.68|            15.8|
+-------------------+---------------------+-------+----------------+
only showing top 5 rows



In [103]:
# change dtypes
from pyspark.sql.types import DoubleType
for column in column_list:
    df = df.withColumn(column, df[column].cast('float'))
# print(df.dtypes)


In [104]:
# (1) Output the minimum, maximum, and count of the following columns: ‘global active power’, ‘global reactive power’, ‘voltage’, and ‘global intensity’. 
statistics_list = ['max', 'min', 'count']
max_min_count_list = [[df.agg({column: stat}).first()[0] for stat in statistics_list] for column in column_list]

max_min_count_dict = dict()
for i in range(len(column_list)):
    max_min_count_dict[column_list[i]] = max_min_count_list[i]

max_min_count_df = pd.DataFrame(max_min_count_dict, index=statistics_list)
print(max_min_count_df)

       Global_active_power  Global_reactive_power     Voltage  \
max                  5.388                  0.528  235.679993   
min                  3.520                  0.418  233.289993   
count               10.000                 10.000   10.000000   

       Global_intensity  
max                23.0  
min                15.0  
count              10.0  


In [105]:
# (2) Output the mean and standard deviation of these columns.
statistics_list = ['mean', 'std']
mean_std_list = [[df.agg({column: stat}).first()[0] for stat in statistics_list] for column in column_list]

mean_std_dict = dict()
for i in range(len(column_list)):
    mean_std_dict[column_list[i]] = mean_std_list[i]

mean_std_df = pd.DataFrame(mean_std_dict, index=statistics_list)
print(mean_std_df)

      Global_active_power  Global_reactive_power     Voltage  Global_intensity
mean             4.225600               0.496400  234.436000         18.140000
std              0.812879               0.037957    0.821396          3.466731


In [119]:
# (3) Perform min-max normalization on the columns to generate normalized output.
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.sql.functions import split, explode, concat, concat_ws
vector_assembler = VectorAssembler(inputCols=column_list, outputCol='ss_feature')
temp_train = vector_assembler.transform(df)
# temp_train.show(2)

minmax_scaler = MinMaxScaler(inputCol='ss_feature', outputCol='scaled')
train_df = minmax_scaler.fit(temp_train).transform(temp_train)
train.show(2)



+-------------------+---------------------+-------+----------------+--------------------+--------------------+
|Global_active_power|Global_reactive_power|Voltage|Global_intensity|          ss_feature|              scaled|
+-------------------+---------------------+-------+----------------+--------------------+--------------------+
|              4.216|                0.418| 234.84|            18.4|[4.216,0.418,234....|[0.37259100642398...|
|               5.36|                0.436| 233.63|            23.0|[5.36,0.436,233.6...|[0.98501070663811...|
+-------------------+---------------------+-------+----------------+--------------------+--------------------+
only showing top 2 rows

