# Import Libraries

In [1]:
!pip install pyspark
!pip install pyspark[pandas_on_spark] plotly
!pip install -U pandas

[0m[31mERROR: Could not find a version that satisfies the requirement pyspark (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for pyspark[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement pyspark[pandas_on_spark] (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for pyspark[pandas_on_spark][0m[31m


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, collect_list, monotonically_increasing_id
from pyspark.sql.window import Window

ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Start (or reuse) the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName('Hadoop Analysis')
    .getOrCreate()
)
# Restrict Spark's own logging to warnings and above.
spark.sparkContext.setLogLevel("WARN")


# Load the dataset: the first CSV row is the header and Spark infers column types.
df = spark.read.csv(
    '/kaggle/input/ecommerce-behavior-data-from-multi-category-store/2019-Oct.csv',
    header=True,
    inferSchema=True,
)


In [None]:
# Echo the session object so the notebook renders its representation.
spark

In [None]:
# Work on at most 1000 rows from here on (presumably to keep the later cells fast).
# NOTE(review): limit() does not promise which rows are kept — confirm a sample this
# small is representative enough for the analyses below.
df = df.limit(1000)

In [None]:
# Print a preview of the reduced DataFrame.
df.show()

# Dataset Summary

This dataset captures essential e-commerce interactions and provides a rich source for analyzing user behavior, product performance, and financial metrics. By converting data types and handling missing values, the dataset becomes more structured and ready for detailed analysis, enabling meaningful insights and data-driven decisions.

# Schema

* event_time (string): Timestamp of the event.
* event_type (string): Type of event (e.g., view, purchase).
* product_id (string): Unique identifier of the product.
* category_id (string): Unique identifier of the product category.
* category_code (string): Human-readable code of the product category.
* brand (string): Brand of the product.
* price (string): Price of the product at the time of the event.
* user_id (string): Unique identifier of the user.
* user_session (string): Unique identifier of the user session.

In [None]:
# List the column names of the loaded DataFrame.
df.columns

In [None]:
# Check for missing values: build one NULL counter per column, then evaluate
# them all in a single select.
null_counters = []
for name in df.columns:
    # when() without otherwise() is NULL for non-matching rows and count() skips
    # NULLs, so each counter tallies only the rows where the column is NULL.
    null_counters.append(count(when(col(name).isNull(), name)).alias(name))

missing_values = df.select(null_counters)
missing_values.show()

# Fill missing values in user_session with unique IDs to differentiate sessions

In [None]:
# Rows without a session id get a generated surrogate so they are not lumped
# together as one "NULL" session.
# monotonically_increasing_id() yields a 64-bit integer while user_session is a
# string column; cast the generated id to string explicitly so both CASE
# branches share one type instead of relying on implicit type coercion.
generated_session = monotonically_increasing_id().cast('string')
df = df.withColumn(
    'user_session',
    when(col('user_session').isNull(), generated_session).otherwise(col('user_session')),
)
df.show(2)

# Fill missing values in 'category_code' and 'brand' with 'unknown'

In [None]:

# Replacement value for each column that may contain NULLs.
placeholder_by_column = {'category_code': 'unknown', 'brand': 'unknown'}
df = df.na.fill(placeholder_by_column)
df.show(2)

# Convert 'user_session' to integer

In [None]:
# Imported here so this cell can be applied and run on its own.
from pyspark.sql.functions import dense_rank

# A plain .cast("int") is lossy for session ids: any id that is not a numeric
# string becomes NULL, and the 64-bit ids generated for missing sessions do not
# fit in a 32-bit int. That would collapse distinct sessions into one.
# NOTE(review): session ids in this dataset appear to be non-numeric strings — confirm.
# Instead, map every distinct session id to its own integer with dense_rank().
# The window has no partitioning, so Spark processes it in a single partition and
# logs a warning; that is acceptable for the small sample used in this notebook.
session_order = Window.orderBy('user_session')
df = df.withColumn('user_session', dense_rank().over(session_order).cast('int'))
df.show(2)

# Create product columns based on user sessions

In [None]:
# Collect every product of a session into one list attached to each of its rows.
# Ordering by the partition key itself gives no order inside a session, so order
# by event_time to make the list follow the sequence of events.
# The explicit unbounded frame makes every row of a session carry the complete
# list (not a running prefix), so the later distinct() keeps one row per session.
window_spec = (
    Window.partitionBy('user_session')
    .orderBy('event_time')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
df = df.withColumn('product_list', collect_list('product_id').over(window_spec))

# Select unique sessions and create a DataFrame with product columns

In [None]:
# Keep one row per distinct (user_session, product_list) combination.
session_columns = ['user_session', 'product_list']
sessions_df = df.select(session_columns).dropDuplicates()

# Spread the product list over fixed, numbered columns (rows are not exploded).
def create_product_columns(df, max_products=12):
    """Add ``product_id_1`` .. ``product_id_<max_products>`` columns to ``df``.

    Column ``product_id_k`` holds the element at index ``k - 1`` of the
    ``product_list`` array column, which ``df`` must already contain.

    Args:
        df: DataFrame with a ``product_list`` array column.
        max_products: Number of numbered product columns to add.

    Returns:
        The DataFrame with the extra columns; ``product_list`` itself is kept.

    NOTE(review): sessions with fewer than ``max_products`` items are expected to
    get NULL in the surplus columns — verify under your Spark settings.
    """
    for ordinal in range(1, max_products + 1):
        # Column numbering is 1-based while array indexing is 0-based.
        df = df.withColumn(f'product_id_{ordinal}', col('product_list').getItem(ordinal - 1))
    return df

# Add the numbered product columns, then discard the source array column.
sessions_df = create_product_columns(sessions_df).drop('product_list')

# Show the final DataFrame with product columns
sessions_df.show()

# Number of unique visitors

In [None]:
# Number of unique visitors: de-duplicate the user ids, then count what is left.
distinct_users = df.select("user_id").dropDuplicates()
visitors = distinct_users.count()
print("Number of visitors: {}".format(visitors))



# Brand popularity

In [None]:

# Keep purchase events only, then rank brands by their number of purchases.
purchase_df = df.where(col('event_type') == 'purchase')
purchases_per_brand = purchase_df.groupBy('brand').count()
top_brands = purchases_per_brand.orderBy(col('count').desc())
top_brands.show(25)

# Does traffic fluctuate by date?

In [None]:
from pyspark.sql.functions import to_date, col, countDistinct
import matplotlib.pyplot as plt

# Extract the date from event_time
df = df.withColumn('date', to_date(col('event_time')))

# Group by date and count the number of unique visitors.
# A grouped result carries no row-order guarantee, so sort it chronologically;
# otherwise the line plot below would connect the days in arbitrary order.
daily_visitors = (
    df.groupBy('date')
    .agg(countDistinct('user_id').alias('unique_visitors'))
    .orderBy('date')
)

# Show the result
daily_visitors.show()

# Convert to Pandas for plotting
daily_visitors_pd = daily_visitors.toPandas()

# Plot the daily visitors trend
plt.figure(figsize=(15, 8))
plt.plot(daily_visitors_pd['date'], daily_visitors_pd['unique_visitors'], marker='o', linestyle='-')
plt.title('Daily Visitors Trend')
plt.xlabel('Date')
plt.ylabel('Number of Unique Visitors')
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()



# Drop duplicated rows

In [None]:
# Remove rows that are identical across every column currently on df
# (no subset is given, so all columns take part in the comparison).
df = df.dropDuplicates()

# Which category do customers interact with the most?

In [None]:

# Count events per category, then rank categories from most to least active.
events_per_category = df.groupBy('category_code').agg(count('*').alias('interaction_count'))
category_interactions = events_per_category.orderBy(col('interaction_count').desc())

# Show the top 10 categories
top_categories = category_interactions.limit(10)
top_categories.show()

# Convert to Pandas for plotting
top_categories_pd = top_categories.toPandas()

# Plot the top 10 categories
fig, ax = plt.subplots(figsize=(8, 6))
ax.bar(top_categories_pd['category_code'], top_categories_pd['interaction_count'], color='skyblue')
ax.set_title('Top 10 Categories by Number of Interactions')
ax.set_xlabel('Category')
ax.set_ylabel('Number of Interactions')
plt.xticks(rotation=45)
fig.tight_layout()
plt.show()

# Which brand is viewed the most?

In [None]:

# Group by brand and count the number of views.
# Only 'view' events are counted: without this filter every event type
# (purchases included) would be reported under the "views" label.
view_events = df.filter(col('event_type') == 'view')
brand_views = view_events.groupBy('brand').agg(count('*').alias('view_count')).orderBy('view_count', ascending=False)

# Show the top 10 brands
# (this rebinds top_brands, which an earlier cell used for the purchase ranking)
top_brands = brand_views.limit(10)
top_brands.show()

# Convert to Pandas for plotting
top_brands_pd = top_brands.toPandas()

# Plot the top 10 brands
plt.figure(figsize=(8, 6))
plt.bar(top_brands_pd['brand'], top_brands_pd['view_count'], color='skyblue')
plt.title('Top 10 Brands by Number of Views')
plt.xlabel('Brand')
plt.ylabel('Number of Views')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Does traffic fluctuate by date?

In [None]:

# Derive the calendar date of each event from event_time.
df = df.withColumn('date', to_date(col('event_time')))

# Group by date and count the number of unique visitors.
# A grouped result carries no row-order guarantee, so sort it chronologically;
# otherwise the line plot below would connect the days in arbitrary order.
daily_visitors = (
    df.groupBy('date')
    .agg(countDistinct('user_id').alias('unique_visitors'))
    .orderBy('date')
)

# Show the result
daily_visitors.show()

# Convert to Pandas for plotting
daily_visitors_pd = daily_visitors.toPandas()

# Plot the daily visitors trend
plt.figure(figsize=(8, 6))
plt.plot(daily_visitors_pd['date'], daily_visitors_pd['unique_visitors'], marker='o', linestyle='-')
plt.title('Daily Visitors Trend')
plt.xlabel('Date')
plt.ylabel('Number of Unique Visitors')
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
