As the Chief Data Scientist of an e-commerce company, your main responsibility is to harness the power of data to improve customer experiences, drive business growth, and enhance operational efficiency. 

Recognizing the volume and complexity of the data generated by the e-commerce platform, you realize that traditional data processing tools are insufficient for your needs. You need a robust solution that can process large datasets quickly, handle real-time data processing, and scale with the growing business.

Therefore, you decide to use [Apache Spark](https://spark.apache.org/), an open-source engine for big data processing and analytics. Below is an example of using Apache Spark to analyze user activity logs stored in <span style="color:blue">Azure Blob Storage</span> and user data stored in <span style="color:blue">Azure SQL Database</span>.

You are interested in two metrics: <span style="color:orange">the total number of website visitors in the last 7 days</span> and <span style="color:orange">the number of registered users who visited the website in the last 7 days</span>. To get these two metrics, you perform the following steps:

- Read the website activity logs from Azure Blob Storage.
- Extract information from user activity logs of the last 7 days to calculate the total number of website visitors, whether they are registered users or not.
- Identify the registered users among the website visitors of the last 7 days. These users are recognized as 'active users'.
- Read the 'User' table from Azure SQL Database to obtain the total number of registered users.
- Calculate the percentage of active users (num_active_users / num_total_users × 100).
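
The steps above can be sketched in plain Python on a few in-memory records before running them on Spark. This is only an illustration. The sample rows, the `compute_metrics` helper, and the assumption that anonymous visitors have `user_id` set to `None` are hypothetical and are not taken from the real logs.

```python
# Hypothetical sample log rows: (ip_address, user_id).
# Assumption: user_id is None for visitors who are not logged in.
sample_logs = [
    ("10.0.0.1", "u1"),
    ("10.0.0.1", "u1"),   # repeat visit from the same registered user
    ("10.0.0.2", None),   # anonymous visitor
    ("10.0.0.3", "u2"),
]

def compute_metrics(logs, num_total_users):
    """Return (total_visitors, num_active_users, pct_active_users)."""
    # Website visitors are identified by unique IP address.
    total_visitors = len({ip for ip, _ in logs})
    # Active users are the distinct, non-null user_ids seen in the logs.
    num_active_users = len({uid for _, uid in logs if uid is not None})
    # Guard against an empty 'User' table to avoid dividing by zero.
    pct_active_users = 100.0 * num_active_users / num_total_users if num_total_users else 0.0
    return total_visitors, num_active_users, pct_active_users

total_visitors, num_active_users, pct_active_users = compute_metrics(sample_logs, num_total_users=8)
print(total_visitors, num_active_users, pct_active_users)
```

With these sample rows and an assumed 8 registered users, the sketch reports 3 visitors, 2 active users, and 25.0 percent active.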

<span style="color:grey">

- The logs are organized and saved in Azure Blob Storage with the folder structure: Year/Month/Day/Hour

- Website Visitor -> Registered User -> Active User
    - **Website Visitor**
        - Any individual who visited the website, regardless of whether they have registered an account. 
        - Identified by unique IP address. 
    - **Registered User**
        - An individual who completed a registration process and created a profile. 
        - Each registered user has a unique user_id. 
    - **Active User**
        -  A registered user who has had activity, such as logging in, browsing content, or placing orders, within the last 7 days. 

</span> 
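
As a rough illustration of the Year/Month/Day/Hour layout, the sketch below lists the hourly folder suffixes covering a rolling 7-day window. The helper name `hourly_log_folders` and the zero-padded folder names (for example `2024/03/01/15`) are assumptions made for this example; check the real naming in the storage container.

```python
from datetime import datetime, timedelta

def hourly_log_folders(end, days=7):
    """Return Year/Month/Day/Hour folder suffixes for the days * 24 hours ending at `end`."""
    # Truncate to the start of the hour so each entry maps to one hourly folder.
    end_hour = end.replace(minute=0, second=0, microsecond=0)
    start_hour = end_hour - timedelta(hours=days * 24 - 1)
    # Assumption: month, day, and hour folders are zero-padded.
    return [
        (start_hour + timedelta(hours=i)).strftime("%Y/%m/%d/%H")
        for i in range(days * 24)
    ]

# Fixed timestamp so the example is reproducible.
folders = hourly_log_folders(datetime(2024, 3, 8, 14, 30))
print(len(folders), folders[0], folders[-1])
```

Listing only the hours that have already elapsed avoids asking for folders that have not been written yet.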

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct
from datetime import datetime, timedelta

# Create a SparkSession
spark = SparkSession.builder \
    .appName("UserActivityAnalysis") \
    .getOrCreate()

# Calculate the start and end dates (7 calendar days, including today)
end_date = datetime.now()
start_date = end_date - timedelta(days=6)
# List all dates within the last 7 days
date_range = [start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)]

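# Azure Blob Storage
storage_account_name = "<storage-account-name>"
container_name = "<container-name>"
path_to_logs = "<path-to-logs>"
# Paths for all logs within the last 7 days
file_paths = []
for date in date_range:
    year = date.strftime("%Y")
    month = date.strftime("%m")
    day = date.strftime("%d")
    # wasbs URLs take the form wasbs://<container>@<account>.blob.core.windows.net/<path>.
    # The trailing * matches whichever hour folders exist for that day, so hours
    # that have not been written yet are not requested explicitly.
    file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{path_to_logs}/{year}/{month}/{day}/*"
    file_paths.append(file_path)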

# Read all logs from the last 7 days
website_activity_logs_last_7_days_df = spark.read.csv(file_paths, header=True, inferSchema=True)
# Calculate total number of visitors (based on unique IP addresses)
total_visitors = website_activity_logs_last_7_days_df.select(countDistinct("ip_address").alias("total_visitors")).first()[0]
print("Number of visitors in the last 7 days:", total_visitors)

# Calculate total number of active registered users (based on user_id) from the logs
num_active_users = website_activity_logs_last_7_days_df.select(countDistinct("user_id").alias("num_active_users")).first()[0]
print("Number of active registered users in the last 7 days:", num_active_users)

# Azure SQL connection
sql_server_name = "<sql-server-name>"
database_name = "<database-name>"
table_name = "User"
# Credentials are supplied through jdbc_properties below, so they are not repeated in the URL
jdbc_url = f"jdbc:sqlserver://{sql_server_name}.database.windows.net:1433;database={database_name}"
jdbc_properties = {
    "user": "<username>",
    "password": "<password>",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Total number of registered users from the "User" table
total_users_df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=jdbc_properties)
# Count total number of registered users
num_total_users = total_users_df.count()
print("Total number of registered users:", num_total_users)

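# Percentage of registered users who were active in the last 7 days
pct_active_users = num_active_users / num_total_users * 100 if num_total_users else 0.0
print(f"{pct_active_users:.2f}% of registered users had activity in the last 7 days")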

spark.stop()