# User Click Data Analysis with Apache Spark
## Interactive Cloud-Ready Analysis Notebook

This notebook provides an interactive environment for analyzing user click data using Apache Spark on Google Cloud Dataproc. It demonstrates distributed computing principles and provides visualization capabilities.

**Dataset**: User click events with timestamp and user ID  
**Platform**: Google Cloud Dataproc with Jupyter integration  
**Objective**: Count click events in each of the four 6-hour intervals of the day (0-6, 6-12, 12-18, 18-24) and compare activity across them


## 1. Setup and Imports


In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

# Set up plotting style.
# Matplotlib 3.6 renamed its bundled seaborn style sheets to 'seaborn-v0_8*';
# older installations only provide the legacy 'seaborn' name. Use the first
# name this installation offers, and keep Matplotlib's default style when
# neither exists, so this cell does not fail on an older cluster image.
for style_name in ('seaborn-v0_8', 'seaborn'):
    if style_name in plt.style.available:
        plt.style.use(style_name)
        break
sns.set_palette("husl")

print("✓ Libraries imported successfully")


: 

## 2. Initialize Spark Session


In [None]:
# Build (or reuse) the notebook's Spark session. Adaptive query execution and
# its partition coalescing are switched on, and the UI is requested on 4041.
spark = (
    SparkSession.builder
    .appName("InteractiveClickAnalysis")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.ui.port", "4041")
    .getOrCreate()
)

# Log only warnings and errors from here on to keep cell output short
spark.sparkContext.setLogLevel("WARN")

print(f"✓ Spark session initialized")
print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")


## 3. Load and Process Data


In [None]:
# Define processing functions
def get_time_interval(time_str):
    """Categorize a time-of-day string into a 6-hour interval label.

    Args:
        time_str: Time text whose first ':'-separated field is the hour,
            e.g. "14:05:33".

    Returns:
        "0-6", "6-12", "12-18" or "18-24" for hours 0 through 23 (the upper
        bound of each interval is exclusive), or "invalid" when the hour
        field is not an integer, is outside 0-23, or ``time_str`` is not a
        string.
    """
    try:
        hour = int(time_str.split(':')[0])
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: the input is not a str;
        # ValueError: the hour field is not an integer.
        return "invalid"

    if not 0 <= hour < 24:
        return "invalid"

    # The interval starts at the hour rounded down to a multiple of 6.
    start = hour // 6 * 6
    return f"{start}-{start + 6}"

def parse_line(line):
    """Parse one raw text line into a click record.

    A valid line has exactly three whitespace-separated fields, read as
    date, time and user ID, and a time that get_time_interval can classify.

    Args:
        line: One line of the input text file.

    Returns:
        A ``(date, time, user_id, interval)`` tuple of strings, or ``None``
        when the line is not a string, does not have exactly three fields,
        or its time field is classified as "invalid".
    """
    # Reject non-text input explicitly instead of hiding it behind a bare except.
    if not isinstance(line, str):
        return None

    # split() with no argument already ignores leading/trailing whitespace.
    parts = line.split()
    if len(parts) != 3:
        return None

    date_part, time_part, user_id = parts
    interval = get_time_interval(time_part)
    if interval == "invalid":
        return None
    return (date_part, time_part, user_id, interval)

# Load and process data
data_file = "data.txt"
raw_rdd = spark.sparkContext.textFile(data_file)

# textFile() is lazy: count first so the "loaded" message is only printed
# after Spark has actually managed to read the file.
total_lines = raw_rdd.count()
print(f"✓ Data loaded from {data_file}")
print(f"Total lines: {total_lines}")

# Process data using RDD transformations.
# parsed_rdd feeds several actions (the reduceByKey/collect and count below,
# and the DataFrame built from it later in the notebook). Caching it avoids
# re-reading and re-parsing the input file for each of them.
parsed_rdd = raw_rdd.map(parse_line).filter(lambda x: x is not None).cache()
interval_rdd = parsed_rdd.map(lambda x: (x[3], 1))
click_counts = interval_rdd.reduceByKey(lambda a, b: a + b).collect()

# Interval label -> number of clicks; intervals with no clicks are absent
results_dict = dict(click_counts)
print(f"✓ Processed {parsed_rdd.count()} valid records")

# Display results in chronological order, showing 0 for empty intervals
print("\nClick counts by time interval:")
for interval in ["0-6", "6-12", "12-18", "18-24"]:
    count = results_dict.get(interval, 0)
    print(f"{interval}: {count} clicks")


## 4. Data Visualization


In [None]:
# Create visualizations
import builtins

import matplotlib.pyplot as plt

# NOTE: the setup cell runs `from pyspark.sql.functions import *`, which
# rebinds names such as `sum` and `max` to Spark column functions. The plain
# Python aggregations below therefore go through `builtins` explicitly.

# Prepare data for plotting
intervals = ["0-6", "6-12", "12-18", "18-24"]
counts = [results_dict.get(interval, 0) for interval in intervals]
total_clicks = builtins.sum(counts)

# Create subplots
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
fig.suptitle('User Click Data Analysis Results', fontsize=16, fontweight='bold')

# Bar chart
colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4']
bars = ax1.bar(intervals, counts, color=colors)
ax1.set_title('Click Count by Time Interval')
ax1.set_xlabel('Time Interval')
ax1.set_ylabel('Number of Clicks')
ax1.grid(True, alpha=0.3)

# Add value labels on bars
for i, v in enumerate(counts):
    ax1.text(i, v + 0.1, str(v), ha='center', va='bottom', fontweight='bold')

# Pie chart: shares are undefined when there are no clicks at all, so show a
# placeholder instead of asking matplotlib to normalise an all-zero series.
if total_clicks > 0:
    wedges, texts, autotexts = ax2.pie(counts, labels=intervals, autopct='%1.1f%%',
                                       colors=colors, startangle=90)
else:
    ax2.text(0.5, 0.5, 'No click data', ha='center', va='center',
             transform=ax2.transAxes)
    ax2.set_xticks([])
    ax2.set_yticks([])
ax2.set_title('Click Distribution by Time Interval')

plt.tight_layout()
plt.show()

# Print summary statistics
print("📊 ANALYSIS SUMMARY")
print("="*50)
print(f"Total Click Events: {total_clicks}")
peak_count = builtins.max(counts)
# On ties, index() reports the earliest interval with the peak count
peak_interval = intervals[counts.index(peak_count)]
# Percentages fall back to 0.0 for an empty dataset instead of dividing by zero
peak_percentage = (peak_count / total_clicks) * 100 if total_clicks else 0.0

print(f"Peak Activity Period: {peak_interval} hours")
print(f"Peak Clicks: {peak_count} ({peak_percentage:.1f}% of total)")
print("\nActivity Distribution:")
for interval, count in zip(intervals, counts):
    percentage = (count / total_clicks) * 100 if total_clicks else 0.0
    print(f"  {interval}: {count} clicks ({percentage:.1f}%)")


## 5. DataFrame Analysis with SQL


In [None]:
# Convert RDD to DataFrame for SQL analysis
from pyspark.sql.types import StructType, StructField, StringType

# Every field is kept as a nullable string, matching the tuples in parsed_rdd:
# (date, time, user_id, time_interval)
schema = StructType([
    StructField("date", StringType(), True),
    StructField("time", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("time_interval", StringType(), True)
])

df = spark.createDataFrame(parsed_rdd, schema)
df.createOrReplaceTempView("click_data")

print("✓ DataFrame created and registered as 'click_data'")
print(f"DataFrame schema:")
df.printSchema()

print("\nDataFrame preview:")
df.show(10)

# Perform SQL analysis: clicks, distinct users and share of all clicks per
# interval. The window SUM over the grouped counts gives the grand total.
# time_interval is a string, so a plain ORDER BY would sort it
# lexicographically (0-6, 12-18, 18-24, 6-12); order by the numeric start
# hour instead to list the intervals chronologically.
sql_results = spark.sql("""
    SELECT 
        time_interval,
        COUNT(*) as click_count,
        COUNT(DISTINCT user_id) as unique_users,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) as percentage
    FROM click_data 
    GROUP BY time_interval 
    ORDER BY CAST(SPLIT(time_interval, '-')[0] AS INT)
""")

print("SQL Analysis Results:")
sql_results.show()

# Advanced insights with SQL: intervals ranked from most to least active,
# with the average number of clicks per distinct user in each interval.
peak_analysis = spark.sql("""
    SELECT 
        time_interval,
        COUNT(*) as clicks,
        COUNT(DISTINCT user_id) as users,
        ROUND(COUNT(*) * 1.0 / COUNT(DISTINCT user_id), 2) as clicks_per_user
    FROM click_data 
    GROUP BY time_interval 
    ORDER BY clicks DESC
""")

print("\nDetailed Analysis (sorted by activity):")
peak_analysis.show()


## 6. Business Insights & Recommendations


In [None]:
# Generate comprehensive business insights
print("🎯 BUSINESS INSIGHTS & RECOMMENDATIONS")
print("="*60)

# Key metrics
total_events = df.count()
unique_users = df.select("user_id").distinct().count()
# Guard the ratio so an empty dataset reports 0.00 instead of dividing by zero
avg_clicks_per_user = total_events / unique_users if unique_users else 0.0
# Number of lines actually read from the input file: the denominator of the
# parse success rate (previously a hard-coded 30)
total_lines = raw_rdd.count()

print(f"📈 Key Metrics:")
print(f"   • Total Click Events: {total_events}")
print(f"   • Unique Users: {unique_users}")
print(f"   • Average Clicks per User: {avg_clicks_per_user:.2f}")
print()

# Peak activity insights (peak_analysis is ordered by clicks, descending)
peak_data = peak_analysis.collect()

if peak_data:
    peak_hour = peak_data[0]['time_interval']
    peak_clicks = peak_data[0]['clicks']

    # Quietest interval, derived from the data. Intervals with no clicks are
    # missing from peak_analysis, so each known interval is looked up with a
    # default of 0. An explicit loop is used rather than min(): the notebook's
    # wildcard import of pyspark.sql.functions shadows that builtin.
    clicks_by_interval = {row['time_interval']: row['clicks'] for row in peak_data}
    quiet_hour = "0-6"
    for candidate in ["6-12", "12-18", "18-24"]:
        if clicks_by_interval.get(candidate, 0) < clicks_by_interval.get(quiet_hour, 0):
            quiet_hour = candidate

    print(f"🏆 Peak Activity Analysis:")
    print(f"   • Peak Period: {peak_hour} hours")
    print(f"   • Peak Clicks: {peak_clicks} events")
    print(f"   • Peak Represents: {(peak_clicks/total_events)*100:.1f}% of total activity")
    print()

    print("💡 Strategic Recommendations:")
    print(f"   1. Schedule critical campaigns during {peak_hour} hours")
    print(f"   2. Optimize server resources for {peak_hour} peak demand")
    print(f"   3. Plan maintenance during low-activity periods ({quiet_hour} hours)")
    print(f"   4. Consider user engagement strategies for off-peak hours")
    print()
else:
    # No valid rows were parsed, so there is no peak to report
    print("⚠️ No valid click events found; peak analysis and recommendations skipped.")
    print()

# Share of input lines that parsed into valid records; 0.0 for an empty file
success_rate = (total_events / total_lines) * 100 if total_lines else 0.0

print("🔧 Technical Insights:")
print(f"   • Data processing success rate: {success_rate:.1f}%")
# Report the partition count of the input RDD itself rather than the
# cluster-wide defaultParallelism setting
print(f"   • Distributed processing across {raw_rdd.getNumPartitions()} partitions")
print(f"   • Real-time monitoring available at: {spark.sparkContext.uiWebUrl}")
print("="*60)


## 7. Cleanup & Next Steps


In [None]:
# Optional: Stop Spark session when analysis is complete
# Uncomment the following line to stop the session
# spark.stop()
# print("✓ Spark session stopped")

# Closing summary, printed one entry per line in the order listed here.
closing_lines = [
    "🎉 Interactive Analysis Complete!",
    # Recap of the work done in the notebook
    "\n📝 What you've accomplished:",
    "  ✓ Loaded and processed click data with Apache Spark",
    "  ✓ Performed distributed computing analysis",
    "  ✓ Created visualizations of user behavior patterns",
    "  ✓ Generated SQL-based insights",
    "  ✓ Developed actionable business recommendations",
    # Suggestions for further exploration
    "\n🚀 Next Steps:",
    "  • Experiment with the DataFrames (df, sql_results, peak_analysis)",
    "  • Modify the SQL queries for custom analysis",
    "  • Scale to larger datasets using cloud deployment",
    "  • Integrate with real-time streaming data",
    f"  • Monitor performance via Spark UI: {spark.sparkContext.uiWebUrl}",
    # Objects left in the notebook namespace by the earlier cells
    "\n💾 Available Objects:",
    "  • spark: SparkSession for creating new DataFrames",
    "  • df: Main click data DataFrame",
    "  • sql_results: Aggregated statistics",
    "  • peak_analysis: Detailed activity analysis",
    "  • results_dict: RDD analysis results",
    # Managed Spark platforms
    "\n🔗 For cloud deployment, consider:",
    "  • Google Cloud Dataproc",
    "  • AWS EMR",
    "  • Azure HDInsight",
    "  • Databricks",
]

for closing_line in closing_lines:
    print(closing_line)
