In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pyspark.sql.functions as F
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
import matplotlib.pyplot as plt
import numpy as np
from pyspark_dist_explore import hist
from pyspark.sql.types import IntegerType
import datetime
import time

# Importing the data using spark dataframe from tags.dat

In [3]:
conf = SparkConf().setAppName("Spark Count")
sc = SparkContext.getOrCreate(conf=conf)

spark = SparkSession \
    .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
    

df = spark.read.option("header", "false") \
    .option("delimiter", ":") \
    .option("inferSchema", "true") \
    .csv(r"E:\Documents\University of Hildesheim\Distributed data analytics\lab8\ml-10M100K\tags.dat")
df.show()

+---+----+-----+----+--------------------+----+----------+
|_c0| _c1|  _c2| _c3|                 _c4| _c5|       _c6|
+---+----+-----+----+--------------------+----+----------+
| 15|null| 4973|null|          excellent!|null|1215184630|
| 20|null| 1747|null|            politics|null|1188263867|
| 20|null| 1747|null|              satire|null|1188263867|
| 20|null| 2424|null|     chick flick 212|null|1188263835|
| 20|null| 2424|null|               hanks|null|1188263835|
| 20|null| 2424|null|                ryan|null|1188263835|
| 20|null| 2947|null|              action|null|1188263755|
| 20|null| 2947|null|                bond|null|1188263756|
| 20|null| 3033|null|               spoof|null|1188263880|
| 20|null| 3033|null|           star wars|null|1188263880|
| 20|null| 7438|null|              bloody|null|1188263801|
| 20|null| 7438|null|             kung fu|null|1188263801|
| 20|null| 7438|null|           Tarantino|null|1188263801|
| 21|null|55247|null|                   R|null|120508150

# Final data with column headings

In [4]:
columns_to_drop = ['_c1', '_c3','_c5']
df = df.drop(*columns_to_drop)    
df = df.select(F.col("_c0").alias("UserID"), F.col("_c2").alias("MovieID"),F.col("_c4").alias("Tag"),F.col("_c6").alias("Timestamp"))
df=df.orderBy('UserID', ascending=True)
df = df.withColumn("Timestamp", df["Timestamp"].cast(IntegerType())) #casting the column to integer for so that it can be aggregated
df.show()


+------+-------+--------------------+----------+
|UserID|MovieID|                 Tag| Timestamp|
+------+-------+--------------------+----------+
|    15|   4973|          excellent!|1215184630|
|    20|   2424|                ryan|1188263835|
|    20|   1747|            politics|1188263867|
|    20|   2424|     chick flick 212|1188263835|
|    20|   7438|              bloody|1188263801|
|    20|   7438|             kung fu|1188263801|
|    20|   7438|           Tarantino|1188263801|
|    20|   2947|              action|1188263755|
|    20|   1747|              satire|1188263867|
|    20|   2947|                bond|1188263756|
|    20|   3033|               spoof|1188263880|
|    20|   3033|           star wars|1188263880|
|    20|   2424|               hanks|1188263835|
|    21|  55253|               NC-17|1205081488|
|    21|  55247|                   R|1205081506|
|    25|     50|        Kevin Spacey|1166101426|
|    25|   6709|         Johnny Depp|1162147221|
|    31|     65|    

### The dataframe is grouped by UserID and an aggregation using the function collect_list to get all the timestamp for each userID

In [5]:
df_grp=df.withColumn('UserID',df.UserID.cast("int")).groupBy(['UserID']).agg(F.collect_list("Timestamp").alias('time'))
df_grp=df_grp.orderBy('UserID', ascending=True)
df_grp.show()

+------+--------------------+
|UserID|                time|
+------+--------------------+
|    15|        [1215184630]|
|    20|[1188263867, 1188...|
|    21|[1205081506, 1205...|
|    25|[1166101426, 1162...|
|    31|[1188263759, 1188...|
|    32|        [1164735331]|
|    39|[1188263791, 1188...|
|    48|[1215135611, 1215...|
|    49|[1188264255, 1188...|
|    75|        [1162160415]|
|    78|        [1176691425]|
|   109|[1211433235, 1165...|
|   127|[1188265347, 1188...|
|   133|[1188265396, 1188...|
|   146|[1226742764, 1196...|
|   147|[1162188712, 1162...|
|   170|        [1162209176]|
|   175|[1188441420, 1192...|
|   181|[1188266123, 1188...|
|   190|[1151700107, 1151...|
+------+--------------------+
only showing top 20 rows



# Converting pyspark dataframe to pandas dataframe for data calculations

In [6]:
df_grp=df_grp.toPandas() 
print(df_grp)

      UserID                                               time
0         15                                       [1215184630]
1         20  [1188263867, 1188263867, 1188263835, 118826383...
2         21                           [1205081506, 1205081488]
3         25                           [1166101426, 1162147221]
4         31  [1188263759, 1188263674, 1188263741, 118826370...
5         32                                       [1164735331]
6         39  [1188263791, 1188263843, 1188263764, 118826378...
7         48                           [1215135611, 1215135517]
8         49  [1188264255, 1188264178, 1188264095, 118826409...
9         75                                       [1162160415]
10        78                                       [1176691425]
11       109  [1211433235, 1165555281, 1165555288, 123112228...
12       127  [1188265347, 1188265347, 1188265347, 118826536...
13       133  [1188265396, 1188265375, 1188265375, 118826537...
14       146  [1226742764, 1196517851, 1

# Finding out the tagging frequency of each user after separating the Timestamps for each user
- Calulated the mean and standard deviation for each user

In [7]:
sess=[]
tags=[]               
for userid in range (0,len(df_grp)):
    session=1
    frequen=1
    freq=[]
    for i in range (1,len(df_grp['time'][userid])):
            if(df_grp['time'][userid][i]-df_grp['time'][userid][i-1])>1800:
                session+=1
                freq.append(frequen)
                frequen=0
            frequen+=1
    freq.append(frequen)
    tags.append(freq)
    sess.append(session)
df_grp["Session"]=sess
df_grp["frequency"]=tags
mean_of_frq=[]
sum_of_frq=[]
stddev=[]
for i in tags:
    mean=np.mean(i)
    total=np.sum(i)
    std=np.std(i)
    mean_of_frq.append(mean)
    sum_of_frq.append(total)
    stddev.append(std)
df_grp["mean_of_frequency"]= mean_of_frq
df_grp["sum_of_frequency"]= sum_of_frq
df_grp["stddev"]=stddev
df=spark.createDataFrame(df_grp)
# df = df.drop(*columns_to_drop)
df.show()

+------+--------------------+-------+--------------------+------------------+----------------+------------------+
|UserID|                time|Session|           frequency| mean_of_frequency|sum_of_frequency|            stddev|
+------+--------------------+-------+--------------------+------------------+----------------+------------------+
|    15|        [1215184630]|      1|                 [1]|               1.0|               1|               0.0|
|    20|[1188263867, 1188...|      1|                [12]|              12.0|              12|               0.0|
|    21|[1205081506, 1205...|      1|                 [2]|               2.0|               2|               0.0|
|    25|[1166101426, 1162...|      1|                 [2]|               2.0|               2|               0.0|
|    31|[1188263759, 1188...|      1|                 [5]|               5.0|               5|               0.0|
|    32|        [1164735331]|      1|                 [1]|               1.0|           

# Mean and standard deviation of the tagging frequency of All user

In [8]:
mean_std=df.select(F.mean(df['sum_of_frequency']).alias('average'),F.stddev(df['sum_of_frequency']).alias('standard_deviation'))
mean_std.show()

+------------------+------------------+
|           average|standard_deviation|
+------------------+------------------+
|23.287353454726865| 171.3979804629899|
+------------------+------------------+



#  List of users with a mean tagging frequency within the two standard deviation from


In [9]:
l_bound=23.287353454726865-2*171.3979804629899
u_bound=23.287353454726865+2*171.3979804629899
df.filter(df['mean_of_frequency'].between(l_bound,u_bound)).show()

+------+--------------------+-------+--------------------+------------------+----------------+------------------+
|UserID|                time|Session|           frequency| mean_of_frequency|sum_of_frequency|            stddev|
+------+--------------------+-------+--------------------+------------------+----------------+------------------+
|    15|        [1215184630]|      1|                 [1]|               1.0|               1|               0.0|
|    20|[1188263867, 1188...|      1|                [12]|              12.0|              12|               0.0|
|    21|[1205081506, 1205...|      1|                 [2]|               2.0|               2|               0.0|
|    25|[1166101426, 1162...|      1|                 [2]|               2.0|               2|               0.0|
|    31|[1188263759, 1188...|      1|                 [5]|               5.0|               5|               0.0|
|    32|        [1164735331]|      1|                 [1]|               1.0|           