In [1]:
import os
# JVM / PySpark launch configuration for this notebook.
# NOTE(review): these are presumably only honoured if set before the first
# SparkSession is created (cell [4]) — TODO confirm when re-running out of order.
os.environ.update({
    'JAVA_HOME': '/usr/lib/jvm/java-8-openjdk-amd64/',
    # Pulls the Structured Streaming Kafka connector at launch. Its Scala (2.11)
    # and Spark (2.4.0) versions presumably must match the installed PySpark.
    'PYSPARK_SUBMIT_ARGS': '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell',
})

In [2]:
from pyspark import SparkContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import *

In [3]:
from pyspark.sql.types import (StructType, StringType,
                               IntegerType, FloatType,
                               BooleanType, ArrayType)

In [4]:
# Single session shared by every batch and streaming query below;
# getOrCreate() hands back the running session if one already exists.
spark = (
    SparkSession.builder
    .appName("Test1")
    .getOrCreate()
)

## Streaming

In [None]:
df_stream = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092, localhost:9093, localhost:9094")\
    .option("subscribe", "nifi-test")\
    .load()

In [None]:
df_stream.printSchema()

In [None]:
df_stream.isStreaming

In [None]:
df_stream_value = df_stream.selectExpr("CAST(value AS STRING)")

In [None]:
df_stream_value.isStreaming

In [None]:
#df_stream_value.writeStream.format('console').start()

# MiniBatch: infer the tweet schema from a small sample of `nifi-test`

In [None]:
smallBatch = spark.read.format("kafka")\
                           .option("kafka.bootstrap.servers", "localhost:9092")\
                           .option("subscribe", "nifi-test")\
                           .option("startingOffsets", "earliest")\
                           .option("endingOffsets", """{"nifi-test":{"0":2}}""")\
                           .load()\
                           .selectExpr("CAST(value AS STRING)")

In [None]:
smallBatch.write.mode("overwrite").format('text').save("/home/daguito81/tweets")

In [None]:
tweet = spark.read.json("/home/daguito81/tweets/*")

In [None]:
tweet_schema = tweet.schema

In [None]:
tweet.printSchema()

## Batches

In [None]:
df_batch = spark \
    .read \
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092, localhost:9093, localhost:9094")\
    .option("subscribe", "nifi-test")\
    .option("value.deserializer", "json")\
    .load()

In [None]:
df_batch.count()

In [None]:
df_batch.printSchema()

In [None]:
df_batch.show(5)

In [None]:
df_batch_value = df_eth_batch.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")

In [None]:
df_batch_value.show(5)

In [None]:
df_batch.select(['partition', 'offset']).describe().show()

In [None]:
df_batch_test = spark \
    .read \
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092, localhost:9093, localhost:9094")\
    .option("subscribe", "nifi-test")\
    .load() \
    .select(from_json(col("value").cast("string"), tweet_schema).alias("parsed"))

In [None]:
df_batch_test.show(1, False)

# Dealing with Streams: parse tweets and republish them to processed Kafka topics

In [5]:
tweet_schema = spark.read.json("/home/daguito81/tweets/*").schema

In [6]:
df_stream_eth = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092, localhost:9093, localhost:9094")\
    .option("subscribe", "ethereum")\
    .load()\
    .select(from_json(col("value").cast("string"), tweet_schema).alias("parsed_value"))\
    .selectExpr("parsed_value.entities.hashtags",
                  "parsed_value.favorite_count",
                  "parsed_value.lang",
                  "parsed_value.retweet_count",
                  "parsed_value.text",
                  "parsed_value.user.followers_count",
                  "parsed_value.user.name",
                  "parsed_value.id",
                  "parsed_value.created_at")\
    .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "eth-processed") \
    .option("checkpointLocation", "/home/daguito81/Desktop/crypto-tweet-analysis/SparkStream-test/pyspark-tests/checkpoints") \
    .start()

In [7]:
df_stream_btc = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092, localhost:9093, localhost:9094")\
    .option("subscribe", "bitcoin")\
    .load()\
    .select(from_json(col("value").cast("string"), tweet_schema).alias("parsed_value"))\
    .selectExpr("parsed_value.entities.hashtags",
                  "parsed_value.favorite_count",
                  "parsed_value.lang",
                  "parsed_value.retweet_count",
                  "parsed_value.text",
                  "parsed_value.user.followers_count",
                  "parsed_value.user.name",
                  "parsed_value.id",
                  "parsed_value.created_at")\
    .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "btc-processed") \
    .option("checkpointLocation", "/home/daguito81/Desktop/crypto-tweet-analysis/SparkStream-test/pyspark-tests/checkpoints2") \
    .start()

In [None]:
# NOTE(review): earlier single-topic version of the streaming pipeline, kept for
# reference only. `new_df` parses the `nifi-test` stream with `tweet_schema`.
#new_df = df_stream.select(from_json(col("value").cast("string"), tweet_schema).alias("parsed_value"))

In [None]:
# Projects the same tweet fields the ethereum/bitcoin pipelines keep.
# final_df = new_df.selectExpr("parsed_value.entities.hashtags",
#                   "parsed_value.favorite_count",
#                   "parsed_value.lang",
#                   "parsed_value.retweet_count",
#                   "parsed_value.text",
#                   "parsed_value.user.followers_count",
#                   "parsed_value.user.name",
#                   "parsed_value.id",
#                   "parsed_value.created_at")

In [None]:
# NOTE(review): if re-enabled, give this query its own checkpointLocation — the
# path below is the same one the `ethereum` query in cell [6] writes to.
# process = final_df \
#   .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
#   .writeStream \
#   .format("kafka") \
#   .option("kafka.bootstrap.servers", "localhost:9092") \
#   .option("topic", "tweets-processed") \
#   .option("checkpointLocation", "/home/daguito81/Desktop/crypto-tweet-analysis/SparkStream-test/pyspark-tests/checkpoints") \
#   .trigger(continuous="1 second") \
#   .start()

# Final