In [22]:
# Install Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!ls

# Check if Spark tarball already exists and download only if it doesn't
spark_file = 'spark-3.5.1-bin-hadoop3.tgz'
if not os.path.exists(spark_file):
    !wget https://dlcdn.apache.org/spark/spark-3.5.1/{spark_file}

# Ensure the file is present
!ls -lh {spark_file}

# Extract the Spark tarball
!tar xzf {spark_file}

# Install findspark
!pip install findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

# Initialize Spark using findspark
import findspark
findspark.init()

from faker import Faker
import random
import pyspark.sql.types as T

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Fake Data Generation").getOrCreate()
# Verify Spark is initialized
print(spark.version)











0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.82)] [Connected to cloud.r-                                                                                                    Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.82)] [Connecting to ppa.lau                                                                                                    Hit:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.82)] [Connecting to ppa.lau                                                                                                    Hit:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
0% [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.80)] [Waiting for heade    

In [33]:


fake = Faker()
from faker.providers import DynamicProvider

products_category = DynamicProvider(
     provider_name="categories",
     elements=["home appliances" ,"phones","laptops", "clothing" , "pharmacy" , "garden","kids toys"],
)
fake.add_provider(products_category)
def generate_data(num_records):
    data = []
    for _ in range(num_records):
        data.append((
            fake.random_number(digits=5),  # user_id
            fake.random_number(digits=7),  # product_id
            random.choice(['view', 'cart', 'purchase']),  # event_type using random.choice
            fake.random_number(digits=3),  # price
            fake.date_time_this_year(),  # event_time
            fake.categories(),  # category_code (using custom provider
            fake.company(),  # brand
            fake.uuid4()  # user_session
        ))
    return data


In [34]:

# Generate data
data = generate_data(1000)

# Define schema for the DataFrame
schema = T.StructType([
    T.StructField("user_id", T.IntegerType(), True),
    T.StructField("product_id", T.IntegerType(), True),
    T.StructField("event_type", T.StringType(), True),
    T.StructField("price", T.IntegerType(), True),
    T.StructField("event_time", T.TimestampType(), True),
    T.StructField("category_code", T.StringType(), True),
    T.StructField("brand", T.StringType(), True),
    T.StructField("user_session", T.StringType(), True)
])



In [35]:


# Create DataFrame from the data
df = spark.createDataFrame(data, schema)

# Show the DataFrame
df.show(5)

df.printSchema()
# Register the DataFrame as a temporary view to run SQL queries
df.createOrReplaceTempView("events")

# SQL query to count the number of each event type
result = spark.sql("""
SELECT event_type, COUNT(*) as count
FROM events
GROUP BY event_type
ORDER BY count DESC
""")

# Show the query results
result.show()


+-------+----------+----------+-----+--------------------+---------------+--------------------+--------------------+
|user_id|product_id|event_type|price|          event_time|  category_code|               brand|        user_session|
+-------+----------+----------+-----+--------------------+---------------+--------------------+--------------------+
|  56005|   1601859|      cart|  169|2024-01-24 22:20:...|       clothing|Nicholson, Lewis ...|bd6d5702-e2d6-4da...|
|  16637|   6240909|  purchase|  279|2024-03-12 01:48:...|       pharmacy|Villegas, Barber ...|ff8de1b8-29b4-446...|
|  22559|   9397413|      view|   25|2024-05-13 03:47:...|       clothing|       Cooley-Garcia|091b86d9-4360-45c...|
|  69823|   7683406|  purchase|  482|2024-02-22 21:07:...|        laptops|      Padilla-Nelson|1c5712a8-fc79-4b1...|
|  49380|    447809|      view|  536|2024-03-23 16:03:...|home appliances|   Mitchell-Peterson|9153d5d6-cf45-48b...|
+-------+----------+----------+-----+--------------------+------

In [45]:
# Window function query for Spark SQL
rolling_purchases_query = """
WITH DailyPurchases AS (
    SELECT
        category_code,
        DATE(event_time) AS event_date,
        COUNT(*) AS daily_purchases
    FROM events
    WHERE event_type = 'purchase'
    GROUP BY category_code, DATE(event_time)
), RollingTotals AS (
    SELECT
        category_code,
        event_date,
        daily_purchases,
        SUM(daily_purchases) OVER (PARTITION BY category_code ORDER BY event_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rolling_purchases,
        AVG(daily_purchases) OVER (PARTITION BY category_code ORDER BY event_date ROWS BETWEEN 7 PRECEDING AND CURRENT ROW) AS avg_last_7_days
    FROM DailyPurchases
)
SELECT
    *,
    (daily_purchases - avg_last_7_days) AS diff_from_avg
FROM RollingTotals
ORDER BY rolling_purchases DESC

"""

# Execute the query
rolling_purchases_results = spark.sql(rolling_purchases_query)
rolling_purchases_results.show()

# Optionally, convert to Pandas DataFrame for visualization
rolling_purchases_pd = rolling_purchases_results.toPandas()

# Visualization can be done using Plotly for interactive graphs


+---------------+----------+---------------+-----------------+---------------+-------------+
|  category_code|event_date|daily_purchases|rolling_purchases|avg_last_7_days|diff_from_avg|
+---------------+----------+---------------+-----------------+---------------+-------------+
|         phones|2024-06-26|              1|               57|          1.125|       -0.125|
|         phones|2024-06-23|              1|               56|          1.375|       -0.375|
|         phones|2024-06-18|              2|               55|            1.5|          0.5|
|         phones|2024-06-14|              1|               53|          1.375|       -0.375|
|         phones|2024-06-10|              1|               52|          1.375|       -0.375|
|         phones|2024-06-05|              1|               51|          1.375|       -0.375|
|         phones|2024-05-29|              1|               50|            1.5|         -0.5|
|         garden|2024-06-26|              1|               49|        

In [46]:
from pyspark.sql import functions as F
# Print DataFrame schema
print("DataFrame Schema:")
rolling_purchases_results.printSchema()

# Display summary statistics of the result DataFrame
rolling_purchases_results.describe().show()

# Show the top days with the highest rolling purchases
rolling_purchases_results.sort(F.col("rolling_purchases").desc()).show(5)



DataFrame Schema:
root
 |-- category_code: string (nullable = true)
 |-- event_date: date (nullable = true)
 |-- daily_purchases: long (nullable = false)
 |-- rolling_purchases: long (nullable = true)
 |-- avg_last_7_days: double (nullable = true)
 |-- diff_from_avg: double (nullable = true)

+-------+-------------+------------------+------------------+-------------------+-------------------+
|summary|category_code|   daily_purchases| rolling_purchases|    avg_last_7_days|      diff_from_avg|
+-------+-------------+------------------+------------------+-------------------+-------------------+
|  count|          288|               288|               288|                288|                288|
|   mean|         NULL|             1.125|          23.40625|  1.112070105820106|0.01292989417989418|
| stddev|         NULL|0.3416500236732881|13.617403901786503|0.13502298258440176|  0.304596575413524|
|    min|     clothing|                 1|                 1|                1.0|             

In [47]:
# First, register the DataFrame as a temp view if not already done
rolling_purchases_results.createOrReplaceTempView("rolling_totals")

# SQL Query to find top 5 categories based on rolling purchases
top_categories_query = """
SELECT category_code, SUM(rolling_purchases) as total_rolling_purchases
FROM rolling_totals
GROUP BY category_code
ORDER BY total_rolling_purchases DESC
LIMIT 5
"""

# Execute the query
top_categories = spark.sql(top_categories_query)
top_categories.show()


+---------------+-----------------------+
|  category_code|total_rolling_purchases|
+---------------+-----------------------+
|         phones|                   1384|
|         garden|                   1053|
|home appliances|                    999|
|       clothing|                    966|
|      kids toys|                    823|
+---------------+-----------------------+



In [54]:
rolling_purchases_pd
import sqlite3
import pandas as pd

# Connect to or create a SQLite database file
conn = sqlite3.connect('ecommerce_data_spark.db')

# Assuming `df` is your DataFrame containing the data
rolling_purchases_pd.to_sql('sales_summary', conn, if_exists='replace', index=False)

from google.colab import files
files.download('ecommerce_data_spark.db')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>