## ADD spark streams

In [1]:
import os
postgres_pack = 'org.postgresql:postgresql:42.2.12'
kafka_streams_pack = f"org.apache.spark:spark-streaming-kafka-0-8_2.11:{os.environ['APACHE_SPARK_VERSION']}"

# Add extra packages
os.environ['PYSPARK_SUBMIT_ARGS'] = f"--packages {postgres_pack},{kafka_streams_pack} pyspark-shell"

In [2]:
import json
from dotenv import load_dotenv
from pyspark import SparkContext, Row
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

In [3]:
# load enviroments for connection to DB
load_dotenv('../src/database.env')

True

In [4]:
def get_spark_session(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

In [5]:
def spark_df_to_postgres(spark_df):
    
    print(f"=========-----> working with DB <-----=========")
    
    try:
        # From database.env.
        db_host = os.environ['POSTGRES_HOST']
        db_name = os.environ['POSTGRES_DB']
        db_user = os.environ['POSTGRES_USER']
        db_pass = os.environ['POSTGRES_PASSWORD']
        db_table = 'clickstream_filtered'
        
    except Exception as e:
        print("--> It seems Database not initialized from .env", e)
    
    
    try:
        # '?stringtype=unspecified' For personalized enum_type in postgres
        spark_df.write \
            .format("jdbc") \
            .mode("append") \
            .option("driver", 'org.postgresql.Driver') \
            .option("url", f"jdbc:postgresql://{db_host}:5432/{db_name}?stringtype=unspecified") \
            .option("dbtable", db_table) \
            .option("user", db_user) \
            .option("password", db_pass) \
            .save()
        print(f"=========-----> Insert complete <-----=========")
    except Exception as e:
        print("--> It seems an Error with connection to DB", e)

In [6]:
def data_processing(time, rdd):
    
    print(f"===========-----> {str(time)} <-----===========")

    result_df = None
    filtered_list = ['Click', 'Like', 'Complete']
    
    try:
        spark = get_spark_session(rdd.context.getConf())
        
        row_rdd = rdd \
                .map(lambda r_json: Row(epk_id=r_json['epk_id'],
                                        content_id=r_json['content_id'],
                                        event_type=r_json['event_type'],
                                        event_ts=r_json['event_ts'],
                                        insert_ts=r_json['insert_ts'])) \
                .filter(lambda row: row['event_type'] in filtered_list)
                                       
        result_df = spark.createDataFrame(row_rdd)
        result_df.createOrReplaceTempView("treasury_stream")
        result_df.show(n=3)
        
        spark_df_to_postgres(result_df)

    except Exception as e:
        print("--> Opps! Is seems an Error!!!", e)
        
    return result_df

In [7]:
def create_context(kafka_server, kafka_topic):

    sc = SparkContext(appName="PythonStreamingKafka")
    sc.setLogLevel("ERROR")
    n_seconds = 5
    ssc = StreamingContext(sc, n_seconds) #job every n_seconds

    try:
        directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                        [kafka_topic],
                                        {"metadata.broker.list": kafka_server})
    except:
        raise ConnectionError(f"Kafka error: Connection refused: \
                            broker_list={kafka_server} topic={kafka_topic}")
        
    parsed_lines = directKafkaStream.map(lambda v: json.loads(v[1]))

    # RDD handling
    filtered_df = parsed_lines.foreachRDD(data_processing)

    spark_df_to_postgres(filtered_df)

    return ssc

In [8]:
server = 'kafka:9093'
topic = 'clickstream'
output_path = '/tmp/spark/checkpoint_01'

ssc = StreamingContext.getOrCreate(output_path, lambda: create_context(server, topic))
ssc.start()
# ssc.awaitTermination()

--> It seems an Error with connection to DB 'NoneType' object has no attribute 'write'
+----------+------+----------+----------+-------------------+
|content_id|epk_id|  event_ts|event_type|          insert_ts|
+----------+------+----------+----------+-------------------+
|      2242|  5578|1586254465|     Click|1.586336746999377E9|
|       465|  5748|1586007754|  Complete|1.586336747020188E9|
|      2119|  8943|1585388642|     Click| 1.58633674700369E9|
+----------+------+----------+----------+-------------------+
only showing top 3 rows

+----------+------+----------+----------+-------------------+
|content_id|epk_id|  event_ts|event_type|          insert_ts|
+----------+------+----------+----------+-------------------+
|      2809|  8448|1585883364|  Complete|1.586336750134334E9|
|      3999|  1579|1585308182|  Complete|1.586336753231109E9|
|      3184|  2402|1585522778|  Complete|1.586336753170254E9|
+----------+------+----------+----------+-------------------+
only showing top 3 r

## Clean up

In [9]:
ssc.stop()

In [10]:
!rm -rdf /tmp/spark/checkpoint_01

In [11]:
globals()['sparkSessionSingletonInstance'].stop()
del(globals()['sparkSessionSingletonInstance'])

## Test

In [22]:
spark = SparkSession\
            .builder\
            .appName('test')\
            .getOrCreate()

In [23]:
df = spark.createDataFrame([1, 2, 3], "int").toDF("test")

In [24]:
df.count()

3