### Connecting Spark to Cassandra

In [0]:

import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


# Connection settings. Real values are taken from environment variables so that
# credentials never have to be committed with the notebook; the fallbacks are
# the notebook's original placeholder values.
cosmosdb_contact_point = os.environ.get('COSMOSDB_CONTACT_POINT', 'host')
cosmosdb_port = os.environ.get('COSMOSDB_PORT', '10350')
cosmosdb_username = os.environ.get('COSMOSDB_USERNAME', 'username')
cosmosdb_password = os.environ.get('COSMOSDB_PASSWORD', 'pass')
cosmosdb_keyspace = 'clickstream'

# Create the SparkSession with Cassandra configuration.
# The port is passed through `cosmosdb_port` (previously a duplicated literal)
# so that the variable above is the single source of truth.
spark = (
    SparkSession.builder
    .appName("Cassandra Spark Connector Example")
    .config("spark.cassandra.connection.host", cosmosdb_contact_point)
    .config("spark.cassandra.connection.port", cosmosdb_port)
    .config("spark.cassandra.auth.username", cosmosdb_username)
    .config("spark.cassandra.auth.password", cosmosdb_password)
    .config("spark.cassandra.connection.ssl.enabled", "true")
    .getOrCreate()
)

# Read the `clickstream_events` table of the configured keyspace into a DataFrame.
df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="clickstream_events", keyspace=cosmosdb_keyspace)
    .load()
)

# Show the data (first rows only)
df.show()



+--------------------+-------+-----------------+--------------------+--------------------+----------------+--------------------+--------------------+---------------+
|             row_key|browser|             city|             country|              device|              os|           timestamp|                 url|        user_id|
+--------------------+-------+-----------------+--------------------+--------------------+----------------+--------------------+--------------------+---------------+
|f7c6cc1f-addf-5ca...|Mozilla|       Davisshire|              Jersey|X11; Linux x86_64...| Safari/532.12.2|2023-07-22 16:05:...|https://www.examp...|      bridget40|
|e61bb684-eff3-526...|Mozilla|       Michaelton|              Serbia|   KHTML, like Gecko|   Version/10.00|2023-07-22 16:05:...|https://www.examp...|       xproctor|
|521e27d5-17cd-5e7...|Mozilla|      Josephville|    French Polynesia|   KHTML, like Gecko|Safari/6534.39.6|2023-07-22 16:06:...|https://www.examp...|    miguelrojas|
|42e

## Group by "URL" and "country" and calculate the count of unique users

In [0]:
# Number of distinct users per (URL, country) pair, sorted so the pairs with
# the highest unique-user count come first.
unique_user_df = (
    df.groupBy("URL", "country")
    .agg(F.countDistinct("user_id").alias("unique_user_count"))
    .orderBy(F.col("unique_user_count").desc())
)
display(unique_user_df)

URL,country,unique_user_count
https://www.example.com/,Jersey,2
https://www.example.com/contact,Jersey,2
https://www.example.com/services,Guinea,1
https://www.example.com/about,Vietnam,1
https://www.example.com/faq,Jersey,1
https://www.example.com/contact,Spain,1
https://www.example.com/faq,French Polynesia,1
https://www.example.com/products,Liechtenstein,1
https://www.example.com/faq,Italy,1
https://www.example.com/,Serbia,1


In [0]:
from pyspark.sql import Window
# Window over each user's events, ordered by event time.
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

# Time spent on a URL is taken as the gap between an event and the same user's
# next event: next_timestamp - timestamp, both cast to long.
# NOTE(review): the result is treated as seconds, which assumes `timestamp` is
# a Spark timestamp column -- confirm against the table schema.
time_spent_df = (
    df.select("user_id", "timestamp", "country", "url")
    .withColumn("next_timestamp", F.lead("timestamp").over(window_spec))
    .withColumn(
        "time_spent_in_seconds",
        F.col("next_timestamp").cast("long") - F.col("timestamp").cast("long"),
    )
    # The last event of every user has no successor (lead() yields null), so
    # its duration is null and the row is discarded.
    .dropna(subset=["time_spent_in_seconds"])
    # Average the remaining durations per (URL, country) pair.
    .groupBy("URL", "country")
    .agg(F.avg("time_spent_in_seconds").alias("average_time_spent"))
)
display(time_spent_df)


URL,country,average_time_spent
https://www.example.com/faq,Jersey,11.0
https://www.example.com/faq,French Polynesia,13.0
https://www.example.com/faq,Italy,17.0
https://www.example.com/,Serbia,3.0
https://www.example.com/contact,Austria,1.0
https://www.example.com/,Liberia,15.0
https://www.example.com/,Jersey,26.0
https://www.example.com/contact,Burundi,12.0
https://www.example.com/,Bahamas,11.0
https://www.example.com/products,Austria,1.0


### Final Processed Data

In [0]:
# Combine the two per-(URL, country) metrics into a single table.
# The join is an inner join, so pairs that have a unique-user count but no
# average time (or vice versa) do not appear in the result.
final_df = unique_user_df.join(
    time_spent_df,
    on=["URL", "country"],
    how="inner",
)
display(final_df)

URL,country,unique_user_count,average_time_spent
https://www.example.com/faq,Jersey,1,11.0
https://www.example.com/faq,French Polynesia,1,13.0
https://www.example.com/faq,Italy,1,17.0
https://www.example.com/,Serbia,1,3.0
https://www.example.com/contact,Austria,1,1.0
https://www.example.com/,Liberia,1,15.0
https://www.example.com/,Jersey,2,26.0
https://www.example.com/contact,Burundi,1,12.0
https://www.example.com/,Bahamas,1,11.0
https://www.example.com/products,Austria,1,1.0


### Insert data into Elasticsearch

In [0]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import pandas as pd
# Connect to Elasticsearch.
# The cloud id and credentials are read from environment variables so they are
# not stored in the notebook; the fallbacks are the original placeholder values.
es = Elasticsearch(
    cloud_id=os.environ.get("ELASTIC_CLOUD_ID", "cloud_id"),
    http_auth=(
        os.environ.get("ELASTIC_USERNAME", "elastic"),
        os.environ.get("ELASTIC_PASSWORD", "pass"),
    ),
)

# Bring the aggregated result to the driver and turn it into one dict per row;
# each dict becomes the `_source` of one Elasticsearch document.
processed_data_pd = final_df.toPandas()
processed_data = processed_data_pd.to_dict(orient="records")


# Define Elasticsearch index mapping.
# The property names must match the DataFrame columns being indexed
# (URL, country, unique_user_count, average_time_spent); otherwise the explicit
# types are never applied and the real fields fall back to dynamic mapping.
index_name = "clickstream_data_index"

mapping = {
    "mappings": {
        "properties": {
            "URL": {"type": "keyword"},
            "country": {"type": "keyword"},
            "unique_user_count": {"type": "integer"},
            "average_time_spent": {"type": "float"}
        }
    }
}

# Create the index with the defined mapping only if it does not exist yet.
# An explicit existence check replaces the former bare `except: pass`, so that
# genuine failures (authentication, connectivity, invalid mapping) surface
# instead of being silently ignored.
if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name, body=mapping)

# Bulk index the processed data into Elasticsearch.
# NOTE(review): the actions carry no `_id`, so every run of this cell appends a
# fresh set of documents rather than overwriting the previous ones.
bulk_data = [
    {
        "_index": index_name,
        "_source": entry
    }
    for entry in processed_data
]

bulk(es, bulk_data)






  es = Elasticsearch(
  es.indices.create(index=index_name, body=mapping)


Out[7]: (65, [])