## **Import Required Libraries**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, weekofyear, month, dayofweek
import matplotlib.pyplot as plt
import pandas as pd


## **Loading the dataset(CSV)**

In [0]:
# File location and type
file_location = "/FileStore/tables/daily_website_visitors-3.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7
Row,Day,Day.Of.Week,Date,Page.Loads,Unique.Visits,First.Time.Visits,Returning.Visits
1,Sunday,1,9/14/2014,2146,1582,1430,152
2,Monday,2,9/15/2014,3621,2528,2297,231
3,Tuesday,3,9/16/2014,3698,2630,2352,278
4,Wednesday,4,9/17/2014,3667,2614,2327,287
5,Thursday,5,9/18/2014,3316,2366,2130,236
6,Friday,6,9/19/2014,2815,1863,1622,241
7,Saturday,7,9/20/2014,1658,1118,985,133
8,Sunday,1,9/21/2014,2288,1656,1481,175
9,Monday,2,9/22/2014,3638,2586,2312,274


## **Creating Database**

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS ts_project_write")
spark.sql("USE ts_project_write")


DataFrame[]

## **Load Data Again with File Location & Display Preview**

In [0]:
file_location = "/FileStore/tables/daily_website_visitors-3.csv"
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_location)
display(df)

Row,Day,Day.Of.Week,Date,Page.Loads,Unique.Visits,First.Time.Visits,Returning.Visits
1,Sunday,1,2014-09-14,2146,1582,1430,152
2,Monday,2,2014-09-15,3621,2528,2297,231
3,Tuesday,3,2014-09-16,3698,2630,2352,278
4,Wednesday,4,2014-09-17,3667,2614,2327,287
5,Thursday,5,2014-09-18,3316,2366,2130,236
6,Friday,6,2014-09-19,2815,1863,1622,241
7,Saturday,7,2014-09-20,1658,1118,985,133
8,Sunday,1,2014-09-21,2288,1656,1481,175
9,Monday,2,2014-09-22,3638,2586,2312,274
10,Tuesday,3,2014-09-23,4462,3257,2989,268


## **Save DataFrame as Table (daily_visitors)**

In [0]:
df.write.mode("overwrite").saveAsTable("daily_visitors")


## **Explore Tables in Database**

In [0]:
spark.sql("SHOW TABLES").show()
spark.sql("SELECT * FROM daily_visitors LIMIT 5").show()


+----------------+--------------+-----------+
|        database|     tableName|isTemporary|
+----------------+--------------+-----------+
|ts_project_write|daily_visitors|      false|
+----------------+--------------+-----------+

+---+---------+-----------+----------+----------+-------------+-----------------+----------------+
|Row|      Day|Day.Of.Week|      Date|Page.Loads|Unique.Visits|First.Time.Visits|Returning.Visits|
+---+---------+-----------+----------+----------+-------------+-----------------+----------------+
|  1|   Sunday|          1|2014-09-14|     2,146|        1,582|            1,430|             152|
|  2|   Monday|          2|2014-09-15|     3,621|        2,528|            2,297|             231|
|  3|  Tuesday|          3|2014-09-16|     3,698|        2,630|            2,352|             278|
|  4|Wednesday|          4|2014-09-17|     3,667|        2,614|            2,327|             287|
|  5| Thursday|          5|2014-09-18|     3,316|        2,366|            2

## **Data Cleaning & Column Renaming**

In [0]:
from pyspark.sql.functions import col

df = df.toDF(*[c.strip().replace(" ", "_").replace(".", "_").lower() for c in df.columns])
display(df)



row,day,day_of_week,date,page_loads,unique_visits,first_time_visits,returning_visits
1,Sunday,1,2014-09-14,2146,1582,1430,152
2,Monday,2,2014-09-15,3621,2528,2297,231
3,Tuesday,3,2014-09-16,3698,2630,2352,278
4,Wednesday,4,2014-09-17,3667,2614,2327,287
5,Thursday,5,2014-09-18,3316,2366,2130,236
6,Friday,6,2014-09-19,2815,1863,1622,241
7,Saturday,7,2014-09-20,1658,1118,985,133
8,Sunday,1,2014-09-21,2288,1656,1481,175
9,Monday,2,2014-09-22,3638,2586,2312,274
10,Tuesday,3,2014-09-23,4462,3257,2989,268


## **Convert String Date Column to Standard Date Format**

In [0]:
df = df.withColumn("date", to_date(col("date"), "M/d/yyyy"))
display(df)


row,day,day_of_week,date,page_loads,unique_visits,first_time_visits,returning_visits,week
1,Sunday,1,2014-09-14,2146,,1430,152,37
2,Monday,2,2014-09-15,3621,,2297,231,38
3,Tuesday,3,2014-09-16,3698,,2352,278,38
4,Wednesday,4,2014-09-17,3667,,2327,287,38
5,Thursday,5,2014-09-18,3316,,2130,236,38
6,Friday,6,2014-09-19,2815,,1622,241,38
7,Saturday,7,2014-09-20,1658,,985,133,38
8,Sunday,1,2014-09-21,2288,,1481,175,38
9,Monday,2,2014-09-22,3638,,2312,274,39
10,Tuesday,3,2014-09-23,4462,,2989,268,39


## **Trend Analysis**

###  Daily Trend Analysis

In [0]:
from pyspark.sql.functions import col, month, to_date, dayofweek, date_format, avg, sum as F_sum
df = df.withColumn("unique_visits", col("unique_visits").cast("int"))
daily_trend = df.groupBy("date").agg(F_sum("unique_visits").alias("total_visits")).orderBy("date")
display(daily_trend)

date,total_visits
2014-09-14,
2014-09-15,
2014-09-16,
2014-09-17,
2014-09-18,
2014-09-19,
2014-09-20,
2014-09-21,
2014-09-22,
2014-09-23,


Databricks visualization. Run in Databricks to view.

### Weekly Trend Analysis

In [0]:
weekly_trend = df.groupBy(weekofyear("date").alias("week")).agg(F_sum("unique_visits").alias("total_visits")).orderBy("week")
display(weekly_trend)

week,total_visits
1,1831.0
2,
3,
4,
5,
6,
7,
8,
9,
10,


Databricks visualization. Run in Databricks to view.

### Monthly Trend Analysis

In [0]:
monthly_trend = df.groupBy(month("date").alias("month")).agg(F_sum("unique_visits").alias("total_visits")).orderBy("month")
display(monthly_trend)

month,total_visits
1,1836.0
2,
3,
4,
5,
6,925.0
7,4504.0
8,3674.0
9,908.0
10,


Databricks visualization. Run in Databricks to view.

## **Seasonality Analysis**

In [0]:
seasonality = df.groupBy(dayofweek("date").alias("day_of_week")).agg(avg("unique_visits").alias("avg_visits")).orderBy("day_of_week")
display(seasonality)  

day_of_week,avg_visits
1,924.0
2,
3,
4,951.0
5,771.5
6,842.0
7,906.6875


Databricks visualization. Run in Databricks to view.

In [0]:
seasonality = df.groupBy(dayofweek("date").alias("day_of_week")).agg(F_sum("unique_visits").alias("total_visits")).orderBy("day_of_week")
display(seasonality) 

day_of_week,total_visits
1,3696.0
2,
3,
4,1902.0
5,1543.0
6,1684.0
7,14507.0


Databricks visualization. Run in Databricks to view.

### Anomaly Detection (Z-Score Method)

In [0]:
from pyspark.sql import functions as F

daily_visits = df.groupBy("date").agg(F.sum("unique_visits").alias("total_visits"))
stats = daily_visits.agg(
    F.mean("total_visits").alias("mean_val"),
    F.stddev("total_visits").alias("std_val")
).collect()[0]
mean_val = stats["mean_val"]
std_val = stats["std_val"]

daily_with_z = daily_visits.withColumn(
    "z_score", 
    (F.col("total_visits") - mean_val) / std_val
)

anomalies = daily_with_z.filter(F.abs(F.col("z_score")) > 3)

display(anomalies.orderBy("date"))



date,total_visits,z_score
2014-12-25,667,-3.330646658999872


Databricks visualization. Run in Databricks to view.