In [None]:
import sys
import time
import datetime

In [None]:
# Kafka topic the streaming query subscribes to.
TOPIC_Step2_NAME = "Sahamyab-Session_16_2"

# Bootstrap address (host:port) of the Kafka broker.
KAFKA_SERVER = "kafka-broker:29092"

In [None]:
import os

# Kafka source connector for Structured Streaming; see
# https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
# The coordinate is group:artifact_scalaVersion:sparkVersion.
KAFKA_CONNECTOR_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1"

# Set in this cell, ahead of the cell that builds the SparkSession, so the
# connector is requested via --packages when PySpark launches.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join(
    ["--packages", KAFKA_CONNECTOR_PACKAGE, "pyspark-shell"]
)

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Resource limits and session options, applied to the builder in this order.
spark_settings = {
    "spark.executor.memory": "1024mb",
    "spark.executor.cores": "1",
    "spark.cores.max": "1",
    # Timestamps are parsed and rendered in this zone unless stated otherwise.
    "spark.sql.session.timeZone": "Asia/Tehran",
}

session_builder = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("Count-Hashtags")
)
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)

# Reuses an already-running session if one exists, otherwise creates it.
spark = session_builder.getOrCreate()

In [None]:
# Schema of the JSON tweet payload carried in the Kafka message value.
# Every field is nullable; all are plain strings except `hashtags`,
# which is an array of strings.
string_field_names = [
    "id",
    "content",
    "sendTime",
    "sendTimePersian",
    "senderName",
    "senderUsername",
    "type",
]

schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in string_field_names]
    + [StructField("hashtags", ArrayType(StringType()), True)]
)

In [None]:
# Streaming reader for the Kafka source; nothing is consumed until a query starts.
kafka_reader = spark.readStream.format("kafka")

kafka_reader = (
    kafka_reader
    .option("kafka.bootstrap.servers", KAFKA_SERVER)
    .option("subscribe", TOPIC_Step2_NAME)
    # Read the topic from the beginning when the query starts without a checkpoint.
    .option("startingOffsets", "earliest")
    # NOTE(review): Spark's Kafka integration guide advises caution with an explicit
    # kafka.group.id (concurrent queries sharing it can interfere) - confirm it is needed.
    .option("kafka.group.id", "Count-Hashtags-and-write to-Stream")
)

# Unbounded DataFrame of raw Kafka records.
df = kafka_reader.load()

In [None]:
# Cast the Kafka key and value to strings so the JSON payload can be parsed;
# the resulting columns keep the names `key` and `value`.
tweetsStringDF = df.select(
    col("key").cast("string"),
    col("value").cast("string"),
)

In [None]:
# Parse the JSON payload with `schema`, flatten it to top-level columns, then derive
# timestamp columns and the Persian calendar parts of `sendTimePersian`.
tweetsDF = (
    tweetsStringDF
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
    # NOTE(review): the trailing 'Z' is quoted in the pattern, so it is matched as a
    # literal character rather than parsed as a UTC marker; the wall-clock value is
    # therefore interpreted in the session time zone - confirm this is intended.
    .withColumn(
        "timestamp",
        unix_timestamp("sendTime", "yyyy-MM-dd'T'HH:mm:ss'Z'").cast("timestamp"),
    )
    .withColumn(
        "persian_timestamp",
        from_utc_timestamp("timestamp", "Asia/Tehran").cast("timestamp"),
    )
    # Column.substr is 1-based: (start, length). The original used start=0 for the
    # year, which Spark treats the same as 1; 1 is used here so all three calls
    # follow the same convention.
    # Presumably sendTimePersian is formatted like "YYYY/MM/DD ..." - verify against producer.
    .withColumn("persianYear", col("sendTimePersian").substr(1, 4))
    .withColumn("persianMonth", col("sendTimePersian").substr(6, 2))
    .withColumn("persianDay", col("sendTimePersian").substr(9, 2))
)

In [None]:
# One output row per element of the `hashtags` array.
hashtag_column = explode(col("hashtags")).alias("hashtag")

# Running count per hashtag, most frequent first.
hashtagCounts = (
    tweetsDF
    .select(hashtag_column)
    .groupBy("hashtag")
    .count()
    .orderBy(col("count").desc())
)

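This code (above cell) transforms the `tweetsDF` DataFrame to compute the counts of hashtags. It first explodes the `hashtags` array so that each hashtag gets its own row, then groups by hashtag and counts. The resulting DataFrame `hashtagCounts` contains two columns: "hashtag" and "count". The "hashtag" column contains the unique hashtags, and the "count" column contains the number of occurrences of each hashtag. Rows are sorted by "count" in descending order, so the most frequent hashtags come first.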

In [None]:
# Start the streaming query and keep its handle. Chaining .awaitTermination() onto
# .start() would bind `query` to None (awaitTermination's return value), leaving no
# way to inspect or stop the running stream.
query = (
    hashtagCounts.writeStream
    .outputMode("complete")       # re-emit the full result table on every trigger
    .format("console")
    .option("truncate", "false")  # print full cell contents
    .option("numRows", "20")      # show at most 20 rows per batch
    .start()
)

# Block this cell until the query ends. If the cell is interrupted or the query
# fails, stop the query so it does not keep running after the cell exits.
try:
    query.awaitTermination()
finally:
    query.stop()

This code (above cell) writes the contents of the `hashtagCounts` DataFrame to the console as a stream. 

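`.outputMode("complete")`: This sets the output mode for the streaming query. In this case, it is set to "complete", which means that all rows in the result table will be written to the console every time there is an update. Complete mode is also required here because `hashtagCounts` is sorted: Structured Streaming only supports sorting a streaming DataFrame after an aggregation and in complete output mode.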

### Submit Sample Spark App in Pyspark Container Bash 

- Open a shell in the PySpark container:
```bash
docker exec -it pyspark bash
```
- Change to the apps directory: `cd /opt/spark-apps/`

```bash
# Clear the driver-Python override for the submission.
# NOTE(review): presumably it is set for the notebook environment in this container - confirm.
unset PYSPARK_DRIVER_PYTHON
# A standalone master URL needs the spark:// scheme (same address the notebook uses).
spark-submit --master spark://spark-master:7077 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 NAME_OF_YOUR_FILE.py
# Restore the driver-Python setting afterwards.
export PYSPARK_DRIVER_PYTHON=python

```

