In [1]:
# Make the local Spark installation importable as `pyspark` before any pyspark imports
# below: findspark locates the Spark install (SPARK_HOME) and adds pyspark to sys.path.
import findspark
findspark.init()

In [2]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
#import org.apache.spark.SparkConf
#from pyspark.streaming.kafka import KafkaUtils
#from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, col, explode, split


In [3]:
import os

# spark-submit arguments read when the Spark JVM is launched, i.e. when the SparkSession is
# first created in a later cell, so this cell must run before that one. It pulls the
# Structured Streaming Kafka source (spark-sql-kafka-0-10) and a Kafka client from Maven.
# NOTE(review): spark-sql-kafka already brings its own kafka-clients dependency; confirm the
# explicit kafka-clients 3.3.1 pin is actually needed and compatible.
os.environ.update(
    PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.kafka:kafka-clients:3.3.1 pyspark-shell'
)

In [11]:
#import os
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 pyspark-shell'

In [4]:
# Single SparkSession for the notebook; getOrCreate() returns the existing one on re-run.
# NOTE(review): master "local" means one worker thread, shared by all three streaming
# queries started below (qraw, qalerts, console); consider "local[*]".
spark = (
    SparkSession.builder
    .master("local")
    .appName("twitter")
    .getOrCreate()
)

In [5]:
# Streaming source: the Kafka topic "tweets" on the local broker. With startingOffsets
# "latest", a new query only sees records produced after it starts.
dsraw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "tweets")
    .option("startingOffsets", "latest")
    .load()
)
dsraw

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [6]:
# The Kafka `value` column is binary (see the schema above); cast it to a readable string.
ds = dsraw.selectExpr("CAST(value AS STRING)")
# Both are (streaming) DataFrames; one type per line.
print(type(dsraw), type(ds), sep="\n")

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>


In [7]:
# Mirror every raw Kafka record into the in-memory table "qraw" for ad-hoc spark.sql queries.
# NOTE(review): the memory sink keeps its rows on the driver, so on a live stream this
# table grows without bound; it is meant for debugging only.
rawQuery = (
    dsraw.writeStream
    .queryName("qraw")
    .format("memory")
    .start()
)

In [8]:
# Mirror the string-cast payloads into the in-memory table "qalerts".
# NOTE(review): despite the name, no alert filtering happens here; it holds every payload.
alertQuery = (
    ds.writeStream
    .queryName("qalerts")
    .format("memory")
    .start()
)

In [9]:
# Snapshot of whatever the "qraw" memory table holds at this moment. It is empty until
# the raw query has processed a micro-batch containing data.
raw = spark.sql(
    "select * from qraw"
)
raw.show(n=20)

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+



In [26]:
# 1. Schema of a tweet coming from Twitter.
# Spark DDL schema string covering a subset of the tweet JSON fields; it is passed to
# from_json in the parsing cell below. JSON fields not listed here are dropped by from_json,
# and listed fields missing from a message come back as null.
# `user` is backtick-quoted, presumably so the DDL parser does not treat it as a keyword.
# NOTE(review): coordinates are declared array<float> (32-bit); array<double> may be intended.

tweet_schema="""
created_at string,
id bigint,
id_str string,
text string,
source string,
truncated boolean,
in_reply_to_status_id bigint,
in_reply_to_status_id_str string,
in_reply_to_user_id bigint,
in_reply_to_user_id_str string,
in_reply_to_screen_name string,
`user` struct<
            id:bigint,
            id_str:string,
            name:string,
            screen_name:string,
            location:string,
            url:string,
            description:string,
            protected:boolean,
            verified:boolean,
            followers_count:bigint,
            friends_count:bigint,
            listed_count:bigint,
            favourites_count:bigint,
            statuses_count:bigint,
            created_at:string,
            profile_banner_url:string,
            profile_image_url_https:string,
            default_profile:boolean,
            default_profile_image:boolean,
            withheld_in_countries: array<string>,
            withheld_scope:string,
            geo_enabled:boolean
            >,
coordinates struct <
            coordinates:array<float>,
            type:string
            >,
place struct<
            country:string,
            country_code:string,
            full_name:string,
            place_type:string,
            url:string
            >,
quoted_status_id bigint,
quoted_status_id_str string,
is_quote_status boolean,
quote_count bigint,
reply_count bigint,
retweet_count bigint,
favorite_count bigint,
entities struct<
            user_mentions:array<struct<screen_name:string>>,
            hashtags:array<struct<text:string>>, 
            media:array<struct<expanded_url:string>>, 
            urls:array<struct<expanded_url:string>>, 
            symbols:array<struct<text:string>>
            >,
favorited boolean,
retweeted boolean,
possibly_sensitive boolean,
filter_level string,
lang string
"""

In [27]:


# 2. Cast the Kafka `value` column from binary to string.
# 3. Parse that JSON string into a struct column `data` with from_json and tweet_schema
#    (malformed messages become a null struct).
# 4. Flatten the struct: event time (created_at, still a string), user screen name, text, id.
tweetsDF = (
    dsraw
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), tweet_schema).alias("data"))
    .select(
        col("data.created_at").alias("event_time"),
        col("data.user.screen_name"),
        col("data.text"),
        col("data.id"),
    )
)
tweetsDF

DataFrame[event_time: string, screen_name: string, text: string, id: bigint]

In [35]:
# 5. Display the results in the console.
# Keep a handle on the running query so later cells can inspect it
# (tweetsQuery.status, tweetsQuery.exception()) and stop it (tweetsQuery.stop()).
# awaitTermination() is deliberately not called: it would block the kernel until the
# stream ends.
# NOTE(review): the recorded run failed with NativeIO$Windows.access0, which usually means
# the Hadoop native binaries (winutils.exe / hadoop.dll) are missing on Windows; check
# HADOOP_HOME before re-running.
tweetsQuery = (
    tweetsDF.writeStream
    .format("console")
    .outputMode("append")
    .start()
)
tweetsQuery


StreamingQueryException: Query [id = beb26f17-d4ff-4d74-b562-10ca5c9d662a, runId = 6db2d5c3-ee12-4517-9fc8-231a42c82cdc] terminated with exception: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

In [33]:
# Word counts over the tweet text.
# Split the parsed tweet `text` rather than the raw Kafka `value`: the raw payload is the whole
# JSON document, so splitting it would also count JSON keys and punctuation. Splitting on
# runs of whitespace (Java regex \s+) and dropping empty tokens avoids counting "" as a word
# (produced by leading or repeated whitespace). Null text yields no rows from explode.
words = (
    tweetsDF
    .select(explode(split(col("text"), r"\s+")).alias("word"))
    .where(col("word") != "")
)

# Streaming aggregation: one running count per distinct word.
wordCounts = words.groupBy("word").count()

# printSchema() prints the schema itself and returns None, so it is not wrapped in print().
wordCounts.printSchema()

root
 |-- word: string (nullable = false)
 |-- count: long (nullable = false)

None
