In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import ClusteringEvaluator
from notebooks import utils

In [None]:
# Start (or reuse) the Spark session used by the rest of this analysis.
spark = SparkSession.builder.appName('cluster_analysis').getOrCreate()

# Import data set: the first line is the header and column types are inferred.
df = spark.read.csv('minute_weather.csv', inferSchema=True, header=True)


# Report the size of the raw data set.
print("Total nº of rows = ", df.count())

# Compute the statistical summary
summary_df = df.describe()
# Convert to Pandas DataFrame and display.
# NOTE: these display options are set globally and stay in effect for later cells.
pd.set_option('display.max_rows', None)  # To display all rows
pd.set_option('display.max_columns', None)  # To display all columns
print("\nStatistics summary:\n", summary_df.toPandas().transpose())

# Count how many rows have a zero value in each of the two rain columns.
print("How many values of 'rain_accumulation' are iqual to 0 = ", df.filter(df.rain_accumulation == 0.0).count())
print("How many values of 'rain_duration' are iqual to 0 = ", df.filter(df.rain_duration == 0.0).count())

# The rain columns are dropped on the premise that almost all of their values
# are zero (see the counts printed above); 'hpwren_timestamp' is dropped as well
# since it's not used here:
workingDF = df.drop("rain_accumulation").drop("rain_duration").drop("hpwren_timestamp")

# Drop rows with missing values and count how many rows were dropped.
# The drop must be applied to `workingDF`, not to the raw `df`: calling it on
# `df` would bring back the three columns removed above and would discard rows
# whose only missing values are in those unused columns.
before = workingDF.count()
workingDF = workingDF.na.drop()
after = workingDF.count()
print("\nNº of rows before 'na.drop' - Nº of rows after 'na.drop' = ", before - after)

# Assemble the selected feature columns into a single vector column, which is
# the input the scaler works on:
print(workingDF.columns)
featuresUsed = ['air_pressure', 'air_temp', 'avg_wind_direction', 'avg_wind_speed',
                'max_wind_direction', 'max_wind_speed', 'relative_humidity']
assembler = VectorAssembler(inputCols=featuresUsed, outputCol="features_unscaled")
assembled = assembler.transform(workingDF)

# StandardScaler to scale the data (withMean/withStd are both enabled); the
# scaled vectors are written to the "features" column:
scaler = StandardScaler(inputCol="features_unscaled", outputCol="features", withStd=True, withMean=True)
scalerModel = scaler.fit(assembled)
scaledData = scalerModel.transform(assembled)
scaledData.show(5)


# Create elbow plot:
# In order to find the optimal 'K' for the k-means, we are going to apply k-means using different values for k,
# and calculate the within-cluster sum-of-squared error (WSSE) for each of them.
# This cell only defines the candidate values of k.
cluster = range(2, 31)  # Run k from 2 to 30

In [2]:
# Build a SQLContext on top of the SparkContext `sc`.
# NOTE(review): neither `SQLContext` nor `sc` is defined in the cells visible
# here — presumably they are provided by the PySpark kernel/shell; confirm.
sqlContext = SQLContext(sc)

# Read the minute-weather CSV through the 'com.databricks.spark.csv' source,
# using the header row for column names and inferring the column types.
# Note that this rebinds `df`.
df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferSchema='true')
    .load('file:///home/cloudera/Downloads/big-data-4/minute_weather.csv')
)