In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext()
sqlContext = SQLContext(sc)

train = pd.read_csv(r'/home/kritz/Documents/DDL/Ex09/tags.dat',sep="::",header=None)

  


In [2]:
train.columns=["userId","movieaId","tag","timestamp"]
train.head()
train.info()
train['tag']= train['tag'].astype(str)
data = sqlContext.createDataFrame(train)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 95580 entries, 0 to 95579
Data columns (total 4 columns):
userId       95580 non-null int64
movieaId     95580 non-null int64
tag          95564 non-null object
timestamp    95580 non-null int64
dtypes: int64(3), object(1)
memory usage: 2.9+ MB


In [3]:
from pyspark.sql import functions as f
from pyspark.sql import types as t
data = data.withColumn('time', f.date_format(data.timestamp.cast(dataType=t.TimestampType()), "dd-MM-yyyy HH:mm:ss"))

In [4]:
from pyspark.sql.functions import *
data.dtypes
data = data.withColumn('new_time',to_timestamp(data.time, 'dd-MM-yyyy HH:mm:ss'))

In [5]:
from pyspark.sql import Window
w = Window.partitionBy("userId").orderBy(asc("new_time"))
dataNew = data.withColumn('lag',lag(data.new_time).over(w))
timeFmt = "yyyy-MM-dd'T'HH:mm:ss"
dataNew = dataNew.withColumn('diff',when((unix_timestamp(dataNew.new_time, format=timeFmt)
            - unix_timestamp(dataNew.lag, format=timeFmt))/60 < 30,0).otherwise(1))
dataNew = dataNew.withColumn('session', sum('diff').over(w))
columns_to_drop = ['time', 'lag','timestamp','diff']
dataNew = dataNew.drop(*columns_to_drop)
dataNew.show(10)

+------+--------+--------------------+-------------------+-------+
|userId|movieaId|                 tag|           new_time|session|
+------+--------+--------------------+-------------------+-------+
|  1806|   43560|              comedy|2006-05-18 22:23:28|      1|
|  1806|   43560|                kids|2006-05-18 22:23:28|      1|
|  1806|    7018|            language|2007-02-22 16:24:59|      2|
|  1806|    7152|              nudity|2007-04-13 19:05:53|      3|
|  1806|    7152|                dark|2007-04-13 19:06:30|      3|
|  1806|   44709|        heartwarming|2007-04-13 19:26:25|      3|
|  1806|   44199|intelligent thriller|2007-04-13 19:28:17|      3|
|  1806|   43936|               tense|2007-04-13 19:29:36|      3|
|  1806|   43928|              stupid|2007-04-13 19:30:29|      3|
|  1806|   42734|              clever|2007-04-13 19:32:16|      3|
+------+--------+--------------------+-------------------+-------+
only showing top 10 rows



In [6]:
frequency = dataNew.groupBy("userId").max("session").orderBy('max(session)', ascending=False)
frequency.show(10)

+------+------------+
|userId|max(session)|
+------+------------+
| 10555|         891|
| 23172|         482|
|   146|         333|
| 33384|         269|
| 47448|         199|
| 34745|         144|
| 11898|         127|
| 30167|         115|
| 64633|         108|
|  8041|         104|
+------+------------+
only showing top 10 rows



In [7]:
dataNew.groupBy("userId").mean("session").orderBy('avg(session)', ascending=False).show()

+------+------------------+
|userId|      avg(session)|
+------+------------------+
| 10555| 530.8542914171657|
| 23172|267.95190877540904|
|   146| 128.9478155339806|
| 33384|109.33039647577093|
| 47448| 91.14512195121951|
| 11898| 62.71298174442191|
| 64633| 56.24504249291785|
| 34745| 53.15585443037975|
| 41838|43.367198838896954|
|  6362|          43.28125|
| 23388| 38.94854586129754|
| 50970| 34.81666666666667|
|  8041| 34.44179104477612|
| 32828|29.953929539295395|
|  3962|29.456043956043956|
| 19460|29.116071428571427|
| 48621| 29.08076923076923|
| 49882| 28.65049928673324|
| 24221|  27.5472972972973|
| 39689| 27.19685039370079|
+------+------------------+
only showing top 20 rows



In [8]:
dataNew.groupBy("userId").agg(stddev("session")).show()

+------+--------------------+
|userId|stddev_samp(session)|
+------+--------------------+
|  1806|  1.2004900959975617|
|  2040|                 0.0|
| 15437|                 NaN|
| 15663|                 NaN|
| 15846|                 0.0|
| 18295|                 0.5|
| 18730|                 NaN|
| 19141|                 NaN|
| 25649|  1.2909944487358056|
| 27919|  0.5773502691896258|
| 29018|                 NaN|
| 31156|                 NaN|
| 37098|                 NaN|
| 39104|                 NaN|
| 39713|  0.5773502691896258|
| 48280|  0.5773502691896257|
| 50049|                 0.0|
| 55700|                 NaN|
| 60016|                 NaN|
| 60738|                 0.0|
+------+--------------------+
only showing top 20 rows



In [11]:
avg = dataNew.agg(mean("session")).collect()[0]['avg(session)']
std = dataNew.agg(stddev("session")).collect()[0]['stddev_samp(session)']

In [12]:
print("Mean of session frequency across all users is ", avg," and std is ",std)

Mean of session frequency across all users is  61.71259677756853  and std is  150.1792632124109


In [26]:
userMean = dataNew.groupBy("userId").mean("session").orderBy('avg(session)', ascending=False)
temp = userMean.withColumn("checkMean",when(userMean["avg(session)"] < (2*std), 1).otherwise(0))

In [32]:
users = temp.filter(temp.checkMean == 1)
users.select('userId').distinct().show()

+------+
|userId|
+------+
| 62989|
|  1806|
| 25649|
| 18295|
| 27919|
| 48280|
| 39713|
|  2040|
| 15437|
| 15663|
| 15846|
| 18730|
| 19141|
| 29018|
| 31156|
| 37098|
| 39104|
| 50049|
| 55700|
| 60016|
+------+
only showing top 20 rows

