In [2]:
#Build a Classification Model with Spark with a dataset of your choice
# NOTE(review): despite the task above, this cell trains an ALS *recommendation*
# model (it is essentially the same as the recommendation cell further down), so
# no classifier is built in the cells visible here. TODO: replace the model
# section with a classifier such as pyspark.ml.classification.LogisticRegression.
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("RecommendationEngine") \
    .getOrCreate()

# Columns ALS reads; a row missing any of them cannot be used for training.
RATING_COLS = ["userId", "movieId", "rating"]

# Try loading from URL first
data_url = "https://raw.githubusercontent.com/apache/spark/master/data/mllib/als/sample_movielens_ratings.txt"
try:
    # Fields are separated by "::" (multi-character delimiters need Spark 3.0+).
    ratings = (
        spark.read.option("delimiter", "::")
        .csv(data_url, schema="userId int, movieId int, rating float, timestamp long")
        # With an explicit schema the reader parses lazily, and in its default
        # PERMISSIVE mode malformed lines become nulls instead of raising;
        # drop them so ALS never receives null ids or ratings.
        .dropna(subset=RATING_COLS)
    )
    # Force an actual read inside the try so fetch/parse problems (or an empty
    # result) trigger the fallback here rather than failing later at show()/fit().
    if not ratings.head(1):
        raise ValueError("no usable rating rows found at URL")
    print("Loaded data from URL successfully")
except Exception as e:
    # Catch Exception rather than using a bare `except:` so KeyboardInterrupt /
    # SystemExit still stop the cell. Report only the first message line, because
    # Py4J errors can embed long Java stack traces.
    reason = (str(e).strip().splitlines() or ["<no message>"])[0]
    print(f"Failed to load from URL: {type(e).__name__}: {reason}")
    # Fallback to sample data if URL fails
    print("Using sample data as fallback")
    data = [
        Row(userId=1, movieId=101, rating=5.0),
        Row(userId=1, movieId=102, rating=3.0),
        Row(userId=1, movieId=103, rating=2.5),
        Row(userId=2, movieId=101, rating=2.0),
        Row(userId=2, movieId=102, rating=2.5),
        Row(userId=2, movieId=103, rating=5.0),
        Row(userId=3, movieId=101, rating=2.5),
        Row(userId=3, movieId=104, rating=4.0),
        Row(userId=3, movieId=105, rating=4.5)
    ]
    ratings = spark.createDataFrame(data)

# Show the data
ratings.show()

# Split data into training and test
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

# Build ALS recommendation model
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    # Drop predictions for users/movies unseen in `train` (they would be NaN).
    coldStartStrategy="drop",
    nonnegative=True,
    seed=42
)

# Train model
model = als.fit(train)

# Evaluate model
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
# On a tiny dataset the random test split plus the cold-start drop can leave no
# rows to score, and an RMSE over zero rows is meaningless, so say so explicitly.
if predictions.head(1):
    rmse = evaluator.evaluate(predictions)
    print(f"Root-mean-square error = {rmse:.4f}")
else:
    rmse = float("nan")
    print("No test predictions to evaluate (test split empty after cold-start drop); RMSE undefined")

# Generate recommendations
userRecs = model.recommendForAllUsers(3)
movieRecs = model.recommendForAllItems(3)

print("Top 3 recommendations for users:")
userRecs.show(truncate=False)

print("Top 3 recommendations for movies:")
movieRecs.show(truncate=False)

# Stop Spark session
spark.stop()

Using sample data as fallback
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    101|   5.0|
|     1|    102|   3.0|
|     1|    103|   2.5|
|     2|    101|   2.0|
|     2|    102|   2.5|
|     2|    103|   5.0|
|     3|    101|   2.5|
|     3|    104|   4.0|
|     3|    105|   4.5|
+------+-------+------+

Root-mean-square error = 1.2715
Top 3 recommendations for users:
+------+------------------------------------------------------+
|userId|recommendations                                       |
+------+------------------------------------------------------+
|1     |[{101, 4.9958506}, {105, 4.3145623}, {104, 3.8351665}]|
|2     |[{103, 4.99809}, {101, 2.0000303}, {102, 1.1110015}]  |
|3     |[{105, 4.498306}, {104, 3.9984937}, {101, 2.5000443}] |
+------+------------------------------------------------------+

Top 3 recommendations for movies:
+-------+-------------------------------------------------+
|movieId|recommendations                         

In [4]:
#Build a Clustering Model with Spark with a dataset of your choice
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml import Pipeline
from pyspark.sql import Row

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("ClusteringExample") \
    .getOrCreate()

# Column names in file order (the source file has no header row).
column_names = ["sepal_length", "sepal_width", "petal_length", "petal_width", "species"]

# Try loading from URL first
try:
    # Using a publicly available Iris dataset URL
    data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
    df = (
        spark.read.csv(data_url, header=False, inferSchema=True)
        # Name all columns positionally in one step. Unlike renaming _c0.._c4 one
        # by one, this raises (and so falls back) if the column count is unexpected.
        .toDF(*column_names)
        # The file may end with blank/partial lines; null features would make
        # VectorAssembler fail, so drop incomplete rows.
        .dropna()
    )
    # Make sure real rows were read; otherwise fall back to the sample data.
    if not df.head(1):
        raise ValueError("no usable rows found at URL")
    print("Successfully loaded Iris dataset from URL")
except Exception as e:
    # str(e) alone can be uninformative (this cell previously printed "None"),
    # so include the exception type; keep only the first line of long Py4J messages.
    reason = (str(e).strip().splitlines() or ["<no message>"])[0]
    print(f"Failed to load from URL: {type(e).__name__}: {reason}")
    print("Using sample data instead")

    # Create sample data
    sample_data = [
        Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa'),
        Row(sepal_length=4.9, sepal_width=3.0, petal_length=1.4, petal_width=0.2, species='Iris-setosa'),
        Row(sepal_length=7.0, sepal_width=3.2, petal_length=4.7, petal_width=1.4, species='Iris-versicolor'),
        Row(sepal_length=6.4, sepal_width=3.2, petal_length=4.5, petal_width=1.5, species='Iris-versicolor'),
        Row(sepal_length=6.3, sepal_width=3.3, petal_length=6.0, petal_width=2.5, species='Iris-virginica'),
        Row(sepal_length=5.8, sepal_width=2.7, petal_length=5.1, petal_width=1.9, species='Iris-virginica')
    ]
    df = spark.createDataFrame(sample_data)

# Show the data
print("Data sample:")
df.show(5)

# Prepare features (excluding the species column)
feature_cols = [c for c in df.columns if c != "species"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")

# Scale features (StandardScaler defaults: divide by std, no mean-centering)
scaler = StandardScaler(inputCol="raw_features", outputCol="features")

# Create KMeans model (using k=3 since we know Iris has 3 species)
kmeans = KMeans(featuresCol="features", k=3, seed=42)

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, kmeans])

# Fit model
model = pipeline.fit(df)

# Make predictions
predictions = model.transform(df)

# Evaluate clustering (defaults read the "features" and "prediction" columns)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"\nSilhouette score = {silhouette:.4f}")

# Show cluster centers. They live in the *scaled* feature space (each feature
# divided by its standard deviation), not in the original measurement units,
# and are listed in feature_cols order.
centers = model.stages[-1].clusterCenters()
print("\nCluster Centers:")
for i, center in enumerate(centers):
    print(f"Cluster {i}: {center}")

# Show sample predictions with actual species
print("\nSample predictions:")
predictions.select("species", "prediction").show(10)

# Stop Spark session
spark.stop()

Failed to load from URL: None
Using sample data instead
Data sample:
+------------+-----------+------------+-----------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|        species|
+------------+-----------+------------+-----------+---------------+
|         5.1|        3.5|         1.4|        0.2|    Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|    Iris-setosa|
|         7.0|        3.2|         4.7|        1.4|Iris-versicolor|
|         6.4|        3.2|         4.5|        1.5|Iris-versicolor|
|         6.3|        3.3|         6.0|        2.5| Iris-virginica|
+------------+-----------+------------+-----------+---------------+
only showing top 5 rows


Silhouette score = 0.5598

Cluster Centers:
Cluster 0: [ 8.12207138 11.80646402  2.57653026  1.94817669]
Cluster 1: [ 6.18431831 11.86732208  0.71193599  0.21646408]
Cluster 2: [7.17380924 9.85900604 2.59348112 2.05640873]

Sample predictions:
+---------------+----------+
|        species|pre

In [5]:
#Build a Recommendation Engine with Spark with a dataset of your choice
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("RecommendationEngine") \
    .getOrCreate()

# Number of recommendations produced per user and per movie.
TOP_N = 3

# Sample data - replace with your dataset
data = [
    Row(userId=1, movieId=101, rating=5.0),
    Row(userId=1, movieId=102, rating=3.0),
    Row(userId=1, movieId=103, rating=2.5),
    Row(userId=2, movieId=101, rating=2.0),
    Row(userId=2, movieId=102, rating=2.5),
    Row(userId=2, movieId=103, rating=5.0),
    Row(userId=3, movieId=101, rating=2.5),
    Row(userId=3, movieId=104, rating=4.0),
    Row(userId=3, movieId=105, rating=4.5)
]

# Create DataFrame
ratings = spark.createDataFrame(data)

# Split data into training and test. With only 9 ratings the test split holds
# just a couple of rows, so the RMSE below is a smoke check, not a real metric.
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

# Build ALS recommendation model
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    # Drop predictions for users/movies unseen in `train` (they would be NaN).
    coldStartStrategy="drop",
    nonnegative=True,
    seed=42
)

# Train model
model = als.fit(train)

# Evaluate model
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
# The cold-start drop can remove every test row on data this small; an RMSE over
# zero rows is meaningless, so report that case explicitly.
if predictions.head(1):
    rmse = evaluator.evaluate(predictions)
    print(f"Root-mean-square error = {rmse:.4f}")
else:
    rmse = float("nan")
    print("No test predictions to evaluate (test split empty after cold-start drop); RMSE undefined")

# Generate top TOP_N movie recommendations for each user
userRecs = model.recommendForAllUsers(TOP_N)
print("Top recommendations:")
userRecs.show(truncate=False)

# Generate top TOP_N user recommendations for each movie
movieRecs = model.recommendForAllItems(TOP_N)
# Label the second table too, so readers can tell the two outputs apart.
print("Top recommendations per movie:")
movieRecs.show(truncate=False)

# Stop Spark session
spark.stop()

Root-mean-square error = 1.2715
Top recommendations:
+------+------------------------------------------------------+
|userId|recommendations                                       |
+------+------------------------------------------------------+
|1     |[{101, 4.9958506}, {105, 4.3145623}, {104, 3.8351665}]|
|2     |[{103, 4.99809}, {101, 2.0000303}, {102, 1.1110015}]  |
|3     |[{105, 4.498306}, {104, 3.9984937}, {101, 2.5000443}] |
+------+------------------------------------------------------+

+-------+-------------------------------------------------+
|movieId|recommendations                                  |
+-------+-------------------------------------------------+
|101    |[{1, 4.9958506}, {3, 2.5000443}, {2, 2.0000303}] |
|102    |[{1, 3.001308}, {3, 1.3686249}, {2, 1.1110015}]  |
|103    |[{2, 4.99809}, {1, 3.642063}, {3, 0.7022577}]    |
|104    |[{3, 3.9984937}, {1, 3.8351665}, {2, 0.58498895}]|
|105    |[{3, 4.498306}, {1, 4.3145623}, {2, 0.6581126}]  |
+-------+---------