In [None]:
from pyspark.sql import Row, DataFrame, HiveContext
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer

sqlContext_H = HiveContext(sc)

from scipy.sparse import lil_matrix

from sklearn.decomposition import TruncatedSVD
from sklearn.metrics.pairwise import cosine_similarity

import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', 250)

import numpy as np
import copy
from collections import OrderedDict

import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
%matplotlib inline

---

# Read log files and calculate TF-IDF

---

In [None]:
%%time

# Columns needed from the access logs by the TF-IDF queries below.
fields_list = (
    "ACCESS_DTTM",
    "ACCESS_MONTH", "ACCESS_WEEK", "ACCESS_DAY", "ACCESS_HOUR",
    "USER_ID",
    "WORKSTATION",
)
# All month directories under parquet2/; cached because the TF-IDF query scans it several times.
full_data = (
    sqlContext_H.read
    .parquet(HOST + "/parquet2/*/*")
    .select(*fields_list)
    .persist()
)
sqlContext_H.registerDataFrameAsTable(full_data, 'full_data')

In [None]:
%%time

# Sizes of the global user / workstation vocabularies (line counts of the unique-value lists).
total_users = sc.textFile(HOST + "/csv/unique_users.csv").count()
total_workstations = sc.textFile(HOST + "/csv/unique_workstations.csv").count()
for label, value in (("Total users:", total_users), ("Total workstations:", total_workstations)):
    print("{} {}".format(label, value))

In [None]:
%%time

def query(table, group_by, total_users):
    """Build Spark SQL computing TF-IDF of workstation usage per user and time bucket.

    TF is the share of a user's accesses (within one ``group_by`` bucket) that hit a
    given workstation. The IDF factor is ``log10(1 + total_users / n)``, where ``n`` is
    the number of distinct workstations the user touched in that bucket.

    Parameters
    ----------
    table : str
        Registered table with USER_ID, WORKSTATION, ACCESS_DTTM and the ``group_by``
        columns.
    group_by : str
        Comma-separated bucket column name(s), e.g. ``"ACCESS_WEEK"``; whitespace is
        ignored when building the qualified select and join lists.
    total_users : int or float
        Numerator of the IDF term.

    Returns
    -------
    str
        SQL returning USER_ID, WORKSTATION, the bucket column(s) and TF_IDF.
    """
    # Parse the bucket columns once and substitute every value in a single pass,
    # so already-substituted text is never re-read as a str.format template.
    cols = group_by.replace(" ", "").split(",")

    def select_list(alias):
        # "T1.A, T1.B" for the bucket columns qualified with `alias`.
        return ", ".join("{}.{}".format(alias, c) for c in cols)

    def join_cond(left, right):
        # "L.A = R.A AND L.B = R.B" over the bucket columns.
        return " AND ".join("{0}.{2} = {1}.{2}".format(left, right, c) for c in cols)

    # NOTE(review): the IDF is grouped by user (and bucket), not by workstation, so it
    # rescales each user's row rather than down-weighting common workstations -- confirm intent.
    parts = [
        "SELECT mainT1.USER_ID, mainT1.WORKSTATION, {}, mainT1.TF * mainT2.IDF AS TF_IDF".format(select_list("mainT1")),
        "FROM (SELECT T1.USER_ID, T1.WORKSTATION, {}, T1.AMOUNT*1.0 / T2.TOTAL AS TF".format(select_list("T1")),
        "FROM (SELECT USER_ID, {}, WORKSTATION, COUNT(ACCESS_DTTM) AS AMOUNT".format(group_by),
        "FROM {}".format(table),
        "GROUP BY USER_ID, {}, WORKSTATION".format(group_by),
        ") AS T1",
        "JOIN (SELECT USER_ID, {}, COUNT(ACCESS_DTTM) AS TOTAL".format(group_by),
        "FROM {}".format(table),
        "GROUP BY USER_ID, {}".format(group_by),
        ") AS T2",
        "ON T1.USER_ID = T2.USER_ID AND {}".format(join_cond("T1", "T2")),
        ") AS mainT1 JOIN (",
        "SELECT USER_ID, {}, log10(1 + {}*1.0 / COUNT(DISTINCT(WORKSTATION))) AS IDF".format(group_by, total_users),
        "FROM {}".format(table),
        "GROUP BY USER_ID, {}".format(group_by),
        ") AS mainT2 ON mainT1.USER_ID = mainT2.USER_ID AND {}".format(join_cond("mainT1", "mainT2")),
    ]
    return " ".join(parts)

# Weekly TF-IDF over all months, cached for the histogram below.
res = (
    sqlContext_H
    .sql(query("full_data", "ACCESS_WEEK", total_users))
    .persist()
)
res.limit(10).toPandas()

In [None]:
%%time
# Bring every TF-IDF score to the driver and plot its distribution.
# A list comprehension (instead of map) keeps np.array numeric on Python 3,
# where map() returns a lazy iterator that np.array would wrap as a 0-d object.
x = np.array([row[0] for row in res.select("TF_IDF").collect()])

fig, ax = plt.subplots(figsize=(16, 7))
ax.hist(x, bins=100, facecolor='green', alpha=0.75)
ax.set_xlabel("TF_IDF", fontsize=12)
ax.set_ylabel('Amount', fontsize=12)
ax.set_title("Distribution of weekly TF-IDF scores")
ax.grid(True)
plt.show()

---

---

# Factorization

### Read data for the 7th month

In [None]:
# Release the caches of the previous section before loading a single month.
# Each frame is checked on its own: on a fresh kernel `all_users` and
# `all_workstations` are not defined yet, and a blanket `except` would also
# hide genuine Spark errors.
for _cached_name in ("full_data", "all_users", "all_workstations"):
    _cached = globals().get(_cached_name)
    if _cached is not None:
        _cached.unpersist()

month = 7
fields_list = ("ACCESS_DTTM", "USER_ID", "WORKSTATION")
# Zero-pad the month directory name ("07"); "0{}" would give "010" for October.
data = sqlContext_H.read.parquet(HOST + "/parquet2/{:02d}/*".format(month)).select(*fields_list).persist()
sqlContext_H.registerDataFrameAsTable(data, 'data')
data.limit(5).toPandas()

### Calculate TF-IDF and encode USER_ID and WORKSTATION as label indices

In [None]:
%%time

# 1. Count the distinct users active in `month`; the count is the IDF numerator.
#    The user list is cached as CSV; set `overwrite = True` to rebuild it from `data`.
month_users_path = HOST + "/csv/unique_users_{}.csv".format(month)
overwrite = False
if overwrite:
    all_users = sqlContext_H.sql("SELECT DISTINCT(USER_ID) AS count FROM data")
    csv_writer = all_users.write.format("com.databricks.spark.csv").mode('overwrite')
    csv_writer.save(month_users_path)

total_users = sc.textFile(month_users_path).count()

# 2. Calculate tf-idf
def query(table, total_users):
    """Return Spark SQL computing per-user TF-IDF of workstation usage over `table`.

    TF  = accesses of a user on a workstation / all accesses of that user.
    IDF = log10(1 + total_users / number of distinct workstations of that user).
    Result columns: USER_ID, WORKSTATION, TF_IDF, IDF.

    NOTE(review): the IDF is grouped by user rather than by workstation, so it
    rescales each user's row instead of down-weighting common workstations --
    confirm this is intended.
    """
    sql_template = """
    SELECT mainT1.USER_ID, mainT1.WORKSTATION, mainT1.TF * mainT2.IDF AS TF_IDF, mainT2.IDF
    FROM (SELECT T1.USER_ID, T1.WORKSTATION, T1.AMOUNT*1.0 / T2.TOTAL AS TF 
        FROM (SELECT USER_ID, WORKSTATION, COUNT(ACCESS_DTTM) AS AMOUNT
            FROM {table_name}
            GROUP BY USER_ID, WORKSTATION
        ) AS T1
        JOIN (SELECT USER_ID, COUNT(ACCESS_DTTM) AS TOTAL
            FROM {table_name}
            GROUP BY USER_ID
        ) AS T2
        ON T1.USER_ID = T2.USER_ID
    ) AS mainT1 JOIN (
        SELECT T.USER_ID, log10( 1 + {total_users} * 1.0 / COUNT(T.WORKSTATION) ) AS IDF
        FROM (SELECT USER_ID, WORKSTATION
            FROM {table_name}
            GROUP BY USER_ID, WORKSTATION
        ) AS T
        GROUP BY T.USER_ID
    ) AS mainT2 ON mainT1.USER_ID = mainT2.USER_ID
"""
    return sql_template.format(table_name=table, total_users=total_users)

res = sqlContext_H.sql(query("data", total_users)).persist()

# 3. Encode a string column of labels to a column of label indices.
# The indexers are fitted on the complete user / workstation lists (not only the
# ones seen in `month`), each extended with one extra '' label.
all_users = sqlContext_H.createDataFrame( 
        sc.textFile(HOST + "/csv/unique_users.csv").map(lambda p: Row(USER_ID=p)) 
    ).union( sqlContext_H.createDataFrame([Row(USER_ID='')]) ).persist()
all_workstations = sqlContext_H.createDataFrame( 
        sc.textFile(HOST + "/csv/unique_workstations.csv").map(lambda p: Row(WORKSTATION=p)) 
    ).union( sqlContext_H.createDataFrame([Row(WORKSTATION='')]) ).persist()

indexerU = StringIndexer(inputCol="USER_ID", outputCol="USER_ID_Index").fit(all_users)
indexerW = StringIndexer(inputCol="WORKSTATION", outputCol="WORKSTATION_Index").fit(all_workstations)

all_users.unpersist()
all_workstations.unpersist()

# Later cells turn label index k into matrix position k - 1 (and back via
# labels[row + 1]). That is only right if '' received index 0. StringIndexer
# orders labels by frequency, so with (presumably) one occurrence per label the
# position of '' depends on Spark's tie-breaking. Verify it, rather than
# silently shifting every user/workstation by one.
for _label_col, _indexer in (("USER_ID", indexerU), ("WORKSTATION", indexerW)):
    assert _indexer.labels[0] == '', \
        "expected '' at {} index 0, got {!r}".format(_label_col, _indexer.labels[0])

indexedU_df = indexerU.transform(res).withColumn(
        "USER_ID_Index", F.col("USER_ID_Index").cast(IntegerType())
    ).persist()

table = indexerW.transform(indexedU_df).withColumn(
        "WORKSTATION_Index", F.col("WORKSTATION_Index").cast(IntegerType())
    ).persist()

res.unpersist()
indexedU_df.unpersist()
table.limit(5).toPandas()

# SVD

In [None]:
%%time 

# Bring the month's (user index, workstation index, TF-IDF) triples to the driver.
t = table.select("USER_ID_Index", "WORKSTATION_Index", "TF_IDF").toPandas()

# Matrix dimensions come from the same global CSV lists the StringIndexers were fitted on.
total_users, total_workstations = [
    sc.textFile(HOST + path).count()
    for path in ("/csv/unique_users.csv", "/csv/unique_workstations.csv")
]

def sparse_df_to_array(df, shape):
    """ Convert sparse dataframe to sparse array csr_matrix used by scikit learn.

    Parameters
    ----------
    df : pandas.DataFrame
        Columns USER_ID_Index, WORKSTATION_Index and TF_IDF. Indices are 1-based
        matrix positions; presumably index 0 is the '' padding label added before
        fitting the StringIndexers.
    shape : tuple of int
        (n_users, n_workstations) of the output matrix.

    Returns
    -------
    scipy.sparse.csr_matrix
        float32 matrix whose cell (USER_ID_Index - 1, WORKSTATION_Index - 1) holds
        TF_IDF. Zero scores are not stored.

    Raises
    ------
    IndexError
        If an index exceeds the bounds given by ``shape``.
    """
    from scipy.sparse import coo_matrix  # only lil_matrix is imported at the top

    # Vectorised build: `.ix` no longer exists in pandas >= 1.0, and a Python-level
    # write per row is slow. Positional `.values` also makes the result independent
    # of the frame's index labels. For a repeated (user, workstation) pair the last
    # value wins, as with element-wise assignment.
    cells = df.drop_duplicates(subset=["USER_ID_Index", "WORKSTATION_Index"], keep="last")
    rows = np.asarray(cells["USER_ID_Index"].values, dtype=np.int64) - 1
    cols = np.asarray(cells["WORKSTATION_Index"].values, dtype=np.int64) - 1
    vals = np.asarray(cells["TF_IDF"].values, dtype=np.float32)

    # Label index 0 (-1 after the shift) has no matrix slot. Writing it would wrap
    # around to the last row/column through negative indexing, so such cells are dropped.
    keep = (rows >= 0) & (cols >= 0)
    rows, cols, vals = rows[keep], cols[keep], vals[keep]

    if rows.size and (rows.max() >= shape[0] or cols.max() >= shape[1]):
        raise IndexError("index out of range for matrix of shape {}".format(shape))

    arr = coo_matrix((vals, (rows, cols)), shape=shape, dtype=np.float32).tocsr()
    arr.eliminate_zeros()  # do not keep explicit zero entries
    return arr

# Month-level user x workstation TF-IDF matrix (rows = users, columns = workstations).
m = sparse_df_to_array(t, (total_users, total_workstations))

In [None]:
%%time 

# 10-component fit to see how much variance a low-rank model of `m` retains.
svd = TruncatedSVD(n_components=10, n_iter=15, random_state=42)
svd.fit(m)
ratios = svd.explained_variance_ratio_
print(ratios)
print(ratios.sum())

### Calculate cosine similarities

Prepare a TF-IDF dataframe for each day of the 8th month (August 2016).

In [None]:
%%time

def query(table):
    """Return Spark SQL computing per-user TF-IDF of workstation usage over `table`.

    TF  = accesses of a user on a workstation / all accesses of that user.
    IDF = log10(1 + users in `table` / number of distinct workstations of that user),
    where the user count comes from a scalar subquery over `table` itself.
    Result columns: USER_ID, WORKSTATION, TF_IDF.
    """
    sql_template = """
    SELECT mainT1.USER_ID, mainT1.WORKSTATION, mainT1.TF * mainT2.IDF AS TF_IDF
    FROM (SELECT T1.USER_ID, T1.WORKSTATION, T1.AMOUNT*1.0 / T2.TOTAL AS TF 
        FROM (SELECT USER_ID, WORKSTATION, COUNT(ACCESS_DTTM) AS AMOUNT
            FROM {table_name}
            GROUP BY USER_ID, WORKSTATION
        ) AS T1
        JOIN (SELECT USER_ID, COUNT(ACCESS_DTTM) AS TOTAL
            FROM {table_name}
            GROUP BY USER_ID
        ) AS T2
        ON T1.USER_ID = T2.USER_ID
    ) AS mainT1 JOIN (
        SELECT T.USER_ID, 
               log10( 1 + (SELECT COUNT(DISTINCT(USER_ID)) AS count FROM {table_name}) * 1.0 / COUNT(T.WORKSTATION) ) AS IDF
        FROM (SELECT USER_ID, WORKSTATION
            FROM {table_name}
            GROUP BY USER_ID, WORKSTATION
        ) AS T
        GROUP BY T.USER_ID
    ) AS mainT2 ON mainT1.USER_ID = mainT2.USER_ID
"""
    return sql_template.format(table_name=table)


# Build one TF-IDF table per day of August 2016 and expose it as the global
# `df_table_<day>`. Days whose parquet file cannot be read are reported and skipped.
for d in range(1, 32):
    try:
        sqlContext_H.dropTempTable("df")
    except Exception:
        pass  # nothing registered under "df" yet
    day_path = HOST + "/parquet2/08/Epic_Access_Log_201608{0:02d}.parquet".format(d)
    try:
        df = sqlContext_H.read.parquet(day_path).select(*fields_list).persist()
    except Exception as exc:
        print("Skipping day {}: {}".format(d, exc))
        continue
    sqlContext_H.registerDataFrameAsTable(df, 'df')
    globals()['df_table_' + str(d)] = sqlContext_H.sql(query("df")).persist()
    df.unpersist()

In [None]:
%%time

# 1. Fit FM
def _rowwise_cosine(a, b):
    """Return [cos(a[i], b[i]) for every row i] as a list of floats.

    Vectorised equivalent of calling ``cosine_similarity`` on each pair of
    single-row slices; a row with zero norm yields 0.0, as it does there.
    """
    a = np.asarray(a, dtype=np.float64)
    b = np.asarray(b, dtype=np.float64)
    a_norm = np.linalg.norm(a, axis=1)
    b_norm = np.linalg.norm(b, axis=1)
    a_norm[a_norm == 0] = 1.0
    b_norm[b_norm == 0] = 1.0
    return (np.einsum("ij,ij->i", a, b) / (a_norm * b_norm)).tolist()


svd = TruncatedSVD(n_components=50, n_iter=25, random_state=42)
svd.fit(m)
print("svd.explained_variance_ratio_.sum() = {}".format(svd.explained_variance_ratio_.sum()))

# Month-level reference: user factors (main1) and the rank-50 reconstruction of
# the full user x workstation matrix (main2).
main1 = svd.transform(m)
main2 = svd.inverse_transform(main1)

# True: project each day with the month-level SVD; False: fit a separate SVD per day.
main_svd = True

# Here we will collect all cosine similarities for both users matrix of factors (cos1) and full FM (cos2):
# cos1[d][i] / cos2[d][i] compare user row i on day d with the same row of main1 / main2.
cos1 = {}
cos2 = {}

for d in range(1, 32):
    day_table = globals().get('df_table_' + str(d))
    if day_table is None:
        continue  # no table was built for this day (its log file could not be read)
    df_indexedU = indexerU.transform(day_table).withColumn(
            "USER_ID_Index", F.col("USER_ID_Index").cast(IntegerType())
        ).persist()
    df_table = indexerW.transform(df_indexedU).withColumn(
            "WORKSTATION_Index", F.col("WORKSTATION_Index").cast(IntegerType())
        ).select(
            "USER_ID_Index", "WORKSTATION_Index", "TF_IDF"
        ).toPandas()
    df_indexedU.unpersist()

    df_m = sparse_df_to_array(df_table, (total_users, total_workstations))

    if main_svd:
        print(d)
        x1 = svd.transform(df_m)
        x2 = svd.inverse_transform(x1)
    else:
        # NOTE(review): x1 is then expressed in this day's own factor basis, so
        # comparing it with main1 (month basis) is not like-for-like -- confirm intent.
        df_svd = TruncatedSVD(n_components=50, n_iter=15, random_state=42)
        df_svd.fit(df_m)
        print("{} \texplained_variance_ratio_ = {}".format(d, df_svd.explained_variance_ratio_.sum()))
        x1 = df_svd.transform(df_m)
        x2 = df_svd.inverse_transform(x1)

    cos1[d] = _rowwise_cosine(main1, x1)
    cos2[d] = _rowwise_cosine(main2, x2)

# Nearest Neighbors

In [None]:
%%time

from sklearn.neighbors import NearestNeighbors

renew = False
if renew:
    svd = TruncatedSVD(n_components=50, n_iter=25, random_state=42)
    svd.fit(m) 
    print "svd.explained_variance_ratio_.sum() =", svd.explained_variance_ratio_.sum()
    main1 = svd.transform(m)
    main2 = svd.inverse_transform(main1)

nbrs = NearestNeighbors(n_neighbors=25, algorithm='ball_tree', n_jobs=-1).fit(main2)
# Array `indices` contains numbers of users with similar behaviour
# Array `distances` contains euclidean distances between each pair of users from `indices`
distances, indices = nbrs.kneighbors(main2)

### Read user data

In [None]:
# Employee directory: pipe-separated text file whose first line is the header.
secure_rdd = sc.textFile(HOST + "/users_info/SecureHealEmpTest.csv").persist()
first = secure_rdd.first()
header = first.split("|")
row_data = secure_rdd.filter(lambda x: x != first).map(lambda x: x.split("|")) \
                     .map(lambda p: Row(**{header[i]: p[i] for i in range(len(header))})).persist()
secure_rdd.unpersist()
# Build the frame with sqlContext_H rather than a separately provided `sqlContext`.
# It is registered in sqlContext_H here and joined with `full_data` in a later
# sqlContext_H query, so it should belong to that same context.
users = sqlContext_H.createDataFrame(row_data).select(
    "Employee", "JobName", "DeptName", "ProcDesc", "EmpStatus"
).persist()
row_data.unpersist()
sqlContext_H.registerDataFrameAsTable(users, 'users')

# NOTE: this preview shows employee records -- clear the output before sharing.
users.limit(5).toPandas()

## Find relationships between users within each group of 25 nearest neighbours in `indices`

In [None]:
%%time

# Re-read every month (its cache was released before the factorisation section)
# and summarise each user's activity, joined with the employee directory.
fields_list = ("ACCESS_DTTM", "USER_ID", "WORKSTATION")
all_months_path = HOST + "/parquet2/*/*"
full_data = sqlContext_H.read.parquet(all_months_path).select(*fields_list)
full_data = full_data.persist()
sqlContext_H.registerDataFrameAsTable(full_data, 'full_data')

# Per user: number of log records, number of distinct workstations and their list.
query = """
    SELECT T.USER_ID, users.JobName, users.DeptName, users.ProcDesc AS HospName, users.EmpStatus,
           T.RECORDS_AMOUNT, T.WORKSTATION_AMOUNT, T.WORKSTATIONs
    FROM (SELECT USER_ID, COUNT(ACCESS_DTTM) AS RECORDS_AMOUNT, 
           COUNT(DISTINCT(WORKSTATION)) AS WORKSTATION_AMOUNT, concat_ws('; ', collect_set(WORKSTATION)) AS WORKSTATIONs
        FROM full_data
        GROUP BY USER_ID
    ) AS T
    JOIN users
    ON T.USER_ID = users.Employee
"""
grouped_sql = sqlContext_H.sql(query)
users_grouped = grouped_sql.persist()

In [None]:
# Preview a few aggregated rows (includes employee job/department fields -- clear before sharing).
users_grouped.limit(3).toPandas()

## Divide nearest neighbours into groups by the number of users with a known job name and department

In [None]:
# Employee IDs present in the users directory.
all_known_users = set(row[0] for row in users.select("Employee").collect())

# Matrix row r holds the user with label index r + 1 (the +1 skips label index 0,
# the '' padding added before fitting the indexer).
labels = indexerU.labels
indices_users = [set(labels[idx + 1] for idx in lst) for lst in indices]

# groups[k] lists (row, neighbour set) for neighbourhoods containing exactly k
# known users. Identical neighbour sets are kept once (first occurrence wins).
# The number of buckets follows the neighbour count instead of a hard-coded 26.
n_neighbors = indices.shape[1]
groups = {k: [] for k in range(n_neighbors + 1)}
seen_neighbourhoods = set()
for num, lst in enumerate(indices_users):
    key = frozenset(lst)
    if key in seen_neighbourhoods:
        continue
    seen_neighbourhoods.add(key)
    groups[len(all_known_users & lst)].append((num, lst))

for k, v in sorted(groups.items()):
    print("Groups of {} known users: {}".format(k, len(v)))

In [None]:
# Inspect the first neighbourhood with the largest number of known users
# (the fully known bucket whenever it is non-empty).
best_known = max(k for k, members in groups.items() if members)
sample_row, sample_users = groups[best_known][0]

print("Distances:")
print(distances[sample_row])

# isin() passes the IDs as values instead of splicing them into a SQL string,
# so IDs containing quotes cannot break the filter.
x0 = users_grouped.filter(F.col("USER_ID").isin(list(sample_users))).selectExpr(
    "USER_ID", "JobName", "DeptName", "HospName", "EmpStatus", "RECORDS_AMOUNT as Requests", "WORKSTATION_AMOUNT as WS_amount"
).toPandas()
x0.head(25)