In [3]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
# Create the Spark session for this analysis (or attach to one that already exists).
spark = (
    SparkSession.builder
    .appName("proj")
    .getOrCreate()
)

24/04/19 09:27:05 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
24/04/19 09:27:22 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!


In [5]:
# Bare expression: shows the session object's summary as the cell output.
spark

In [2]:
# Set the spark.sql.debug.maxToStringFields option to 1000 for this session.
# NOTE(review): presumably this is to stop wide schemas/plans being truncated in
# string output -- confirm against the Spark configuration docs.
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

### Load dataset

In [1]:
from pyspark.sql.functions import col, asc, month, sum, split, explode, lower
from pyspark.sql.types import DoubleType

In [3]:
# Read the raw ad records (JSON) into a Spark DataFrame; the schema is inferred.
df = spark.read.format("json").load("/data/ProjectDatasetFacebookAU")

                                                                                

In [5]:
# Print the schema of the raw dataset as loaded from JSON.
df.printSchema()

root
 |-- ad_creation_time: string (nullable = true)
 |-- ad_creative_bodies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_creative_body: string (nullable = true)
 |-- ad_creative_link_caption: string (nullable = true)
 |-- ad_creative_link_captions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_creative_link_description: string (nullable = true)
 |-- ad_creative_link_descriptions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_creative_link_title: string (nullable = true)
 |-- ad_creative_link_titles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_delivery_start_time: string (nullable = true)
 |-- ad_delivery_stop_time: string (nullable = true)
 |-- ad_snapshot_url: string (nullable = true)
 |-- bylines: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- delivery_by_region: array (nullable = true)
 |    |-- element: struct (containsN

## Pre-processing

We first filter the data to February through May 2022, the period before and during the 2022 Australian federal election. Then we select the columns needed for the analysis, and remove rows with null values as well as duplicate rows.

In [7]:
# Helper: midpoint of a (lower_bound, upper_bound) range
def calculate_mid_value_mean(col):
    """Return the midpoint of a range described by two bound fields.

    Args:
        col: Object exposing ``lower_bound`` and ``upper_bound`` attributes, each
            of which supports ``.cast("int")`` (in this notebook, the ``spend``
            column is passed in). Note that inside this function the parameter
            name hides the ``col`` helper imported from pyspark.

    Returns:
        ``(lower_bound + upper_bound) / 2`` computed on the int-cast bounds.
    """
    low, high = [
        getattr(col, bound_name).cast("int")
        for bound_name in ("lower_bound", "upper_bound")
    ]
    return (low + high) / 2

#### Select desirable columns and drop null values

In [4]:
# Convert the delivery start time from string to date so it can be range-filtered.
df_dt = df.withColumn("ad_delivery_start_time", df["ad_delivery_start_time"].cast("date"))

# Keep only ads whose delivery started between 1 Feb and 31 May 2022 (both inclusive).
start_date = "2022-02-01"
end_date = "2022-05-31"
in_study_period = df_dt["ad_delivery_start_time"].between(start_date, end_date)
filtered_data = df_dt.filter(in_study_period)

In [218]:
# Print the schema after the cast above (ad_delivery_start_time is cast to date there).
filtered_data.printSchema()

root
 |-- ad_creation_time: string (nullable = true)
 |-- ad_creative_bodies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_creative_body: string (nullable = true)
 |-- ad_creative_link_caption: string (nullable = true)
 |-- ad_creative_link_captions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_creative_link_description: string (nullable = true)
 |-- ad_creative_link_descriptions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_creative_link_title: string (nullable = true)
 |-- ad_creative_link_titles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ad_delivery_start_time: date (nullable = true)
 |-- ad_delivery_stop_time: string (nullable = true)
 |-- ad_snapshot_url: string (nullable = true)
 |-- bylines: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- delivery_by_region: array (nullable = true)
 |    |-- element: struct (containsNul

In [5]:
# Keep only the columns used in the analysis, then drop rows that are missing
# either the ad text or the funding entity.
analysis_columns = ["id", "ad_creative_body", "funding_entity", "ad_delivery_start_time", "spend"]
has_body_and_funder = (
    filtered_data["ad_creative_body"].isNotNull()
    & filtered_data["funding_entity"].isNotNull()
)
df1 = filtered_data.select(*analysis_columns).filter(has_body_and_funder)

In [174]:
# Row count after column selection and null filtering.
df1.count()

                                                                                

2334585

In [9]:
# Calculate the midpoint of each row's spending range and store it as
# "spend_midpoint". The helper calculate_mid_value_mean reads the lower_bound /
# upper_bound fields of the column it is given, casts them to int and averages them.
# The spend column is taken from df1 (the frame being extended); the previous
# reference to `df_cor` pointed at a name that is never defined in this notebook
# and raised NameError on a fresh kernel.
df1_mid = df1.withColumn("spend_midpoint", calculate_mid_value_mean(df1["spend"]))

#### Drop duplicates

In [10]:
# Remove rows that are exact duplicates across every column.
df_clean = df1_mid.distinct()

In [188]:
# Row count after duplicate removal.
df_clean.count()

                                                                                

174445

In [196]:
# Inspect a single ad after de-duplication: rows for the same id that differ in
# any column (e.g. start date or spend range) are all kept.
df_clean.where(col("id") == '404432794860058').show()



+---------------+--------------------+--------------------+----------------------+----------+
|             id|    ad_creative_body|      funding_entity|ad_delivery_start_time|     spend|
+---------------+--------------------+--------------------+----------------------+----------+
|404432794860058|🚨 EXCLUSIVE 🚨 \...|Australian Labor ...|            2022-04-28|   {0, 99}|
|404432794860058|🚨 EXCLUSIVE 🚨 \...|Australian Labor ...|            2022-04-28|{100, 199}|
|404432794860058|🚨 EXCLUSIVE 🚨 \...|Australian Labor ...|            2022-04-29|   {0, 99}|
|404432794860058|🚨 EXCLUSIVE 🚨 \...|Australian Labor ...|            2022-04-28|{200, 299}|
+---------------+--------------------+--------------------+----------------------+----------+



                                                                                

In [189]:
# Preview the first 10 rows of the cleaned data.
df_clean.show(10)



+----------------+--------------------+--------------------+----------------------+----------+
|              id|    ad_creative_body|      funding_entity|ad_delivery_start_time|     spend|
+----------------+--------------------+--------------------+----------------------+----------+
| 373145904782145|Visit the Climate...| The Climate Council|            2022-05-18|   {0, 99}|
| 414786736881564|The NDIS has been...| James Oliver Gibson|            2022-05-18|   {0, 99}|
|1307461829780910|Federal governmen...| James Oliver Gibson|            2022-05-18|   {0, 99}|
| 543858233848769|You may know me a...|       Jerome Laxale|            2022-05-18|   {0, 99}|
| 453902239835432|Floods, vaccines,...|    Queensland Labor|            2022-05-17|   {0, 99}|
|1758261167842097|It’s time we put ...|Senator Mehreen F...|            2022-05-17|   {0, 99}|
| 340436141530462|Scott Morrison's ...|Australian Educat...|            2022-05-17|   {0, 99}|
|5748870808461853|Last time so-call...|Senator Mat

                                                                                

## 1. Volume of Ads

We observe a sharp increase in ad volume as the federal election date approaches.

In [191]:
# Count ads per calendar month of their delivery start date, ordered by month.
start_month = month("ad_delivery_start_time").alias("month")
ads_per_month = df_clean.groupBy(start_month).count().sort("month")

# Bring the small aggregate to pandas and save it; the chart is drawn with other tools.
ads_per_month_pd = ads_per_month.toPandas()
ads_per_month_pd.to_csv('ads.csv', index=False, encoding='utf-8')

                                                                                

## 2. Stopword Removal and Word Frequency

In this part, we are interested in the content of the sponsored posts across the federal election period. We remove stop words and create a word cloud for visualization.

In [12]:
# Find word frequency
# 1) Lower-case the ad text so counting is case-insensitive.
lowercase_df = df_clean.select(lower(col("ad_creative_body")).alias("ad_creative_body"))

# 2) Split on runs of whitespace and emit one row per token.
tokens = split(lowercase_df["ad_creative_body"], "\\s+")
tokenized_df = lowercase_df.select(explode(tokens).alias("word"))

# 3) Count occurrences per token, discarding empty strings and lone dashes.
word_counts = (
    tokenized_df
    .groupBy("word")
    .count()
    .where(~col("word").isin("", "-"))
)

In [16]:
# Order the words from most to least frequent.
sorted_wc = word_counts.sort("count", ascending=False)

In [17]:
# Export CSV file for viewing all words.
# The pandas copy gets its own name so that `sorted_wc` stays a Spark DataFrame:
# rebinding it to the pandas result made this cell fail when run a second time,
# because a pandas DataFrame has no .toPandas().
sorted_wc_pd = sorted_wc.toPandas()
sorted_wc_pd.to_csv('all_words.csv', encoding='utf-8', index=False)

                                                                                

#### Stop word removal and select top 100 words

In [198]:
from pyspark.ml.feature import StopWordsRemover

# Default English stop-word list shipped with Spark ML.
stop_words = StopWordsRemover.loadDefaultStopWords("english")

# Drop every token that appears in the stop-word list.
is_stop_word = col("word").isin(stop_words)
filtered_word_counts = word_counts.where(~is_stop_word)

# Most frequent remaining words first.
sorted_word_counts = filtered_word_counts.sort("count", ascending=False)

# Keep the 100 most frequent words.
top_100_words = sorted_word_counts.limit(100)

In [199]:
# Preview the most frequent non-stop-words (show() prints the first 20 rows by default).
top_100_words.show()



+----------+------+
|      word| count|
+----------+------+
|government|263113|
|      help|253264|
|      make|229714|
|      need|228298|
|   support|222382|
|    people|217059|
|   climate|215630|
|     labor|199297|
|      vote|198007|
|     local|193480|
|      sign|181036|
|       get|178485|
|australian|176257|
|      time|163952|
| australia|161882|
|      like|157178|
|   federal|154833|
|       new|150654|
| community|149056|
|        us|141102|
+----------+------+
only showing top 20 rows



                                                                                

In [122]:
# Save the top-100 word counts to CSV; the word cloud is built from this file.
top_words = top_100_words.toPandas()
top_words.to_csv(
    'words.csv',
    index=False,
    encoding='utf-8',
)

                                                                                

## 3. Select the highest-spending funding entities and look for any significant shifts or trends in spending as the election date approaches

### 3.1 Top 10 highest-spending funding entities

In [83]:
# Total number of distinct funding entities in the cleaned data.
distinct_entities = df_clean.select("funding_entity").distinct()
num_fundingentity = distinct_entities.count()
num_fundingentity

                                                                                

1740

In [24]:
# Total spend (sum of spend-range midpoints) per funding entity and start month.
# `sum` here is the pyspark.sql.functions aggregate imported earlier, not the builtin.
start_month = month("ad_delivery_start_time").alias("month")
monthly_total = sum("spend_midpoint").alias("total_spend")
spending_trends = (
    df_clean
    .groupBy("funding_entity", start_month)
    .agg(monthly_total)
    .sort("funding_entity", "month")
)
spending_trends.show()



+--------------------+-----+-----------+
|      funding_entity|month|total_spend|
+--------------------+-----+-----------+
| Australian Labor...|    5|     1741.5|
| Liberal Party of...|    3|    43436.5|
| Liberal Party of...|    4|    19237.5|
| Liberal Party of...|    5|    28321.5|
| اليونيسف باستخدا...|    4|      199.0|
|100% Renewable Co...|    2|     1242.5|
|100% Renewable Co...|    3|     2635.5|
|100% Renewable Co...|    4|     6969.0|
|100% Renewable Co...|    5|    26357.0|
|  1in50 Incorporated|    2|       49.5|
|  1in50 Incorporated|    3|     3077.0|
|  1in50 Incorporated|    4|    30300.0|
|  1in50 Incorporated|    5|     7709.5|
|        3 Steps Away|    4|       49.5|
|   350.org Australia|    2|      398.0|
|A Modern Gay's Guide|    5|      798.0|
|                AANT|    3|       99.0|
|                AANT|    4|      594.0|
|                AANT|    5|       99.0|
|         ABC Friends|    2|    11212.0|
+--------------------+-----+-----------+
only showing top

                                                                                

In [25]:
# Rank funding entities by their spend summed over all months and keep the top N.
N = 10
entity_totals = spending_trends.groupBy("funding_entity").agg({"total_spend": "sum"})
top_entities = entity_totals.sort("sum(total_spend)", ascending=False).limit(N)
top_entities.show(truncate=False)

                                                                                

+-----------------------------------------------+----------------+
|funding_entity                                 |sum(total_spend)|
+-----------------------------------------------+----------------+
|Australian Labor Party                         |1.26940215E7    |
|United Australia Party                         |7406032.5       |
|Australian Electoral Commission                |4619260.5       |
|Liberal Party of Australia                     |2287917.5       |
|Solutions for Australia                        |2110110.5       |
|Liberal Party of Australia (Victorian Division)|1762441.5       |
|Liberal National Party of Queensland           |1692410.5       |
|Amnesty International Australia                |1657240.5       |
|Greenpeace Australia Pacific                   |1650746.0       |
|Climate 200                                    |1565760.5       |
+-----------------------------------------------+----------------+



In [None]:
# Restrict the monthly spending rows to the top-N entities via an inner join;
# each retained row also carries that entity's overall total.
top_spending_trends = spending_trends.join(
    top_entities,
    on="funding_entity",
    how="inner",
)

In [27]:
# Show the monthly spend rows for the top entities without truncating long names.
top_spending_trends.show(truncate=False)

                                                                                

+-----------------------------------------------+-----+-----------+----------------+
|funding_entity                                 |month|total_spend|sum(total_spend)|
+-----------------------------------------------+-----+-----------+----------------+
|Liberal Party of Australia                     |4    |568446.0   |2287917.5       |
|Solutions for Australia                        |5    |1673412.0  |2110110.5       |
|Climate 200                                    |5    |305858.0   |1565760.5       |
|Liberal Party of Australia (Victorian Division)|5    |1142146.5  |1762441.5       |
|Greenpeace Australia Pacific                   |2    |336441.0   |1650746.0       |
|Amnesty International Australia                |2    |435574.0   |1657240.5       |
|United Australia Party                         |4    |2839583.5  |7406032.5       |
|Australian Electoral Commission                |4    |2650272.5  |4619260.5       |
|United Australia Party                         |2    |1197.0    

In [28]:
# Write the top entities' monthly spend to CSV for plotting elsewhere.
spending_trends_pd = top_spending_trends.toPandas()
spending_trends_pd.to_csv(
    'spend_top.csv',
    index=False,
    encoding='utf-8',
)

                                                                                

### 3.2 Major winning parties' spending patterns

We explored the top-spending entities above and found that not all political parties spend large amounts of money on ads. Next, we explore the spending patterns of the winning parties.

In [29]:
# Keep monthly-spend rows whose funding entity name matches any party keyword.
# NOTE(review): these are substring matches, so names that merely contain a
# keyword (e.g. ones starting with "National ..." or "Independent ...") are
# included too -- confirm this is intended.
keyword_patterns = [
    "%Labor%",
    "%Liberal%",
    "%Greens%",
    "%National%",
    "%Centre Alliance%",
    "%Independent%",
    "%Katter's%",
]
entity_name = col("funding_entity")

# OR the individual LIKE conditions together, in the listed order.
matches_keyword = entity_name.like(keyword_patterns[0])
for pattern in keyword_patterns[1:]:
    matches_keyword = matches_keyword | entity_name.like(pattern)

top_entities_with_keywords = spending_trends.filter(matches_keyword)

In [208]:
# Show the keyword-matched entities' monthly spend without truncating names.
top_entities_with_keywords.show(truncate=False)



+-------------------------------------------------------+-----+-----------+
|funding_entity                                         |month|total_spend|
+-------------------------------------------------------+-----+-----------+
| Australian Labor Party (NSW Branch)                   |5    |900.0      |
| Liberal Party of Australia Tas Division               |3    |39600.0    |
| Liberal Party of Australia Tas Division               |4    |17000.0    |
| Liberal Party of Australia Tas Division               |5    |24300.0    |
|ACT Greens                                             |2    |5300.0     |
|ACT Greens                                             |3    |2700.0     |
|ACT Greens                                             |4    |41200.0    |
|ACT Greens                                             |5    |9400.0     |
|ACT Labor                                              |2    |100.0      |
|ACT Labor                                              |3    |0.0        |
|ACT Labor  

                                                                                

In [30]:
# Write the keyword-matched monthly spend to CSV for plotting elsewhere.
keywords_pd = top_entities_with_keywords.toPandas()
keywords_pd.to_csv(
    'keywords.csv',
    index=False,
    encoding='utf-8',
)

                                                                                

Group by each entity and observe its total spending.

In [31]:
# Sum each keyword-matched entity's spend across all months.
# NOTE(review): .alias("total") is called on the DataFrame, not on the column;
# the aggregate column still appears as "sum(total_spend)" in the output.
entities_total = (
    top_entities_with_keywords
    .groupBy("funding_entity")
    .sum("total_spend")
    .alias("total")
)

In [211]:
# Preview the per-entity totals (show() prints the first 20 rows by default).
entities_total.show()

                                                                                

+--------------------+----------------+
|      funding_entity|sum(total_spend)|
+--------------------+----------------+
|Mick Denton - Lab...|         71300.0|
|   NSW Liberal Party|          9400.0|
|Daniel Hulme, Lab...|          1000.0|
|Russell Robertson...|         15200.0|
|NSW Labor - Warri...|           300.0|
|The National Part...|         68600.0|
|University of Tas...|             0.0|
|           NT Greens|          2100.0|
|National Secular ...|           100.0|
|National Disabili...|         25600.0|
|LNP - Liberal Nat...|          8200.0|
|Independent Cowpe...|         13400.0|
|Parramatta & The ...|             0.0|
|Independent Educa...|           600.0|
|Liberal National ...|       1518000.0|
|Liberal Party of ...|       1917000.0|
|  NSW National Party|          1600.0|
|          ACT Greens|         58600.0|
|National Farmers ...|         26300.0|
|Liberal Democrats WA|          2700.0|
+--------------------+----------------+
only showing top 20 rows



In [32]:
# Write the per-entity totals to CSV for plotting elsewhere.
keyword_grp_pd = entities_total.toPandas()
keyword_grp_pd.to_csv(
    'keywords_grp.csv',
    index=False,
    encoding='utf-8',
)

                                                                                