In [18]:
import pandas as pd
import pyspark as ps

In [1]:
spark = ps.sql.SparkSession.builder \
            .master("local[4]") \
            .appName("Spark Session Intro") \
            .getOrCreate()
sc = spark.sparkContext

In [30]:
f_tweets_json = spark.read.json('../spark/french_tweets.json')

In [31]:
f_tweets_json.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |

In [34]:
f_tweets_json.createOrReplaceTempView('tweets')

In [85]:
type(f_tweets_json)

pyspark.sql.dataframe.DataFrame

In [94]:
query = '''
--select entities.hashtags, user.id, user.friends_count, user.followers_count, user.following, user. follow_request_sent, user.created_at, possibly_sensitive
--select entities.hashtags, created_at, in_reply_to_screen_name, in_reply_to_status_id, in_reply_to_user_id
--select id as tweet_id, user.id as user_id, user.created_at, quoted_status.favorite_count, quoted_status.retweet_count, quoted_status.retweeted, quoted_status.favorited
--create table top_screen_name as
select in_reply_to_screen_name, in_reply_to_user_id, count(distinct user.id)
from tweets
where in_reply_to_screen_name is not null
group by 1, 2
order by 3 desc
limit 20;
'''

top_screen_name = spark.sql(query)


In [95]:
spark.sql(query).show()

+-----------------------+-------------------+-------------------------------+
|in_reply_to_screen_name|in_reply_to_user_id|count(DISTINCT user.id AS `id`)|
+-----------------------+-------------------+-------------------------------+
|         EmmanuelMacron|         1976143068|                            245|
|                   TPMP|          832766641|                            173|
|           MLP_officiel|          217749896|                            131|
|                  BFMTV|          133663801|                            128|
|           Cyrilhanouna|          378632690|                            102|
|           dupontaignan|           38170599|                             99|
|              Qofficiel| 743432061088899073|                             49|
|                 GG_RMC|          114512948|                             47|
|               THEDAMSO|          529836126|                             47|
|        Marion_M_Le_Pen|          520449734|                   

In [96]:
top_screen_name_df = top_screen_name.select("*").toPandas()

In [97]:
top_screen_name_df

Unnamed: 0,in_reply_to_screen_name,in_reply_to_user_id,count(DISTINCT user.id AS `id`)
0,EmmanuelMacron,1976143068,245
1,TPMP,832766641,173
2,MLP_officiel,217749896,131
3,BFMTV,133663801,128
4,Cyrilhanouna,378632690,102
5,dupontaignan,38170599,99
6,Qofficiel,743432061088899073,49
7,THEDAMSO,529836126,47
8,GG_RMC,114512948,47
9,Marion_M_Le_Pen,520449734,43


In [104]:
spark.sql('drop view if exists top_screen_name;')
q2 = '''
create temp view top_screen_name as
select in_reply_to_screen_name, in_reply_to_user_id, count(distinct user.id) as reply_count
from tweets
where in_reply_to_screen_name is not null
group by 1, 2
order by 3 desc
limit 20;
'''
spark.sql(q2)
q3 = '''
select * from top_screen_name;
'''
spark.sql(q3).show()

+-----------------------+-------------------+-----------+
|in_reply_to_screen_name|in_reply_to_user_id|reply_count|
+-----------------------+-------------------+-----------+
|         EmmanuelMacron|         1976143068|        245|
|                   TPMP|          832766641|        173|
|           MLP_officiel|          217749896|        131|
|                  BFMTV|          133663801|        128|
|           Cyrilhanouna|          378632690|        102|
|           dupontaignan|           38170599|         99|
|              Qofficiel| 743432061088899073|         49|
|                 GG_RMC|          114512948|         47|
|               THEDAMSO|          529836126|         47|
|        Marion_M_Le_Pen|          520449734|         43|
|              jmaphatie|          369914677|         41|
|              Le_Figaro|            8350912|         39|
|                   libe|           68440549|         38|
|               leLab_E1|          376812938|         37|
|            J

In [101]:
q4 = '''
select 
    in_reply_to_screen_name, 
    in_reply_to_user_id, 
    reply_count, 
    tweets
from top_screen_name
left join tweets 
    on tweets.user.id = top_screen_name.in_reply_to_user_id
'''
spark.sql(q4)



DataFrame[]

In [133]:
#spark.sql('drop view top_acc_replies;')
q5 = '''
create temp view top_acc_replies as
select in_reply_to_screen_name, user.id, user.name, user.created_at as user_creation_date, created_at as tweet_date, id as tweet_id, place.name, place.country
from tweets
where in_reply_to_screen_name in (select in_reply_to_screen_name from top_screen_name)
order by 1 desc;
'''
spark.sql(q5)
replies = spark.sql('select * from top_acc_replies')
replies.show()

+-----------------------+------------------+--------------------+--------------------+--------------------+------------------+--------------+-------+
|in_reply_to_screen_name|                id|                name|  user_creation_date|          tweet_date|          tweet_id|          name|country|
+-----------------------+------------------+--------------------+--------------------+--------------------+------------------+--------------+-------+
|                   libe|        2354982361|    le sac de talent|Fri Feb 21 15:39:...|Fri Apr 28 07:29:...|857859214059991040|       Quévert| France|
|                   libe|         178630919|annette levy willard|Sun Aug 15 07:29:...|Fri Apr 28 06:44:...|857847995513155586|         Paris| France|
|                   libe|786567002345185280|           VIQUESNEL|Thu Oct 13 13:59:...|Thu Apr 27 10:17:...|857539118527598593|      Bordeaux| France|
|                   libe|         493446569|            CPasbien|Wed Feb 15 20:19:...|Thu Apr 27 07:

In [134]:
replies_df = replies.select("*").toPandas()

In [135]:
replies_df.head()

Unnamed: 0,in_reply_to_screen_name,id,name,user_creation_date,tweet_date,tweet_id,name.1,country
0,libe,493446569,CPasbien,Wed Feb 15 20:19:08 +0000 2012,Wed Apr 26 15:25:23 +0000 2017,857254286539067392,Niort,France
1,libe,579091759,Rêv de Presse,Sun May 13 16:29:36 +0000 2012,Wed Apr 26 17:33:11 +0000 2017,857286446939598850,Angers,France
2,libe,462343380,Cynthia-ZD,Thu Jan 12 21:17:40 +0000 2012,Thu Apr 27 07:01:47 +0000 2017,857489939549827074,Noisy-le-Grand,France
3,libe,493446569,CPasbien,Wed Feb 15 20:19:08 +0000 2012,Thu Apr 27 07:37:32 +0000 2017,857498936780705792,Niort,France
4,libe,493446569,CPasbien,Wed Feb 15 20:19:08 +0000 2012,Thu Apr 27 07:47:56 +0000 2017,857501554739466241,Niort,France


In [None]:
user.friends_count, user.statuses_count,