In [205]:
import pyspark as ps    # for the pyspark suite

In [206]:
spark = (ps.sql.SparkSession # Create a pyspark session for using spark sql
         .builder            # build it
         .master('local[4]') # using 4 local cpu cores
         .appName('lecture') # named 'lecture'
         .getOrCreate()      # If it already exists, return that object. Else create one.
        )
sc = spark.sparkContext      # Get (return) the spark Context for direct interaction with spark. 

In [207]:
tweet_df = spark.read.json('french_tweets.json')


# We started by looking at a few snapshots of the dataset: 
* General description
* Line count
* Schema

In [208]:
tweet_df.describe().show()

+-------+------------+--------------------+--------------+------------+--------------------+--------------------+-----------------------+---------------------+-------------------------+--------------------+-----------------------+------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+
|summary|contributors|          created_at|favorite_count|filter_level|                  id|              id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str| in_reply_to_user_id|in_reply_to_user_id_str|  lang|    quoted_status_id|quoted_status_id_str|retweet_count|              source|                text|        timestamp_ms|
+-------+------------+--------------------+--------------+------------+--------------------+--------------------+-----------------------+---------------------+-------------------------+--------------------+-----------------------+------+--------------------+--------------------+-------

In [209]:
tweet_df.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |

In [210]:
print("line count: {}\n".format(tweet_df.count()))

line count: 214936



In [211]:
tweet_df.take(1)

[Row(contributors=None, coordinates=None, created_at='Wed Apr 26 13:30:45 +0000 2017', display_text_range=None, entities=Row(hashtags=[], media=None, symbols=[], urls=[], user_mentions=[]), extended_entities=None, extended_tweet=None, favorite_count=0, favorited=False, filter_level='low', geo=None, id=857225437122097152, id_str='857225437122097152', in_reply_to_screen_name=None, in_reply_to_status_id=None, in_reply_to_status_id_str=None, in_reply_to_user_id=None, in_reply_to_user_id_str=None, is_quote_status=False, lang='fr', limit=None, place=Row(bounding_box=Row(coordinates=[[[-0.061141, 49.208997], [-0.061141, 49.250115], [-0.032199, 49.250115], [-0.032199, 49.208997]]], type='Polygon'), country='France', country_code='FR', full_name='Dozulé, France', id='4da693e9b39923ab', name='Dozulé', place_type='city', url='https://api.twitter.com/1.1/geo/id/4da693e9b39923ab.json'), possibly_sensitive=None, quoted_status=None, quoted_status_id=None, quoted_status_id_str=None, retweet_count=0, r

# We then looked for tweet objects that might convey information about the situation in France in the lead-up to the Macron/La Pen election, and created a new SparkDF from those objects
- retweet count, text, created at, location, user, screen_name

In [212]:
something = tweet_df.select('created_at', 'user.screen_name', 'text', 'retweet_count')

# We found that the retweet number was zero for all entries in the DF, after trying two approaches. While we were initially surprised, we decided to look instead at the content of the 'text' column
* We attempted to create a wordcount dictionary, but the results do not make sense
* We looked at how many times Macron and La Pen were mentioned in tweets

In [None]:
# Looking for non-zero retweets
something.filter(something["retweet_count"]>0).show()
something.orderBy(‘retweet_count’, ascending = True).show()

In [198]:
# Number of times a tweet contained 'acron'
Macron = tweet_df.filter(tweet_df.text.contains('acron'))


# Number of times a tweet contained 'pen'
Pen = tweet_df.filter(tweet_df.text.rlike('pen')) 
print(f'Macron: {Macron.count()}')
print(f'La Pen: {Pen.count()}')

Macron: 6997
La Pen: 5170


# We grouped the replies and sorted by count, and interestingly, the 'null' field was the highest count. 
- We think this means there were a lot of one-off tweets: 441
- There were more replies for EmmanuelMacron than MLP_officiel: 247
- The highest non-null replies went to TPMP, a popular media news television station in France

In [255]:
reply_scrn_name = tweet_df.select('in_reply_to_screen_name', 'text').groupBy('in_reply_to_screen_name').count().orderBy('count', ascending=False).show()
reply_scrn_name

+-----------------------+------+
|in_reply_to_screen_name| count|
+-----------------------+------+
|                   null|151624|
|                   TPMP|   456|
|         EmmanuelMacron|   441|
|          trendinaliaFR|   312|
|                  BFMTV|   297|
|           MLP_officiel|   247|
|             TantrumJas|   200|
|                 GG_RMC|   198|
|           Cyrilhanouna|   162|
|           dupontaignan|   134|
|            bb39llnicox|   116|
|          Rev_de_Presse|   106|
|            Superminada|    88|
|                Valeurs|    80|
|              Le_Figaro|    77|
|             spideryoyo|    76|
|                Guuenne|    68|
|               AnaMrd__|    68|
|                   libe|    65|
|        Marion_M_Le_Pen|    65|
+-----------------------+------+
only showing top 20 rows



In [251]:
rply_tweet = tweet_df.groupBy('in_reply_to_status_id').count().orderBy('count', ascending=False).show()
rply_tweet

+---------------------+------+
|in_reply_to_status_id| count|
+---------------------+------+
|                 null|161053|
|   857936880393998336|    39|
|   857313617418563584|    36|
|   857665869522063361|    25|
|   857282345799483394|    21|
|   857619553412685824|    20|
|   857282823249645569|    20|
|   857554575624196096|    20|
|   857498990086156288|    19|
|   858033334521409536|    16|
|   857343582633644032|    16|
|   857984463791423488|    16|
|   857672324434321409|    16|
|   857928900839587840|    15|
|   857718577205583872|    15|
|   857655589048078336|    14|
|   857345269637287936|    14|
|   857854177485221888|    13|
|   857521279183843328|    13|
|   857321176208683008|    13|
+---------------------+------+
only showing top 20 rows



# Because we were interested in the tweets that received the most replies, we dropped the null values and grouped by twitter ID. 
- Attempted to located the records with the 

In [254]:
reply_df = tweet_df.select('id', 'text').na.drop()
reply_df[reply_df['id'].isin('857936880393998336')].collect()

# reply_df.select('text', reply_df.id.when('857936880393998336')).show()

[]

# We looked to see how many distinct screen names were in the DF, and then ordered the most frequent contributors in descending order

In [199]:
distinct_scrn_names = something.select('screen_name').distinct().count()

In [178]:
top_contributors = something.groupBy('screen_name').count().orderBy('count', ascending=False).show()

+--------------+-----+
|   screen_name|count|
+--------------+-----+
|focus_regional| 3337|
| trendinaliaFR|  757|
| mediasoignant|  695|
|   moi_c_yanis|  636|
|     franckjt1|  541|
|     LegalKant|  429|
|       coste51|  384|
|        Sylv33|  347|
|  LaLifeDeSoso|  347|
|         MLLSR|  340|
|  RomainDebois|  335|
|  Akije_Hirodi|  324|
|     cavaleyra|  311|
|  MylanFreeman|  303|
| Its_Mickaella|  289|
|     BCritique|  280|
|        lxncdn|  275|
|    odetostvmp|  272|
|   johncharle1|  272|
|     ibarbinho|  268|
+--------------+-----+
only showing top 20 rows



# While we are fairly certain the word counts below are not correct, we found that the most common words were prepositions, etc. In future study, we would remove those words from the DF before counting the number of occurences

In [None]:
word_counts = tweet_df.select('text').take(something.count())


In [165]:
word_counts = str(word_counts)


In [166]:
d = dict() 
words_split = word_counts.split(" ") 
# Loop through each line of the file 
for item in words_split: 

    # Convert the characters in line to  
    # lowercase to avoid case mismatch 
    words = item.lower() 

        # Check if the word is already in dictionary 
    if words in d: 
        # Increment count of word by 1 
        d[words] = d[words] + 1
    else: 
        # Add the word to dictionary with count 1 
        d[words] = 1
  



In [204]:
import pyspark.sql.functions as f
something.withColumn('word', f.explode(f.split(f.col('text'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)\
    .show(50)


+-----+-----+
| word|count|
+-----+-----+
|   de|53916|
|   la|29777|
|   le|25112|
|    à|22290|
|    a|19935|
|   et|18917|
|  pas|18661|
|  les|18022|
|  que|17059|
|     |16435|
|   je|15905|
|   en|15316|
| pour|15183|
|   un|13656|
|  des|13064|
|    !|11815|
|c'est|11412|
|  the|11356|
|   du|10552|
|  est|10446|
|   on| 9799|
|  qui| 9521|
|   to| 9288|
|  une| 9192|
|    -| 8332|
|   me| 8113|
|  sur| 8072|
|   il| 8029|
|    ?| 7705|
|   ce| 7695|
| dans| 7519|
|   au| 7424|
|   in| 7335|
| avec| 7150|
| mais| 7036|
|   ça| 6906|
| vous| 6221|
| plus| 6009|
|   of| 6003|
|   tu| 5601|
|   ne| 5575|
|    I| 5534|
|   Je| 5507|
|   Le| 5411|
|  and| 5298|
|    @| 5252|
| fait| 5119|
| j'ai| 5103|
|  mon| 5051|
|    :| 4862|
+-----+-----+
only showing top 50 rows



In [193]:
from collections import OrderedDict

OrderedDict(sorted(d.items(), key=lambda t: t[1]), reverse=True)

OrderedDict([("[row(text='je", 1),
             ('comprends', 1),
             ('fin', 1),
             ('why', 1),
             ("😓'),", 1),
             ("row(text='@julesbl99", 1),
             ('travailles', 1),
             ('lieu', 1),
             ('raconter', 1),
             ('ta', 1),
             ("vie'),", 1),
             ('row(text="@lecho_fr', 1),
             ('@lasaucelleoff', 1),
             ("j'espère", 1),
             ('mieux', 1),
             ('sûr', 1),
             ('dieu', 1),
             ('seul', 1),
             ('sait"),', 1),
             ('row(text="362', 1),
             ('jours', 1),
             ('an', 1),
             ('suis', 1),
             ('adorable,', 1),
             ('parti', 1),
             ('ces', 1),
             ('362....', 1),
             ('😡😡"),', 1),
             ("row(text='@julia71903850", 1),
             ('отличная', 1),
             ("реклама!'),", 1),
             ('row(text="la', 1),
             ('macronie', 1),
            