In [1]:
# import project directory helper
import os, sys
# Absolute locations of the crawler backend package and of the project
# checkout that contains it.
BACKEND_DIR = os.path.abspath(
    '/home/hduser/document/jupyter/FYP/crawler/backend')
ROOT_DIR = os.path.abspath('/home/hduser/document/jupyter/FYP/')

# Give the backend directory top priority on the module search path
# (presumably where the `consumer` module imported below lives).
sys.path.insert(0, BACKEND_DIR)

In [2]:
from consumer import *

In [3]:
# import pyspark packages
# Request the Kafka connector packages BEFORE the first SparkSession /
# SparkContext is created: the submit arguments are only consulted when the
# JVM is launched, so setting them after `getOrCreate()` has no effect and
# the later `format("kafka")` read cannot resolve its data source.
import os
os.environ[
    'PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 pyspark-shell'
from pyspark.sql import SparkSession, functions, types
from pyspark.sql.types import *

In [4]:
# Start the Spark session named 'attraction', or attach to one that is
# already running in this kernel.
spark = (
    SparkSession.builder
    .appName('attraction')
    .getOrCreate()
)
spark  # echo the session as the cell output

# List kafka topics

In [12]:
%%sh
# List the topics known to the ZooKeeper instance at 10.123.10.26:2181.
kafka-topics.sh --list --zookeeper 10.123.10.26:2181

__consumer_offsets
testing
testing2
theculturetrip
tripad_activity
tripad_attr_activity
tripad_attr_location
tripad_attr_review
tripad_hotel_info
tripad_hotel_review
tripad_location
tripad_review


# Consume theculturetrip data to json

In [7]:
# Source: the `theculturetrip` topic on the single configured broker.
broker = ['10.123.10.26:9092']
kafka_topic = 'theculturetrip'
# Empty key path (its exact meaning is defined by the consumer helper).
key = []
# Target: <dataset dir>/<topic>.json
ds_dir = '/home/hduser/document/jupyter/FYP/crawler/datasets/theculturetrip_dataset/'
filepath = ''.join((ds_dir, kafka_topic, '.json'))

In [11]:
# Run the export with the settings from the previous cell; the recorded output
# is "processing..." followed by "done.".
# NOTE(review): every other section calls `kafka_to_json` with the same
# arguments — confirm `kafka_consumer` is still exported by `consumer`.
kafka_consumer(broker, kafka_topic, filepath, key)

processing...
done.


# Consume tripad_attr_location data to json

In [15]:
# Source: the attraction-location topic on the single configured broker.
broker = ['10.123.10.26:9092']
kafka_topic = 'tripad_attr_location'
# Empty key path (its exact meaning is defined by the consumer helper).
key = []
# Target: <attractions dataset dir>/<topic>.json
ds_dir = '/home/hduser/document/jupyter/FYP/crawler/datasets/tripadvisor_dataset/attractions/'
filepath = ''.join((ds_dir, kafka_topic, '.json'))

In [16]:
# Run the export with the settings from the previous cell. `kafka_to_json`
# comes from the star-import of `consumer`; presumably it writes the topic's
# messages to `filepath` — confirm against that module.
kafka_to_json(broker, kafka_topic, filepath, key)

processing...
done.


# Consume tripad_attr_activity data to json

In [17]:
# Source: the attraction-activity topic on the single configured broker.
broker = ['10.123.10.26:9092']
kafka_topic = 'tripad_attr_activity'
# Empty key path (its exact meaning is defined by the consumer helper).
key = []
# Target: <attractions dataset dir>/<topic>.json
ds_dir = '/home/hduser/document/jupyter/FYP/crawler/datasets/tripadvisor_dataset/attractions/'
filepath = ''.join((ds_dir, kafka_topic, '.json'))

In [18]:
# Run the export with the settings from the previous cell. `kafka_to_json`
# comes from the star-import of `consumer`; presumably it writes the topic's
# messages to `filepath` — confirm against that module.
kafka_to_json(broker, kafka_topic, filepath, key)

processing...
done.


# Consume tripad_attr_review data to json

In [19]:
# Source: the attraction-review topic on the single configured broker.
broker = ['10.123.10.26:9092']
kafka_topic = 'tripad_attr_review'
# Path of indexes/field names handed to the consumer helper; presumably it
# selects the nested review list inside each message — confirm in `consumer`.
key = [0, 'data', 'locations', 0, 'reviewListPage', 'reviews']
# Target: <attractions dataset dir>/<topic>.json
ds_dir = '/home/hduser/document/jupyter/FYP/crawler/datasets/tripadvisor_dataset/attractions/'
filepath = ''.join((ds_dir, kafka_topic, '.json'))

In [20]:
# Run the export with the settings from the previous cell. `kafka_to_json`
# comes from the star-import of `consumer`; presumably it writes the topic's
# messages to `filepath` — confirm against that module.
kafka_to_json(broker, kafka_topic, filepath, key)

processing...
done.


# Consume tripad_hotel_info data to json

In [21]:
# Source: the hotel-info topic on the single configured broker.
broker = ['10.123.10.26:9092']
kafka_topic = 'tripad_hotel_info'
# Empty key path (its exact meaning is defined by the consumer helper).
key = []
# Target: <hotels dataset dir>/<topic>.json
ds_dir = '/home/hduser/document/jupyter/FYP/crawler/datasets/tripadvisor_dataset/hotels/'
filepath = ''.join((ds_dir, kafka_topic, '.json'))

In [22]:
# Run the export with the settings from the previous cell. `kafka_to_json`
# comes from the star-import of `consumer`; presumably it writes the topic's
# messages to `filepath` — confirm against that module.
kafka_to_json(broker, kafka_topic, filepath, key)

processing...
done.


# Consume tripad_hotel_review data to json

In [23]:
# Source: the hotel-review topic on the single configured broker.
broker = ['10.123.10.26:9092']
kafka_topic = 'tripad_hotel_review'
# Path of indexes/field names handed to the consumer helper; presumably it
# selects the nested review list inside each message — confirm in `consumer`.
key = [0, 'data', 'locations', 0, 'reviewListPage', 'reviews']
# Target: <hotels dataset dir>/<topic>.json
ds_dir = '/home/hduser/document/jupyter/FYP/crawler/datasets/tripadvisor_dataset/hotels/'
filepath = ''.join((ds_dir, kafka_topic, '.json'))

In [24]:
# Run the export with the settings from the previous cell. `kafka_to_json`
# comes from the star-import of `consumer`; presumably it writes the topic's
# messages to `filepath` — confirm against that module.
kafka_to_json(broker, kafka_topic, filepath, key)

processing...
done.


# Validate json data using spark

In [None]:
# Load <ds_dir><kafka_topic>.json as a Spark DataFrame spread over 160
# partitions. `ds_dir` and `kafka_topic` are whatever the most recently run
# configuration cell assigned.
spark_df = (
    spark.read
    .json(''.join((ds_dir, kafka_topic, '.json')))
    .repartition(160)
)

In [None]:
# drop unwanted columns
spark_df = spark_df.drop('__COMMENT')
# Discard rows that have no '__typename'. The original note ties this step to
# tripad_attr_review, and other topics may not have the column at all — in
# which case dropna() raises because the subset column cannot be resolved —
# so only apply it when the column exists.
if '__typename' in spark_df.columns:
    spark_df = spark_df.dropna(subset=['__typename'])

In [None]:
# showing
# Preview a bounded sample only: calling toPandas() on the whole frame would
# collect every row onto the driver just to render a table.
display(spark_df.limit(20).toPandas())

In [None]:
# Persist the DataFrame as parquet under <ds_dir><kafka_topic>, replacing
# whatever a previous run left there.
parquet_writer = spark_df.write.mode('overwrite')
parquet_writer.parquet(ds_dir + kafka_topic)

# Subscribe to a Kafka topic with PySpark

##### Prerequisite: set `advertised.listeners=PLAINTEXT://<your-kafka-server-ip>:9092` in the Kafka broker's `server.properties`

In [None]:
import os

# Ask PySpark to pull in both Kafka connectors (DStream 0-8 and SQL 0-10).
# NOTE(review): presumably this is only honoured when no Spark JVM has been
# started in the kernel yet — restart the kernel before this section if a
# session already exists; confirm.
os.environ.update({
    'PYSPARK_SUBMIT_ARGS': '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 pyspark-shell',
})

In [None]:
from pyspark.sql import SparkSession, functions, types
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

In [None]:
# Start the Spark session named "attraction", or attach to one that is
# already running in this kernel.
spark = (
    SparkSession.builder
    .appName("attraction")
    .getOrCreate()
)
spark  # echo the session as the cell output

In [None]:
# create streaming contexts
# ssc = StreamingContext(spark.sparkContext, 120)

In [None]:
# kafkaStream = KafkaUtils.createDirectStream(ssc, ['testing2'], {
#     'bootstrap.servers': '10.123.10.26:9092',
#     'auto.offset.reset': 'smallest'
# })

# kafkaStream.pprint()

In [None]:
# parsed = kafkaStream.map(lambda x: json.loads(x[1]))

In [None]:
# ssc.start()
# ssc.awaitTermination()

In [None]:
# ssc.stop()

In [None]:
# spark.stop()

# Reading Kafka with the structured (DataFrame) API

In [None]:
# Topic read by the batch Kafka query in the next cell.
kafka_topic = 'tripad_attr_review'

In [None]:
# Batch-read the topic through Spark's Kafka data source.
# "earliest" is spelled out so the read starts at the oldest retained offset;
# that is already the default for batch queries ("latest" is the default only
# for streaming queries).
attr_review_kafka_msg = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "10.123.10.26:9092")
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
)

##### Infer the schema from the Kafka message value

In [None]:
# Keep only the message payload, with the `value` column converted to a string.
payload_as_text = functions.expr("string(value)")
kafka_msg_df = (
    attr_review_kafka_msg
    .withColumn("value", payload_as_text)
    .select("value")
)

In [None]:
# Re-parse the string payloads as JSON so Spark infers their schema, then
# count the parsed records as the cell output.
payload_rdd = kafka_msg_df.rdd.map(lambda row: row.value)
kafka_msg_df_json = spark.read.json(payload_rdd, multiLine=True)
kafka_msg_df_json.count()

In [None]:
# Show the schema of the parsed messages.
kafka_msg_df_json.schema

In [None]:
# required schema
# kafka_msg_df_json = kafka_msg_df.select(
#     functions.from_json(
#         functions.col("value"),
#         kafka_msg_df_json.schema).alias("data")).select("data.*")

In [None]:
# Project the nested data -> locations -> reviewListPage -> reviews field
# into a single column named "reviews".
reviews_spark_df = kafka_msg_df_json.select(
    kafka_msg_df_json["data"]["locations"]["reviewListPage"]["reviews"]
    .alias("reviews")
)

In [None]:
# Show the schema of the extracted "reviews" column.
reviews_spark_df.schema

In [None]:
# Convert the Spark result to a pandas DataFrame.
# NOTE(review): toPandas() collects every row to the driver — confirm the
# topic is small enough for that.
reviews_df = reviews_spark_df.toPandas()

In [None]:
# Render the pandas DataFrame as the cell output.
reviews_df

# Consume kafka data to hbase

In [1]:
# Settings for copying the `theculturetrip` topic into HBase.
broker = ['10.123.10.26:9092']
hbase_host = 'localhost'
kafka_topic = 'theculturetrip'
# One path component per list element, as in every other `key` in this
# notebook. The previous value was a single comma-joined string, which cannot
# be used as a component-by-component path.
key = ["props", "pageProps", "articleStoreState", "articleData", "data", "link"]
# Target column(s) handed to kafka_to_hbase.
column = ["m:url"]

In [None]:
# theculturetrip url
# Runs the helper with the settings from the previous cell; `kafka_to_hbase`
# comes from the star-import of `consumer` and is not shown here — presumably
# it stores the value found at `key` in each message under `column`; confirm.
kafka_to_hbase(broker, hbase_host, kafka_topic, key, column)

In [None]:
# Settings for copying the attraction-location topic into HBase.
kafka_topic = 'tripad_attr_location'
key = ["web_url"]      # key path handed to kafka_to_hbase
column = ["m:url"]     # target column(s) handed to kafka_to_hbase
# Endpoints: single Kafka broker and the HBase host.
hbase_host = 'localhost'
broker = ['10.123.10.26:9092']

In [None]:
# tripadvisor location
# Runs the helper with the settings from the previous cell; `kafka_to_hbase`
# comes from the star-import of `consumer` and is not shown here — presumably
# it stores the value found at `key` in each message under `column`; confirm.
kafka_to_hbase(broker, hbase_host, kafka_topic, key, column)

In [None]:
# Settings for copying the attraction-activity topic into HBase.
kafka_topic = 'tripad_attr_activity'
key = ["productHeader", "activityId"]   # key path handed to kafka_to_hbase
column = ["m:activityId"]               # target column(s) handed to kafka_to_hbase
# Endpoints: single Kafka broker and the HBase host.
hbase_host = 'localhost'
broker = ['10.123.10.26:9092']

In [None]:
# tripadvisor activity
# Runs the helper with the settings from the previous cell; `kafka_to_hbase`
# comes from the star-import of `consumer` and is not shown here — presumably
# it stores the value found at `key` in each message under `column`; confirm.
kafka_to_hbase(broker, hbase_host, kafka_topic, key, column)

In [None]:
# Settings for copying the attraction-review topic into HBase.
kafka_topic = 'tripad_attr_review'
key = [0, "data", "locations", 0, "locationId"]   # key path handed to kafka_to_hbase
column = ["m:locationId"]                         # target column(s) handed to kafka_to_hbase
# Endpoints: single Kafka broker and the HBase host.
hbase_host = 'localhost'
broker = ['10.123.10.26:9092']

In [None]:
# tripadvisor review topic (tripad_attr_review)
# Runs the helper with the settings from the previous cell; `kafka_to_hbase`
# comes from the star-import of `consumer` and is not shown here — presumably
# it stores the value found at `key` in each message under `column`; confirm.
kafka_to_hbase(broker, hbase_host, kafka_topic, key, column)