In [1]:
import os
import pyspark
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.streaming import StreamingQueryException
import pandas as pd
pd.set_option('display.max_colwidth', None)
import json
from ast import literal_eval
import ast
import time
import numpy as np

In [2]:
os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["SPARK_LOCAL_HOSTNAME"] = "localhost"

In [3]:
spark = SparkSession.builder.appName("TwitterStream").enableHiveSupport().getOrCreate()
spark

In [4]:
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [5]:
lines = spark.readStream.format("socket").option("host", "127.0.0.1").option("port", 7777).load()

#time.sleep(10)

In [6]:
# Write the tweets to the "tweetquery" memory table
writeTweet = lines.writeStream.\
    outputMode("append"). \
    format("memory"). \
    queryName("tweetquery"). \
    trigger(processingTime='2 seconds'). \
    start()

print("----- streaming is running -------")

#time.sleep(20)

----- streaming is running -------


In [7]:
data_schema = StructType([
    StructField('reply_settings', StringType(), True),
    StructField('edit_history_tweet_ids', ArrayType(StringType()), True),
    StructField('referenced_tweets', ArrayType(MapType(StringType(), StringType())), True),
    StructField('created_at', StringType(), True),
    StructField('lang', StringType(), True),
    StructField('text', StringType(), True),
    StructField('conversation_id', StringType(), True),
    StructField('author_id', StringType(), True),
    StructField('id', StringType(), True),
    StructField('batch_id', IntegerType(), True),
    StructField('hashtags', ArrayType(StringType()), True),
    StructField('attachments.media_keys', ArrayType(StringType()), True),
    StructField('public_metrics.retweet_count', LongType(), True),
    StructField('public_metrics.reply_count', LongType(), True),
    StructField('public_metrics.like_count', LongType(), True),
    StructField('public_metrics.quote_count', LongType(), True),
    StructField('public_metrics.impression_count', LongType(), True),
    StructField('entities.mentions', ArrayType(MapType(StringType(), StringType())), True),
    StructField('user.profile_image_url', StringType(), True),
    StructField('user.pinned_tweet_id', StringType(), True),
    StructField('user.location', StringType(), True),
    StructField('user.protected', BooleanType(), True),
    StructField('user.verified', BooleanType(), True),
    StructField('user.description', StringType(), True),
    StructField('user.name', StringType(), True),
    StructField('user.id', StringType(), True),
    StructField('user.username', StringType(), True),
    StructField('user.created_at', StringType(), True),
    StructField('user.public_metrics.followers_count', LongType(), True),
    StructField('user.public_metrics.following_count', LongType(), True),
    StructField('user.public_metrics.tweet_count', LongType(), True),
    StructField('user.public_metrics.listed_count', LongType(), True),
    StructField('media.media_key', StringType(), True),
    StructField('media.type', StringType(), True),
    StructField('in_reply_to_user_id', StringType(), True),
    StructField('geo.place_id', StringType(), True),
    StructField('place.name', StringType(), True),
    StructField('place.place_type', StringType(), True),
    StructField('place.id', StringType(), True),
    StructField('place.country_code', StringType(), True),
    StructField('place.full_name', StringType(), True),
    StructField('place.country', StringType(), True),
    StructField('place.geo.type', StringType(), True),
    StructField('place.geo.bbox', StringType(), True)
])

In [9]:
try:
    
   # Define an empty dataframe to hold the normalized data
    df_normalized = pd.DataFrame() 
    
    max_id = 0
    
    new_order = ['reply_settings', 'edit_history_tweet_ids', 'referenced_tweets', 'created_at', 'lang', 'text', 'conversation_id', 'author_id', 'id', 'batch_id', 'hashtags', 'attachments.media_keys', 'public_metrics.retweet_count',\
            'public_metrics.reply_count','public_metrics.like_count','public_metrics.quote_count','public_metrics.impression_count','entities.mentions','user.profile_image_url','user.pinned_tweet_id','user.location',\
            'user.protected','user.verified','user.description','user.name','user.id','user.username','user.created_at','user.public_metrics.followers_count','user.public_metrics.following_count','user.public_metrics.tweet_count',\
            'user.public_metrics.listed_count','media.media_key','media.type','in_reply_to_user_id','geo.place_id','place.name','place.place_type','place.id','place.country_code','place.full_name','place.country','place.geo.type','place.geo.bbox']
    
    hdfs_path = "/user/itversity/twitter-landing-data"
    
    hive_table = "twitter_landing_table"
    
    while writeTweet.isActive: 

    # Continuously read data from the stream and append to the dataframe
    
        # Read the tweets from the "tweetquery" memory table and show the result
        result_df = spark.sql("SELECT * FROM tweetquery").toPandas()

        # Assuming `result_df` contains the dataframe with the JSON strings
        # Convert each JSON string to a Python dictionary
        result_df['value'] = result_df['value'].apply(json.loads)

        # Normalize the dictionary data to a dataframe
        new_normalized_df = pd.json_normalize(result_df['value'])
        
        
        # Filter out data that has already been appended
        final_normalized_df = new_normalized_df[new_normalized_df['batch_id'] > max_id]

        # Append the new data to the existing dataframe    
        df_normalized = df_normalized.append(final_normalized_df)
        
        max_id = df_normalized["batch_id"].max()

        df_normalized = df_normalized.reindex(columns=new_order)
        
        
        
        # change the dataframe coulmns datatypes to convert it into spark dataframe
        
        df_normalized['referenced_tweets'] = df_normalized['referenced_tweets'].apply(lambda x: [str(tweet) for tweet in x] if isinstance(x, list) else [])
        df_normalized['referenced_tweets'] = df_normalized['referenced_tweets'].apply(lambda x: [json.loads(tweet.replace("'", "\"")) for tweet in x] if isinstance(x, list) else [])
        df_normalized['attachments.media_keys'] = df_normalized['attachments.media_keys'].apply(lambda x: [] if isinstance(x, float) and np.isnan(x) else x)
        df_normalized['user.pinned_tweet_id'] = df_normalized['user.pinned_tweet_id'].apply(lambda x: None if x == [] else x)
        df_normalized['attachments.media_keys'] = df_normalized['attachments.media_keys'].fillna(value={})
        # Fill null values with an empty dictionary
        df_normalized['entities.mentions'] = df_normalized['entities.mentions'].fillna(value={})
        # Convert string values to dictionary values
        df_normalized['entities.mentions'] = df_normalized['entities.mentions'].apply(lambda x: [ast.literal_eval(mention) if isinstance(mention, str) else mention for mention in x] if isinstance(x, list) else [])
        
        df_normalized['text'] = df_normalized['text'].apply(lambda s: s.replace('\n', ' '))
        df_normalized['user.description'] = df_normalized['user.description'].apply(lambda s: s.replace('\n', ' '))
        




        # Create Spark DataFrame
        spark_df = spark.createDataFrame(df_normalized, schema=data_schema)
        
        
        spark_df = spark_df.withColumn("created_at", to_timestamp("created_at", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
        
        # Extract Year, Month, Day, and Hour from created_at column
        final_spark_df = spark_df.withColumn("Year", year("created_at")) \
            .withColumn("Month", month("created_at")) \
            .withColumn("Day", dayofmonth("created_at")) \
            .withColumn("Hour", hour("created_at"))
        
        

        # Convert DataFrame to Spark DataFrame and write to HDFS partitioned by year, month, day, and hour
        final_spark_df.write.partitionBy("year", "month", "day", "hour").mode("append").parquet("twitter-landing-data")
        
            
        # Load the partitioned data into a Spark DataFrame
        hive_df = spark.read.parquet(hdfs_path+"/*/*/*/*")
        
        # create a temporary view of the DataFrame
        hive_df.createOrReplaceTempView("temp_data_view")
        
        
        
        final_hive_df= hive_df.withColumnRenamed("attachments.media_keys", "attachments_media_keys")\
                        .withColumnRenamed("public_metrics.retweet_count", "public_metrics_retweet_count")\
                        .withColumnRenamed("public_metrics.reply_count", "public_metrics_reply_count")\
                        .withColumnRenamed("public_metrics.like_count", "public_metrics_like_count")\
                        .withColumnRenamed("public_metrics.quote_count", "public_metrics_quote_count")\
                        .withColumnRenamed("public_metrics.impression_count", "public_metrics_impression_count")\
                        .withColumnRenamed("entities.mentions", "entities_mentions")\
                        .withColumnRenamed("user.profile_image_url", "user_profile_image_url")\
                        .withColumnRenamed("user.pinned_tweet_id", "user_pinned_tweet_id")\
                        .withColumnRenamed("user.location", "user_location")\
                        .withColumnRenamed("user.protected", "user_protected")\
                        .withColumnRenamed("user.verified", "user_verified")\
                        .withColumnRenamed("user.description", "user_description")\
                        .withColumnRenamed("user.name", "user_name")\
                        .withColumnRenamed("user.id", "user_id")\
                        .withColumnRenamed("user.username", "user_username")\
                        .withColumnRenamed("user.created_at", "user_created_at")\
                        .withColumnRenamed("user.public_metrics.followers_count", "user_public_metrics_followers_count")\
                        .withColumnRenamed("user.public_metrics.following_count", "user_public_metrics_following_count")\
                        .withColumnRenamed("user.public_metrics.tweet_count", "user_public_metrics_tweet_count")\
                        .withColumnRenamed("user.public_metrics.listed_count", "user_public_metrics_listed_count")\
                        .withColumnRenamed("media.media_key", "media_media_key")\
                        .withColumnRenamed("media.type", "media_type")\
                        .withColumnRenamed("geo.place_id", "geo_place_id")\
                        .withColumnRenamed("place.name", "place_name")\
                        .withColumnRenamed("place.place_type", "place_place_type")\
                        .withColumnRenamed("place.id", "place_id")\
                        .withColumnRenamed("place.country_code", "place_country_code")\
                        .withColumnRenamed("place.full_name", "place_full_name")\
                        .withColumnRenamed("place.country", "place_country")\
                        .withColumnRenamed("place.geo.type", "place_geo_type")\
                        .withColumnRenamed("place.geo.bbox", "place_geo_bbox")
        
        
        
        #hive_df.write.mode("append").saveAsTable("hive_tw_landing_table")
        
        final_hive_df.write.mode("append").saveAsTable("test_landing_table")


        
        
        
        
        #hive_df.write.mode("append").saveAsTable("twitter_landing_table_test").format('hive')
        
        #spark.sql("CREATE DATABASE IF NOT EXISTS twitter_landing_data_DB")
        
        # Create the Hive table
        #hive_df.write.format("hive").option("database", "twitter_landing_data").mode("append").saveAsTable("twitter_landing_table")
        
        #create the Hive table with the same schema as the DataFrame
        
        #spark.sql("CREATE TABLE IF NOT EXISTS twitter_landing_data_DB.twitter_landing_table \
          #LIKE temp_data_view".format("twitter-landing-data/*/*/*/*"))
        

        

        # Wait for a few seconds before reading the stream again
        time.sleep(100)
        
except KeyboardInterrupt:
    # Stop the streaming query when the socket is closed from the Python listener
    writeTweet.stop()
    print("Streaming stopped due to keyboard interrupt")
    
except StreamingQueryException:
    # The socket was closed, stop the query
    writeTweet.stop()
    print("Streaming stopped due to socket closure")
    
except ConnectionResetError:
    # Break the loop when the connection with the port is interrupted
    print("Connection with port interrupted, stopping the loop.")
    writeTweet.stop()
    
#except:
    # Stop the streaming query when the listener sends a unique sign
    #writeTweet.stop()
    #print("Streaming stopped due to a signal from the listener")

Streaming stopped due to keyboard interrupt


In [10]:
print(new_normalized_df)

Empty DataFrame
Columns: [value]
Index: []


In [14]:
final_spark_df.printSchema()

root
 |-- reply_settings: string (nullable = true)
 |-- edit_history_tweet_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- referenced_tweets: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- created_at: timestamp (nullable = true)
 |-- lang: string (nullable = true)
 |-- text: string (nullable = true)
 |-- conversation_id: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- batch_id: integer (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- attachments.media_keys: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- public_metrics.retweet_count: long (nullable = true)
 |-- public_metrics.reply_count: long (nullable = true)
 |-- public_metrics.like_count: long (nullable = true)
 |-- public_metrics.quote_count: lo

In [26]:
final_spark_df.toPandas()

Unnamed: 0,reply_settings,edit_history_tweet_ids,referenced_tweets,created_at,lang,text,conversation_id,author_id,id,batch_id,...,place.id,place.country_code,place.full_name,place.country,place.geo.type,place.geo.bbox,Year,Month,Day,Hour
0,everyone,[1653187529129230336],"[{'type': 'retweeted', 'id': '1653187477090467841'}]",2023-05-01 23:59:56,en,RT @nh0ckut: 😀😝💘💌👋 Technology at safe. #杭州 #深圳 https://t.co/pnptzw5AUk,1653187529129230336,479514569,1653187529129230336,1,...,,,,,,,2023,5,1,23
1,everyone,[1653187477979496453],"[{'type': 'replied_to', 'id': '1653179265389666311'}]",2023-05-01 23:59:44,en,"@zillow @Compass @Redfin @PropyInc is a real estate company that uses blockchain technology and AI to streamline the buying and selling process. In Hong Kong, AI is being used to make the market more accessible to buyers, with #Squarefoot using AI to analyze data on properties. 9/11",1653114589343539212,2395761194,1653187477979496453,1,...,,,,,,,2023,5,1,23
2,everyone,[1653187477090467841],[],2023-05-01 23:59:43,en,😀😝💘💌👋 Technology at safe. #杭州 #深圳 https://t.co/pnptzw5AUk,1653187477090467841,191459117,1653187477090467841,1,...,,,,,,,2023,5,1,23
3,everyone,[1653187433750581250],"[{'type': 'replied_to', 'id': '1653161507675594752'}]",2023-05-01 23:59:33,en,"@professordilly @PUMA #Gdex is a multi-chain Defi project that combines AI technology along with many other attractive features. 🟢KYC &amp; AUDIT + SAFU 🟢CMC, CGK Fast Track 🟢Tax=0% 🟢Huge marketing 🟢Released product 🟢 CEX listing 🟢 Binance Live, Gate Live, Houbi Live https://t.co/lnFV8uNrmU #BNB",1653161507675594752,1268773011446018048,1653187433750581250,1,...,,,,,,,2023,5,1,23
4,everyone,[1653187408073039873],[],2023-05-01 23:59:27,en,"Unplugging from technology is one of the best ways to ensure quality family time with your family. Yes, you can enjoy each other's presence without phones in the picture. #AnchorBrokerage #R #LakeBlueRidgeHomes #RealEstateBlueRidgeGa #MorgantonGA https://t.co/W8OdH0Ywle",1653187408073039873,1124340085959294976,1653187408073039873,1,...,,,,,,,2023,5,1,23
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
495,everyone,[1653187041658560512],[],2023-05-01 23:58:00,en,"Secure your early bird tickets to connect and learn with leaders at #UXA2023, the premier four-day conference for all disciplines in UX design. Register for your early bird tickets today and be part of the conversation at #UXA2023! https://t.co/Jn29BNzZ4P https://t.co/rH5TpWJHmc",1653187041658560512,18538621,1653187041658560512,5,...,,,,,,,2023,5,1,23
496,everyone,[1653187029797052416],"[{'type': 'retweeted', 'id': '1653081946770096131'}]",2023-05-01 23:57:57,en,"RT @ACPIMPhysicians: Congratulations to ACP’s new officers who took office during #IM2023. To learn more about ACP Leadership, visit https:…",1653187029797052416,4631746376,1653187029797052416,5,...,,,,,,,2023,5,1,23
497,everyone,[1653187023509807104],[],2023-05-01 23:57:55,en,Indeed is proud to announce its data collaboration with the World Economic Forum. This year’s WEF “Future of Jobs Report” highlights new Indeed #HiringLab labor market research. Learn more here: https://t.co/aGgJ9eQlP8 @wef @SvenjaGudell #IndeedData #LaborMarket #Economy,1653187023509807104,17781955,1653187023509807104,5,...,,,,,,,2023,5,1,23
498,everyone,[1653187007584276481],[],2023-05-01 23:57:51,en,Day 24 / #100daysofcode to learn #iosdevelopment Review day on #100daysofswiftui today. Just checked what I learned yesterday in a little test. Then also used the new skills and made some slight adjustments to the first two projects. https://t.co/HEEz1JEUlO,1653187007584276481,1182457269268533248,1653187007584276481,5,...,,,,,,,2023,5,1,23


In [42]:
#specific_seasons = spark.read.parquet("twitter-landing-data/*/*/*/*")

In [43]:
# create a temporary view of the DataFrame
#specific_seasons.createOrReplaceTempView("temp_data_view")

In [44]:
# create the Hive table with the same schema as the DataFrame
#spark.sql("CREATE TABLE IF NOT EXISTS twitter_landing_table_test \
#          LIKE temp_data_view".format("twitter-landing-data/*/*/*/*"))

In [45]:
#spark.sql("INSERT INTO TABLE twitter_landing_table_test \
#          SELECT * FROM temp_data_view")

In [23]:
# verify the table creation
#spark.sql("SELECT * FROM twitter_landing_table_test").toPandas()

In [30]:
#specific_seasons.toPandas()

Unnamed: 0,reply_settings,edit_history_tweet_ids,referenced_tweets,created_at,lang,text,conversation_id,author_id,id,batch_id,...,in_reply_to_user_id,geo.place_id,place.name,place.place_type,place.id,place.country_code,place.full_name,place.country,place.geo.type,place.geo.bbox
0,everyone,[1650650819228839937],"[{'type': 'retweeted', 'id': '1650594132325269504'}]",2023-04-24 23:59:57,en,RT @KSAmofaEN: In continuation of the evacuation efforts made by the Kingdom of #Saudi Arabia under the directives of the Kingdom's Leaders…,1650650819228839937,719467444,1650650819228839937,1,...,,,,,,,,,,
1,everyone,[1650650243581485057],"[{'type': 'retweeted', 'id': '1650601463083593729'}]",2023-04-24 23:57:40,en,RT @EKHNews_EN: 🎥 | #Saudi citizen expresses his feelings after arriving in the kingdom of #SaudiArabia \n\n#EKHNews_EN https://t.co/Bp032P6P…,1650650243581485057,719467444,1650650243581485057,1,...,,,,,,,,,,
2,everyone,[1650650180826308608],"[{'type': 'retweeted', 'id': '1650630705951694848'}]",2023-04-24 23:57:25,en,"RT @arabnews: Saudi students won another global robotics competition in Houston, #US, one of the world's most prestigious events https://t.…",1650650180826308608,759774284323565568,1650650180826308608,1,...,,,,,,,,,,
3,everyone,[1650650033535033344],"[{'type': 'retweeted', 'id': '1650638516723019776'}]",2023-04-24 23:56:50,en,"RT @TheRajGiri: According to @Fightful, the Miz was one of the people CM Punk spoke to backstage at #WWERAW today. Who can forget the time…",1650650033535033344,888452184836386816,1650650033535033344,1,...,,,,,,,,,,
4,everyone,[1650649831629787136],"[{'type': 'retweeted', 'id': '1650584866516357120'}]",2023-04-24 23:56:02,en,RT @going_svt: [TRENDS] Youtube 🌎 update\n\n#2 (+2) SEVENTEEN SUPER\n\nTrending Overall · Music\n#1 United Arab Emirates \n#1 Kuwait\n#1 Qatar\n#2…,1650649831629787136,1173122453192577025,1650649831629787136,1,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
381,everyone,[1650632737060012034],"[{'type': 'retweeted', 'id': '1650623224630091778'}]",2023-04-24 22:48:06,en,"RT @AshimovTunay: Thanks to everyone who has supported me so far! They money we raise get's sent to Brixton Masjid every week, so we're alr…",1650632737060012034,2198468167,1650632737060012034,2,...,,,,,,,,,,
382,everyone,[1650632647666802691],[],2023-04-24 22:47:45,en,Eid Mubarak to everyone celebrating.\n\n#eid #eidmubarak #ramadan https://t.co/3OQQualZEb,1650632647666802691,716024935848681472,1650632647666802691,2,...,,,,,,,,,,
383,everyone,[1650632208065994754],"[{'type': 'retweeted', 'id': '1650483573789716480'}]",2023-04-24 22:46:00,en,RT @Camps_Of_Iman: #NEW | Ramadan Series | 12 | THE BITTERSWEET END | Sincere Counsel to The Believing Slave\n\nhttps://t.co/MRaACtoQpn,1650632208065994754,1436044246625275912,1650632208065994754,2,...,,,,,,,,,,
384,everyone,[1650632051924631552],"[{'type': 'retweeted', 'id': '1648826673159340032'}]",2023-04-24 22:45:23,en,"RT @AyeshaaSiddiqa: O Allah, you are the most forgiving and you love to forgive, so forgive me.\n\n#Ramadan https://t.co/acCsN8yDva",1650632051924631552,1519818546422947840,1650632051924631552,2,...,,,,,,,,,,


In [19]:
# verify the table creation
spark.sql("TRUNCATE TABLE test_landing_table")

In [10]:
spark.sql("SELECT * FROM test_landing_table").toPandas()

Unnamed: 0,reply_settings,edit_history_tweet_ids,referenced_tweets,created_at,lang,text,conversation_id,author_id,id,batch_id,...,in_reply_to_user_id,geo_place_id,place_name,place_place_type,place_id,place_country_code,place_full_name,place_country,place_geo_type,place_geo_bbox
0,everyone,[1653910499741028352],[],2023-05-03 23:52:45,en,Which Sport is your favorite 🤭 #Balls #analtoys #anal #analfetish #milf #gape #pawg #insertion #phatass https://t.co/FvwBO02FlY,1653910499741028352,1068035459291344896,1653910499741028352,2,...,,,,,,,,,,
1,everyone,[1653912310766014464],"[{'type': 'retweeted', 'id': '1653887036082749441'}]",2023-05-03 23:59:57,en,RT @Haze_PHXXX: ROOFTOP STORAGE ROOM FUN 🔥 IS 🥵 HOT with #Lucas... See this and more at: https://t.co/qVFKA99gHw https://t.co/qxkocaNiSF,1653912310766014464,1333009601332088832,1653912310766014464,3,...,,,,,,,,,,
2,everyone,[1653912309557805056],[],2023-05-03 23:59:57,en,#HumpDay with stunning @figotwink on his https://t.co/yAMnJD9tA4 for hot fun 🔥🔥🔥 https://t.co/56BcpxPfh0,1653912309557805056,1424009948422344704,1653912309557805056,3,...,,,,,,,,,,
3,everyone,[1653912305686528004],"[{'type': 'retweeted', 'id': '1653791866771439623'}]",2023-05-03 23:59:56,en,"RT @deanslistDAO: We had so much fun going through our feedback session for @zignaly that we decided to make this video for them! Plus, he…",1653912305686528004,1649444472927801344,1653912305686528004,3,...,,,,,,,,,,
4,everyone,[1653912301932810240],"[{'type': 'retweeted', 'id': '1653912232453914624'}]",2023-05-03 23:59:55,en,RT @papaboateng57: Day 8 and 9 of #100DaysOfCode I got interested in python scripts. I was able to make a text(utf8) to base64 encoding sc…,1653912301932810240,1531451345517088770,1653912301932810240,3,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
494,everyone,[1653910717886775297],"[{'type': 'retweeted', 'id': '1653910061847310336'}]",2023-05-03 23:53:37,en,RT @tariqnasheed: Understand that every mainstream white media outlet knows the name of this man who killed #JordanNeely but they are all s…,1653910717886775297,902976804419289088,1653910717886775297,2,...,,,,,,,,,,
495,everyone,[1653910691236007937],"[{'type': 'retweeted', 'id': '1653910061847310336'}]",2023-05-03 23:53:31,en,RT @tariqnasheed: Understand that every mainstream white media outlet knows the name of this man who killed #JordanNeely but they are all s…,1653910691236007937,2575097594,1653910691236007937,2,...,,,,,,,,,,
496,everyone,[1653910615826595841],"[{'type': 'retweeted', 'id': '1653910061847310336'}]",2023-05-03 23:53:13,en,RT @tariqnasheed: Understand that every mainstream white media outlet knows the name of this man who killed #JordanNeely but they are all s…,1653910615826595841,3700349842,1653910615826595841,2,...,,,,,,,,,,
497,everyone,[1653910606871957504],"[{'type': 'replied_to', 'id': '1653632361928556544'}]",2023-05-03 23:53:11,en,@plucche117 @PeterLaBarbera @LOLGOP 1. That happens naturally do to low body fat not b/c of injected puberty blockers but even that causes irreversible damage that most #gymnasts dont completely recover from thats why certain types of conditioning are banned by the #US&amp;most countries!…https://t.co/PUP5TxOFmG,1653046654222696449,865376410864570368,1653910606871957504,2,...,334006391,,,,,,,,,


In [18]:
spark.sql("DESCRIBE test_landing_table").toPandas()

Unnamed: 0,col_name,data_type,comment
0,reply_settings,string,
1,edit_history_tweet_ids,array<string>,
2,referenced_tweets,"array<map<string,string>>",
3,created_at,timestamp,
4,lang,string,
5,text,string,
6,conversation_id,string,
7,author_id,string,
8,id,string,
9,batch_id,int,


In [79]:
hive_df.toPandas()

Unnamed: 0,reply_settings,edit_history_tweet_ids,referenced_tweets,created_at,lang,text,conversation_id,author_id,id,batch_id,...,in_reply_to_user_id,geo.place_id,place.name,place.place_type,place.id,place.country_code,place.full_name,place.country,place.geo.type,place.geo.bbox
0,everyone,[1650650819228839937],"[{'type': 'retweeted', 'id': '1650594132325269504'}]",2023-04-24 23:59:57,en,RT @KSAmofaEN: In continuation of the evacuation efforts made by the Kingdom of #Saudi Arabia under the directives of the Kingdom's Leaders…,1650650819228839937,719467444,1650650819228839937,1,...,,,,,,,,,,
1,everyone,[1650650243581485057],"[{'type': 'retweeted', 'id': '1650601463083593729'}]",2023-04-24 23:57:40,en,RT @EKHNews_EN: 🎥 | #Saudi citizen expresses his feelings after arriving in the kingdom of #SaudiArabia \n\n#EKHNews_EN https://t.co/Bp032P6P…,1650650243581485057,719467444,1650650243581485057,1,...,,,,,,,,,,
2,everyone,[1650650180826308608],"[{'type': 'retweeted', 'id': '1650630705951694848'}]",2023-04-24 23:57:25,en,"RT @arabnews: Saudi students won another global robotics competition in Houston, #US, one of the world's most prestigious events https://t.…",1650650180826308608,759774284323565568,1650650180826308608,1,...,,,,,,,,,,
3,everyone,[1650650033535033344],"[{'type': 'retweeted', 'id': '1650638516723019776'}]",2023-04-24 23:56:50,en,"RT @TheRajGiri: According to @Fightful, the Miz was one of the people CM Punk spoke to backstage at #WWERAW today. Who can forget the time…",1650650033535033344,888452184836386816,1650650033535033344,1,...,,,,,,,,,,
4,everyone,[1650649831629787136],"[{'type': 'retweeted', 'id': '1650584866516357120'}]",2023-04-24 23:56:02,en,RT @going_svt: [TRENDS] Youtube 🌎 update\n\n#2 (+2) SEVENTEEN SUPER\n\nTrending Overall · Music\n#1 United Arab Emirates \n#1 Kuwait\n#1 Qatar\n#2…,1650649831629787136,1173122453192577025,1650649831629787136,1,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
767,everyone,[1650632737060012034],"[{'type': 'retweeted', 'id': '1650623224630091778'}]",2023-04-24 22:48:06,en,"RT @AshimovTunay: Thanks to everyone who has supported me so far! They money we raise get's sent to Brixton Masjid every week, so we're alr…",1650632737060012034,2198468167,1650632737060012034,2,...,,,,,,,,,,
768,everyone,[1650632647666802691],[],2023-04-24 22:47:45,en,Eid Mubarak to everyone celebrating.\n\n#eid #eidmubarak #ramadan https://t.co/3OQQualZEb,1650632647666802691,716024935848681472,1650632647666802691,2,...,,,,,,,,,,
769,everyone,[1650632208065994754],"[{'type': 'retweeted', 'id': '1650483573789716480'}]",2023-04-24 22:46:00,en,RT @Camps_Of_Iman: #NEW | Ramadan Series | 12 | THE BITTERSWEET END | Sincere Counsel to The Believing Slave\n\nhttps://t.co/MRaACtoQpn,1650632208065994754,1436044246625275912,1650632208065994754,2,...,,,,,,,,,,
770,everyone,[1650632051924631552],"[{'type': 'retweeted', 'id': '1648826673159340032'}]",2023-04-24 22:45:23,en,"RT @AyeshaaSiddiqa: O Allah, you are the most forgiving and you love to forgive, so forgive me.\n\n#Ramadan https://t.co/acCsN8yDva",1650632051924631552,1519818546422947840,1650632051924631552,2,...,,,,,,,,,,


In [66]:
# Close the streaming

writeTweet.stop()

# stop the SparkSession
spark.stop()

#Stop the context

sc.stop()

