In [1]:
import pickle

with open("results_components.pkl", 'rb') as file:
    df_final = pickle.load(file)

In [5]:
df_final.sort_values(by='Authors_N', ascending=False).head(60).sample(15, random_state=100)

Unnamed: 0,Publication Venue,Authors,Authors_N
1040,Journal of Computational Physics,"[M Berger, P Collela, M Berger, J Oliger, A Kh...",1884
1045,"Computer Architecture (ISCA), 2016 ACM/IEEE 43...","[J Albericio, P Judd, T Hetherington, T Aamodt...",1087
578,Behaviour & Information Technology,"[J Preece, R William Soukoreff, I Scott Macken...",679
571,Methodological implications of social media as...,"[H Pousti, C Urquhart, F Burstein, H Linger, W...",1327
702,Proceedings of the 42nd International Conferen...,"[V Van Der Veen, Jonathan Ho, Xi Chen, Aravind...",750
2121,IEEE Trans. Wireless Commun,"[H BÃ¶lcskei, R U Nabar, Ã– Oyman, A J Paulraj...",1652
4743,IEEE Trans. Inf. Theory,"[A F Dana, B Hassibi, T Cover, A El Gamal, G K...",2868
776,Computer Science and Information Technology,"[L Yuan, B Yang, S Ma, B Cen, K A Otunaiya, G ...",676
750,Controlling perceptual factors in neural style...,"[L A Gatys, A S Ecker, M Bethge, A Hertzmann, ...",598
1183,Learning to Hash with Binary Deep Neural Network,"[T.-T Do, A.-D Doan, N.-M Cheung, J H Reif, A ...",1017


In [181]:
from kafka import KafkaProducer, KafkaConsumer
import json

# Set up Kafka producer
producer = KafkaProducer(
    bootstrap_servers='127.0.0.1:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Set up Kafka consumer
consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='bda'
)

In [20]:
data = {}

for record in df_final.to_dict('records'):
    venue = record['Publication Venue']
    authors = record['Authors']
    data[venue] = authors

In [108]:
publications_sample = list(data.keys())[:]
len(df_final.to_dict('records')) - len(set(df_final["Publication Venue"]))

0

# Question 1

In [103]:
# https://stackoverflow.com/questions/28096761/implementing-flajolet-and-martin-s-algorithm-in-python
import mmh3
import math

def flajolet_martin(authors):
    def trailing_zeros(n):
        count = 0
        while (n & 1) == 0:
            n //= 2
            count += 1
        return count

    num_hashes = 4
    max_zeros = [0] * num_hashes

    for author in authors:
        for i in range(num_hashes):
            h = mmh3.hash(author, i)
            max_zeros[i] = max(max_zeros[i], trailing_zeros(h))

    return sum([2 ** m for m in max_zeros]) / num_hashes

In [235]:
import re
import pandas as pd
from tqdm.notebook import tqdm

def clean_topic_name(topic):
    return re.sub('[^A-Za-z0-9._-]', '_', topic)[:240] # Upper limit for topic name in kafka

authors_count = []

for publication in tqdm(publications_sample):
    cleaned_publication = clean_topic_name(publication)
    timeout = 60

    # Consumer Initialize
    consumer.subscribe([cleaned_publication])

    # Producer Send
    authors = data[publication]
    producer.send(cleaned_publication, authors)

    # Consumer receive
    message = consumer.poll(timeout)
    if message is None or len(message) == 0:
        continue
    else:
        for tp, messages in message.items():
            for msg in messages:
                authors = msg.value
                unique_authors_estimate = flajolet_martin(authors)

                info = {}
                info['Publication Venue'] = msg.topic
                info['Unique Authors Count'] = unique_authors_estimate
                authors_count.append(info)
                break

  0%|          | 0/1113 [00:00<?, ?it/s]

# Question 1 Results

In [236]:
unique_authors_df = pd.DataFrame(authors_count)
unique_authors_df

Unnamed: 0,Publication Venue,Unique Authors Count
0,J_Numer_Math,20.4
1,CugLM_Model,8.4
2,Accounting_for_unobserved_confounding_in_domai...,65.6
3,International_Journal_of_Project_Management,84.8
4,Bayesian_recurrent_neural_networks,56.0
...,...,...
9563,International_Journal_on_Document_Analysis_and...,17.6
9564,IEEE_transactions_on_evolutionary_computation,889.6
9565,Math._Model,16.0
9566,Proceedings_of_the_29th_ACM_International_Conf...,13.6


In [237]:
import pickle

with open("unique_authors.pkl", 'wb') as file:
    pickle.dump(unique_authors_df, file, protocol=pickle.HIGHEST_PROTOCOL)

In [115]:
import pickle

with open("unique_authors.pkl", 'rb') as file:
    unique_authors_df = pickle.load(file)

with open("citations_df.pkl", 'rb') as file:
    citations_df = pickle.load(file)

In [226]:
citations = citations_df.to_dict('records')

# Question 2

In [212]:
from collections import deque

class DGIM:
    def __init__(self, window_size):
        self.window_size = window_size
        self.buckets = deque()

    def update(self, value):
        if value == 1:
            self.buckets.appendleft(1)

        # Merge buckets
        count = 0
        temp_buckets = self.buckets.copy()
        for i, b in enumerate(temp_buckets):
            count += b
            if count > self.window_size:
                self.buckets.pop()

            if i < len(temp_buckets) - 1 and temp_buckets[i] == temp_buckets[i + 1]:
                self.buckets[i + 1] = self.buckets[i] + self.buckets[i + 1]
                self.buckets.popleft()

    def query(self):
        return sum(self.buckets)

In [229]:
from collections import defaultdict

# Initialize DGIM for each publication venue
dgim_dict = defaultdict(lambda: DGIM(500))

# Process the last 500 citations
for citation in citations[-500:]:
    venue = citation['Publication Venue']
    cleaned_venue = clean_topic_name(venue)

    # Update DGIM for the current publication venue
    dgim = dgim_dict[cleaned_venue]
    dgim.update(1)

# Calculate cited counts for each publication venue
venues_cited_count = []
for venue, dgim in dgim_dict.items():
    info = {}
    info['Publication Venue'] = venue
    info['Venue Cited Count'] = dgim.query()
    venues_cited_count.append(info)

In [234]:
venues_cited_count_df = pd.DataFrame(venues_cited_count).sort_values(by='Venue Cited Count', ascending=False)
venues_cited_count_df

Unnamed: 0,Publication Venue,Venue Cited Count
195,Quantum_Information_Processing,64
147,International_Conference_on_Learning_Represent...,22
149,Advances_in_Neural_Information_Processing_Systems,22
189,Physical_Review_A,15
203,International_Journal_of_Quantum_Information,10
...,...,...
126,Computational_Complexity__A_Modern_Approach,1
125,Econometrica,1
124,Evolutionary_Computation,1
123,Proc._IEEE_Conf._Systems__Man_and_Cybernetics,1


In [206]:
import pickle

with open("venues_cited_count.pkl", 'wb') as file:
    pickle.dump(venues_cited_count_df, file, protocol=pickle.HIGHEST_PROTOCOL)

#### As a stream

In [None]:
from collections import deque, defaultdict
from kafka import KafkaConsumer
import json

# Initialize a Kafka consumer
consumer = KafkaConsumer(
    'citations',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Initialize data structures
venue_cited_counts = defaultdict(int)
venue_cited_counts_history = []
last_500_citations = deque(maxlen=500)

# Process incoming citations
for message in consumer:
    citation = message.value
    venue = citation['Publication Venue']
    cleaned_venue = clean_topic_name(venue)

    # Add the current citation to the last 500 citations
    last_500_citations.append(cleaned_venue)

    # Update venue_cited_counts
    venue_cited_counts[cleaned_venue] += 1

    # If the deque is full, we need to remove the oldest citation's count
    if len(last_500_citations) == 500:
        oldest_venue = last_500_citations[0]
        venue_cited_counts[oldest_venue] -= 1

    # Append the current venue_cited_counts to the history
    venue_cited_counts_history.append(dict(venue_cited_counts))

In [233]:
import pickle

with open("venues_cited_count.pkl", 'rb') as file:
    venues_cited_count_df = pickle.load(file)