<h2>Importing all the necessary modules</h2>

In [1]:
import json
import os
from tqdm import tqdm

<h2>Sampling</h2>

In [3]:
def sample_json(input_file, output_file, target_size_gb, filter_key='also_buy'):
    """Copy records that have a truthy `filter_key` into a size-capped sample file.

    The input is read as JSON Lines (one JSON document per line). Every record
    whose `filter_key` value is truthy is re-serialised with `json.dumps` and
    written to `output_file`. Reading stops once the amount written reaches
    `target_size_gb`.

    Args:
        input_file: Path of the JSON Lines file to sample from.
        output_file: Path of the JSON Lines file to create (overwritten).
        target_size_gb: Size cap for the output, in GiB (1024 ** 3 bytes).
        filter_key: Key that must be present with a truthy value for a record
            to be kept.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    target_size_bytes = target_size_gb * 1024 ** 3
    current_size_bytes = 0

    with open(input_file, 'r', encoding='utf-8') as infile, \
            open(output_file, 'w', encoding='utf-8') as outfile:
        for line in tqdm(infile):
            # Blank lines (e.g. a trailing newline) carry no record.
            if not line.strip():
                continue

            record = json.loads(line)
            if record.get(filter_key):
                serialized = json.dumps(record) + '\n'
                outfile.write(serialized)
                # Count what was written, not the input line: json.dumps
                # changes spacing and escapes non-ASCII characters, so the
                # two sizes can differ.
                current_size_bytes += len(serialized.encode('utf-8'))

            if current_size_bytes >= target_size_bytes:
                break

    print(f"Finished Sampling. Output size: {current_size_bytes / 1024 ** 3:.2f} GB")

<h2>Providing the json file</h2>

In [4]:
# Call the sampler. The previous `sample_json = (...)` form only rebound the
# name `sample_json` to a tuple, so nothing was sampled and the function was lost.
sample_json('All_Amazon_Meta.json', 'Sampled_Amazon_Meta.json', 15)

<h2>Preprocessing and Cleaning</h2>

In [None]:
import re
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

# Function to load JSON data
def load_json(file_path):
    """Load a JSON Lines file into a list of parsed records.

    Args:
        file_path: Path of a UTF-8 file holding one JSON document per line.

    Returns:
        A list with one parsed object per non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        # Skip blank / whitespace-only lines; json.loads('') would raise.
        return [json.loads(line) for line in file if line.strip()]

# Function to remove duplicates from data
def remove_duplicates(data):
    """Return the records of `data` with exact duplicates removed.

    Records are compared through a canonical JSON fingerprint (keys sorted),
    so records that contain lists or nested dicts are supported. Hashing
    `tuple(d.items())` raises TypeError for such unhashable values.

    Args:
        data: Iterable of dict records.

    Returns:
        A list of shallow copies of the distinct records, keeping the first
        occurrence of each and preserving input order.
    """
    seen_fingerprints = set()
    unique_data = []
    for record in data:
        # sort_keys makes the fingerprint independent of key insertion order;
        # default=repr keeps non-JSON values from aborting the whole pass.
        fingerprint = json.dumps(record, sort_keys=True, default=repr)
        if fingerprint in seen_fingerprints:
            continue
        seen_fingerprints.add(fingerprint)
        unique_data.append(dict(record))
    return unique_data

# Function to remove null values from data
def remove_null_values(data, keys_to_check=None):
    """Drop records that contain a None value.

    Args:
        data: Iterable of dict records.
        keys_to_check: Optional collection of keys. When given (and non-empty),
            a record is kept only if `record.get(key)` is not None for every
            listed key, so a missing key counts as null. When omitted, a
            record is kept only if none of its values is None.

    Returns:
        A new list with the surviving records, in input order.
    """
    if keys_to_check:
        def is_complete(entry):
            # A missing key yields None from .get and therefore fails the check.
            return not any(entry.get(name) is None for name in keys_to_check)
    else:
        def is_complete(entry):
            return not any(value is None for value in entry.values())

    return list(filter(is_complete, data))

# Tokenization and text cleaning function
def tokenize_and_clean_text(text):
    """Lower-case, tokenize and normalise a piece of text.

    Steps, in order: lower-case the text and split it with NLTK's
    `word_tokenize`; drop tokens found in NLTK's English stopword list;
    strip every character outside [a-zA-Z0-9] from the remaining tokens;
    discard tokens that end up empty.

    Args:
        text: The string to process.

    Returns:
        A list of cleaned tokens in their original order.
    """
    words = word_tokenize(text.lower())
    english_stopwords = set(stopwords.words('english'))

    cleaned = []
    for word in words:
        if word in english_stopwords:
            continue
        # Stopword filtering happens before stripping, on the raw token.
        stripped = re.sub(r'[^a-zA-Z0-9]', '', word)
        if stripped:
            cleaned.append(stripped)
    return cleaned

# Preprocess data function
def preprocess_data(input_file, output_file, text_fields=('text_field',)):
    """Run the cleaning pipeline on one JSON Lines file.

    Pipeline: load -> drop duplicate records -> drop records containing a
    None value -> tokenize the configured text fields -> write JSON Lines.

    Args:
        input_file: Path of the JSON Lines file to read.
        output_file: Path of the JSON Lines file to write (overwritten).
        text_fields: Names of the record fields to pass through
            `tokenize_and_clean_text`. Fields absent from a record are
            skipped. Defaults to the single placeholder name 'text_field';
            pass the real text columns of the dataset here.
    """
    # Each stage gets its own name so intermediate results stay inspectable.
    records_raw = load_json(input_file)
    records_unique = remove_duplicates(records_raw)
    records_clean = remove_null_values(records_unique)

    # Replace each configured text field with its list of cleaned tokens.
    for record in records_clean:
        for field in text_fields:
            if field in record:
                record[field] = tokenize_and_clean_text(record[field])

    # Save the preprocessed data, one JSON document per line.
    with open(output_file, 'w', encoding='utf-8') as file:
        for record in records_clean:
            file.write(json.dumps(record) + '\n')

# Function for batch processing
def batch_process(input_files_directory, output_directory):
    """Preprocess every `.json` file found directly in a directory.

    Each `<name>.json` in `input_files_directory` is run through
    `preprocess_data` and written to `output_directory/preprocessed_<name>.json`.

    Args:
        input_files_directory: Directory to scan (not recursive).
        output_directory: Directory for the results; created if missing.
    """
    # Opening an output file inside a missing directory would raise
    # FileNotFoundError, so make sure the directory exists first.
    os.makedirs(output_directory, exist_ok=True)

    # os.listdir order is arbitrary; sort so runs are reproducible.
    for file_name in sorted(os.listdir(input_files_directory)):
        if not file_name.endswith('.json'):
            continue
        input_file_path = os.path.join(input_files_directory, file_name)
        output_file_path = os.path.join(output_directory, f'preprocessed_{file_name}')
        preprocess_data(input_file_path, output_file_path)
        print(f'Preprocessing completed for {file_name}')

# Example usage: clean the sampled file and write the result next to it.
# The input name matches the output path passed to the sampling step; the
# output name matches the file the producer cell reads.
input_file = 'Sampled_Amazon_Meta.json'
output_file = 'Preprocessed_Amazon_Meta.json'
preprocess_data(input_file, output_file)


<h2>Producer Application</h2>

In [None]:
from confluent_kafka import Producer
import json
import time

def produce_data(topic, data_file, bootstrap_servers='localhost:9092', delay=0.1):
    """Stream the records of a JSON Lines file to a Kafka topic.

    Each non-blank line is parsed and re-serialised with `json.dumps`, then
    produced as one message. Queued messages are always flushed before the
    function returns, even when reading or parsing fails part-way.

    Args:
        topic: Name of the Kafka topic to produce to.
        data_file: Path of the UTF-8 JSON Lines file to stream.
        bootstrap_servers: Kafka `bootstrap.servers` setting.
        delay: Seconds to sleep after each message to simulate real-time
            streaming; 0 disables the pause.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    p = Producer({'bootstrap.servers': bootstrap_servers})

    def report_failure(err, msg):
        # Invoked from poll()/flush(); err is None on successful delivery.
        if err is not None:
            print(f'Delivery failed: {err}')

    try:
        with open(data_file, 'r', encoding='utf-8') as file:
            for line in file:
                if not line.strip():
                    continue  # a blank line carries no record
                payload = json.dumps(json.loads(line))

                while True:
                    try:
                        p.produce(topic, payload, on_delivery=report_failure)
                        break
                    except BufferError:
                        # Local queue is full: serve delivery callbacks to
                        # free space, then retry the same message.
                        p.poll(1.0)

                if delay:
                    time.sleep(delay)  # Simulate real-time streaming
                p.poll(0)
    finally:
        # Deliver whatever is still queued, even on error.
        p.flush()

# Entry point: runs only when this code executes as the main module.
if __name__ == "__main__":
    # Topic the consumer cells subscribe to.
    topic = 'preprocessed_data'
    # Same path the preprocessing example writes its output to.
    data_file = 'Preprocessed_Amazon_Meta.json'
    produce_data(topic, data_file)


<h2>Consumer 1</h2>

In [None]:
from confluent_kafka import Consumer, KafkaError

def consume_data(topic, group_id='consumer_group_1', label='Consumer 1',
                 bootstrap_servers='localhost:9092'):
    """Print every message received from a Kafka topic.

    Polls in an endless loop and prints each message value decoded as UTF-8,
    prefixed with `label`. The loop ends only on a Kafka error other than
    end-of-partition, or when an exception (e.g. a kernel interrupt)
    propagates; in every case the consumer is closed first.

    Args:
        topic: Name of the topic to subscribe to.
        group_id: Kafka consumer `group.id`.
        label: Prefix printed in front of each message.
        bootstrap_servers: Kafka `bootstrap.servers` setting.
    """
    c = Consumer({
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'
    })

    try:
        c.subscribe([topic])

        while True:
            msg = c.poll(1.0)

            if msg is None:
                continue  # poll timed out without a message

            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    continue  # end of partition is informational, keep polling
                print(error)
                break

            payload = msg.value()
            # A message may have no value; decode only when one is present.
            text = payload.decode('utf-8') if payload is not None else None
            print(f'{label}:', text)
    finally:
        # Always close so the consumer is shut down on any exit path.
        c.close()

# Entry point: runs only when this code executes as the main module.
if __name__ == "__main__":
    # Same topic name the producer cell publishes to.
    topic = 'preprocessed_data'
    consume_data(topic)


<h2>Consumer 2</h2>

In [None]:
from confluent_kafka import Consumer, KafkaError

def consume_data(topic, group_id='consumer_group_2', label='Consumer 2',
                 bootstrap_servers='localhost:9092'):
    """Print every message received from a Kafka topic.

    Polls in an endless loop and prints each message value decoded as UTF-8,
    prefixed with `label`. The loop ends only on a Kafka error other than
    end-of-partition, or when an exception (e.g. a kernel interrupt)
    propagates; in every case the consumer is closed first.

    Args:
        topic: Name of the topic to subscribe to.
        group_id: Kafka consumer `group.id`.
        label: Prefix printed in front of each message.
        bootstrap_servers: Kafka `bootstrap.servers` setting.
    """
    c = Consumer({
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'
    })

    try:
        c.subscribe([topic])

        while True:
            msg = c.poll(1.0)

            if msg is None:
                continue  # poll timed out without a message

            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    continue  # end of partition is informational, keep polling
                print(error)
                break

            payload = msg.value()
            # A message may have no value; decode only when one is present.
            text = payload.decode('utf-8') if payload is not None else None
            print(f'{label}:', text)
    finally:
        # Always close so the consumer is shut down on any exit path.
        c.close()

# Entry point: runs only when this code executes as the main module.
if __name__ == "__main__":
    # Same topic name the producer cell publishes to.
    topic = 'preprocessed_data'
    consume_data(topic)


<h2>Consumer 3</h2>

In [None]:
from confluent_kafka import Consumer, KafkaError

def consume_data(topic, group_id='consumer_group_3', label='Consumer 3',
                 bootstrap_servers='localhost:9092'):
    """Print every message received from a Kafka topic.

    Polls in an endless loop and prints each message value decoded as UTF-8,
    prefixed with `label`. The loop ends only on a Kafka error other than
    end-of-partition, or when an exception (e.g. a kernel interrupt)
    propagates; in every case the consumer is closed first.

    Args:
        topic: Name of the topic to subscribe to.
        group_id: Kafka consumer `group.id`.
        label: Prefix printed in front of each message.
        bootstrap_servers: Kafka `bootstrap.servers` setting.
    """
    c = Consumer({
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'
    })

    try:
        c.subscribe([topic])

        while True:
            msg = c.poll(1.0)

            if msg is None:
                continue  # poll timed out without a message

            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    continue  # end of partition is informational, keep polling
                print(error)
                break

            payload = msg.value()
            # A message may have no value; decode only when one is present.
            text = payload.decode('utf-8') if payload is not None else None
            print(f'{label}:', text)
    finally:
        # Always close so the consumer is shut down on any exit path.
        c.close()

# Entry point: runs only when this code executes as the main module.
if __name__ == "__main__":
    # Same topic name the producer cell publishes to.
    topic = 'preprocessed_data'
    consume_data(topic)


<h2>Consumer 1 Apriori </h2>

In [None]:
from collections import defaultdict

def apriori(stream, min_support, max_length=None):
    """Find frequent itemsets with the level-wise Apriori algorithm.

    Support is the number of transactions that contain the itemset; repeated
    items inside one transaction count once. Level k+1 candidates are built
    only from frequent level-k itemsets and are kept only if every one of
    their k-item subsets is frequent.

    Args:
        stream: Iterable of transactions, each an iterable of hashable items.
            It is read exactly once, so a generator is accepted.
        min_support: Minimum number of transactions an itemset must appear in.
        max_length: Optional upper bound on itemset size; None means no bound.

    Returns:
        A dict mapping each frequent itemset (as a sorted tuple) to its
        support count. Each entry is also printed.
    """
    # Materialise once: several passes are needed and `stream` may be a
    # one-shot iterator. Sets remove duplicate items within a transaction.
    baskets = [frozenset(transaction) for transaction in stream]

    def join_level(previous_level, size):
        # Build candidates of `size` items whose every (size-1)-subset is frequent.
        members = list(previous_level)
        candidates = set()
        for index, left in enumerate(members):
            for right in members[index + 1:]:
                union = left | right
                if len(union) != size:
                    continue
                if all(union - {item} in previous_level for item in union):
                    candidates.add(union)
        return candidates

    def as_tuple(itemset):
        # Sorted for stable output; fall back to repr order for mixed types.
        try:
            return tuple(sorted(itemset))
        except TypeError:
            return tuple(sorted(itemset, key=repr))

    # Level 1: count single items.
    item_counts = defaultdict(int)
    for basket in baskets:
        for item in basket:
            item_counts[item] += 1
    current_level = {
        frozenset((item,)): count
        for item, count in item_counts.items()
        if count >= min_support
    }

    frequent_itemsets = {}
    size = 1
    while current_level:
        frequent_itemsets.update(current_level)
        if max_length is not None and size >= max_length:
            break

        size += 1
        candidates = join_level(current_level, size)
        candidate_counts = defaultdict(int)
        for basket in baskets:
            if len(basket) < size:
                continue  # too small to contain any candidate of this size
            for candidate in candidates:
                if candidate <= basket:
                    candidate_counts[candidate] += 1
        current_level = {
            candidate: count
            for candidate, count in candidate_counts.items()
            if count >= min_support
        }

    # Output frequent itemsets
    result = {}
    for itemset, count in frequent_itemsets.items():
        key = as_tuple(itemset)
        result[key] = count
        print("Frequent Itemset:", key, "Support:", count)
    return result


<h2>Consumer 2 PCY </h2>

In [None]:
from collections import defaultdict

def pcy(stream, min_support, hash_table_size):
    """Find frequent item pairs with the two-pass PCY algorithm.

    Pass 1 counts single items and hashes every pair occurring in a
    transaction into a bucket counter. Pass 2 counts only pairs whose two
    items are both frequent and whose bucket reached `min_support`.
    Support is the number of transactions containing the pair; each
    unordered pair is counted once.

    Args:
        stream: Iterable of transactions, each an iterable of hashable items.
            It is read exactly once, so a generator is accepted.
        min_support: Minimum number of transactions a pair must appear in.
        hash_table_size: Number of hash buckets; must be at least 1.

    Returns:
        A dict mapping each frequent pair (as a sorted 2-tuple) to its
        support count. Each entry is also printed.

    Raises:
        ValueError: If `hash_table_size` is smaller than 1.
    """
    if hash_table_size < 1:
        raise ValueError("hash_table_size must be a positive integer")

    # Materialise once: two passes are needed and `stream` may be a one-shot
    # iterator. Sets remove duplicate items within a transaction.
    baskets = [frozenset(transaction) for transaction in stream]

    def bucket_of(pair):
        return hash(pair) % hash_table_size

    def pairs_of(items):
        # Yield each unordered pair once; frozenset keys are order-independent.
        members = list(items)
        for index, first in enumerate(members):
            for second in members[index + 1:]:
                yield frozenset((first, second))

    # Pass 1: single-item counts and pair bucket counts.
    item_counts = defaultdict(int)
    hash_table = [0] * hash_table_size
    for basket in baskets:
        for item in basket:
            item_counts[item] += 1
        for pair in pairs_of(basket):
            hash_table[bucket_of(pair)] += 1

    frequent_items = {item for item, count in item_counts.items() if count >= min_support}
    # A bucket count is an upper bound on the support of any pair hashed into
    # it, so infrequent buckets rule their pairs out.
    frequent_buckets = [count >= min_support for count in hash_table]

    # Pass 2: count the surviving candidate pairs.
    pair_counts = defaultdict(int)
    for basket in baskets:
        for pair in pairs_of(basket & frequent_items):
            if frequent_buckets[bucket_of(pair)]:
                pair_counts[pair] += 1

    # Output frequent itemsets
    result = {}
    for pair, count in pair_counts.items():
        if count >= min_support:
            try:
                key = tuple(sorted(pair))
            except TypeError:
                # Mixed, unorderable item types: use repr order instead.
                key = tuple(sorted(pair, key=repr))
            result[key] = count
            print("Frequent Itemset:", key, "Support:", count)
    return result
