## Twitter GeoLocal Analysis App
Geo-localised and sentiment-tagged tweets are read from a Cloudant database and displayed as a heatmap <br>
A machine-learning model is fitted to the data and evaluated on held-out test data<br>
Aim: predict the sentiment given latitude/longitude<br>
If the model performance is acceptable on test data, we can use it for anomaly detection<br>
Incoming data can be compared to the predictions, and a flag is raised if an anomaly is detected


In [None]:
# Install/upgrade folium, which the plotting helpers below use to render maps.
!pip install --upgrade folium

In [2]:
# The code was removed by Watson Studio for sharing.

In [7]:
import folium 
from folium.plugins import HeatMap
import branca.colormap as cm


The (hidden) cell below defines the Cloudant credentials in the following form (values redacted):<br>
#Cloudant credentials<br>
credentials_cloudant = {<br>
  'password':"""xxxxxxxxxxxxxxxxxxxxxx""",<br>
  'custom_url':'https://xxxxxxxxxxxxxxxxxxxxxxxx-bluemix.cloudant.com',<br>
  'username':'xxxxxxxxxxxxxxxxxxxxxxxxx-bluemix',<br>
  'url':'https://undefined'<br>
}

In [3]:
# The code was removed by Watson Studio for sharing.

In [4]:
#connect directly 
from cloudant.client import Cloudant
from cloudant.error import CloudantException
from cloudant.result import Result, ResultByKey

serviceUsername = credentials_cloudant['username']
servicePassword = credentials_cloudant['password']
serviceURL = credentials_cloudant['custom_url']

# Direct connection through the python-cloudant client.
client = Cloudant(serviceUsername, servicePassword, url=serviceURL)
client.connect()

#connect via Spark driver
from urllib.parse import urlparse

from pyspark.sql import SparkSession

# "cloudant.host" takes a bare host name. Extract it with urlparse instead of
# custom_url.split('@')[1]: the split raised IndexError whenever the URL had
# no embedded "user:password@" part (as in the credentials template), while
# urlparse handles both forms. If the URL has no scheme, urlparse yields no
# hostname, so fall back to the text after the last '@'.
cloudantHost = urlparse(serviceURL).hostname or serviceURL.rsplit('@', 1)[-1]

spark = SparkSession\
    .builder\
    .appName("Cloudant Storage for Tweets")\
    .config("cloudant.host", cloudantHost)\
    .config("cloudant.username", serviceUsername)\
    .config("cloudant.password", servicePassword)\
    .config("jsonstore.rdd.partitions", 1)\
    .getOrCreate()


In [89]:
import pandas as pd


from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

def load_db(db_name):
    """Load a Cloudant database as a Spark DataFrame of timestamped tweets.

    Reads ``db_name`` through the Apache Bahir Cloudant connector, registers
    it as the temporary view ``df`` and returns only documents whose
    ``timeCreated`` field is non-empty.

    If reading or querying fails, for example when the database is empty and
    so has no ``timeCreated`` column, "no feeds found" is printed together
    with the error, and an empty DataFrame with no columns is returned
    instead of raising.
    """
    try:
        df = spark.read.load(db_name, "org.apache.bahir.cloudant")
        df.createOrReplaceTempView("df")
        # NOTE(review): DESC applies only to timeCreated; dateCreated sorts
        # ascending. Confirm this mixed ordering is intended.
        return spark.sql("SELECT * from df WHERE timeCreated <> '' ORDER BY dateCreated, timeCreated DESC")
    except Exception as err:
        # Best-effort fallback. The previous version used a bare `except:`
        # (which also swallowed KeyboardInterrupt) and built the empty frame
        # from `sqlContext`, `sc` and `StructType`. None of those names are
        # defined in this notebook, so the fallback itself raised NameError.
        print("no feeds found")
        print("  ({}: {})".format(type(err).__name__, err))
        from pyspark.sql.types import StructType
        return spark.createDataFrame(spark.sparkContext.emptyRDD(), StructType([]))


def plot_heat_map(coords, tiles="Mapbox Bright", min_opacity=0.5, radius=10, blur=0, max_zoom=1,
                  limit_up=0.2, limit_down=-0.2):
    """Render tweet sentiment as two folium heat-map layers.

    Args:
        coords: pandas DataFrame with ``latitude``, ``longitude`` and
            ``score`` columns (values convertible to float). It is not
            modified; rows with any missing value are ignored.
        tiles: folium tileset name.
            NOTE(review): newer folium releases may no longer ship a
            built-in "Mapbox Bright" tileset. Pass another ``tiles`` value if
            map construction fails.
        min_opacity, radius, blur, max_zoom: forwarded to ``HeatMap``.
        limit_up: scores >= this go on the positive (Yellow->Red) layer.
        limit_down: scores <= this go on the negative (Blue->Yellow) layer.
            Scores strictly between the two limits are not drawn.

    Returns:
        The ``folium.Map``, centred on the mean coordinate, with both heat
        layers and a Blue/Yellow/Red 'Sentiment Index' legend over [-1, 1].
    """
    colormap_colors = ['Blue', 'Yellow', 'Red']
    s_rng = [-1, 1]
    colormap = cm.LinearColormap(colormap_colors, vmin=s_rng[0], vmax=s_rng[1]).to_step(100)
    colormap.caption = 'Sentiment Index'
    # Convert on a copy: the previous version reassigned the columns of the
    # caller's DataFrame in place.
    coords = coords.astype({'longitude': float, 'latitude': float, 'score': float})
    coords = coords.dropna(axis=0)
    folium_map = folium.Map(location=[coords.latitude.mean(), coords.longitude.mean()],
                            zoom_start=5, tiles=tiles)

    span = s_rng[1] - s_rng[0]
    # (row selection, gradient) for the positive layer, then the negative one.
    # NOTE(review): weights are (score - s_rng[0]) / span, so on the negative
    # layer the most negative tweets (score -> -1) get the smallest weight
    # (-> 0) and are clamped to min_opacity. Confirm this is the intended
    # rendering.
    layers = [
        (coords.score >= limit_up, {0: colormap_colors[1], 1.0: colormap_colors[2]}),
        (coords.score <= limit_down, {0: colormap_colors[0], 1.0: colormap_colors[1]}),
    ]
    for mask, gradient in layers:
        subset = coords[mask]
        points = [[lat, lon, (score - s_rng[0]) / span]
                  for lat, lon, score in zip(subset.latitude, subset.longitude, subset.score)]
        HeatMap(points, gradient=gradient, min_opacity=min_opacity, radius=radius,
                blur=blur, max_zoom=max_zoom).add_to(folium_map)

    folium_map.add_child(colormap)
    return folium_map
    

def plot_map(coords, tiles="Mapbox Bright"):
    """Plot every tweet as a circle marker coloured by its sentiment score.

    Args:
        coords: pandas DataFrame with ``latitude``, ``longitude`` and
            ``score`` columns (values convertible to float). It is not
            modified; rows with any missing value are ignored.
        tiles: folium tileset name.

    Returns:
        The ``folium.Map``, centred on the mean coordinate. Each tweet is a
        filled, outline-free circle coloured on a red/white/green scale over
        [-1, 1], with a popup showing the score to two decimals, plus a
        'Sentiment Index' legend.
    """
    colormap_colors = ['red', 'white', 'green']
    s_rng = [-1, 1]
    colormap = cm.LinearColormap(colormap_colors, vmin=s_rng[0], vmax=s_rng[1]).to_step(100)
    colormap.caption = 'Sentiment Index'
    # Convert on a copy instead of reassigning the caller's columns.
    coords = coords.astype({'longitude': float, 'latitude': float, 'score': float})
    coords = coords.dropna(axis=0)
    folium_map = folium.Map(location=[coords.latitude.mean(), coords.longitude.mean()],
                            zoom_start=5, tiles=tiles)
    for lat, lon, score in zip(coords.latitude, coords.longitude, coords.score):
        # The previous version referenced the undefined names `marker` and
        # `fg` here (NameError on the first row) and built a popup it never
        # attached.
        marker = folium.CircleMarker(
            location=[lat, lon],
            radius=5,
            weight=0,  # remove outline
            fill=True,
            fill_opacity=0.8,
            color=colormap.rgb_hex_str(score),
            popup=folium.Popup("{:0.2f}".format(score), parse_html=True),
        )
        marker.add_to(folium_map)
    folium_map.add_child(colormap)
    return folium_map


def train_model(data, seed=None):
    """Fit a GBT regressor that predicts sentiment score from longitude/latitude.

    Args:
        data: Spark DataFrame with ``longitude``, ``latitude`` and ``score``
            columns (as produced by ``TwitterFeed.get_coords``).
        seed: optional seed for the 70/30 train/test split and the GBT
            learner. Pass an int to make the split, and hence the reported
            metrics, reproducible. ``None`` keeps the unseeded behaviour.

    Returns:
        ``(model, rmse, r2)``: the fitted ``PipelineModel`` and its RMSE and
        R² on the held-out test split. Five example predictions and both
        metrics are also printed.
    """
    vectorAssembler = VectorAssembler(inputCols=["longitude", "latitude"], outputCol="features")
    # Any feature with <= 4 distinct values would be treated as categorical.
    # With continuous coordinates this is presumably a no-op (TODO confirm).
    featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4)
    # Split the data into training and test sets (30% held out for testing).
    trainingData, testData = data.randomSplit([0.7, 0.3], seed=seed)
    gbt = GBTRegressor(featuresCol="indexedFeatures", labelCol='score', maxIter=20, seed=seed)
    pipeline = Pipeline(stages=[vectorAssembler, featureIndexer, gbt])
    # Fitting the pipeline also fits the indexer.
    print("Training GBT model...")
    model = pipeline.fit(trainingData)
    print("Evaluating model on test data...")
    predictions = model.transform(testData)
    predictions.select("prediction", "score", "features").show(5)
    metrics = {
        name: RegressionEvaluator(labelCol="score", predictionCol="prediction",
                                  metricName=name).evaluate(predictions)
        for name in ("rmse", "r2")
    }
    rmse, r2 = metrics["rmse"], metrics["r2"]
    print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
    print("R2 on test data = %g" % r2)
    return model, rmse, r2




In [None]:
# TwitterFeed class to manipulate and analyse the saved feeds

class TwitterFeed:
    """Load, map and model the geo-located tweets stored in one Cloudant DB.

    Attributes:
        db_name: name of the Cloudant database the tweets are read from.
        data: Spark DataFrame returned by ``load_db``.
        coords: Spark DataFrame with ``score``, ``longitude`` and
            ``latitude`` columns, derived from ``data`` by ``get_coords``.
        map, heat_map: folium maps; ``None`` until generated.
        model, rmse, r2: results of ``train_model``; ``None``/0/0 until
            ``train_anomaly_model`` has run.
    """

    def __init__(self, db_name):
        self.db_name = db_name
        self.data = load_db(self.db_name)
        self.map = None
        self.heat_map = None
        self.coords = self.get_coords()
        self.model = None
        self.rmse = 0
        self.r2 = 0

    def update_feed(self):
        """Reload the database and recompute ``coords``.

        ``coords`` stays a Spark DataFrame, as in ``__init__``. The previous
        version stored a pandas frame here, which broke ``generate_heat_map``
        (``.toPandas()`` on pandas) and ``train_anomaly_model`` (no
        ``randomSplit`` on pandas).
        """
        self.data = load_db(self.db_name)
        self.coords = self.get_coords()

    def get_coords(self):  # TODO: add more filtering conditions later
        """Return the sentiment score and coordinates of every geo-tagged tweet.

        Registers ``self.data`` as the ``tweets`` view and selects ``score``
        (``sentiment.score``), ``longitude`` (``geometry.coordinates[0]``)
        and ``latitude`` (``geometry.coordinates[1]``). Rows with no geometry
        or no sentiment score are skipped. Returns a Spark DataFrame.

        This is a single query, so it no longer overwrites the ``df`` view
        registered by ``load_db``.
        """
        self.data.createOrReplaceTempView("tweets")
        return spark.sql(
            "SELECT sentiment.score, "
            "geometry.coordinates[0] AS longitude, "
            "geometry.coordinates[1] AS latitude "
            "FROM tweets "
            "WHERE geometry IS NOT NULL AND sentiment.score IS NOT NULL"
        )

    def generate_heat_map(self):
        """Build the sentiment heat map (``plot_heat_map``) into ``self.heat_map``."""
        self.heat_map = plot_heat_map(self.coords.toPandas())

    def generate_map(self):
        """Build the per-tweet marker map (``plot_map``) into ``self.map``."""
        self.map = plot_map(self.coords.toPandas())

    def train_anomaly_model(self):
        """Fit the GBT sentiment model on ``coords`` and store model/rmse/r2."""
        self.model, self.rmse, self.r2 = train_model(self.coords)
        



In [90]:
# Load the "brexit_tweets" database and build its sentiment heat map.
brexit_feed = TwitterFeed("brexit_tweets")
brexit_feed.generate_heat_map()
# Last expression of the cell, so Jupyter renders the folium map inline.
brexit_feed.heat_map

## Anomaly detection 
We fit a machine learning model to the data to predict the sentiment from latitude/longitude<br>
If the model performance is sufficiently good on test data, we can use it for anomaly detection<br>
ie for each new datapoint we predict the tweet sentiment given lat/lon.<br>
An anomaly is raised when the prediction is very different from the actual sentiment score.

In [91]:
# Fit the GBT sentiment model; prints sample predictions plus test RMSE and R2.
brexit_feed.train_anomaly_model()

Training GBT model...
Evaluating model on test data...
+--------------------+---------+--------------------+
|          prediction|    score|            features|
+--------------------+---------+--------------------+
|-0.02673812155075842|-0.888036|[-0.2161080000000...|
|-0.16844047496706444| -0.81063|[110.333028,1.429...|
|-0.16595070797070496| -0.75617|[-0.1091815,51.54...|
|-0.08496630422360758| -0.72438|[-0.442361,51.537...|
|-0.15084220913585722| -0.48417|[-0.0991635,51.64...|
+--------------------+---------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 0.487036
R2 on test data = -0.00110838


With R² ≈ 0 on test data, this model is no better than always predicting the mean sentiment, so it is not yet usable for anomaly detection.


To do:
* modify tweet feed update method to load newest tweets
* predict sentiment on new lat/lon using trained model 
* display differences

In [10]:
# Number of loaded tweets (rows of brexit_feed.data, i.e. those with a non-empty timeCreated).
brexit_feed.data.count()

10972

In [130]:
# Register the loaded tweets as the "tweets" SQL view and print their schema.
brexit_feed.data.createOrReplaceTempView("tweets")
brexit_feed.data.printSchema()

root
 |-- _id: string (nullable = true)
 |-- _rev: string (nullable = true)
 |-- coords: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- country: string (nullable = true)
 |-- dateCreated: string (nullable = true)
 |-- emotion: struct (nullable = true)
 |    |-- anger: double (nullable = true)
 |    |-- disgust: double (nullable = true)
 |    |-- fear: double (nullable = true)
 |    |-- joy: double (nullable = true)
 |    |-- sadness: double (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- sentiment: struct (nullable = true)
 |    |-- label: string (nullable = true)
 |    |-- score: double (nullable = true)
 |-- text: string (nullable = true)
 |-- timeCreated: string (nullable = true)
 |-- type: string (nullable = true)

