
  <h1>Real-Time Users' Reputation Analysis</h1>

<img src="../images/reputation_0.png">

<h1>Introduction</h1>

Nowadays social networks are so popular that many of us can hardly imagine living without them. They influence our lives in different ways:

- Social life

- Work/Learning

- Business

More and more, we are realizing that we need a good reputation in this new virtual world. This is true for ordinary people, and it becomes a real deal for anyone who runs a business.

<h1>Why should a business care about its own virtual reputation?</h1>

<img src="../images/meme_0.jpeg">

<img src="../images/reputation_1.jpeg">

<h1>Yeah, I see, but how do I get a good reputation?</h1>

That's the real deal. We can't know for sure what makes you more popular, but we can certainly figure out what to avoid if we want to build a good reputation.

<img src="../images/meme_1.jpg">

<h1>A possible solution...</h1>

We can't read the future, but we can read big data! We could study all the data that users on a social network generate about us. If we could do that, we could better understand, in real time, what helps build a good reputation.

<img src="../images/meme_3.jpg">

<h1>...thinking of which...</h1>

The software we are going to illustrate is all about this: retrieving data about a user and quantifying that user's reputation, so we can tell whether it is good or bad, by studying the reactions of their followers on a social network.

Just by reading the output of this software, you will be able to understand what raised your reputation and what pushed your followers away.

So what are you waiting for?

<b>Let's start!</b>

<h1>Break the problem down</h1>

The problem itself can be really tricky.

We want to study a social network, so we need to define which social network we want to check out.

We are talking about big data. We can hardly figure out how much data users generate every second on a social network. We badly need a system that can handle this hella big amount of information.

Then we need a way to decide whether a piece of data generated on a social network helps the user build a better reputation or not.

Lastly, we need an easy and simple way to display all of this data to the user.

<h1>Which social network?</h1>

Our choice is not really limited, but we surely want to pick the one that lets us study its data without much hassle. We need to keep in mind that data from social networks is money, and the platforms probably don't want to give it away so easily and for free.

That's why we will go for Twitter.

<img src="../images/twitter_icon.png" height="128">

This is a good choice because:

- It is still quite popular

- It is one of the easiest to get data from

- Businesses commonly have a Twitter account

<h1>When big data looks like a flume...</h1>

To handle data from a social network we need something that can manage all this information and is robust enough not to stop because of some random error or misuse.

There are different solutions. We will use Apache Flume and a combination of ZooKeeper/Kafka Servers.

Flume will help us to link the data-source (Twitter) to Kafka.

Kafka will handle the streaming of the big amount of data Flume will collect over time.

This solution will allow us to keep the system reliable without giving up on efficiency.
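To make the Flume, Kafka and consumer chain more concrete, here is a toy, in-memory model of what a Kafka topic is: an append-only log that producers (Flume, in our case) write to, while every consumer keeps its own offset and reads at its own pace. This is only a conceptual sketch in plain Python; `ToyTopic` and its methods are hypothetical and are not Kafka's API.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTopic:
    """Hypothetical in-memory stand-in for a single-partition Kafka topic."""
    name: str
    log: list = field(default_factory=list)      # append-only record log
    offsets: dict = field(default_factory=dict)  # consumer name -> next offset to read

    def produce(self, record):
        # Producers (e.g. Flume) only ever append; return the record's offset
        self.log.append(record)
        return len(self.log) - 1

    def poll(self, consumer, max_records=10):
        # Each consumer reads from its own offset, so a slow consumer
        # does not block the producer or the other consumers
        start = self.offsets.get(consumer, 0)
        batch = self.log[start:start + max_records]
        self.offsets[consumer] = start + len(batch)
        return batch


topic = ToyTopic("tap")
for text in ["tweet 1", "tweet 2", "tweet 3"]:
    topic.produce({"text": text})

print(topic.poll("spark", max_records=2))  # first two records
print(topic.poll("spark"))                 # the remaining record
print(topic.poll("script"))                # an independent consumer starts from offset 0
```

Real Kafka adds partitioning, replication and configurable retention on top of this idea, which is what keeps the system reliable when one side slows down.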

<h1>How do we understand what followers think?</h1>

At this point, we need a system that lets us easily tell whether a piece of information (data generated on Twitter by one of the user's followers) will improve the user's reputation or not.

We probably don't need to tell you why this job calls for some machine learning.

<img src="../images/meme_4.jpg">

We will study the text of the tweets we gather and apply sentiment analysis algorithms to it. This tells us whether a tweet is generally speaking "well" or "badly" about a user or organization.

<h1>A picture is worth a thousand words</h1>

Data generated by social networks and machine learning algorithms are surely interesting, but no one can read them unless we shape them into something easy to read.

<img src="../images/meme_5.jpeg">

That's why we will use Spark to refine the information we get from the source and keep only what we need. Then we will use Elasticsearch and Kibana to index this data and show it in an unambiguous way.
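As an illustration of the "refine" step, the sketch below takes a raw tweet in the Twitter API v1.1 JSON layout (fields such as `id_str`, `text`, `lang`, `created_at`, `user.screen_name`, `in_reply_to_screen_name` and `entities.user_mentions`) and keeps only what a reputation dashboard might need. The `refine_tweet` helper and its output field names are hypothetical; the project's actual Spark job may keep different fields, and in the real pipeline this logic would run inside Spark rather than on a single dict.

```python
import json
from typing import Optional


def refine_tweet(raw: dict, tracked_users: set) -> Optional[dict]:
    """Keep only the fields we need; return None if no tracked user is involved.

    Hypothetical helper: the output field names are illustrative only.
    """
    # Long tweets may carry their full text under extended_tweet.full_text
    text = raw.get("extended_tweet", {}).get("full_text") or raw.get("text", "")
    mentions = {m["screen_name"].lower()
                for m in raw.get("entities", {}).get("user_mentions", [])}
    reply_to = (raw.get("in_reply_to_screen_name") or "").lower()
    related = mentions | {reply_to}
    # Screen names are case-insensitive, so compare them in lower case
    targets = sorted(u for u in tracked_users if u.lower() in related)
    if not targets:
        return None  # not about anyone we track: drop it early
    return {
        "id": raw["id_str"],
        "created_at": raw["created_at"],
        "lang": raw.get("lang", "und"),
        "author": raw["user"]["screen_name"],
        "targets": targets,
        "text": text,
    }


# A made-up tweet in the v1.1 JSON layout, for illustration only
raw = json.loads("""{
  "id_str": "1", "created_at": "Fri Jun 26 16:19:49 +0000 2020", "lang": "en",
  "text": "@realDonaldTrump sample tweet text", "in_reply_to_screen_name": null,
  "user": {"screen_name": "some_follower"},
  "entities": {"user_mentions": [{"screen_name": "realDonaldTrump"}]}
}""")
print(refine_tweet(raw, {"realDonaldTrump", "matteosalvinimi"}))
```

Dropping irrelevant tweets as early as possible keeps the documents sent to Elasticsearch small, which matters when the stream is large.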

<h1>A really useful map to sum up everything we said</h1>

<img src="../images/tap-progetto.png">

<h1>Code Snippets</h1>

<img src="../images/meme_9.jpeg">

<h1>A flume of words</h1>

What do Flume and the ZooKeeper/Kafka servers really do? This section will help you better understand why we need them.

Let's just start the servers and Flume.

In [1]:
import os
os.system("gnome-terminal -e 'bash ../bin/progettoServersStart.sh'")

0

In [4]:
os.system("gnome-terminal -e 'bash ../bin/progettoCreateTopic.sh tap'")

0

In [20]:
os.system("gnome-terminal -e 'bash ../bin/progettoFlumeStart.sh tap \"realDonaldTrump\"'")

0

<h1>Scripted consumer</h1>

We start with something we could do but had better not: using a Python script to get all the data coming from Flume.

(Press CTRL+C to stop the snippet.)

In [27]:
os.system("gnome-terminal -e 'bash ../bin/progettoCreateConsumerScript.sh tap'")

0

It works, but there is a big problem: it is not distributed. It is not even multithreaded.

This becomes a serious problem when it comes to efficiency and performance.

Can we do a better job?

<h1>Distributed Consumer</h1>

Let's now use a different approach. We will show you how this project handles this data by printing the same output the previous script was printing.

(Press CTRL+C to stop the snippet.)

In [23]:
os.system("gnome-terminal -e 'bash ../bin/progettoSparkSnippet.sh tap'")

0

The result is very similar, but now the software runs in a distributed way: that's possible because Spark is a distributed engine.
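A simplified picture of where the parallelism comes from: a Kafka topic is split into partitions, and parallel readers (Spark tasks, or the members of a consumer group) each take a subset of them, while a lone script has to read every partition by itself. The sketch below is a toy model, not Kafka's or Spark's code: it routes keyed records with `zlib.crc32` (Kafka's default partitioner uses murmur2) and spreads partitions round-robin over workers; the partition count and worker names are made up.

```python
import zlib
from collections import defaultdict


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in for Kafka's key hashing (Kafka itself uses murmur2, not crc32)
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_round_robin(num_partitions: int, workers: list) -> dict:
    # Worker i gets partitions i, i + len(workers), i + 2*len(workers), ...
    assignment = defaultdict(list)
    for p in range(num_partitions):
        assignment[workers[p % len(workers)]].append(p)
    return dict(assignment)


NUM_PARTITIONS = 6  # hypothetical partition count
print(assign_round_robin(NUM_PARTITIONS, ["script"]))          # one reader does all the work
print(assign_round_robin(NUM_PARTITIONS, ["w1", "w2", "w3"]))  # work split three ways

# Records with the same key always land in the same partition
keys = ["realDonaldTrump", "matteosalvinimi", "realDonaldTrump"]
print([partition_for(k, NUM_PARTITIONS) for k in keys])
```

Adding workers only helps up to the number of partitions: with more workers than partitions, some of them would sit idle.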

<h1>What to pick?</h1>

<img src="../images/meme_10.jpeg">

When it comes to efficiency and performance, the distributed way is the only path we should choose. The script consumer would probably work fine with a light load, but when the data load is too big it would likely fall behind and you could lose data along the way, and this is not something we want to happen!

<h1>Tell me your sentiment about it</h1>

Let's jump onto the sentiment analysis topic. 

This is a very important part of the whole software, so it is worth looking at closely.

We have provided two algorithms:
- The first uses Vader for English tweets
- The second uses Spark MLlib for Italian tweets

<h1>Vader</h1>

<img src="../images/meme_11.jpg">

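Vader is a model used to perform sentiment analysis. It can tell the polarity (positive, negative or neutral) of a text quite accurately, especially for social media posts. It relies on a handful of heuristic rules (five, according to its authors) and on a lexicon of terms whose sentiment was rated by human annotators.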

Let's see an example.

In [None]:
import nltk
nltk.download('vader_lexicon')

from nltk.sentiment.vader import SentimentIntensityAnalyzer
sid = SentimentIntensityAnalyzer()

In [30]:
text = "I really hate eating vegetables, but i do love carrots!"
# 'compound' is a normalized score in [-1, 1]
polarity = sid.polarity_scores(text)['compound']
# label with a neutral band between -0.1 and 0.1
sentiment = "positive" if polarity > 0.1 else ("negative" if polarity < -0.1 else "neutral")

print("Sentiment of '{}' is: {}\nScore: {}".format(text, sentiment, polarity))

Sentiment of 'I really hate eating vegetables, but i do love carrots!' is: positive
Score: 0.6804


As you can see, it works quite well: even though the sentence contains "hate", Vader gives more weight to the clause after "but" and rates the sentence as positive.
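Two of Vader's ideas explain that result. In a sentence containing "but", valences of words before it are scaled by 0.5 and those after it by 1.5; the summed valence is then squashed into (-1, 1) with x / sqrt(x² + alpha), where alpha = 15. The toy below reproduces only those two steps with a tiny, made-up lexicon (the valences are illustrative, not taken from Vader's lexicon), so its score differs from the real 0.6804: Vader also handles boosters such as "really", punctuation emphasis and negations. Use `SentimentIntensityAnalyzer` for real work.

```python
import math

# Illustrative valences only (hypothetical): Vader's real lexicon is far larger
TOY_LEXICON = {"hate": -2.7, "love": 3.2, "good": 1.9, "bad": -2.5}


def normalize(score: float, alpha: float = 15.0) -> float:
    # Vader-style squashing of an unbounded sum into (-1, 1)
    return score / math.sqrt(score * score + alpha)


def toy_compound(text: str) -> float:
    words = [w.strip(".,!?").lower() for w in text.split()]
    scores = [TOY_LEXICON.get(w, 0.0) for w in words]
    if "but" in words:
        # Contrastive "but": damp the words before it, boost the words after it
        pivot = words.index("but")
        scores = [s * 0.5 if i < pivot else s * 1.5 if i > pivot else s
                  for i, s in enumerate(scores)]
    return normalize(sum(scores))


print(round(toy_compound("I really hate eating vegetables, but i do love carrots!"), 4))
print(round(toy_compound("I hate carrots"), 4))
```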

<h1>Spark MLlib</h1>

Let's now see some snippets for this part. Remember: this model is only used for Italian tweets!

In [55]:
from pyspark.sql.session import SparkSession
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec
from pyspark.ml.classification import LogisticRegression
import pyspark.sql.types as tp

# Create the Spark session, or reuse the running one: getOrCreate() avoids
# "Cannot run multiple SparkContexts at once" when this cell is re-run
spark = SparkSession.builder.appName("Tweet").getOrCreate()
sc = spark.sparkContext

In [52]:
schema = tp.StructType([
    tp.StructField(name= 'id', dataType= tp.StringType(),  nullable= True),
    tp.StructField(name= 'subjective',       dataType= tp.IntegerType(),  nullable= True),
    tp.StructField(name= 'positive',       dataType= tp.IntegerType(),  nullable= True),
    tp.StructField(name= 'negative',       dataType= tp.IntegerType(),  nullable= True),
    tp.StructField(name= 'ironic',       dataType= tp.IntegerType(),  nullable= True),
    tp.StructField(name= 'lpositive',       dataType= tp.IntegerType(),  nullable= True),
    tp.StructField(name= 'lnegative',       dataType= tp.IntegerType(),  nullable= True),
    tp.StructField(name= 'top',       dataType= tp.IntegerType(),  nullable= True),
    tp.StructField(name= 'tweet',       dataType= tp.StringType(),   nullable= True)
])

sc.setLogLevel("WARN")

# read the dataset  
training_set = spark.read.csv('../spark/dataset/training_set_sentipolc16.csv',
                         schema=schema,
                         header=True,
                         sep=',')

# define stage 1: tokenize the tweet text    
stage_1 = RegexTokenizer(inputCol= 'tweet' , outputCol= 'tokens', pattern= '\\W')
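# define stage 2: remove the stop words
# note: the default stop-word list is English; for Italian tweets
# StopWordsRemover.loadDefaultStopWords("italian") may be a better fit
stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_words')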
# define stage 3: create a word vector of the size 100
stage_3 = Word2Vec(inputCol= 'filtered_words', outputCol= 'vector', vectorSize= 100)
# define stage 4: Logistic Regression Model
model = LogisticRegression(featuresCol= 'vector', labelCol= 'positive')
# setup the pipeline
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])

# fit the pipeline model with the training data
pipelineFit = pipeline.fit(training_set)

modelSummary=pipelineFit.stages[-1].summary


In [56]:
# Define the structure of the data to classify
textSchema = tp.StructType([
    tp.StructField(name= 'id', dataType= tp.LongType(),  nullable= True),
    tp.StructField(name= 'tweet', dataType= tp.StringType(),  nullable= True)
])

# a single row (id, tweet); createDataFrame expects a list of rows
wordsDataFrame = spark.createDataFrame([(0, "Fai schifo!")], schema=textSchema)
# transform the data using the pipeline and get the predicted sentiment
data = pipelineFit.transform(wordsDataFrame)
data.show(truncate=True)

+---+-----------+-------------+--------------+--------------------+--------------------+--------------------+----------+
| id|      tweet|       tokens|filtered_words|              vector|       rawPrediction|         probability|prediction|
+---+-----------+-------------+--------------+--------------------+--------------------+--------------------+----------+
|  0|Fai schifo!|[fai, schifo]| [fai, schifo]|[-0.0951872244477...|[0.81620043206809...|[0.69342920258831...|       0.0|
+---+-----------+-------------+--------------+--------------------+--------------------+--------------------+----------+
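How to read the table: for a binary `LogisticRegression`, Spark's `rawPrediction` holds the margins for class 0 and class 1, `probability` is their logistic (sigmoid) transform, and `prediction` applies the default 0.5 threshold. Since the label is the `positive` column, a prediction of 0.0 means the model does not consider "Fai schifo!" ("You suck!") a positive tweet. The short check below recomputes the first probability from the first `rawPrediction` value, copied from the truncated output above.

```python
import math


def sigmoid(x: float) -> float:
    return 1.0 / (1.0 + math.exp(-x))


# First value of rawPrediction in the table above (as printed, i.e. truncated)
raw_class0 = 0.81620043206809
p_class0 = sigmoid(raw_class0)  # compare with the first value of probability
p_class1 = 1.0 - p_class0       # probability that the tweet is "positive"
# Default decision threshold for binary logistic regression is 0.5
prediction = 1.0 if p_class1 > 0.5 else 0.0
print(round(p_class0, 4), prediction)
```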



In [57]:
# Once you have finished, stop the Spark session
spark.stop()

<h1>Working Demo</h1>

<img src="../images/meme_6.jpeg">

<h1>Initial Setup</h1>

To run the whole software you must have <b>Docker</b> installed on your machine. You could install every component locally, one by one, but with Docker it is much easier both to get started and to change or fix things.

There is no restriction on the OS, but getting Docker running is much easier on UNIX-like machines. On Windows you can use WSL 2.

You will need around <b>5-7 GB</b> to install everything, but plan for at least 10-15 GB in total, because you will need space to store all the tweets you gather with this software.

Lastly, you need Twitter developer keys to scrape tweets. You can get them by signing in at this website: https://developer.twitter.com/en. Once you have them, you will have to edit the file <a href="https://github.com/Mirkesx/tap-progetto/blob/master/flume/conf/twitterKafka.conf">twitterKafka.conf</a>.

<h1>First Step</h1>

We need to turn on both the Elasticsearch/Kibana environment and the ZooKeeper/Kafka servers. To do so, just run the code below.

After a little while you can check that everything is running by going to the <a href="http://localhost:5601">Kibana Client</a>.

In [8]:
%%bash
bash ../bin/progettoESKibanaStart.sh
bash ../bin/progettoServersStart.sh

Sending build context to Docker daemon   2.56kB
Step 1/3 : FROM docker.elastic.co/elasticsearch/elasticsearch:7.7.0
 ---> 7ec4f35ab452
Step 2/3 : MAINTAINER Salvo Nicotra
 ---> Using cache
 ---> 2ca07327979b
Step 3/3 : ADD elastic.sh /
 ---> Using cache
 ---> c277f98024c9
Successfully built c277f98024c9
Successfully tagged tap:elasticsearch
fbf7d53f4720839eb95f38fefcd7da826f620444471ebe7e983f79573dae66c6
Sending build context to Docker daemon  3.072kB
Step 1/3 : FROM  docker.elastic.co/kibana/kibana:7.7.0
 ---> eadc7b3d59dd
Step 2/3 : MAINTAINER Salvo Nicotra
 ---> Using cache
 ---> 13a978986b30
Step 3/3 : ADD kibana.yml /usr/share/kibana/config
 ---> Using cache
 ---> 01c96f01fe85
Successfully built 01c96f01fe85
Successfully tagged tap:kibana
84e5bc49373436f6a1a8127d005527747dcabd56d96456019c462c1343cdc01c
kafkaZK
kafkaZK
Sending build context to Docker daemon  62.38MB
Step 1/11 : FROM openjdk:8-jre-alpine
 ---> f7a292bbb70c
Step 2/11 : LABEL maintainer="Salvo Nicotra"
 ---> Usi

Error response from daemon: No such container: elasticsearch
Error: No such container: elasticsearch
Error response from daemon: No such container: kibana
Error: No such container: kibana
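Instead of refreshing the browser until Kibana shows up, you can poll the services from Python. The helper below is a small standard-library sketch; `wait_for` is a hypothetical name, Kibana's `/api/status` endpoint on port 5601 follows from the link above, and Elasticsearch answering on its default port 9200 is an assumption about this Docker setup (the container may not publish that port).

```python
import http.client
import time
import urllib.request


def wait_for(url: str, attempts: int = 30, delay: float = 2.0, timeout: float = 3.0) -> bool:
    """Poll `url` until it answers with HTTP 2xx; give up after `attempts` tries."""
    for attempt in range(attempts):
        try:
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                if 200 <= resp.status < 300:
                    return True
        except (OSError, http.client.HTTPException):
            # Connection refused, timeout or HTTP error status (URLError and
            # HTTPError are OSError subclasses): the service is not ready yet
            pass
        if attempt < attempts - 1:
            time.sleep(delay)
    return False
```

For example, `wait_for("http://localhost:5601/api/status")` returns True once Kibana answers; the Elasticsearch equivalent would be `wait_for("http://localhost:9200/_cluster/health")`, provided port 9200 is reachable from the host.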


<h1>Second Step (Optional)</h1>

You now need to set a topic. What is a topic? A topic is a category or feed name to which records are published. Producers write to a topic and consumers subscribe to it, so Kafka knows where the information it streams should go.

In a few words: we have to give a name to the data we are streaming. For this example it will be called "tap", but it really doesn't matter which name you pick, as long as you remember it (we will need it later on).

This step is <b>required</b> only if it is the first time you run this software or if you deleted the Kafka/ZooKeeper volumes.

In [9]:
%%bash
bash ../bin/progettoCreateTopic.sh tap

kafkaTopic
kafkaTopic
Sending build context to Docker daemon  62.38MB
Step 1/11 : FROM openjdk:8-jre-alpine
 ---> f7a292bbb70c
Step 2/11 : LABEL maintainer="Salvo Nicotra"
 ---> Using cache
 ---> 6863e2590cc2
Step 3/11 : ENV PATH /opt/kafka/bin:$PATH
 ---> Using cache
 ---> 29831ea575bc
Step 4/11 : ENV KAFKA_DIR "/opt/kafka"
 ---> Using cache
 ---> 604e68798bc9
Step 5/11 : ARG KAFKA_VERSION="2.12-2.4.1"
 ---> Using cache
 ---> 4aba6e06c19d
Step 6/11 : RUN apk update && apk add --no-cache bash gcompat
 ---> Using cache
 ---> 06854bc9a245
Step 7/11 : ADD setup/kafka_${KAFKA_VERSION}.tgz /opt
 ---> Using cache
 ---> a1b0f9b68b2a
Step 8/11 : RUN ln -s /opt/kafka_${KAFKA_VERSION} ${KAFKA_DIR}
 ---> Using cache
 ---> d3e20114400a
Step 9/11 : ADD kafka-manager.sh ${KAFKA_DIR}/bin/kafka-manager
 ---> Using cache
 ---> 60b315e45d57
Step 10/11 : ADD conf/* ${KAFKA_DIR}/config/
 ---> Using cache
 ---> cc64bfcfa9a5
Step 11/11 : ENTRYPOINT [ "kafka-manager" ]
 ---> Using cache
 ---> b45d869a608f


# Stop
docker stop kafkaTopic

# Remove previuos container 
docker container rm kafkaTopic

docker build ../kafka/ --tag tap:kafka
docker run -e KAFKA_ACTION=create-topic \
            -e KAKFA_SERVER=10.0.100.23 \
            -e KAFKA_TOPIC=$1 \
            --network tap \
            --ip 10.0.100.24 \
            --name kafkaTopic \
            tap:kafka
[2020-06-26 16:19:49,023] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'tap' already exists.
 (kafka.admin.TopicCommand$)


CalledProcessError: Command 'b'\nbash ../bin/progettoCreateTopic.sh tap\n'' returned non-zero exit status 1.
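The error above is harmless: the `tap` topic had already been created on an earlier run, which is exactly why this step is optional. If you drive the setup from Python, you could treat that case as success. The sketch below runs the same script with `subprocess.run` and inspects its output; `create_topic` and `topic_already_exists` are hypothetical helpers, and matching on the `TopicExistsException` text is an assumption based on the log shown above.

```python
import re
import subprocess


def topic_already_exists(log: str, topic: str) -> bool:
    # Matches the Kafka error line shown above, e.g. "Topic 'tap' already exists."
    pattern = rf"TopicExistsException: Topic '{re.escape(topic)}' already exists"
    return re.search(pattern, log) is not None


def create_topic(topic: str, script: str = "../bin/progettoCreateTopic.sh") -> bool:
    """Run the topic-creation script; True if the topic was created or already existed."""
    result = subprocess.run(["bash", script, topic], capture_output=True, text=True)
    if result.returncode == 0:
        return True
    # A non-zero exit is fine if the only problem is that the topic already exists
    return topic_already_exists(result.stdout + result.stderr, topic)
```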

<h1>Third Step</h1>

We have to turn on Flume to start gathering data.

This command needs two parameters:
- The topic
- Comma-separated list of terms to search in tweets

The topic will be "tap" (or whichever name you picked before). The list of terms will be a list of Twitter usernames.

In this example we track realDonaldTrump and matteosalvinimi, just as a demonstration. The system can handle more users, but remember that the more users you add to this list, the more of your PC's resources (CPU, RAM and disk space) it will use.

In [18]:
import os
os.system("gnome-terminal -e 'bash ../bin/progettoFlumeStart.sh tap \"realDonaldTrump, matteosalvinimi\"'")

0
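Passing the user list through `os.system` means hand-escaping nested quotes. A slightly sturdier sketch builds the argument list in Python and launches it with `subprocess`, which hands each argument to the program verbatim. `normalize_users`, `flume_command` and `launch` are hypothetical helpers; the script path and argument order come from the cell above, the use of `gnome-terminal --` reflects that recent gnome-terminal versions deprecate `-e`, and trimming the spaces after commas (and any leading `@`) assumes the Flume script accepts the list in that form.

```python
import subprocess


def normalize_users(users: str) -> list:
    # "realDonaldTrump, @matteosalvinimi" -> ["realDonaldTrump", "matteosalvinimi"]
    cleaned = (u.strip().lstrip("@") for u in users.split(","))
    return [u for u in cleaned if u]


def flume_command(topic: str, users: str,
                  script: str = "../bin/progettoFlumeStart.sh") -> list:
    # Each list element reaches the program verbatim: no shell quoting needed
    return ["gnome-terminal", "--", "bash", script, topic,
            ",".join(normalize_users(users))]


def launch(cmd: list) -> subprocess.Popen:
    # Start the command without waiting for it to finish (Flume keeps running)
    return subprocess.Popen(cmd)


cmd = flume_command("tap", "realDonaldTrump, matteosalvinimi")
print(cmd)
# launch(cmd)  # uncomment to open the terminal and start Flume
```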

<h1>Fourth Step</h1>

Now we have to start the Spark environment, which processes the data we gather before it is displayed.

This command needs three parameters:
- The topic
- Comma-separated list of usernames
- Comma-separated list of IDs

This script needs to know which users we want to track. You have exactly two ways to define them:
- By typing their Twitter usernames
- By typing their Twitter IDs

This list must be consistent with the one you gave to Flume: a single mistake in the list of usernames means you will not see anything displayed as output.

To enter an empty list, you must type "".

In [19]:
import os
os.system("gnome-terminal -e 'bash ../bin/progettoSparkStart.sh tap \"realDonaldTrump, matteosalvinimi\" \"\"'")

0
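Because the Spark list has to match the Flume list, it can help to check the two before launching. The sketch below uses hypothetical helpers (`untracked_users`, `spark_command`) with the same script path and argument order as the cell above: it reports usernames given to Spark that Flume is not tracking, and builds the argument list, where an empty list is simply an empty-string argument (the equivalent of typing ""). Comparing names case-insensitively relies on Twitter screen names being case-insensitive.

```python
def untracked_users(flume_users: str, spark_users: str) -> list:
    """Usernames given to Spark that are missing from the Flume list (case-insensitive)."""
    tracked = {u.strip().lower() for u in flume_users.split(",") if u.strip()}
    return [u.strip() for u in spark_users.split(",")
            if u.strip() and u.strip().lower() not in tracked]


def spark_command(topic: str, usernames: str, ids: str = "",
                  script: str = "../bin/progettoSparkStart.sh") -> list:
    # An empty list is passed as an empty argument, no extra quoting needed
    return ["gnome-terminal", "--", "bash", script, topic, usernames, ids]


flume_users = "realDonaldTrump, matteosalvinimi"
spark_users = "realDonaldTrump, matteosalvinimi"
missing = untracked_users(flume_users, spark_users)
if missing:
    print("Not tracked by Flume, nothing will show up for:", missing)
else:
    print(spark_command("tap", spark_users))
```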

<h1>Fifth Step</h1>

It is now time to see the output. Open the <a href="http://localhost:5601">Kibana Client</a>.

By default, you already have an index (called twitter) and a working dashboard to display the data. You can always change the dashboard or the index, but that is not the point of this demo, so I won't cover it here.

First you have to refresh the index pattern.

Then you may go to the Dashboard and start watching the data.

Here you can use many filters to track specific data (such as the reputation of a single user, the % of bad tweets from users of one language and so on).
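Figures such as the share of negative tweets about a user, per language, boil down to a group-by over the classified tweets. The sketch below computes them in plain Python on a few made-up records; the field names (`target`, `lang`, `sentiment`) and the net score (positive minus negative, divided by the total) are hypothetical and not necessarily how the project's Kibana visualizations define reputation.

```python
from collections import Counter, defaultdict


def reputation_stats(tweets: list) -> dict:
    """Per (target, lang): tweet count, % negative and a simple net score in [-1, 1]."""
    counts = defaultdict(Counter)
    for t in tweets:
        counts[(t["target"], t["lang"])][t["sentiment"]] += 1
    stats = {}
    for key, c in counts.items():
        total = sum(c.values())
        stats[key] = {
            "tweets": total,
            "pct_negative": 100.0 * c["negative"] / total,
            "net_score": (c["positive"] - c["negative"]) / total,
        }
    return stats


sample = [  # made-up records, for illustration only
    {"target": "userA", "lang": "en", "sentiment": "positive"},
    {"target": "userA", "lang": "en", "sentiment": "negative"},
    {"target": "userA", "lang": "en", "sentiment": "negative"},
    {"target": "userA", "lang": "it", "sentiment": "neutral"},
]
for key, s in sorted(reputation_stats(sample).items()):
    print(key, s)
```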

You can even go "back to the past" to see previous statistics and better understand how the reputation system works.
    
<img src="../images/meme_7.jpg">

<h1>Thank you for listening</h1>

<img src="../images/meme_8.jpg">