In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, year, month, count, col, sum, lower, explode, date_add, date_sub

# Start a new Spark session (or attach to the one already running) for this project.
spark = (
    SparkSession.builder
    .appName('s4692034-project')
    .getOrCreate()
)
# Bare expression as the last line so Jupyter renders the session summary.
spark

23/05/18 05:23:48 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/05/18 05:24:30 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!


In [13]:
# Build one HDFS glob pattern per month of an inclusive month range
def generate_file_paths(year, start_month, end_month):
    """Return glob patterns for the monthly Facebook-ads dumps.

    One pattern is produced for every month in ``start_month..end_month``
    (both ends inclusive), in ascending order, of the form
    ``/data/ProjectDatasetFacebook/FBads-US-{year}{month}*``.
    An empty list is returned when ``start_month > end_month``.

    NOTE(review): the month is not zero-padded, so month 1 yields
    ``FBads-US-20211*``, which would also match months 10-12 if the files
    are named that way. This is harmless for the 3..8 range used below;
    confirm the file naming before using January or February.
    """
    return [
        f'/data/ProjectDatasetFacebook/FBads-US-{year}{m}' + "*"
        for m in range(start_month, end_month + 1)
    ]

# Glob patterns for March-August 2021
file_paths = generate_file_paths(2021, 3, 8)

# Columns kept for the analysis
cols = ["id", "ad_creative_body", "demographic_distribution", "region_distribution", "spend", "impressions", "funding_entity", "ad_delivery_start_time"]

# Read all months in ONE call so Spark infers a single schema over every file.
# Reading each month separately and `union`-ing the results is fragile: `union`
# matches columns by position and each separate read infers its own schema, so a
# nested column whose inferred shape differs between months makes the union fail.
# It would also crash on an empty path list (indexing dfs[0]).
df = spark.read.json(file_paths).select(*cols)

                                                                                

In [None]:
# Persist the combined (unfiltered) ad data as JSON, replacing any earlier output.
path = "/user/s4692034/projectData/df_all"
df.write.json(path, mode='overwrite')

In [14]:
# Keep only ads whose creative text contains "gun" as a whole word, case-insensitively.
# Lower-casing inside the predicate means no temporary column has to be added to
# the shared `df` (and dropped again), so `df` itself is left unchanged.
df_gun = df.filter(lower(col("ad_creative_body")).rlike("\\bgun\\b"))

# Drop rows missing any of the selected columns, then exact duplicate rows
df_gun = df_gun.na.drop(subset=cols).dropDuplicates()

# Flatten the lower/upper bounds of the spend and impressions ranges into columns
df_gun = (
    df_gun
    .withColumn("spend_lower_bound", col("spend.lower_bound"))
    .withColumn("spend_upper_bound", col("spend.upper_bound"))
    .withColumn("impressions_lower_bound", col("impressions.lower_bound"))
    .withColumn("impressions_upper_bound", col("impressions.upper_bound"))
)

# Use the midpoint of each range as a point estimate.
# NOTE(review): if either bound is null (e.g. an open-ended top bucket), the midpoint
# is null and the later `sum` aggregations silently skip that ad — confirm in the raw data.
df_gun = df_gun.withColumn("average_spend", (col("spend_lower_bound") + col("spend_upper_bound")) / 2)
df_gun = df_gun.withColumn("average_impressions", (col("impressions_lower_bound") + col("impressions_upper_bound")) / 2)

# Parse the delivery start date and derive Month / Year for the monthly aggregations
df_gun = df_gun.withColumn("ad_delivery_start_time", to_date(col("ad_delivery_start_time"), "yyyy-MM-dd"))
df_gun = df_gun.withColumn("Month", month(col("ad_delivery_start_time")))
df_gun = df_gun.withColumn("Year", year(col("ad_delivery_start_time")))

print(df_gun.count())
print(df_gun.columns)



233044
['id', 'ad_creative_body', 'demographic_distribution', 'region_distribution', 'spend', 'impressions', 'funding_entity', 'ad_delivery_start_time', 'spend_lower_bound', 'spend_upper_bound', 'impressions_lower_bound', 'impressions_upper_bound', 'average_spend', 'average_impressions', 'Month', 'Year']


                                                                                

In [15]:
# Persist the cleaned gun-related ads so later cells can reload them without recomputing.
path = "/user/s4692034/projectData/df_gun"
df_gun.write.json(path, mode='overwrite')

                                                                                

In [33]:
# Reload the cleaned gun-ad data saved earlier, so the analysis can resume without recomputing.
path = "/user/s4692034/projectData/df_gun"
df_gun = spark.read.json(path)

# JSON has no date type, so `ad_delivery_start_time` typically comes back as a string.
# Restore it to a DateType column so the date-window comparisons later in the notebook
# compare real dates instead of relying on implicit string-to-date coercion.
# (to_date is a no-op if the column is already a date.)
df_gun = df_gun.withColumn("ad_delivery_start_time", to_date(col("ad_delivery_start_time")))
print(df_gun.columns)



['Month', 'Year', 'ad_creative_body', 'ad_delivery_start_time', 'average_impressions', 'average_spend', 'demographic_distribution', 'funding_entity', 'id', 'impressions', 'impressions_lower_bound', 'impressions_upper_bound', 'region_distribution', 'spend', 'spend_lower_bound', 'spend_upper_bound']


                                                                                

### Funding entity vs. spend: which entities spent the most and ran the most ads (查看funding entity和spend，看哪些entity花的钱和投放的数量最多)

In [17]:
# Number of ads and total estimated spend per funding entity,
# biggest advertisers (by ad count, then by spend) first.
df_funding = (
    df_gun
    .groupBy("funding_entity")
    .agg(
        count("id").alias("# of Ads"),
        sum("average_spend").alias("Total Spend"),
    )
    .orderBy(["# of Ads", "Total Spend"], ascending=False)
)

# NOTE(review): entity names are grouped verbatim (the output shows "Liberty Brands "
# with a trailing space), so whitespace/case variants of one name would be counted separately.
df_funding.show()



+--------------------+--------+-------------+
|      funding_entity|# of Ads|  Total Spend|
+--------------------+--------+-------------+
|  Guard Our Freedoms|   44656|  2.9045672E7|
|Everytown for Gun...|   21441|    2914379.5|
|SECOND AMENDMENT ...|   19587|    2298356.5|
|  Sandy Hook Promise|   11551|1.842313745E8|
|     Texas LawShield|   10097|    3881551.5|
|     Liberty Brands |    6598|     451001.0|
|AMNESTY INTERNATI...|    5686|     350957.0|
|HERITAGE FOUNDATI...|    5655|     703422.5|
|     Internet Binges|    5294|    1201003.0|
|Texas Public Poli...|    4266|    1015867.0|
|Everytown for Gun...|    3470|     380265.0|
|  GunAssociation.org|    3270|    6168265.0|
|MOVEON.ORG CIVIC ...|    3248|     165476.0|
|           Brady Pac|    3220|    1251490.0|
|  Alvin Bragg for DA|    2905|     997297.5|
|Lucas Kunce for M...|    2874|     283563.0|
|Brady Campaign to...|    2752|    2065574.0|
|                DSCC|    2528|     392836.0|
|Gun Violence Prev...|    2442|   

                                                                                

In [7]:
# Save the per-entity summary: JSON for reuse, plus a single-partition CSV with header for export.
path = "/user/s4692034/projectData/df_funding"
df_funding.write.json(path, mode='overwrite')
df_funding.coalesce(1).write.csv("/user/s4692034/project/funding_entity", mode='overwrite', header=True)

                                                                                

### Load the external GVA and LGIS (legislation) datasets (引入GVA和LGIS数据集)

In [18]:
def add_month_year(sdf, date_col, date_fmt):
    """Parse `date_col` with pattern `date_fmt` (replacing the column) and add Month / Year columns.

    Values that do not match `date_fmt` become null, as do their Month / Year.
    """
    sdf = sdf.withColumn(date_col, to_date(col(date_col), date_fmt))
    return (sdf
            .withColumn("Month", month(col(date_col)))
            .withColumn("Year", year(col(date_col))))


def monthly_counts(sdf, count_col, alias, target_year=2021):
    """Count non-null `count_col` values per (Month, Year) within `target_year`, sorted by month.

    Filtering on the year before grouping gives the same rows as filtering the grouped
    result, but avoids aggregating other years. Rows with an unparsed (null) Year are excluded.
    """
    return (sdf
            .filter(col("Year") == target_year)
            .groupBy("Month", "Year")
            .agg(count(count_col).alias(alias))
            .sort("Month", "Year"))


# Gun Violence data: parse 'Incident Date' with the "d-MMM-yy" pattern, then count incidents per month
df_violence = add_month_year(
    spark.read.csv('/user/s4692034/external-dataset/GVA_2021.3.1-8.31.csv', header=True),
    'Incident Date', "d-MMM-yy")
df_violence_grouped = monthly_counts(df_violence, "Incident ID", "# of Violence Incidents")

df_violence_grouped.show()

# Legislation data: parse 'Introduced Date' with the "MM/dd/yyyy" pattern, then count bills per month
df_legislation = add_month_year(
    spark.read.csv('/user/s4692034/external-dataset/LGIS_2021.3.1-8.31.csv', header=True),
    "Introduced Date", "MM/dd/yyyy")
df_legislation_grouped = monthly_counts(df_legislation, "Name", "# of Legislation Introduced")

df_legislation_grouped.show()

                                                                                

+-----+----+-----------------------+
|Month|Year|# of Violence Incidents|
+-----+----+-----------------------+
|    3|2021|                   4522|
|    4|2021|                   4995|
|    5|2021|                   5859|
|    6|2021|                   5383|
|    7|2021|                   5421|
|    8|2021|                   4768|
+-----+----+-----------------------+

+-----+----+---------------------------+
|Month|Year|# of Legislation Introduced|
+-----+----+---------------------------+
|    3|2021|                         26|
|    4|2021|                          5|
|    5|2021|                         15|
|    6|2021|                          4|
|    7|2021|                          2|
|    8|2021|                          1|
+-----+----+---------------------------+



In [19]:
# Persist both date-enriched external datasets as JSON, replacing earlier runs.
for path, frame in (
    ("/user/s4692034/projectData/df_violence", df_violence),
    ("/user/s4692034/projectData/df_legislation", df_legislation),
):
    frame.write.json(path, mode='overwrite')

                                                                                

In [34]:
# Reload the date-enriched external datasets written earlier.
path = "/user/s4692034/projectData/df_violence"
df_violence = spark.read.json(path)

path = "/user/s4692034/projectData/df_legislation"
df_legislation = spark.read.json(path)

# JSON has no date type, so these date columns typically come back as strings.
# Cast them back to DateType so date arithmetic (date_sub/date_add) and the
# date-window join later in the notebook operate on real dates.
df_violence = df_violence.withColumn("Incident Date", to_date(col("Incident Date")))
df_legislation = df_legislation.withColumn("Introduced Date", to_date(col("Introduced Date")))

                                                                                

### Monthly change in the number of ads (每月的广告数量变化)

In [20]:
# Monthly totals for the gun-related ads (2021 only)
df_gun_grouped = df_gun.groupBy("Month", "Year").agg(
    sum("average_spend").alias("Total Ad Spend"),
    sum("average_impressions").alias("Total Impressions"),
    count("*").alias("# of Ads"),
).filter("Year == 2021")

# Outer-join the three monthly tables on (Month, Year), so a month present in only one source is kept
df_pattern = (
    df_gun_grouped
    .join(df_violence_grouped, on=["Month", "Year"], how="outer")
    .join(df_legislation_grouped, on=["Month", "Year"], how="outer")
    .select("Month", "Year", "# of Ads", "Total Ad Spend", "Total Impressions",
            "# of Violence Incidents", "# of Legislation Introduced")
    # Joins do not guarantee row order; sort so the displayed table and the CSV export are chronological
    .orderBy("Year", "Month")
)

df_pattern.show()

[Stage 54:>                                                         (0 + 1) / 1]

+-----+----+--------+--------------+-----------------+-----------------------+---------------------------+
|Month|Year|# of Ads|Total Ad Spend|Total Impressions|# of Violence Incidents|# of Legislation Introduced|
+-----+----+--------+--------------+-----------------+-----------------------+---------------------------+
|    3|2021|   40474|  1.94445413E8|   2.2641685255E9|                   4522|                         26|
|    4|2021|   81344|   4.1443728E7|   2.8340985075E9|                   4995|                          5|
|    5|2021|   33062|     5784569.0|     3.98469977E8|                   5859|                         15|
|    6|2021|   26599|     9533300.5|      6.1233326E8|                   5383|                          4|
|    7|2021|   31258|     5158921.0|     4.17557375E8|                   5421|                          2|
|    8|2021|   17032|     2249584.0|    1.535784955E8|                   4768|                          1|
+-----+----+--------+--------------+-

                                                                                

In [21]:
# Save the monthly comparison table: JSON for reuse, plus a single-partition CSV with header for export.
path = "/user/s4692034/projectData/df_pattern"
df_pattern.write.json(path, mode='overwrite')
df_pattern.coalesce(1).write.csv("/user/s4692034/project/pattern_analysis", mode='overwrite', header=True)

                                                                                

### Relationship between targeted (age, gender) groups and impressions (投放的(age, gender)和impression之间的关系)

In [22]:
# One row per (ad, demographic bucket), with each ad's impressions apportioned to the bucket.
# Assumes `percentage` is the bucket's 0-1 share of the ad's impressions — TODO confirm.
df_demographics = (
    df_gun
    .select("id", "average_impressions", explode("demographic_distribution").alias("demographics"))
    .select(
        "id",
        col("demographics.age").alias("age"),
        col("demographics.gender").alias("gender"),
        col("demographics.percentage").alias("percentage"),
        (col("average_impressions") * col("demographics.percentage")).alias("estimated_impressions"),
    )
)

# Total estimated impressions per (age, gender), largest first
df_demographics_grouped = (
    df_demographics
    .groupBy("age", "gender")
    .agg(sum("estimated_impressions").alias("total_estimated_impressions"))
    .sort("total_estimated_impressions", ascending=False)
)

df_demographics_grouped.show()



+-------+-------+---------------------------+
|    age| gender|total_estimated_impressions|
+-------+-------+---------------------------+
|    65+|   male|       1.1743467653975523E9|
|    65+| female|       1.1014052001931577E9|
|  55-64|   male|        1.048033982143912E9|
|  45-54|   male|        7.889122136225573E8|
|  55-64| female|        7.453032969893608E8|
|  35-44|   male|        5.731779548089759E8|
|  45-54| female|       4.8938217493150616E8|
|  35-44| female|        4.075334868437011E8|
|  25-34|   male|        3.878765779307559E8|
|  25-34| female|       1.4199471842457575E8|
|  18-24|   male|        9.236846765027463E7|
|  18-24| female|        4.059424929850851E7|
|    65+|unknown|          1.9189201260421E7|
|  55-64|unknown|        1.406223875513303E7|
|  35-44|unknown|       1.0857198924901515E7|
|  45-54|unknown|       1.0574167603141513E7|
|  25-34|unknown|          4154485.140845998|
|  13-17| female|         1001823.3727760001|
|  18-24|unknown|          983205.

                                                                                

In [23]:
# Save the per-bucket rows as JSON and the (age, gender) summary as a single-partition CSV with header.
path = "/user/s4692034/projectData/df_demographics"
df_demographics.write.json(path, mode='overwrite')
df_demographics_grouped.coalesce(1).write.csv("/user/s4692034/project/demographics", mode='overwrite', header=True)

                                                                                

### Relationship between per-region gun-violence incident counts and ad spend / impressions (不同region枪击案数量和spend以及impression之间的关系)

In [35]:
# One row per (ad, region), with each ad's spend and impressions apportioned by its region share;
# rows whose region is 'Unknown' are dropped.
df_region = (
    df_gun
    .select("id", "average_spend", "average_impressions", explode("region_distribution").alias("regions"))
    .select(
        "id",
        col("regions.region").alias("region"),
        col("regions.percentage").alias("percentage"),
        (col("average_spend") * col("regions.percentage")).alias("estimated_spend"),
        (col("average_impressions") * col("regions.percentage")).alias("estimated_impressions"),
    )
    .filter(col('region') != 'Unknown')
)

# Total estimated spend and impressions per region
df_region_grouped = df_region.groupBy("region").agg(
    sum("estimated_spend").alias("total_estimated_spend"),
    sum("estimated_impressions").alias("total_estimated_impressions"),
)

# Per state: number of incidents and total victims (injured + killed)
df_violence_state = (
    df_violence
    .groupBy("State")
    .agg(
        count("Incident ID").alias("# of Violence Incidents"),
        sum("# Victims Injured").alias("Total Victims Injured"),
        sum("# Victims Killed").alias("Total Victims Killed"),
    )
    .withColumn("Total Victims", col("Total Victims Injured") + col("Total Victims Killed"))
    .withColumnRenamed("# of Violence Incidents", "Total Incidents")
    .drop("Total Victims Injured", "Total Victims Killed")
)

# Inner join: only ad regions whose name exactly matches a state name in the violence data survive
df_regional_analysis = (
    df_region_grouped
    .join(df_violence_state, df_region_grouped.region == df_violence_state.State, how="inner")
    .select("region", "total_estimated_spend", "total_estimated_impressions", "Total Victims", "Total Incidents")
)

df_regional_analysis.show()

[Stage 9:>                                                          (0 + 1) / 1]

+-------------+---------------------+---------------------------+-------------+---------------+
|       region|total_estimated_spend|total_estimated_impressions|Total Victims|Total Incidents|
+-------------+---------------------+---------------------------+-------------+---------------+
|         Utah|   1861063.2352315024|         3.99431801636824E7|        108.0|            136|
|       Hawaii|    871732.1032114992|       1.2432196615111511E7|         28.0|             43|
|    Minnesota|    5415362.082349495|        8.750821717799345E7|        502.0|            532|
|         Ohio|    8467240.674211973|       2.2684867973473936E8|       1404.0|           1331|
|       Oregon|    6157665.196865985|        1.807008319865407E8|        207.0|            235|
|     Arkansas|   1368978.8406545008|        4.042360395457002E7|        289.0|            305|
|        Texas| 1.4775643630750516E7|        4.651407167241722E8|       2284.0|           2328|
| North Dakota|    736318.9665129997|   

                                                                                

In [37]:
# Across regions, correlate estimated ad spend and impressions with the incident count
# (DataFrame.stat.corr computes the Pearson coefficient).
# NOTE(review): both totals are likely to scale with state population, so a high value may
# mostly reflect population size; consider per-capita figures before interpreting it.
spend_correlation, impression_correlation = (
    df_regional_analysis.stat.corr(metric, "Total Incidents")
    for metric in ("total_estimated_spend", "total_estimated_impressions")
)

print("Correlation between ad spend and gun violence incident: ", spend_correlation)
print("Correlation between ad impressions and gun violence incident: ", impression_correlation)

[Stage 33:>                                                         (0 + 1) / 1]

Correlation between ad spend and gun violence incident:  0.7400096261351077
Correlation between ad impressions and gun violence incident:  0.7068419982504235


                                                                                

In [38]:
# Save the per-(ad, region) rows as JSON and the joined regional table as a single-partition CSV with header.
path = "/user/s4692034/projectData/df_region"
df_region.write.json(path, mode='overwrite')
df_regional_analysis.coalesce(1).write.csv("/user/s4692034/project/regional", mode='overwrite', header=True)

                                                                                

### Ad spend efficiency

In [41]:
# Cost per impression by region, computed as (total spend in region) / (total impressions in region).
# This is an impression-weighted ratio, not a mean of per-ad ratios, so no per-ad
# cost column is needed.
df_efficiency = (
    df_gun
    # Keep only the columns used below; carrying every column through the explode is wasted work
    .select("average_spend", "average_impressions", explode("region_distribution").alias("region_info"))
    .select(
        col("region_info.region").alias("region"),
        # Apportion each ad's impressions and spend to the region by its share
        (col("region_info.percentage") * col("average_impressions")).alias("impressions_by_region"),
        (col("region_info.percentage") * col("average_spend")).alias("spend_by_region"),
    )
    .groupBy("region")
    .agg((sum("spend_by_region") / sum("impressions_by_region")).alias("average_cost_per_impression"))
    # NOTE(review): the stored output of this cell appears to be in ascending order with the null
    # row first, which does not match ascending=False — the outputs look stale; re-run to refresh.
    .orderBy("average_cost_per_impression", ascending=False)
)

df_efficiency.show()
df_efficiency.count()

                                                                                

+-----------------+---------------------------+
|           region|average_cost_per_impression|
+-----------------+---------------------------+
|           Yangon|                       null|
|          Wyoming|       0.012783641402250927|
|             Iowa|       0.015126975091407561|
|     South Dakota|       0.016640458918345705|
|     North Dakota|       0.017631079430519322|
|            Lares|       0.017802436571237958|
|         Nebraska|       0.018315536324074934|
|        Tennessee|       0.019064071630511773|
|            Magwe|       0.019543831370835422|
|        Louisiana|        0.02022664243961013|
|            Idaho|       0.020534694862430973|
|          Montana|       0.020552709895890718|
|    West Virginia|        0.02055682487485137|
|      Mississippi|       0.020767957272832503|
|         Missouri|       0.020960213165913948|
|        Chihuahua|       0.021164304325576373|
|          Guánica|         0.0213327837742225|
|          Indiana|       0.021768882720

                                                                                

134

In [47]:
# The 50 US states; the District of Columbia and the territories are not in this list,
# so those regions are removed by the filter below.
us_states = [
    "Alabama", "Alaska", "Arizona", "Arkansas", "California", "Colorado", "Connecticut",
    "Delaware", "Florida", "Georgia", "Hawaii", "Idaho", "Illinois", "Indiana", "Iowa",
    "Kansas", "Kentucky", "Louisiana", "Maine", "Maryland", "Massachusetts", "Michigan",
    "Minnesota", "Mississippi", "Missouri", "Montana", "Nebraska", "Nevada",
    "New Hampshire", "New Jersey", "New Mexico", "New York", "North Carolina",
    "North Dakota", "Ohio", "Oklahoma", "Oregon", "Pennsylvania", "Rhode Island",
    "South Carolina", "South Dakota", "Tennessee", "Texas", "Utah", "Vermont",
    "Virginia", "Washington", "West Virginia", "Wisconsin", "Wyoming",
]

# Drop non-US / non-state regions from the efficiency table
df_efficiency = df_efficiency.filter(col("region").isin(us_states))

df_efficiency.show()
df_efficiency.count()

                                                                                

+-------------+---------------------------+
|       region|average_cost_per_impression|
+-------------+---------------------------+
|      Wyoming|       0.012783641402250925|
|         Iowa|        0.01512697509140756|
| South Dakota|       0.016640458918345702|
| North Dakota|        0.01763107943051932|
|     Nebraska|        0.01831553632407493|
|    Tennessee|       0.019064071630511773|
|    Louisiana|        0.02022664243961013|
|        Idaho|       0.020534694862430966|
|      Montana|        0.02055270989589071|
|West Virginia|        0.02055682487485137|
|  Mississippi|         0.0207679572728325|
|     Missouri|       0.020960213165913955|
|      Indiana|       0.021768882720753724|
|      Alabama|        0.02316334956823037|
|       Kansas|       0.025361963873076064|
|        Maine|        0.02574616227526578|
| Pennsylvania|       0.026638311788435794|
|        Texas|        0.03176596479192436|
|     Arkansas|         0.0338658285439622|
|       Oregon|        0.0340765

                                                                                

50

In [48]:
# Save the state-level efficiency table: JSON for reuse, plus a single-partition CSV with header for export.
path = "/user/s4692034/projectData/df_efficiency"
df_efficiency.write.json(path, mode='overwrite')
df_efficiency.coalesce(1).write.csv("/user/s4692034/project/efficiency", mode='overwrite', header=True)

                                                                                

### Change in ad volume around bill introduction dates (法案提出的前后，广告数量的变化)

In [29]:
# Window of 3 days before to 3 days after each bill's introduction date
df_legislation = (
    df_legislation
    .withColumn("Start Window", date_sub(col("Introduced Date"), 3))
    .withColumn("End Window", date_add(col("Introduced Date"), 3))
)

# Pair each ad with every bill whose window contains the ad's delivery start date.
# This is a non-equi (range) join, so one ad can be counted for several bills.
ad_start = df_gun["ad_delivery_start_time"]
in_window = (ad_start >= df_legislation["Start Window"]) & (ad_start <= df_legislation["End Window"])
df_join = df_gun.join(df_legislation, in_window)

# Ads, estimated spend and estimated impressions per bill, bill with the most ads first
df_legislation_period = (
    df_join
    .groupBy(df_legislation["Name"])
    .agg(
        count(df_gun["id"]).alias("Number of Ads"),
        sum(df_gun["average_spend"]).alias("Total Spend"),
        sum(df_gun["average_impressions"]).alias("Total Impressions"),
    )
    .sort("Number of Ads", ascending=False)
)

df_legislation_period.show()



+---------+-------------+-------------+-----------------+
|     Name|Number of Ads|  Total Spend|Total Impressions|
+---------+-------------+-------------+-----------------+
|   S.1155|        52566|  3.0673167E7|    2.227229855E9|
|   S.1101|        51308|  3.0553496E7|   2.1764275065E9|
|H.Res.275|        14301| 4.22759495E7|    5.046142505E8|
|S.Res.140|        14001| 4.25398995E7|    5.167314005E8|
|    S.974|        14001| 4.25398995E7|    5.167314005E8|
|    S.591|        12383|1.461588085E8|    1.270081148E9|
| H.R.1576|        12204| 1.46088948E8|   1.2461762375E9|
| H.R.1534|        12204| 1.46088948E8|   1.2461762375E9|
| H.R.1560|        12204| 1.46088948E8|   1.2461762375E9|
|    S.529|        11882| 1.46033009E8|   1.2374053985E9|
|    S.527|        11882| 1.46033009E8|   1.2374053985E9|
| H.R.1494|        11882| 1.46033009E8|   1.2374053985E9|
|    S.525|        11882| 1.46033009E8|   1.2374053985E9|
|   S.1775|        11417|     959541.5|     4.80552915E7|
| H.R.3432|   

                                                                                

In [30]:
# Save the per-bill window summary: JSON for reuse, plus a single-partition CSV with header for export.
path = "/user/s4692034/projectData/df_legislation_period"
df_legislation_period.write.json(path, mode='overwrite')
df_legislation_period.coalesce(1).write.csv("/user/s4692034/project/legislation_period", mode='overwrite', header=True)

                                                                                

In [31]:
# Shut down the Spark session now that every result has been written out.
spark.stop()