In [1]:
import os
import socket
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, length, when, col
from pyspark.sql.types import BooleanType, IntegerType, LongType, StringType, ArrayType, FloatType, StructType, StructField
from pyspark.sql.functions import *
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from jinja2 import Environment, FileSystemLoader


# setting constants
APP_NAME = "producer_app"
# '/' and ':' are not safe in file names, so they are replaced by '_' for the
# log/config file names derived from the app name.
NORMALIZED_APP_NAME = APP_NAME.translate(str.maketrans({'/': '_', ':': '_'}))

# Per-app working directories, created under the notebook's current working directory.
APPS_TMP_DIR, APPS_CONF_DIR, APPS_LOGS_DIR = (
    os.path.join(os.getcwd(), sub_dir) for sub_dir in ("tmp", "conf", "logs")
)

# Rendered log4j config for the driver, and the log file that config writes to.
LOG4J_PROP_FILE = os.path.join(
    APPS_CONF_DIR,
    "pyspark-log4j-{}.properties".format(NORMALIZED_APP_NAME),
)
LOG_FILE = os.path.join(
    APPS_LOGS_DIR,
    'pyspark-{}.log'.format(NORMALIZED_APP_NAME),
)

# Extra JVM options for the driver: point log4j at the rendered config file,
# set HDFS replication to 1 and list the allowed HTTPS protocol versions.
EXTRA_JAVA_OPTIONS = (
    "-Dlog4j.configuration=file://{} -Dspark.hadoop.dfs.replication=1"
    " -Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3"
).format(LOG4J_PROP_FILE)

# IPv4 address that this machine's hostname resolves to; passed to Spark
# below as spark.driver.host.
_hostname = socket.gethostname()
LOCAL_IP = socket.gethostbyname(_hostname)

# preparing configuration files from templates
# exist_ok=True replaces the exists()-then-makedirs() check, which is racy and
# redundant; it still raises if the path exists but is not a directory.
for directory in [APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR]:
    os.makedirs(directory, exist_ok=True)

# Location of the log4j properties template (baked into the image under /opt).
LOG4J_TEMPLATE_DIR = '/opt'
LOG4J_TEMPLATE_NAME = "pyspark_log4j.properties.template"

# Render the template with this app's driver log path and write the result to
# LOG4J_PROP_FILE, the file referenced by -Dlog4j.configuration in EXTRA_JAVA_OPTIONS.
env = Environment(loader=FileSystemLoader(LOG4J_TEMPLATE_DIR))
template = env.get_template(LOG4J_TEMPLATE_NAME)
template.stream(logfile=LOG_FILE).dump(LOG4J_PROP_FILE)

# run spark
# Spark-on-Kubernetes session. All settings live in one ordered table and are
# applied to the builder in exactly this order, after appName/master.
_SHARED_JAR = "/home/jovyan/shared-data/my-project-name-jar-with-dependencies.jar"
_SPARK_LOCAL_DIR = "/tmp/spark"
_SPARK_SETTINGS = [
    # driver networking: advertise this notebook's IP, listen on all interfaces
    ("spark.driver.host", LOCAL_IP),
    ("spark.driver.bindAddress", "0.0.0.0"),
    # executor / driver resources
    ("spark.executor.instances", "2"),
    ("spark.executor.cores", "3"),
    ("spark.memory.fraction", "0.8"),
    ("spark.memory.storageFraction", "0.6"),
    ("spark.executor.memory", "3g"),
    ("spark.driver.memory", "3g"),
    ("spark.driver.maxResultSize", "1g"),
    ("spark.kubernetes.memoryOverheadFactor", "0.3"),
    # driver JVM options (log4j config etc., see EXTRA_JAVA_OPTIONS)
    ("spark.driver.extraJavaOptions", EXTRA_JAVA_OPTIONS),
    # Kubernetes placement, pod labels and executor image
    ("spark.kubernetes.namespace", "gkulagin-307618"),
    ("spark.kubernetes.driver.label.appname", APP_NAME),
    ("spark.kubernetes.executor.label.appname", APP_NAME),
    ("spark.kubernetes.container.image", "node03.st:5000/spark-executor:gkulagin-307618"),
    # scratch space
    ("spark.local.dir", _SPARK_LOCAL_DIR),
    # the same dependency jar on the driver and executor classpaths
    ("spark.driver.extraClassPath", _SHARED_JAR),
    ("spark.executor.extraClassPath", _SHARED_JAR),
    # executor volumes: writable emptyDir for scratch space ...
    ("spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.path", _SPARK_LOCAL_DIR),
    ("spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.readOnly", "false"),
    # ... and read-only hostPath /nfs/shared mounted where the jar path above points
    ("spark.kubernetes.executor.volumes.hostPath.depdir.mount.path", "/home/jovyan/shared-data"),
    ("spark.kubernetes.executor.volumes.hostPath.depdir.options.path", "/nfs/shared"),
    ("spark.kubernetes.executor.volumes.hostPath.depdir.options.type", "Directory"),
    ("spark.kubernetes.executor.volumes.hostPath.depdir.mount.readOnly", "true"),
]

_spark_builder = SparkSession.builder.appName(APP_NAME).master("k8s://https://10.32.7.103:6443")
for _setting_key, _setting_value in _SPARK_SETTINGS:
    _spark_builder = _spark_builder.config(_setting_key, _setting_value)
spark = _spark_builder.getOrCreate()

# printing important urls and paths
for _message_template, _message_value in (
    ("Web UI: {}", spark.sparkContext.uiWebUrl),
    ("\nlog4j file: {}", LOG4J_PROP_FILE),
    ("\ndriver log file: {}", LOG_FILE),
):
    print(_message_template.format(_message_value))

Web UI: http://10.128.107.151:4040

log4j file: /home/jovyan/nfs-home/conf/pyspark-log4j-producer_app.properties

driver log file: /home/jovyan/nfs-home/logs/pyspark-producer_app.log


Posts must be sent to Kafka by a separate producer.

- Split the stream of posts into male and female streams

- Split the streams by age group: under 18, 18–27, 27–40, 40–60, and 60+

- Split the streams into words and filter out stop words

- Count words in each stream over 1-hour, 1-day, and 1-week windows (which category writes most actively?)

- Save the results to Kafka and print them to the console

- The processed results must be read by a separate consumer

- Use Python, Java, or Scala, with either DStreams or Structured Streaming


In [2]:
from kafka import KafkaProducer
from tqdm import tqdm
import json
import time
import random

In [3]:
# Load the posts dump, keep the owner id (renamed user_id), the text and the
# post date (renamed timestamp), and drop posts whose text is empty.
posts_df = (
    spark.read.json("/shared/bigdata20/followers_posts_api_final.json")
    .select(
        col("owner_id").alias('user_id'),
        "text",
        col("date").alias("timestamp"),
    )
    .where("text != ''")
)
posts_df.show(5)

+-------+--------------------+----------+
|user_id|                text| timestamp|
+-------+--------------------+----------+
|  87449|Я люблю Вас. Я вч...|1550165023|
|  87449|call me by your n...|1553774858|
|  87449|                  🦋|1555602008|
|  87449|         Браво,Юра !|1558105050|
|  87449|                  🕊|1559301729|
+-------+--------------------+----------+
only showing top 5 rows



In [4]:
# Producer for the raw post stream. Values are passed to send() as str and
# turned into bytes by str.encode (UTF-8 by default).
producer = KafkaProducer(
    value_serializer=str.encode,
    bootstrap_servers="kafka-svc:9092",
)
# Topic the raw posts are published to.
topic_name = "posts"

In [5]:
# Iterator over the posts as Rows, oldest first (ascending timestamp), so the
# producer can replay them in chronological order. DataFrame.toLocalIterator()
# pulls rows to the driver incrementally instead of collecting them all, and
# avoids the needless detour through .rdd.
# NOTE: despite the name this is a single-use iterator, not a DataFrame; re-run
# this cell before re-running the producer loop.
located_df = posts_df.orderBy("timestamp").toLocalIterator()

In [6]:
# Total number of posts to send (posts with non-empty text), used below as
# tqdm's `total` so the progress bar can show a percentage and ETA.
# NOTE(review): despite the name, this is the overall post count, not a per-user count.
user_post_cnt = posts_df.count()

In [None]:
# Delay between consecutive posts, to emulate a live stream.
SEND_INTERVAL_SEC = 0.5

# Publish every post as a JSON string (non-ASCII text kept as-is).
# KafkaProducer.send() is asynchronous and only buffers the record, so flush()
# in `finally` ensures buffered posts are actually delivered, both on normal
# completion and when the long-running loop is interrupted.
try:
    for row in tqdm(located_df, total=user_post_cnt):
        value = json.dumps(row.asDict(), ensure_ascii=False)
        producer.send(topic_name, value)
        time.sleep(SEND_INTERVAL_SEC)
finally:
    producer.flush()

print("Producer finished")

  7%|▋         | 13034/189848 [1:49:36<24:42:26,  1.99it/s]