In [16]:
import os
import socket
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from jinja2 import Environment, FileSystemLoader
from kafka import KafkaProducer
import time
from json import dumps

In [17]:
# --- Application identity -------------------------------------------------
APP_NAME = "spark"
# Replace '/' and ':' so the app name can be embedded in file names below.
NORMALIZED_APP_NAME = APP_NAME.replace('/', '_').replace(':', '_')

# --- Working directories (all relative to the notebook's cwd) --------------
_CWD = os.getcwd()
APPS_TMP_DIR = os.path.join(_CWD, "tmp")
APPS_CONF_DIR = os.path.join(_CWD, "conf")
APPS_LOGS_DIR = os.path.join(_CWD, "logs")

# --- Logging ----------------------------------------------------------------
# Properties file rendered from a template in the next cell; LOG_FILE is the
# value handed to that template.
LOG4J_PROP_FILE = os.path.join(APPS_CONF_DIR, f"pyspark-log4j-{NORMALIZED_APP_NAME}.properties")
LOG_FILE = os.path.join(APPS_LOGS_DIR, f'pyspark-{NORMALIZED_APP_NAME}.log')

# --- JVM flags --------------------------------------------------------------
# One space-separated string of -D options, pointing log4j at the file above.
EXTRA_JAVA_OPTIONS = " ".join([
    f"-Dlog4j.configuration=file://{LOG4J_PROP_FILE}",
    "-Dspark.hadoop.dfs.replication=1",
    "-Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3",
])

# IP address this host's name resolves to.
LOCAL_IP = socket.gethostbyname(socket.gethostname())

In [18]:
# Directory holding the log4j template and the template's file name.
LOG4J_TEMPLATE_DIR = '/opt'
LOG4J_TEMPLATE_NAME = "pyspark_log4j.properties.template"

# Create the working directories. exist_ok=True makes the cell safe to re-run
# and avoids the race of a separate exists()-then-makedirs() check.
for directory in (APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR):
    os.makedirs(directory, exist_ok=True)

# Render the template with LOG_FILE as its `logfile` variable and write the
# result to LOG4J_PROP_FILE (the file referenced by EXTRA_JAVA_OPTIONS).
env = Environment(loader=FileSystemLoader(LOG4J_TEMPLATE_DIR))
template = env.get_template(LOG4J_TEMPLATE_NAME)
template.stream(logfile=LOG_FILE).dump(LOG4J_PROP_FILE)

In [19]:
# Pass the prepared JVM flags (including -Dlog4j.configuration pointing at the
# rendered LOG4J_PROP_FILE) to the driver. Previously EXTRA_JAVA_OPTIONS was
# built but never used, so the log4j setup above had no effect.
# NOTE: driver JVM options only apply when this call launches the JVM; if a
# session already exists in the kernel, getOrCreate() returns it unchanged.
spark = (
    SparkSession.builder
    .appName(APP_NAME)
    .config("spark.driver.extraJavaOptions", EXTRA_JAVA_OPTIONS)
    .getOrCreate()
)

In [20]:
# Shared dataset location; both inputs are read as JSON from the local FS.
SHARED_DATA_ROOT = "file:///home/jovyan/shared-data/bigdata20"

# Follower profiles (provides id, sex, bdate used below).
followers_info_df = spark.read.json(f"{SHARED_DATA_ROOT}/followers_info.json")
# Follower posts, stored as a directory of JSON part files (hence the glob).
followers_posts_api_final_df = spark.read.json(f"{SHARED_DATA_ROOT}/followers_posts_api_final.json/*")

In [21]:
# Birth-date filter for VK `bdate`: a full date D.M.YYYY. Day and month may be
# unpadded (e.g. "31.8.1987"); '.', '/' or '-' separators and 2-digit years are
# also accepted. Values without a year (e.g. "31.8") do not match.
# Character classes are [12] / [01]; the previous [1|2] / [0|1] also matched a
# literal '|' (e.g. "|5.8.1987").
regexp = r"^(0?[1-9]|[12][0-9]|3[01])[./-](0?[1-9]|1[0-2])[./-]([0-9]{4}|[0-9]{2})$"

In [22]:
# Attach each post to its author's profile, keep authors with a full birth
# date and posts with non-empty text, and expose a compact set of columns.
posts = (
    followers_info_df
    .join(
        followers_posts_api_final_df,
        on=followers_info_df.id == followers_posts_api_final_df.owner_id,
        how='inner',
    )
    .where(F.col('bdate').rlike(regexp))
    # Epoch seconds -> 'yyyy-MM-dd HH:mm:ss' string.
    .withColumn('date', F.from_unixtime(F.col('date')))
    .filter(F.col('text') != "")
    .select(
        F.col('owner_id'),
        F.col('sex').alias('owner_sex'),
        F.col('bdate').alias('owner_bdate'),
        F.col('date'),
        F.col('text'),
    )
)

In [23]:
# Preview the first rows (long strings are truncated in the printout).
# NOTE(review): this output contains user ids, birth dates and post text;
# consider clearing it before sharing the notebook.
posts.show(n=5, truncate=True)

+--------+---------+-----------+-------------------+--------------------+
|owner_id|owner_sex|owner_bdate|               date|                text|
+--------+---------+-----------+-------------------+--------------------+
|   94494|        2|  31.8.1987|2019-06-04 11:28:50|Тот неловкий моме...|
|   94494|        2|  31.8.1987|2019-06-06 20:48:46|Белые ночи + жарк...|
|   94494|        2|  31.8.1987|2019-06-16 10:22:14|Не велосипедом ед...|
|   94494|        2|  31.8.1987|2019-06-19 23:36:14|Тот самый француз...|
|   94494|        2|  31.8.1987|2019-06-21 22:11:22|Да, знатное шоу с...|
+--------+---------+-----------+-------------------+--------------------+
only showing top 5 rows



In [24]:
def _to_json_bytes(data):
    """Serialize a message value as JSON and encode it as UTF-8 bytes."""
    return dumps(data).encode('utf-8')


# Producer for the in-cluster Kafka service; every value passed to send() is
# JSON-encoded by _to_json_bytes.
producer = KafkaProducer(
    bootstrap_servers='kafka-svc:9092',
    value_serializer=_to_json_bytes,
)

In [None]:
KAFKA_TOPIC = 'posts'
SEND_INTERVAL_SEC = 1  # pause between messages to emulate a live feed


def post_to_frame(post):
    """Build the Kafka message payload for one row of ``posts``.

    Args:
        post: a Row with owner_sex, owner_bdate, text and owner_id fields.

    Returns:
        dict with keys 'sex', 'bdate', 'text', 'id' (in that order).
    """
    return {
        'sex': post.owner_sex,
        'bdate': post.owner_bdate,
        'text': post.text,
        'id': post.owner_id,
    }


def _report_send_error(exc):
    """Print asynchronous delivery failures instead of silently dropping them."""
    print(f"Failed to deliver message to topic '{KAFKA_TOPIC}': {exc!r}")


try:
    # DataFrame.collect() yields the same Rows as .rdd.collect() without the
    # extra RDD conversion.
    for post in posts.collect():
        # send() is asynchronous; the errback surfaces failures.
        producer.send(KAFKA_TOPIC, post_to_frame(post)).add_errback(_report_send_error)
        time.sleep(SEND_INTERVAL_SEC)
finally:
    # Deliver anything still buffered, even if the loop is interrupted
    # (e.g. via "interrupt kernel").
    producer.flush()