In [0]:
from datetime import datetime
import pandas as pd
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("WebActivityAnalytics").getOrCreate()

# Generate sample data
data = {
    'user_id': [1, 2, 1, 3, 2],
    'timestamp': [datetime(2022, 1, 1), datetime(2022, 1, 1), datetime(2022, 1, 2), datetime(2022, 1, 2), datetime(2022, 1, 3)],
    'page_visited': ['home', 'product', 'home', 'product', 'contact']
}

df = spark.createDataFrame(pd.DataFrame(data))

# Save data to Parquet format for persistence
df.write.mode('overwrite').parquet("/mnt/databricks/sample_data.parquet")


In [0]:
# Show the first few rows of the DataFrame
df.show()

# Display summary statistics
df.describe().show()

# Check for missing values
df.na.drop().show()

# Check for duplicate records
df.dropDuplicates().show()


+-------+-------------------+------------+
|user_id|          timestamp|page_visited|
+-------+-------------------+------------+
|      1|2022-01-01 00:00:00|        home|
|      2|2022-01-01 00:00:00|     product|
|      1|2022-01-02 00:00:00|        home|
|      3|2022-01-02 00:00:00|     product|
|      2|2022-01-03 00:00:00|     contact|
+-------+-------------------+------------+

+-------+------------------+------------+
|summary|           user_id|page_visited|
+-------+------------------+------------+
|  count|                 5|           5|
|   mean|               1.8|        NULL|
| stddev|0.8366600265340756|        NULL|
|    min|                 1|     contact|
|    max|                 3|     product|
+-------+------------------+------------+

+-------+-------------------+------------+
|user_id|          timestamp|page_visited|
+-------+-------------------+------------+
|      1|2022-01-01 00:00:00|        home|
|      2|2022-01-01 00:00:00|     product|
|      1|2022-01-0

In [0]:
# Optimize schema by choosing appropriate data types
df = df.withColumn("user_id", df["user_id"].cast("integer"))

# Optimize storage and retrieval by partitioning
df.write.mode('overwrite').partitionBy("timestamp").parquet("/mnt/databricks/partitioned_data.parquet")

# Cache the DataFrame
df.cache()

# Perform operations on the cached DataFrame
df_filtered = df.filter(df['page_visited'] == 'home')

# Create a small DataFrame to be broadcasted
small_df = df.select('user_id').distinct()

# Perform a broadcast join
df_joined = df.join(broadcast(small_df), 'user_id', 'inner')


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-2165825361569193>, line 17[0m
[1;32m     14[0m small_df [38;5;241m=[39m df[38;5;241m.[39mselect([38;5;124m'[39m[38;5;124muser_id[39m[38;5;124m'[39m)[38;5;241m.[39mdistinct()
[1;32m     16[0m [38;5;66;03m# Perform a broadcast join[39;00m
[0;32m---> 17[0m df_joined [38;5;241m=[39m df[38;5;241m.[39mjoin(broadcast(small_df), [38;5;124m'[39m[38;5;124muser_id[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124minner[39m[38;5;124m'[39m)

[0;31mNameError[0m: name 'broadcast' is not defined

In [0]:
# Aggregate user activity metrics
activity_metrics = df.groupBy('user_id').agg(count('page_visited').alias('page_views'))

# Perform time-based analysis
from pyspark.sql.functions import hour
time_analysis = df.groupBy(hour('timestamp').alias('hour')).agg(count('*').alias('activity_count'))


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-2165825361569194>, line 2[0m
[1;32m      1[0m [38;5;66;03m# Aggregate user activity metrics[39;00m
[0;32m----> 2[0m activity_metrics [38;5;241m=[39m df[38;5;241m.[39mgroupBy([38;5;124m'[39m[38;5;124muser_id[39m[38;5;124m'[39m)[38;5;241m.[39magg(count([38;5;124m'[39m[38;5;124mpage_visited[39m[38;5;124m'[39m)[38;5;241m.[39malias([38;5;124m'[39m[38;5;124mpage_views[39m[38;5;124m'[39m))
[1;32m      4[0m [38;5;66;03m# Perform time-based analysis[39;00m
[1;32m      5[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01msql[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfunctions[39;00m [38;5;28;01mimport[39;00m hour

[0;31mNameError[0m: name 'count' is not defined

In [0]:
import matplotlib.pyplot as plt

# Create a bar chart using Matplotlib
activity_metrics_pd = activity_metrics.toPandas()
plt.bar(activity_metrics_pd['user_id'], activity_metrics_pd['page_views'])
plt.show()


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-2165825361569195>, line 4[0m
[1;32m      1[0m [38;5;28;01mimport[39;00m [38;5;21;01mmatplotlib[39;00m[38;5;21;01m.[39;00m[38;5;21;01mpyplot[39;00m [38;5;28;01mas[39;00m [38;5;21;01mplt[39;00m
[1;32m      3[0m [38;5;66;03m# Create a bar chart using Matplotlib[39;00m
[0;32m----> 4[0m activity_metrics_pd [38;5;241m=[39m activity_metrics[38;5;241m.[39mtoPandas()
[1;32m      5[0m plt[38;5;241m.[39mbar(activity_metrics_pd[[38;5;124m'[39m[38;5;124muser_id[39m[38;5;124m'[39m], activity_metrics_pd[[38;5;124m'[39m[38;5;124mpage_views[39m[38;5;124m'[39m])
[1;32m      6[0m plt[38;5;241m.[39mshow()

[0;31mNameError[0m: name 'activity_metrics' is not defined