# Part 1. Get Twitter tweets

1. Download and start MongoDB

In [None]:
# Detect whether the notebook runs inside Google Colab: the `google.colab`
# package is only importable there. Later cells branch on IN_COLAB.
try:
    import google.colab  # noqa: F401 -- imported only to probe availability
except ImportError:
    IN_COLAB = False
else:
    IN_COLAB = True


In [None]:
if IN_COLAB:
    !pip install pymongo
    !pip install kafka-python


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pymongo
  Downloading pymongo-4.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (492 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m492.9/492.9 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.3.0-py3-none-any.whl (283 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m283.7/283.7 kB[0m [31m24.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.3.0 pymongo-4.3.3
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstallin

In [None]:
if IN_COLAB:
    !apt install mongodb
    !service mongodb start


Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following additional packages will be installed:
  libpcap0.8 libyaml-cpp0.6 mongo-tools mongodb-clients mongodb-server
  mongodb-server-core
The following NEW packages will be installed:
  libpcap0.8 libyaml-cpp0.6 mongo-tools mongodb mongodb-clients mongodb-server
  mongodb-server-core
0 upgraded, 7 newly installed, 0 to remove and 34 not upgraded.
Need to get 55.8 MB of archives.
After this operation, 226 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu focal/main amd64 libpcap0.8 amd64 1.9.1-3 [128 kB]
Get:2 http://archive.ubuntu.com/ubuntu focal/main amd64 libyaml-cpp0.6 amd64 0.6.2-4ubuntu1 [124 kB]
Get:3 http://archive.ubuntu.com/ubuntu focal/universe amd64 mongo-tools amd64 3.6.3-0ubuntu1 [12.3 MB]
Get:4 http://archive.ubuntu.com/ubuntu focal-updates/universe amd64 mongodb-clients amd64 1:3.6.9+really3.6.8+90~g8e540c0b6d-0ubuntu5.3 [21.6 MB]
Get:5 

In [None]:
from pymongo import MongoClient
import requests


2. Download the ChatGPT-Tweets dataset from Hugging Face

In [None]:
# Template URL of the Hugging Face datasets-server "rows" endpoint for the
# deberain/ChatGPT-Tweets train split; fill in {offset} and {limit} with
# str.format before requesting a page.
dataset_url = "https://datasets-server.huggingface.co/rows?" + "&".join([
    "dataset=deberain%2FChatGPT-Tweets",
    "config=deberain--ChatGPT-Tweets",
    "split=train",
    "offset={offset}",
    "limit={limit}",
])

# MongoDB runs on localhost in Colab and on the host named "mongodb" otherwise.
client = MongoClient("mongodb://localhost:27017/" if IN_COLAB
                     else "mongodb://mongodb:27017/")
db = client["data"]


In [None]:
# Echo the Database handle to confirm which MongoDB host and database name are in use.
db

Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'data')

In [None]:
current_offset = 0  # next dataset row offset to request
calls = 0           # number of download attempts (shown on the dashboard)


def download_tweets(limit, offset=None, timeout=30):
    """Fetch one page of ChatGPT tweets and upsert it into ``db.tweets``.

    Parameters
    ----------
    limit : int
        Maximum number of rows to request from the datasets server.
    offset : int, optional
        Row offset to start from; defaults to the module-level ``current_offset``.
    timeout : float, optional
        Seconds to wait for the HTTP response before ``requests`` gives up.

    Returns
    -------
    bool
        ``True`` if at least one row was stored. ``False`` if the server
        answered with an error (non-2xx status, non-JSON body, or a JSON
        object without ``rows``) or returned no rows.

    Side effects: increments ``calls`` on every invocation and advances
    ``current_offset`` by one per stored row.
    """
    global current_offset, calls
    calls += 1
    if offset is None:
        offset = current_offset

    response = requests.get(dataset_url.format(limit=limit, offset=offset),
                            timeout=timeout)
    try:
        payload = response.json()
    except ValueError:  # body is not JSON, e.g. an HTML error page
        payload = None

    # On failure (e.g. rate limiting) the server sends an error object instead
    # of a page of rows; report it instead of crashing with KeyError: 'rows'.
    if not response.ok or not isinstance(payload, dict) or 'rows' not in payload:
        print(f"Tweet download failed (HTTP {response.status_code}): "
              f"{response.text[:200]}")
        return False

    data = [{"id": row['row_idx'], **row['row']} for row in payload['rows']]
    if not data:
        return False

    # Upsert keyed on "id" so re-downloading a page does not duplicate documents.
    for row in data:
        db.tweets.update_one({"id": row['id']}, {"$set": row}, upsert=True)
        current_offset += 1

    return True


# Part 2. Stream tweets to Apache Spark

- Install Spark

In [None]:
if IN_COLAB:
    !pip install pyspark
    !apt-get install openjdk-11-jdk-headless
    !wget https://downloads.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
    !tar -xvf spark-3.4.0-bin-hadoop3.tgz
    !pip install findspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=c536703bb07ab6302d6494141b77783b13d3a34d355132bb13cdea3915cf9582
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
Reading package lists... Done
Building dependency tree       
Reading state information... Done
openjdk-11-jdk-headless is already the newest version (11.0.19+7~us1-0ubuntu1~20.

In [None]:
import pyspark
# Print the installed PySpark version. In Colab it should match the Spark 3.4.0
# distribution and the *_2.12:3.4.0 Kafka packages configured later.
print(pyspark.__version__)


3.4.0


- Get downloaded tweets from MongoDB: import the libraries used to read them and publish them to Kafka

In [None]:
import pandas as pd
import json
import requests
from kafka import KafkaProducer, KafkaConsumer


- Download and set up Kafka and ZooKeeper instances. The following instances are set up locally:
  - Kafka (Brokers: localhost:9092)
  - Zookeeper (Node: localhost:2181)


In [None]:
if IN_COLAB:
    !curl -sSOL https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
    !tar -xzf kafka_2.12-3.4.0.tgz
    !./kafka_2.12-3.4.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.12-3.4.0/config/zookeeper.properties
    !./kafka_2.12-3.4.0/bin/kafka-server-start.sh -daemon ./kafka_2.12-3.4.0/config/server.properties
    !echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
    !sleep 10
    !ps -ef | grep kafka

Waiting for 10 secs until kafka and zookeeper services are up and running
root        1527       1 19 16:48 ?        00:00:02 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/content/kafka_2.12-3.4.0/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/content/kafka_2.12-3.4.0/bin/../logs -Dlog4j.configuration=file:./kafka_2.12-3.4.0/bin/../config/log4j.properties -cp /content/kafka_2.12-3.4.0/bin/../libs/activation-1.1.1.jar:/content/kafka_2.12-3.4.0/bin/../libs/aopalliance-repackaged-2.6.1.jar:/content/kafka_2.12-3.4.0/bin/../libs/argparse4j-0.7.0.jar:/content/kafka_2.12-3.4.0/bin/../libs/audience-annotations-0.5.0.jar:/content/kafka_2.12-3.4.0/bin/../libs/commons-cli-1.4.jar:/conten

- Create a topic

In [None]:
if IN_COLAB:
    !./kafka_2.12-3.4.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic tweets


Created topic tweets.


In [None]:
# Point findspark at the JDK and the Spark 3.4.0 distribution unpacked above,
# and make spark-submit pull the Kafka connector packages (Colab only).
if IN_COLAB:
    import os

    os.environ.update({
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.4.0-bin-hadoop3",
        "PYSPARK_SUBMIT_ARGS": (
            "--packages "
            "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.4.0,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 "
            "pyspark-shell"
        ),
    })

    import findspark
    findspark.init()
    findspark.find()


- Create a producer class that pushes the tweets to the Kafka broker.

In [None]:
class Producer(KafkaProducer):
    """KafkaProducer that publishes the rows of a tweets DataFrame as messages.

    Construction is inherited unchanged from ``KafkaProducer``; pass the usual
    keyword arguments (``bootstrap_servers``, ``value_serializer``, ...).
    """

    # DataFrame columns copied into every message, in message-key order.
    MESSAGE_FIELDS = ('id', 'Date', 'Likes', 'Location', 'Retweets', 'Tweet',
                      'Url', 'User', 'UserCreated', 'UserDescription',
                      'UserFollowers', 'UserFriends', 'UserVerified')

    def produce(self, df, kafka_topic):
        """Send one message per row of ``df`` to ``kafka_topic``.

        Each message is a dict of the row's ``MESSAGE_FIELDS`` values;
        encoding is left to the producer's ``value_serializer``.
        Raises ``KeyError`` if ``df`` lacks one of ``MESSAGE_FIELDS``.
        """
        count = 0
        for _, row in df.iterrows():
            self.send(kafka_topic, {field: row[field] for field in self.MESSAGE_FIELDS})
            count += 1
        # send() only enqueues the record. Flushing once after the loop lets the
        # client batch the whole DataFrame instead of blocking after every message.
        self.flush()

        print("{0} new messages were written into topic: {1}".format(
            count, kafka_topic))

In [None]:
def stream_to_kafka(n):
    """Publish the ``n`` most recently inserted tweets in MongoDB to Kafka.

    Reads ``data.tweets`` through the module-level ``client``, newest first
    (descending ``_id``), drops Mongo's ``_id`` column, and sends each document
    to the ``tweets`` topic on ``localhost:9092`` as UTF-8 JSON.
    If the collection is empty, nothing is sent.
    """
    docs = list(client.data.tweets.find().sort('_id', -1).limit(n))
    if not docs:
        # Without documents there is no '_id' column to drop and nothing to send.
        print("No tweets in MongoDB yet; nothing was sent to Kafka.")
        return
    df = pd.DataFrame(docs).drop(columns=['_id'])

    producer = Producer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda m: json.dumps(m).encode('utf-8'),
        api_version=(0, 10, 1)
    )
    kafka_topic = 'tweets'
    try:
        producer.produce(df, kafka_topic)
    finally:
        # Each call builds a fresh producer; close it so its connections and
        # background sender are not leaked on every micro-batch.
        producer.close()


- Spark streaming

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Create (or reuse) the Spark session. In Colab the Kafka connector is requested
# through PYSPARK_SUBMIT_ARGS; elsewhere it is requested via spark.jars.packages.
if IN_COLAB:
    # spark.jars takes ONE comma-separated list. Setting the key twice keeps only
    # the last value, which silently dropped the spark-sql-kafka jar.
    # NOTE(review): kafka-clients 0.11 is far older than what spark-sql-kafka
    # 3.4.0 depends on, so confirm this jar is still wanted.
    my_spark = (SparkSession
                .builder
                .config("spark.executor.memory", "1g")
                .config("spark.jars", ",".join([
                    "spark-sql-kafka-0-10_2.12-3.4.0.jar",
                    "kafka-clients-0.11.0.0.jar",
                ]))
                .appName("csc14112")
                .getOrCreate())
else:
    # NOTE(review): these 3.3.2 coordinates must match the installed pyspark
    # version (the Colab path uses 3.4.0), so confirm for the target environment.
    packages = [
        'org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.2',
        'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2',
        'org.apache.kafka:kafka-clients:0.11.0.0'
    ]

    my_spark = (SparkSession
                .builder
                .config("spark.executor.memory", "1g")
                .config("spark.jars.packages", ",".join(packages))
                .appName("csc14112")
                .getOrCreate())


In [None]:
# Kafka broker and topic the streaming source subscribes to.
kafka_bootstrap_servers = "localhost:9092"
kafka_topic_name = "tweets"

# Streaming DataFrame over the topic: replay from the earliest offset and keep
# running (instead of failing) if some offsets are no longer available.
df = (my_spark.readStream
      .format("kafka")
      .options(**{
          "kafka.bootstrap.servers": kafka_bootstrap_servers,
          "startingOffsets": "earliest",
          "failOnDataLoss": "false",
          "subscribe": kafka_topic_name,
      })
      .load())


# Part 3. Perform Sentiment Analysis on Tweets

In [None]:
# Show the raw Kafka source schema; the producer's JSON payload arrives in the binary `value` column.
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
# Kafka hands payloads over as bytes. Cast `value` to a string so its JSON can
# be parsed downstream; every other column is kept as-is.
query = df.withColumn("value", df["value"].cast("string"))


In [None]:
# Install NLTK (Colab only) and fetch the VADER lexicon used by
# SentimentIntensityAnalyzer. The cell's last line displays download()'s return value.
if IN_COLAB:
    !pip install nltk
import nltk
nltk.download('vader_lexicon')


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


[nltk_data] Downloading package vader_lexicon to /root/nltk_data...


True

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from nltk.sentiment import SentimentIntensityAnalyzer

from pyspark.sql.functions import col, from_json, udf
from pyspark.sql.types import StringType, StructType

# Schema of the JSON tweet messages read from Kafka; every field is parsed as a string.
json_schema = (StructType()
               .add("id", StringType())
               .add("Date", StringType())
               .add("Likes", StringType())
               .add("Location", StringType())
               .add("Tweet", StringType())
               .add("Retweets", StringType()))

# VADER analyzer reused by later calls in the same Python process. It is built
# lazily because constructing one reloads the whole lexicon.
_sentiment_analyzer = None


def analyze_sentiment(tweet, threshold=0.5):
    """Classify one tweet as 'positive', 'negative' or 'neutral' with VADER.

    Parameters
    ----------
    tweet : str or None
        Tweet text. ``None`` (a message with no ``Tweet`` field) yields ``None``.
    threshold : float, optional
        The compound score must be strictly above ``threshold`` for 'positive'
        and strictly below ``-threshold`` for 'negative'; anything else is
        'neutral'. NOTE: the VADER README suggests 0.05. The default of 0.5
        keeps this notebook's original, stricter cut-off.

    Returns
    -------
    str or None
    """
    global _sentiment_analyzer
    if tweet is None:
        return None
    if _sentiment_analyzer is None:
        _sentiment_analyzer = SentimentIntensityAnalyzer()
    compound = _sentiment_analyzer.polarity_scores(tweet)['compound']
    if compound < -threshold:
        return 'negative'
    if compound > threshold:
        return 'positive'
    return 'neutral'


# Wrap the per-tweet function as a Spark UDF (StringType result) so it can be
# applied to a whole column.
analyze_sentiment_udf = udf(analyze_sentiment, StringType())


In [None]:
resultDF = None         # Spark DataFrame: union of every analysed micro-batch
resultPandasDF = None   # pandas: tweets per (Emotion, 10-minute bin), read by the dashboard
_emotion_counts = None  # pandas Series: running tweet count per (Date, Emotion)


def save_to_df(batch_df, epoch_id):
    """foreachBatch sink: score one Kafka micro-batch and refresh dashboard data.

    Parses the JSON ``value`` column with ``json_schema``, labels each tweet with
    ``analyze_sentiment_udf``, and folds the batch into running counts.
    Afterwards ``resultPandasDF`` has the columns ``Emotion``, ``Date`` (start
    of a 10-minute bin, UTC) and ``count`` (number of tweets in that bin with
    that emotion). ``epoch_id`` is supplied by Spark and is unused.
    """
    global resultDF, resultPandasDF, _emotion_counts

    parsed_df = (batch_df
                 .select(from_json(col("value"), json_schema).alias("data"))
                 .select("data.*"))
    result = parsed_df.select(
        col("Tweet"), col("Date"), col("Likes"), col("Location"), col("Retweets"),
        analyze_sentiment_udf(col("Tweet")).alias("Emotion")
    )

    # Kept for cells that want the full history as a Spark DataFrame. The union
    # is lazy and is not evaluated here.
    resultDF = result if resultDF is None else resultDF.unionByName(result)

    # Collect only this batch. Collecting resultDF instead would re-read every
    # past batch and re-run the sentiment UDF on all of it each time.
    # size() counts rows per key; DataFrame.count() would instead count non-null
    # values per remaining column and produce no 'count' column at all.
    batch_counts = (result.select("Date", "Emotion")
                    .toPandas()
                    .groupby(["Date", "Emotion"])
                    .size())
    if _emotion_counts is None:
        _emotion_counts = batch_counts
    else:
        _emotion_counts = (pd.concat([_emotion_counts, batch_counts])
                           .groupby(level=["Date", "Emotion"])
                           .sum())

    counts = _emotion_counts.rename("count").reset_index()
    # Unparseable dates become NaT and drop out of the time bins; utc=True keeps
    # mixed UTC offsets from producing an object column that Grouper rejects.
    counts["Date"] = pd.to_datetime(counts["Date"], errors="coerce", utc=True)
    resultPandasDF = (counts
                      .groupby(["Emotion", pd.Grouper(key="Date", freq="10min")])["count"]
                      .sum()
                      .reset_index())


In [None]:
def start_stream(batch_df, epoch_id):
    """foreachBatch callback that feeds the pipeline with the next page of tweets.

    For each micro-batch it downloads up to 100 tweets into MongoDB. If that
    stored any rows, it then publishes the 100 newest MongoDB documents to
    Kafka. When the download fails or returns no rows, nothing is published:
    re-sending the same documents would make ``save_to_df`` count them twice.
    NOTE(review): publishing is presumably what produces this query's next
    micro-batch, so the loop likely pauses until new data reaches the topic.
    ``batch_df`` and ``epoch_id`` are supplied by Spark and are unused.
    """
    if download_tweets(100):
        stream_to_kafka(100)

In [None]:
# Install the Dash stack used by the dashboard below (Colab only). The three
# versions are pinned together, presumably for mutual compatibility; confirm
# before upgrading any one of them.
if IN_COLAB:
  !pip install dash==1.19.0 werkzeug==2.0.0 jupyter_dash==0.3.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting dash==1.19.0
  Downloading dash-1.19.0.tar.gz (75 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m75.3/75.3 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting werkzeug==2.0.0
  Downloading Werkzeug-2.0.0-py3-none-any.whl (288 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m288.1/288.1 kB[0m [31m11.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting jupyter_dash==0.3.0
  Downloading jupyter_dash-0.3.0-py3-none-any.whl (49 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.0/49.0 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
Collecting flask-compress (from dash==1.19.0)
  Downloading Flask_Compress-1.13-py3-none-any.whl (7.9 kB)
Collecting dash_renderer==1.9.0 (from dash==1.19.0)
  Downloading dash_renderer-1.9.0.tar.gz (1.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━

# Part 4. Visualize the analytics results

In [None]:
import datetime

# Import dash, retrying once if the first attempt fails.
# NOTE(review): presumably a workaround for a first import failing right after
# the pip install above; confirm it is still needed. Catching Exception (not a
# bare except) keeps the retry without swallowing KeyboardInterrupt/SystemExit.
try:
    import dash
except Exception:
    import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly.express as px
from jupyter_dash import JupyterDash

# Dash dashboard: a line chart of tweet counts per emotion plus pipeline
# statistics, both refreshed by a 1000 ms Interval component.
app = JupyterDash(__name__)
app.layout = html.Div(children=[
    html.H1(children='Tweet Sentiment Analysis'),
    html.Div(id='stat'),
    dcc.Graph(id='line-plot'),
    dcc.Interval(id='interval', interval=1000, n_intervals=0)
])


@app.callback(
    [dash.dependencies.Output('line-plot', 'figure'),
     dash.dependencies.Output('stat', 'children')],
    [dash.dependencies.Input('interval', 'n_intervals')]
)
def update_plots(n):
    """Rebuild the line chart and the stats panel on every Interval tick.

    ``n`` (the tick count) is unused. Reads the module-level ``resultPandasDF``
    (an empty frame is used until it is first set) and the ``current_offset``
    and ``calls`` counters.
    """
    frame = resultPandasDF
    if frame is None:
        frame = pd.DataFrame(columns=['Date', 'Emotion', 'count'])

    figure = px.line(frame,
                     x="Date", y="count", color="Emotion",
                     title="Count of Tweets Over Time")
    figure.update_layout(transition_duration=500)

    stat_lines = (
        f"Total tweets analyzed: {current_offset}",
        f"Total hugging face calls: {calls}",
        f"Total data points aggregated: {len(frame)}",
        f"Last updated: {datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=7)))}",
    )
    return [figure, [html.P(line) for line in stat_lines]]


In [None]:
# Run the app
# Restart the download cursor at dataset row 0; download_tweets upserts by
# "id", so re-fetched rows update existing MongoDB documents.
current_offset = 0
print("Start streaming...")
# Serve the dashboard inline in the notebook. Its Interval callback reads the
# globals that the streaming queries started below keep updating.
app.run_server(host='0.0.0.0', mode='inline', debug=True)
# Query 1: score every Kafka micro-batch and refresh the dashboard data.
process_query = query.writeStream.foreachBatch(save_to_df)
process_stream = process_query.start()
# Query 2: on every Kafka micro-batch, download more tweets and publish them
# back to the topic.
# NOTE(review): this query presumably does nothing until the topic receives a
# first message; confirm something seeds it on a fresh run.
read_query = query.writeStream.foreachBatch(start_stream)
read_stream = read_query.start()

Start streaming...


<IPython.core.display.Javascript object>

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages wer

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'


100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'


100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets
100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets
100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages wer

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'


100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'


100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'


100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets
100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages wer

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    ret

100 new messages were written into topic: tweets
100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'


100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages wer

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'


100 new messages were written into topic: tweets


ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 114, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 111, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-23-0cba13dca6b8>", line 2, in start_stream
    download_tweets(100)
  File "<ipython-input-7-2e7b36133235>", line 13, in download_tweets
    for row in response.json()['rows']]
KeyError: 'rows'


100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages were written into topic: tweets
100 new messages wer