In [None]:
import os
import subprocess

def module(*args):
    """Run a `modulecmd python` command and execute the Python code it prints.

    The command may be given either as separate arguments, e.g.
    ``module('load', 'some/module')``, or as one list, e.g.
    ``module(['load', 'some/module'])``.

    Only stdout is piped, so stderr goes to the parent's stderr. The captured
    stdout is passed straight to ``exec`` -- presumably it contains statements
    that update ``os.environ``; only use this with a trusted modulecmd binary.

    Raises IndexError when called with no arguments.
    """
    first = args[0]
    command = first if isinstance(first, list) else list(args)
    process = subprocess.Popen(['/usr/bin/modulecmd', 'python'] + command,
                               stdout=subprocess.PIPE)
    generated, _ = process.communicate()
    exec(generated)
    
# Load the Java 8 JDK environment module first, then set PYSPARK_PYTHON to the
# Python interpreter of the 'jupyter-spark' conda environment under $HOME.
module('load', 'apps/java/jdk1.8.0_102/binary')
homeDir = os.environ['HOME']
os.environ['PYSPARK_PYTHON'] = homeDir + '/.conda/envs/jupyter-spark/bin/python'

#### Assignment Question 1A

In [None]:
import pyspark.sql.functions as funcs
from pyspark.sql import SparkSession

# Create a Spark Session for Question 1
spark = SparkSession.builder \
    .master('local[2]') \
    .appName('Question 1') \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('WARN')

# Read in the NASA HTTP request file; Spark decompresses .gz files automatically.
# The raw lines are cached because they are reused by later questions.
logFile = spark.read.text('Data/NASA_access_log_Jul95.gz').cache()

# Extract the relevant data from each log line using regular expressions:
#   date    - the bracketed timestamp, e.g. 01/Jul/1995:00:00:01 -0400
#   request - only the last path segment of the request (the text before the
#             'H' of the protocol); the full path is not needed here
dataFrame = logFile.select(
    funcs.regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('date'),
    funcs.regexp_extract('value', r'^.+\/([^\/]+(?=H))', 1).alias('request'))

# Parse the dd/MMM/yyyy:HH:mm:ss Z timestamps and re-render them as
# 'yyyy-MM-dd HH:mm:ss Z' strings.
# NOTE(review): the rendered time (and therefore the weekday below) presumably
# follows the Spark session time zone rather than GMT - confirm that
# spark.sql.session.timeZone is set as intended.
dataFrameWithTimeStamps = dataFrame.withColumn(
    'date',
    funcs.date_format(funcs.to_timestamp('date', 'dd/MMM/yyyy:HH:mm:ss Z'), 'yyyy-MM-dd HH:mm:ss Z'))

# dayofweek returns the weekday as an integer, i.e. Sunday = 1 ... Saturday = 7.
# The column is aliased explicitly so the code below does not depend on the
# auto-generated 'dayofweek(date)' column name.
days = dataFrameWithTimeStamps.select('date', funcs.dayofweek('date').alias('dayOfWeek'))
# Count requests per weekday; rows whose timestamp could not be parsed have a
# null weekday and are dropped.
daysCount = days.groupBy('dayOfWeek').count().na.drop().sort('dayOfWeek')

# Day names, indexed by (dayofweek - 1), to ease readability
names = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
# Number of occurrences of each weekday in July 1995, i.e. Sunday, Monday and
# Saturday all occur 5 times in that month
frequency = [5, 5, 4, 4, 4, 4, 5]

# One slot per weekday, filled by index, so the list always lines up with
# `names` when plotted - even if a weekday has no rows at all.
averageCounts = [0.0] * len(names)

# Print and store the average number of requests for each weekday
for row in daysCount.collect():
    dayIndex = int(row['dayOfWeek']) - 1
    average = int(row['count']) / frequency[dayIndex]
    averageCounts[dayIndex] = average
    print(names[dayIndex] + ': ' + str(round(average)))


#### Assignment Question 1B

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Bar chart of the average request count per weekday: one bar per entry of
# `names`, labelled with the day name rotated 45 degrees.
dayPositions = np.arange(len(names))
plt.bar(dayPositions, averageCounts, align = 'center')
plt.xticks(dayPositions, names, rotation = 45)
plt.xlabel('Day of Week')
plt.ylabel('Average Number Of Requests')
plt.title('Average Number of HTTP Requests Per Weekday')
plt.show()

The trend in the number of requests per day is that, on average, there are about half as many requests on weekend days as on days during the working week, with the number of requests increasing up to a peak on Thursday. This is expected, as many of the requests could be from people at work or at school, who would not be making requests during the weekend and might leave early on Fridays. It is worth noting that there are five Mondays, Saturdays and Sundays in July 1995; this does not affect the averages for these days as much as might be expected, and there is still a clear drop in requests on weekend days.

#### Assignment Question 1C

In [None]:
# Keep only the rows whose 'request' contains '.gif', then group identical
# requests together and count how many times each one occurs
gifs = dataFrame.select(funcs.col('request')).where(funcs.col('request').contains('.gif')).groupBy('request').count()

# Sort by descending count. show() only prints and returns None, so the sorted
# Data Frame is kept in sortedGifs and displayed in a separate step.
sortedGifs = gifs.sort('count', ascending = False)
# Display the 20 most requested gifs without truncating their names
sortedGifs.show(20, truncate = False)

#### Assignment Question 1D

In [None]:
# Fetch only the 20 most requested gifs from Spark; take(20) avoids collecting
# every distinct gif onto the driver just to keep the first 20 rows.
topGifs = gifs.sort('count', ascending = False).take(20)

# Separate the row data into parallel lists to ease plotting
reqCounts = [int(row['count']) for row in topGifs]
gifRequests = [row['request'] for row in topGifs]

# One bar per gif, labelled vertically with the requested file name
gifPositions = np.arange(len(gifRequests))
plt.bar(gifPositions, reqCounts, align = 'center')
plt.xticks(gifPositions, gifRequests, rotation = 90)
plt.ylabel('Number of Requests')
plt.xlabel('Name of Requested .gif File')
plt.title('Top 20 Most Requested .gif files for July 1995')
plt.show()

# Question 1 is finished, so release the Spark session
spark.stop()

The most commonly requested .gif files are small logos, which are most likely embedded in web pages; this is why NASA-logosmall.gif is the most requested gif, as it appears on the most NASA web pages.

#### Assignment Question 2A

In [None]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, FloatType

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row

# Create a Spark Session for Question 2
spark = SparkSession.builder \
    .master('local[2]') \
    .appName('Question 2') \
    .config("spark.local.dir","/fastdata/acp18dck") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('WARN')

# Define the Schema for the Data Frame
schema = StructType([
    StructField('userId', IntegerType()),
    StructField('movieId', IntegerType()),
    StructField('rating', FloatType()),
    StructField('timestamp', IntegerType())
])

# Read the ratings csv file (it has a header row) into the pre-defined schema
ratingsDataFrame = spark.read.format('csv').schema(schema).option('header', 'true').load('Data/ml-20m/ratings.csv')

# Specify number of folds required
numberOfFolds = 5

# Split the data into numberOfFolds equally weighted parts, using a fixed
# random seed (numberOfFolds squared) for repeatability.
splitData = ratingsDataFrame.randomSplit([1.0 / numberOfFolds] * numberOfFolds, (numberOfFolds*numberOfFolds))

# Per-fold evaluation results of the two ALS models
als1RMSE = []
als1MAE = []
als2RMSE = []
als2MAE = []

# The two ALS configurations differ only in the number of iterations.
# coldStartStrategy = 'drop' removes rows with NaN predictions before evaluation.
alsA = ALS(maxIter = 2, regParam = 0.1, userCol = 'userId', itemCol = 'movieId', ratingCol = 'rating',
             coldStartStrategy = 'drop')
alsB = ALS(maxIter = 10, regParam = 0.1, userCol = 'userId', itemCol = 'movieId', ratingCol = 'rating',
             coldStartStrategy = 'drop')

rmseAEvaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'rating', predictionCol = 'prediction')
maeAEvaluator = RegressionEvaluator(metricName = 'mae', labelCol = 'rating', predictionCol = 'prediction')
rmseBEvaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'rating', predictionCol = 'prediction')
maeBEvaluator = RegressionEvaluator(metricName = 'mae', labelCol = 'rating', predictionCol = 'prediction')

# Iterate through each fold
for fold in range(numberOfFolds):
    print("Begin Fold: " + str(fold + 1))

    # The current fold is the test set; every other pre-split part is training
    # data. Parts are selected by position instead of comparing Data Frames.
    test = splitData[fold]
    trainParts = [part for index, part in enumerate(splitData) if index != fold]

    # Union the remaining parts into a single training Data Frame
    train = trainParts[0]
    for part in trainParts[1:]:
        train = train.union(part)

    print("Fit ALS 1")
    alsAModel = alsA.fit(train)
    print("Fit ALS 2")
    alsBModel = alsB.fit(train)

    print("Begin ALS 1 Evaluation")
    alsAPredictions = alsAModel.transform(test)
    alsARMSEEval = rmseAEvaluator.evaluate(alsAPredictions)
    alsAMAEEval = maeAEvaluator.evaluate(alsAPredictions)

    print("Begin ALS 2 Evaluation")
    alsBPredictions = alsBModel.transform(test)
    alsBRMSEEval = rmseBEvaluator.evaluate(alsBPredictions)
    alsBMAEEval = maeBEvaluator.evaluate(alsBPredictions)

    # Printed in the order: ALS 1 RMSE, ALS 1 MAE, ALS 2 RMSE, ALS 2 MAE
    print(alsARMSEEval)
    print(alsAMAEEval)
    print(alsBRMSEEval)
    print(alsBMAEEval)

    als1RMSE.append(alsARMSEEval)
    als1MAE.append(alsAMAEEval)
    als2RMSE.append(alsBRMSEEval)
    als2MAE.append(alsBMAEEval)

# The session is deliberately NOT stopped here: the Question 2C cell reads
# alsAModel.itemFactors, which is only usable while the SparkContext that
# trained the model is still alive. Stopping it at this point leads to
# "'NoneType' object has no attribute 'sc'" when the factors are used.

#### Assignment Question 2C

In [None]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import KMeansModel
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.types import DoubleType

spark = SparkSession.builder \
    .master('local[2]') \
    .appName('Question 2C') \
    .config("spark.local.dir","/fastdata/acp18dck") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('WARN')

# Item factors learned by the ALS model (columns 'id' and 'features').
# This only works while the Spark session that trained alsAModel has not been
# stopped; otherwise the model has to be refitted first.
dfItemFactors = alsAModel.itemFactors

# KMeans needs a single Vector column, so each factor array is converted to a
# dense vector. The movie id is carried along in the same row, so predictions
# can be matched back to movies without any index-based join.
features = dfItemFactors.rdd \
    .map(lambda r: (r['id'], Vectors.dense(r['features']))) \
    .toDF(['movieId', 'features']) \
    .cache()

kValue = 20
# NOTE(review): silhouettes and tagSummary are initialised but never filled in
# this cell - presumably intended for a later evaluation step.
silhouettes = np.zeros(kValue)

clusterList = []
tagList = []
tagSummary = []

# NOTE(review): newMovieTagID and newTagIDTag are not defined anywhere in the
# visible notebook - presumably the movie/tag relevance scores and the tag
# names; they must be loaded before this cell is run.
# The join does not depend on k, so it is built once outside the loop.
movieIDTagID = newMovieTagID.join(newTagIDTag, on =['tagID'],how ='left_outer')

for k in range(2, kValue):
    kmeans = KMeans().setK(k).setSeed(11)
    model = kmeans.fit(features)

    # Sizes of the three largest clusters, as reported by the training summary
    summary = model.summary
    clusterSize = summary.clusterSizes
    topClusterSize = sorted(clusterSize, reverse = True)[:3]

    # Assign every movie to a cluster
    predictions = model.transform(features)
    movieIDCluster = predictions.selectExpr("movieId", "prediction as cluster")

    # Rank the clusters by number of movies and keep (up to) the three largest
    movieIDClusterCount = movieIDCluster.groupby("cluster").count().orderBy(funcs.col("count").desc())
    top3Clusters = movieIDClusterCount.limit(3)
    top3ClustersList = [row['cluster'] for row in top3Clusters.select("cluster").collect()]
    clusterList.append(top3ClustersList)
    # The value printed after "Split:" is the number of clusters k for this run
    print("Split:", k, " top 3 clusters:", top3ClustersList, " each cluster size:", topClusterSize)

    movieIDClusterTag = movieIDCluster.join(movieIDTagID, on = ['movieId'], how = 'left_outer')

    # Iterate over the clusters actually returned: with k = 2 there are only
    # two clusters, so a fixed range(3) would run past the end of the list.
    for cluster in top3ClustersList:
        # Sum the tag relevance over all movies in the cluster and keep the
        # five tags with the highest total
        rankTags = movieIDClusterTag.filter(funcs.col("cluster") == cluster).groupBy("tag").sum('relevance').na.drop().orderBy(funcs.col("sum(relevance)").desc())
        top5Tags = [row['tag'] for row in rankTags.limit(5).select("tag").collect()]
        tagList.append(top5Tags)
        print("For cluster:", cluster, " top 5 tags:", top5Tags)
