In [1]:
import os
# Maven coordinates requested through `--packages`: the Kafka connectors
# (DStream 0-8 and Structured Streaming 0-10, Scala 2.11 / Spark 2.3.0) and
# the Elasticsearch-Hadoop connector.
# NOTE(review): presumably this must run before the first SparkContext /
# SparkSession is created for the setting to be picked up -- confirm.
_spark_packages = ','.join([
    'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0',
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0',
    'org.elasticsearch:elasticsearch-hadoop:7.6.2',
])
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' + _spark_packages + ' pyspark-shell'

In [2]:
from pyspark import SparkContext,SparkConf
#    Spark Streaming
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json
import nltk
import logging
from datetime import datetime
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from elasticsearch import Elasticsearch
from elasticsearch import helpers

In [5]:
def process(time, rdd, es_resource="tweets/doc"):
    """Score one batch of tweets and append it to Elasticsearch.

    The ``(time, rdd)`` signature matches what ``DStream.foreachRDD`` passes
    to its callback -- presumably that is the intended caller; no call site
    is visible in this notebook.

    Steps: parse the RDD as JSON, drop rows whose ``text`` starts with
    ``RT @``, add a ``Sentiment`` column computed by ``analyze_sentiment``,
    print a 10-row preview and write the result with the Elasticsearch
    Spark SQL data source.

    The DataFrame built from an RDD is a *batch* DataFrame, so it is written
    with ``df.write`` (``writeStream`` is only valid on streaming DataFrames
    and would also block forever on ``awaitTermination()``).

    Relies on ``getSqlContextInstance`` and ``analyze_sentiment``, which are
    not defined in the visible part of this notebook.

    Args:
        time: Batch time; only used for the banner that is printed.
        rdd: RDD handed to ``sqlContext.read.json`` (so it is expected to
            hold JSON strings with at least a ``text`` field).
        es_resource: Elasticsearch ``index/type`` to append to. Defaults to
            ``"tweets/doc"``.

    Returns:
        None. Failures are reported on stdout and not re-raised, so one bad
        batch does not abort the caller.
    """
    print("========= %s =========" % str(time))

    try:
        # Nothing arrived in this batch: report it the same way as before
        # ("Empty") but without raising an exception for control flow.
        if rdd.isEmpty():
            print('Empty')
            return

        sqlContext = getSqlContextInstance(rdd.context)

        tweets = sqlContext.read.json(rdd).filter("text not like 'RT @%'")

        # head(1) only needs a single row, unlike count().
        if not tweets.head(1):
            print('Empty')
            return

        sentiment_udf = udf(analyze_sentiment, returnType=StringType())
        scored = tweets.withColumn("Sentiment", sentiment_udf(tweets.text))

        print(scored.take(10))

        # 'append' is required so that every batch after the first can be
        # added to an index that already contains data.
        (scored.write
            .format('org.elasticsearch.spark.sql')
            .mode('append')
            .option('es.nodes', 'localhost')
            .option('es.port', 9200)
            .save(es_resource))

    except Exception as e:
        # Deliberately best-effort: report and carry on with the next batch.
        print("process() failed for batch %s: %r" % (str(time), e))
    

In [3]:
def evaluate_sentiment(avg):
    """Map a numeric sentiment score to a text label.

    Args:
        avg: Score to classify. Values that cannot be compared with ``0``
            (for example ``None``) are accepted.

    Returns:
        ``'Negative'`` when ``avg < 0``, ``'Positive'`` when ``avg > 0`` and
        ``'Neutral'`` otherwise, including when the comparison raises
        ``TypeError``.
    """
    label = 'Neutral'
    try:
        if avg < 0:
            label = 'Negative'
        elif avg > 0:
            label = 'Positive'
    except TypeError:
        # Non-comparable input: keep the default label.
        pass
    return label
    
# Spark UDF wrapper around evaluate_sentiment; yields the label as a string column.
eval_udf = udf(f=evaluate_sentiment, returnType=StringType())

In [4]:
def start_stream(df, resource='twitter_stream_analysis/doc',
                 checkpoint_location='/checkpoint',
                 es_nodes='localhost', es_port=9200):
    """Start an append-mode streaming write of ``df`` to Elasticsearch and block.

    Args:
        df: Streaming DataFrame to write (its ``writeStream`` is used).
        resource: Elasticsearch ``index/type`` passed to ``start()``.
        checkpoint_location: Value for the ``checkpointLocation`` option.
        es_nodes: Value for the ``es.nodes`` option.
        es_port: Value for the ``es.port`` option.

    Returns:
        None. The call does not return until ``awaitTermination()`` on the
        started query returns.

    The defaults reproduce the previously hard-coded values, so existing
    ``start_stream(df)`` calls behave as before.
    """
    query = (
        df.writeStream
        .outputMode('append')
        .format('org.elasticsearch.spark.sql')
        .option('checkpointLocation', checkpoint_location)
        .option('es.nodes', es_nodes)
        .option('es.port', es_port)
        .option('es.spark.sql.streaming.sink.log.enabled', False)
        .start(resource)
    )
    query.awaitTermination()

In [5]:
# Build the configuration first and hand it to the session builder.
# Previously `conf` was never passed to the builder and the Elasticsearch
# setting was applied after getOrCreate(), so it never reached the session.
conf = SparkConf().setAppName('twitter_analysis')
conf.set("es.index.auto.create", "true")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

#sc = SparkContext.getOrCreate(appName="PythonSparkStreamingKafka_RM_01")
#sc.setLogLevel("INFO")

#spark = SparkSession(sc)

#sc = spark._sc


<pyspark.conf.SparkConf at 0x24b3b1bf438>

In [6]:
# Schema applied to the JSON payload by from_json; every field is nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("date", StringType()),
        ("user", StringType()),
        ("text", StringType()),
        ("reply_count", IntegerType()),
        ("retweet_count", IntegerType()),
        ("favorite_count", IntegerType()),
        ("sentiment_score", DoubleType()),
    )
])

In [9]:
#ssc = StreamingContext(sc, 20)

In [5]:
# Only let ERROR and above through from the "py4j" logger.
_py4j_logger = logging.getLogger("py4j")
_py4j_logger.setLevel(logging.ERROR)

In [7]:
# Streaming source: subscribe to the "kafkaspark" topic on the local broker.
kafkaStream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "kafkaspark")
    .option('failOnDataLoss', False)
    .load()
)

In [8]:
# Date pattern used for the timestamp strings written to Elasticsearch.
esTimeStampFormat = (
    "yyyy-MM-dd'T'"   # date followed by a literal 'T' separator
    "HH:mm:ss.SSSZ"   # time with milliseconds and zone offset
)

In [9]:

# Decode the Kafka `value` as a JSON string using `schema`, and attach a
# 'timestamp' column holding the current time rendered with esTimeStampFormat.
# NOTE(review): lit() and to_timestamp() around current_timestamp() look
# redundant -- confirm before simplifying, it may affect the rendered value.
parsed_df = (
    kafkaStream
    .select(from_json(col('value').cast('string'), schema).alias('parsed_value'))
    .withColumn(
        'timestamp',
        date_format(
            to_timestamp(lit(current_timestamp()), esTimeStampFormat),
            esTimeStampFormat
        )
    )
)

# Flatten the parsed struct into top-level columns next to 'timestamp'.
df = parsed_df.select('parsed_value.*', 'timestamp')

In [10]:
# Add the sentiment label ('status') derived from 'sentiment_score'.
# NOTE(review): the second withColumn replaces the 'date' field parsed from
# the message with a re-formatted copy of 'timestamp' -- confirm that
# discarding the original value is intended.
evaluated_df = (
    df
    .withColumn('status', eval_udf('sentiment_score'))
    .withColumn(
        'date',
        date_format(
            to_timestamp(col('timestamp'), esTimeStampFormat),
            esTimeStampFormat
        )
    )
)

In [None]:
# Launch the Elasticsearch sink; start_stream waits on awaitTermination(),
# so this cell keeps running until the query ends.
start_stream(df=evaluated_df)