In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lower, when, concat, lit, length, regexp_extract, avg, max, min, sum, desc

In [None]:
# Initialize Spark session. getOrCreate() hands back the already-active session
# when one exists (as on Databricks) and only builds a new one otherwise; when
# reusing an existing session only runtime SQL configs take effect, so the
# appName below does not rename an application that is already running.
spark = SparkSession.builder.appName("SparkDataProcessing").getOrCreate()

In [None]:
# Read the semicolon-delimited CSV from Databricks FileStore into a DataFrame.
# The first line supplies column names; inferSchema makes Spark scan the data
# to guess column types. Adjust file_path if the upload location differs.
file_path = "dbfs:/FileStore/tables/session_info_colab.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("sep", ";")
    .csv(file_path)
)

In [None]:
# Initial Data Inspection
# Print a heading followed by the schema tree of `df` (column names and the
# types produced by the CSV read's inferSchema=True).
print("Initial Schema:")
df.printSchema()

In [None]:
# Preview the first five raw rows without truncating long cell values.
print("Initial Data:")
df.show(n=5, truncate=False)

In [None]:
# Data Cleaning and Preprocessing
#  * drop rows without an email first (they cannot yield a domain);
#  * lower-case user_name;
#  * derive email_domain as the text after the LAST "@", lower-cased, because
#    domain names are case-insensitive and "Example.COM" / "example.com" must
#    land in the same group in the per-domain statistics. The previous greedy
#    pattern "@(.+)" returned "b@c.com" for "a@b@c.com". An email with no "@"
#    yields "" (regexp_extract's no-match value);
#  * keep one row per user_id;
#  * record the character length of the address (NULL when address is NULL).
# NOTE(review): dropDuplicates keeps an arbitrary row per user_id. If a
# specific row must win (e.g. the latest session), rank rows explicitly first.
# Because of this dedup, later per-user_name counts count distinct user_ids,
# not sessions — confirm that is what "session_count" is meant to measure.
cleaned_df = (
    df.filter(col("email").isNotNull())
    .withColumn("user_name", lower(col("user_name")))
    .withColumn("email_domain", lower(regexp_extract(col("email"), r"@([^@]+)$", 1)))
    .dropDuplicates(["user_id"])
    .withColumn("address_length", length(col("address")))
)

In [None]:
# Transformations and New Columns
transformed_df = cleaned_df \
    .withColumn("status", when(col("address_length") > 50, lit("Detailed")).otherwise(lit("Short"))) \
    .withColumn("full_user_info", concat(col("user_name"), lit(" - "), col("email_domain")))

In [None]:
# Aggregations
user_stats = cleaned_df.groupBy("user_name").agg(
    count("*").alias("session_count"),
    avg("address_length").alias("avg_address_length"),
    max("address_length").alias("max_address_length"),
    min("address_length").alias("min_address_length")
)

In [None]:
# Sorting
sorted_user_stats = user_stats.orderBy(col("session_count").desc())  # busiest users first; ties have no fixed order

In [None]:
# Joins
joined_df = transformed_df.join(user_stats, ["user_name"], "inner")  # inner join: rows with NULL user_name are dropped

In [None]:
# Caching Example
# cache() only marks joined_df for caching and returns the same DataFrame;
# count() is an action, so it materialises the cache immediately.
joined_df = joined_df.cache()
print(f"Count after caching: {joined_df.count()}")

In [None]:
# Grouped Aggregations and Additional Insights
# Per email domain: number of non-NULL user_ids and the summed address length,
# domains with the most users first. `sum` is the PySpark column function
# imported at the top of the file, not the Python builtin.
domain_groups = cleaned_df.groupBy("email_domain")
domain_totals = domain_groups.agg(
    count("user_id").alias("user_count"),
    sum("address_length").alias("total_address_length"),
)
email_domain_stats = domain_totals.orderBy(col("user_count").desc())

In [None]:
# Data Insights
# Data Insights: print a heading and the first five untruncated rows of each
# result, in a fixed order.
insight_frames = (
    ("Transformed Data:", transformed_df),
    ("User Statistics:", sorted_user_stats),
    ("Joined Data:", joined_df),
    ("Email Domain Statistics:", email_domain_stats),
)
for heading, frame in insight_frames:
    print(heading)
    frame.show(5, truncate=False)
    if heading != "Email Domain Statistics:":
        print()

In [None]:
# Clean-up.
# 1. Release the DataFrame cached in the "Caching Example" cell; otherwise it
#    keeps occupying executor memory for as long as the session lives.
# 2. Stop the Spark session only when this notebook owns it. On Databricks the
#    SparkSession/SparkContext is created and shared by the platform, and user
#    code should not stop it. The Databricks runtime sets
#    DATABRICKS_RUNTIME_VERSION in the environment; when it is absent (local
#    or plain-Spark runs) we stop the session as before.
import os

joined_df.unpersist()

if "DATABRICKS_RUNTIME_VERSION" not in os.environ:
    spark.stop()

### Why Use `spark.stop()`?

The `spark.stop()` method is used to **gracefully shut down a SparkSession or SparkContext** after completing your Spark operations. Below are the key reasons why this is important:

#### Key Reasons to Use `spark.stop()`:

1. **Release Resources**:
   - Spark consumes significant resources such as memory, disk, and CPU. 
   - Calling `spark.stop()` ensures these resources are released when they are no longer needed.

2. **Avoid Resource Contention**:
   - If `spark.stop()` is not called, the Spark application might continue holding resources.
   - This can cause issues when running multiple Spark jobs or sharing a cluster.

3. **Allow Subsequent Jobs to Run**:
   - Ensures that subsequent Spark jobs start with a clean slate.
   - Prevents conflicts or errors caused by residual sessions or contexts.

4. **Prevent Memory Leaks**:
   - A Spark session that is never stopped keeps its driver and executor memory, including any cached DataFrames, allocated. In long-running applications or interactive sessions such as Jupyter notebooks, this behaves like a memory leak.

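5. **Best Practice**:
   - Just as you close a file after opening it, stopping the Spark session is good resource-management practice.

> **Note (Databricks):** On Databricks the SparkSession and SparkContext are created and shared by the platform. Databricks advises against stopping them from user code. Doing so can break later cells or other notebooks attached to the same cluster. There, release what you own instead, for example call `unpersist()` on cached DataFrames, and let the platform manage the session.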