<a href="https://colab.research.google.com/github/Howard3309/Least-Price-Detector/blob/main/bigdata_with_spark_a2_160121771045.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Clean up any old Spark folders
!rm -rf spark-*

# Install Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Download Spark 3.4.1 (which works well in Colab)
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Extract Spark
!tar -xzf spark-3.4.1-bin-hadoop3.tgz

# Install findspark
!pip install -q findspark


In [None]:
import os
import findspark

# Tell the Spark launcher where the JDK and the unpacked Spark distribution live.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.4.1-bin-hadoop3",
    }
)

# Keep this call ahead of the pyspark import below: findspark is used here to
# make the pyspark shipped under SPARK_HOME importable.
findspark.init()

from pyspark.sql import SparkSession

# Create the session (or reuse the one already running in this kernel).
session_builder = SparkSession.builder.appName("SparkML")
spark = session_builder.getOrCreate()

# Bare expression as the last line so the notebook renders the session summary.
spark


In [None]:
# Titanic survival classification with logistic regression.
#
# The split is done BEFORE any preprocessing is fitted, and all preprocessing
# lives in a Pipeline that is fitted on the training split only. This keeps the
# test rows from influencing the StringIndexer ordering and the Imputer means
# (fitting them on the full frame first would leak test-set statistics).
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, Imputer

# Load dataset
url = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
pdf = pd.read_csv(url)

# Convert to Spark DataFrame. Only the columns the model actually uses are
# converted, so Spark does not have to infer a schema for unused columns.
model_columns = ["Survived", "Pclass", "Sex", "Age", "Fare"]
raw_df = spark.createDataFrame(pdf[model_columns])
df = raw_df.withColumn("label", raw_df["Survived"].cast("int"))

# Train-test split (seeded for reproducibility)
train, test = df.randomSplit([0.7, 0.3], seed=42)

# Feature preparation stages
indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")

# Impute missing values for 'Age' and 'Fare' before assembling features
imputer = Imputer(inputCols=["Age", "Fare"], outputCols=["Age_imputed", "Fare_imputed"])

assembler = VectorAssembler(
    inputCols=["Pclass", "SexIndex", "Age_imputed", "Fare_imputed"],
    outputCol="features",
)

lr = LogisticRegression(featuresCol="features", labelCol="label")

# Train model: every stage is fitted on the training split only
pipeline = Pipeline(stages=[indexer, imputer, assembler, lr])
pipeline_model = pipeline.fit(train)

# Keep `model` bound to the fitted logistic regression itself (last stage),
# so coefficients / summary remain directly accessible.
model = pipeline_model.stages[-1]

# Evaluate on the held-out split
predictions = pipeline_model.transform(test)

evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.3f}")

predictions.select("label", "prediction").show(5)

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       1.0|
|    0|       1.0|
|    0|       1.0|
|    1|       1.0|
|    0|       1.0|
+-----+----------+
only showing top 5 rows



In [None]:
# K-means clustering (k=3) on the four Iris measurements.
from sklearn.datasets import load_iris
from pyspark.ml.clustering import KMeans

# Build a pandas frame from the sklearn bunch, then hand it to Spark.
iris = load_iris()
iris_df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
iris_sdf = spark.createDataFrame(iris_df)

# Vector assembler: pack every measurement column into a single "features" vector
assembler = VectorAssembler(inputCols=iris_sdf.columns, outputCol="features")
assembled = assembler.transform(iris_sdf)
df = assembled.select("features")

# Configure and fit the clustering model (seeded so cluster ids are repeatable)
kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(df)

# Predictions: cluster assignment for each row of the same frame
predictions = model.transform(df)
predictions.show(n=5)


+-----------------+----------+
|         features|prediction|
+-----------------+----------+
|[5.1,3.5,1.4,0.2]|         1|
|[4.9,3.0,1.4,0.2]|         1|
|[4.7,3.2,1.3,0.2]|         1|
|[4.6,3.1,1.5,0.2]|         1|
|[5.0,3.6,1.4,0.2]|         1|
+-----------------+----------+
only showing top 5 rows



In [None]:
# Collaborative filtering on MovieLens 100k with ALS.
from pyspark.ml.recommendation import ALS

# Load MovieLens data (tab-separated, no header row, so names are supplied here)
url = "https://files.grouplens.org/datasets/movielens/ml-100k/u.data"
ratings = pd.read_csv(url, sep="\t", names=["userId", "movieId", "rating", "timestamp"])
df = spark.createDataFrame(ratings)

# Seed both the split and ALS: without seeds the train/test membership and the
# factor initialisation change on every run, so results cannot be reproduced.
(training, test) = df.randomSplit([0.8, 0.2], seed=42)
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",  # drop test rows whose prediction would be NaN
    seed=42,
)
model = als.fit(training)

predictions = model.transform(test)
predictions.select("userId", "movieId", "rating", "prediction").show(5)


+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   148|      1|     4|  4.240008|
|   148|      7|     5| 3.9640293|
|   148|     78|     1|   1.50119|
|   148|    177|     2|  4.338417|
|   148|    234|     3| 3.1249607|
+------+-------+------+----------+
only showing top 5 rows

