In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, split

if __name__ == "__main__":

    # create Spark session
    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()

    # read the tweet data from socket
    tweet_df = spark \
        .readStream \
        .format("socket") \
        .option("host", "127.0.0.1") \
        .option("port", 3354) \
        .load()

    # type cast the column value
    tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING)")


    # split words based on space, filter out hashtag values and group them up
    tweets_tab = tweet_df_string.withColumn('word', explode(split(col('value'), ' '))) \
        .groupBy('word') \
        .count() \
        .sort('count', ascending=False). \
        filter(col('word').contains('#'))

    # write the above data into memory. consider the entire analysis in all iteration (output mode = complete). and let the trigger runs in every 2 secs.
    writeTweet = tweets_tab.writeStream. \
            outputMode("update"). \
            format("console"). \
            trigger(processingTime='2 seconds'). \
            start(). \
            awaitTermination()
         




In [102]:
spark.stop()

22/04/23 22:53:50 ERROR TaskSchedulerImpl: Exception in statusUpdate + 8) / 200]
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$3@3cbca829 rejected from java.util.concurrent.ThreadPoolExecutor@6077b242[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2534]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask(TaskResultGetter.scala:61)
	at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree2$1(TaskSchedulerImpl.scala:815)
	at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:791)
	at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.sc

In [99]:
spark.sql("select *  from tweetquery limit 20").show()



+---------------------+-----+
|                 word|count|
+---------------------+-----+
| #cancelboardexams...|    9|
|              #corona|    4|
|                #MLB.|    3|
|          #Hit3000MC,|    3|
|               #covid|    3|
|         #coronavirus|    3|
|             #covid19|    3|
|              #Corona|    3|
|    #CoronavirusCases|    2|
| #ModijiSaveBoardS...|    2|
|     #TakeOnlineExams|    1|
|             #covidRT|    1|
|              #Cancel|    1|
|                #love|    1|
| Pal…#SaturdayMoti...|    1|
| #Corona-Infektion...|    1|
|            #Vene…wir|    1|
|#재난문자@EmeterioMar|    1|
|              #Jaipur|    1|
|                #P…RT|    1|
+---------------------+-----+





In [120]:
from influxdb import InfluxDBClient

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----+
|                word|count|
+--------------------+-----+
|#Patwari_Normalis...|    1|
|        #Liberazione|    1|
|           #25Aprile|    1|
|      #Maskenpflicht|    1|
|              #Öffis|    1|
+--------------------+-----+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----+
|                word|count|
+--------------------+-----+
|#Patwari_Normalis...|    1|
|        #Liberazione|    1|
|           #25Aprile|    1|
|               #C…RT|    1|
|      #Maskenpflicht|    1|
|             #Corona|    1|
|              #Öffis|    1|
+--------------------+-----+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----+
|                word|count|
+--------------------+-----+
|#cancelboardexams...|    2|
|#Patwari_Normalis...|    1|
|        #Liberazione|    1|
|         #Barcelona.|    1|
|       #NarendraModi|    1|
|           #25Aprile|    1|
|            #Kubicki|    1|
|H…#cancelboardexa...|    1|
|               #C…RT|    1|
|      #Maskenpflicht|    1|
|          #PratikFam|    1|
|     #PratikSehajpal|    1|
|             #Corona|    1|
|            #Lindner|    1|
|              #Öffis|    1|
+--------------------+-----+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+-----+
|                word|count|
+--------------------+-----+
|#cancelboardexams...|    2|
|#Patwari_Normalis...|    1|
|        #Liberazione|    1|
|         #Barcelona.|    1|
|       #NarendraModi|    1|
|           #25Aprile|    1|
|            #Kubicki|    1|
|H…#cancelboardexa...|    1|
|               #C…RT|    1|
|      #Maskenpflicht|    1|
|          #PratikFam|    1|
|     #PratikSehajpal|    1|
|             #Corona|    1|
|            #Lindner|    1|
|              #Öffis|    1|
+--------------------+-----+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+--------------------+-----+
|                word|count|
+--------------------+-----+
|#cancelboardexams...|    2|
|             #Corona|    2|
|#Patwari_Normalis...|    1|
|        #Liberazione|    1|
|         #Barcelona.|    1|
|             #Jaipur|    1|
|       #NarendraModi|    1|
|           #25Aprile|    1|
|            #Kubicki|    1|
|H…#cancelboardexa...|    1|
|              #Covid|    1|
|          #Rajasthan|    1|
|               #C…RT|    1|
|      #Maskenpflicht|    1|
|          #PratikFam|    1|
|     #PratikSehajpal|    1|
|        #Recovered✅8|    1|
|        #Covid_19…RT|    1|
|      #CoronaUpdate:|    1|
|            #Lindner|    1|
+--------------------+-----+
only showing top 20 rows





KeyboardInterrupt: 

In [1]:
from pymongo import MongoClient

In [2]:
client = MongoClient(port=27017)


In [8]:
print(client.list_database_names())

['admin', 'config', 'local', 'pyspark_twitter', 'test_database']


In [9]:
mydb = client['pyspark_twitter']

In [35]:
print(mydb.list_collection_names())

['top_hashtags']


In [51]:
spark.mongodb.write.connection.uri=('mongodb://127.0.0.1/test.myCollection')


AttributeError: 'SparkSession' object has no attribute 'mongodb'

In [48]:
spark = SparkSession.builder.appName("TwitterSentimentAnalysis").config("spark.mongodb.input.uri", "mongodb://127.0.0.1/pyspark_twitter.top_hashtags").config("spark.mongodb.output.uri", "mongodb://127.0.0.1/pyspark_twitter.top_hashtags").getOrCreate()


In [46]:

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/pyspark_twitter.top_hashtags") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/pyspark_twitter.top_hashtags") \
    .getOrCreate()

In [42]:
people = spark.createDataFrame([("Bilbo Baggins",  50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77),
   ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)], ["name", "age"])

In [49]:
people.write.format("mongodb").mode("append").save()


Py4JJavaError: An error occurred while calling o640.save.
: java.lang.ClassNotFoundException: 
Failed to find data source: mongodb. Please find packages at
http://spark.apache.org/third-party-projects.html
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: mongodb.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:656)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:656)
	... 16 more
