In [1]:
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka import KafkaProducer
import json
import time
import  pandas as pd
from pyspark.sql import SparkSession

In [2]:
import warnings
warnings.filterwarnings("ignore")

In [3]:
import os
# Switch the kernel's working directory to the sibling "src" directory.
# Relative paths used after this point resolve against that directory.
os.chdir("../src/")
# IPython line magic reporting the current working directory.
# NOTE(review): this is not the last expression of the cell, so its value is
# presumably not displayed — confirm whether it is still needed.
%pwd
# Namespace identifier. NOTE(review): the same value is assigned again in the
# Spark configuration cell.
name_space = "eabraham-373705"

In [4]:
import socket

# IP address this host's own hostname resolves to; passed to Spark below as
# spark.driver.host.
LOCAL_IP = socket.gethostbyname(socket.gethostname())
name_space = "eabraham-373705"

# Kubernetes API endpoint used as the Spark master URL.
kubernetes_master_url = "k8s://https://10.32.7.103:6443"

# Resource settings.
# NOTE(review): driver_cores, driver_memory and executor_memory_overhead are
# defined here but are not passed to the session builder in this cell.
driver_cores = "1"
executor_cores = "1"
driver_memory = "3g"
executor_memory = "3g"
executor_memory_overhead = "0.5g"

# Limits.
# NOTE(review): cpu_limit and memory_limit are not passed to the builder in
# this cell; the original comment on cpu_limit said "12 cores" although the
# value is "3" — confirm the intended value.
cpu_limit = "3"
memory_limit = "32g"  # up to 32 GB
# Used below as the value of spark.kubernetes.executor.limit.cores.
executor_limit = "8"

APP_NAME = 'scalables_executor'

# Every (key, value) pair applied to the session builder, in application order.
_spark_settings = [
    # Driver networking
    ("spark.driver.host", LOCAL_IP),
    ("spark.driver.bindAddress", "0.0.0.0"),
    # Executor sizing and memory management
    ("spark.executor.instances", "2"),
    ("spark.executor.cores", executor_cores),
    ("spark.executor.memory", executor_memory),
    ("spark.memory.fraction", "0.8"),
    ("spark.memory.storageFraction", "0.2"),
    # Kubernetes scheduling, labels and container image
    ("spark.kubernetes.executor.limit.cores", executor_limit),
    ("spark.kubernetes.namespace", name_space),
    ("spark.kubernetes.authenticate.driver.serviceAccountName", "spark"),
    ("spark.kubernetes.driver.label.appname", APP_NAME),
    ("spark.kubernetes.executor.label.appname", APP_NAME),
    ("spark.kubernetes.executor.deleteOnTermination", "false"),
    ("spark.kubernetes.container.image.pullPolicy", "Always"),
    ("spark.kubernetes.container.image", "node03.st:5000/pyspark-hdfs-jupyter:eabraham-373705-v4-executor"),
    # Scratch space: emptyDir volume mounted at /tmp/spark on driver and executors
    ("spark.local.dir", "/tmp/spark"),
    ("spark.kubernetes.driver.volumes.emptyDir.spark-local-dir-tmp-spark.mount.path", "/tmp/spark"),
    ("spark.kubernetes.driver.volumes.emptyDir.spark-local-dir-tmp-spark.mount.readOnly", "false"),
    ("spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.path", "/tmp/spark"),
    ("spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.readOnly", "false"),
]

_builder = SparkSession.builder.appName(APP_NAME).master(kubernetes_master_url)
for _key, _value in _spark_settings:
    _builder = _builder.config(_key, _value)

# Create the session, or reuse one that already exists in this kernel.
spark = _builder.getOrCreate()

23/12/29 11:59:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/29 11:59:05 WARN spark.SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [5]:
def json_serializer(data):
    """Serialize ``data`` to a JSON string and return it as UTF-8 bytes.

    Passed to ``KafkaProducer`` as its ``value_serializer``.
    """
    payload = json.dumps(data)
    return payload.encode("utf-8")


In [6]:
# Load the sample CSV, drop the 'Unnamed: 0' column, replace missing values
# with the literal string 'None', and cast every column to str.
data = (
    pd.read_csv('../src/test_data/sample_data.csv')
    .drop(columns=['Unnamed: 0'])
    .fillna('None')
    .astype('str')
)

# Two-column subset (vin, price) of the cleaned frame.
prices = data[['vin', 'price']]

In [7]:
# Publish a small sample of the cleaned data to Kafka: full rows go to the
# "Objects" topic and the (vin, price) subset goes to the "Price" topic.
producer = KafkaProducer(bootstrap_servers="kafka-service:9092",
                         value_serializer=json_serializer)

# The original loops stopped once their counter exceeded 10, i.e. after the
# 11th message; head() expresses the same cap and copes with shorter frames.
MAX_MESSAGES_PER_TOPIC = 11

i = 0
for row in data.head(MAX_MESSAGES_PER_TOPIC).to_dict('records'):
    producer.send("Objects", row)
    i += 1

# Fix: the original tested `i` (the counter of the first loop) here instead of
# `j`, so the "Price" loop either stopped after one message or never stopped.
j = 0
for row in prices.head(MAX_MESSAGES_PER_TOPIC).to_dict('records'):
    producer.send("Price", row)
    j += 1

# send() only enqueues the record; block until everything queued has been
# delivered so that later cells reading the topics see these messages.
producer.flush()

In [11]:
# Read every message currently stored in partition 0 of the "Objects" topic
# and turn them into the initial Spark DataFrame `df`.
consumer = KafkaConsumer(
    bootstrap_servers="kafka-service:9092",
    auto_offset_reset='earliest',
)
tp = TopicPartition('Objects', 0)
# Manually assign the partition (no consumer group / subscription).
consumer.assign([tp])

msg_list = []
try:
    # Offset one past the newest message at this moment.
    consumer.seek_to_end(tp)
    lastOffset = consumer.position(tp)
    # Rewind and remember where the partition currently starts.
    consumer.seek_to_beginning(tp)
    firstOffset = consumer.position(tp)

    # Only iterate when there is something to read: iterating an empty
    # partition would block forever waiting for a message.
    if lastOffset > firstOffset:
        for msg in consumer:
            msg_list.append(json.loads(msg.value))
            # `>=` rather than `==` so the loop still stops if the exact
            # offset lastOffset - 1 is never delivered.
            if msg.offset >= lastOffset - 1:
                break
finally:
    # Release the broker connections held by this consumer.
    consumer.close()

if not msg_list:
    raise RuntimeError("No messages found in partition 0 of topic 'Objects'")

df = spark.createDataFrame(msg_list)
        

In [12]:
   
# Append the messages of each "Objects" partition to `df` (created by the
# previous cell), de-duplicate, and store the result as Parquet on HDFS.
# NOTE(review): the partition count is hard-coded to 5 — confirm it matches
# the topic's configuration.
for i in range(5):
    consumer = KafkaConsumer(
        bootstrap_servers="kafka-service:9092",
        auto_offset_reset='earliest')
    try:
        tp = TopicPartition('Objects', i)
        consumer.assign([tp])

        # Offset one past the newest message at this moment.
        consumer.seek_to_end(tp)
        lastOffset = consumer.position(tp)
        consumer.seek_to_beginning(tp)

        # Skip empty partitions: iterating one would block forever.
        if lastOffset <= consumer.position(tp):
            continue

        msg_list = []
        for msg in consumer:
            msg_list.append(json.loads(msg.value))
            # `>=` so the loop still ends if offset lastOffset - 1 is never delivered.
            if msg.offset >= lastOffset - 1:
                break

        otp = spark.createDataFrame(msg_list)
        df = df.union(otp)
    finally:
        # The original leaked one open consumer per iteration.
        consumer.close()

# One de-duplication after all unions yields the same rows as de-duplicating
# after every union. It also removes the partition-0 rows that are read a
# second time by the loop above.
df = df.distinct()
df.write.mode('overwrite').format('parquet').save('hdfs:///home/eabraham-373705/data/raw/NEW_raw_data.parquet')


                                                                                

In [10]:
# Load the stored predictions and publish each row to the "Predictions" topic.
# Relies on `producer` created in the earlier Kafka producer cell.
# The original passed header=True / inferSchema=True; those are CSV reader
# options and do not apply to a Parquet read, so they are dropped.
pred = spark.read.parquet(
    '/home/eabraham-373705/data/predictions/NEW_second_level_predictions.parquet'
).toPandas()

for row in pred.to_dict('records'):
    producer.send("Predictions", row)

# send() is asynchronous; wait until all queued records have been delivered.
producer.flush()