In [1]:
import pulsar
import json
import matplotlib.pyplot as plt
import pandas as pd
from IPython.display import clear_output
import time

In [2]:
# Open a client connection to the Pulsar broker listening on localhost:6650.
client = pulsar.Client(service_url='pulsar://localhost:6650')

In [3]:
# Producer for the 'request' topic; makeRequest publishes the batch index ranges through it.
producer = client.create_producer(topic='request')

2022-06-06 21:34:48.703 INFO  [139787335300928] ConnectionPool:85 | Created connection for pulsar://localhost:6650
2022-06-06 21:34:48.706 INFO  [139786381453056] ClientConnection:356 | [127.0.0.1:52064 -> 127.0.0.1:6650] Connected to broker
2022-06-06 21:34:48.717 INFO  [139786381453056] HandlerBase:54 | [persistent://public/default/request, ] Getting connection from pool
2022-06-06 21:34:48.722 INFO  [139786381453056] ProducerImpl:170 | [persistent://public/default/request, ] Created producer on broker [127.0.0.1:52064 -> 127.0.0.1:6650] 


In [4]:
# NOTE(review): not referenced anywhere else in this notebook -- the request cell
# passes a literal size to makeRequest. Presumably the size of the full dataset; confirm.
total_data = 85_400

In [5]:
# Optional scratch sink: call makeRequest(n, send=test_stack.append) to capture the
# encoded requests locally instead of publishing them to Pulsar.
test_stack = []


def makeRequest(data_size, batch_size=264, send=None):
    """Split the indices 0 .. data_size - 1 into batches and publish one message per batch.

    Each message is the UTF-8 encoded string "<start> - <end>" where BOTH bounds are
    inclusive (the next batch starts at the previous end + 1). A full batch therefore
    spans ``batch_size + 1`` indices; the final batch is trimmed so the last index
    requested is exactly ``data_size - 1`` (previously it overshot by one).

    Args:
        data_size: number of indices to request; must satisfy 0 < data_size < 365000.
        batch_size: ``end - start`` of a full batch; must be >= 0.
        send: callable receiving each encoded message. Defaults to ``producer.send``
            (the Pulsar producer created in an earlier cell).

    Raises:
        ValueError: if ``data_size`` or ``batch_size`` is out of range.
    """
    if not (0 < data_size < 365000):
        raise ValueError("Data Size not available!")
    if batch_size < 0:
        # A negative span would never reduce `remaining` and loop forever.
        raise ValueError("batch_size must be non-negative")
    if send is None:
        send = producer.send

    start_index = 0
    remaining = data_size
    while remaining > 0:
        # Clamp so the last batch stops at index data_size - 1 (inclusive end).
        span = min(batch_size, remaining - 1)
        end_index = start_index + span
        send(f"{start_index} - {end_index}".encode('utf-8'))
        start_index = end_index + 1
        remaining -= span + 1

In [6]:
def aggregate_dicts(dict_a, dict_b):
    """Add every count in ``dict_b`` into ``dict_a`` and return ``dict_a``.

    Values from ``dict_b`` are converted with ``int()`` before being added; keys
    missing from ``dict_a`` are inserted with that converted value. ``dict_a`` is
    updated in place (the same object is returned); ``dict_b`` is not modified.
    """
    for key, value in dict_b.items():
        dict_a[key] = dict_a.get(key, 0) + int(value)
    return dict_a

def parse_dict(prog_dict):
    """Turn a ``{name: count}`` mapping into a one-column DataFrame sorted by count.

    The mapping keys become the index, the values go into a column named
    ``"count"``, and rows are ordered from the highest count to the lowest.
    """
    return (
        pd.DataFrame
        .from_dict(prog_dict, orient='index', columns=["count"])
        .sort_values("count", ascending=False)
    )

In [7]:
# Wall-clock reference point for the "run time" printed by the result loop.
start_time = time.time()

In [8]:
# One consumer per response topic read by the result loop, all under the subscription
# name 'client'. This cell runs before the request cell, presumably so the subscriptions
# exist before any responses are published (Pulsar's default initial position for a new
# subscription is the latest message -- confirm for this client version).
consumer_one = client.subscribe(topic='response_one', subscription_name='client')
consumer_two = client.subscribe(topic='response_two', subscription_name='client')
consumer_three = client.subscribe(topic='response_three', subscription_name='client')
consumer_four = client.subscribe(topic='response_four', subscription_name='client')

2022-06-06 21:34:49.410 INFO  [139787335300928] Client:88 | Subscribing on Topic :response_one
2022-06-06 21:34:49.416 INFO  [139786381453056] HandlerBase:54 | [persistent://public/default/response_one, client, 0] Getting connection from pool
2022-06-06 21:34:49.422 INFO  [139786381453056] ConsumerImpl:216 | [persistent://public/default/response_one, client, 0] Created consumer on broker [127.0.0.1:52064 -> 127.0.0.1:6650] 
2022-06-06 21:34:49.422 INFO  [139787335300928] Client:88 | Subscribing on Topic :response_two
2022-06-06 21:34:49.424 INFO  [139786381453056] HandlerBase:54 | [persistent://public/default/response_two, client, 1] Getting connection from pool
2022-06-06 21:34:49.426 INFO  [139786381453056] ConsumerImpl:216 | [persistent://public/default/response_two, client, 1] Created consumer on broker [127.0.0.1:52064 -> 127.0.0.1:6650] 
2022-06-06 21:34:49.431 INFO  [139787335300928] Client:88 | Subscribing on Topic :response_three
2022-06-06 21:34:49.433 INFO  [139786381453056]

In [9]:
# Restart the clock immediately before publishing, so the "run time" printed by the
# result loop measures request processing rather than the time spent subscribing or
# stepping through the earlier cells by hand.
start_time = time.time()
makeRequest(10000)
# for weak scalability: makeRequest(5000), makeRequest(10000), makeRequest(15000)

In [10]:
# Running aggregates filled by the result loop (re-run this cell to reset them).
prog_dict, test_prog_dict, devop_prog_dict = {}, {}, {}
top_repos = []
# NOTE(review): these three counters are not referenced anywhere else in the notebook.
total_num = total_test = total_devop_test = 0

In [None]:
def _receive_json(consumer):
    """Block until ``consumer`` delivers a message, decode it as UTF-8 JSON, then ack it.

    The message is acknowledged only after its payload parsed successfully, so a
    malformed message is not acknowledged (and thereby dropped) before it was read.
    """
    msg = consumer.receive()
    payload = json.loads(msg.data().decode("utf-8"))
    consumer.acknowledge(msg)
    return payload


# Runs until the kernel is interrupted: there is no end-of-results signal, and each
# iteration blocks on exactly one message from every response topic, in order.
while True:
    # merge top ranking programming languages
    prog_dict = aggregate_dicts(prog_dict, _receive_json(consumer_one))

    # keep the ten repositories with the highest count seen so far
    # (entries look like [repo_url, count], judging by the printed output)
    top_repos.extend(_receive_json(consumer_two))
    top_repos = sorted(top_repos, key=lambda repo: repo[1], reverse=True)[:10]

    # languages of repositories with tests
    test_prog_dict = aggregate_dicts(test_prog_dict, _receive_json(consumer_three))

    # languages of repositories with tests + devops
    devop_prog_dict = aggregate_dicts(devop_prog_dict, _receive_json(consumer_four))

    # Replace the previous snapshot once the new one is ready to print.
    clear_output(wait=True)

    print(top_repos)
    print(parse_dict(prog_dict).iloc[:10])
    print(parse_dict(test_prog_dict).iloc[:10])
    print(parse_dict(devop_prog_dict).iloc[:10])
    print("run time: %s seconds" % (time.time() - start_time))

[['https://github.com/fabianishere/udm-kernel', 799619], ['https://github.com/leaningtech/cheerp-compiler', 423988], ['https://github.com/Status-Plus/StatusPlus', 396098], ['https://github.com/GiorgioComitini/COVID-19', 107359], ['https://github.com/vlggms/tegustation', 79729], ['https://github.com/feravolt/Brutal_busybox', 16916], ['https://github.com/pyiron/pyiron_atomistics', 9064], ['https://github.com/mantinedev/mantine', 7282], ['https://github.com/kraflab/dsda-doom', 5929], ['https://github.com/gbif/hp-colombian-biodiversity', 5445]]
                  count
Python             2005
JavaScript         1404
TypeScript          764
Java                524
C++                 503
C#                  484
Jupyter Notebook    466
HTML                424
Go                  403
C                   330
                  count
Python              363
TypeScript          200
JavaScript          183
Dart                113
Go                  112
PHP                  97
C++                  

In [None]:
# Close the Pulsar client created at the top of the notebook. Run this after
# interrupting the result loop above, which never exits on its own.
client.close()