Global News Tone Analysis and Visulization
------------------------------------------

Analyze global news average tone from GDELT Global Knowledge Pubic Dataset in Big Query

100,000,000 global news with domain, tone, country mentioned in the news attributes.
Tones toward other countries is calculated based on the countries mentioned in the news and the normalized average new tone.

The analysis is doen in both Pandas and Spark environment

Output Analysis Result
------------------------------------------

General Average tone in Chinese news toward rest of the world

General Average tone in U.S. news toward rest of the world

General Average tone in U.K. news toward rest of the world

## Indexing

1. [Input Data](#Input-Data)
2. [Merging Dataset](#Merging-Dataset)
3. [Filtering Theme](#Filtering-Theme)
4. [Cleaning Data](#Cleaning-Data)
5. [Calculating worldwide Average Tones of News from China, US, UK](#Calculating-worldwide-Average-Tones-of-News-from-China,-US,-UK)
6. [Filtering Result](#Filtering-Result)
7. [Visualizaing Worldwide Average Tone](#Visualizaing-Worldwide-Average-Tone)


In [1]:
import pandas as pd
import numpy as np
import folium
import json
import re

## Filter Function
def mask(df, key, value):
    return df[df[key] == value]
pd.DataFrame.mask = mask

In [2]:
import findspark
findspark.init()
import pyspark
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "16g"))
sc = SparkContext(conf = conf)
#sc = pyspark.SparkContext()
from pyspark.sql.functions import udf
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, min, max
from pyspark.sql.types import StringType, IntegerType, StructType, StructField, FloatType,ArrayType,DataType
from pyspark.sql.functions import array_contains




In [3]:
sc.version

u'1.6.2'

## Input Data

In [4]:
#####################################################
###########Input Tone, Location Raw Data#############

sqlCtx = SQLContext(sc)
df = sqlCtx.read.format('com.databricks.spark.csv').option("header", "true").load("Data/Theme/spark/*.csv") 

sourceCountry = sqlCtx.read.format('com.databricks.spark.csv')\
                .option("header", "true").load('Data/sourceCountry.csv')


## Merging Dataset

Filter News from China, US, UK respectively

In [5]:
tone = df.join(sourceCountry, df.SourceCommonName == sourceCountry.Domain, 'left')
tone = tone.dropna()

In [6]:
tone.printSchema()

root
 |-- DATE: string (nullable = true)
 |-- SourceCommonName: string (nullable = true)
 |-- Locations: string (nullable = true)
 |-- V2Tone: string (nullable = true)
 |-- V2Themes: string (nullable = true)
 |-- Domain: string (nullable = true)
 |-- FIPS: string (nullable = true)
 |-- CountryName: string (nullable = true)



In [7]:
china = tone[tone['CountryName'] == 'China']
us = tone[tone['CountryName'] == 'United States']
uk = tone[tone['CountryName'] == 'United Kingdom']

In [8]:

######################################################
###############Clean the Tone Data####################
#from pyspark.sql.functions import UserDefinedFunction
#pattern = re.compile("^\s+|\s*,\s*|\s+$")

#udf = UserDefinedFunction(lambda x: float(pattern.split(x)[0]))

pattern = re.compile("^\s+|\s*,\s*|\s+$")
#def cleanTone(Tone):
    #return 
    
UDF = udf(lambda x: float(pattern.split(x)[0]),FloatType())

china = china.withColumn('Tone',UDF(china.V2Tone))
us = us.withColumn('Tone',UDF(us.V2Tone))
uk = uk.withColumn('Tone',UDF(uk.V2Tone))

#us = us.map(lambda x:float(pattern.split(x.ToneV2)[0]))

## Cleaning Data

In [9]:
###########################################################
#######Clean the Location data mentioned in the news#######

def cleanTargetCountry(Loc,Tone):
    count={}
    result = {}
    for i in Loc.split(';'):
        key = i.split('#')[2][:2]
        if  key not in count.keys():
            count[key] = 1
        else:
            count[key] += 1
    totalCount = float(sum(count.values()))
    for i in Loc.split(';'):
        key = i.split('#')[2][:2]
        result[key] = count[key] / totalCount * Tone
        
    result = json.dumps(result)
    return result

UDF2 = udf(cleanTargetCountry)
us = us.withColumn('Target',UDF2(us.Locations,us.Tone))
china = china.withColumn('Target',UDF2(china.Locations,china.Tone))
uk = uk.withColumn('Target',UDF2(uk.Locations,uk.Tone))


In [10]:
us.printSchema()

root
 |-- DATE: string (nullable = true)
 |-- SourceCommonName: string (nullable = true)
 |-- Locations: string (nullable = true)
 |-- V2Tone: string (nullable = true)
 |-- V2Themes: string (nullable = true)
 |-- Domain: string (nullable = true)
 |-- FIPS: string (nullable = true)
 |-- CountryName: string (nullable = true)
 |-- Tone: float (nullable = true)
 |-- Target: string (nullable = true)



## Filtering Theme 

Filtering databased on News' Themes

In [11]:
###########################################################
###########Clean the raw theme string######################


pattern = re.compile("^\s+|\s*,\s*|\s+$")

def themeFilter(row):
    themeList = row.split(';')
    #print themeList
    for i in range(len(themeList)):
        themeList[i] = pattern.split(themeList[i])[0]
    return themeList
UDF3 = udf(themeFilter,ArrayType(StringType()))
us = us.withColumn('Themes',UDF3(us.V2Themes))
uk = uk.withColumn('Themes',UDF3(uk.V2Themes))
china = china.withColumn('Themes',UDF3(china.V2Themes))



In [12]:
###########################################################
###########Filter data based on the theme##################

theme = "GENERAL_GOVERNMENT"

us_themeFilter = us.where(array_contains(us.Themes, theme))
uk_themeFilter = uk.where(array_contains(uk.Themes, theme))
china_themeFilter = china.where(array_contains(china.Themes, theme))



In [13]:
us_themeFilter.printSchema()

root
 |-- DATE: string (nullable = true)
 |-- SourceCommonName: string (nullable = true)
 |-- Locations: string (nullable = true)
 |-- V2Tone: string (nullable = true)
 |-- V2Themes: string (nullable = true)
 |-- Domain: string (nullable = true)
 |-- FIPS: string (nullable = true)
 |-- CountryName: string (nullable = true)
 |-- Tone: float (nullable = true)
 |-- Target: string (nullable = true)
 |-- Themes: array (nullable = true)
 |    |-- element: string (containsNull = true)



## Calculating worldwide Average Tones of News from China, US, UK

In [14]:
###########################################################
##################Initial Result Dataframe#################

## China
countryInfo = pd.read_csv('Data/countryInfo.csv', index_col = False, low_memory = False)

china_result = countryInfo[['fips','country']]
china_result['tone'] = 0
china_result['nb_article'] = 0
china_result = china_result.set_index('fips')

## U.S.
us_result = countryInfo[['fips','country']]
us_result['tone'] = 0
us_result['nb_article'] = 0
us_result = us_result.set_index('fips')

## U.K
uk_result = countryInfo[['fips','country']]
uk_result['tone'] = 0
uk_result['nb_article'] = 0
uk_result = uk_result.set_index('fips')




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
A value is tryin

In [15]:
###########################################################
###########Calculate Number of Articles for each target####
#############Maintain the tone result to result df#########

for row in us_themeFilter.rdd.collect():
    target = json.loads(row['Target'])
    for country in target.keys():
        try:
            us_result.ix[country,'tone'] += target[country]
            us_result.ix[country,'nb_article'] +=1
        except KeyError:
            pass


for row in uk_themeFilter.rdd.collect():
    target = json.loads(row['Target'])
    for country in target.keys():
        try:
            uk_result.ix[country,'tone'] += target[country]
            uk_result.ix[country,'nb_article'] +=1
        except KeyError:
            pass
        
for row in china_themeFilter.rdd.collect():
    target = json.loads(row['Target'])
    for country in target.keys():
        try:
            china_result.ix[country,'tone'] += target[country]
            china_result.ix[country,'nb_article'] +=1
        except KeyError:
            pass

    

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 102 in stage 4.0 failed 1 times, most recent failure: Lost task 102.0 in stage 4.0 (TID 343, localhost): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 150315 ms
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)


In [None]:
######Calculate Average tone for other countries for China, US, UK


us_result['tone'] = us_result['tone']/us_result['nb_article']
china_result['tone'] = china_result['tone']/china_result['nb_article']
uk_result['tone'] = uk_result['tone']/uk_result['nb_article']


In [None]:
uk_result.nb_article.sum()

## Filtering Result

Filtering Top 20 Negative tone country

In [None]:
###########################################################
##################Filter Data##############################

## China
countryGeo = pd.read_csv('Data/countrygeo.csv',index_col = False,low_memory = False)
countryGeo.columns = ['fips','lat','lon','country']


china_result = china_result.dropna()
china_result = china_result.sort(columns = 'nb_article',ascending=False)[:200]
china_result = pd.merge(china_result,countryGeo, on = ['country'],how = 'left')
china_result = china_result.dropna()
china_result = china_result[china_result['nb_article']>10]

## US
us_result = us_result.dropna()
us_result = us_result.sort(columns = 'nb_article',ascending=False)[:200]
us_result = pd.merge(us_result,countryGeo, on = ['country'],how = 'left')
us_result = us_result.dropna()
us_result = us_result[us_result['nb_article']>10]

## UK
uk_result = uk_result.dropna()
uk_result = uk_result.sort(columns = 'nb_article',ascending=False)[:200]
uk_result = pd.merge(uk_result,countryGeo, on = ['country'],how = 'left')
uk_result = uk_result.dropna()
uk_result = uk_result[uk_result['nb_article']>10]

In [None]:
china_result_20 = china_result.sort(columns = 'tone',ascending=True)[:20]
us_result_20 = us_result.sort(columns = 'tone',ascending=True)[:20]
uk_result_20 = uk_result.sort(columns = 'tone',ascending=True)[:20]

## Visualizaing Worldwide Average Tone

In [None]:
china_Map = folium.Map(location=[35.85, 104.19], zoom_start=1)
us_Map = folium.Map(location=[35.85, 104.19], zoom_start=1)
uk_Map = folium.Map(location=[35.85, 104.19], zoom_start=1)

In [None]:
import folium

'''
for index, row in test2.iterrows():
    print index
    Location = [row['lat'],row['lon']]
    name = row['country']
    R = abs(row.tone)
    if row.tone>0:
        Color = '#3186cc'
    else:
        Color = '#cc3131'
    if name == 'Turkey':
        print name
    if index>30:
        folium.CircleMarker(location=Location, radius=1000000*R,
                            popup=name,
                            color=Color,
                            fill_color=Color).add_to(Map)

'''
def draw(Map,loc,r,name):
    if r > 0.05:
        Color = '#3186cc'
    elif r < -0.05:
        Color = '#cc3131'
    else:
        Color = '#0ed148'#0ed148
    
    folium.CircleMarker(location=loc, radius=700000*abs(r),
                            popup=name+': '+str(r),
                            color=Color,
                            fill_color=Color).add_to(Map)
    
china_result_20.apply(lambda x: draw(china_Map,[x['lat'],x['lon']],x['tone'],x['country']),axis = 1)
us_result_20.apply(lambda x: draw(us_Map,[x['lat'],x['lon']],x['tone'],x['country']),axis = 1)
uk_result_20.apply(lambda x: draw(uk_Map,[x['lat'],x['lon']],x['tone'],x['country']),axis = 1)

        

china_Map

In [None]:
us_Map

In [None]:
uk_Map

In [None]:
china_Map.save(theme+'_china.html')
us_Map.save(theme+'_us.html')
uk_Map.save(theme+'_uk.html')