## Setup

### Spark 3.0

In [4]:
%%capture
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install -q pyspark

In [1]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

import findspark
findspark.init()

### Postgres

In [6]:
%%capture
# Install postgresql server
!sudo apt-get -y -qq update
!sudo apt-get -y -qq install postgresql
!sudo service postgresql start

# Setup a password `postgres` for username `postgres`
!sudo -u postgres psql -U postgres -c "ALTER USER postgres PASSWORD 'postgres';"

In [7]:
! lsof -i -P -n | grep -E 'postgres'

postgres  3669 postgres    7u  IPv4  46139      0t0  TCP 127.0.0.1:5432 (LISTEN)
postgres  3669 postgres   11u  IPv4  46150      0t0  UDP 127.0.0.1:52935->127.0.0.1:52935 
postgres  3671 postgres   11u  IPv4  46150      0t0  UDP 127.0.0.1:52935->127.0.0.1:52935 
postgres  3672 postgres   11u  IPv4  46150      0t0  UDP 127.0.0.1:52935->127.0.0.1:52935 
postgres  3673 postgres   11u  IPv4  46150      0t0  UDP 127.0.0.1:52935->127.0.0.1:52935 
postgres  3674 postgres   11u  IPv4  46150      0t0  UDP 127.0.0.1:52935->127.0.0.1:52935 
postgres  3675 postgres   11u  IPv4  46150      0t0  UDP 127.0.0.1:52935->127.0.0.1:52935 
postgres  3676 postgres   11u  IPv4  46150      0t0  UDP 127.0.0.1:52935->127.0.0.1:52935 


In [27]:
!psql --version

psql (PostgreSQL) 10.21 (Ubuntu 10.21-0ubuntu0.18.04.1)


### Kafka

In [8]:
%%capture
!pip install kafka-python
!curl -sSOL https://downloads.apache.org/kafka/3.2.0/kafka_2.12-3.2.0.tgz
!tar -xzf kafka_2.12-3.2.0.tgz
!./kafka_2.12-3.2.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.12-3.2.0/config/zookeeper.properties
!./kafka_2.12-3.2.0/bin/kafka-server-start.sh -daemon ./kafka_2.12-3.2.0/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

## Kafka Spark

In [28]:
!wget -q --show-progress https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.2.0/spark-sql-kafka-0-10_2.12-3.2.0.jar
!wget -q --show-progress https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
!wget -q --show-progress https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.2.0/spark-token-provider-kafka-0-10_2.12-3.2.0.jar
!wget -q --show-progress https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.0/commons-pool2-2.11.0.jar
!wget -q --show-progress https://repo1.maven.org/maven2/org/postgresql/postgresql/42.4.0/postgresql-42.4.0.jar



In [3]:
from pyspark.sql import SparkSession

spark_jars = """/content/spark-sql-kafka-0-10_2.12-3.2.0.jar,
/content/kafka-clients-3.2.0.jar,
/content/spark-token-provider-kafka-0-10_2.12-3.2.0.jar,
/content/commons-pool2-2.11.0.jar,
/content/postgresql-42.4.0.jar
"""

spark = SparkSession \
    .builder \
    .appName("PySpark Structured Streaming") \
    .master("local[*]") \
    .config("spark.jars", spark_jars) \
    .config("spark.executor.extraClassPath", spark_jars) \
    .config("spark.executor.extraLibrary", spark_jars) \
    .config("spark.driver.extraClassPath", spark_jars) \
    .getOrCreate()
spark.sparkContext.setLogLevel('INFO')

## Kafka Producer

In [4]:
from kafka import KafkaProducer
import json
import pandas as pd
from random import randint
import time
import uuid

In [5]:
# Constants
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_TOPIC_NAME = 'ecommercetopic'
DATA_PATH = 'sample.csv'

In [6]:
# Serializer method
def serializer(data):
    return json.dumps(data).encode('utf-8')

In [16]:
# Producer object
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=serializer
)

In [19]:
# Dataframe to simulate real-time data flow
df = pd.read_csv(DATA_PATH)
df

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-11-08 08:14:53 UTC,view,1004259,2053013555631882655,electronics.smartphone,apple,776.74,568847318,3fca063b-dbd6-4306-8c8f-be86cc369d6d
1,2019-11-08 08:15:32 UTC,view,16000290,2053013558223962683,,tefal,22.14,547707147,57b0fa7e-fcf0-4a22-b537-07c59fdfeaa2
2,2019-11-08 08:17:27 UTC,view,16000976,2053013558223962683,,pyrex,9.76,552474342,380c380b-22c5-40e8-8d0e-daf409032477
3,2019-11-08 08:19:50 UTC,view,17300768,2053013553853497655,,,56.61,563883687,e6a954a2-3cb6-47fc-89c5-77eb09016c8e
4,2019-11-08 08:21:58 UTC,view,4501335,2053013563877884791,appliances.kitchen.hob,bosch,334.37,568857333,f7c46092-13bd-418b-bc94-7e41bb5e660d
...,...,...,...,...,...,...,...,...,...
10029,2019-11-21 16:43:15 UTC,view,1004886,2053013555631882655,electronics.smartphone,oppo,154.16,522921329,3020bc5b-3133-4988-94a5-c621779ed9fc
10030,2019-11-21 16:46:51 UTC,purchase,4804056,2053013554658804075,electronics.audio.headphone,apple,165.07,566161134,0555d9f8-d12c-4e38-9497-95e16102514d
10031,2019-11-21 16:46:59 UTC,view,5100402,2053013553341792533,electronics.clocks,garmin,262.30,535237020,52d7c479-4203-4b76-b9be-e6dd4bff5adb
10032,2019-11-21 16:49:35 UTC,view,29100105,2053013565941482475,appliances.personal.massager,casada,102.96,553359592,b45d8cb4-499c-49ff-8777-ac22cc7758f4


In [20]:
for _ in range(10):
    # Number of messages to send in this iteration
    n_msjs = randint(1, 10)
    # Getting random n_msjs from the dataframe
    sample_df = df.sample(n_msjs, axis=0)
    # Setting a timestamp
    sample_df.event_time = pd.Timestamp.now()
    sample_df.event_time = sample_df.event_time.astype('str')
    # Setting a unique ID
    sample_df['id'] = df.apply(lambda x: str(uuid.uuid1()), axis=1)
    # Creating a list of dictionaries from sampled dataframe
    sample = sample_df.to_dict('records')

    # Sending all messages in the sample to Kafka Topic
    for message in sample:
        print(message)
        producer.send(KAFKA_TOPIC_NAME, message)
    # Sleep randomly between 1 and 3 seconds
    time.sleep(randint(1, 3))

{'event_time': '2022-07-31 08:15:35.972490', 'event_type': 'view', 'product_id': 12600086, 'category_id': 2053013554751078769, 'category_code': 'appliances.kitchen.grill', 'brand': 'redmond', 'price': 51.46, 'user_id': 512391301, 'user_session': 'fbd30c28-1e91-4900-be0d-e90245a2ebf1', 'id': 'f477eeb6-10a8-11ed-adee-0242ac1c0002'}
{'event_time': '2022-07-31 08:15:35.972490', 'event_type': 'view', 'product_id': 1004133, 'category_id': 2053013555631882655, 'category_code': 'electronics.smartphone', 'brand': 'xiaomi', 'price': 136.4, 'user_id': 569305509, 'user_session': '664883ca-ebe4-411f-b1f3-a3752a932f8f', 'id': 'f478b698-10a8-11ed-adee-0242ac1c0002'}
{'event_time': '2022-07-31 08:15:35.972490', 'event_type': 'view', 'product_id': 1005248, 'category_id': 2053013555631882655, 'category_code': 'electronics.smartphone', 'brand': 'inoi', 'price': 46.05, 'user_id': 566653563, 'user_session': 'c33975ec-fdf5-4f24-a29a-609ad8c0ef56', 'id': 'f478b4cc-10a8-11ed-adee-0242ac1c0002'}
{'event_time':

In [7]:
!kafka_2.12-3.2.0/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --topic $KAFKA_TOPIC_NAME --from-beginning

{"event_time": "2022-07-31 08:15:35.972490", "event_type": "view", "product_id": 12600086, "category_id": 2053013554751078769, "category_code": "appliances.kitchen.grill", "brand": "redmond", "price": 51.46, "user_id": 512391301, "user_session": "fbd30c28-1e91-4900-be0d-e90245a2ebf1", "id": "f477eeb6-10a8-11ed-adee-0242ac1c0002"}
{"event_time": "2022-07-31 08:15:35.972490", "event_type": "view", "product_id": 1004133, "category_id": 2053013555631882655, "category_code": "electronics.smartphone", "brand": "xiaomi", "price": 136.4, "user_id": 569305509, "user_session": "664883ca-ebe4-411f-b1f3-a3752a932f8f", "id": "f478b698-10a8-11ed-adee-0242ac1c0002"}
{"event_time": "2022-07-31 08:15:35.972490", "event_type": "view", "product_id": 1005248, "category_id": 2053013555631882655, "category_code": "electronics.smartphone", "brand": "inoi", "price": 46.05, "user_id": 566653563, "user_session": "c33975ec-fdf5-4f24-a29a-609ad8c0ef56", "id": "f478b4cc-10a8-11ed-adee-0242ac1c0002"}
{"event_time":

In [17]:
!kafka_2.12-3.2.0/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list

__consumer_offsets
ecommercetopic


## Spark Streaming

In [7]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import Normalizer, StandardScaler

import random
import pyspark
import sys
import time
from datetime import datetime

In [4]:
# Postgres constants
POSTGRES_HOST_NAME='localhost'
POSTGRES_PORT='5432'
POSTGRES_DATABASE='postgres'
POSTGRES_TABLE='kafka_pyspark'
POSTGRES_USR_NAME='postgres'
POSTGRES_PASSWORD='postgres'
POSTGRES_JDBC_URL=f'jdbc:postgresql://{POSTGRES_HOST_NAME}:{POSTGRES_PORT}/{POSTGRES_DATABASE}'
POSTGRES_DRIVER_CLASS='org.postgresql.Driver'

In [10]:
def save_to_postgres(current_df, epoch_id):
    db_credentials = {
        'user': POSTGRES_USR_NAME,
        'password': POSTGRES_PASSWORD,
        'driver': POSTGRES_DRIVER_CLASS
    }

    print('Saving to Postgresql')
    current_df \
        .write \
        .jdbc(
            url=POSTGRES_JDBC_URL,
            table=POSTGRES_TABLE,
            mode='append',
            properties=db_credentials
        )

In [11]:
print('Data Processing application started')
print(time.strftime("%Y-%m-%d %H:%M:%S"))

Data Processing application started
2022-07-31 11:31:33


In [47]:
# # Extracting information from Kafka topic
# kafka_stream = spark \
#     .readStream \
#     .format('kafka') \
#     .option('kafka.bootstrap.servers', KAFKA_BOOTSTRAP_SERVERS) \
#     .option('subscribe', KAFKA_TOPIC_NAME) \
#     .option("kafka.security.protocol", "SASL_PLAINTEXT") \
#     .option("startingOffsets","earliest") \
#     .option("maxOffsetsPerTrigger","6000") \
#     .load()
# raw_info = kafka_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [12]:
# Extracting information from Kafka topic
kafka_stream = spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', KAFKA_BOOTSTRAP_SERVERS) \
    .option('subscribe', KAFKA_TOPIC_NAME) \
    .option("startingOffsets", "earliest") \
    .load()
raw_info = kafka_stream.selectExpr("CAST(value AS STRING)")

In [57]:
# # Extracting information from Kafka topic
# kafka_stream = spark \
#     .read \
#     .format('kafka') \
#     .option('kafka.bootstrap.servers', KAFKA_BOOTSTRAP_SERVERS) \
#     .option('subscribe', KAFKA_TOPIC_NAME) \
#     .option("startingOffsets","earliest") \
#     .option("startingOffsets", """{"ecommercetopic":{"0":5}}""") \
#     .option("endingOffsets", """{"endingOffsets":{"0":50}}""") \
#     .load()
# raw_info = kafka_stream.selectExpr("CAST(value AS STRING)")

In [13]:
# Building a Schema (columns and their types) to the information
# retrieved
df_schema = StructType() \
        .add('id', StringType()) \
        .add('event_time', StringType()) \
        .add('event_type', StringType()) \
        .add('product_id', StringType()) \
        .add('category_id', StringType()) \
        .add('category_code', StringType()) \
        .add('brand', StringType()) \
        .add('price', FloatType()) \
        .add('user_id', StringType()) \
        .add('user_session', StringType())

In [14]:
# Application of the schema to the information retrieved
df_raw = raw_info \
    .select(from_json(col('value'), df_schema).alias('dataframe'))
df_raw = df_raw.select('dataframe.*')

In [15]:
# ----- DATA PREPROCESSING -----

# Removing useless columns
df = df_raw.drop('product_id', 'category_id', 'user_id', 'user_session')\
# Splitting 'category_code' to find the department and product
split_col = split(df['category_code'], '\.')
df = df.withColumn('department', element_at(split_col, 1))
df = df.withColumn('product', element_at(split_col, -1))
# Removing column 'category_code'
df = df.drop('category_code')
# Creating revenue column
df = df.withColumn('revenue', when(df.event_type=='purchase', df.price).otherwise(0))
# Filling nans in 'brand', 'department' and 'product' columns
for c in ['brand', 'department', 'product']:
    df = df.withColumn(c, regexp_replace(c, 'NaN', 'other'))

In [None]:
# query = df \
#     .writeStream \
#     .format("console") \
#     .option("checkpointLocation", "/content/chkpt") \
#     .start()

# query.awaitTermination()

In [None]:
# Storing processed dataframe into POSTGRES database
df \
    .writeStream \
    .trigger(processingTime='15 seconds') \
    .outputMode('update') \
    .foreachBatch(save_to_postgres) \
    .start() \
    .awaitTermination()

## SQL

In [2]:
%reload_ext sql

In [5]:
%sql postgresql://{POSTGRES_USR_NAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOST_NAME}/{POSTGRES_DATABASE}

'Connected: postgres@postgres'

In [7]:
%sql SELECT * FROM information_schema.tables;

 * postgresql://postgres:***@localhost/postgres
189 rows affected.


table_catalog,table_schema,table_name,table_type,self_referencing_column_name,reference_generation,user_defined_type_catalog,user_defined_type_schema,user_defined_type_name,is_insertable_into,is_typed,commit_action
postgres,public,kafka_pyspark,BASE TABLE,,,,,,YES,NO,
postgres,pg_catalog,pg_statistic,BASE TABLE,,,,,,YES,NO,
postgres,pg_catalog,pg_type,BASE TABLE,,,,,,YES,NO,
postgres,pg_catalog,pg_policy,BASE TABLE,,,,,,YES,NO,
postgres,pg_catalog,pg_authid,BASE TABLE,,,,,,YES,NO,
postgres,pg_catalog,pg_shadow,VIEW,,,,,,NO,NO,
postgres,pg_catalog,pg_settings,VIEW,,,,,,NO,NO,
postgres,pg_catalog,pg_hba_file_rules,VIEW,,,,,,NO,NO,
postgres,pg_catalog,pg_file_settings,VIEW,,,,,,NO,NO,
postgres,pg_catalog,pg_config,VIEW,,,,,,NO,NO,


In [8]:
%sql SELECT COUNT(*) FROM $POSTGRES_TABLE;

 * postgresql://postgres:***@localhost/postgres
1 rows affected.


count
70


In [9]:
%sql SELECT * FROM $POSTGRES_TABLE LIMIT 10;

 * postgresql://postgres:***@localhost/postgres
10 rows affected.


id,event_time,event_type,brand,price,department,product,revenue
f477eeb6-10a8-11ed-adee-0242ac1c0002,2022-07-31 08:15:35.972490,view,redmond,51.46,appliances,grill,0.0
f478b698-10a8-11ed-adee-0242ac1c0002,2022-07-31 08:15:35.972490,view,xiaomi,136.4,electronics,smartphone,0.0
f478b4cc-10a8-11ed-adee-0242ac1c0002,2022-07-31 08:15:35.972490,view,inoi,46.05,electronics,smartphone,0.0
f483001c-10a8-11ed-adee-0242ac1c0002,2022-07-31 08:15:35.972490,view,artel,93.75,appliances,washer,0.0
f47d5266-10a8-11ed-adee-0242ac1c0002,2022-07-31 08:15:35.972490,view,xiaomi,200.73,electronics,smartphone,0.0
f477803e-10a8-11ed-adee-0242ac1c0002,2022-07-31 08:15:35.972490,view,bosch,20.57,"""other""","""other""",0.0
f488fe2c-10a8-11ed-adee-0242ac1c0002,2022-07-31 08:15:35.972490,cart,asel,46.31,appliances,oven,0.0
f58817b8-10a8-11ed-adee-0242ac1c0002,2022-07-31 08:15:37.657261,cart,kapsen,57.92,"""other""","""other""",0.0
f570e552-10a8-11ed-adee-0242ac1c0002,2022-07-31 08:15:37.657261,view,acer,300.88,computers,notebook,0.0
f57bdfac-10a8-11ed-adee-0242ac1c0002,2022-07-31 08:15:37.657261,view,pituso,95.73,furniture,chair,0.0
