In [7]:
from pyspark.sql import SQLContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
import pandas as pd

Import data for buy clicks to get number of add clicks per user

In [8]:

sqlContext = SQLContext(sc)
add_clicks = sqlContext.read.load('file:///home/cloudera/Desktop/bigdatastudy/big-data-4/ad-clicks.csv', 
                          format='com.databricks.spark.csv', 
                          header='true',inferSchema='true')
adclicks_df = pd.read_csv('file:///home/cloudera/Desktop/bigdatastudy/big-data-4/ad-clicks.csv')
adclicks_df = adclicks_df.rename(columns=lambda x: x.strip()) #remove whitespaces from headers

In [9]:
adclicks_df.head(5)

Unnamed: 0,timestamp,txId,userSessionId,teamId,userId,adId,adCategory
0,2016-05-26 15:13:22,5974,5809,27,611,2,electronics
1,2016-05-26 15:17:24,5976,5705,18,1874,21,movies
2,2016-05-26 15:22:52,5978,5791,53,2139,25,computers
3,2016-05-26 15:22:57,5973,5756,63,212,10,fashion
4,2016-05-26 15:22:58,5980,5920,9,1027,20,clothing


In [10]:
adclicks_df['adCount'] = 1

In [11]:
adclicks_df.head(5)

Unnamed: 0,timestamp,txId,userSessionId,teamId,userId,adId,adCategory,adCount
0,2016-05-26 15:13:22,5974,5809,27,611,2,electronics,1
1,2016-05-26 15:17:24,5976,5705,18,1874,21,movies,1
2,2016-05-26 15:22:52,5978,5791,53,2139,25,computers,1
3,2016-05-26 15:22:57,5973,5756,63,212,10,fashion,1
4,2016-05-26 15:22:58,5980,5920,9,1027,20,clothing,1


In [12]:
gameclicks_df = pd.read_csv('file:///home/cloudera/Desktop/bigdatastudy/big-data-4/game-clicks.csv')
gameclicks_df = gameclicks_df.rename(columns=lambda x: x.strip()) #remove whitespaces from headers
gameclicks_df['clickCount'] = 1
gameclicks_df.head(5)

Unnamed: 0,timestamp,clickId,userId,userSessionId,isHit,teamId,teamLevel,clickCount
0,2016-05-26 15:06:55,105,1038,5916,0,25,1,1
1,2016-05-26 15:07:09,154,1099,5898,0,44,1,1
2,2016-05-26 15:07:14,229,899,5757,0,71,1,1
3,2016-05-26 15:07:14,322,2197,5854,0,99,1,1
4,2016-05-26 15:07:20,22,1362,5739,0,13,1,1


In [13]:
buyclicks_df = pd.read_csv('file:///home/cloudera/Desktop/bigdatastudy/big-data-4/buy-clicks.csv')
buyclicks_df = buyclicks_df.rename(columns=lambda x: x.strip()) #removes whitespaces from headers 
buyclicks_df.head(5)

Unnamed: 0,timestamp,txId,userSessionId,team,userId,buyId,price
0,2016-05-26 15:36:54,6004,5820,9,1300,2,3.0
1,2016-05-26 15:36:54,6005,5775,35,868,4,10.0
2,2016-05-26 15:36:54,6006,5679,97,819,5,20.0
3,2016-05-26 16:36:54,6067,5665,18,121,2,3.0
4,2016-05-26 17:06:54,6093,5709,11,2222,5,20.0


In [14]:
user_purchases = buyclicks_df[['userId','price']] #select only userid and price
user_purchases.head(5)

Unnamed: 0,userId,price
0,1300,3.0
1,868,10.0
2,819,20.0
3,121,3.0
4,2222,20.0


In [15]:
user_adclicks = adclicks_df[['userId','adCount']]
user_adclicks.head(5)

Unnamed: 0,userId,adCount
0,611,1
1,1874,1
2,2139,1
3,212,1
4,1027,1


In [16]:
user_gameclicks = gameclicks_df[['userId','clickCount']]
user_gameclicks.head(5) #as we saw before, this line displays first five lines

Unnamed: 0,userId,clickCount
0,1038,1
1,1099,1
2,899,1
3,2197,1
4,1362,1


In [31]:
ads_per_user = user_adclicks.groupby('userId').sum()
ads_per_user = ads_per_user.reset_index()
ads_per_user.columns = ['userId', 'TotalAddClicks'] #rename the columns

In [18]:
ads_per_user.head(5)


Unnamed: 0,userId,totalAdClicks
0,1,44
1,8,10
2,9,37
3,10,19
4,12,46


In [30]:
game_clicks_per_user = user_gameclicks.groupby('userId').sum()
game_clicks_per_user = game_clicks_per_user.reset_index()
game_clicks_per_user.columns = ['userId', 'TotalGameClicks'] #rename the columns

In [20]:
game_clicks_per_user.head(5)

Unnamed: 0,userId,totalGameClicks
0,0,1355
1,1,716
2,2,231
3,6,151
4,8,380


In [27]:
revenue_per_user = user_purchases.groupby('userId').sum()
revenue_per_user = revenue_per_user.reset_index()
revenue_per_user.columns = ['userId', 'MoneySpent'] #rename the columns
revenue_per_user.head(5)

Unnamed: 0,userId,MoneySpent
0,1,21.0
1,8,53.0
2,9,80.0
3,10,11.0
4,12,215.0


In [32]:
combined_df = ads_per_user.merge(game_clicks_per_user, on='userId') #userId, adCount, clickCount
combined_df = combined_df.merge(revenue_per_user, on='userId') #userId, adCount, clickCount, price 
combined_df.head(5) #display how the merged table looks

Unnamed: 0,userId,TotalAddClicks,TotalGameClicks,MoneySpent
0,1,44,716,21.0
1,8,10,380,53.0
2,9,37,508,80.0
3,10,19,3107,11.0
4,12,46,704,215.0


In [55]:
clusters_df = combined_df[['TotalAddClicks','MoneySpent', 'TotalGameClicks']]
clusters_df.head(5)

Unnamed: 0,TotalAddClicks,MoneySpent,TotalGameClicks
0,44,21.0,716
1,10,53.0,380
2,37,80.0,508
3,19,11.0,3107
4,46,215.0,704


In [36]:
clusters_df.shape

(543, 3)

In [56]:
# Convert to Kmeans format 
sqlContext = SQLContext(sc)
p_df = sqlContext.createDataFrame(cluster_df)
parsed_data = p_df.rdd.map(lambda line: array([line[0], line[1], line[2]]))

p_df = p_df.na.drop()
p_df.columns
p_df.head(5)

[Row(totalAdClicks=44, revenue=21.0, totalGameClicks=716),
 Row(totalAdClicks=10, revenue=53.0, totalGameClicks=380),
 Row(totalAdClicks=37, revenue=80.0, totalGameClicks=508),
 Row(totalAdClicks=19, revenue=11.0, totalGameClicks=3107),
 Row(totalAdClicks=46, revenue=215.0, totalGameClicks=704)]

In [50]:
# use vector assembler to get all features 
featuresUsed = ['totalAdClicks', 'revenue','totalGameClicks']
assembler = VectorAssembler(inputCols=featuresUsed, outputCol="features_unscaled")
assembled = assembler.transform(p_df)

In [51]:
scaler = StandardScaler(inputCol="features_unscaled", outputCol="features", withStd=True, withMean=True)
scalerModel = scaler.fit(assembled)
scaledData = scalerModel.transform(assembled)

In [63]:
# Select the features column make the data persist: 
scaledData = scaledData.select("features")
scaledData.persist()

DataFrame[features: vector]

In [53]:
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(scaledData)
transformed = model.transform(scaledData)

In [54]:
centers = model.clusterCenters()
centers

[array([-0.85958441, -0.43877731, -0.45920948]),
 array([ 0.12132685, -0.03517338,  2.4755784 ]),
 array([ 0.89549446,  0.47750791, -0.02806934])]

In [65]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets.samples_generator import make_blobs

threedee = plt.figure(figsize=(12,10)).gca(projection='3d')
threedee.scatter(scaledData[:,0], scaledData[:,1], scaledData[:,2] )
threedee.set_xlabel('totalAdClicks')
threedee.set_ylabel('revenue')
threedee.set_zlabel('totalGameClicks')
plt.show()

AttributeError: 'slice' object has no attribute '_get_object_id'