In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, explode 

In [2]:
print(pyspark.__version__)

3.5.0


In [3]:
conf= pyspark.SparkConf().set("spark.jars.packages",\
                              "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1").\
                              setMaster("local").setAppName("Tweets").\
                              setAll([("spark.driver.memory","40g"), ("spark.executor.memory","50g")])

In [4]:
conf

<pyspark.conf.SparkConf at 0x1bf7076f510>

In [5]:
# Create a SparkSession
spark = SparkSession.\
builder.config(conf=conf).\
getOrCreate()

In [6]:
mongo_ip="mongodb://localhost:27017/Tweets."

In [7]:
print(mongo_ip)

mongodb://localhost:27017/Tweets.


In [8]:
#collection name
users=spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", mongo_ip + "Users").load()

In [9]:
users.show()

+--------------------+------------+-----------+--------------------+------------------+--------------------+--------------+---------+--------------------+----+-------------------+-------------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+-----+-------------+---------+--------------------+---------+--------------------+
|                 _id|contributors|coordinates|          created_at|display_text_range|            entities|favorite_count|favorited|           full_text| geo|                 id|             id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_str|is_quote_status|lang|place|retweet_count|retweeted|              source|truncated|                user|
+--------------------+------------+-----------+--------------------+------------------+--------------------+--------------+---------+--------------------+----+---

In [10]:
users.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- contributors: void (nullable = true)
 |-- coordinates: void (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: integer (containsNull = true)
 |    |-- user_mentions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- screen_name: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- ele

In [11]:
#creating temp view
users.createOrReplaceTempView("MyTweet")

In [12]:
usersDF=spark.sql("SELECT * FROM MyTweet")

In [13]:
usersDF.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- contributors: void (nullable = true)
 |-- coordinates: void (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: integer (containsNull = true)
 |    |-- user_mentions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- screen_name: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- ele

In [15]:
usersDF = usersDF.withColumn("mention",explode("entities.user_mentions")).\
           withColumn("name",col("mention.screen_name")).\
           drop("mention")

In [16]:
from pyspark.sql.functions import count, min, max
user=usersDF.groupBy("user.name", "created_at")\
     .agg( max("user.screen_name").alias("screen_name"),
         max("user.location").alias("location"),
         count("entities.hashtags.text").alias("tags"),
         count("entities.user_mentions.screen_name").alias("user_mention"),
         max("favorite_count").alias("fav_count"),
         max("retweet_count").alias("retweets"),
         max("user.followers_count").alias("followers"),
         max("user.friends_count").alias("friends"),
         max("user.favourites_count").alias("fav"),
         max("user.verified").alias("verified"),
         max("user.statuses_count").alias("status_count"))
         

user.show()

+--------------+--------------------+-----------+-----------------+----+------------+---------+--------+---------+-------+---+--------+------------+
|          name|          created_at|screen_name|         location|tags|user_mention|fav_count|retweets|followers|friends|fav|verified|status_count|
+--------------+--------------------+-----------+-----------------+----+------------+---------+--------+---------+-------+---+--------+------------+
|Data Analytics|Sun May 12 10:41:...|SNA00761413|Karachi, Pakistan|   4|           4|        1|       0|      126|    147|132|   false|         182|
|Data Analytics|Sun May 12 10:57:...|SNA00761413|Karachi, Pakistan|   4|           4|        0|       0|      126|    147|132|   false|         182|
+--------------+--------------------+-----------+-----------------+----+------------+---------+--------+---------+-------+---+--------+------------+

