### Import LIbs

In [166]:
import time
import requests
# ! pip install kafka-python
from kafka import KafkaProducer
from json import dumps
import json
import datetime

### First task

Return the statistics with the number of newly created events per each country for the last 6 full hours, excluding the last hour. The report should be in a format : [{country1: number1}, {country2: number2},..]. 

Example of the response: 
{“time_start”: “15:00”,
“time_end”: “21:00”,
“statistics”: [{“US”: 1543}, {“France” : 899}, {“Germany” : 923}, ...]}.


In [9]:
# ! pip install datapackage

In [12]:
from datapackage import Package

package = Package('https://datahub.io/core/country-list/datapackage.json')

# print list of all resources:
print(package.resource_names)

# print processed tabular data (if exists any)
for resource in package.resources:
    if resource.descriptor['datahub']['type'] == 'derived/csv':
        countries_abreviation = resource.read()

['validation_report', 'data_csv', 'data_json', 'country-list_zip', 'data']


In [20]:
dict_countries_abreviation = {}
for country, short in countries_abreviation:
    dict_countries_abreviation[short.lower()] = country
dict_countries_abreviation['gb'] = 'Great Britain'

In [21]:
dict_countries_abreviation['gb'], dict_countries_abreviation['fr']

('Great Britain', 'France')

In [23]:
from collections import Counter

In [28]:
res_l = []
for k, v in dict(Counter([1, 2, 2, 3])).items():
    res_l.append({k: v})
res_l

[{1: 1}, {2: 2}, {3: 1}]

In [97]:
def format_date(date):
    min_ = ('0' + str(date.minute)) if (len(str(date.minute)) < 2) else str(date.minute)
    return str(date.hour) + ':' + min_

In [98]:
print(format_date(datetime.datetime.now()))

14:19


### Task 1
To run task 1, fill the config with the proper time for which you want a statistics and with the name of JSON file

In [99]:
config = {
    'minutes': 1,
    'output': 'task1.json',
}

In [116]:
meetup_url = "http://stream.meetup.com/2/rsvps"
kafka_topic_name = "task1"
kafka_bootstrap_servers = "localhost:9092"

def task1():
    start = time.time()
    time_start = datetime.datetime.now()
    delay = 6 * config['minutes']

    try:
        stream_api_response = requests.get(meetup_url, stream=True)
        if stream_api_response.status_code == 200:
            task1_res = []
            for i, api_response_message in enumerate(stream_api_response.iter_lines()):
                print('Response number: ', i+1, end=', ')
                api_response_message = json.loads(api_response_message)
                if api_response_message['group'].get('group_country', None):
                    task1_res.append(\
                                     dict_countries_abreviation[api_response_message['group']['group_country']])

                if (time.time() - start) >= delay:
                    break

            task1_res_l = []
            for k, v in dict(Counter(task1_res)).items():
                task1_res_l.append({k: v})
            dict_res = {
                'time_start': format_date(time_start),
                'time_end': format_date(time_start + datetime.timedelta(minutes=config['minutes'])),
                'statistics': task1_res_l
            }
#             print(dict_res)
            f = open(config['output'], 'w+')
            json.dump(dict_res, f, indent=2)
            f.close()
        
    except Exception as ex:
        print('NO CONNECTION')
    return dict_res
        
task1()

Response number:  1, Response number:  2, Response number:  3, Response number:  4, Response number:  5, Response number:  6, Response number:  7, {'time_start': '15:12', 'time_end': '15:13', 'statistics': [{'Australia': 2}, {'Germany': 1}, {'Argentina': 1}, {'Netherlands': 1}, {'United States': 1}, {'Japan': 1}]}


{'time_start': '15:12',
 'time_end': '15:13',
 'statistics': [{'Australia': 2},
  {'Germany': 1},
  {'Argentina': 1},
  {'Netherlands': 1},
  {'United States': 1},
  {'Japan': 1}]}

### Second task
Return the statistics containing the information about which groups posted the events at each US state in the last 3 full hours, excluding the last hour. The names of the states should be full, not only the state code.

Example of the response: 
{“time_start”: “15:00”,
“time_end”: “18:00”,
“statistics”: [{“California” : [“Happy Programmers Group”, “Emmy’s Bookclub”]}, {“Nevada”: [“Las Vegas Warriors”, “Desert Bikers”]}, ...]}


In [105]:
states_constants = {
    "AL": "Alabama",
    "AK": "Alaska",
    "AS": "American Samoa",
    "AZ": "Arizona",
    "AR": "Arkansas",
    "CA": "California",
    "CO": "Colorado",
    "CT": "Connecticut",
    "DE": "Delaware",
    "DC": "District Of Columbia",
    "FM": "Federated States Of Micronesia",
    "FL": "Florida",
    "GA": "Georgia",
    "GU": "Guam",
    "HI": "Hawaii",
    "ID": "Idaho",
    "IL": "Illinois",
    "IN": "Indiana",
    "IA": "Iowa",
    "KS": "Kansas",
    "KY": "Kentucky",
    "LA": "Louisiana",
    "ME": "Maine",
    "MH": "Marshall Islands",
    "MD": "Maryland",
    "MA": "Massachusetts",
    "MI": "Michigan",
    "MN": "Minnesota",
    "MS": "Mississippi",
    "MO": "Missouri",
    "MT": "Montana",
    "NE": "Nebraska",
    "NV": "Nevada",
    "NH": "New Hampshire",
    "NJ": "New Jersey",
    "NM": "New Mexico",
    "NY": "New York",
    "NC": "North Carolina",
    "ND": "North Dakota",
    "MP": "Northern Mariana Islands",
    "OH": "Ohio",
    "OK": "Oklahoma",
    "OR": "Oregon",
    "PW": "Palau",
    "PA": "Pennsylvania",
    "PR": "Puerto Rico",
    "RI": "Rhode Island",
    "SC": "South Carolina",
    "SD": "South Dakota",
    "TN": "Tennessee",
    "TX": "Texas",
    "UT": "Utah",
    "VT": "Vermont",
    "VI": "Virgin Islands",
    "VA": "Virginia",
    "WA": "Washington",
    "WV": "West Virginia",
    "WI": "Wisconsin",
    "WY": "Wyoming"
}

In [103]:
config = {
    'minutes': 1,
    'output': 'task2.json',
}

In [109]:
meetup_url = "http://stream.meetup.com/2/rsvps"
kafka_topic_name = "task2"

start = time.time()
time_start = datetime.datetime.now()
delay = 60 * config['minutes']

try:
    stream_api_response = requests.get(meetup_url, stream=True)
    if stream_api_response.status_code == 200:
        countries_dict = {}
        for i, api_response_message in enumerate(stream_api_response.iter_lines()):
            print('Response number: ', i+1, end=', ')
            api_response_message = json.loads(api_response_message)
            if api_response_message['group']['group_country'] == 'us' and api_response_message['group'].get('group_state', None):
                state_name = states_constants[api_response_message['group']['group_state']]
                if countries_dict.get(state_name, None) is None:
                    countries_dict[state_name] = []
                countries_dict[state_name] += [d['topic_name'] for d in api_response_message['group']['group_topics']]
            
            if (time.time() - start) >= delay:
                break
        
        task1_res_l = []
        for k, v in dict(Counter(countries_dict)).items():
            task1_res_l.append({k: list(set(v))})
        dict_res = {
            'time_start': format_date(time_start),
            'time_end': format_date(time_start + datetime.timedelta(minutes=config['minutes'])),
            'statistics': task1_res_l
        }
#         print(dict_res)
        f = open(config['output'], 'w+')
        json.dump(dict_res, f, indent=2)
        f.close()

except Exception as ex:
    print('NO CONNECTION')


Response number:  1, Response number:  2, Response number:  3, Response number:  4, Response number:  5, Response number:  6, Response number:  7, Response number:  8, Response number:  9, Response number:  10, Response number:  11, Response number:  12, Response number:  13, Response number:  14, Response number:  15, Response number:  16, Response number:  17, Response number:  18, Response number:  19, Response number:  20, Response number:  21, Response number:  22, Response number:  23, Response number:  24, Response number:  25, Response number:  26, Response number:  27, Response number:  28, Response number:  29, Response number:  30, Response number:  31, Response number:  32, Response number:  33, Response number:  34, Response number:  35, Response number:  36, Response number:  37, Response number:  38, Response number:  39, Response number:  40, Response number:  41, Response number:  42, Response number:  43, Response number:  44, Response number:  45, Response number:  4

### Third task
The most popular topic of the events for each country posted in the last 6 hours, excluding the last hour. The popularity is calculated based on the number of occurrences the topic has amongst all the topics in all the events created in that country during the specified period. 

Example of the response: 
{“time_start”: “15:00”,
“time_end”: “21:00”,
“statistics”: [{“France” : {“Baking croissants”: 88}}, {“Germany”: {“Brewing beer”: 71}, ...]}

In [113]:
config = {
    'minutes': 1,
    'output': 'task3.json',
}

In [114]:
import operator

meetup_url = "http://stream.meetup.com/2/rsvps"
kafka_topic_name = "task3"

start = time.time()
time_start = datetime.datetime.now()
delay = 60 * config['minutes']

try:
    stream_api_response = requests.get(meetup_url, stream=True)
    if stream_api_response.status_code == 200:
        countries_dict = {}
        for i, api_response_message in enumerate(stream_api_response.iter_lines()):
            print('Response number: ', i+1, end=', ')
            api_response_message = json.loads(api_response_message)
            if api_response_message['group']['group_country'] == 'us' and \
            api_response_message['group'].get('group_state', None):
                state_name = states_constants[api_response_message['group']['group_state']]
                if countries_dict.get(state_name, None) is None:
                    countries_dict[state_name] = []
                countries_dict[state_name] += [d['topic_name'] \
                                               for d in api_response_message['group']['group_topics']]
            
            if (time.time() - start) >= delay:
                break
        
        task1_res_l = []
        for k, v in dict(Counter(countries_dict)).items():
            v = dict(Counter(v))
            max_topic_value = max(v.items(), key=operator.itemgetter(1))
            v = {max_topic_value[0]: max_topic_value[1]}
            task1_res_l.append({k: v})
        dict_res = {
            'time_start': format_date(time_start),
            'time_end': format_date(time_start + datetime.timedelta(minutes=config['minutes'])),
            'statistics': task1_res_l
        }

        f = open(config['output'], 'w+')
        json.dump(dict_res, f, indent=2)
        f.close()

except Exception as ex:
    print('NO CONNECTION')


Response number:  1, Response number:  2, Response number:  3, Response number:  4, Response number:  5, Response number:  6, Response number:  7, Response number:  8, Response number:  9, Response number:  10, Response number:  11, Response number:  12, Response number:  13, Response number:  14, Response number:  15, Response number:  16, Response number:  17, Response number:  18, Response number:  19, Response number:  20, Response number:  21, Response number:  22, Response number:  23, Response number:  24, Response number:  25, Response number:  26, Response number:  27, Response number:  28, Response number:  29, Response number:  30, Response number:  31, Response number:  32, Response number:  33, Response number:  34, Response number:  35, Response number:  36, Response number:  37, Response number:  38, Response number:  39, Response number:  40, Response number:  41, Response number:  42, Response number:  43, Response number:  44, Response number:  45, Response number:  4

### Ad-hoc Query 1
Return the list of all the countries for which the events were created.

In [131]:
config = {
    'output': 'ad_hoc_task1.json'
}

In [132]:
def ad_hoc_task1():
    result1 = task1()
# Example of result1 data:
#     result1 = {'time_start': '15:12', 'time_end': '15:13', 'statistics': \
#                [{'Australia': 2}, {'Germany': 1}, {'Argentina': 1}, {'Netherlands': 1}, \
#                 {'United States': 1}, {'Japan': 1}]}
    return list(map(lambda el: list(el.keys())[0], result1['statistics']))

ad_hoc_res = ad_hoc_task1()
ad_hoc_res

['Australia', 'Germany', 'Argentina', 'Netherlands', 'United States', 'Japan']

In [133]:
f = open(config['output'], 'w+')
json.dump(ad_hoc_res, f, indent=2)
f.close()

### Ad-hoc Query 2
Return the list of the cities for the specified country where at least one event was created.

In [151]:
config = {
    'country': 'Great Britain',
    'minutes': 1,
    'output': 'ad_hoc_task2.json'
}

In [153]:
meetup_url = "http://stream.meetup.com/2/rsvps"
kafka_topic_name = "ad_hoc_task2"
# kafka_bootstrap_servers = "localhost:9092"


start = time.time()
time_start = datetime.datetime.now()
delay = 60 * config['minutes']

try:
    stream_api_response = requests.get(meetup_url, stream=True)
    if stream_api_response.status_code == 200:
        cities_l = []
        for api_response_message in stream_api_response.iter_lines():
            api_response_message = json.loads(api_response_message)
            if api_response_message['group'].get('group_country', None):
                if dict_countries_abreviation[api_response_message['group']['group_country']] == config['country']:
                    cities_l.append(api_response_message['group']['group_city'])
#                     time.sleep(1)
            if (time.time() - start) >= delay:
                break
        cities_l = list(set(cities_l))
            

except Exception as ex:
    print('NO CONNECTION')

print("AFTER WHILE")


AFTER WHILE


In [154]:
f = open(config['output'], 'w+')
json.dump(cities_l, f, indent=2)
f.close()

### Ad-hoc Query 3
Given the event id, return the following details:
- event name
- event time
- the list of the topics
- the group name
- the city and the country of the event

In [167]:
config = {
    'event_id': '279038557',
    'minutes': 10,
    'output': 'ad_hoc_task3.json'
}

In [248]:
meetup_url = "http://stream.meetup.com/2/rsvps"
kafka_topic_name = "ad_hoc_task3"
# kafka_bootstrap_servers = "localhost:9092"

start = time.time()
time_start = datetime.datetime.now()
delay = 60 * config['minutes']
res_d = {}

try:
    print('We are here...')
    stream_api_response = requests.get(meetup_url, stream=True)

    if stream_api_response.status_code == 200:
        for api_response_message in stream_api_response.iter_lines():
            api_response_message = json.loads(api_response_message)
#             event_id = config["event_id"]
            event_id = api_response_message["event"]["event_id"]
            
            if api_response_message["event"]['event_id'] == event_id:
                res_d['event_id'] = api_response_message['event']['event_id'],
                res_d['event_name'] = (api_response_message['event']['event_name']),
                time_ = api_response_message["event"]["time"]
                res_d['event_time'] = time.ctime(time_)
                res_d['topics'] = [el['topic_name'] for el in api_response_message['group']['group_topics']],
                res_d['group_name'] = api_response_message["group"]["group_name"],
                res_d['city'] = (api_response_message["group"]["group_city"]),
                res_d['country'] = dict_countries_abreviation[api_response_message["group"]["group_country"]]
                print(res_d)
                break

            if (time.time() - start) >= delay:
                break
            
except Exception as ex:
    print()
#     print('NO CONNECTION')

print("AFTER WHILE")


We are here...
{'event_id': ('shvzwrycckbrb',), 'event_name': ('Analyzing the unanswered questions of the universe. ',), 'event_time': 'Fri Oct 10 11:00:00 53502', 'topics': (['Catholic', 'Catholic Friends', 'Catholic Bible Study', 'Catholic Fellowship', 'Catholic Social Networking', 'Catholic Social', 'Bible', 'Catholic Spirtuality', 'Bible Study', 'The Bible Timeline Study Group', 'Catholic Bible Study Group'],), 'group_name': ('Northern Virginia Catholic Bible Study Meetup',), 'city': ('Annandale',), 'country': 'United States'}
AFTER WHILE


In [238]:
f = open(config['output'], 'w+')
json.dump(res_d, f, indent=2)
f.close()

In [239]:
res_d

{'event_id': ('plsqhsyccjbgc',),
 'event_name': ("Let's go out for a drink or two !! - We are back!",),
 'event_time': 'Fri Jan 25 10:00:00 53450',
 'topics': (['Fun Times',
   'Social Networking',
   'Nightlife',
   'New In Town',
   'International and Exchange Students',
   'Expat',
   'International Professionals',
   'Expat Foreigner',
   'Cultural Activities',
   'Foreign',
   'International Friends',
   'Singles',
   'Fun and Laughter',
   'Culture Exchange',
   'Language & Culture'],),
 'group_name': ('Antwerp Expats',),
 'city': ('Antwerpen',),
 'country': 'Belgium'}

### Ad-hoc Query 4
Return the list of the groups which have created events in the specified city. It should contain the following details:
- City name
- Group name
- Group id

In [244]:
config = {
    'city': 'London',
    'minutes': 1,
    'output': 'ad_hoc_task4.json'
}

In [245]:
meetup_url = "http://stream.meetup.com/2/rsvps"
kafka_topic_name = "ad_hoc_task3"
# kafka_bootstrap_servers = "localhost:9092"

start = time.time()
time_start = datetime.datetime.now()
delay = 60 * config['minutes']
res_l = []

try:
    print('We are here...')
    stream_api_response = requests.get(meetup_url, stream=True)

    if stream_api_response.status_code == 200:
        for api_response_message in stream_api_response.iter_lines():
            api_response_message = json.loads(api_response_message)
            group_city = api_response_message["group"]["group_city"]
            
            if api_response_message["group"]['group_city'] == group_city:
                res_d = {}
                res_d['city'] = api_response_message["group"]['group_city'],    
                res_d['group_name'] = api_response_message["group"]["group_name"],
                res_d['group_id'] = (api_response_message["group"]["group_id"]),
                res_l.append(res_d)

            if (time.time() - start) >= delay:
                break
            
except Exception as ex:
    print()
#     print('NO CONNECTION')

print("AFTER WHILE")


We are here...
AFTER WHILE


In [246]:
f = open(config['output'], 'w+')
json.dump(res_l, f, indent=2)
f.close()

### Ad-hoc Query 5
Return all the events that were created by the specified group (group id will be the input parameter). Each event in the list should have the format as in the API #3.


In [250]:
config = {
    'group_id': '31892462',
    'minutes': 1,
    'output': 'ad_hoc_task5.json'
}

In [253]:
meetup_url = "http://stream.meetup.com/2/rsvps"
kafka_topic_name = "ad_hoc_task3"
# kafka_bootstrap_servers = "localhost:9092"

start = time.time()
time_start = datetime.datetime.now()
delay = 60 * config['minutes']
res_d = {}

try:
    print('We are here...')
    stream_api_response = requests.get(meetup_url, stream=True)
    res_l = []
    if stream_api_response.status_code == 200:
        i = 0
        for api_response_message in stream_api_response.iter_lines():
            api_response_message = json.loads(api_response_message)
#             group_id = config['group_id']
            if i ==0:
                group_id = api_response_message["group"]["group_id"]
            
            if api_response_message["group"]["group_id"] == group_id:
                res_d = {}
                res_d['event_id'] = api_response_message['event']['event_id'],
                res_d['event_name'] = (api_response_message['event']['event_name']),
                time_ = api_response_message["event"]["time"]
                res_d['event_time'] = time.ctime(time_)
                res_d['topics'] = [el['topic_name'] for el in api_response_message['group']['group_topics']],
                res_d['group_name'] = api_response_message["group"]["group_name"],
                res_d['city'] = (api_response_message["group"]["group_city"]),
                res_d['country'] = dict_countries_abreviation[api_response_message["group"]["group_country"]]
                res_l.append(res_d)
                i += 1
            if (time.time() - start) >= delay:
                break
            
except Exception as ex:
    print()
#     print('NO CONNECTION')

print("AFTER WHILE")


We are here...
AFTER WHILE


In [254]:
f = open(config['output'], 'w+')
json.dump(res_l, f, indent=2)
f.close()