In [None]:
import json

# Function to preprocess data
def preprocess_data(data):
    """Project raw Amazon metadata records onto a fixed set of fields.

    Args:
        data: Iterable of dict records.

    Returns:
        A list with one new dict per record, holding the keys asin, title,
        features, description, price, brand and categories (in that order).
        ``features`` is read from the raw ``feature`` key; any key missing
        from a record maps to None.
    """
    # (output key, source key) pairs; the tuple order fixes the key order of each output dict.
    field_map = (
        ("asin", "asin"),
        ("title", "title"),
        ("features", "feature"),
        ("description", "description"),
        ("price", "price"),
        ("brand", "brand"),
        ("categories", "categories"),
    )
    return [
        {out_key: record.get(src_key) for out_key, src_key in field_map}
        for record in data
    ]

# Function to save preprocessed data as JSON Lines
def save_preprocessed_data(data, output_file):
    """Append records to ``output_file`` in JSON Lines format.

    Each record is written as one compact JSON object on its own line.
    Every reader in this notebook iterates the file line by line and calls
    ``json.loads(line)``, so a pretty-printed list per batch cannot be used.

    Args:
        data: Iterable of JSON-serializable records (e.g. the dicts returned
            by ``preprocess_data``).
        output_file: Path of the file to append to; created if missing.
    """
    with open(output_file, 'a', encoding='utf-8') as file:
        for record in data:
            file.write(json.dumps(record, ensure_ascii=False))
            file.write('\n')

# Load the Sampled Amazon dataset and preprocess the data.
# The input is read one JSON record per line; cleaned records are written in
# batches of `batch_size` via save_preprocessed_data.
json_file = 'Sampled_Amazon_eta.json'
output_file = 'preprocessed_amazon_metadata.json'

batch_size = 1000  # Define batch size

# save_preprocessed_data appends, so start from an empty output file;
# otherwise re-running this cell duplicates every record.
with open(output_file, 'w', encoding='utf-8'):
    pass

with open(json_file, 'r', encoding='utf-8') as file:
    batch = []
    for line in file:
        if not line.strip():
            continue  # skip blank lines instead of failing in json.loads
        batch.append(json.loads(line))

        # Write out a full batch and start a new one
        if len(batch) >= batch_size:
            save_preprocessed_data(preprocess_data(batch), output_file)
            batch = []

    # Process the remaining items if any
    if batch:
        save_preprocessed_data(preprocess_data(batch), output_file)

print("Preprocessing completed successfully!")


Producer for Streaming Pipeline Setup
Develop a producer application that streams the preprocessed data in real time.


In [None]:
from kafka import KafkaProducer
import json
import time

# Kafka broker address
bootstrap_servers = ['localhost:9092']

# Kafka topic to which data will be produced
topic = 'preprocessed_data'

# Function to read preprocessed data from JSON file and stream it to Kafka
def stream_preprocessed_data_to_kafka(producer, input_file, delay=0.1):
    """Send every JSON record in ``input_file`` to the module-level ``topic``.

    Args:
        producer: A Kafka producer exposing ``send(topic, value_bytes)`` and
            ``flush()``.
        input_file: File with one JSON record per line; blank lines are skipped.
        delay: Seconds to sleep after each record, to pace the stream
            (adjust based on your throughput requirements).
    """
    with open(input_file, 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                continue
            # The json round-trip validates the line and sends it in compact form.
            producer.send(topic, json.dumps(json.loads(line)).encode('utf-8'))
            time.sleep(delay)
    # send() only enqueues records; flush() blocks until they have been delivered.
    producer.flush()

if __name__ == "__main__":
    # Create Kafka producer
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    # Path to the preprocessed data JSON file
    input_file = 'preprocessed_amazon_metadata.json'

    try:
        # Stream preprocessed data to Kafka
        stream_preprocessed_data_to_kafka(producer, input_file)
    finally:
        # Release the producer's connections even if streaming raised;
        # close() also waits for pending sends (see kafka-python docs).
        producer.close()


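Alternatively, a producer that serializes each record with a `value_serializer` and publishes to the `amazon_data` topic: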

In [None]:
import json
import time
from kafka import KafkaProducer

def stream_data(json_file, topic, delay=0.1):
    """Stream each JSON record in ``json_file`` to the Kafka ``topic``.

    Every non-blank line is parsed as JSON and handed to the producer, whose
    ``value_serializer`` re-encodes it as UTF-8 JSON.

    Args:
        json_file: Path to a file with one JSON record per line.
        topic: Kafka topic to publish to.
        delay: Seconds to sleep between records to simulate a live stream.
    """
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=lambda x: json.dumps(x).encode('utf-8'))
    try:
        with open(json_file, 'r', encoding='utf-8') as file:
            for line in file:
                if not line.strip():
                    continue  # tolerate blank lines
                producer.send(topic, value=json.loads(line))
                time.sleep(delay)  # Simulate streaming delay
    finally:
        # Always release the producer, even if the file is missing or a line
        # is not valid JSON.
        producer.close()

if __name__ == "__main__":
    # Replay the preprocessed metadata file onto the 'amazon_data' topic.
    json_file, topic = 'preprocessed_amazon_metadata.json', 'amazon_data'
    stream_data(json_file=json_file, topic=topic)


Consumer for Streaming Pipeline Setup
Develop a consumer application that receives the streamed preprocessed data in real time.


In [None]:
from kafka import KafkaConsumer
import json

# Kafka broker address
bootstrap_servers = ['localhost:9092']

# Kafka topic from which data will be consumed
topic = 'preprocessed_data'


def consume_preprocessed_data_from_kafka(consumer):
    """Print every record received from ``consumer`` as a Python object.

    Each message value is expected to be UTF-8 encoded JSON; it is decoded
    and parsed before printing. Runs until the consumer's iterator ends.
    """
    for record in consumer:
        payload = record.value.decode('utf-8')
        parsed = json.loads(payload)
        print(parsed)

if __name__ == "__main__":
    # Create Kafka consumer
    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers)

    try:
        # Consume preprocessed data from Kafka
        consume_preprocessed_data_from_kafka(consumer)
    finally:
        # The loop normally ends only on an interrupt or error; release the
        # consumer's connections either way.
        consumer.close()


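Alternatively, a consumer that joins a consumer group and deserializes each record with a `value_deserializer`, reading from the `amazon_data` topic: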

In [None]:
from kafka import KafkaConsumer

def consume_data(topic):
    """Print every message published to ``topic`` until interrupted.

    The consumer joins consumer group 'group' and decodes each value from
    UTF-8 JSON via its ``value_deserializer``.
    """
    import json  # this cell does not import json itself; don't rely on an earlier cell

    consumer = KafkaConsumer(topic, bootstrap_servers='localhost:9092',
                             group_id='group',
                             value_deserializer=lambda x: json.loads(x.decode('utf-8')))
    try:
        for message in consumer:
            print(message.value)  # Replace with your processing logic
    finally:
        # With the default consumer_timeout_ms the iterator never ends on its
        # own, so a close() placed after the loop would never run.
        consumer.close()

if __name__ == "__main__":
    # Print everything published to the 'amazon_data' topic.
    topic = 'amazon_data'
    consume_data(topic=topic)


Consumer for Frequent Itemset Mining
Implement the Apriori algorithm in one consumer. There should be print
statements showing real-time insights and associations.

In [None]:
import json

class Apriori:
    """Streaming counter for frequent single items (1-itemsets).

    Each call to ``process_transaction`` adds one transaction. An item's
    support is the number of transactions containing it, and items whose
    support is at least ``min_support`` are reported as frequent.

    NOTE(review): only 1-itemsets are mined; candidate k-itemsets (k >= 2)
    and association rules are not generated yet.
    """

    def __init__(self, min_support):
        # Minimum number of transactions an item must appear in.
        self.min_support = min_support
        self.item_counts = {}        # item -> number of transactions containing it
        self.frequent_itemsets = []  # items with count >= min_support

    @staticmethod
    def _normalize(transaction):
        """Return the distinct, hashable items of ``transaction`` in first-seen order.

        ``None`` or empty transactions yield no items. List items (e.g. a
        nested category path) become tuples so they can be dict keys.
        """
        if not transaction:
            return []
        items = (tuple(item) if isinstance(item, list) else item for item in transaction)
        return list(dict.fromkeys(items))

    def process_transaction(self, transaction):
        # Count each item at most once per transaction so counts equal support.
        for item in self._normalize(transaction):
            self.item_counts[item] = self.item_counts.get(item, 0) + 1

    def update_frequent_itemsets(self):
        self.frequent_itemsets = [item for item, count in self.item_counts.items() if count >= self.min_support]

    def print_frequent_itemsets(self):
        print("Frequent Itemsets:")
        for itemset in self.frequent_itemsets:
            print(itemset)

def consume_data(topic, apriori, input_file='preprocessed_amazon_metadata.json'):
    """Feed the 'categories' of each preprocessed record to ``apriori`` and print results.

    NOTE(review): despite the name and the ``topic`` argument, records are read
    from ``input_file`` on disk, not from Kafka; ``topic`` is currently unused.

    Args:
        topic: Kafka topic name (unused; kept for interface compatibility).
        apriori: Object with ``process_transaction``, ``update_frequent_itemsets``
            and ``print_frequent_itemsets`` methods, e.g. ``Apriori``.
        input_file: File with one JSON record per line.
    """
    with open(input_file, 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                continue
            # A record with a missing or null 'categories' carries no items.
            transaction = json.loads(line).get('categories')
            if not transaction:
                continue
            apriori.process_transaction(transaction)
            apriori.update_frequent_itemsets()
            apriori.print_frequent_itemsets()

if __name__ == "__main__":
    # Minimum number of transactions an item must appear in; tune as needed.
    min_support = 5
    topic = 'amazon_data'
    apriori = Apriori(min_support=min_support)
    consume_data(topic=topic, apriori=apriori)


Producer for Frequent Itemset Mining
Streams the preprocessed data to the `amazon_data` topic for the Apriori consumer, which
implements the Apriori algorithm and prints real-time insights and associations.

In [None]:
import json
import time
from kafka import KafkaProducer

def stream_data(json_file, topic, delay=0.1):
    """Stream each JSON record in ``json_file`` to the Kafka ``topic``.

    Every non-blank line is parsed as JSON and handed to the producer, whose
    ``value_serializer`` re-encodes it as UTF-8 JSON.

    Args:
        json_file: Path to a file with one JSON record per line.
        topic: Kafka topic to publish to.
        delay: Seconds to sleep between records to simulate a live stream.
    """
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=lambda x: json.dumps(x).encode('utf-8'))
    try:
        with open(json_file, 'r', encoding='utf-8') as file:
            for line in file:
                if not line.strip():
                    continue  # tolerate blank lines
                producer.send(topic, value=json.loads(line))
                time.sleep(delay)  # Simulate streaming delay
    finally:
        # Always release the producer, even if the file is missing or a line
        # is not valid JSON.
        producer.close()

if __name__ == "__main__":
    # Replay the preprocessed metadata file onto the 'amazon_data' topic.
    json_file, topic = 'preprocessed_amazon_metadata.json', 'amazon_data'
    stream_data(json_file=json_file, topic=topic)


PCY Consumer

In [None]:
import json
import itertools

class PCY:
    """Streaming PCY (Park-Chen-Yu) miner for frequent items and pairs.

    Single items are counted exactly, once per transaction. Every distinct
    pair in a transaction is hashed into one of ``hash_table_size`` buckets.
    A pair of frequent items is reported when its own bucket count reaches
    ``min_support``. Several pairs can share a bucket, so the reported pairs
    are PCY candidates (a superset of the truly frequent pairs).
    """

    def __init__(self, min_support, hash_table_size):
        self.min_support = min_support
        self.hash_table_size = hash_table_size
        self.item_counts = {}                     # item -> transactions containing it
        self.pair_counts = [0] * hash_table_size  # bucket index -> pairs hashed there
        self.frequent_itemsets = []

    def hash_function(self, item1, item2):
        # Addition is commutative, so (a, b) and (b, a) land in the same bucket.
        return (hash(item1) + hash(item2)) % self.hash_table_size

    @staticmethod
    def _normalize(transaction):
        """Return the distinct, hashable items of ``transaction`` in first-seen order.

        ``None`` or empty transactions yield no items. List items (e.g. a
        nested category path) become tuples so they can be dict keys.
        """
        if not transaction:
            return []
        items = (tuple(item) if isinstance(item, list) else item for item in transaction)
        return list(dict.fromkeys(items))

    def process_transaction(self, transaction):
        items = self._normalize(transaction)

        # Count single items (once per transaction, so counts equal support)
        for item in items:
            self.item_counts[item] = self.item_counts.get(item, 0) + 1

        # Hash every distinct pair into the bucket table
        for item1, item2 in itertools.combinations(items, 2):
            self.pair_counts[self.hash_function(item1, item2)] += 1

    def update_frequent_itemsets(self):
        frequent_single_items = [item for item, count in self.item_counts.items() if count >= self.min_support]
        # PCY candidate test: both items frequent AND the bucket that this
        # particular pair hashes to is frequent.
        frequent_pairs = [
            (item1, item2)
            for item1, item2 in itertools.combinations(frequent_single_items, 2)
            if self.pair_counts[self.hash_function(item1, item2)] >= self.min_support
        ]
        self.frequent_itemsets = frequent_single_items + frequent_pairs

    def print_frequent_itemsets(self):
        print("Frequent Itemsets:")
        for itemset in self.frequent_itemsets:
            print(itemset)

def consume_data(topic, pcy, input_file='preprocessed_amazon_metadata.json'):
    """Feed the 'categories' of each preprocessed record to ``pcy`` and print results.

    NOTE(review): despite the name and the ``topic`` argument, records are read
    from ``input_file`` on disk, not from Kafka; ``topic`` is currently unused.

    Args:
        topic: Kafka topic name (unused; kept for interface compatibility).
        pcy: Object with ``process_transaction``, ``update_frequent_itemsets``
            and ``print_frequent_itemsets`` methods, e.g. ``PCY``.
        input_file: File with one JSON record per line.
    """
    with open(input_file, 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                continue
            # A record with a missing or null 'categories' carries no items.
            transaction = json.loads(line).get('categories')
            if not transaction:
                continue
            pcy.process_transaction(transaction)
            pcy.update_frequent_itemsets()
            pcy.print_frequent_itemsets()

if __name__ == "__main__":
    # Tuning knobs: minimum support count and number of hash buckets.
    min_support, hash_table_size = 5, 10000
    topic = 'amazon_data'
    pcy = PCY(min_support=min_support, hash_table_size=hash_table_size)
    consume_data(topic=topic, pcy=pcy)


PCY Producer

In [None]:
import json
import time
from kafka import KafkaProducer

def stream_data(json_file, topic, delay=0.1):
    """Stream each JSON record in ``json_file`` to the Kafka ``topic``.

    Every non-blank line is parsed as JSON and handed to the producer, whose
    ``value_serializer`` re-encodes it as UTF-8 JSON.

    Args:
        json_file: Path to a file with one JSON record per line.
        topic: Kafka topic to publish to.
        delay: Seconds to sleep between records to simulate a live stream.
    """
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=lambda x: json.dumps(x).encode('utf-8'))
    try:
        with open(json_file, 'r', encoding='utf-8') as file:
            for line in file:
                if not line.strip():
                    continue  # tolerate blank lines
                producer.send(topic, value=json.loads(line))
                time.sleep(delay)  # Simulate streaming delay
    finally:
        # Always release the producer, even if the file is missing or a line
        # is not valid JSON.
        producer.close()

if __name__ == "__main__":
    # Replay the preprocessed metadata file onto the 'amazon_data' topic.
    json_file, topic = 'preprocessed_amazon_metadata.json', 'amazon_data'
    stream_data(json_file=json_file, topic=topic)


Innovative and Creative Component: Recommendation System

In [None]:
import json
from collections import defaultdict
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class RecommendationSystem:
    """User-based collaborative filtering over a growing user-item count matrix.

    Each transaction adds interaction counts for one user. On update, users
    are compared by cosine similarity of their count vectors. Each user is
    then recommended the items that their most similar users interacted with
    but they have not.
    """

    def __init__(self, k_neighbors=5):
        # user_id -> item -> interaction count
        self.user_item_matrix = defaultdict(lambda: defaultdict(int))
        # Number of most-similar users consulted per user.
        self.k_neighbors = k_neighbors

    def process_transaction(self, transaction):
        user_id = transaction.get("user_id")
        # `or []` also covers an explicit "items": null.
        items = transaction.get("items") or []

        for item in items:
            self.user_item_matrix[user_id][item] += 1

    def _axes(self):
        """Return (users, items): matrix row and column labels in first-seen order."""
        users = list(self.user_item_matrix)
        items = list(dict.fromkeys(
            item for user_items in self.user_item_matrix.values() for item in user_items
        ))
        return users, items

    def update_recommendations(self):
        users, items = self._axes()
        if not users:
            # No interactions yet; cosine_similarity rejects an empty matrix.
            self.print_recommendations({})
            return
        user_item_matrix = self.convert_matrix()
        similarity_matrix = self.compute_similarity_matrix(user_item_matrix)
        recommendations = self.generate_recommendations(user_item_matrix, similarity_matrix)
        # Translate row/column indices back to the real user and item ids.
        labelled = {users[row]: [items[col] for col in cols] for row, cols in recommendations.items()}
        self.print_recommendations(labelled)

    def convert_matrix(self):
        users, items = self._axes()
        column = {item: j for j, item in enumerate(items)}

        user_item_matrix = np.zeros((len(users), len(items)), dtype=np.int32)
        for i, user_id in enumerate(users):
            # Fill only the stored entries; iterating .items() avoids inserting
            # zero-count keys into the nested defaultdicts.
            for item, count in self.user_item_matrix[user_id].items():
                user_item_matrix[i, column[item]] = count

        return user_item_matrix

    def compute_similarity_matrix(self, user_item_matrix):
        similarity_matrix = cosine_similarity(user_item_matrix)
        np.fill_diagonal(similarity_matrix, 0)  # Set diagonal elements to 0 to avoid self-similarity
        return similarity_matrix

    def generate_recommendations(self, user_item_matrix, similarity_matrix):
        """Return {user_row: [item_col, ...]} of unseen items liked by similar users.

        For each row i, up to ``k_neighbors`` other users with positive
        similarity are consulted, most similar first. Any item a neighbour has
        interacted with and user i has not is recommended once.
        """
        num_users = user_item_matrix.shape[0]
        recommendations = defaultdict(list)
        interacted = user_item_matrix > 0

        for i in range(num_users):
            similar_users = np.argsort(similarity_matrix[i])[::-1][:self.k_neighbors]  # Get indices of top-k similar users
            for similar_user in similar_users:
                if similar_user == i or similarity_matrix[i, similar_user] <= 0:
                    continue  # self, or no shared items -> no evidence
                # Items the neighbour has that user i has not.
                unseen = np.flatnonzero(interacted[similar_user] & ~interacted[i])
                for item in unseen:
                    item = int(item)
                    if item not in recommendations[i]:
                        recommendations[i].append(item)

        return recommendations

    def print_recommendations(self, recommendations):
        print("Recommendations:")
        for user_id, recommended_items in recommendations.items():
            print(f"User {user_id} may be interested in: {recommended_items}")

def consume_data(topic, recommendation_system, input_file='preprocessed_user_data.json'):
    """Feed each JSON record in ``input_file`` to ``recommendation_system``.

    After every record the recommendations are recomputed and printed.

    NOTE(review): records are read from ``input_file`` on disk, not from
    Kafka, and ``topic`` is unused. No cell in this notebook writes
    'preprocessed_user_data.json'; confirm where that file comes from.

    Args:
        topic: Kafka topic name (unused; kept for interface compatibility).
        recommendation_system: Object with ``process_transaction`` and
            ``update_recommendations`` methods, e.g. ``RecommendationSystem``.
        input_file: File with one JSON record per line.
    """
    with open(input_file, 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                continue
            recommendation_system.process_transaction(json.loads(line))
            recommendation_system.update_recommendations()

if __name__ == "__main__":
    topic = 'user_data'
    # Consult the 5 most similar users when recommending.
    recommendation_system = RecommendationSystem(k_neighbors=5)
    consume_data(topic=topic, recommendation_system=recommendation_system)


Integration: Storing PCY Frequent Itemsets in MongoDB

In [None]:
import json
import itertools
from pymongo import MongoClient

class PCY:
    """Streaming PCY miner that also stores its results in MongoDB.

    Single items are counted exactly, once per transaction. Every distinct
    pair in a transaction is hashed into one of ``hash_table_size`` buckets.
    A pair of frequent items is reported when its own bucket count reaches
    ``min_support``. Bucket collisions are possible, so reported pairs are
    PCY candidates (a superset of the truly frequent pairs).
    """

    def __init__(self, min_support, hash_table_size, db_name, collection_name):
        self.min_support = min_support
        self.hash_table_size = hash_table_size
        self.item_counts = {}                     # item -> transactions containing it
        self.pair_counts = [0] * hash_table_size  # bucket index -> pairs hashed there
        self.frequent_itemsets = []

        # MongoDB setup
        self.client = MongoClient('mongodb://localhost:27017/')
        self.db = self.client[db_name]
        self.collection = self.db[collection_name]

    def hash_function(self, item1, item2):
        # Addition is commutative, so (a, b) and (b, a) land in the same bucket.
        return (hash(item1) + hash(item2)) % self.hash_table_size

    @staticmethod
    def _normalize(transaction):
        """Return the distinct, hashable items of ``transaction`` in first-seen order.

        ``None`` or empty transactions yield no items. List items (e.g. a
        nested category path) become tuples so they can be dict keys.
        """
        if not transaction:
            return []
        items = (tuple(item) if isinstance(item, list) else item for item in transaction)
        return list(dict.fromkeys(items))

    def process_transaction(self, transaction):
        items = self._normalize(transaction)

        # Count single items (once per transaction, so counts equal support)
        for item in items:
            self.item_counts[item] = self.item_counts.get(item, 0) + 1

        # Hash every distinct pair into the bucket table
        for item1, item2 in itertools.combinations(items, 2):
            self.pair_counts[self.hash_function(item1, item2)] += 1

    def update_frequent_itemsets(self):
        frequent_single_items = [item for item, count in self.item_counts.items() if count >= self.min_support]
        # PCY candidate test: both items frequent AND the bucket that this
        # particular pair hashes to is frequent.
        frequent_pairs = [
            (item1, item2)
            for item1, item2 in itertools.combinations(frequent_single_items, 2)
            if self.pair_counts[self.hash_function(item1, item2)] >= self.min_support
        ]
        self.frequent_itemsets = frequent_single_items + frequent_pairs

    def store_frequent_itemsets(self):
        # Inserts a new snapshot document on every call.
        self.collection.insert_one({"frequent_itemsets": self.frequent_itemsets})

    def print_frequent_itemsets(self):
        print("Frequent Itemsets:")
        for itemset in self.frequent_itemsets:
            print(itemset)

def consume_data(topic, pcy, input_file='preprocessed_amazon_metadata.json'):
    """Feed each record's 'categories' to ``pcy``, then print and store the results.

    NOTE(review): despite the name and the ``topic`` argument, records are read
    from ``input_file`` on disk, not from Kafka; ``topic`` is currently unused.

    Args:
        topic: Kafka topic name (unused; kept for interface compatibility).
        pcy: Object with ``process_transaction``, ``update_frequent_itemsets``,
            ``print_frequent_itemsets`` and ``store_frequent_itemsets`` methods.
        input_file: File with one JSON record per line.
    """
    with open(input_file, 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                continue
            # A record with a missing or null 'categories' carries no items.
            transaction = json.loads(line).get('categories')
            if not transaction:
                continue
            pcy.process_transaction(transaction)
            pcy.update_frequent_itemsets()
            pcy.print_frequent_itemsets()
            pcy.store_frequent_itemsets()

if __name__ == "__main__":
    # PCY thresholds; tune as needed.
    min_support, hash_table_size = 5, 10000
    # MongoDB destination for the frequent-itemset snapshots.
    db_name, collection_name = 'streaming_db', 'frequent_itemsets'
    topic = 'amazon_data'
    pcy = PCY(min_support=min_support, hash_table_size=hash_table_size,
              db_name=db_name, collection_name=collection_name)
    consume_data(topic=topic, pcy=pcy)


Consumer 2 (improved): PCY over the `alsobuy` lists streamed from Kafka

In [None]:
import json
import itertools
from kafka import KafkaConsumer

class PCY:
    """Streaming PCY-style miner for frequent items and frequent pairs.

    Items are counted exactly, once per transaction. A pair is only tracked
    once both of its items are already frequent (Apriori pruning). Each
    tracked pair increments its hash bucket in ``pair_counts`` and its exact
    count in ``candidate_pair_counts``. A pair is reported when both its
    bucket (the cheap PCY filter) and its own count reach ``min_support``.
    Pair occurrences seen before both items became frequent are not counted,
    so pair counts are lower bounds.
    """

    def __init__(self, min_support, hash_table_size):
        self.min_support = min_support
        self.hash_table_size = hash_table_size
        self.item_counts = {}            # item -> transactions containing it
        self.pair_counts = {}            # bucket index -> tracked pairs hashed there
        self.candidate_pair_counts = {}  # (item1, item2), sorted -> exact tracked count
        self.frequent_items = set()
        self.frequent_pairs = set()

    def process_transaction(self, transaction):
        if transaction:
            # Count each item once per transaction so counts equal support.
            for item in set(transaction):
                self.item_counts[item] = self.item_counts.get(item, 0) + 1

    def update_frequent_items(self):
        self.frequent_items = {item for item, count in self.item_counts.items() if count >= self.min_support}

    def hash_function(self, pair):
        return hash(pair) % self.hash_table_size

    def process_pairs(self, transaction):
        if transaction:
            # sorted() gives each pair a canonical order; set() drops duplicate
            # items so no (x, x) pairs are formed.
            for pair in itertools.combinations(sorted(set(transaction)), 2):
                if pair[0] in self.frequent_items and pair[1] in self.frequent_items:
                    bucket = self.hash_function(pair)
                    self.pair_counts[bucket] = self.pair_counts.get(bucket, 0) + 1
                    self.candidate_pair_counts[pair] = self.candidate_pair_counts.get(pair, 0) + 1

    def update_frequent_pairs(self):
        # pair_counts is keyed by bucket number, not by pair, so the pairs
        # themselves come from candidate_pair_counts.
        self.frequent_pairs = {
            pair
            for pair, count in self.candidate_pair_counts.items()
            if count >= self.min_support
            and self.pair_counts.get(self.hash_function(pair), 0) >= self.min_support
        }

    def print_frequent_itemsets(self):
        print("Frequent Itemsets:")
        for itemset in self.frequent_items:
            print(itemset)
        print("Frequent Pairs:")
        for pair in self.frequent_pairs:
            print(pair)

def consume_data(topic, pcy, item_key='alsobuy'):
    """Mine frequent items and pairs from the ``item_key`` list of each Kafka message.

    Each message value is parsed as JSON. Its ``item_key`` list is then fed
    to ``pcy`` and the current results are printed. Runs until interrupted.

    NOTE(review): preprocess_data in this notebook keeps no 'alsobuy' field,
    so with that producer every message yields None here. Confirm the field
    name (the raw Amazon metadata may call it 'also_buy').

    Args:
        topic: Kafka topic to consume from.
        pcy: A ``PCY`` instance (or anything with the same methods).
        item_key: Record field holding the list of co-purchased items.
    """
    consumer = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'])
    try:
        for message in consumer:
            data = json.loads(message.value)
            alsobuy_items = data.get(item_key)
            pcy.process_transaction(alsobuy_items)
            pcy.process_pairs(alsobuy_items)
            pcy.update_frequent_items()
            pcy.update_frequent_pairs()
            pcy.print_frequent_itemsets()
    finally:
        # The iterator normally ends only on an interrupt or error, so a
        # close() after the loop would never run.
        consumer.close()

if __name__ == "__main__":
    # Tuning knobs: minimum support count and number of hash buckets.
    min_support, hash_table_size = 5, 1000
    topic = 'preprocessed_data'
    pcy = PCY(min_support=min_support, hash_table_size=hash_table_size)
    consume_data(topic=topic, pcy=pcy)
