<a href="https://colab.research.google.com/github/FredSadeghi/Amazon_CoPurchase_Network_Analysis/blob/main/BigDataAmazon.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import gzip
import csv
import re

In [1]:
# Fetch the project repository; git creates an 'Amazon_CoPurchase_Network_Analysis'
# directory under the current working directory.
!git clone https://github.com/FredSadeghi/Amazon_CoPurchase_Network_Analysis.git

Cloning into 'Amazon_CoPurchase_Network_Analysis'...
remote: Enumerating objects: 7, done.[K
remote: Counting objects: 100% (7/7), done.[K
remote: Compressing objects: 100% (6/6), done.[K
remote: Total 7 (delta 1), reused 3 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (7/7), 6.01 KiB | 3.01 MiB/s, done.
Resolving deltas: 100% (1/1), done.


In [None]:
# Paths used by the parsing step: the gzipped metadata dump that is read, and the
# two CSV files (product rows and edge rows) that are written.
input_file = 'amazon-meta.txt.gz'
product_output, edge_output = 'products.csv', 'edges.csv'

In [None]:
def _write_product(product_writer, edge_writer, record):
    """Write one parsed product row plus its outgoing edges, if the record is complete.

    A record is written only when both 'Id' and 'ASIN' are present and non-empty,
    so the empty record that exists before the first "Id:" line is skipped.
    """
    if not (record.get('ASIN') and record.get('Id')):
        return
    product_writer.writerow([
        record['Id'],
        record['ASIN'],
        record.get('title', ''),
        record.get('group', ''),
        record.get('salesrank', ''),
    ])
    edge_writer.writerows(
        [record['ASIN'], similar_asin] for similar_asin in record.get('similar', [])
    )


def parse_amazon_data(input_path=None, product_path=None, edge_path=None):
    """Convert the gzipped Amazon metadata dump into a product CSV and an edge CSV.

    The input is scanned line by line. Every "Id:" line starts a new product
    record; the "ASIN:", "title:", "group:", "salesrank:" and "similar:" lines
    that follow fill it in. When the next "Id:" line (or end of file) is reached,
    the finished record is written as one row of the product CSV and one
    (SourceASIN, TargetASIN) row per ASIN listed on its "similar:" line.

    Args:
        input_path: Path of the gzipped metadata file. Defaults to the
            module-level ``input_file``.
        product_path: Path of the product CSV to (over)write. Defaults to the
            module-level ``product_output``.
        edge_path: Path of the edge CSV to (over)write. Defaults to the
            module-level ``edge_output``.

    Raises:
        FileNotFoundError: If the input file does not exist.
    """
    # Fall back to the notebook's configuration cell so parse_amazon_data() with
    # no arguments behaves as before.
    input_path = input_file if input_path is None else input_path
    product_path = product_output if product_path is None else product_path
    edge_path = edge_output if edge_path is None else edge_path

    # Scalar fields: (line prefix, key in the record dict).
    field_prefixes = (
        ('title:', 'title'),
        ('group:', 'group'),
        ('salesrank:', 'salesrank'),
    )

    with gzip.open(input_path, 'rt', encoding='latin-1') as f, \
         open(product_path, 'w', newline='', encoding='utf-8') as prod_out, \
         open(edge_path, 'w', newline='', encoding='utf-8') as edge_out:

        product_writer = csv.writer(prod_out)
        edge_writer = csv.writer(edge_out)

        # Write headers
        product_writer.writerow(['Id', 'ASIN', 'Title', 'Group', 'SalesRank'])
        edge_writer.writerow(['SourceASIN', 'TargetASIN'])

        current = {}
        for line in f:
            line = line.strip()

            if line.startswith("Id:"):
                # A new entry begins: flush the previous one first.
                _write_product(product_writer, edge_writer, current)
                current = {'similar': [], 'Id': line[len("Id:"):].strip()}

            elif line.startswith("ASIN:"):
                current['ASIN'] = line[len("ASIN:"):].strip()

            elif line.startswith("similar:"):
                # Layout: "similar: <count> <asin> <asin> ..."; drop label and count.
                current['similar'] = line.split()[2:]

            else:
                # Match on the line prefix only. A substring test would let other
                # lines that merely contain e.g. "group:" (such as a category
                # path) overwrite a field that was already parsed.
                for prefix, key in field_prefixes:
                    if line.startswith(prefix):
                        current[key] = line[len(prefix):].strip()
                        break

        # Write last product
        _write_product(product_writer, edge_writer, current)

In [None]:
print("Parsing Amazon metadata...")
parse_amazon_data()
# Report the configured output paths instead of repeating the file names by hand,
# so the message cannot drift from the configuration cell.
print(f"Done. Output saved to {product_output} and {edge_output}")


Parsing Amazon metadata...


FileNotFoundError: [Errno 2] No such file or directory: '/content/drive/MyDrive/colab/amazon-meta.txt.gz'

In [None]:
!wget -O spark.tgz https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark.tgz && rm spark.tgz

--2025-04-10 17:55:23--  https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400395283 (382M) [application/x-gzip]
Saving to: ‘spark.tgz’


2025-04-10 17:55:41 (21.9 MB/s) - ‘spark.tgz’ saved [400395283/400395283]



In [None]:
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Start a Spark session named "AmazonDataAnalysis", or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("AmazonDataAnalysis")
    .getOrCreate()
)

In [None]:
# Both CSV files are produced with Python's csv.writer, which escapes a quote
# inside a quoted field by doubling it (""). Spark's CSV reader uses backslash as
# its default escape character, so set escape='"' explicitly; otherwise titles
# containing quotes are parsed incorrectly.
products = spark.read.csv("products.csv", header=True, inferSchema=True, quote='"', escape='"')
edges = spark.read.csv("edges.csv", header=True, inferSchema=True, quote='"', escape='"')

In [None]:
# Approximate quartiles (25th, 50th and 75th percentile) of SalesRank.
from pyspark.sql.types import IntegerType

# Cast SalesRank to integer before asking for quantiles.
products = products.withColumn(
    "SalesRank",
    products["SalesRank"].cast(IntegerType()),
)

# The last argument is the relative error accepted by the approximation.
quantiles = products.approxQuantile(
    "SalesRank",
    [0.25, 0.5, 0.75],
    0.05,
)
q1, median, q3 = quantiles[0], quantiles[1], quantiles[2]

print(f"Median SalesRank: {median}")
print(f"First Quartile SalesRank: {q1}")
print(f"Third Quartile SalesRank: {q3}")

Median SalesRank: 270086.0
First Quartile SalesRank: 99602.0
Third Quartile SalesRank: 610759.0


In [None]:
!pip install fuzzywuzzy

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, collect_list, explode, array_intersect, size, udf, col
from pyspark.sql.types import IntegerType, FloatType
from fuzzywuzzy import fuzz

# Create a DataFrame of connected product pairs
connected_pairs = edges.withColumnRenamed("SourceASIN", "ASIN1").withColumnRenamed("TargetASIN", "ASIN2")

ModuleNotFoundError: No module named 'fuzzywuzzy'

In [None]:
# Attach Group and Title of both endpoints to every connected pair. `products` is
# joined twice under the aliases p1 (for ASIN1) and p2 (for ASIN2). The selected
# frame therefore has two columns named Group and two named Title; they are
# addressed through their p1./p2. qualifiers.
product_pairs_with_features = (
    connected_pairs
    .join(products.alias("p1"), connected_pairs.ASIN1 == col("p1.ASIN"))
    .join(products.alias("p2"), connected_pairs.ASIN2 == col("p2.ASIN"))
    .select("ASIN1", "ASIN2", "p1.Group", "p2.Group", "p1.Title", "p2.Title")
)

In [None]:
# Per-pair similarity features: a group match indicator and a fuzzy title score.
@udf(returnType=FloatType())
def jaccard_similarity(group1, group2):
    """Return 1.0 when the two group values are equal, otherwise 0.0.

    Despite the name, this is an exact-match indicator, not a set-based Jaccard index.
    """
    return 1.0 if group1 == group2 else 0.0

@udf(returnType=FloatType())
def title_similarity(title1, title2):
    """Return fuzz.ratio of the two titles divided by 100."""
    ratio_percent = fuzz.ratio(title1, title2)
    return ratio_percent / 100.0

# Add both scores as new columns; the inputs are picked via their p1./p2. qualifiers.
product_pairs_with_similarity = (
    product_pairs_with_features
    .withColumn("GroupSimilarity", jaccard_similarity(col("p1.Group"), col("p2.Group")))
    .withColumn("TitleSimilarity", title_similarity(col("p1.Title"), col("p2.Title")))
)

In [None]:
# CombinedSimilarity is the unweighted mean of GroupSimilarity and TitleSimilarity.
product_pairs_with_combined_similarity = (
    product_pairs_with_similarity
    .withColumn(
        "CombinedSimilarity",
        (col("GroupSimilarity") + col("TitleSimilarity")) / 2.0,
    )
)

In [None]:
# Keep only pairs whose CombinedSimilarity exceeds 0.7. GroupSimilarity is either
# 0.0 or 1.0 and the score is an average of two terms, so a pair from different
# groups can reach at most 0.5 and never passes this threshold.
similar_products = product_pairs_with_combined_similarity.where("CombinedSimilarity > 0.7")

similar_products.show()

+----------+----------+-----+-----+--------------------+--------------------+---------------+---------------+------------------+
|     ASIN1|     ASIN2|Group|Group|               Title|               Title|GroupSimilarity|TitleSimilarity|CombinedSimilarity|
+----------+----------+-----+-----+--------------------+--------------------+---------------+---------------+------------------+
|0002250535|0002154463| Book| Book|Italy Today The B...|Italy : The Beaut...|            1.0|           0.54|0.7699999809265137|
|0002551543|0002154463| Book| Book|Provence : The Be...|Italy : The Beaut...|            1.0|           0.88|0.9399999976158142|
|0002154129|0002154463| Book| Book|France the Beauti...|Italy : The Beaut...|            1.0|           0.57|0.7849999666213989|
|000215949X|0002154463| Book| Book|Mexico : The Beau...|Italy : The Beaut...|            1.0|           0.89|0.9449999928474426|
|0002550326|0002154463| Book| Book|Tuscany : The Bea...|Italy : The Beaut...|            1.0|    

In [None]:
# Base-2 entropy of how products are distributed across Group values.
from pyspark.sql.functions import count, log2

total_count = products.count()
group_counts = products.groupBy("Group").agg(count("*").alias("count"))

# Sum p * log2(p) over all groups inside Spark SQL, with p = count / total_count,
# then negate the single aggregated value.
entropy_expr = f"SUM(count / {total_count} * log2(count / {total_count})) as entropy"
entropy = -group_counts.selectExpr(entropy_expr).first().entropy

print(f"Entropy of product groups: {entropy}")

Entropy of product groups: 1.2607732770291709
