In [1]:
#!pip install pyarrow
#!pip install pyspark
#!pip install pandas

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

In [3]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
big_data = False
path ='data/100k_a.csv' if  not big_data else 'data/full_a.csv'
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv(path, names = ['userId','streamId', 'streamerName', 'timeStart', 'timeStop']) # if no header
pandas_df.streamerName = pandas_df.streamerName.astype(str)
s_df = sql_sc.createDataFrame(pandas_df)



In [4]:
s_df.show()

+------+-----------+---------------+---------+--------+
|userId|   streamId|   streamerName|timeStart|timeStop|
+------+-----------+---------------+---------+--------+
|     1|33842865744|       mithrain|      154|     156|
|     1|33846768288|          alptv|      166|     169|
|     1|33886469056|       mithrain|      587|     588|
|     1|33887624992|           wtcn|      589|     591|
|     1|33890145056|      jrokezftw|      591|     594|
|     1|33903958784|    berkriptepe|      734|     737|
|     1|33929318864|kendinemuzisyen|     1021|    1036|
|     1|33942837056|           wtcn|     1165|    1167|
|     1|33955351648|kendinemuzisyen|     1295|    1297|
|     1|34060922080|       mithrain|     2458|    2459|
|     1|34062621584|        unlostv|     2454|    2456|
|     1|34077379792|       mithrain|     2601|    2603|
|     1|34078096176|           zeon|     2603|    2604|
|     1|34079135968|        elraenn|     2600|    2601|
|     1|34082259232|           zeon|     2604|  

In [5]:
import sys
s_df = s_df.withColumn('interactionTime', ( s_df['timeStop'] - s_df['timeStart'] ) *10 )
s_df = s_df.drop('timeStart', 'timeStop')
tmax=s_df.select("interactionTime").rdd.max()[0]
tmin=s_df.select("interactionTime").rdd.min()[0]

In [6]:
s_df = s_df.withColumn('interactionTime', ( s_df['interactionTime'] - tmin)/ (tmax - tmin))
s_df = s_df.withColumn('interactionTime', ( s_df['interactionTime'] *99) +1 )
from pyspark.sql.functions import udf, col, lit
udf_s = udf(lambda x: hash(x) & sys.maxsize)
s_df = s_df.withColumn('streamerId', udf_s(col('streamerName')))
s_df = s_df.drop('streamerName')

In [7]:
s_df.show()

+------+-----------+------------------+-------------------+
|userId|   streamId|   interactionTime|         streamerId|
+------+-----------+------------------+-------------------+
|     1|33842865744|           2.03125|6237787267506800044|
|     1|33846768288|            3.0625|8455602530264088426|
|     1|33886469056|               1.0|6237787267506800044|
|     1|33887624992|           2.03125|5324476659562100413|
|     1|33890145056|            3.0625|6731343583494858722|
|     1|33903958784|            3.0625|4995281249414468822|
|     1|33929318864|15.437500000000002|2527439964060042775|
|     1|33942837056|           2.03125|5324476659562100413|
|     1|33955351648|           2.03125|2527439964060042775|
|     1|34060922080|               1.0|6237787267506800044|
|     1|34062621584|           2.03125|1360586551466601474|
|     1|34077379792|           2.03125|6237787267506800044|
|     1|34078096176|               1.0|4597330403337703764|
|     1|34079135968|               1.0|4

In [8]:
means = s_df.groupBy('streamerId').mean('interactionTime')
means_j = s_df.join(means, "streamerId")
means_j = means_j.drop('interactionTime')

In [9]:
means_j.show()

+-------------------+------+-----------+--------------------+
|         streamerId|userId|   streamId|avg(interactionTime)|
+-------------------+------+-----------+--------------------+
|7816379048229323178|    10|33937796688|                 1.0|
|2591307633944080486|    14|34216684384|   3.133972772277228|
|6521280271403371604|    18|33992690592|           6.4140625|
|  31729193875654826|    21|33826066208|           2.2890625|
|  31729193875654826|    21|33841709136|           2.2890625|
|  31729193875654826|    21|33886819392|           2.2890625|
| 714142164227083335|    31|34070105856|   1.843116554054054|
|4427644307642948912|    59|34039720720|   3.042655109489051|
|4427644307642948912|    59|34339008464|   3.042655109489051|
| 148372867682752476|    61|34087641984|   3.162468112244898|
|4665382578818889225|   114|34193555568|  6.3462171052631575|
|3476609577227435997|   116|34158140528|                 1.0|
|4427644307642948912|   121|34298536080|   3.042655109489051|
| 714142

In [10]:
means_j = means_j.withColumnRenamed("avg(interactionTime)","avgInteractionTime")

In [11]:
temp1 = means_j.groupBy('streamerId').count()
means_j2 = means_j.join(temp1, "streamerId")
means_j2 = means_j2.withColumnRenamed("streamerId","streamerId")
means_j2 = means_j2.withColumnRenamed("avgInteractionTime","avgInteractionTime")
means_j2 = means_j2.withColumnRenamed("count","interactionCounts")

In [12]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
df_final = means_j2.withColumn("streamerId", means_j2.streamerId.cast("bigint"))

In [13]:
df_final.show(3)

+-------------------+------+-----------+------------------+-----------------+
|         streamerId|userId|   streamId|avgInteractionTime|interactionCounts|
+-------------------+------+-----------+------------------+-----------------+
|1000001876270782414| 24906|34240427920|            1.6875|                3|
|1000001876270782414| 26664|34240427920|            1.6875|                3|
|1000001876270782414| 36942|34202272448|            1.6875|                3|
+-------------------+------+-----------+------------------+-----------------+
only showing top 3 rows



In [14]:
df4 = df_final.na.drop(how='any')

In [15]:
from pyspark.sql.functions import isnan, when, count, col
df4.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df4.columns]).show()


+----------+------+--------+------------------+-----------------+
|streamerId|userId|streamId|avgInteractionTime|interactionCounts|
+----------+------+--------+------------------+-----------------+
|         0|     0|       0|                 0|                0|
+----------+------+--------+------------------+-----------------+



In [16]:
from pyspark.ml.feature import VectorAssembler
df4.columns
assemble=VectorAssembler(inputCols=[
'avgInteractionTime', 'interactionCounts'], outputCol='features')
assembled_data=assemble.transform(df4)


In [17]:
assembled_data.show(3)

+-------------------+------+-----------+------------------+-----------------+------------+
|         streamerId|userId|   streamId|avgInteractionTime|interactionCounts|    features|
+-------------------+------+-----------+------------------+-----------------+------------+
|1000001876270782414| 24906|34240427920|            1.6875|                3|[1.6875,3.0]|
|1000001876270782414| 26664|34240427920|            1.6875|                3|[1.6875,3.0]|
|1000001876270782414| 36942|34202272448|            1.6875|                3|[1.6875,3.0]|
+-------------------+------+-----------+------------------+-----------------+------------+
only showing top 3 rows



In [18]:
from pyspark.ml.feature import StandardScaler
scale=StandardScaler(inputCol='features',outputCol='standardized')
data_scale=scale.fit(assembled_data)
data_scale_output=data_scale.transform(assembled_data)


In [19]:
## finding ideal k
import numpy as np
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
cost = []
for k in range(5,15):
    print(k)
    kmeans = KMeans().setK(k).setSeed(38).setFeaturesCol("features")
    model = kmeans.fit(data_scale_output.select('streamerId', 'features').sample(False,0.1, seed=38))
    predictions = model.transform(data_scale_output)
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    print("Silhouette with squared euclidean distance = " + str(silhouette))
    cost.append(silhouette)


5
Silhouette with squared euclidean distance = 0.8086940269323563
6
Silhouette with squared euclidean distance = 0.8447657482936937
7
Silhouette with squared euclidean distance = 0.8469084967382502
8
Silhouette with squared euclidean distance = 0.8374600771116928
9
Silhouette with squared euclidean distance = 0.8167735951540936
10


KeyboardInterrupt: 

In [None]:
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,20),cost[2:20])
ax.set_xlabel('k')
ax.set_ylabel('cost')

In [None]:
import numpy as np
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
kmeans = KMeans().setK(7).setSeed(38).setFeaturesCol("features")
model = kmeans.fit(data_scale_output)
predictions = model.transform(data_scale_output)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

In [None]:
transformed = model.transform(data_scale_output).select('streamerId', 'prediction')
rows = transformed.collect()
print(rows[:3])

In [None]:
df_pred = sql_sc.createDataFrame(rows)
df_pred.show()

In [None]:
df_pred = df_pred.join(df4, 'streamerId')
df_pred.show()

In [None]:
df_pred.dropDuplicates(['prediction']).show()# print(distinctValuesDF)

In [None]:
model.save('models/km_model')
# model2 = KMeansModel.load(model_path)

In [None]:
#!zip -r /km_model.zip /content/km_model


In [None]:
#from google.colab import files
#files.download("/km_model.zip")