## Import required libraries

In [None]:
# findspark must run BEFORE any pyspark import: it locates SPARK_HOME and puts
# that installation's pyspark/py4j on sys.path. If pyspark is not pip-installed,
# importing it first fails outright; if it is, importing first can load Python
# modules that don't match the Spark installation findspark later points to.
import findspark
findspark.init()

import matplotlib.pyplot as plt
import numpy
import seaborn as sns
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.sql import SparkSession
# Explicit names instead of `import *`, which silently shadows many Python
# builtins (sum, min, max, abs, filter, ...). `round` deliberately shadows the
# builtin: the cluster-statistics cell uses Spark's column-level round().
from pyspark.sql.functions import avg, col, count, desc, round

## Initialize Spark Session

In [None]:
import os

from pyspark.sql import SparkSession

# Spark master URL. Overridable through the SPARK_MASTER_URL environment
# variable (e.g. another standalone cluster, or "local[*]") without editing the
# notebook; defaults to the original hard-coded cluster address.
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://172.20.10.6:7077")

# Connect to the Spark cluster; getOrCreate() returns an already-active
# session instead of starting a second one when this cell is re-run.
spark = (
    SparkSession.builder
    .appName("Netflix_Analytics")
    .master(SPARK_MASTER_URL)
    .getOrCreate()
)

# Verify the Spark session
print(spark.version)

## Read the CSV File from HDFS

In [None]:
import os

# HDFS location of the merged Netflix summary CSV. Overridable through the
# NETFLIX_CSV_PATH environment variable; defaults to the original path.
# NOTE(review): "localhost" in an HDFS URI is resolved on every executor host,
# not only on the driver. With a remote standalone master this works only if
# the NameNode is reachable at localhost:9000 from each worker (e.g. a
# single-machine setup) — confirm, or point this at the NameNode's real host.
hdfs_path = os.environ.get(
    "NETFLIX_CSV_PATH",
    "hdfs://localhost:9000/data/netflix/netflix_merged_summary.csv",
)

# Read the CSV with a header row; inferSchema makes Spark scan the data one
# extra time to pick column types.
df = spark.read.csv(hdfs_path, header=True, inferSchema=True)

# Show the first rows (show() defaults to 20)
df.show()

# Print the schema of the DataFrame
df.printSchema()

In [None]:
# Preview a handful of rows to sanity-check the loaded data.
sample_header = "\nSample Data:"
print(sample_header)
df.show(n=5)


In [None]:
# Spark's describe-style overview of every column (count, mean, stddev, min,
# quartiles, max), printed as a text table.
print("\nSummary Statistics:")
summary_df = df.summary()
summary_df.show()


## Distribution of Average Ratings

In [None]:
# Bring only the avg_rating column to the driver, then draw a histogram with a
# KDE overlay using the explicit Axes API.
pandas_df = df.select('avg_rating').toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
sns.histplot(data=pandas_df['avg_rating'], kde=True, ax=ax)
ax.set_title('Distribution of Average Ratings')
ax.set_xlabel('Average Rating')
ax.set_ylabel('Frequency')
plt.show()


## Calculate Yearly Statistics

In [None]:
# (source column, output alias) pairs: the mean of each metric per Year group.
_yearly_metrics = [
    ('avg_rating', 'mean_rating'),
    ('total_ratings', 'mean_ratings_count'),
    ('unique_users', 'mean_unique_users'),
]

# Lazily defined aggregate, one row per Year, ordered by Year ascending.
yearly_stats = (
    df.groupBy('Year')
    .agg(*[avg(source).alias(alias) for source, alias in _yearly_metrics])
    .orderBy('Year')
)

In [None]:
# Convert the per-year aggregate (one row per Year group) to pandas for
# plotting with matplotlib; this collects the result onto the driver.
yearly_stats_pd = yearly_stats.toPandas()
# A bare name as the cell's last expression makes Jupyter render the table.
yearly_stats_pd

## Visualize Yearly Trends

In [None]:
# Three stacked panels of the per-year means in yearly_stats_pd.
# All panels are drawn in ONE cell: with Jupyter's default inline backend a
# figure is rendered and closed at the end of the cell that created it, so a
# panel added from a later cell would land on a new, separate figure.
years = yearly_stats_pd['Year']
fig, (ax_rating, ax_count, ax_users) = plt.subplots(3, 1, figsize=(15, 10))

# Plot 1: Average Rating by Year
ax_rating.plot(years, yearly_stats_pd['mean_rating'], marker='o')
ax_rating.set_title('Average Rating by Year')
ax_rating.set_ylabel('Average Rating')
ax_rating.grid(True)

# Plot 2: Average Number of Ratings by Year (the mean_ratings_count aggregate)
ax_count.plot(years, yearly_stats_pd['mean_ratings_count'],
              marker='o', color='orange')
ax_count.set_title('Average Number of Ratings by Year')
ax_count.set_ylabel('Average Ratings Count')
ax_count.grid(True)

# Plot 3: Average Number of Unique Users by Year
ax_users.plot(years, yearly_stats_pd['mean_unique_users'],
              marker='o', color='green')
ax_users.set_title('Average Number of Unique Users by Year')
ax_users.set_xlabel('Year')
ax_users.set_ylabel('Average Unique Users')
ax_users.grid(True)

plt.tight_layout()
plt.show()


In [None]:
# Columns packed into a single vector column ('features') that the scaler and
# KMeans stages consume.
# NOTE(review): VectorAssembler's default handleInvalid is "error", so a null
# in any of these columns will fail the transform — confirm the data is clean.
FEATURE_COLUMNS = ['avg_rating', 'total_ratings', 'unique_users']
assembler = VectorAssembler(inputCols=FEATURE_COLUMNS, outputCol='features')


In [None]:
# Scale the features to unit standard deviation so columns on larger numeric
# scales don't dominate KMeans' Euclidean distance. StandardScaler defaults
# are withStd=True, withMean=False; skipping centering does not change
# pairwise distances, so KMeans is unaffected by it.
feature_df = assembler.transform(df)
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
scaler_model = scaler.fit(feature_df)
scaled_df = scaler_model.transform(feature_df)

# Apply KMeans clustering with an explicit seed. PySpark's default seed is
# derived from hash() of the class name, which differs between Python
# processes under hash randomization, so unseeded results (and cluster ids)
# can change from one notebook run to the next.
KMEANS_SEED = 42
kmeans = KMeans(
    k=5,
    seed=KMEANS_SEED,
    featuresCol='scaled_features',
    predictionCol='cluster',
)
model = kmeans.fit(scaled_df)
clustered_df = model.transform(scaled_df)


In [None]:
# Per-cluster size plus the mean of each raw (unscaled) feature, rounded to two
# decimals with Spark's round(), ordered by cluster id.
_cluster_means = [
    ('avg_rating', 'avg_rating'),
    ('total_ratings', 'avg_total_ratings'),
    ('unique_users', 'avg_unique_users'),
]
cluster_stats = (
    clustered_df
    .groupBy('cluster')
    .agg(
        count('*').alias('count'),
        *[round(avg(source), 2).alias(alias) for source, alias in _cluster_means]
    )
    .orderBy('cluster')
)

print("Cluster Analysis:")
cluster_stats.show()

## Visualize Clusters

In [None]:
# Collect only the plotted columns to the driver for matplotlib.
clusters_pd = clustered_df.select('avg_rating', 'total_ratings', 'cluster').toPandas()

plt.figure(figsize=(12, 8))
scatter = plt.scatter(clusters_pd['avg_rating'],
                      clusters_pd['total_ratings'],
                      c=clusters_pd['cluster'],
                      cmap='viridis',
                      alpha=0.6)

plt.title('Movie Clusters by Rating and Popularity')
plt.xlabel('Average Rating')
plt.ylabel('Total Ratings')
plt.colorbar(scatter, label='Cluster')
plt.show()

# Display example movies from each cluster: the three titles with the most
# total_ratings. Iterate over the centers the fitted model actually has rather
# than a hard-coded count, so this follows any change to k in the KMeans cell
# (Spark's KMeans may also return fewer than k clusters).
print("Example movies from each cluster:")
for cluster_id in range(len(model.clusterCenters())):
    print(f"\nCluster {cluster_id} examples:")
    (
        clustered_df
        .filter(col('cluster') == cluster_id)
        .select('Title', 'avg_rating', 'total_ratings', 'unique_users')
        .orderBy(desc('total_ratings'))
        .limit(3)
        .show(truncate=False)
    )

In [None]:
# Stop the session's underlying SparkContext; run this only after every
# analysis cell above has finished, since later Spark calls will fail.
spark.stop()