## Import libraries

In [1]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession, SQLContext

import utils

## Set constants

In [2]:
# Kafka source: broker list, topic to consume, and the consumer group id.
BOOTSTRAP_SERVERS = '10.156.0.3:6667,10.156.0.4:6667,10.156.0.5:6667'
TOPIC_NAME = 'mles.sopr'
GROUP_ID = 'dborisov_spark_task_2'

# Spark: Kafka connector package to fetch at startup and the YARN queue to run in.
SPARK_JARS_PACKAGES = 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0'
SPARK_YARN_QUEUE = 'dborisov'

## Create SparkSession

In [3]:
# Create (or reuse) a YARN-backed session with Hive support; the Kafka
# connector jar is pulled in through `spark.jars.packages`.
spark = (
    SparkSession.builder
    .master('yarn')
    .appName('dborisov_task_2')
    .config('spark.yarn.queue', SPARK_YARN_QUEUE)
    .config('spark.jars.packages', SPARK_JARS_PACKAGES)
    .config('spark.executor.cores', '3')
    .config('spark.executor.memory', '6g')
    .enableHiveSupport()
    .getOrCreate()
)

## Load announcements from Kafka

In [4]:
# Batch-read every message of TOPIC_NAME starting from the offsets saved by the
# previous run (`utils.get_starting_offsets`). No `endingOffsets` is given, so
# the batch read uses Spark's default end, 'latest'.
#
# There is deliberately no `group.id` option. The Spark Kafka source only
# forwards options prefixed with `kafka.`, so a bare `group.id` is silently
# ignored. Spark 2.x also rejects `kafka.group.id`, because it generates its own
# consumer group and does not commit offsets to Kafka. That is why this job
# stores its offsets itself via `utils.dump_offsets`.
#
# Cached because the frame is consumed twice (the Hive write and the offset
# bookkeeping), and both should see the same rows.
# NOTE(review): if cached blocks are lost and recomputed, Kafka is re-read and
# 'latest' may have moved on; consider resolving explicit ending offsets.
df = (
    spark.read
    .format('kafka')
    .option('kafka.bootstrap.servers', BOOTSTRAP_SERVERS)
    .option('subscribe', TOPIC_NAME)
    .option('startingOffsets', utils.get_starting_offsets(TOPIC_NAME))
    .load()
    .cache()
)

## Parse announcements

In [5]:
# Expected layout of the JSON document carried in each Kafka message `value`.
# All fields use StructField's default nullability (nullable).
kafka_schema = T.StructType([
    T.StructField(field_name, field_type)
    for field_name, field_type in (
        ('timestamp', T.TimestampType()),
        ('offer_id', T.LongType()),
        ('user_id', T.StringType()),
        ('event_type', T.StringType()),
        ('page_type', T.StringType()),
    )
])

In [6]:
# Steps:
#   1. Cast the binary Kafka `value` to a string and parse it with `kafka_schema`.
#   2. Flatten the parsed struct into top-level columns.
#   3. Derive the date partition key `ptn_dadd` from the event timestamp.
# NOTE(review): from_json yields null for a value that does not parse, so such
# messages become all-null rows here; consider filtering or counting them.
ads_data = (
    df
    .select(F.from_json(F.col('value').cast('string'), kafka_schema).alias('json'))
    .select('json.*')
    .withColumn('ptn_dadd', F.col('timestamp').cast(T.DateType()))
)

## Write announcements to Hive

In [7]:
# Append the parsed events to the ORC Hive table `dborisov.mles_sopr`,
# partitioned by event date (`ptn_dadd`).
(
    ads_data.write
    .mode('append')
    .format('orc')
    .partitionBy('ptn_dadd')
    .saveAsTable('dborisov.mles_sopr')
)

## Save the next starting offsets for each partition

In [8]:
# The next run must resume right after the last message read in this batch.
# Kafka `startingOffsets` is inclusive, so store max(offset) + 1 per partition,
# keyed by the partition number as a string (the JSON form Spark expects).
partition_offsets_mapping = {
    str(row['partition']): row['max_offset'] + 1
    for row in (
        df.groupBy('partition')
        .agg(F.max('offset').alias('max_offset'))
        .collect()
    )
}

# An empty mapping means nothing was consumed, so the offsets already stored
# are still the correct resume point. Skip the dump rather than overwrite them
# with an empty mapping.
# NOTE(review): partitions that received no messages in this batch are absent
# from the mapping. Spark's batch Kafka reader expects a JSON `startingOffsets`
# to list every partition of the topic. Confirm that `utils.dump_offsets` merges
# with the previously stored offsets rather than replacing them.
if partition_offsets_mapping:
    utils.dump_offsets(TOPIC_NAME, partition_offsets_mapping)