<h1><center>Twitter Analysis</center></h1>

<h2>Import packages</h2>

In [47]:
import re
from textblob import TextBlob
from pyspark.sql.functions import udf, when, substring, col, lit, size
from pyecharts import Bar, Pie, WordCloud, Map, Liquid
from geopy.geocoders import Nominatim
import pandas as pd

<h2>Define input path and load data</h2>

In [2]:
# Specify input data path
INPUT_FILE_PATH = '/home/yong/Desktop/CS5590_Spark_Project/yong/data.json'
# Read data with Spark
data_df = spark.read.json(INPUT_FILE_PATH)

<h2 style="color:blue">
<center>
Query 1: Find polarity percentage for tweets' text field
</center>
</h2>

In [3]:
# Revemove the special characters in the tweet
def clean_tweet(tweet):
    return ' '.join(re.sub('(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)', ' ', tweet).split())

# Calculate the polarity of the tweet
def get_polarity(tweet):
    return TextBlob(clean_tweet(tweet)).sentiment.polarity

# Create UDF for calculating polarity
get_polarity_udf = udf(get_polarity)

In [4]:
# Create temp view for polarities
data_df.select(get_polarity_udf('text').alias('polarity')).createOrReplaceTempView('polarities')

# Calculate percentage for polarity
polarity_df = spark.sql('''
    SELECT
        SUM(CASE WHEN CAST(polarity AS FLOAT) > 0 THEN 1 ELSE 0 END) / COUNT(*) * 100 AS positive,
        SUM(CASE WHEN CAST(polarity AS FLOAT) < 0 THEN 1 ELSE 0 END) / COUNT(*) * 100 AS negative,
        SUM(CASE WHEN CAST(polarity AS FLOAT) = 0 THEN 1 ELSE 0 END) / COUNT(*) * 100 AS neutral
    FROM
        polarities
''')
# Convert into pandas dataframe
polarity_pd = polarity_df.toPandas()

In [5]:
# Render graph
attr = polarity_pd.columns.values
values = polarity_pd.values[0]
pie = Pie('Polarity Percentage')
pie.add('Percentage', attr, values, is_label_show=True)
pie

<h2 style="color:blue">
<center>
Query 2: Find the created year distribution for users
</center>
</h2>

In [6]:
# Create temp view for user_created_year
data_df.select(
    'user.id',
    substring('user.created_at', -4, 4).alias('year')
).distinct().createOrReplaceTempView('user_created_year')

# Get count for each year from distinct users
created_year_df = spark.sql('''
    SELECT
        year,
        COUNT(*) AS count
    FROM
        user_created_year
    GROUP BY
        year
    ORDER BY
        year DESC
''')

# Convert into pandas dataframe
created_year_pd = created_year_df.toPandas()

In [7]:
# Render graph
created_year_bar = Bar('Created year distribution')
attr = created_year_pd.values[:,0]
values = created_year_pd.values[:,1]
created_year_bar.add('Count', attr, values, mark_point=['max', 'min'])

<h2 style="color:blue">
<center>
Query 3: Find most popular login sources
</center>
</h2>

In [4]:
# Create temp view for user_login_source
data_df.select(
    'user.id',
    'source'
).distinct().createOrReplaceTempView('user_login_source')

# Get count for top 10 login source type
user_login_source_df = spark.sql('''
    SELECT
        source,
        COUNT(*) AS count
    FROM (
        SELECT
            SUBSTR(
                source,
                LOCATE('>', source) + 1,
                LOCATE('<', SUBSTR(source, LOCATE('>', source))) -2
            ) AS source
        FROM
            user_login_source
    ) t1
    GROUP BY
        source
    ORDER BY
        count DESC
    LIMIT 5
''')

# Convert into pandas dataframe
user_login_source_pd = user_login_source_df.toPandas()

In [5]:
# Render graph
user_login_source_bar = Bar('Login source distribution')
attr = user_login_source_pd.values[:,0]
values = user_login_source_pd.values[:,1]
user_login_source_bar.add('Count', attr, values, mark_line=["average"])

<h2 style="color:blue">
<center>
Query 4: Generate word cloud for hashtags' popularity
</center>
</h2>

In [10]:
# Create temp view for hashtags
data_df.select('entities.hashtags.text').createOrReplaceTempView('hashtags')

# Get count for each hashtag
hashtags_df = spark.sql('''
    SELECT
        LOWER(text) as text,
        COUNT(*) AS count
    FROM (
        SELECT
            EXPLODE(text) AS text
        FROM
            hashtags
    ) t1
    GROUP by
        LOWER(text)
    ORDER BY
        count DESC
''')

# Convert into pandas dataframe
hashtags_pd = hashtags_df.toPandas()

In [11]:
# Render graph
wordcloud = WordCloud(width=800, height=600)
attr = hashtags_pd.values[:,0]
values = hashtags_pd.values[:,1]
wordcloud.add("", attr, values)

<h2 style="color:blue">
<center>
Query 5: Count null and not null for geo tag
</center>
</h2>

In [12]:
# Create temp view for geo_location
data_df.select('geo').createOrReplaceTempView('geo_location')

# Get count for total number of tweets with geo info
geo_df = spark.sql('''
    SELECT
        SUM(CASE WHEN geo IS NULL THEN 1 ELSE 0 END) AS count_nulls,
        COUNT(geo) AS count_not_nulls
    FROM
        geo_location
''')

# Convert into pandas dataframe
geo_pd = geo_df.toPandas()

In [13]:
# Render graph
attr = geo_pd.columns
values = geo_pd.values[0]
pie = Pie('Count Null and Not Null for Geo')
pie.add('Count', geo_pd.columns, values, is_label_show=True)

<h2 style="color:blue">
<center>
Query 6: Get country code distribution from geo tag
</center>
</h2>

In [14]:
# Create a geo locator
# With geo, we can get the extrat location of the user who sent the tweets
geolocator = Nominatim(user_agent="Twitter Analysis")

In [15]:
# Get all geo that are not null
coordinates_df = spark.sql('''
    SELECT
        geo.coordinates
    FROM
        geo_location
    WHERE
        geo IS NOT NULL
''')

In [16]:
# Create a function to reverse coordinates to country code
def get_geo_country(coordinates):
    return geolocator.reverse(str(coordinates[0]) + ", " + str(coordinates[1])).raw['address']['country']

In [17]:
# Convert coordinates_df into pandas dataframe then into Python list
coordinates = list(coordinates_df.toPandas().coordinates)

In [18]:
country_code_dict = dict()
for _ in coordinates:
    country_code = get_geo_country(_)
    if country_code in country_code_dict:
        country_code_dict[country_code] = country_code_dict[country_code] + 1
    else:
        country_code_dict[country_code] = 1

In [19]:
country_code_df = pd.DataFrame.from_dict(country_code_dict, orient='index',columns=['Count'])

In [20]:
country_code_df.sort_values(by='Count', ascending=False)

Unnamed: 0,Count
USA,119
Canada,11
New Zealand/Aotearoa,3
UK,3
Brasil,2
Philippines,2
ประเทศไทย,1
Chile,1
Ireland,1
Australia,1


<h2 style="color:blue">
<center>
Query 7: Get top 10 country codes distribution from place tag
</center>
</h2>

In [22]:
data_df.select('place').createOrReplaceTempView('places')

In [23]:
spark.sql('''
    SELECT
        place.country_code,
        count(*) AS count
    FROM
        places
    WHERE
        place IS NOT NULL
    GROUP BY
        place.country_code
    ORDER BY
        2 desc
    LIMIT 10
''').show()

+------------+-----+
|country_code|count|
+------------+-----+
|          US| 1147|
|          CA|   49|
|          GB|   47|
|          AU|   17|
|          PH|   10|
|          BR|    6|
|          IN|    6|
|          JP|    5|
|          NZ|    4|
|          MY|    4|
+------------+-----+



<h2 style="color:blue">
<center>
Query 8: Message truncated percentage 
</center>
</h2>

In [24]:
data_df.select('truncated').createOrReplaceTempView('tweet_truncated')

In [25]:
# Create a dataframe to calculte the message truncated percentage
truncated_percentage_df = spark.sql('''
    SELECT
        truncated,
        COUNT(*) / (SELECT COUNT(*) FROM tweet_truncated) AS percentage
    FROM
        tweet_truncated
    GROUP BY 
        truncated
''')

# Convert into pandas dataframe
truncated_percentage_pd = truncated_percentage_df.toPandas()

In [26]:
# Render graph
liquid = Liquid("Message truncated percentage")
liquid.add("Liquid", [truncated_percentage_pd.query('truncated == True').percentage])

<h2 style="color:blue">
<center>
Query 9: User verified distribution and analysis
</center>
</h2>

In [27]:
# Check how many users have verified account
data_df.groupBy('user.verified').count().show()

+--------+-----+
|verified|count|
+--------+-----+
|    true| 1221|
|   false|98779|
+--------+-----+



In [29]:
data_df.select('user.verified', 'user.default_profile').createOrReplaceTempView('verified_users_profile')

In [34]:
# Check how many verified users are/are not using default profile
spark.sql('''
    SELECT
        default_profile,
        COUNT(*) AS count
    FROM
        verified_users_profile
    WHERE
        verified = 'true'
    GROUP BY
        default_profile
''').show()

+---------------+-----+
|default_profile|count|
+---------------+-----+
|           true|  218|
|          false| 1003|
+---------------+-----+



In [35]:
# Check how many un-verified users are/are not using default profile
spark.sql('''
    SELECT
        default_profile,
        COUNT(*) AS count
    FROM
        verified_users_profile
    WHERE
        verified = 'false'
    GROUP BY
        default_profile
''').show()

+---------------+-----+
|default_profile|count|
+---------------+-----+
|           true|55598|
|          false|43181|
+---------------+-----+



<h2 style="color:blue">
<center>
Query 10: Analysis on friends_count and followers_count
</center>
</h2>

In [40]:
# Users count where the number of friends_count is greater than followers_count
data_df.where('user.friends_count > user.followers_count').count()

61730

In [41]:
# Users count where the number of friends_count is less than followers_count
data_df.where('user.friends_count < user.followers_count').count()

37952

In [42]:
# Users count where the number of friends_count is the same as followers_count
data_df.where('user.friends_count = user.followers_count').count()

318

<h2 style="color:blue">
<center>
Machine Learning: Frequent Pattern Mining
</center>
</h2>

In [72]:
from pyspark.ml.fpm import FPGrowth

In [84]:
# Generate computed columns for geo_enabled, profile_background_tile, profile_use_background_image,
# and default_profile and save the result to a list
data_list = data_df.select(
    when(data_df.truncated == "true", 1).otherwise(0).alias("truncated"),
    when(data_df.user.verified == "true", 1).otherwise(0).alias("verified"),
    when(data_df.user.geo_enabled == "true", 1).otherwise(0).alias("geo_enabled"),
    when(data_df.user.profile_background_tile == "true", 1).otherwise(0).alias("profile_background_tile"),
    when(data_df.user.profile_use_background_image == "true", 1).otherwise(0).alias("profile_use_background_image"),
    when(data_df.user.default_profile == "true", 1).otherwise(0).alias("default_profile")
).collect()

In [85]:
# Generate new data list for ML
ml_data_list = []
index = 0

for row in data_list:
    temp_data_list = []
    if row[0] == 1:
        temp_data_list.append('truncated')
    if row[1] == 1:
        temp_data_list.append('verified')
    if row[2] == 1:
        temp_data_list.append('geo_enabled')
    if row[3] == 1:
        temp_data_list.append('profile_background_tile')
    if row[4] == 1:
        temp_data_list.append('profile_use_background_image')
    if row[5] == 1:
        temp_data_list.append('default_profile')
    if temp_data_list:
        ml_data_list.append((index, temp_data_list))
        index += 1

In [86]:
# Generate analysis dataframe
analysis_df = spark.createDataFrame(ml_data_list, ['id', 'items'])

In [92]:
# Create the model and fit the dataframe into the model
fp_growth = FPGrowth(itemsCol="items", minSupport=0.50, minConfidence=0.6)
model = fpGrowth.fit(analysis_df)

In [93]:
# Display frequent itemsets
model.freqItemsets.show(truncate=False)

+-----------------------------------------------+-----+
|items                                          |freq |
+-----------------------------------------------+-----+
|[profile_use_background_image]                 |80317|
|[default_profile]                              |55816|
|[default_profile, profile_use_background_image]|55816|
+-----------------------------------------------+-----+



In [94]:
# Display generated association rules
model.associationRules.show(truncate=False)

+------------------------------+------------------------------+------------------+
|antecedent                    |consequent                    |confidence        |
+------------------------------+------------------------------+------------------+
|[default_profile]             |[profile_use_background_image]|1.0               |
|[profile_use_background_image]|[default_profile]             |0.6949462753837917|
+------------------------------+------------------------------+------------------+



In [95]:
# Show distinct prediction count
model.transform(analysis_df).groupBy("prediction").count().show()

+-----------------+-----+
|       prediction|count|
+-----------------+-----+
|               []|64601|
|[default_profile]|24501|
+-----------------+-----+



In [96]:
# Display prediction
model.transform(analysis_df).show(tdruncate=False)

+---+-----------------------------------------------------------------------+-----------------+
|id |items                                                                  |prediction       |
+---+-----------------------------------------------------------------------+-----------------+
|0  |[profile_use_background_image, default_profile]                        |[]               |
|1  |[geo_enabled, profile_use_background_image, default_profile]           |[]               |
|2  |[profile_use_background_image]                                         |[default_profile]|
|3  |[truncated, profile_use_background_image, default_profile]             |[]               |
|4  |[profile_use_background_image, default_profile]                        |[]               |
|5  |[profile_use_background_image, default_profile]                        |[]               |
|6  |[profile_use_background_image, default_profile]                        |[]               |
|7  |[geo_enabled, profile_use_backgroun

<h2 style="color:blue">
<center>
Graph: algorithms
</center>
</h2>

In [3]:
from graphframes import *

In [29]:
# Create edges
e = data_df.select(
    col("user.id").alias("src"),
    col("retweeted_status.user.id").alias("dst"),
    lit("retweet").alias("relationship")
).where(
    col("retweeted_status.user.id").isNotNull()
).distinct()

In [30]:
# Create vertices
v = data_df.select(
    col("user.id"),
    col("user.screen_name")
).union(
    data_df.select(
        col("retweeted_status.user.id"),
        col("retweeted_status.user.name"),
    ).where(
        col("retweeted_status.user.id").isNotNull()
    )
).distinct()

In [31]:
# Create graph
g = GraphFrame(v, e)

In [32]:
# Get top 10 id with highest in degree
g.inDegrees.sort("inDegree", ascending=False).show(10, False)

+-------------------+--------+
|id                 |inDegree|
+-------------------+--------+
|1604444052         |2201    |
|487297085          |1979    |
|130496027          |1579    |
|4196983835         |1359    |
|1008440487915720708|1319    |
|16989178           |1110    |
|19697415           |1070    |
|150078976          |1056    |
|2828212668         |984     |
|3267456386         |930     |
+-------------------+--------+
only showing top 10 rows



In [54]:
# Get top 10 id with lowest in degree
g.inDegrees.sort("inDegree").show(10, False)

+------------------+--------+
|id                |inDegree|
+------------------+--------+
|3358687222        |1       |
|18611344          |1       |
|2859622263        |1       |
|20813564          |1       |
|179176923         |1       |
|948171093818343425|1       |
|540321025         |1       |
|72954856          |1       |
|95755482          |1       |
|135138678         |1       |
+------------------+--------+
only showing top 10 rows



In [33]:
# Get top 10 id with highest out degree
g.outDegrees.sort("outDegree", ascending=False).show(10, False)

+-------------------+---------+
|id                 |outDegree|
+-------------------+---------+
|988374538491777025 |48       |
|1053011875007483905|24       |
|824216698551209984 |18       |
|822210115784806400 |18       |
|80602426           |17       |
|2905278738         |16       |
|62147616           |16       |
|4053477192         |16       |
|954454874665771013 |16       |
|2874358611         |15       |
+-------------------+---------+
only showing top 10 rows



In [34]:
g.edges.show(10, False)

+-------------------+----------+------------+
|                src|       dst|relationship|
+-------------------+----------+------------+
| 828822710478331904|4196983835|     retweet|
| 798687870240182272| 487297085|     retweet|
|         2199552860| 112047805|     retweet|
| 903177352259260419| 460226159|     retweet|
|          498933814|1398759560|     retweet|
|          231156352| 111490230|     retweet|
|1045729609793208320|1604444052|     retweet|
| 921562006955724800|  49698134|     retweet|
|          572011574|4429003533|     retweet|
|         2776139595| 788110982|     retweet|
|         2448240715| 435207636|     retweet|
|          437169409|  11856032|     retweet|
|          774786524|  37824038|     retweet|
| 933613870756892673|2696243754|     retweet|
|         3374292849|4196983835|     retweet|
|1045631048187617280| 593948174|     retweet|
| 795963986650939392|  14580438|     retweet|
| 992038732114034690|  20878297|     retweet|
|           57198714|  42447494|  

In [55]:
# Compute shortest paths from each vertex to the given set of landmark vertices
# For one id with low number of in degree
g.shortestPaths(
    landmarks=["3358687222"]
).select("id", "distances").where(
    size(col("distances")) > 0
).show(10, False)

+------------------+-----------------+
|id                |distances        |
+------------------+-----------------+
|885245073813798912|[3358687222 -> 1]|
|3358687222        |[3358687222 -> 0]|
+------------------+-----------------+



In [56]:
# Compute shortest paths from each vertex to the given set of landmark vertices
# For one id with high number of in degree
g.shortestPaths(
    landmarks=["150078976"]
).select("id", "distances").where(
    size(col("distances")) > 0
).show(10, False)

+------------------+----------------+
|id                |distances       |
+------------------+----------------+
|740005760801726465|[150078976 -> 1]|
|334776400         |[150078976 -> 1]|
|849374799868637185|[150078976 -> 1]|
|935707527580397569|[150078976 -> 1]|
|3424129696        |[150078976 -> 1]|
|827544166624325632|[150078976 -> 1]|
|303950400         |[150078976 -> 1]|
|498077600         |[150078976 -> 2]|
|241941600         |[150078976 -> 1]|
|1566650000        |[150078976 -> 1]|
+------------------+----------------+
only showing top 10 rows



In [60]:
# Computes the number of triangles passing through each vertex
g.triangleCount().select("id", "count").where(col("count") > 0).show(10, False)

+-------------------+-----+
|id                 |count|
+-------------------+-----+
|908149255654854656 |104  |
|883855148136763392 |4    |
|883855148136763392 |4    |
|1003631763133001729|4    |
|719662077934243840 |12   |
|197111814          |4    |
|862106769862078464 |2    |
|909519301299892225 |40   |
|909519301299892225 |40   |
|3100613023         |4    |
+-------------------+-----+
only showing top 10 rows

