In [None]:
import pandas as pd
import json

# Load the raw sample (JSON Lines: one product object per line)
raw_df = pd.read_json('Sampled_Amazon_Meta.json', lines=True)

# NOTE(review): the public 2018 Amazon metadata dump names the category field
# 'category' and stores 'description' as a list of paragraphs — confirm the
# column names/types of this sample before relying on the filters below.


def _parse_price(prices: pd.Series) -> pd.Series:
    """Convert price strings such as "$1,299.99" to floats.

    Values that cannot be parsed (e.g. "", or a range like "$10.00 - $20.00")
    become NaN instead of making the whole conversion raise.
    """
    cleaned = (
        prices.astype(str)
        .str.replace('$', '', regex=False)  # literal '$'; as a regex it would match end-of-string
        .str.replace(',', '', regex=False)  # thousands separators
        .str.strip()
    )
    return pd.to_numeric(cleaned, errors='coerce')


def _in_category(categories, category: str = 'Books') -> bool:
    """Return True if `categories` contains `category`; NaN/None count as no match."""
    return isinstance(categories, (list, tuple, str)) and category in categories


# Clean + format: drop rows missing essential fields, convert price to float,
# then drop rows whose price could not be parsed.
df = (
    raw_df
    .dropna(subset=['asin', 'title', 'price'])
    .assign(price=lambda d: _parse_price(d['price']))
    .dropna(subset=['price'])
)

# Filter: keep only products categorized under 'Books'.
# .astype(bool) keeps the mask boolean even when the frame is empty.
books_mask = df['categories'].apply(_in_category).astype(bool)
df = df.loc[books_mask]

# Feature engineering: length of the description (characters for a string,
# element count for a list); a missing description counts as 0 instead of
# raising in len().
df = df.assign(description_length=df['description'].str.len().fillna(0).astype(int))

# Save processed data as JSON Lines for the Kafka producer
df.to_json('Amazon_Meta.json', orient='records', lines=True)

print("Data preprocessing completed and saved to 'Amazon_Meta.json'.")


In [None]:
from kafka import KafkaProducer
import json

# Initialize a producer; each value is JSON-encoded to UTF-8 bytes before sending
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))

# send() is asynchronous: broker-side failures are only reported through the
# returned future, so collect them with an errback instead of dropping them.
send_errors = []
try:
    # Stream the preprocessed JSON Lines file, one record per message
    with open('Amazon_Meta.json', 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():  # tolerate blank lines instead of failing in json.loads
                continue
            message = json.loads(line)
            producer.send('amazon_products', value=message).add_errback(send_errors.append)
    # Block until every buffered record has been delivered or has failed
    producer.flush()
finally:
    producer.close()

if send_errors:
    raise RuntimeError(
        f"{len(send_errors)} record(s) failed to send to 'amazon_products'; "
        f"first error: {send_errors[0]!r}"
    )
print("Data has been sent to Kafka topic 'amazon_products'")


In [None]:
from kafka import KafkaConsumer
import json

# Stop iterating once no new message arrives for this long. Without it the loop
# below blocks forever and no later cell can run under "Run All".
CONSUMER_TIMEOUT_MS = 10_000

# Initialize a consumer that reads the topic from the earliest available offset
consumer = KafkaConsumer(
    'amazon_products',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=CONSUMER_TIMEOUT_MS,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Consume data until the topic goes idle, then release the connection
try:
    for message in consumer:
        data = message.value
        print("Received data: ", data)
        # Implement your algorithm or analysis here
finally:
    consumer.close()


In [None]:
from kafka import KafkaConsumer
from efficient_apriori import apriori
import json

# Apriori batch parameters
BATCH_SIZE = 100              # number of baskets to mine
MIN_SUPPORT = 0.5             # fraction of baskets an itemset must appear in
MIN_CONFIDENCE = 1.0          # only rules that always hold in the batch
CONSUMER_TIMEOUT_MS = 10_000  # stop waiting if the topic holds fewer than BATCH_SIZE messages

# Initialize a consumer for Apriori
consumer_apriori = KafkaConsumer(
    'amazon_products',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=CONSUMER_TIMEOUT_MS,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Collect one batch of baskets; categories are treated as the items.
# NOTE(review): assumes 'categories' is a flat list of hashable values — confirm.
transactions = []
try:
    for message in consumer_apriori:
        categories = message.value.get('categories')
        if not categories:  # skip records without categories instead of raising KeyError
            continue
        transactions.append(tuple(categories))
        if len(transactions) >= BATCH_SIZE:
            break
finally:
    consumer_apriori.close()

# Apply the Apriori algorithm (skipped when the topic yielded no baskets)
if transactions:
    itemsets, rules = apriori(transactions, min_support=MIN_SUPPORT, min_confidence=MIN_CONFIDENCE)
else:
    itemsets, rules = {}, []
print("Frequent Itemsets:", itemsets)
print("Association Rules:", rules)


In [None]:
from kafka import KafkaConsumer
import json

from collections import Counter
from itertools import combinations

# PCY parameters (examples; tune to the data volume)
SUPPORT_THRESHOLD = 5         # minimum number of baskets an item/pair must appear in
N_BUCKETS = 1000              # size of the pair hash table
CONSUMER_TIMEOUT_MS = 10_000  # end the stream read once the topic is idle

# Initialize a consumer for PCY
consumer_pcy = KafkaConsumer(
    'amazon_products',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=CONSUMER_TIMEOUT_MS,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# PCY needs two passes over the baskets but a stream can only be read once, so
# buffer them. Each basket is de-duplicated (support counts baskets, not
# occurrences) and sorted so every pair is a canonical (a, b) tuple with a < b.
# NOTE(review): assumes 'categories' is a flat list of strings — confirm.
baskets = []
try:
    for message in consumer_pcy:
        items = message.value.get('categories') or []
        baskets.append(sorted(set(items)))
finally:
    consumer_pcy.close()


def _bucket(pair):
    """Map a canonical item pair to a bucket index in [0, N_BUCKETS).

    Python's str hashing is salted per process, so bucket contents differ between
    runs. The final frequent pairs do not, because pass 2 counts pairs exactly.
    """
    return hash(pair) % N_BUCKETS


# First pass: count single items and hash every pair of each basket into a bucket
item_count = Counter()
hash_table = [0] * N_BUCKETS
for basket in baskets:
    item_count.update(basket)
    for pair in combinations(basket, 2):
        hash_table[_bucket(pair)] += 1

# Between passes: frequent singletons, plus a bitmap of frequent buckets
frequent_items = {item for item, count in item_count.items() if count >= SUPPORT_THRESHOLD}
frequent_buckets = [count >= SUPPORT_THRESHOLD for count in hash_table]

# Second pass: count only candidate pairs, i.e. both items frequent AND the
# pair hashed to a frequent bucket (the PCY pruning step)
pair_count = Counter()
for basket in baskets:
    candidates = [item for item in basket if item in frequent_items]  # stays sorted
    for pair in combinations(candidates, 2):
        if frequent_buckets[_bucket(pair)]:
            pair_count[pair] += 1
frequent_pairs = {pair: count for pair, count in pair_count.items() if count >= SUPPORT_THRESHOLD}

print("Frequent items:", frequent_items)
print("Frequent pairs:", frequent_pairs)


In [None]:
from kafka import KafkaConsumer
import json

# Stop iterating once the topic has been idle this long; otherwise the loop never
# ends, close() is never reached, and later cells cannot run.
CONSUMER_TIMEOUT_MS = 10_000

# Initialize a consumer for innovative analysis
consumer_innovative = KafkaConsumer(
    'amazon_products',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=CONSUMER_TIMEOUT_MS,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Real-time recommendation logic here
try:
    for message in consumer_innovative:
        product = message.value
        # TODO: implement the innovative analysis logic for `product`
finally:
    consumer_innovative.close()


In [None]:
from kafka import KafkaConsumer
from efficient_apriori import apriori
from pymongo import MongoClient
import json

# Apriori batch parameters
BATCH_SIZE = 100              # number of baskets to mine
MIN_SUPPORT = 0.5             # fraction of baskets an itemset must appear in
MIN_CONFIDENCE = 1.0          # only rules that always hold in the batch
CONSUMER_TIMEOUT_MS = 10_000  # stop waiting if the topic holds fewer than BATCH_SIZE messages

# Initialize MongoDB connection
client = MongoClient('localhost', 27017)
db = client['amazon_data_analysis']
apriori_collection = db['apriori_results']

# Initialize a consumer for Apriori
consumer_apriori = KafkaConsumer(
    'amazon_products',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=CONSUMER_TIMEOUT_MS,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Collect one batch of baskets; categories are treated as the items.
# NOTE(review): assumes 'categories' is a flat list of strings — confirm.
transactions = []
try:
    for message in consumer_apriori:
        categories = message.value.get('categories')
        if not categories:  # skip records without categories instead of raising KeyError
            continue
        transactions.append(tuple(categories))
        if len(transactions) >= BATCH_SIZE:
            break
finally:
    consumer_apriori.close()

# Apply the Apriori algorithm (skipped when the topic yielded no baskets)
if transactions:
    itemsets, rules = apriori(transactions, min_support=MIN_SUPPORT, min_confidence=MIN_CONFIDENCE)
else:
    itemsets, rules = {}, []

# Store results in MongoDB. BSON documents need string keys and encodable values.
# efficient_apriori returns itemsets as {length: {item_tuple: count}} and rules as
# Rule objects, so inserting them directly raises InvalidDocument. Flatten them
# into plain lists of dicts first.
result = {
    'itemsets': [
        {'items': list(itemset), 'length': length, 'count': count}
        for length, counts in itemsets.items()
        for itemset, count in counts.items()
    ],
    'rules': [
        {
            'lhs': list(rule.lhs),
            'rhs': list(rule.rhs),
            'confidence': rule.confidence,
            'support': rule.support,
            'lift': rule.lift,
        }
        for rule in rules
    ],
    'num_transactions': len(transactions),
}
apriori_collection.insert_one(result)

print("Data has been processed and stored in MongoDB.")
