In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf, regexp_replace, trim
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, ArrayType
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.functions import trim
import re

# Obtain (or reuse) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("StructuredJsonStreaming")
    .getOrCreate()
)

# Layout of one incoming JSON record, as (field name, Spark type, nullable) triples.
_tweet_fields = [
    ("text", StringType, True),
    ("id", StringType, False),
    ("author_id", StringType, False),
    ("retweet_count", IntegerType, True),
    ("reply_count", IntegerType, True),
    ("like_count", IntegerType, True),
    ("quote_count", IntegerType, True),
    ("created_at", TimestampType, True),
    ("name", StringType, True),
    ("username", StringType, True),
    ("followers_count", IntegerType, True),
    ("following_count", IntegerType, True),
    ("tweet_count", IntegerType, True),
    ("listed_count", IntegerType, True),
    ("verified", StringType, True),
]

# Schema handed to from_json when parsing the raw socket payload.
schema = StructType([
    StructField(field_name, field_type(), is_nullable)
    for field_name, field_type, is_nullable in _tweet_fields
])

# Fields copied unchanged out of the parsed JSON struct, in output order.
_record_columns = [
    "id",
    "text",
    "author_id",
    "retweet_count",
    "reply_count",
    "like_count",
    "quote_count",
    "created_at",
    "name",
    "username",
    "followers_count",
    "following_count",
    "tweet_count",
    "listed_count",
    "verified",
]

# Calendar parts derived from created_at.
_calendar_columns = [
    "date(data.created_at) as date",
    "year(data.created_at) as year",
    "month(data.created_at) as month",
    "day(data.created_at) as day",
    "hour(data.created_at) as hour",
]

# Stream from the socket at localhost:7777, parse the `value` column as JSON
# with `schema`, then flatten the parsed struct into top-level columns.
df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 7777)
    .load()
    .select(from_json(col("value"), schema).alias("data"))
    .selectExpr(*(["data." + name for name in _record_columns] + _calendar_columns))
)
# Disabled idea: collapse the tweet metric columns into one "tweet_value" score
# computed with the weights below.
# df = df.withColumn("tweet_value", (col("retweet_count") * 10 + col("reply_count") * 5 + col("like_count") * 10 + col("quote_count") * 5  ))

# Clean the tweet text in three passes, each rewriting the "text" column:
#   1. replace runs of emoji characters with a single space
#   2. delete @user mentions
#   3. replace newlines with a space
# NOTE(review): the \U000024C2-\U0001F251 range in the first pattern is very wide
# and also covers non-emoji scripts (e.g. CJK ideographs) - confirm this is intended.
df = (
    df
    .withColumn("text", regexp_replace("text",u'[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F1E0-\U0001F1FF\U00002702-\U000027B0\U000024C2-\U0001F251\U0001F9D1\u200D\U0001F3A4]+', ' '))
    .withColumn("text", regexp_replace("text", "@\\w+", ""))
    .withColumn("text", regexp_replace("text", "\n", " "))
)
# Regular expression matching http:// and https:// URLs.
url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')

def remove_urls(text):
    """Return `text` with every match of `url_pattern` removed.

    Args:
        text: The string to clean, or None.

    Returns:
        The cleaned string, or None when `text` is None. The "text" column is
        nullable, and passing None to `re.sub` raises TypeError, which would
        fail the whole streaming query when this runs as a UDF.
    """
    if text is None:
        return None
    return url_pattern.sub('', text)

# Wrap remove_urls as a Spark UDF that returns a string.
remove_urls_udf = udf(remove_urls, StringType())

# Strip URLs from the text, trim surrounding whitespace, and keep only the
# columns that are written out (the derived "date" column is dropped here).
df = (
    df
    .withColumn('text', remove_urls_udf('text'))
    .withColumn("text", trim(col("text")))
    .select(
        "id", "text", "author_id",
        "retweet_count", "reply_count", "like_count", "quote_count",
        "created_at", "name", "username",
        "followers_count", "following_count", "tweet_count", "listed_count",
        "verified",
        "year", "month", "day", "hour",
    )
)


# Start the streaming query: append each micro-batch as Parquet files under
# /FileStore/output9, partitioned by year/month/day/hour, with checkpoint
# state kept in /FileStore/chec9.
query_out = (
    df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("checkpointLocation", "/FileStore/chec9")
    .option("path", "/FileStore/output9")
    .partitionBy("year", "month", "day", "hour")
    .start()
)


In [2]:
# Write the streaming data to HDFS in csv format
# query_out=df.writeStream \
#     .outputMode("append") \
#     .format("csv") \
#     .option("checkpointLocation", "/FileStore/chec30 ")\
#     .option("path", "/FileStore/output") \
#     .partitionBy("year", "month", "day", "hour") \
#     .start()

In [3]:
# Load the Parquet output directory as a batch DataFrame.
parquet_df = spark.read.parquet("/FileStore/output9")

# Register the DataFrame as a temporary view so it can be queried with SQL.
parquet_df.createOrReplaceTempView("my_table")

# Select every row and column of the view. A single query is issued: assigning
# a COUNT(*) result to the same variable first would be overwritten before use.
result_df = spark.sql("SELECT * FROM my_table")

# Show the resulting DataFrame.
result_df.show()

+-------------------+--------------------+-------------------+-------------+-----------+----------+-----------+-------------------+--------------------+---------------+---------------+---------------+-----------+------------+--------+----+-----+---+----+
|                 id|                text|          author_id|retweet_count|reply_count|like_count|quote_count|         created_at|                name|       username|followers_count|following_count|tweet_count|listed_count|verified|year|month|day|hour|
+-------------------+--------------------+-------------------+-------------+-----------+----------+-----------+-------------------+--------------------+---------------+---------------+---------------+-----------+------------+--------+----+-----+---+----+
|1654239177595146240|RT : Omdurman Mid...| 711486983655301121|           26|          0|         0|          0|2023-05-04 21:38:48|              Hisham|    hishamjr145|            335|            376|      10988|           2|   false|2

In [4]:
# Print the column names, types and nullability of the streaming DataFrame.
df.printSchema()


root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- reply_count: integer (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- quote_count: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- followers_count: integer (nullable = true)
 |-- following_count: integer (nullable = true)
 |-- tweet_count: integer (nullable = true)
 |-- listed_count: integer (nullable = true)
 |-- verified: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)



In [5]:
# query_out.stop()

In [6]:
import pandas as pd

# Convert the Spark DataFrame returned by the SQL query to a pandas DataFrame.
# NOTE(review): toPandas() presumably collects the full result on the driver -
# confirm the table is small enough, or limit the rows first.
pandas_df = result_df.toPandas()

# Preview the first rows of the pandas DataFrame.
pandas_df.head()


Unnamed: 0,id,text,author_id,retweet_count,reply_count,like_count,quote_count,created_at,name,username,followers_count,following_count,tweet_count,listed_count,verified,year,month,day,hour
0,1654239177595146240,"RT : Omdurman Midwives hospital, the largest m...",711486983655301121,26,0,0,0,2023-05-04 21:38:48,Hisham,hishamjr145,335,376,10988,2,False,2023,5,4,21
1,1654239165159034880,RT : Those in Sudan that were evacuated by an ...,3134827066,162,0,0,0,2023-05-04 21:38:45,Jonathan Abuajah,jabuajah,517,422,105994,0,False,2023,5,4,21
2,1654239163560996865,The US just brought 'democracy' to Sudan. That...,1283731444830023680,0,0,0,0,2023-05-04 21:38:45,Fidel Slim Guevara,Slimchineme,843,1046,16970,0,False,2023,5,4,21
3,1654239162214797312,"RT : After she fled her home, Ms Haile crossed...",738952995136667649,45,0,0,0,2023-05-04 21:38:45,Freedom for Tigray!,HGebreslassie,1623,1731,68639,1,False,2023,5,4,21
4,1654239160624992256,RT : My only fear in this Sudan crisis is the ...,61762023,364,0,0,0,2023-05-04 21:38:44,Oluwaseun,deolujames,1950,1058,404521,51,False,2023,5,4,21
