In [1]:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

In [2]:
sc

org.apache.spark.SparkContext@43351693

In [3]:
val sqlContext = new SQLContext(sc)

sqlContext = org.apache.spark.sql.SQLContext@258418c9




org.apache.spark.sql.SQLContext@258418c9

In [4]:
sqlContext

org.apache.spark.sql.SQLContext@258418c9

# Data Preparation

### AdClicks

In [5]:
val adClicksRaw = sqlContext.read.
                        options(
                            Map(
                                "inferSchema"->"true",
                                "header"->"true")).
                                
                        csv("../DataSets/flamingo-data/ad-clicks.csv")

adClicksRaw = [timestamp: timestamp, txId: int ... 5 more fields]


[timestamp: timestamp, txId: int ... 5 more fields]

In [6]:
adClicksRaw.show(3)

+-------------------+----+-------------+------+------+----+-----------+
|          timestamp|txId|userSessionId|teamId|userId|adId| adCategory|
+-------------------+----+-------------+------+------+----+-----------+
|2016-05-26 15:13:22|5974|         5809|    27|   611|   2|electronics|
|2016-05-26 15:17:24|5976|         5705|    18|  1874|  21|     movies|
|2016-05-26 15:22:52|5978|         5791|    53|  2139|  25|  computers|
+-------------------+----+-------------+------+------+----+-----------+
only showing top 3 rows



In [7]:
val adClicks = adClicksRaw.groupBy(s"userId").count().withColumnRenamed("count", "adClicks")

adClicks = [userId: int, adClicks: bigint]


[userId: int, adClicks: bigint]

In [8]:
adClicks.show(3)

+------+--------+
|userId|adClicks|
+------+--------+
|  1645|      41|
|   471|      51|
|  2142|      46|
+------+--------+
only showing top 3 rows



In [9]:
adClicks.printSchema
adClicks.count

root
 |-- userId: integer (nullable = true)
 |-- adClicks: long (nullable = false)



597

### purchases

In [10]:
val purchasesRaw = sqlContext.read.
                        options(Map(
                            "header"->"true",
                            "inferSchema"->"true"
                        )).
                        csv("../DataSets/flamingo-data/buy-clicks.csv")

purchasesRaw = [timestamp: timestamp, txId: int ... 5 more fields]


[timestamp: timestamp, txId: int ... 5 more fields]

In [11]:
purchasesRaw.show(3)

+-------------------+----+-------------+----+------+-----+-----+
|          timestamp|txId|userSessionId|team|userId|buyId|price|
+-------------------+----+-------------+----+------+-----+-----+
|2016-05-26 15:36:54|6004|         5820|   9|  1300|    2|  3.0|
|2016-05-26 15:36:54|6005|         5775|  35|   868|    4| 10.0|
|2016-05-26 15:36:54|6006|         5679|  97|   819|    5| 20.0|
+-------------------+----+-------------+----+------+-----+-----+
only showing top 3 rows



In [12]:
val purchases = purchasesRaw.groupBy("userId").agg(sum("price") as "Total Purchase")

purchases = [userId: int, Total Purchase: double]


[userId: int, Total Purchase: double]

In [13]:
purchases.show(3)

+------+--------------+
|userId|Total Purchase|
+------+--------------+
|   471|         202.0|
|  1645|          16.0|
|  2142|          22.0|
+------+--------------+
only showing top 3 rows



In [14]:
purchases.printSchema
purchases.count

root
 |-- userId: integer (nullable = true)
 |-- Total Purchase: double (nullable = true)



546

### teamStrength

In [15]:
val userTeamRaw = sqlContext.read.
                    options(Map(
                        "header"->"true",
                        "inferSchema"->"true"
                    ))
                    .csv("../DataSets/flamingo-data/team-assignments.csv")

userTeamRaw = [timestamp: timestamp, team: int ... 2 more fields]


[timestamp: timestamp, team: int ... 2 more fields]

In [16]:
userTeamRaw.printSchema

root
 |-- timestamp: timestamp (nullable = true)
 |-- team: integer (nullable = true)
 |-- userId: integer (nullable = true)
 |-- assignmentId: integer (nullable = true)



In [17]:
userTeamRaw show 3

+-------------------+----+------+------------+
|          timestamp|team|userId|assignmentId|
+-------------------+----+------+------------+
|2016-05-26 14:06:54|  64|   504|        5439|
|2016-05-26 14:07:00|  39|  1698|        5301|
|2016-05-26 14:07:04|  85|    81|        5540|
+-------------------+----+------+------------+
only showing top 3 rows



In [18]:
val userTeamStamped = userTeamRaw withColumn ("timeStampEpoch", unix_timestamp($"timestamp"))

userTeamStamped = [timestamp: timestamp, team: int ... 3 more fields]


[timestamp: timestamp, team: int ... 3 more fields]

In [19]:
val userTeamLatest = userTeamStamped groupBy "userId" agg (max("timeStampEpoch") alias "timeStampEpoch")

userTeamLatest = [userId: int, timeStampEpoch: bigint]


[userId: int, timeStampEpoch: bigint]

In [20]:
val userTeam = userTeamStamped.join(userTeamLatest, 
                     userTeamStamped("userId") === userTeamLatest("userId") 
                     && userTeamStamped("timeStampEpoch") === userTeamLatest("timeStampEpoch")
                    ).select(userTeamStamped("userId"), userTeamStamped("team"))

userTeam = [userId: int, team: int]


[userId: int, team: int]

In [21]:
userTeam.printSchema
userTeam.count

root
 |-- userId: integer (nullable = true)
 |-- team: integer (nullable = true)



2391

In [22]:
val teamStrength = sqlContext.read.
                    options(Map(
                        "header"->"true",
                        "inferSchema"->"true"
                    )).
                    csv("../DataSets/flamingo-data/team.csv").
                    select("teamId","strength")

teamStrength = [teamId: int, strength: double]


[teamId: int, strength: double]

In [23]:
teamStrength show 3

+------+--------------+
|teamId|      strength|
+------+--------------+
|    79|0.774473575316|
|    92| 0.17192602642|
|     6|0.537353043526|
+------+--------------+
only showing top 3 rows



In [24]:
teamStrength.printSchema
teamStrength.count

root
 |-- teamId: integer (nullable = true)
 |-- strength: double (nullable = true)



109

In [25]:
val userTeamStrength = userTeam.join(teamStrength, userTeam("team") === teamStrength("teamId")).select(col("userId"),col("strength") as "teamStrength")

userTeamStrength = [userId: int, teamStrength: double]


[userId: int, teamStrength: double]

In [26]:
userTeamStrength show 3

+------+--------------+
|userId|  teamStrength|
+------+--------------+
|   504|0.406089913506|
|    81|0.132214897776|
|  1665|0.393763462002|
+------+--------------+
only showing top 3 rows



In [27]:
userTeamStrength.printSchema
userTeamStrength.count

root
 |-- userId: integer (nullable = true)
 |-- teamStrength: double (nullable = true)



1213

### userClicks 

In [28]:
val userClicksRaw = sqlContext.read.
                        options(Map(
                            "header"->"true",
                            "inferSchema"->"true"
                        )).
                        csv("../DataSets/flamingo-data/game-clicks.csv").
                        select("timestamp","clickId","userId")

userClicksRaw = [timestamp: timestamp, clickId: int ... 1 more field]


[timestamp: timestamp, clickId: int ... 1 more field]

In [29]:
userClicksRaw.printSchema
userClicksRaw.count

root
 |-- timestamp: timestamp (nullable = true)
 |-- clickId: integer (nullable = true)
 |-- userId: integer (nullable = true)



755806

In [30]:
userClicksRaw show 3

+-------------------+-------+------+
|          timestamp|clickId|userId|
+-------------------+-------+------+
|2016-05-26 15:06:55|    105|  1038|
|2016-05-26 15:07:09|    154|  1099|
|2016-05-26 15:07:14|    229|   899|
+-------------------+-------+------+
only showing top 3 rows



In [31]:
val userClicksHour = userClicksRaw.withColumn(
    "timestamp", date_format($"timestamp", "MM/dd/yyyy HH")
).withColumnRenamed("timestamp", "hour")

userClicksHour = [hour: string, clickId: int ... 1 more field]


[hour: string, clickId: int ... 1 more field]

In [32]:
val userClicksPerHour = userClicksHour.groupBy(
    "userId","hour"
).
count.
withColumnRenamed("count", "clicks")

userClicksPerHour = [userId: int, hour: string ... 1 more field]


[userId: int, hour: string ... 1 more field]

In [33]:
val userClicks = userClicksPerHour.
groupBy("userId").
agg(
    avg("clicks") as "avg_clicks/hr"
)

userClicks = [userId: int, avg_clicks/hr: double]


[userId: int, avg_clicks/hr: double]

In [34]:
userClicks.printSchema
userClicks.count

root
 |-- userId: integer (nullable = true)
 |-- avg_clicks/hr: double (nullable = true)



1193

# Integrating all the datasets into one

In [35]:
adClicks.createOrReplaceTempView("adClicks")
purchases.createOrReplaceTempView("purchases")
userTeamStrength.createOrReplaceTempView("userTeamStrength")
userClicks.createOrReplaceTempView("userClicks")

Join all the above tables into one and drop the NaN/Null values

In [36]:
val df = spark.sql(
    "SELECT ac.adClicks, p.`Total Purchase`, s.teamStrength, c.`avg_clicks/hr` FROM adclicks ac INNER JOIN purchases p on ac.userId==p.userId INNER JOIN userTeamStrength s on ac.userId==s.userId INNER JOIN userClicks c on ac.userId==c.userId"
).
na.
drop

df = [adClicks: bigint, Total Purchase: double ... 2 more fields]


[adClicks: bigint, Total Purchase: double ... 2 more fields]

In [37]:
df show 5

+--------+--------------+--------------+------------------+
|adClicks|Total Purchase|  teamStrength|     avg_clicks/hr|
+--------+--------------+--------------+------------------+
|      51|         202.0|0.141376627543|1.7583892617449663|
|      41|          16.0|0.320057042827|2.5310880829015545|
|      17|           8.0|0.132214897776|2.0707964601769913|
|      56|          14.0|0.350676528613|1.9941176470588236|
|      39|          20.0|0.340788463107|1.8021582733812949|
+--------+--------------+--------------+------------------+
only showing top 5 rows



In [38]:
df.printSchema
df.count

root
 |-- adClicks: long (nullable = false)
 |-- Total Purchase: double (nullable = true)
 |-- teamStrength: double (nullable = true)
 |-- avg_clicks/hr: double (nullable = true)



357

In [39]:
df.describe().show

+-------+------------------+------------------+-------------------+------------------+
|summary|          adClicks|    Total Purchase|       teamStrength|     avg_clicks/hr|
+-------+------------------+------------------+-------------------+------------------+
|  count|               357|               357|                357|               357|
|   mean| 34.49579831932773|46.378151260504204| 0.5147398670937521| 2.161097447318779|
| stddev|14.508932128512162|45.351187365370755|0.28548569836670606| 0.867561647113368|
|    min|                 1|               1.0|    0.0132752022046|1.3636363636363635|
|    max|                67|             223.0|     0.952175553216| 9.808695652173913|
+-------+------------------+------------------+-------------------+------------------+



# Scaling the Data for Clustering

### Using VectorAssembler to gather all features into one column.

In [40]:
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}

In [41]:
val featuresUsed = df.columns
val assembler = new VectorAssembler().setInputCols(featuresUsed).setOutputCol("features_unscaled")
val assembled = assembler.transform(df)

featuresUsed = Array(adClicks, Total Purchase, teamStrength, avg_clicks/hr)
assembler = vecAssembler_294b41990025
assembled = [adClicks: bigint, Total Purchase: double ... 3 more fields]


[adClicks: bigint, Total Purchase: double ... 3 more fields]

### Scale the features using StandardScaler

In [42]:
val scaler = new StandardScaler().
setInputCol("features_unscaled").
setOutputCol("features").
setWithMean(true).
setWithStd(true)

val scalerModel = scaler.fit(assembled)

val dataWithScaledFeatures = scalerModel.transform(assembled)

scaler = stdScal_cefe059d044e
scalerModel = stdScal_cefe059d044e
dataWithScaledFeatures = [adClicks: bigint, Total Purchase: double ... 4 more fields]


[adClicks: bigint, Total Purchase: double ... 4 more fields]

In [43]:
val scaledData = dataWithScaledFeatures.select("features")
scaledData.persist

scaledData = [features: vector]


[features: vector]

# K-Means Clustering

In [44]:
import org.apache.spark.ml.clustering.KMeans

In [45]:
val kmeans = new KMeans().setK(3).setSeed(3)

kmeans = kmeans_0e4f02c2494b


kmeans_0e4f02c2494b

In [46]:
val model = kmeans.fit(scaledData)

model = kmeans_0e4f02c2494b


kmeans_0e4f02c2494b

In [47]:
model.clusterCenters foreach println

[0.5657139554913385,0.563821993934577,-0.735529367238947,-0.13910127144715317]
[-1.2507587950221974,-0.5508930923104601,-0.10507183478055206,-0.1993074766586671]
[0.5306311917353007,-0.1422001957462632,0.9800822528171503,0.35843527965628785]


In [48]:
df.columns

Array(adClicks, Total Purchase, teamStrength, avg_clicks/hr)