In [6]:
%matplotlib inline
from collections import Counter
from typing import Callable, List, Dict
import math
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import functions as F
import pyspark as ps
import json
import requests
import scipy.stats as stats

# Always make it pretty: use matplotlib's 'ggplot' style sheet for the plots in this notebook.
plt.style.use('ggplot')

In [7]:
# Start (or reuse) a Spark session on the local master with 4 threads ('local[4]').
spark = (
    ps.sql.SparkSession.builder
    .master('local[4]')
    .appName('julia_json')
    .getOrCreate()
)
# Keep a handle on the session's SparkContext as well.
sc = spark.sparkContext

In [147]:
# Read the tweets JSON file into a DataFrame (no schema is given, so Spark infers it)
# and print the resulting schema tree.
spark_df = spark.read.format('json').load('./data/french_tweets.json')
spark_df.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |

In [10]:
# Print the first row as a text table; with this many columns the table is very wide.
spark_df.show(1)

+------------+-----------+--------------------+------------------+-----------------+-----------------+--------------+--------------+---------+------------+----+------------------+------------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+-----+--------------------+------------------+-------------+----------------+--------------------+-------------+---------+--------------------+--------------------+-------------+---------+--------------------+
|contributors|coordinates|          created_at|display_text_range|         entities|extended_entities|extended_tweet|favorite_count|favorited|filter_level| geo|                id|            id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_str|is_quote_status|lang|limit|               place|possibly_sensitive|quoted_status|quoted_status_id|quoted_status_id_str|retweet_count|retweete

In [9]:
# Fetch the first row as a one-element list of Row objects, which shows the nested fields.
spark_df.take(1)

[Row(contributors=None, coordinates=None, created_at='Wed Apr 26 13:30:45 +0000 2017', display_text_range=None, entities=Row(hashtags=[], media=None, symbols=[], urls=[], user_mentions=[]), extended_entities=None, extended_tweet=None, favorite_count=0, favorited=False, filter_level='low', geo=None, id=857225437122097152, id_str='857225437122097152', in_reply_to_screen_name=None, in_reply_to_status_id=None, in_reply_to_status_id_str=None, in_reply_to_user_id=None, in_reply_to_user_id_str=None, is_quote_status=False, lang='fr', limit=None, place=Row(bounding_box=Row(coordinates=[[[-0.061141, 49.208997], [-0.061141, 49.250115], [-0.032199, 49.250115], [-0.032199, 49.208997]]], type='Polygon'), country='France', country_code='FR', full_name='Dozulé, France', id='4da693e9b39923ab', name='Dozulé', place_type='city', url='https://api.twitter.com/1.1/geo/id/4da693e9b39923ab.json'), possibly_sensitive=None, quoted_status=None, quoted_status_id=None, quoted_status_id_str=None, retweet_count=0, r

In [29]:
# Count tweets whose `retweeted` flag is false and whose text mentions a candidate's name
# (here the string 'EmmanuelMacron').
# NOTE(review): the recorded count (2185) equals the count obtained with the same text
# filter and no `retweeted` condition, so the flag does not appear to exclude any
# retweets -- confirm whether a different field should identify original tweets.
origins = (
    spark_df
    .filter((F.col('retweeted') == 'false') & F.col('text').contains('EmmanuelMacron'))
    .select('retweeted', 'text')
)
origins.count()

2185

In [151]:
# Count every tweet whose text mentions 'EmmanuelMacron', regardless of the `retweeted` flag.
origins_retw = spark_df.filter(F.col('text').contains('EmmanuelMacron')).select('text')
origins_retw.count()

2185

In [30]:
# Count tweets whose `retweeted` flag is false and whose text mentions 'MLP_officiel'.
origins_ML = (
    spark_df
    .filter((F.col('retweeted') == 'false') & F.col('text').contains('MLP_officiel'))
    .select('retweeted', 'text')
)
origins_ML.count()

1297

In [54]:
# Extract the hashtags array of every record: a single-column DataFrame named `hashtags`,
# taken from the nested `entities.hashtags` field.
hashtags = spark_df.select(F.col('entities.hashtags'))
hashtags.show()

+--------------------+
|            hashtags|
+--------------------+
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|[[[0, 12], AllEye...|
|[[[0, 10], RNCMob...|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|                  []|
|[[[0, 10], whirlp...|
|                  []|
|                  []|
+--------------------+
only showing top 20 rows



In [57]:
# Number of rows in `hashtags` (one hashtags array per row of spark_df), not the number of hashtags.
hashtags.count()

214936

In [86]:
# Flatten the per-row hashtag arrays into one RDD of hashtag Rows (indices, text).
# Each DataFrame row is first converted to a one-element list holding its hashtags array.
rdd = hashtags.rdd.map(list)
# `entities` and `entities.hashtags` are nullable in the schema, so a row's array can be
# None; fall back to an empty list so flatMap does not fail trying to iterate over None.
rdd1 = rdd.flatMap(lambda row: row[0] or [])
rdd1.take(5)

[Row(indices=[0, 12], text='AllEyezOnIt'),
 Row(indices=[13, 23], text='Neochrome'),
 Row(indices=[24, 30], text='Eriah'),
 Row(indices=[31, 36], text='Loin'),
 Row(indices=[37, 45], text='Youtube')]

In [131]:
# Convert the flattened hashtag Rows back into a DataFrame (columns: indices, text)
# and preview the first rows.
hashtags1 = rdd1.toDF()
hashtags1.show()


+--------+----------------+
| indices|            text|
+--------+----------------+
| [0, 12]|     AllEyezOnIt|
|[13, 23]|       Neochrome|
|[24, 30]|           Eriah|
|[31, 36]|            Loin|
|[37, 45]|         Youtube|
|[46, 63]|YoutubeNeochrome|
|[64, 71]|          RepDom|
| [0, 10]|       RNCMobile|
| [0, 10]|       whirlpool|
|[35, 45]|       Run4Water|
|[46, 61]|  FinalCountdown|
|[82, 94]|     shastadaisy|
|[81, 88]|          GE2017|
|  [2, 9]|          IC2113|
|[73, 79]|           CFL50|
|[57, 60]|              FN|
| [2, 12]|       Whirlpool|
|[13, 20]|          Amiens|
|[22, 26]|             tag|
|[27, 36]|        instapic|
+--------+----------------+
only showing top 20 rows



In [None]:
# Build an RDD holding just the text of every hashtag occurrence.
# Note: despite its name, `hashtags_list` is an RDD, not a Python list.
all_hashtags = hashtags1[['text']]
all_hashtags_rdd = all_hashtags.rdd
# Each element is a one-field Row; pick that field explicitly rather than relying on
# RDD.keys(), which is intended for (key, value) pairs.
hashtags_list = all_hashtags_rdd.map(lambda row: row[0])
# The earlier bare `hashtags_list.distinct()` call was removed: RDD transformations return
# a new RDD, so its result was discarded, and duplicates must be kept for frequency counts.
hashtags_list.take(5)

In [138]:
# Bring the first 1000 hashtag strings into local memory as a plain Python list.
# Only these 1000 entries are available to later cells that use `has_list`.
has_list = hashtags_list.take(1000)
# Confirm that take() returned a Python list.
type(has_list)

list

In [141]:
# Tally how often each hashtag occurs in the sampled list.
hashtag_counts = Counter()
hashtag_counts.update(has_list)
# Full ranking, most frequent first.
popular_tags = hashtag_counts.most_common()
# NOTE(review): tags are compared case-sensitively, so e.g. 'whirlpool' and 'Whirlpool'
# are counted as different hashtags -- confirm whether they should be merged.
# Display the 20 most frequent tags.
popular_tags[:20]

[('whirlpool', 35),
 ('Macron', 20),
 ('jecoutenrjlyon', 19),
 ('JecouteNRJLyon', 17),
 ('Whirlpool', 14),
 ('paris', 12),
 ('HarryStylesChezCauetSurNRJ', 11),
 ('CanaryWharf', 11),
 ('SansMoiLe7Mai', 7),
 ('Presidentielle2017', 7),
 ('love', 7),
 ('trndnl', 7),
 ('JeSuisPasVieuxMais', 6),
 ('JamaisMacron', 6),
 ('Paris', 6),
 ('ConfSNACG', 6),
 ('FN', 5),
 ('Amiens', 5),
 ('instagood', 5),
 ('MarineLePen', 5)]

In [148]:
# TODO: look for influencers (users with the highest number of retweets)

# influencers = spark_df.select('user', 'retweet_count')
# influencers.show()