<a href="https://colab.research.google.com/github/AshishTiwari1m/CalculatorApp_Reactjs/blob/main/Pyspark_task2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark




In [4]:
# Import Spark Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, from_unixtime

# Initialize Spark Session
spark = SparkSession.builder.appName("MovieLensAnalysis").getOrCreate()

In [7]:
# Load ratings data from Kaggle directory
ratings_df = spark.read.csv("/kaggle/input/movielens-20m-dataset/rating.csv", header=True, inferSchema=True)
ratings_df.show(5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
+------+-------+------+-------------------+
only showing top 5 rows



In [8]:
# Load the dataset as RDD
ratings_rdd = ratings_df.rdd.map(lambda row: (row.movieId, (row.rating, 1)))

# Aggregate ratings and counts
movie_ratings = ratings_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Compute the average rating for each movie
average_ratings = movie_ratings.mapValues(lambda v: v[0] / v[1])

# Find the movie with the lowest average rating
lowest_rated_movie = average_ratings.sortBy(lambda x: x[1]).first()
print("Lowest Rated Movie:", lowest_rated_movie)

Lowest Rated Movie: (129456, 0.5)


In [6]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("grouplens/movielens-20m-dataset")

print("Path to dataset files:", path)

Using Colab cache for faster access to the 'movielens-20m-dataset' dataset.
Path to dataset files: /kaggle/input/movielens-20m-dataset


In [10]:
top_users = ratings_df.groupBy("userId") \
    .agg(count("movieId").alias("num_ratings")) \
    .orderBy(col("num_ratings").desc())

top_users.show(5)

+------+-----------+
|userId|num_ratings|
+------+-----------+
|118205|       9254|
|  8405|       7515|
| 82418|       5646|
|121535|       5520|
|125794|       5491|
+------+-----------+
only showing top 5 rows



In [11]:
from pyspark.sql.functions import col, count, from_unixtime

# Convert timestamp to BIGINT before using from_unixtime()
ratings_df = ratings_df.withColumn("date", from_unixtime(col("timestamp").cast("bigint")).cast("date"))

# Count ratings per date
ratings_per_date = ratings_df.groupBy("date").agg(count("rating").alias("num_ratings")).orderBy("date")

# Show rating trends
ratings_per_date.show(10)

+----------+-----------+
|      date|num_ratings|
+----------+-----------+
|1995-01-09|          4|
|1996-01-29|         42|
|1996-02-01|         59|
|1996-02-02|         67|
|1996-02-05|         32|
|1996-02-06|          1|
|1996-02-07|          1|
|1996-02-08|          4|
|1996-02-09|         10|
|1996-02-10|          1|
+----------+-----------+
only showing top 10 rows



In [1]:
# Load movies dataset
movies_df = spark.read.csv("/kaggle/input/movielens-20m-dataset/movie.csv", header=True, inferSchema=True)

# Compute average ratings and count for each movie
movie_avg_ratings = ratings_df.groupBy("movieId") \
    .agg(count("rating").alias("num_ratings"), avg("rating").alias("avg_rating")) \
    .filter("num_ratings >= 100")  # Set a minimum threshold for valid ratings

# Join with movie titles
best_movies = movie_avg_ratings.join(movies_df, "movieId").orderBy(col("avg_rating").desc())

# Show top-rated movies
best_movies.show(10, truncate=False)

NameError: name 'spark' is not defined