In [4]:
# Question 12
from pyspark.sql import SparkSession

# Start the Spark session for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("Distance Measures")
    .getOrCreate()
)

# SparkContext handle; the example cells below use it to build RDDs
sc = spark.sparkContext

In [6]:
def jaccard_distance(rdd1, rdd2):
    """Return the Jaccard distance between the element sets of two RDDs.

    Each RDD is treated as a set of its elements (duplicates are ignored) and
    the result is ``1 - |A intersect B| / |A union B|``.

    Args:
        rdd1: RDD whose elements are the members of the first set.
        rdd2: RDD whose elements are the members of the second set.

    Returns:
        0.0 when both RDDs hold the same distinct elements, 1.0 when they
        share none, and a value in between otherwise. Two empty RDDs are
        treated as identical and yield 0.0.
    """
    union_size = rdd1.union(rdd2).distinct().count()
    if union_size == 0:
        # Both inputs are empty: the ratio is 0/0, so report "identical"
        # instead of letting the division raise ZeroDivisionError.
        return 0.0
    # RDD.intersection de-duplicates its output, so no distinct() is needed here
    intersection_size = rdd1.intersection(rdd2).count()
    return 1 - intersection_size / union_size

# Example usage with RDDs
rdd1 = sc.parallelize([(0, 1), (1, 1), (2, 0)])  # Represents set {0, 1}
rdd2 = sc.parallelize([(0, 1), (1, 0), (2, 1)])  # Represents set {0, 2}

print("Jaccard Distance:", jaccard_distance(rdd1, rdd2))


Jaccard Distance: 0.8


In [7]:
from math import sqrt

def cosine_distance(rdd1, rdd2):
    """Return the cosine distance (1 - cosine similarity) of two keyed vectors.

    Both arguments are RDDs of ``(key, value)`` pairs. The dot product is
    taken over keys present in both RDDs (inner join), while each norm is
    computed over all values of its own RDD.

    Args:
        rdd1: RDD of ``(key, value)`` pairs for the first vector.
        rdd2: RDD of ``(key, value)`` pairs for the second vector.

    Returns:
        ``1 - dot(rdd1, rdd2) / (norm(rdd1) * norm(rdd2))``.

    Raises:
        ZeroDivisionError: If either vector has a zero norm.
    """
    def squared_norm(vector):
        # Sum of squared component values of one keyed vector
        return vector.map(lambda entry: entry[1] ** 2).sum()

    # After the join each element is (key, (value_in_rdd1, value_in_rdd2))
    matched = rdd1.join(rdd2)
    dot = matched.map(lambda entry: entry[1][0] * entry[1][1]).sum()

    magnitude1 = sqrt(squared_norm(rdd1))
    magnitude2 = sqrt(squared_norm(rdd2))

    similarity = dot / (magnitude1 * magnitude2)
    return 1 - similarity

# Example usage: reuses the (index, value) RDDs rdd1 and rdd2 built in the Jaccard example cell
print(
    "Cosine Distance:",
    cosine_distance(rdd1, rdd2),
)


Cosine Distance: 0.5000000000000001


In [8]:
def hamming_distance(rdd1, rdd2):
    """Count the positions at which two equally long RDDs hold different values.

    Elements are paired positionally with ``RDD.zip``, so the inputs must have
    the same length. Per the PySpark documentation, ``zip`` additionally
    expects the same number of partitions and elements per partition in both
    RDDs.

    Args:
        rdd1: RDD holding the first sequence of values.
        rdd2: RDD holding the second sequence of values.

    Returns:
        The number of positions whose paired values are not equal.
    """
    paired = rdd1.zip(rdd2)
    mismatches = paired.filter(lambda pair: pair[0] != pair[1])
    return mismatches.count()

# Example usage with RDDs
# Distinct names are used so the (index, value) RDDs rdd1/rdd2 from the Jaccard
# and cosine cells are not overwritten; those cells stay re-runnable afterwards.
bits1 = sc.parallelize([0, 1, 0])  # Represents the vector [0, 1, 0]
bits2 = sc.parallelize([0, 0, 1])  # Represents the vector [0, 0, 1]

print("Hamming Distance:", hamming_distance(bits1, bits2))


Hamming Distance: 2


In [12]:
# Question 13
def read_file(file_name):
    """Return the full contents of the text file at ``file_name`` as one string.

    The file is opened in text read mode and closed again before returning.
    """
    with open(file_name, mode='r') as handle:
        contents = handle.read()
    return contents

# Load the three raw input files; they are expected in the current working directory
movies_txt, preferences_txt, watchedmovies_txt = (
    read_file(name)
    for name in ('movies.txt', 'preferences.txt', 'watchedmovies.txt')
)

# Parse the raw file contents loaded above into Python data structures
def _data_lines(text):
    """Yield each non-blank line of ``text`` with surrounding whitespace removed."""
    # splitlines() handles both "\n" and "\r\n", so no stray "\r" ends up in a field
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if line:
            yield line


def parse_data(movies, preferences, watched_movies):
    """Parse the three comma-separated text blobs into lookup structures.

    Blank lines are ignored and empty inputs produce empty results.

    Args:
        movies: Text with one movie per line; field 0 is the movie id and
            field 2 is its genre.
        preferences: Text with one ``user,genre`` pair per line.
        watched_movies: Text with one four-field record per line; field 0 is
            the user and field 1 the movie id, the last two are ignored.

    Returns:
        A tuple ``(movie_genres, user_preferences, user_watched)``:
        ``movie_genres`` maps movie id to genre, ``user_preferences`` maps
        user to the set of preferred genres, and ``user_watched`` maps user to
        the list of genres of the movies they watched, in input order.

    Raises:
        IndexError: If a movie line has fewer than three fields.
        ValueError: If a preference line does not have exactly two fields or
            a watched line does not have exactly four.
        KeyError: If a watched record references a movie id not in ``movies``.
    """
    movie_genres = {}
    for line in _data_lines(movies):
        fields = line.split(',')  # split once, then pick id and genre
        movie_genres[fields[0]] = fields[2]

    user_preferences = {}
    for line in _data_lines(preferences):
        user, genre = line.split(',')
        user_preferences.setdefault(user, set()).add(genre)

    user_watched = {}
    for line in _data_lines(watched_movies):
        user, movie, _, _ = line.split(',')
        # Store the genre, not the movie id: later analysis compares genres
        user_watched.setdefault(user, []).append(movie_genres[movie])

    return movie_genres, user_preferences, user_watched

# Build the lookup structures used by the misleading-profile analysis below
movie_genres, user_preferences, user_watched = parse_data(
    movies=movies_txt,
    preferences=preferences_txt,
    watched_movies=watchedmovies_txt,
)

# Determine misleading profiles
def find_misleading_profiles(user_preferences, user_watched, threshold):
    """Return the users whose watched genres mostly fall outside their preferences.

    A user is reported when the fraction of their watched entries whose genre
    is not in their preference set is strictly greater than ``threshold``.

    Args:
        user_preferences: Mapping of user to a collection of preferred genres.
        user_watched: Mapping of user to the list of genres they watched
            (one entry per watched movie).
        threshold: Fraction that the share of non-preferred entries must
            exceed for the user to be reported.

    Returns:
        List of matching users, in ``user_watched`` iteration order. Users
        without an entry in ``user_preferences`` or with an empty watched
        list are never reported.
    """
    misleading_profiles = []
    for user, watched_genres in user_watched.items():
        if user not in user_preferences:
            continue  # no stated preferences to compare against
        if not watched_genres:
            continue  # nothing watched: the fraction is undefined, avoid dividing by zero
        liked_genres = user_preferences[user]
        disliked_count = sum(1 for genre in watched_genres if genre not in liked_genres)
        if disliked_count / len(watched_genres) > threshold:
            misleading_profiles.append(user)

    return misleading_profiles

# Report the misleading profiles for example thresholds of 0.5 and 0.9
for threshold in (0.5, 0.9):
    misleading_profiles = find_misleading_profiles(user_preferences, user_watched, threshold)

    print(f"threshold = {threshold}: {misleading_profiles}")

threshold = 0.5: ['user2']
threshold = 0.9: []
