In [49]:
from pyspark.sql import SQLContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

In [103]:
# Directory (as a file:// URI) that holds the three click-log CSV files.
# Change this single constant to point the notebook at a different location.
DATA_DIR = 'file:///home/cloudera/Downloads/big_data_capstone_datasets_and_scripts'


def load_clicks_csv(file_name):
    """Read one CSV file from DATA_DIR into a DataFrame.

    The file is read through the spark-csv data source with the first row
    treated as the header and with schema inference switched on, which are
    the options all three click logs are loaded with.

    Parameters
    ----------
    file_name : str
        Name of the CSV file inside DATA_DIR, e.g. 'ad-clicks.csv'.

    Returns
    -------
    The DataFrame produced by ``sqlContext.read.load`` for that file.
    """
    return sqlContext.read.load(DATA_DIR + '/' + file_name,
                                format='com.databricks.spark.csv',
                                header='true',
                                inferSchema='true')


# `sc` is the SparkContext expected to exist already in the notebook session.
sqlContext = SQLContext(sc)

adclicks = load_clicks_csv('ad-clicks.csv')
buyclicks = load_clicks_csv('buy-clicks.csv')
gameclicks = load_clicks_csv('game-clicks.csv')

In [104]:
# Row counts of the three raw click logs, printed in the order ad / buy / game.
ad_rows = adclicks.count()
buy_rows = buyclicks.count()
game_rows = gameclicks.count()
print(ad_rows, buy_rows, game_rows)

16323 2947 755806


In [110]:
# One row per user: how many ad-click records that user has in the log.
ad_clicks_by_user = adclicks.groupBy('userId').count()
user_adclicks = ad_clicks_by_user.withColumnRenamed('count', 'Total_ad_Clicks')

# Number of distinct users with at least one ad click (shown as the cell output).
user_adclicks.count()

597

In [111]:
# Peek at five rows to sanity-check the per-user ad-click totals.
user_adclicks.take(5)

[Row(userId=231, Total_ad_Clicks=19),
 Row(userId=2032, Total_ad_Clicks=39),
 Row(userId=233, Total_ad_Clicks=37),
 Row(userId=433, Total_ad_Clicks=11),
 Row(userId=1234, Total_ad_Clicks=41)]

In [112]:
# Total purchase amount per user. Only `price` is summed: aggregating every
# numeric column and then discarding all but sum(price) is wasted work and
# yields the same two columns (userId, sum(price)).
user_buyclicks = buyclicks.groupBy('userId').sum('price')  \
                          .withColumnRenamed('sum(price)', 'Total_buy($)')

# Number of distinct users with at least one purchase (shown as the cell output).
user_buyclicks.count()

546

In [113]:
# Peek at five rows to sanity-check the per-user purchase totals.
user_buyclicks.take(5)

[Row(userId=231, Total_buy($)=63.0),
 Row(userId=2032, Total_buy($)=20.0),
 Row(userId=233, Total_buy($)=28.0),
 Row(userId=1234, Total_buy($)=53.0),
 Row(userId=1634, Total_buy($)=27.0)]

In [57]:
from pyspark.sql.functions import format_number, dayofmonth, hour, dayofyear, month, year, weekofyear, date_format

In [114]:
# Step 1: count game clicks per user within each (day-of-year, hour) bucket.
# Only buckets that contain at least one click produce a row.
hourly_bucket_keys = ['userId', dayofyear(gameclicks.timestamp), hour(gameclicks.timestamp)]
clicks_per_bucket = gameclicks.groupBy(hourly_bucket_keys).count()
clicks_per_bucket = clicks_per_bucket.select('userId', 'count')

# Step 2: average those bucket counts for every user.
mean_clicks_by_user = clicks_per_bucket.groupBy('userId').mean()
mean_clicks_by_user = mean_clicks_by_user.select('userId', 'avg(count)')
user_gameclicks = mean_clicks_by_user.withColumnRenamed('avg(count)', 'avg_game_clicks_per_hour')

# Number of distinct users with game-click activity (shown as the cell output).
user_gameclicks.count()

1193

In [115]:
# Peek at five rows to sanity-check the per-user average game clicks per active hour.
user_gameclicks.take(5)

[Row(userId=2231, avg_game_clicks_per_hour=1.5963636363636364),
 Row(userId=1631, avg_game_clicks_per_hour=1.6213235294117647),
 Row(userId=2031, avg_game_clicks_per_hour=8.08626198083067),
 Row(userId=231, avg_game_clicks_per_hour=2.3248407643312103),
 Row(userId=2032, avg_game_clicks_per_hour=2.051051051051051)]

In [139]:
# Combine the three per-user aggregates on userId; only users present in all
# three tables survive the joins.
features_by_user = user_gameclicks.join(user_adclicks, 'userId')
features_by_user = features_by_user.join(user_buyclicks, 'userId')

# The identifier is not a clustering feature, so it is removed here.
df = features_by_user.drop('userId')

# Number of users left after the joins (shown as the cell output).
df.count()

543

In [140]:
# Peek at five rows of the joined feature table (userId already dropped).
df.take(5)

[Row(avg_game_clicks_per_hour=2.3248407643312103, Total_ad_Clicks=19, Total_buy($)=63.0),
 Row(avg_game_clicks_per_hour=2.051051051051051, Total_ad_Clicks=39, Total_buy($)=20.0),
 Row(avg_game_clicks_per_hour=1.3732057416267942, Total_ad_Clicks=37, Total_buy($)=28.0),
 Row(avg_game_clicks_per_hour=2.629139072847682, Total_ad_Clicks=34, Total_buy($)=95.0),
 Row(avg_game_clicks_per_hour=1.9754601226993864, Total_ad_Clicks=41, Total_buy($)=53.0)]

In [141]:
# Shape of the feature table: number of rows, then number of columns.
n_rows = df.count()
n_cols = len(df.columns)
print(n_rows, n_cols)

543 3


In [133]:
# Summary statistics (count, mean, stddev, min, max) for each column, pulled
# into pandas and transposed so that every feature is shown as one row.
# NOTE(review): the saved output still lists userId although the join cell
# drops that column; this cell appears to have been run before the drop
# (it has a lower execution count) - re-run it to refresh the output.
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
userId,543,1215.3499079189687,684.1697334741,1,2387
avg_game_clicks_per_hour,543,3.05137382268546,2.0878902003169872,1.3636363636363635,20.19298245614035
Total_ad_Clicks,543,29.373848987108655,15.216343215129635,1,67
Total_buy($),543,39.34990791896869,41.221736767084764,1.0,223.0


In [142]:
# List the column names that are available as clustering features.
df.columns

['avg_game_clicks_per_hour', 'Total_ad_Clicks', 'Total_buy($)']

In [143]:
# Columns that feed the clustering; their order fixes the component order of
# the assembled vector.
featuresUsed = [
    'avg_game_clicks_per_hour',
    'Total_ad_Clicks',
    'Total_buy($)',
]

# Pack the selected columns into one vector column named "features_unscaled".
assembler = VectorAssembler(outputCol="features_unscaled", inputCols=featuresUsed)
assembled = assembler.transform(df)

In [144]:
# Peek at five rows to confirm the new features_unscaled vector column.
assembled.take(5)

[Row(avg_game_clicks_per_hour=2.3248407643312103, Total_ad_Clicks=19, Total_buy($)=63.0, features_unscaled=DenseVector([2.3248, 19.0, 63.0])),
 Row(avg_game_clicks_per_hour=2.051051051051051, Total_ad_Clicks=39, Total_buy($)=20.0, features_unscaled=DenseVector([2.0511, 39.0, 20.0])),
 Row(avg_game_clicks_per_hour=1.3732057416267942, Total_ad_Clicks=37, Total_buy($)=28.0, features_unscaled=DenseVector([1.3732, 37.0, 28.0])),
 Row(avg_game_clicks_per_hour=2.629139072847682, Total_ad_Clicks=34, Total_buy($)=95.0, features_unscaled=DenseVector([2.6291, 34.0, 95.0])),
 Row(avg_game_clicks_per_hour=1.9754601226993864, Total_ad_Clicks=41, Total_buy($)=53.0, features_unscaled=DenseVector([1.9755, 41.0, 53.0]))]

In [145]:
# Standardize every feature (centering and scaling both enabled) so that no
# single column dominates purely because of its numeric range.
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="features_unscaled",
    outputCol="features",
)

# Fit the scaling statistics on the assembled data, then apply them to it.
scalerModel = scaler.fit(assembled)
scaledData = scalerModel.transform(assembled)

In [146]:
# Peek at five rows to compare the unscaled and the standardized feature vectors.
scaledData.take(5)

[Row(avg_game_clicks_per_hour=2.3248407643312103, Total_ad_Clicks=19, Total_buy($)=63.0, features_unscaled=DenseVector([2.3248, 19.0, 63.0]), features=DenseVector([-0.348, -0.6818, 0.5737])),
 Row(avg_game_clicks_per_hour=2.051051051051051, Total_ad_Clicks=39, Total_buy($)=20.0, features_unscaled=DenseVector([2.0511, 39.0, 20.0]), features=DenseVector([-0.4791, 0.6326, -0.4694])),
 Row(avg_game_clicks_per_hour=1.3732057416267942, Total_ad_Clicks=37, Total_buy($)=28.0, features_unscaled=DenseVector([1.3732, 37.0, 28.0]), features=DenseVector([-0.8038, 0.5012, -0.2753])),
 Row(avg_game_clicks_per_hour=2.629139072847682, Total_ad_Clicks=34, Total_buy($)=95.0, features_unscaled=DenseVector([2.6291, 34.0, 95.0]), features=DenseVector([-0.2022, 0.304, 1.35])),
 Row(avg_game_clicks_per_hour=1.9754601226993864, Total_ad_Clicks=41, Total_buy($)=53.0, features_unscaled=DenseVector([1.9755, 41.0, 53.0]), features=DenseVector([-0.5153, 0.7641, 0.3311]))]

In [147]:
# Keep only the standardized vector column.
# NOTE: this rebinds scaledData, so the raw and unscaled columns are no longer
# reachable through this name after the cell has run.
scaledData=scaledData.select("features")
# Mark the DataFrame for persistence; it is used by both the fit and the
# transform call of the clustering cell.
scaledData.persist()

DataFrame[features: vector]

In [150]:
# k-means with three clusters; the fixed seed keeps the initialization
# repeatable between runs.
kmeans = KMeans(seed=1, k=3)

# Learn the cluster centres from the standardized features ...
model = kmeans.fit(scaledData)
# ... and run the fitted model over the same rows to obtain its predictions.
transformed = model.transform(scaledData)

In [151]:
# Cluster centres learned by the k-means model. They are expressed in the
# standardized feature space (the model was fit on the StandardScaler output),
# with components in the same order as featuresUsed.
centers = model.clusterCenters()
# Display the centres as the cell output.
centers

[array([ 0.34491307, -0.88428349, -0.4376942 ]),
 array([-0.21638267,  0.72866978,  2.00843352]),
 array([-0.34502313,  0.8193491 , -0.21235772])]