## Download and Setup Kafka

In [None]:
!curl -sSOL https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
!tar -xzf kafka_2.13-2.7.0.tgz

In [None]:
# Start ZooKeeper and then the Kafka broker as background daemons (-daemon),
# each with the default config file shipped in the tarball.
!./kafka_2.13-2.7.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.0/config/zookeeper.properties
!./kafka_2.13-2.7.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.0/config/server.properties
# Fixed 10-second pause; nothing in this cell checks that either service actually came up.
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

Waiting for 10 secs until kafka and zookeeper services are up and running


In [None]:
# Manual check: list running processes whose command line mentions "kafka".
!ps -ef | grep kafka

root         447       1 16 05:21 ?        00:00:02 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/content/kafka_2.13-2.7.0/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/content/kafka_2.13-2.7.0/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-2.7.0/bin/../config/log4j.properties -cp /content/kafka_2.13-2.7.0/bin/../libs/activation-1.1.1.jar:/content/kafka_2.13-2.7.0/bin/../libs/aopalliance-repackaged-2.6.1.jar:/content/kafka_2.13-2.7.0/bin/../libs/argparse4j-0.7.0.jar:/content/kafka_2.13-2.7.0/bin/../libs/audience-annotations-0.5.0.jar:/content/kafka_2.13-2.7.0/bin/../libs/commons-cli-1.4.jar:/content/kafka_2.13-2.7.0/bin/../libs/commons-lang3-3.8.1.jar:/content/kafka_2.13

In [None]:
!./kafka_2.13-2.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic meetup-rsvp

Created topic meetup-rsvp.


## Download and Setup Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://mirrors.estointernet.in/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf /content/spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# Point the kernel's environment at the installed JDK and the unpacked Spark distribution.
%env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
%env SPARK_HOME=/content/spark-2.4.7-bin-hadoop2.7

env: JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
env: SPARK_HOME=/content/spark-2.4.7-bin-hadoop2.7


## Setup Python Packages

In [None]:
!pip install websocket-client
!pip install kafka-python
!pip install PyMySQL

Collecting websocket-client
[?25l  Downloading https://files.pythonhosted.org/packages/08/33/80e0d4f60e84a1ddd9a03f340be1065a2a363c47ce65c4bd3bae65ce9631/websocket_client-0.58.0-py2.py3-none-any.whl (61kB)
[K     |█████▍                          | 10kB 14.3MB/s eta 0:00:01[K     |██████████▊                     | 20kB 11.7MB/s eta 0:00:01[K     |████████████████                | 30kB 8.9MB/s eta 0:00:01[K     |█████████████████████▍          | 40kB 7.6MB/s eta 0:00:01[K     |██████████████████████████▊     | 51kB 4.5MB/s eta 0:00:01[K     |████████████████████████████████| 61kB 2.7MB/s 
Installing collected packages: websocket-client
Successfully installed websocket-client-0.58.0
Collecting kafka-python
[?25l  Downloading https://files.pythonhosted.org/packages/75/68/dcb0db055309f680ab2931a3eeb22d865604b638acf8c914bedf4c1a0c8c/kafka_python-2.0.2-py2.py3-none-any.whl (246kB)
[K     |████████████████████████████████| 256kB 4.3MB/s 
[?25hInstalling collected packages: kafk

## Setup MySQL Server

In [None]:
import os

# Connection settings passed to pymysql.connect(**config).
# User, password and host can be supplied through environment variables so real
# credentials never have to be typed into (and saved with) the notebook; the
# fallbacks are the original placeholder values. The schema name stays fixed
# because later cells drop and re-create `BigData` by name.
config = {
  'user': os.environ.get('MYSQL_USER', 'admin'),
  'password': os.environ.get('MYSQL_PASSWORD', '************'),
  'host': os.environ.get('MYSQL_HOST', '************.rds.amazonaws.com'),
  'database': 'BigData'
}

In [None]:
import pymysql
# Single notebook-level connection; later cells and the MySQL helper functions use `conn` directly.
conn = pymysql.connect(**config)

In [None]:
# Destructive reset: discard the BigData schema, create it empty again and select it.
reset_statements = (
  "DROP DATABASE BigData",
  "CREATE DATABASE BigData",
  "USE BigData",
)
with conn.cursor() as cursor:
  for statement in reset_statements:
    cursor.execute(statement)

## Run Kafka Producer

In [None]:
import json
import websocket
import threading
from kafka import KafkaProducer

In [None]:
def _to_json_bytes(payload):
    """Serialize a value handed to KAFKA_PRODUCER.send() as UTF-8 encoded JSON."""
    return json.dumps(payload).encode('utf-8')


# Shared producer for the local broker; every value goes through _to_json_bytes.
KAFKA_PRODUCER = KafkaProducer(bootstrap_servers=['localhost:9092'],
                               value_serializer=_to_json_bytes)

In [None]:
def on_message(wsapp, message):
    """Websocket callback: forward one received message to the 'meetup-rsvp' topic.

    `wsapp` is unused. The producer is flushed after every single send.
    """
    topic = 'meetup-rsvp'
    KAFKA_PRODUCER.send(topic, message)
    KAFKA_PRODUCER.flush()

def on_error(wsapp, error):
    """Websocket callback: print the reported error; `wsapp` is unused."""
    print(error)

def on_close(wsapp, close_status_code=None, close_msg=None):
    """Websocket callback: report that the connection was closed.

    The two optional parameters let the same handler work with websocket-client
    releases that call on_close with only the app object and with releases that
    also pass the close status code and message. Both extras are ignored.
    """
    print("Connection Closed")

In [None]:
def main():
  """Connect to the Meetup RSVP websocket and dispatch events to the module-level callbacks.

  Blocks inside run_forever(), so it is meant to be run on a background thread.
  """
  def on_open(wsapp):
    # Announce the connection only once the socket has actually opened;
    # printing before the app object even exists reported success too early.
    print("Connection Established")

  websocket.enableTrace(True)
  wsapp = websocket.WebSocketApp("wss://stream.meetup.com/2/rsvps",
                                  on_open=on_open,
                                  on_message=on_message,
                                  on_error=on_error,
                                  on_close=on_close)
  wsapp.run_forever()

In [None]:
# Run the websocket loop on a background thread so later cells can execute.
# The Thread object is not kept, so it cannot be joined from here.
threading.Thread(target=main).start()

Connection Established


## Run Spark Streaming

In [None]:
import json
import findspark

# Use the Kafka 0.8 connector built for the same Spark release that SPARK_HOME
# points at (spark-2.4.7) instead of the 2.0.0 artifact, so connector and
# Spark core versions agree. Packages are registered before init().
findspark.add_packages(['org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7'])
findspark.init()

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

In [None]:
# Notebook-wide SparkContext plus a StreamingContext on top of it.
# 2 is the batch interval (the sample outputs show batches 2 seconds apart).
sc = SparkContext(appName='App')
ssc = StreamingContext(sc, 2)

In [None]:
# DStream of (key, value) records read from the local broker's 'meetup-rsvp' topic.
topic = 'meetup-rsvp'
brokers = 'localhost:9092'
kafka_params = {'metadata.broker.list': brokers}
raw_data = KafkaUtils.createDirectStream(ssc, [topic], kafka_params)

In [None]:
# Print a sample of each batch of raw Kafka records, let the stream run for up
# to 5 seconds, then shut both contexts down.
# NOTE(review): per the Spark Streaming guide a stopped StreamingContext cannot
# be restarted, so sc/ssc and the DStreams must be re-created before the next
# streaming cell. Confirm the intended run order.
raw_data.pprint()
ssc.start()
ssc.awaitTermination(timeout=5)
ssc.stop()
sc.stop()

-------------------------------------------
Time: 2021-04-21 05:34:02
-------------------------------------------
(None, '"{\\"venue\\":{\\"venue_name\\":\\"Tank Stream Labs\\",\\"lon\\":115.85466,\\"lat\\":-31.956133,\\"venue_id\\":25746863},\\"visibility\\":\\"public\\",\\"response\\":\\"yes\\",\\"guests\\":0,\\"member\\":{\\"member_id\\":207032668,\\"member_name\\":\\"James\\"},\\"rsvp_id\\":1870880994,\\"mtime\\":1618983231000,\\"event\\":{\\"event_name\\":\\"Sarada Lee presents \\\\\\"From Counting Beans to Counting Cells\\\\\\" [Beginner Friendly]\\",\\"event_id\\":\\"277270711\\",\\"time\\":1620725400000,\\"event_url\\":\\"https:\\\\/\\\\/www.meetup.com\\\\/Perth-Machine-Learning-Group\\\\/events\\\\/277270711\\\\/\\"},\\"group\\":{\\"group_topics\\":[{\\"urlkey\\":\\"data-analytics\\",\\"topic_name\\":\\"Data Analytics\\"},{\\"urlkey\\":\\"ai\\",\\"topic_name\\":\\"Artificial Intelligence\\"},{\\"urlkey\\":\\"predictive-analytics\\",\\"topic_name\\":\\"Predictive Analytics\\"},

In [None]:
def _decode_rsvp(record):
    """Turn one (key, value) Kafka record into the RSVP dict.

    The value is decoded twice: the sample output above shows it arrives as a
    JSON string that itself contains the JSON document.
    """
    return json.loads(json.loads(record[1]))


raw_data = raw_data.map(_decode_rsvp)
# Flatten every RSVP dict into its top-level (field, value) pairs.
stream_data = raw_data.flatMap(lambda rsvp: rsvp.items())

In [None]:
# Show the decoded RSVP dicts for up to 5 seconds, then stop both contexts.
# NOTE(review): this needs live (not previously stopped) contexts. Confirm the
# context/stream cells are re-run before this one.
raw_data.pprint()
ssc.start()
ssc.awaitTermination(timeout=5)
ssc.stop()
sc.stop()

-------------------------------------------
Time: 2021-04-21 05:35:32
-------------------------------------------
{'venue': {'venue_name': 'Southern Cross Garden Bar & Restaurant', 'lon': 174.775421, 'lat': -41.293785, 'venue_id': 11135672}, 'visibility': 'public', 'response': 'yes', 'guests': 0, 'member': {'member_id': 127254612, 'photo': 'https://secure.meetupstatic.com/photos/member/8/5/d/f/thumb_303694271.jpeg', 'member_name': 'Sharon'}, 'rsvp_id': 1870881058, 'mtime': 1618983330661, 'event': {'event_name': 'Quebecois Short Film Night - Southern Cross', 'event_id': '277680406', 'time': 1622010600000, 'event_url': 'https://www.meetup.com/Wellington-Independent-Film-Meetup/events/277680406/'}, 'group': {'group_topics': [{'urlkey': 'drinking', 'topic_name': 'Drinking'}, {'urlkey': 'movie-nights', 'topic_name': 'Movie Nights'}, {'urlkey': 'beer', 'topic_name': 'Beer'}, {'urlkey': 'indiefilm', 'topic_name': 'Indie Films'}, {'urlkey': 'movies', 'topic_name': 'Watching Movies'}, {'urlkey'

In [None]:
# Show the flattened (field, value) pairs for up to 5 seconds, then stop both contexts.
# NOTE(review): this needs live (not previously stopped) contexts. Confirm the
# context/stream cells are re-run before this one.
stream_data.pprint()
ssc.start()
ssc.awaitTermination(timeout=5)
ssc.stop()
sc.stop()

-------------------------------------------
Time: 2021-04-21 05:38:08
-------------------------------------------
('venue', {'venue_name': 'Online event', 'lon': 179.1962, 'lat': -8.521147, 'venue_id': 26906060})
('visibility', 'public')
('response', 'yes')
('guests', 0)
('member', {'member_id': 187981047, 'other_services': {'facebook': {'identifier': '10153423426807022'}}, 'photo': 'https://secure.meetupstatic.com/photos/member/1/1/9/5/thumb_246724501.jpeg', 'member_name': 'Özgür Karadeniz'})
('rsvp_id', 1870881161)
('mtime', 1618983484285)
('event', {'event_name': 'Tour de Tools: Veezoo with JP Monteiro', 'event_id': '277683522', 'time': 1621501200000, 'event_url': 'https://www.meetup.com/rootlabs-x/events/277683522/'})
('group', {'group_topics': [{'urlkey': 'data-analytics', 'topic_name': 'Data Analytics'}, {'urlkey': 'softwaredev', 'topic_name': 'Software Development'}, {'urlkey': 'data-science', 'topic_name': 'Data Science'}, {'urlkey': 'courses-and-workshops', 'topic_name': 'Cour

## Group By Country

### MySQL Functions

In [None]:
# Table that the country aggregation writes to: one row per country, with the
# lower-cased two-letter code in `alpha` and a running RSVP total in `count`.
with conn.cursor() as cur:
  cur.execute("CREATE TABLE countries(name VARCHAR(50), alpha CHAR(2), region VARCHAR(15), count INT)")

In [None]:
def create_mysql(df, connection=None):
  """Insert every row of `df` into the `countries` table.

  Args:
    df: pandas DataFrame whose column names are used as the target column
      names and whose rows supply the values, in column order.
    connection: optional DB-API connection to write through; defaults to the
      notebook-level `conn`.

  The statement is built once, all rows are sent with a single executemany()
  call and committed once. An empty frame is a no-op.
  """
  if df.empty:
    return
  connection = conn if connection is None else connection
  # Identifiers cannot be bound as parameters, so quote them with backticks and
  # double any backtick inside a name; the values themselves stay parameterized.
  cols = "`,`".join(str(col).replace("`", "``") for col in df.columns)
  placeholders = ",".join(["%s"] * len(df.columns))
  sql = "INSERT INTO `countries` (`" + cols + "`) VALUES (" + placeholders + ")"
  rows = list(df.itertuples(index=False, name=None))
  with connection.cursor() as cur:
    cur.executemany(sql, rows)
  connection.commit()

In [None]:
def update_mysql(alpha, count, connection=None):
  """Set the stored `count` of the country whose code is `alpha`.

  Args:
    alpha: country code matched against the `alpha` column. It originates from
      streamed external data, so it is bound as a query parameter rather than
      being interpolated into the SQL text.
    count: new value for the `count` column.
    connection: optional DB-API connection to write through; defaults to the
      notebook-level `conn`.
  """
  connection = conn if connection is None else connection
  with connection.cursor() as cur:
    cur.execute("UPDATE countries SET count = %s WHERE alpha = %s", (count, alpha))
  connection.commit()

### Country Data

In [None]:
# Country reference data (name, alpha-2 code, region, ...) fetched from the
# repository's master branch, so the contents are not pinned to a revision.
!wget https://raw.githubusercontent.com/lukes/ISO-3166-Countries-with-Regional-Codes/master/all/all.csv

--2021-04-21 05:53:48--  https://raw.githubusercontent.com/lukes/ISO-3166-Countries-with-Regional-Codes/master/all/all.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 20759 (20K) [text/plain]
Saving to: ‘all.csv’


2021-04-21 05:53:48 (14.7 MB/s) - ‘all.csv’ saved [20759/20759]



In [None]:
import pandas as pd

In [None]:
# keep_default_na=False stops pandas from parsing Namibia's ISO code "NA" as a
# missing value (which made dropna() below silently discard the country);
# genuinely empty cells are still treated as missing through na_values.
countries = pd.read_csv('/content/all.csv', keep_default_na=False, na_values=[''])
countries = countries[['name', 'alpha-2', 'region']]
# Drop entries that have no value in one of the three kept columns.
countries = countries.dropna()
# Codes are lower-cased before being matched against the streamed `group_country` values.
countries['alpha-2'] = countries['alpha-2'].str.lower()
countries.loc[countries['alpha-2'] == 'gb', ['name']] = 'Britain'
countries = countries.rename(columns={'alpha-2': 'alpha'})
# Running RSVP total per country; starts at zero.
countries['count'] = 0

In [None]:
# Preview the first rows of the prepared country table.
countries.head()

Unnamed: 0,name,alpha,region,count
0,Afghanistan,af,Asia,0
1,Åland Islands,ax,Europe,0
2,Albania,al,Europe,0
3,Algeria,dz,Africa,0
4,American Samoa,as,Oceania,0


In [None]:
# Seed the MySQL table with one zero-count row per country.
create_mysql(countries)

### Spark

In [None]:
def aggregate(_, x):
  """foreachRDD handler: fold one batch of per-country counts into the running totals.

  Args:
    _: first argument supplied by foreachRDD; ignored.
    x: RDD of (country_code, count) pairs for the current batch.

  For every code present in `countries`, the batch count is added to the
  in-memory total and the new total is written to MySQL via update_mysql().
  Codes with no matching row are skipped: converting an empty selection to int
  used to raise and abort the whole batch.
  """
  for alpha, seen in x.collect():
    matches = countries['alpha'] == alpha
    if not matches.any():
      continue
    countries.loc[matches, 'count'] += seen
    update_mysql(alpha, int(countries.loc[matches, 'count'].iloc[0]))

In [None]:
# Per batch: keep the 'group' field of every RSVP, emit (country_code, 1) and
# sum the ones per country code.
group_country = stream_data.filter(lambda x: x[0] == 'group')
group_country = group_country.map(lambda x: (x[1]['group_country'], 1))
group_country = group_country.reduceByKey(lambda a, b: a+b)
# foreachRDD only registers the output action and returns None, so it is called
# on its own line; assigning its result would rebind group_country to None and
# make the stream unusable (e.g. for pprint()).
group_country.foreachRDD(aggregate)

In [None]:
# Run the per-country aggregation for up to 50 seconds, then stop both contexts.
# NOTE(review): this needs live (not previously stopped) contexts. Confirm the
# context/stream cells are re-run before this one.
# group_country.pprint()
ssc.start()
ssc.awaitTermination(timeout=50)
ssc.stop()
sc.stop()

## Save Data

In [None]:
def _rsvp_summary(rsvp):
    """Reduce one decoded RSVP dict to (member id, event name, list of topic urlkeys)."""
    return (
        rsvp['member']['member_id'],
        rsvp['event']['event_name'],
        [topic['urlkey'] for topic in rsvp['group']['group_topics']],
    )


save_data = raw_data.map(_rsvp_summary)

In [None]:
# Print the (member id, event name, topic keys) tuples for up to 5 seconds, then
# stop both contexts. Nothing is written to storage in this cell.
# NOTE(review): this needs live (not previously stopped) contexts. Confirm the
# context/stream cells are re-run before this one.
save_data.pprint()
ssc.start()
ssc.awaitTermination(timeout=5)
ssc.stop()
sc.stop()

-------------------------------------------
Time: 2021-04-21 03:06:00
-------------------------------------------
(209210628, "NY Women's Coming Out Meetup", ['Lesbian', 'lesbianmoms', 'lgbtfriends', 'women', 'gay-and-lesbian-friends', 'lgbt-social-group', 'lesbian-friends', 'bisexual-women', 'gay-friends', 'bisexual-friends', 'bisexual-support', 'lesbian-social-networking', 'lesbians-over-40', 'lgbtq'])

-------------------------------------------
Time: 2021-04-21 03:06:02
-------------------------------------------
(327749715, 'Online ESL Conversation Class', ['esl', 'communication-esl-immersion', 'culture-exchange', 'esl-meet-up', 'language-exchange', 'english-language', 'language', 'esl-practice', 'esl-teachers', 'esl-friends', 'esl-program', 'esl-toronto'])
(329385059, 'Explore: Outdoor BYOB(runch) ', ['bookclub', 'social', 'tabletop-role-playing-and-board-games', 'outdoors', 'newintown', 'exploring-new-restaurants', 'singles', 'hiking', 'yoga', 'indoor-and-outdoor-rock-climbing',

In [None]:
# Final cleanup: stop the streaming context and the Spark context.
ssc.stop()
sc.stop()