Spark_Structure_StreammingUsingKafkaNiFi.pyspark
import json
import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, struct, window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# A SparkSession named `spark` already exists in Zeppelin/pyspark shells; create
# one here so the script also runs standalone. The Kafka source requires the
# spark-sql-kafka-0-10 package on the classpath.
spark = SparkSession.builder.appName("SparkStructuredStreamingKafkaNiFi").getOrCreate()

# Consume the Kafka topic that NiFi publishes tweets to
events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dztopic1") \
    .load()

# Kafka delivers the payload as binary; cast the JSON value to a String
events = events.selectExpr("CAST(value AS STRING)")
# Extract the fields of interest from the raw tweet JSON string
def parse_json(df):
    tweet_json = json.loads(df[0])
    twitterid = str(tweet_json['id'])
    created_at = str(tweet_json['created_at'])
    tweet = str(tweet_json['text'])
    screen_name = str(tweet_json['user']['screen_name'])
    return [twitterid, created_at, tweet, screen_name]
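# For illustration, with an assumed minimal payload:
#   parse_json(['{"id": 1, "created_at": "Wed Oct 10 20:19:24 +0000 2018", "text": "hi", "user": {"screen_name": "alice"}}'])
#   -> ['1', 'Wed Oct 10 20:19:24 +0000 2018', 'hi', 'alice']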
# Convert Twitter's created_at format ("Wed Oct 10 20:19:24 +0000 2018") to a
# timestamp; timestamps are assumed to be UTC, so the +0000 offset is dropped
def convert_twitter_date(timestamp_str):
    output_ts = datetime.datetime.strptime(timestamp_str.replace('+0000 ', ''), '%a %b %d %H:%M:%S %Y')
    return output_ts
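# For example:
#   convert_twitter_date("Wed Oct 10 20:19:24 +0000 2018")
#   -> datetime.datetime(2018, 10, 10, 20, 19, 24)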
# Schema of the UDF output; parse_json's return list maps positionally onto
# these struct fields
json_schema = StructType([
    StructField("twitterid", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("tweet", StringType(), True),
    StructField("screen_name", StringType(), True)
])

udf_parse_json = udf(parse_json, json_schema)
udf_convert_twitter_date = udf(convert_twitter_date, TimestampType())
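# Note: Spark's built-in from_json could replace the Python UDF and avoid
# Python serialization overhead. A sketch, assuming a raw_schema were defined
# to match the incoming tweet keys (id, created_at, text, user.screen_name):
#   from pyspark.sql.functions import from_json
#   parsed = events.withColumn("parsed_field", from_json(col("value"), raw_schema))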
# Run the parsing UDF over the input columns, then promote the parsed fields to
# top-level columns and derive a proper event timestamp for windowing
jsonoutput = events.withColumn("parsed_field", udf_parse_json(struct([events[x] for x in events.columns]))) \
    .where(col("parsed_field").isNotNull()) \
    .withColumn("created_at", col("parsed_field.created_at")) \
    .withColumn("screen_name", col("parsed_field.screen_name")) \
    .withColumn("tweet", col("parsed_field.tweet")) \
    .withColumn("created_at_ts", udf_convert_twitter_date(col("parsed_field.created_at")))
# Count tweets per screen_name over a 1-minute window that slides every 15 seconds
windowedCounts = jsonoutput.groupBy(
    window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"),
    jsonoutput.screen_name
).count()
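# Each resulting row pairs a time window with a screen_name and its count,
# e.g. (values assumed):
#   window={start: 2018-10-10 20:19:00, end: 2018-10-10 20:20:00}, screen_name='alice', count=3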
# Sink the sliding-window counts to an in-memory table for interactive queries;
# "complete" mode re-emits the full aggregation result on every trigger
query_window = windowedCounts \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("myTable_window") \
    .start()
# Sink the parsed tweets to a second in-memory table; "append" mode emits only
# new rows
query_json = jsonoutput \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("myTable_json") \
    .start()
# Query the in-memory sink; rows appear as micro-batches are processed
spark.sql("select created_at, screen_name, tweet from myTable_json limit 15").show(15, False)
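# A minimal sketch of inspecting the windowed counts as well; the 30-second
# sleep is an assumption, there to give the stream time to process at least one
# micro-batch before querying.
import time
time.sleep(30)
spark.sql("select window, screen_name, `count` from myTable_window order by `count` desc limit 15").show(15, False)

# When running as a standalone script rather than a notebook, keep the process
# alive so the streaming queries continue to run:
# query_window.awaitTermination()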