<a href="https://colab.research.google.com/github/ManishGovind/hadoop-hive/blob/master/pyspark_ml.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# prompt: install pyspark

!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=bf37bed48b5acce56d35c4aea767e08c8af2b9460dd78aca28244979662a20be
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
# Create SparkSession
spark = SparkSession.builder.appName("GenreBasedMovieRating").getOrCreate()
ratings = spark.read.csv('/content/ratings.csv', inferSchema=True,header=True)
movies = spark.read.csv('/content/movies.csv', inferSchema=True,header=True)



# Join ratings and movies on movie_id to get genre information
data = ratings.join(movies, "movieId")
data.show()
indexer = StringIndexer(inputCol="genre", outputCol="genre_index")
data_indexed = indexer.fit(data).transform(data)
#data_indexed.show()

assembler = VectorAssembler(inputCols=["genre_index"],
outputCol="features")
data_assembled = assembler.transform(data_indexed)

data_assembled.show()
train_data, test_data = data_assembled.randomSplit([0.8, 0.2], seed=42)

# Train a decision tree regressor
dt = DecisionTreeRegressor(featuresCol="features", labelCol="rating")

# Train the model
dt_model = dt.fit(train_data)

# Make predictions on the test set
predictions = dt_model.transform(test_data)

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)





AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/ratings.csv.

SQL ANAlysis

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc , avg
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

from pyspark.ml.recommendation import ALS

In [None]:
spark = SparkSession.builder.appName('SQL Analysis').getOrCreate()

#movies_csv = '/content/movies.csv'
# ratings_csv = '/content/ratings.csv'

# # Define schema
# schema = " date STRING, delay INT, distance INT, \
# origin STRING, destination STRING"
# # Read and create a temporary view
# df = (spark.read.format("csv") \
# .option("header", "true") \
# .schema(schema) \
# .load(csv_file))
# df.createOrReplaceTempView("us_delay_flights_tbl")


# Load the data


movies = spark.read.csv('/content/movies.csv', inferSchema=True, header=True)
ratings = spark.read.csv('/content/ratings.csv', inferSchema=True, header=True)

# Create a new database named movie_ratings and load the two files created in Step 1 into
# managed tables movies and ratings.
movies.createOrReplaceTempView("movies")
ratings.createOrReplaceTempView("ratings")





#Calculate the total number of movies and ratings.
total_movies = spark.sql("SELECT COUNT(*) AS total_movies FROM movies")
total_ratings = spark.sql("SELECT COUNT(*) AS total_ratings FROM ratings")
total_movies.show()
total_ratings.show()

# Find the average rating for each movie.
average_ratings = spark.sql("SELECT movieId, AVG(rating) AS average_rating FROM ratings \
    GROUP BY movieId \
")

# Determine the top-rated movies.
top_ratings = spark.sql("SELECT movieId, AVG(rating) AS top_rating FROM ratings \
    GROUP BY movieId \
      ORDER BY top_rating DESC \
")
average_ratings.show(10)
top_ratings.show(10)

# Using window functions, compute the overall movies rating rank per genre.
window_spec = Window.partitionBy("genre").orderBy(col("average_rating").desc())
combined_table = spark.sql(" SELECT m.movieId, m.genre, AVG(r.rating) AS average_rating \
        FROM movies m \
        JOIN ratings r ON m.movieId = r.movieId \
        GROUP BY m.movieId,  m.genre \
   " )
movies_rank_per_genre = combined_table.withColumn("rank", rank().over(window_spec))
movies_rank_per_genre.show()

# Identify users who have rated movies from a wide range of genres.
wide_range_users = spark.sql("""
    SELECT userId, COUNT(DISTINCT genre) AS num_genres_rated
    FROM ratings r JOIN movies m ON r.movieId = m.movieId
    GROUP BY userId
    ORDER BY num_genres_rated DESC
""")
wide_range_users.show()


average_ratings_per_genre = spark.sql("""
    SELECT genre, AVG(r.rating) AS average_rating
    FROM ratings r
    JOIN movies m ON r.movieId = m.movieId
    GROUP BY genre
    ORDER BY average_rating DESC
""")
average_ratings_per_genre.show()

# Create an ALS model
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(ratings)

# Generate top 10 movie recommendations for a given user
user_id = 1
recommendations = model.recommendForUserSubset(ratings.filter(ratings.userId == user_id), 10) # N can be changed to get the top N movies

# Print the recommendations
recommendations.show()





In [None]:


# Calculate the average rating for each genre and identify the genres with the highest average
# ratings.
average_ratings_per_genre = spark.sql(" SELECT genre, AVG(r.rating) AS average_rating \
    FROM ratings r \
    JOIN movies m ON r.movieId = m.movieId \
    GROUP BY genre \
    ORDER BY average_rating DESC \
")
average_ratings_per_genre.show(10)

#Find movies with a high average rating but a low number of ratings.

high_average_low_count_movies = spark.sql(" SELECT m.title, m.genre, AVG(r.rating) AS average_rating, COUNT(*) AS rating_count \
    FROM movies m \
    JOIN ratings r ON m.movieId = r.movieId \
    GROUP BY m.title, m.genre \
    HAVING AVG(r.rating) >= (SELECT AVG(rating) FROM ratings) \
           AND COUNT(*) <= 4 \
")
high_average_low_count_movies.show()

 #Find genres that have relatively low average ratings compared to other genres.

low_average_ratings_genres = spark.sql(" SELECT genre, AVG(r.rating) AS average_rating \
    FROM movies m \
    JOIN ratings r ON m.movieId = r.movieId \
    GROUP BY genre  \
    HAVING AVG(r.rating) < (SELECT AVG(rating) FROM ratings) \
")
low_average_ratings_genres.show()

In [None]:
# prompt: highest rated movie for each genre

highest_rated_movies_per_genre = spark.sql("""
    SELECT m.genre, m.title, MAX(r.rating) AS highest_rating
    FROM movies m
    JOIN ratings r ON m.movieId = r.movieId
    GROUP BY m.genre, m.title
    ORDER BY m.genre, highest_rating DESC
""")

highest_rated_movies_per_genre.show()


Decision tree Regressor


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName("DecisionTrees").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

In [None]:
#load the data
filePath = "/content/sf-airbnb-clean.parquet/"
airbnbDF = spark.read.parquet(filePath)
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms", "number_of_reviews", "price").show(5)

# Split the dataset into training and test sets


trainDF, testDF = airbnbDF.randomSplit([0.8, 0.2], seed=42)
print(f"There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set")

+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows

There are 5780 rows in the training set, and 1366 in the test set


In [None]:
# Initialize the DecisionTreeRegressor
#dt = DecisionTreeRegressor(labelCol="price")
dt = RandomForestRegressor(labelCol="price" , maxDepth=10, numTrees=20)

# Index categorical columns
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

# Assemble features
numericCols = [field for (field, dataType) in trainDF.dtypes if (dataType == "double") and (field != "price")]
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# Define pipeline stages
stages = [stringIndexer, vecAssembler, dt]
pipeline = Pipeline(stages=stages)

# Fit the pipeline to the training data
dt.setMaxBins(40)  # Set max bins for Decision Tree
pipelineModel = pipeline.fit(trainDF)

# Retrieve the trained DecisionTreeRegressor model
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString)

In [None]:
# Extract Feature Importances
dtModel = pipelineModel.stages[-1]
#print(dtModel.toDebugString)

# Create DataFrame of Feature Importances
schema = "feature STRING, importance FLOAT"
feature_importances_float = [float(val) for val in dtModel.featureImportances]
featureImp = spark.createDataFrame(list(zip(vecAssembler.getInputCols(), feature_importances_float)), schema)
featureImp.orderBy("importance", ascending=False).show()

# Make Predictions
predDF = pipelineModel.transform(testDF)
predDF.select("price", "prediction").show()

# Evaluate Model
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")

# Stop SparkSession
spark.stop()

+--------------------+-----------+
|             feature| importance|
+--------------------+-----------+
|neighbourhood_cle...| 0.13150403|
|      minimum_nights|0.099471994|
|            bedrooms| 0.09508264|
|        accommodates|  0.0902129|
|   number_of_reviews| 0.07730515|
|cancellation_poli...| 0.06669854|
|                beds| 0.06279356|
|host_total_listin...|0.058785368|
|            latitude|0.045034967|
|instant_bookableI...|0.038089752|
|           longitude|0.034680534|
|  property_typeIndex| 0.03272191|
|review_scores_rating| 0.02939379|
|           bathrooms| 0.02686555|
| review_scores_value|0.023565233|
|      room_typeIndex| 0.02064288|
|review_scores_loc...|0.010091712|
|host_is_superhost...|0.008890638|
|review_scores_val...|0.008696568|
|review_scores_rat...|0.008357036|
+--------------------+-----------+
only showing top 20 rows

+------+------------------+
| price|        prediction|
+------+------------------+
|  85.0|136.35044810693128|
|  45.0| 71.2193884265

In [None]:
!pip install spark-nlp



K fold Validation

In [None]:
from pyspark.ml.tuning import ParamGridBuilder , CrossValidator


paramGrid = (ParamGridBuilder()
.addGrid(dt.maxDepth, [2, 4, 6])
.addGrid(dt.numTrees, [10, 100])
.build())

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")
cv = CrossValidator(estimator=pipeline,
 evaluator=evaluator,
 estimatorParamMaps=paramGrid,
 numFolds=3,seed=42)

cvModel = cv.fit(testDF)
print(list(zip(cvModel.getEstimatorParamMaps(),cvModel.avgMetrics)))

In [None]:
print(list(zip(cvModel.getEstimatorParamMaps(),cvModel.avgMetrics)))