In [11]:
import os
# Remove HADOOP_CONF_DIR from this process's environment; do nothing if it is not set.
try:
    del os.environ["HADOOP_CONF_DIR"]
except KeyError:
    pass

In [12]:
"HADOOP_CONF_DIR" in os.environ

False

In [13]:
import socket
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, length, when, col
from pyspark.sql.types import BooleanType, IntegerType, LongType, StringType, ArrayType, FloatType, StructType, StructField
from pyspark.sql.functions import *
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from jinja2 import Environment, FileSystemLoader


# setting constants

# application name and a variant of it with '/' and ':' replaced by '_'
# (the variant is embedded in the file names built below)
APP_NAME = "producer"
NORMALIZED_APP_NAME = APP_NAME.replace('/', '_').replace(':', '_')

# working directories, all located directly under the current working directory
APPS_TMP_DIR, APPS_CONF_DIR, APPS_LOGS_DIR = (
    os.path.join(os.getcwd(), subdir) for subdir in ("tmp", "conf", "logs")
)

# per-application log4j properties file and log file
LOG4J_PROP_FILE = os.path.join(
    APPS_CONF_DIR,
    "pyspark-log4j-{}.properties".format(NORMALIZED_APP_NAME),
)
LOG_FILE = os.path.join(
    APPS_LOGS_DIR,
    'pyspark-{}.log'.format(NORMALIZED_APP_NAME),
)

# JVM options string; the log4j configuration URL points at LOG4J_PROP_FILE
EXTRA_JAVA_OPTIONS = (
    "-Dlog4j.configuration=file://{} -Dspark.hadoop.dfs.replication=1 -Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3"
    .format(LOG4J_PROP_FILE)
)

# Address obtained by resolving this machine's own hostname; passed to Spark below as spark.driver.host.
LOCAL_IP = socket.gethostbyname(socket.gethostname())

# preparing configuration files from templates
# Create the conf/logs/tmp directories. exist_ok=True makes this a single
# idempotent call per directory, so there is no gap between an existence check
# and the creation in which another process could create the directory first.
for directory in [APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR]:
    os.makedirs(directory, exist_ok=True)

# Load the log4j properties template from /opt and write it to LOG4J_PROP_FILE,
# passing LOG_FILE to the template as the `logfile` variable.
env = Environment(loader=FileSystemLoader('/opt'))
template = env.get_template("pyspark_log4j.properties.template")
template.stream(logfile=LOG_FILE).dump(LOG4J_PROP_FILE)

# run spark
# Build (or reuse) the session against the Kubernetes master; options are grouped by purpose.
spark = (
    SparkSession.builder
    .appName(APP_NAME)
    .master("k8s://https://10.32.7.103:6443")
    # driver networking
    .config("spark.driver.host", LOCAL_IP)
    .config("spark.driver.bindAddress", "0.0.0.0")
    # executor count / cores and memory settings
    .config("spark.executor.instances", "2")
    .config("spark.executor.cores", '3')
    .config("spark.memory.fraction", "0.8")
    .config("spark.memory.storageFraction", "0.6")
    .config("spark.executor.memory", '3g')
    .config("spark.driver.memory", "3g")
    .config("spark.driver.maxResultSize", "1g")
    .config("spark.kubernetes.memoryOverheadFactor", "0.3")
    # driver JVM options (log4j configuration etc.)
    .config("spark.driver.extraJavaOptions", EXTRA_JAVA_OPTIONS)
    # Kubernetes namespace, pod labels and executor image
    .config("spark.kubernetes.namespace", "sabizer-297910")
    .config("spark.kubernetes.driver.label.appname", APP_NAME)
    .config("spark.kubernetes.executor.label.appname", APP_NAME)
    .config("spark.kubernetes.container.image", "node03.st:5000/spark-executor:sabizer-297910")
    # local scratch directory and extra classpath for driver and executors
    .config("spark.local.dir", "/tmp/spark")
    .config("spark.driver.extraClassPath", "/home/jovyan/shared-data/my-project-name-jar-with-dependencies.jar")
    .config("spark.executor.extraClassPath", "/home/jovyan/shared-data/my-project-name-jar-with-dependencies.jar")
    # executor volumes: an emptyDir mounted at /tmp/spark, and the hostPath
    # /nfs/shared mounted read-only at /home/jovyan/shared-data
    .config("spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.path", "/tmp/spark")
    .config("spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.readOnly", "false")
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.mount.path", "/home/jovyan/shared-data")
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.options.path", "/nfs/shared")
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.options.type", "Directory")
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.mount.readOnly", "true")
    .getOrCreate()
)

# printing important urls and paths
# One print call; sep="\n" plus the leading "\n" in the last two messages
# yields the same blank-line-separated output as three separate prints.
print(
    "Web UI: {}".format(spark.sparkContext.uiWebUrl),
    "\nlog4j file: {}".format(LOG4J_PROP_FILE),
    "\ndriver log file: {}".format(LOG_FILE),
    sep="\n",
)

Web UI: http://10.128.37.127:4040

log4j file: /home/jovyan/nfs-home/conf/pyspark-log4j-producer.properties

driver log file: /home/jovyan/nfs-home/logs/pyspark-producer.log


In [14]:
import json
import time
import random
from kafka import KafkaProducer
from tqdm import tqdm

In [15]:
# Posts: read the JSON dump, keep owner_id (as user_id), text and date (as timestamp),
# and keep only rows satisfying the SQL predicate text != ''.
posts_df = (
    spark.read.json("/home/jovyan/shared-data/bigdata20/followers_posts_api_final.json")
    .select(
        col("owner_id").alias("user_id"),
        "text",
        col("date").alias("timestamp"),
    )
    .where("text != ''")
)
posts_df.show(5)

+-------+--------------------+----------+
|user_id|                text| timestamp|
+-------+--------------------+----------+
|  57114|Я всё время себя ...|1552061847|
|  57114|Я наверно не тот ...|1552066470|
|  57114|Местами дождь, ме...|1552223531|
|  57114|Ну отчего все так...|1552250945|
|  57114|Друзья, давайте в...|1552309202|
+-------+--------------------+----------+
only showing top 5 rows



In [16]:
# bdate is matched against three dot-separated numeric groups, the last one four digits long
_bdate_pattern = r"(\d{1,2})\.(\d{1,2})\.(\d{4})"

# sex code 1 -> "F", 2 -> "M", anything else -> null
_sex_col = (
    when(col("sex") == lit(1), lit("F"))
    .when(col("sex") == lit(2), lit("M"))
    .otherwise(lit(None))
)

# Rebuild the birth date as "<group 3>-<group 2>-<group 1>" (each group cast to int)
# and cast the joined string to a date.
_birth_date_col = concat_ws(
    "-",
    *[
        regexp_extract(col("bdate"), _bdate_pattern, group_idx).cast("int")
        for group_idx in (3, 2, 1)
    ]
).cast("date")

# Profiles: user id, sex letter, and age = months between today and the birth date, divided by 12, cast to int.
profile_df = spark.read.json("/home/jovyan/shared-data/bigdata20/followers_info.json").select(
    col("id").alias("user_id"),
    _sex_col.alias("sex"),
    (months_between(current_date(), _birth_date_col) / lit(12)).cast("int").alias("age"),
)
profile_df.show(5)

+-------+---+----+
|user_id|sex| age|
+-------+---+----+
|     34|  F|null|
|    102|  F|null|
|    175|  M|null|
|    243|  F|  34|
|    533|  M|  32|
+-------+---+----+
only showing top 5 rows



In [17]:
# Join every post with its author's profile on user_id (profile_df is wrapped in broadcast()),
# then fix the column order of the result.
user_post_df = (
    posts_df
    .join(broadcast(profile_df), ["user_id"])
    .select("user_id", "sex", "age", "text", "timestamp")
)
user_post_df.printSchema()
user_post_df.show(5)

root
 |-- user_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)

+-------+---+----+--------------------+----------+
|user_id|sex| age|                text| timestamp|
+-------+---+----+--------------------+----------+
|  39499|  M|  14|        #ЧайнаяЛента|1554622080|
|  39499|  M|  14|Побывали в [club2...|1554742564|
|  68686|  M|null|Полезный курс для...|1550587946|
|  60086|  M|  34|С днем рождения! ...|1556793944|
|  71307|  M|null|Женька, мой герой...|1552734777|
+-------+---+----+--------------------+----------+
only showing top 5 rows



In [18]:
# Number of rows in the joined DataFrame, computed once; the producer loop passes it to tqdm as `total`.
user_post_cnt = user_post_df.count()

In [22]:
# Kafka producer; str.encode turns each JSON string into bytes before sending.
producer = KafkaProducer(bootstrap_servers="kafka-svc:9092", value_serializer=str.encode)
topic_name = "vk_post_topic"

try:
    # Stream the rows to the driver in timestamp order and publish them one by one.
    for row in tqdm(user_post_df.orderBy("timestamp").rdd.toLocalIterator(), total=user_post_cnt):
        # Serialize once and reuse the result (ensure_ascii=False keeps non-ASCII text as-is).
        value = json.dumps(row.asDict(), ensure_ascii=False)
        producer.send(topic_name, value)
        # pause between messages to throttle the stream (roughly 10 messages per second)
        time.sleep(0.1)
finally:
    # send() only enqueues the record; flush() blocks until everything buffered
    # has been handed to the broker, including when the loop is interrupted.
    # The producer is intentionally left open so it can still be used afterwards.
    producer.flush()

print("=" * 80, "\nProducer finished")

 69%|██████▉   | 112307/163196 [3:12:27<1:26:13,  9.84it/s]IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

100%|██████████| 163196/163196 [4:39:54<00:00,  9.72it/s]  

Producer finished



