In [1]:
import os

from pyspark.sql import SparkSession

In [2]:
# Obtain the Spark session (created on first use, reused afterwards) that the
# rest of the notebook reads through.
spark = (
    SparkSession.builder
    .appName('Twitter Search Read Kafka')
    .getOrCreate()
)

In [3]:
# Kafka connection settings. Each one can be overridden with an environment
# variable of the same name; the fallbacks are the values this notebook used
# when they were written inline, so behaviour is unchanged when nothing is set.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "167.172.61.77:9092")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "twitter-search")

# Batch read of the topic (spark.read, not spark.readStream): every action on
# a frame derived from rawDF goes back to Kafka for the data.
rawDF = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .load()
)

In [4]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *

In [5]:
# Layout used to parse the JSON text carried in each record's `value` column.
# Every field keeps the default nullable=True, same as StructType().add(...).
schema = StructType([
    StructField("screen_name", StringType()),
    StructField("twit", StringType()),
    StructField("create_date", LongType()),
    StructField("followers_count", IntegerType()),
    StructField("friends_count", IntegerType()),
    StructField("description", StringType()),
    StructField("favorite_count", IntegerType()),
    StructField("retweet_count", IntegerType()),
])

In [6]:
# Cast the record `value` to text, parse it as JSON against `schema`, then
# lift the parsed struct's fields up to top-level columns.
json_text = col("value").cast("string")
parsed = from_json(json_text, schema).alias("data")
valueDF = rawDF.select(parsed).select("data.*")

In [7]:
# Add a day-level `date` string (dd/MM/yyyy) derived from `create_date`.
# NOTE(review): from_unixtime reads its input as seconds since the epoch;
# confirm the producer does not write milliseconds into create_date.
day_text = from_unixtime(col("create_date"), format="dd/MM/yyyy")
dateDF = valueDF.withColumn("date", day_text)

In [8]:
# Records per day. The grouping key is exposed as `dateGroup` so it does not
# collide with retweetDF's `date` column when the two frames are joined.
dateGroupDF = dateDF.groupBy(col("date").alias("dateGroup")).count()

In [9]:
# Total retweets per day. The aggregate comes back named "sum(retweet_count)"
# and is renamed to plain "retweet_count".
retweetDF = (
    dateDF.groupBy("date")
    .agg({"retweet_count": "sum"})
    .withColumnRenamed("sum(retweet_count)", "retweet_count")
)

In [10]:
# Put each day's record count next to its retweet total and print the table.
same_day = retweetDF["date"] == dateGroupDF["dateGroup"]
daily = dateGroupDF.join(retweetDF, on=same_day, how="inner")
daily.select("date", "count", "retweet_count").show()

In [11]:
# One row per screen_name group, with the number of records in each group.
screenDF = valueDF.groupBy(col("screen_name")).count()

# Cell output: how many screen_name groups there are (row count of screenDF).
screenDF.count()

In [12]:
# Print the five records with the largest followers_count.
top_followed = valueDF.orderBy(col("followers_count").desc()).limit(5)
top_followed.show()

In [13]:
# Average followers_count per screen_name, restricted to records with fewer
# than 200 followers, highest average first.
# NOTE(review): the isFamous column elsewhere in this notebook treats
# followers_count < 200 as "not famous", so this frame holds the small
# accounts despite its name; confirm whether ">= 200" was intended.
small_accounts = valueDF.where(col("followers_count") < 200)
FenomenDF = (
    small_accounts.groupBy("screen_name")
    .agg({"followers_count": "avg"})
    .orderBy(desc("avg(followers_count)"))
)

In [14]:
# Tag every record with `isFamous`, a *string* column: "False" when
# followers_count is below 200, "True" for everything else.
# NOTE(review): "everything else" includes a null followers_count, since only
# a true condition selects the when() branch; confirm that is acceptable.
below_threshold = col("followers_count") < 200
FamousDF = valueDF.withColumn(
    "isFamous",
    when(below_threshold, "False").otherwise("True"),
)

In [15]:
# Count the records whose isFamous label is "False".
# isFamous is built with when(...).otherwise(...) from the string values
# "False"/"True", so it is compared against the string "False" here. The
# earlier SQL text `isFamous == False` compared it to a boolean literal and
# only matched through Spark's implicit string-to-boolean cast.
# NOTE(review): despite the name, this counts the NOT famous records;
# confirm that is the intended figure.
FamousTotal = (
    FamousDF.filter(col("isFamous") == "False")
    .groupBy("isFamous")
    .count()
)

In [16]:
# Print the count table (show()'s default limits, spelled out).
FamousTotal.show(n=20, truncate=True)