# Set Up and Install PySpark

In [None]:
! pip install -q findspark
! pip install pyspark==3.5.4

In [None]:
import findspark

# Locate the Spark installation (from SPARK_HOME or common install locations)
# so that the `import pyspark` below resolves.
findspark.init()

# Import the package under its real name. The alias `spark` is what the rest
# of this notebook uses for the SparkSession instance, so binding the module
# to that name is misleading and gets silently overwritten later.
import pyspark

print(pyspark.__version__)

3.5.4


In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.feature import MinHashLSH, CountVectorizer
from pyspark.sql import functions as F

import time
import matplotlib.pyplot as plt

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as F

# Obtain (or reuse) the session and derive the context handles from it.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlc = SQLContext(sc)

# Read the comma-separated basket file: the first row holds the column names
# and the column types are inferred from the data.
df = (
    sqlc.read
    .options(sep=",", header=True, inferSchema=True)
    .csv("/content/baskets.csv")
)

# Preview the first ten rows, then dump the schema to confirm the columns.
df.show(10)
df.printSchema()
# Schema this notebook expects:
#  |-- Member_number: integer (nullable = true)
#  |-- Date: string (nullable = true)
#  |-- itemDescription: string (nullable = true)
#  |-- year: integer (nullable = true)
#  |-- month: integer (nullable = true)
#  |-- day: integer (nullable = true)
#  |-- day_of_week: integer (nullable = true)

class DateSimilarityAnalyzer:
    """
    Find pairs of dates whose sets of purchased items are similar (Jaccard).

    Implements two approaches so results and run times can be compared:
    1. MinHashLSH algorithm for efficient (approximate) similarity search
    2. Brute force comparison of every date pair for validation

    After ``load_data()`` the attribute ``date_items_df`` holds one row per
    date with the columns ``Date`` (string key built from day-month-year) and
    ``items_array`` (distinct item descriptions bought on that date).
    Both ``run_*`` methods return a DataFrame with the columns ``date1``,
    ``date2`` and ``jaccard_similarity``, where ``date1 < date2``.
    """
    DEFAULT_HASH_TABLES = 10
    DEFAULT_SEED = 42
    FIGURE_WIDTH = 12
    FIGURE_HEIGHT = 8

    # Column names of date_items_df, shared by every method so they cannot drift apart.
    DATE_COL = "Date"
    ITEMS_COL = "items_array"
    # Explicit result schema: createDataFrame cannot infer one from an empty pair list.
    PAIRS_SCHEMA = "date1 string, date2 string, jaccard_similarity double"

    def __init__(self, spark=None, data_path=None):
        """
        Initialize the analyzer with Spark session and data path.

        Args:
            spark: SparkSession to use; when None a session is obtained via
                SparkSession.builder.getOrCreate()
            data_path: Path to the baskets.csv file
        """
        # Honor an injected session; only build one when none was supplied.
        if spark is None:
            spark = SparkSession.builder.appName("DateSimilarityAnalyzer").getOrCreate()
        self.spark = spark
        self.data_path = data_path
        self.date_items_df = None

    def _require_data(self):
        """Return date_items_df, raising RuntimeError if load_data() has not run."""
        if self.date_items_df is None:
            raise RuntimeError("No data loaded: call load_data() first")
        return self.date_items_df

    @staticmethod
    def jaccard_similarity(items_a, items_b):
        """
        Return the Jaccard similarity |A & B| / |A | B| of two sets.

        Two empty sets yield 0.0 (there is no union to divide by).
        """
        union_size = len(items_a | items_b)
        if union_size == 0:
            return 0.0
        return len(items_a & items_b) / union_size

    def load_data(self):
        """
        Load and prepare data for similarity analysis.

        Reads the CSV at ``data_path`` (header row, all columns as strings),
        builds the per-date baskets and caches them.

        Returns:
            DataFrame with one row per date (also stored in ``date_items_df``)

        Raises:
            Exception: any error from reading or preprocessing is printed and
                re-raised unchanged
        """
        try:
            # Load data
            df = self.spark.read.csv(self.data_path, header=True)

            df = self.preprocess_data(df)

            # The baskets are reused by every run_* call, so keep them cached
            self.date_items_df.cache()

            print(f"Loaded data: {df.count()} transactions")
            print(f"Unique dates: {self.date_items_df.count()}")

            return self.date_items_df

        except Exception as e:
            print(f"Error loading data: {e}")
            raise

    def preprocess_data(self, df):
        """
        Preprocess the data to create baskets for each date.

        Overwrites the ``Date`` column with "day-month-year" and stores the
        grouped baskets (``Date``, ``items_array``) in ``date_items_df``.

        Args:
            df: DataFrame with raw transaction data; must contain the columns
                day, month, year and itemDescription

        Returns:
            The transaction DataFrame with the rebuilt ``Date`` column
        """
        # Create date identifier
        df = df.withColumn(
            self.DATE_COL,
            F.concat_ws("-", F.col("day"), F.col("month"), F.col("year")),
        )

        # Group items by date (collect_set keeps each item once per date)
        self.date_items_df = df.groupBy(self.DATE_COL).agg(
            F.collect_set("itemDescription").alias(self.ITEMS_COL)
        )

        return df

    def run_minhash_lsh(self, threshold, num_hash_tables=DEFAULT_HASH_TABLES):
        """
        Implements MinHashLSH approach to find similar date pairs.

        Args:
            threshold: Jaccard similarity threshold
            num_hash_tables: Number of hash tables for LSH

        Returns:
            similar_pairs: DataFrame with similar date pairs (cached; call
                ``unpersist()`` on it when it is no longer needed)
            execution_time: Seconds taken, including evaluation of the join

        Raises:
            RuntimeError: if load_data() has not been called
        """
        date_items = self._require_data()
        start_time = time.perf_counter()

        # Binary bag-of-items vectors are the input MinHashLSH expects.
        # CountVectorizer builds the vocabulary itself, which scales better
        # than assembling sparse vectors by hand.
        cv = CountVectorizer(inputCol=self.ITEMS_COL, outputCol="features", binary=True)
        df_features = cv.fit(date_items).transform(date_items)

        # Fixed seed so repeated runs hash identically
        mh = MinHashLSH(
            inputCol="features",
            outputCol="hashes",
            numHashTables=num_hash_tables,
            seed=self.DEFAULT_SEED,
        )
        mh_model = mh.fit(df_features)

        # A minimum Jaccard similarity s corresponds to a maximum distance 1 - s.
        # NOTE(review): approxSimilarityJoin appears to keep distances strictly
        # below the threshold while run_brute_force keeps similarity >= threshold,
        # so pairs exactly on the threshold may differ - confirm against Spark docs.
        distance_threshold = 1 - threshold
        similar_pairs = mh_model.approxSimilarityJoin(
            df_features, df_features, distance_threshold, distCol="JaccardDistance"
        )

        # Drop self-joins and mirrored duplicates: keep only date A < date B
        date_a = F.col(f"datasetA.{self.DATE_COL}")
        date_b = F.col(f"datasetB.{self.DATE_COL}")
        similar_pairs = similar_pairs.filter(date_a < date_b).select(
            date_a.alias("date1"),
            date_b.alias("date2"),
            (1 - F.col("JaccardDistance")).alias("jaccard_similarity"),
        )

        # Spark is lazy: without an action only plan construction would be
        # timed. Cache first so the forced evaluation is not repeated later.
        similar_pairs.cache()
        similar_pairs.count()

        execution_time = time.perf_counter() - start_time

        return similar_pairs, execution_time

    def run_brute_force(self, threshold):
        """
        Implements brute force approach to find similar date pairs.

        Args:
            threshold: Jaccard similarity threshold (pairs with similarity
                >= threshold are kept)

        Returns:
            similar_pairs: DataFrame with similar date pairs
            execution_time: Time taken to execute the approach, in seconds

        Raises:
            RuntimeError: if load_data() has not been called
        """
        from itertools import combinations

        date_items = self._require_data()
        start_time = time.perf_counter()

        # Collect the baskets to the driver and index them by date
        date_to_items = {
            row[self.DATE_COL]: set(row[self.ITEMS_COL])
            for row in date_items.collect()
        }

        # Sorting first guarantees date1 < date2, matching run_minhash_lsh
        all_pairs = []
        for date1, date2 in combinations(sorted(date_to_items), 2):
            similarity = self.jaccard_similarity(
                date_to_items[date1], date_to_items[date2]
            )
            if similarity >= threshold:
                all_pairs.append((date1, date2, similarity))

        # Convert results back to DataFrame (explicit schema handles no matches)
        similar_pairs = self.spark.createDataFrame(all_pairs, self.PAIRS_SCHEMA)

        execution_time = time.perf_counter() - start_time

        return similar_pairs, execution_time

    def compare_approaches(self, thresholds, num_hash_tables=DEFAULT_HASH_TABLES):
        """
        Run both approaches for each threshold and tabulate times and pair counts.

        Args:
            thresholds: Iterable of Jaccard similarity thresholds
            num_hash_tables: Number of hash tables for the LSH runs

        Returns:
            pandas DataFrame with the columns threshold, minhash_lsh_time,
            minhash_lsh_pairs, brute_force_time and brute_force_pairs
        """
        import pandas as pd

        columns = [
            "threshold",
            "minhash_lsh_time",
            "minhash_lsh_pairs",
            "brute_force_time",
            "brute_force_pairs",
        ]
        records = []
        for threshold in thresholds:
            lsh_pairs, lsh_time = self.run_minhash_lsh(threshold, num_hash_tables)
            lsh_count = lsh_pairs.count()
            # Only the count is kept, so release the cached pairs right away
            lsh_pairs.unpersist()

            brute_pairs, brute_time = self.run_brute_force(threshold)
            records.append(
                (threshold, lsh_time, lsh_count, brute_time, brute_pairs.count())
            )

        return pd.DataFrame(records, columns=columns)

    def plot_performance_comparison(self, results, save_path=None):
        """
        Plot execution time against threshold for both approaches.

        Args:
            results: pandas DataFrame as returned by compare_approaches()
            save_path: File to write the figure to; when None the figure is
                shown instead
        """
        import matplotlib.pyplot as plt

        fig, ax = plt.subplots(figsize=(self.FIGURE_WIDTH, self.FIGURE_HEIGHT))
        ax.plot(results["threshold"], results["minhash_lsh_time"], "b-o", label="MinHashLSH")
        ax.plot(results["threshold"], results["brute_force_time"], "r-o", label="Brute force")
        ax.set_xlabel("Similarity Threshold")
        ax.set_ylabel("Execution Time (seconds)")
        ax.set_title("Performance Comparison: MinHashLSH vs Brute Force")
        ax.grid(True)
        ax.legend()

        if save_path:
            fig.savefig(save_path)
        else:
            plt.show()
        plt.close(fig)

# Execution
if __name__ == "__main__":
    # Obtain the Spark session used for the whole run
    spark = (
        SparkSession.builder
        .appName("MinHashLSH for Similar Dates")
        .getOrCreate()
    )

    # Build the analyzer on top of that session
    analyzer = DateSimilarityAnalyzer(spark=spark, data_path="/content/baskets.csv")

    # Load the baskets and preview the per-date item sets
    analyzer.load_data()
    analyzer.date_items_df.show(20)

    # Compare both approaches on the threshold grid [0.0, 0.1, ..., 1.0]
    thresholds = [t / 10 for t in range(11)]
    results = analyzer.compare_approaches(thresholds)

    # Persist the chart and the raw timings
    analyzer.plot_performance_comparison(results, save_path="performance_comparison.png")
    results.to_csv("performance_results.csv", index=False)

    print("Analysis complete!")



AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/baskets.csv.

# Task 2: PCY Algorithm for Frequent Items

# Task 3: MinHashLSH for Similar Dates

In [None]:
# Task 3 walkthrough: find pairs of dates with similar baskets using MinHashLSH.
# The names bound here (df, df_grouped, df_features, similar_pairs, ...) are
# module-level, so later cells can see them.

# 1. Reuse or create a SparkContext and wrap it in a SparkSession
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

from pyspark.sql import SparkSession
spark = SparkSession(sc)

# 2. Read the basket transactions. The path is relative to the working
#    directory; point it at your mounted drive if the file lives elsewhere.
file_path = "baskets.csv"  # Change this path accordingly
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Print the schema to verify column names
df.printSchema()
# Expected schema:
#  |-- Member_number: integer (nullable = true)
#  |-- Date: string (nullable = true)
#  |-- itemDescription: string (nullable = true)
#  |-- year: integer (nullable = true)
#  |-- month: integer (nullable = true)
#  |-- day: integer (nullable = true)
#  |-- day_of_week: integer (nullable = true)

# 3. (Optional) If you want to build your own Date column from day, month, year, you can do so:
from pyspark.sql.functions import concat_ws, col
# Uncomment the following if you wish to override the existing Date column:
# df = df.withColumn("Date", concat_ws("-", col("day"), col("month"), col("year")))

# 4. Group by Date and aggregate unique item descriptions into an array
#    (collect_set keeps each item once per date).
from pyspark.sql.functions import collect_set
df_grouped = df.groupBy("Date").agg(collect_set("itemDescription").alias("items_array"))

# Show a few rows to verify the grouping.
df_grouped.show(truncate=False)

# 5. Convert the items_array into a binary feature vector using CountVectorizer
#    (binary=True records presence/absence rather than counts).
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="items_array", outputCol="features", binary=True)
cv_model = cv.fit(df_grouped)
df_features = cv_model.transform(df_grouped)

# Verify that the "features" column has been created.
df_features.select("Date", "items_array", "features").show(truncate=False)

# 6. Set up the MinHashLSH transformer with 3 hash tables and fit it.
from pyspark.ml.feature import MinHashLSH
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)
mh_model = mh.fit(df_features)

# 7. Define a similarity threshold.
# For a minimum Jaccard similarity of 50%, the matching Jaccard distance
# threshold is 1 - 0.5 = 0.5 (distance = 1 - similarity).
similarity_threshold = 0.5
distance_threshold = 1 - similarity_threshold

# 8. Use approxSimilarityJoin to find candidate pairs of dates with JaccardDistance below the threshold.
# NOTE(review): the join appears to keep distances strictly below the threshold,
# so a pair at exactly 0.5 may be excluded - confirm against the Spark docs.
similar_pairs = mh_model.approxSimilarityJoin(df_features, df_features, distance_threshold, distCol="JaccardDistance")

# 9. Filter out self-joins (same date compared to itself) and duplicate pairs.
# Date is a string, so `<` compares lexicographically, not chronologically;
# it is used here only to keep one ordering of each pair.
similar_pairs = similar_pairs.filter(col("datasetA.Date") < col("datasetB.Date"))

# 10. Select and display the similar date pairs and their Jaccard distances.
similar_pairs.select(
    col("datasetA.Date").alias("Date_A"),
    col("datasetB.Date").alias("Date_B"),
    col("JaccardDistance")
).show(truncate=False)

# Optionally, when finished, stop the SparkSession.
# spark.stop()


root
 |-- Member_number: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- itemDescription: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)

+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Date      |items_array                                                                                                              

In [None]:
+----------+----------+-------------------+
|Date_A    |Date_B    |JaccardDistance    |
+----------+----------+-------------------+
|02/08/2015|06/07/2014|0.4782608695652174 | 0.4782608695652174
|02/05/2015|05/08/2015|0.4629629629629629 | 0.4629629629629629
|12/02/2015|27/07/2015|0.48888888888888893|
|03/11/2015|28/05/2015|0.4565217391304348 |
|14/02/2015|28/08/2015|0.4821428571428571 |
|08/02/2014|30/03/2015|0.45999999999999996|
|01/10/2015|13/11/2015|0.4878048780487805 |
|14/02/2015|23/11/2015|0.4814814814814815 |
|13/12/2014|17/12/2015|0.48888888888888893|
|04/07/2015|18/04/2015|0.4897959183673469 |
|10/11/2015|15/12/2014|0.4736842105263158 |
|02/05/2015|21/04/2015|0.4693877551020408 |
|10/06/2015|26/11/2015|0.4878048780487805 |
|16/12/2015|27/07/2015|0.4666666666666667 |
|05/01/2015|10/11/2015|0.4915254237288136 |
|08/07/2015|13/09/2015|0.4727272727272728 |
|04/01/2015|22/01/2015|0.49056603773584906|
|08/07/2015|13/11/2015|0.45999999999999996|
|18/03/2015|21/01/2015|0.4838709677419355 |
|14/07/2015|30/03/2015|0.4666666666666667 |
+----------+----------+-------------------+

## Approach 2: Manually compare all date pairs to find those meeting the similarity threshold.

In [None]:
import pandas as pd
import numpy as np

# Read the raw transactions (upload baskets.csv to the Colab session first)
df = pd.read_csv("baskets.csv")
print(df.head())

# Collapse the transactions into one row per date holding its set of distinct items
items_per_date = df.groupby("Date")["itemDescription"].apply(set)
df_grouped = items_per_date.reset_index()

print(df_grouped.head())

from itertools import combinations

# Jaccard similarity of two item collections
def jaccard_similarity(set1, set2):
    """Return |A & B| / |A | B| for two collections, or 0.0 when both are empty."""
    left, right = set(set1), set(set2)
    combined = left.union(right)
    if not combined:
        return 0.0
    shared = left.intersection(right)
    return float(len(shared)) / len(combined)

# Compute Jaccard similarity for all date pairs.
# Index the item sets by date once: filtering df_grouped with a boolean mask
# inside the loop would rescan the whole frame twice for every pair.
items_by_date = dict(zip(df_grouped["Date"], df_grouped["itemDescription"]))

Jaccard_pairs = []

for date1, date2 in combinations(df_grouped["Date"], 2):
    similarity = jaccard_similarity(items_by_date[date1], items_by_date[date2])

    if similarity >= 0.5 and date1 < date2:  # Filter pairs with similarity ≥ 50%
        distance = 1 - similarity
        Jaccard_pairs.append((date1, date2, similarity, distance))

# Show the first 50 similar pairs
print(("DataA", "DataB", "Jaccard Similar", "Jaccard Distance"))
for pair in Jaccard_pairs[:50]:
    print(pair)


   Member_number        Date   itemDescription  year  month  day  day_of_week
0           1249  01/01/2014      citrus fruit  2014      1    1            2
1           1249  01/01/2014            coffee  2014      1    1            2
2           1381  01/01/2014              curd  2014      1    1            2
3           1381  01/01/2014              soda  2014      1    1            2
4           1440  01/01/2014  other vegetables  2014      1    1            2
         Date                                    itemDescription
0  01/01/2014  {coffee, citrus fruit, berries, onions, soda, ...
1  01/01/2015  {citrus fruit, pot plants, salty snack, soda, ...
2  01/02/2014  {coffee, root vegetables, soda, salty snack, p...
3  01/02/2015  {cocoa drinks, citrus fruit, mayonnaise, bottl...
4  01/03/2014  {ice cream, root vegetables, citrus fruit, flo...
('DataA', 'DataB', 'Jaccard Similar', 'Jaccard Distance')
('01/01/2015', '13/11/2015', 0.5, 0.5)
('01/01/2015', '18/05/2015', 0.51063829787234

In [None]:
# 1. Reuse or create a SparkContext and SparkSession.
# getOrCreate() returns the context already running in this notebook, if any,
# so re-running the cell does not try to start a second one.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Wrap that context in a session; `spark` is the handle used below for reading data.
from pyspark.sql import SparkSession
spark = SparkSession(sc)

# Import necessary libraries for timing and visualization
import time
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from itertools import combinations
from pyspark.sql.functions import collect_set, col, concat_ws

# Define a function to measure the execution time
def measure_execution_time(func, *args, **kwargs):
    """
    Call ``func(*args, **kwargs)`` and time it.

    Uses time.perf_counter(), a monotonic high-resolution clock, so the
    measured duration cannot be skewed by system clock adjustments.

    Note: only the call itself is timed. If ``func`` returns a lazy object
    (for example an unevaluated Spark DataFrame), the deferred work is not
    included.

    Returns:
        (result, elapsed_seconds) where result is whatever ``func`` returned.
    """
    start_time = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start_time

# 2. Read the basket transactions (adjust file_path to your mounted drive)
file_path = "baskets.csv"  # Change this path accordingly
df = spark.read.options(header=True, inferSchema=True).csv(file_path)

# Dump the schema so the column names can be checked
print("Dataset Schema:")
df.printSchema()

# 3. One row per Date, holding the distinct item descriptions bought that day
distinct_items = collect_set("itemDescription").alias("items_array")
df_grouped = df.groupBy("Date").agg(distinct_items)
print("\nData grouped by date:")
df_grouped.show(n=5, truncate=False)

# 4. Function to implement MinHashLSH approach
def minhash_lsh_approach(df_grouped, similarity_threshold):
    """
    Find date pairs whose item sets are similar, using MinHashLSH.

    Args:
        df_grouped: Spark DataFrame with the columns "Date" and "items_array"
        similarity_threshold: minimum Jaccard similarity; converted to the
            distance threshold 1 - similarity passed to approxSimilarityJoin

    Returns:
        Spark DataFrame with the columns Date_A, Date_B and JaccardDistance,
        where Date_A < Date_B. The result is cached and already evaluated, so
        timing this function measures the join itself, not just plan building.
    """
    from pyspark.ml.feature import CountVectorizer, MinHashLSH
    from pyspark.sql.functions import col

    # Convert the items_array into a binary feature vector using CountVectorizer
    cv = CountVectorizer(inputCol="items_array", outputCol="features", binary=True)
    cv_model = cv.fit(df_grouped)
    df_features = cv_model.transform(df_grouped)

    # Set up the MinHashLSH transformer
    mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=10)
    mh_model = mh.fit(df_features)

    # Calculate Jaccard distance threshold
    distance_threshold = 1.0 - similarity_threshold

    # Use approxSimilarityJoin to find candidate pairs with JaccardDistance below the threshold
    similar_pairs = mh_model.approxSimilarityJoin(
        df_features, df_features, distance_threshold, distCol="JaccardDistance"
    )

    # Filter out self-joins and duplicate pairs
    similar_pairs = similar_pairs.filter(col("datasetA.Date") < col("datasetB.Date"))

    result_df = similar_pairs.select(
        col("datasetA.Date").alias("Date_A"),
        col("datasetB.Date").alias("Date_B"),
        col("JaccardDistance")
    )

    # Spark evaluates lazily: without an action the caller's timer would only
    # cover the two fit() calls and plan construction. Cache first so the
    # forced evaluation is reused by later actions on the result.
    result_df.cache()
    result_df.count()

    return result_df

# 5. Function to implement manual approach
def manual_approach(df, similarity_threshold):
    """
    Compare every pair of dates exactly and keep those that are similar enough.

    Args:
        df: Spark DataFrame with the columns "Date" and "itemDescription"
            (converted to pandas via toPandas())
        similarity_threshold: minimum Jaccard similarity (inclusive)

    Returns:
        pandas DataFrame with the columns Date_A, Date_B, JaccardSimilarity
        and JaccardDistance, one row per qualifying pair with Date_A < Date_B.
    """
    # Convert to pandas for the manual approach
    pdf = df.toPandas()

    # One set of distinct items per date. groupby sorts its keys and they are
    # unique, so combinations() below always yields date1 < date2 and no
    # explicit ordering check is needed.
    items_per_date = pdf.groupby("Date")["itemDescription"].apply(set)

    # Function to compute Jaccard similarity
    def jaccard_similarity(set1, set2):
        union = len(set1 | set2)
        return float(len(set1 & set2)) / union if union != 0 else 0.0

    # Iterate (date, items) pairs directly instead of filtering the frame with
    # a boolean mask for every pair, which rescanned it twice per iteration.
    jaccard_pairs = []
    for (date1, items1), (date2, items2) in combinations(items_per_date.items(), 2):
        similarity = jaccard_similarity(items1, items2)

        if similarity >= similarity_threshold:
            jaccard_pairs.append((date1, date2, similarity, 1.0 - similarity))

    # Convert to pandas DataFrame for easier comparison
    result_df = pd.DataFrame(jaccard_pairs, columns=["Date_A", "Date_B", "JaccardSimilarity", "JaccardDistance"])

    return result_df

# 6. Run both approaches with varying similarity thresholds and measure performance.
# Build the grid 0.0, 0.1, ..., 1.0 by dividing integers: a float step in
# np.arange accumulates rounding error (0.30000000000000004, 0.7000000000000001, ...),
# so a pair whose similarity is exactly 0.3 or 0.7 would fail the `>=` test.
similarity_thresholds = np.arange(11) / 10.0
lsh_times = []
manual_times = []

print("\nRunning performance comparison with varying similarity thresholds...")
for threshold in similarity_thresholds:
    print(f"\nTesting with similarity threshold: {threshold:.1f}")

    # Run and time MinHashLSH approach
    _, lsh_time = measure_execution_time(minhash_lsh_approach, df_grouped, threshold)
    lsh_times.append(lsh_time)
    print(f"MinHashLSH approach execution time: {lsh_time:.4f} seconds")

    # Run and time manual approach
    _, manual_time = measure_execution_time(manual_approach, df, threshold)
    manual_times.append(manual_time)
    print(f"Manual approach execution time: {manual_time:.4f} seconds")

# 7. Draw execution time against threshold for both approaches
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(similarity_thresholds, lsh_times, 'b-o', label='MinHashLSH Approach')
ax.plot(similarity_thresholds, manual_times, 'r-o', label='Manual Approach')
ax.set_xlabel('Similarity Threshold')
ax.set_ylabel('Execution Time (seconds)')
ax.set_title('Performance Comparison: MinHashLSH vs Manual Approach')
ax.grid(True)
ax.legend()

# Write the chart to disk and release the figure
fig.savefig('performance_comparison.png')
plt.close(fig)

# 8. Side-by-side look at both approaches at the required threshold of 0.5
print("\n=== Detailed Comparison at Similarity Threshold = 0.5 ===")

# MinHashLSH: time the run, then report how many pairs it found
lsh_results, lsh_time = measure_execution_time(minhash_lsh_approach, df_grouped, 0.5)
lsh_count = lsh_results.count()
print(
    f"\nMinHashLSH approach found {lsh_count} similar date pairs",
    f"Execution time: {lsh_time:.4f} seconds",
    "Sample results:",
    sep="\n",
)
lsh_results.show(n=5, truncate=False)

# Manual comparison: same report for the exhaustive approach
manual_results, manual_time = measure_execution_time(manual_approach, df, 0.5)
manual_count = manual_results.shape[0]
print(
    f"\nManual approach found {manual_count} similar date pairs",
    f"Execution time: {manual_time:.4f} seconds",
    "Sample results:",
    sep="\n",
)
print(manual_results.head(5))

# 9. Analysis of results
print("\n=== Performance Analysis ===")
# Guard the ratio the same way as speedup_factors below, so a zero LSH time
# reports "inf" instead of raising ZeroDivisionError.
speedup_at_half = manual_time / lsh_time if lsh_time > 0 else float('inf')
print(f"MinHashLSH is {speedup_at_half:.2f}x faster than the manual approach at threshold 0.5")

speedup_factors = [m/l if l > 0 else float('inf') for m, l in zip(manual_times, lsh_times)]

print("\nSpeedup factors (Manual time / LSH time) at different thresholds:")
for threshold, factor in zip(similarity_thresholds, speedup_factors):
    print(f"Threshold {threshold:.1f}: {factor:.2f}x")

# Create a speedup chart
plt.figure(figsize=(10, 6))
plt.plot(similarity_thresholds, speedup_factors, 'g-o')
plt.xlabel('Similarity Threshold')
plt.ylabel('Speedup Factor (Manual/LSH)')
plt.title('MinHashLSH Speedup Factor Over Manual Approach')
plt.grid(True)

# Save the figure
plt.savefig('speedup_comparison.png')
plt.close()

# NOTE(review): the conclusion below is fixed text and is not derived from the
# timings measured above - check it against the printed speedup factors.
print("\n=== Conclusion ===")
print("The MinHashLSH approach provides significant performance benefits compared to the manual approach,")
print("especially for larger datasets and lower similarity thresholds. The speedup is attributable to LSH's")
print("ability to efficiently narrow down candidate pairs before computing exact Jaccard similarities.")

# Optionally, when finished, stop the SparkSession
# spark.stop()

Dataset Schema:
root
 |-- Member_number: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- itemDescription: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)


Data grouped by date:
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Date      |items_array                                                                                                                                                                          