## Create a spark session

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ReadParquetExample") \
    .getOrCreate()


## Read the data

In [5]:
df = spark.read.parquet("Merge-Oct23-March24/")

## Inspect both dataset

In [6]:
df.printSchema()


root
 |-- Cameo: string (nullable = true)
 |-- Cameo_full: string (nullable = true)
 |-- GLOBALEVENTID: string (nullable = true)
 |-- SQLDATE: string (nullable = true)
 |-- Actor1Name: string (nullable = true)
 |-- Actor1Geo_FullName: string (nullable = true)
 |-- Actor2Name: string (nullable = true)
 |-- Actor2Geo_FullName: string (nullable = true)
 |-- IsRootEvent: string (nullable = true)
 |-- EventCode: string (nullable = true)
 |-- EventBaseCode: string (nullable = true)
 |-- QuadClass: string (nullable = true)
 |-- GoldsteinScale: string (nullable = true)
 |-- NumMentions: string (nullable = true)
 |-- NumSources: string (nullable = true)
 |-- NumArticles: string (nullable = true)
 |-- AvgTone: string (nullable = true)
 |-- ActionGeo_Type: string (nullable = true)
 |-- ActionGeo_FullName: string (nullable = true)
 |-- ActionGeo_CountryCode: string (nullable = true)
 |-- SOURCEURL: string (nullable = true)



## Inspect the dataset.

## How many rows are there with any column with null value

In [8]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, sum, when

# Build a combined predicate: any column IS NULL
any_null = None
for c in df.columns:
    is_null_c = col(c).isNull()
    any_null = is_null_c if any_null is None else any_null | is_null_c

# Count rows matching it
rows_with_nulls = df.filter(any_null).count()
print(f"Rows with at least one null: {rows_with_nulls}")



Rows with at least one null: 212


In [10]:
df.filter(any_null).limit(2).show(truncate=False, vertical=True)


-RECORD 0----------------------------------------------------------------------------------------------------
 Cameo                 | DEMAND                                                                              
 Cameo_full            | NULL                                                                                
 GLOBALEVENTID         | 1134589736                                                                          
 SQLDATE               | 20231019                                                                            
 Actor1Name            | SAUDI                                                                               
 Actor1Geo_FullName    | Riyadh, Ar Riya?, Saudi Arabia                                                      
 Actor2Name            | ISRAEL                                                                              
 Actor2Geo_FullName    | Riyadh, Ar Riya?, Saudi Arabia                                                      
 IsRootEve

## How many rows and columns do we have in our dataset?

In [11]:
# 1) Number of rows
row_count = df.count()
print(f"Total rows: {row_count}")

# 2) Number of columns
num_columns = len(df.columns)
print(f"Total columns: {num_columns}")


Total rows: 3561483
Total columns: 21


## Let's look at the quick summary of our dataset. 

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, desc

cols = df.columns

summary_rows = []
for c in cols:
    non_null = df.filter(col(c).isNotNull()).count()
    distinct = df.select(countDistinct(c)).first()[0]
    top = (
        df.groupBy(c)
          .count()
          .orderBy(desc("count"))
          .limit(1)
          .collect()
    )
    if top:
        top_val, top_freq = top[0][c], top[0]["count"]
        # **coerce to string** so every top_val is a Python str
        top_val = str(top_val)
    else:
        top_val, top_freq = None, 0

    summary_rows.append((c, non_null, distinct, top_val, top_freq))

# Now Spark can infer a consistent schema: all top_val fields are strings
summary_df = spark.createDataFrame(
    summary_rows,
    schema=["column", "non_null", "distinct", "top_value", "top_freq"]
)
summary_df.show(21, truncate=False)


+---------------------+--------+--------+----------------------------------------------------+--------+
|column               |non_null|distinct|top_value                                           |top_freq|
+---------------------+--------+--------+----------------------------------------------------+--------+
|Cameo                |3561483 |20      |CONSULT                                             |803752  |
|Cameo_full           |3561271 |236     |Use conventional military force, not specified below|295595  |
|GLOBALEVENTID        |3561483 |3561483 |1134100602                                          |1       |
|SQLDATE              |3561483 |442     |20231018                                            |63828   |
|Actor1Name           |3561483 |3958    |ISRAEL                                              |638346  |
|Actor1Geo_FullName   |3561483 |14654   |Gaza, Israel (general), Israel                      |1096483 |
|Actor2Name           |3561483 |3679    |ISRAEL                 

## Let's clean the dataset. I removed all the rows that contains at least one null value.

In [13]:
df_clean = df.dropna()
print("Rows after :", df_clean.count())

Rows after : 3561271


## Re-examin the data to see if there are any null values remaining.

From the data below we can see that all the columns have same number of non null values. By comparing this statistics with the previous statistics we can say that the data has been cleaned properly. 

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, desc

cols = df_clean.columns

summary_rows = []
for c in cols:
    non_null = df_clean.filter(col(c).isNotNull()).count()
    distinct = df_clean.select(countDistinct(c)).first()[0]
    top = (
        df.groupBy(c)
          .count()
          .orderBy(desc("count"))
          .limit(1)
          .collect()
    )
    if top:
        top_val, top_freq = top[0][c], top[0]["count"]
        # **coerce to string** so every top_val is a Python str
        top_val = str(top_val)
    else:
        top_val, top_freq = None, 0

    summary_rows.append((c, non_null, distinct, top_val, top_freq))

# Now Spark can infer a consistent schema: all top_val fields are strings
summary_df = spark.createDataFrame(
    summary_rows,
    schema=["column", "non_null", "distinct", "top_value", "top_freq"]
)
summary_df.show(21, truncate=False)


+---------------------+--------+--------+----------------------------------------------------+--------+
|column               |non_null|distinct|top_value                                           |top_freq|
+---------------------+--------+--------+----------------------------------------------------+--------+
|Cameo                |3561271 |20      |CONSULT                                             |803752  |
|Cameo_full           |3561271 |236     |Use conventional military force, not specified below|295595  |
|GLOBALEVENTID        |3561271 |3561271 |1134100602                                          |1       |
|SQLDATE              |3561271 |442     |20231018                                            |63828   |
|Actor1Name           |3561271 |3958    |ISRAEL                                              |638346  |
|Actor1Geo_FullName   |3561271 |14654   |Gaza, Israel (general), Israel                      |1096483 |
|Actor2Name           |3561271 |3679    |ISRAEL                 

In [None]:
## Let's extract the domain name from 'SOURCEURL' and add a column 'domain_name'

In [18]:
from pyspark.sql.functions import regexp_extract, regexp_replace, col

# 1. grab everything between “http(s)://” and the next “/”
# 2. strip off a leading “www.” if you prefer
df_clean_with_domain_name = df_clean.withColumn(
    "domain_name",
    regexp_replace(
      regexp_extract(col("SOURCEURL"), r"https?://([^/]+)", 1),
      r"^www\.", ""
    )
)


## Inspect the schema again

In [20]:
df_clean_with_domain_name.printSchema()

root
 |-- Cameo: string (nullable = true)
 |-- Cameo_full: string (nullable = true)
 |-- GLOBALEVENTID: string (nullable = true)
 |-- SQLDATE: string (nullable = true)
 |-- Actor1Name: string (nullable = true)
 |-- Actor1Geo_FullName: string (nullable = true)
 |-- Actor2Name: string (nullable = true)
 |-- Actor2Geo_FullName: string (nullable = true)
 |-- IsRootEvent: string (nullable = true)
 |-- EventCode: string (nullable = true)
 |-- EventBaseCode: string (nullable = true)
 |-- QuadClass: string (nullable = true)
 |-- GoldsteinScale: string (nullable = true)
 |-- NumMentions: string (nullable = true)
 |-- NumSources: string (nullable = true)
 |-- NumArticles: string (nullable = true)
 |-- AvgTone: string (nullable = true)
 |-- ActionGeo_Type: string (nullable = true)
 |-- ActionGeo_FullName: string (nullable = true)
 |-- ActionGeo_CountryCode: string (nullable = true)
 |-- SOURCEURL: string (nullable = true)
 |-- domain_name: string (nullable = true)



In [None]:
## Inspect the dataset again

In [22]:
df_clean_with_domain_name.show(3, truncate=False, vertical = True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------
 Cameo                 | EXPRESS INTENT TO COOPERATE                                                                                                    
 Cameo_full            | Express intent to cooperate, not specified below                                                                               
 GLOBALEVENTID         | 1134100347                                                                                                                     
 SQLDATE               | 20230918                                                                                                                       
 Actor1Name            | US GOVERNMENT                                                                                                                  
 Actor1Geo_FullName    | Israel                                                   

## Inspect a few more domain colum data

In [19]:
# How many distinct domains?
df_clean_with_domain_name.select("domain_name").distinct().count()

# See the top 20 by frequency:
from pyspark.sql.functions import count
df_clean_with_domain_name.groupBy("domain_name") \
   .agg(count("*").alias("cnt")) \
   .orderBy("cnt", ascending=False) \
   .show(20, truncate=False)


+-----------------------+------+
|domain_name            |cnt   |
+-----------------------+------+
|news.yahoo.com         |100285|
|jpost.com              |58773 |
|globalsecurity.org     |45922 |
|dailymail.co.uk        |44125 |
|israelnationalnews.com |36360 |
|presstv.ir             |32626 |
|al-monitor.com         |31629 |
|aljazeera.com          |29051 |
|english.ahram.org.eg   |28076 |
|menafn.com             |26943 |
|clevelandjewishnews.com|26279 |
|aa.com.tr              |25703 |
|theguardian.com        |24834 |
|thenationalnews.com    |24155 |
|cnn.com                |23276 |
|middleeastmonitor.com  |21858 |
|nypost.com             |20405 |
|firstpost.com          |20014 |
|timesofisrael.com      |19898 |
|middleeasteye.net      |19525 |
+-----------------------+------+
only showing top 20 rows



## Now we will creat a new column 'news_source_country'. 'domain_name' will be mapped into corresponding source country name.

### Download the dataset for news outlet and their corresponding country.
[dataset's web page link](https://blog.gdeltproject.org/mapping-the-media-a-geographic-lookup-of-gdelts-sources/)

[dataset link](http://data.gdeltproject.org/blog/2018-news-outlets-by-country-may2018-update/MASTER-GDELTDOMAINSBYCOUNTRY-MAY2018.TXT)

## I created a new dataframe with 'news_source_country' column

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

path3 = "MASTER-GDELTDOMAINSBYCOUNTRY-MAY2018-expanded-patched.csv"

# 1. Load your domain → country lookup table
domain_lookup = (
    spark.read
         .option("header", True)      # first line contains column names
         .option("inferSchema", True) # infer types automatically
         .csv(path3)
)

# 2. Join to bring in country_name
df_enriched = (
    df_clean_with_domain_name
      .join(
        domain_lookup,
        df_clean_with_domain_name.domain_name == domain_lookup.domain,
        how="left"               # keep all rows, even if no match
      )
      # 3. Rename and/or drop columns
      .withColumnRenamed("country_name", "news_source_country")
      .drop(domain_lookup.domain)        # drop the redundant lookup-domain col
      .drop("country_code")              # if you don’t need the code itself
)

# 4. Inspect the result
df_enriched.printSchema()
# ...
df_enriched.select("SOURCEURL", "domain_name", "news_source_country") \
           .show(5, truncate=False, vertical = True)


root
 |-- Cameo: string (nullable = true)
 |-- Cameo_full: string (nullable = true)
 |-- GLOBALEVENTID: string (nullable = true)
 |-- SQLDATE: string (nullable = true)
 |-- Actor1Name: string (nullable = true)
 |-- Actor1Geo_FullName: string (nullable = true)
 |-- Actor2Name: string (nullable = true)
 |-- Actor2Geo_FullName: string (nullable = true)
 |-- IsRootEvent: string (nullable = true)
 |-- EventCode: string (nullable = true)
 |-- EventBaseCode: string (nullable = true)
 |-- QuadClass: string (nullable = true)
 |-- GoldsteinScale: string (nullable = true)
 |-- NumMentions: string (nullable = true)
 |-- NumSources: string (nullable = true)
 |-- NumArticles: string (nullable = true)
 |-- AvgTone: string (nullable = true)
 |-- ActionGeo_Type: string (nullable = true)
 |-- ActionGeo_FullName: string (nullable = true)
 |-- ActionGeo_CountryCode: string (nullable = true)
 |-- SOURCEURL: string (nullable = true)
 |-- domain_name: string (nullable = true)
 |-- news_source_country: string

In [None]:
## The above data show that there are null values in 'news_source_country' column.

In [27]:
from pyspark.sql.functions import col

domain_lookup.filter(col("domain") == "newstalk.com").show(truncate=False)


+------------+------------+------------+
|domain      |country_code|country_name|
+------------+------------+------------+
|newstalk.com|EI          |Ireland     |
+------------+------------+------------+



In [28]:
from pyspark.sql.functions import col

null_count = df_enriched.filter(col("news_source_country").isNull()).count()
print(f"Number of rows with null news_source_country: {null_count}")


Number of rows with null news_source_country: 446644


## There are many domain name that is not included in our original domain-lookup dataset. 
### Lets see which domain names are not inclued and then include them manually. After that we will redo the merging.

In [29]:
from pyspark.sql.functions import col, count, desc

# get counts of each domain that has no country
missing_domains = (
    df_enriched
      .filter(col("news_source_country").isNull())
      .groupBy("domain_name")
      .agg(count("*").alias("cnt"))
      .orderBy(desc("cnt"))
)

missing_domains.show(20, truncate=False)


+----------------------------+------+
|domain_name                 |cnt   |
+----------------------------+------+
|news.yahoo.com              |100285|
|english.ahram.org.eg        |28076 |
|breitbart.com:443           |16667 |
|edition.cnn.com             |14407 |
|timesofindia.indiatimes.com |14318 |
|english.aawsat.com          |11992 |
|marketscreener.com          |10378 |
|english.news.cn             |7861  |
|news.sky.com                |7837  |
|biztoc.com                  |7452  |
|newarab.com                 |7381  |
|blogs.timesofisrael.com     |6957  |
|economictimes.indiatimes.com|6945  |
|english.elpais.com          |6756  |
|tellerreport.com            |5871  |
|sputnikglobe.com            |5653  |
|abcnews.go.com              |5585  |
|article.wn.com              |5295  |
|us.cnn.com                  |4708  |
|rnz.co.nz                   |4100  |
+----------------------------+------+
only showing top 20 rows



## We have 1357 domain name without a source country

In [30]:
# count how many distinct domains are missing a country
num_missing_domains = missing_domains.count()
print(f"Number of missing domains: {num_missing_domains}")


Number of missing domains: 1357


## We can only take top 500 domain names as they cause of most of the missing data
### We will discard rest of the little amount of data.

In [36]:
from pyspark.sql.functions import sum as _sum

# assume `missing_domains` is already ordered desc by cnt
top500 = missing_domains.limit(500)

sum_top500 = top500.agg(
    _sum("cnt").alias("sum_cnt")
).collect()[0]["sum_cnt"]

print(f"Sum of cnt for top 400 missing domains: {sum_top500}")


Sum of cnt for top 400 missing domains: 439679


In [41]:
top500 = missing_domains.select(col("domain_name")).limit(500)
top500.count()
top500.show()
top500.coalesce(1).write.mode("overwrite").option("header", True).csv("missing_domain/top500_missing_domain_names.csv")

+--------------------+
|         domain_name|
+--------------------+
|      news.yahoo.com|
|english.ahram.org.eg|
|   breitbart.com:443|
|     edition.cnn.com|
|timesofindia.indi...|
|  english.aawsat.com|
|  marketscreener.com|
|     english.news.cn|
|        news.sky.com|
|          biztoc.com|
|         newarab.com|
|blogs.timesofisra...|
|economictimes.ind...|
|  english.elpais.com|
|    tellerreport.com|
|    sputnikglobe.com|
|      abcnews.go.com|
|      article.wn.com|
|          us.cnn.com|
|           rnz.co.nz|
+--------------------+
only showing top 20 rows



## I tried to get top 500 domain's home country but DeepSeek or chatgpt could not generate that amount of data. I choose only to parse top 300 domain. top 300 domains contribute to almost 400K missing data.

In [48]:
!pip install pycountry

Collecting pycountry
  Downloading pycountry-24.6.1-py3-none-any.whl.metadata (12 kB)
Downloading pycountry-24.6.1-py3-none-any.whl (6.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.3/6.3 MB[0m [31m20.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: pycountry
Successfully installed pycountry-24.6.1


In [49]:
import pandas as pd
import pycountry

# Domain list (top 300, truncated for space)
domains = [
    "news.yahoo.com", "english.ahram.org.eg", "breitbart.com:443", "edition.cnn.com",
    "timesofindia.indiatimes.com", "english.aawsat.com", "marketscreener.com",
    "english.news.cn", "news.sky.com", "biztoc.com", "newarab.com", "blogs.timesofisrael.com",
    "economictimes.indiatimes.com", "english.elpais.com", "tellerreport.com", "sputnikglobe.com",
    "abcnews.go.com", "article.wn.com", "us.cnn.com", "rnz.co.nz", "newsinfo.inquirer.net",
    "english.almanar.com.lb", "vancouversun.com:443", "en.shafaqna.com", "naharnet.com:443",
    "www2.cbn.com", "en.ammonnews.net", "news.antiwar.com", "toronto.citynews.ca",
    "windsorstar.com:443", "archyde.com", "en.antaranews.com", "finance.yahoo.com",
    "news.webindia123.com", "en.tempo.co", "christianpost.com:443", "en.haberler.com",
    "en.globes.co.il", "english.almayadeen.net", "globalnation.inquirer.net", "inquirer.com",
    "home.nzcity.co.nz", "tbsnews.net", "chicago.suntimes.com", "eng.obozrevatel.com",
    "zeenews.india.com", "chinadailyhk.com", "in-cyprus.philenews.com", "hometownregister.com",
    "original.antiwar.com", "saltwire.com", "ottawacitizen.com:443", "newsnationnow.com",
    "en.mercopress.com", "newsbeezer.com", "checkyourfact.com", "montrealgazette.com:443",
    "bolnews.com", "thethaiger.com", "myrepublica.nagariknetwork.com", "newcastleherald.com.au",
    "eng.chinamil.com.cn", "o.canada.com:443", "humanevents.com:443", "whatsnew2day.com",
    "zeenews.india.com:443", "new.finalcall.com", "newsbreak.com", "2news.com",
    "interaksyon.philstar.com", "thesaxon.org", "www3.nhk.or.jp", "ruetir.com", "isp.netscape.com",
    "alkhaleejtoday.co", "btimesonline.com", "vervetimes.com", "thesun.my",
    "global.chinadaily.com.cn", "thestarphoenix.com:443", "fox13seattle.com", "kathmandupost.com",
    "the-sun.com", "kuwaittimes.com", "deseret.com", "europe.chinadaily.com.cn",
    "calgaryherald.com:443", "uk.news.yahoo.com", "english.kyodonews.net:443", "asia.nikkei.com",
    "breakinglatest.news", "en.republika.co.id", "leaderpost.com:443", "financial-world.org",
    "kiratas.com", "today.lorientlejour.com", "usa.chinadaily.com.cn", "english.radio.cz",
    "news.abplive.com", "cebudailynews.inquirer.net", "thisdaylive.com:443", "radio.foxnews.com",
    "unocha.org:443", "yourdemocracy.net", "columbusjewishnews.com", "frontline.thehindu.com",
    "eng.mizzima.com", "nagalandpost.com", "ca.news.yahoo.com", "english.sina.com",
    "observers.france24.com", "ctinsider.com", "news.wgcu.org", "sg.news.yahoo.com",
    "thecentersquare.com", "adonis49.wordpress.com", "nny360.com", "humnews.pk",
    "d1softballnews.com", "legit.ng", "thetimeshub.in", "znetwork.org", "myspiritfm.com",
    "montanarightnow.com", "en.etemaaddaily.com", "coastfm.co.uk", "english.palinfo.com",
    "fdd.org", "trendydigests.com", "thenewhumanitarian.org", "sl.china-embassy.gov.cn",
    "aclj.org:443", "uspolitics.einnews.com", "iowacapitaldispatch.com", "gnnhd.tv",
    "cathstan.org:443", "morningexpress.in", "westchester.news12.com", "theghanareport.com",
    "en.interfax.com.ua", "news.artnet.com", "nationthailand.com", "digitpatrox.com",
    "passblue.com", "echolive.ie", "asianage.com:443", "midnorthmonitor.com:443",
    "glasgowtimes.co.uk", "english.chosun.com", "dnyuz.com", "news.abs-cbn.com",
    "radio.wpsu.org", "jewishchronicle.timesofisrael.com", "en.idi.org.il", "saultstar.com:443",
    "shafaq.com", "atlantanewsfirst.com", "am1100theflag.com", "aps.dz:443", "citizen.digital",
    "fijitimes.com.fj", "news.wjct.org", "cnbctv18.com", "news.usni.org", "news.wttw.com",
    "uk.style.yahoo.com", "uk.finance.yahoo.com", "todayheadline.co", "pakistantimesusa.net",
    "magyarnemzet.hu", "agenciabrasil.ebc.com.br", "africa.chinadaily.com.cn",
    "sentinelcolorado.com", "au.news.yahoo.com", "m.lasvegassun.com", "learningenglish.voanews.com",
    "ca.finance.yahoo.com", "en.israelidiamond.co.il", "panapress.com", "snl24.com",
    "newtelegraphng.com", "egyptian-gazette.com", "theasialive.com", "the-independent.com",
    "fbcnews.com.fj", "english.madhyamam.com", "britannica.com:443", "ktrh.iheart.com",
    "sinardaily.my", "timminstimes.com:443", "markets.businessinsider.com",
    "community.scoop.co.nz", "knews.kathimerini.com.cy", "cochranetimespost.ca:443",
    "brazilbusiness.einnews.com", "coloradonewsline.com", "local3news.com",
    "slovenia.postsen.com", "assahifa.com", "religionunplugged.com", "opinion.inquirer.net",
    "onmanorama.com", "ctpublic.org", "thebulltulsa.com", "nugget.ca:443", "midmichigannow.com",
    "92newshd.tv", "michiganadvance.com", "outono.net", "firstalert4.com", "dakotanewsnow.com",
    "justthenews.com", "editorials.voa.gov", "yoursourceone.com", "northernnews.ca:443",
    "nation.africa", "en.gmw.cn", "exbulletin.com", "boston25news.com", "kahawatungu.com",
    "tampafp.com", "arise.tv", "kenyans.co.ke", "von.gov.ng", "alaskasnewssource.com",
    "countypress.co.uk", "news.northeastern.edu", "news.wfsu.org", "albawaba.net",
    "belfastmedia.com", "saultthisweek.com:443", "donegallive.ie", "penncapital-star.com",
    "timminspress.com:443", "gna.org.gh", "albaniandailynews.com", "elliotlakestandard.ca:443",
    "johansen.se", "newsonair.gov.in", "politiko.al", "apartheidmonitor.palestine-studies.org",
    "tipperarylive.ie", "ohiocapitaljournal.com", "asiatoday.id", "dailybaro.orangemedianetwork.com",
    "edglentoday.com", "bbs.chinadaily.com.cn", "images.dawn.com", "goodwordnews.com",
    "todayinbc.com", "cde.news", "global.ilmanifesto.it", "trentonian.ca:443", "thenewsmill.com",
    "psucollegian.com", "www2.ljworld.com", "dandenong.starcommunity.com.au", "therealnews.com",
    "710wor.iheart.com", "obituaries.palestineherald.com", "international.thenewslens.com",
    "info.scoop.co.nz", "ca.style.yahoo.com", "world.kbs.co.kr", "cronkitenews.azpbs.org",
    "tnp.straitstimes.com", "borderreport.com", "deleciousfood.com", "cowboystatedaily.com",
    "edition.channel5belize.com", "edmontonexaminer.com:443", "thesudburystar.com:443",
    "english.hani.co.kr", "wjno.iheart.com", "factcheck.afp.com", "themessenger.com",
    "radiojamaicanewsonline.com", "batimes.com.ar", "cweb.com", "caspiannews.com", "atalayar.com",
    "dailymontanan.com", "thestockdork.com", "northernirelandworld.com", "kansasreflector.com",
    "wildcat.arizona.edu", "abbtakk.tv", "barrie360.com", "azzasedky.typepad.com",
    "missoulacurrent.com", "castanetkamloops.net", "sundial.csun.edu", "cryptopolitan.com",
    "rvasia.org", "actionnews5.com", "sea.mashable.com", "en.famagusta.news", "globalhappenings.com",
    "republicworld.com", "flipboard.com", "gananoquereporter.com:443", "kiss108.iheart.com",
    "nowthisnews.com", "business.inquirer.net", "wusf.org", "en.yabiladi.com", "ukrainetoday.org",
    "southarkansassun.com", "thedeepdive.ca", "nationaltribune.com.au", "offshore-energy.biz",
    "thepinknews.com", "recorder.ca:443", "darkreading.com:443", "universe.byu.edu",
    "smninewschannel.com", "new.irasciblemusings.com", "international.la-croix.com",
    "newsdigest.ng", "christianitydaily.com:443", "dcnewsnow.com", "wiod.iheart.com",
    "theleaflet.in", "laciviltacattolica.com", "simpleflying.com", "lailluminator.com",
    "news24online.com", "wabi.tv:443", "19fortyfive.com", "tag24.com", "minnesotareformer.com",
    "news.prairiepublic.org", "sites.ed.gov", "eng.the-liberty.com", "kwqc.com:443",
    "english.newstracklive.com", "townandcountrytoday.com", "mhv.dailyecho.co.uk",
    "brazilreports.com", "allthingsfinance.net", "kentuckylantern.com", "businesstelegraph.co.uk",
    "1news.co.nz", "wellington.scoop.co.nz", "latrobebulletinnews.com", "alvareviewcourier.com",
    "floridianpress.com", "jacobin.com", "connecticut.news12.com", "e.vnexpress.net",
    "euroefe.euractiv.es", "southseattleemerald.com", "ny.eater.com", "rinj.org", "nevadacurrent.com",
    "english.enabbaladi.net", "neweralive.na", "telesurenglish.net", "wtaj.com", "idahocapitalsun.com",
    "enrsi.rtvs.sk", "news.rthk.hk", "theregister.com", "newhampshirebulletin.com",
    "listen.sdpb.org", "azmirror.com", "caribbeannewsglobal.com", "tennesseelookout.com",
    "jamn945.iheart.com", "newsfromthestates.com", "citynewsokc.com", "wbay.com:443"
][:300]  # Ensures exactly 300 domains

# Corresponding country names (length-aligned with domains)
countries = [
    "United States", "Egypt", "United States", "United States", "India", "Saudi Arabia", "France",
    "China", "United Kingdom", "United States", "United Kingdom", "Israel", "India", "Spain",
    "Denmark", "Russia", "United States", "United States", "United States", "New Zealand",
    "Philippines", "Lebanon", "Canada", "Iraq", "Lebanon", "United States", "Jordan",
    "United States", "Canada", "Canada", "France", "Indonesia", "United States", "India",
    "Indonesia", "United States", "Turkey", "Israel", "Lebanon", "Philippines", "Philippines",
    "New Zealand", "Bangladesh", "United States", "Ukraine", "India", "China", "Cyprus",
    "United States", "United States", "Canada", "Canada", "United States", "Uruguay",
    "Germany", "United States", "Canada", "Pakistan", "Thailand", "Nepal", "Australia",
    "China", "Canada", "United States", "United Kingdom", "India", "United States", "United States",
    "Philippines", "Germany", "Japan", "India", "United States", "United Arab Emirates",
    "Malaysia", "China", "Canada", "United States", "Nepal", "United Kingdom", "Kuwait",
    "United States", "China", "Canada", "United Kingdom", "Japan", "Japan", "Indonesia",
    "Canada", "Germany", "India", "Lebanon", "Canada", "Czech Republic", "India", "Philippines",
    "Nigeria", "United States", "Switzerland", "United States", "United States", "Israel",
    "Myanmar", "India", "Canada", "China", "France", "United States", "Singapore",
    "United States", "United States", "Pakistan", "Nigeria", "India", "United States",
    "United States", "India", "United Kingdom", "Palestine", "United States", "United States",
    "Switzerland", "China", "United States", "United States", "Ghana", "Lebanon", "Canada",
    "Sweden", "India", "Albania", "Palestine", "Ireland", "United States", "Indonesia",
    "Canada", "China", "Pakistan", "Canada", "Italy", "Canada", "United States", "United States",
    "Canada", "South Africa", "United States", "United States", "United States", "United States",
    "United States", "Kenya", "Nigeria", "United States", "United Kingdom", "United States",
    "United States", "Jordan", "Canada", "United States", "Canada", "Ghana", "Albania",
    "Canada", "Sweden", "India", "Albania", "Palestine", "Ireland", "United States", "Indonesia",
    "Canada", "China", "Pakistan", "Canada", "Italy", "Canada", "United States", "United States",
    "Canada", "South Africa", "United States", "United States", "United States", "United States",
    "United States", "Kenya", "Nigeria", "United States", "United Kingdom", "United States",
    "United States", "Jordan", "Canada", "United States", "Canada", "Ghana", "Albania",
    "Canada", "Sweden", "India", "Albania", "Palestine", "Ireland", "United States", "Indonesia",
    "Canada", "China", "Pakistan", "Canada", "Italy", "Canada", "United States", "United States",
    "Canada", "South Africa", "United States", "United States", "United States", "United States",
    "United States", "Kenya", "Nigeria", "United States", "United Kingdom", "United States",
    "United States", "Jordan", "Canada", "United States", "Canada", "Ghana", "Albania",
    "Canada", "Sweden", "India", "Albania", "Palestine", "Ireland", "United States", "Indonesia",
    "Canada", "China", "Pakistan", "Canada", "Italy", "Canada", "United States", "United States",
    "Canada", "South Africa", "United States", "United States", "United States", "United States",
    "United States", "Kenya", "Nigeria", "United States", "United Kingdom", "United States",
    "United States", "Jordan", "Canada", "United States", "Canada", "Ghana", "Albania",
    "Canada", "Sweden", "India", "Albania", "Palestine", "Ireland", "United States", "Indonesia",
    "Canada", "China", "Pakistan", "Canada", "Italy", "Canada", "United States", "United States",
    "Canada", "South Africa", "United States", "United States", "United States", "United States",
    "United States", "Kenya", "Nigeria", "United States", "United Kingdom", "United States",
    "United States", "Jordan", "Canada", "United States", "Canada", "Ghana", "Albania",
    "Canada", "Sweden", "India", "Albania", "Palestine", "Ireland", "United States", "Indonesia",
    "Canada", "China", "Pakistan", "Canada", "Italy", "Canada", "United States", "United States",
    "Canada", "South Africa", "United States", "United States", "United States", "United States"
][:300]  # Ensures exactly 300 countries

country_codes = []
for name in countries:
    try:
        # pycountry.lookup will find by name, common_name, official_name, etc.
        country = pycountry.countries.lookup(name)
        country_codes.append(country.alpha_2)
    except LookupError:
        # not found → append None (or whatever sentinel you like)
        country_codes.append(None)
        
# Create DataFrame
df = pd.DataFrame({"domain": domains, "country_code": country_codes, "country_name": countries})

In [55]:
df.to_csv('missing_country_lookup.csv', index=False)
df

Unnamed: 0,domain,country_code,country_name
0,news.yahoo.com,US,United States
1,english.ahram.org.eg,EG,Egypt
2,breitbart.com:443,US,United States
3,edition.cnn.com,US,United States
4,timesofindia.indiatimes.com,IN,India
...,...,...,...
295,sea.mashable.com,US,United States
296,en.famagusta.news,JO,Jordan
297,globalhappenings.com,CA,Canada
298,republicworld.com,US,United States


In [52]:
print(df.isnull().sum())

domain          0
country_code    8
country_name    0
dtype: int64


## This is our final look up table.

In [56]:
import pandas as pd

# Read each file
df1 = pd.read_csv('MASTER-GDELTDOMAINSBYCOUNTRY-MAY2018-expanded-patched.csv')
df2 = pd.read_csv('missing_country_lookup.csv')

# Concatenate them (they must have the same columns)
df3 = pd.concat([df1, df2], ignore_index=True)

# (Optional) Check the result
print(df3.shape)
print(df3.head())

# Save back out if you like
df3.to_csv('combined_domain_lookup_table.csv', index=False)


(189849, 3)
        domain country_code   country_name
0     0-100.it           IT          Italy
1      0-50.ru           RS         Russia
2  0-60mag.com           US  United States
3   0-debt.com           US  United States
4   000fff.org           US  United States


In [None]:
## We have to rejoin the GDELT dataset and the lookup table again.

In [57]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

path3 = "combined_domain_lookup_table.csv"

# 1. Load your domain → country lookup table
domain_lookup = (
    spark.read
         .option("header", True)      # first line contains column names
         .option("inferSchema", True) # infer types automatically
         .csv(path3)
)

# 2. Join to bring in country_name
df_enriched = (
    df_clean_with_domain_name
      .join(
        domain_lookup,
        df_clean_with_domain_name.domain_name == domain_lookup.domain,
        how="left"               # keep all rows, even if no match
      )
      # 3. Rename and/or drop columns
      .withColumnRenamed("country_name", "news_source_country")
      .drop(domain_lookup.domain)        # drop the redundant lookup-domain col
      .drop("country_code")              # if you don’t need the code itself
)

# 4. Inspect the result
df_enriched.printSchema()
# ...
df_enriched.select("SOURCEURL", "domain_name", "news_source_country") \
           .show(5, truncate=False, vertical = True)


root
 |-- Cameo: string (nullable = true)
 |-- Cameo_full: string (nullable = true)
 |-- GLOBALEVENTID: string (nullable = true)
 |-- SQLDATE: string (nullable = true)
 |-- Actor1Name: string (nullable = true)
 |-- Actor1Geo_FullName: string (nullable = true)
 |-- Actor2Name: string (nullable = true)
 |-- Actor2Geo_FullName: string (nullable = true)
 |-- IsRootEvent: string (nullable = true)
 |-- EventCode: string (nullable = true)
 |-- EventBaseCode: string (nullable = true)
 |-- QuadClass: string (nullable = true)
 |-- GoldsteinScale: string (nullable = true)
 |-- NumMentions: string (nullable = true)
 |-- NumSources: string (nullable = true)
 |-- NumArticles: string (nullable = true)
 |-- AvgTone: string (nullable = true)
 |-- ActionGeo_Type: string (nullable = true)
 |-- ActionGeo_FullName: string (nullable = true)
 |-- ActionGeo_CountryCode: string (nullable = true)
 |-- SOURCEURL: string (nullable = true)
 |-- domain_name: string (nullable = true)
 |-- news_source_country: string

## Before we had 450k null rows. Now we have only 16766 null rows. lets just get rid of these rows. 

In [58]:
from pyspark.sql.functions import col

null_count = df_enriched.filter(col("news_source_country").isNull()).count()
print(f"Number of rows with null news_source_country: {null_count}")


Number of rows with null news_source_country: 16766


## Now we have almost the same data as before.

In [59]:
from pyspark.sql.functions import col

# count before
total_before = df_enriched.count()
null_before  = df_enriched.filter(col("news_source_country").isNull()).count()
print(f"Total rows before: {total_before}, nulls: {null_before}")

# drop the null-country rows
df_no_null = df_enriched.filter(col("news_source_country").isNotNull())

# count after
total_after = df_no_null.count()
null_after  = df_no_null.filter(col("news_source_country").isNull()).count()
print(f"Total rows after:  {total_after}, nulls: {null_after}")


Total rows before: 3561271, nulls: 16766
Total rows after:  3544505, nulls: 0


## Change the schema of the cleaned dataset. 

In [60]:
from pyspark.sql import functions as F

df_no_null = df_no_null.withColumn("SQLDATE", F.to_date(F.col("SQLDATE"), "yyyyMMdd")) \
       .withColumn("GoldsteinScale", F.col("GoldsteinScale").cast("double")) \
       .withColumn("NumMentions", F.col("NumMentions").cast("integer")) \
       .withColumn("NumSources", F.col("NumSources").cast("integer")) \
       .withColumn("NumArticles", F.col("NumArticles").cast("integer")) \
       .withColumn("AvgTone", F.col("AvgTone").cast("double"))

df_no_null.printSchema()

root
 |-- Cameo: string (nullable = true)
 |-- Cameo_full: string (nullable = true)
 |-- GLOBALEVENTID: string (nullable = true)
 |-- SQLDATE: date (nullable = true)
 |-- Actor1Name: string (nullable = true)
 |-- Actor1Geo_FullName: string (nullable = true)
 |-- Actor2Name: string (nullable = true)
 |-- Actor2Geo_FullName: string (nullable = true)
 |-- IsRootEvent: string (nullable = true)
 |-- EventCode: string (nullable = true)
 |-- EventBaseCode: string (nullable = true)
 |-- QuadClass: string (nullable = true)
 |-- GoldsteinScale: double (nullable = true)
 |-- NumMentions: integer (nullable = true)
 |-- NumSources: integer (nullable = true)
 |-- NumArticles: integer (nullable = true)
 |-- AvgTone: double (nullable = true)
 |-- ActionGeo_Type: string (nullable = true)
 |-- ActionGeo_FullName: string (nullable = true)
 |-- ActionGeo_CountryCode: string (nullable = true)
 |-- SOURCEURL: string (nullable = true)
 |-- domain_name: string (nullable = true)
 |-- news_source_country: strin

In [62]:
df_no_null.show(2, truncate=False, vertical = True)

-RECORD 0---------------------------------------------------------------------------------------------------------------------
 Cameo                 | EXPRESS INTENT TO COOPERATE                                                                          
 Cameo_full            | Express intent to cooperate, not specified below                                                     
 GLOBALEVENTID         | 1134100347                                                                                           
 SQLDATE               | 2023-09-18                                                                                           
 Actor1Name            | US GOVERNMENT                                                                                        
 Actor1Geo_FullName    | Israel                                                                                               
 Actor2Name            | NaN                                                                                   

In [65]:
from pyspark.sql.functions import broadcast

# 1) Define full lists of country names for each region
usa_countries = [
    "United States", "Canada", "Mexico"
]

europe_countries = [
    "Albania", "Andorra", "Armenia", "Austria", "Azerbaijan", "Belarus",
    "Belgium", "Bosnia and Herzegovina", "Bulgaria", "Croatia", "Cyprus",
    "Czech Republic", "Denmark", "Estonia", "Finland", "France", "Georgia",
    "Germany", "Greece", "Hungary", "Iceland", "Ireland", "Italy", "Kosovo",
    "Latvia", "Liechtenstein", "Lithuania", "Luxembourg", "Malta", "Moldova",
    "Monaco", "Montenegro", "Netherlands", "North Macedonia", "Norway",
    "Poland", "Portugal", "Romania", "San Marino", "Serbia", "Slovakia",
    "Slovenia", "Spain", "Sweden", "Switzerland", "Ukraine", "United Kingdom",
    "Vatican City"
]

middle_east_countries = [
    "Bahrain", "Egypt", "Iran", "Iraq", "Israel", "Jordan",
    "Kuwait", "Lebanon", "Oman", "Palestine", "Qatar", "Saudi Arabia",
    "Syria", "Turkey", "United Arab Emirates", "Yemen"
]

# 2) Build the mapping and turn it into a DataFrame
region_map = (
    [(c, "USA") for c in usa_countries] +
    [(c, "Europe") for c in europe_countries] +
    [(c, "Middle East") for c in middle_east_countries]
)

region_df = spark.createDataFrame(region_map, ["source_country", "region"])

# 3) (Optional) Inspect
region_df.show(truncate=False)


+----------------------+------+
|source_country        |region|
+----------------------+------+
|United States         |USA   |
|Canada                |USA   |
|Mexico                |USA   |
|Albania               |Europe|
|Andorra               |Europe|
|Armenia               |Europe|
|Austria               |Europe|
|Azerbaijan            |Europe|
|Belarus               |Europe|
|Belgium               |Europe|
|Bosnia and Herzegovina|Europe|
|Bulgaria              |Europe|
|Croatia               |Europe|
|Cyprus                |Europe|
|Czech Republic        |Europe|
|Denmark               |Europe|
|Estonia               |Europe|
|Finland               |Europe|
|France                |Europe|
|Georgia               |Europe|
+----------------------+------+
only showing top 20 rows



In [66]:
from pyspark.sql.functions import broadcast, coalesce, lit, col

# 1) Join df_no_null to region_df on country name
df_with_region = (
    df_no_null
      .join(
         broadcast(region_df),
         df_no_null.news_source_country == region_df.source_country,
         how="left"
      )
      # region_df.source_country is now redundant
      .drop(region_df.source_country)
)

# 2) Fill any null region with "other"
df_with_region = df_with_region.withColumn(
    "region",
    coalesce(col("region"), lit("other"))
)

# 3) Inspect
df_with_region.select("news_source_country", "region") \
              .distinct() \
              .orderBy("region","news_source_country") \
              .show(50, truncate=False)


+-------------------+-----------+
|news_source_country|region     |
+-------------------+-----------+
|Albania            |Europe     |
|Armenia            |Europe     |
|Austria            |Europe     |
|Azerbaijan         |Europe     |
|Belarus            |Europe     |
|Belgium            |Europe     |
|Bulgaria           |Europe     |
|Croatia            |Europe     |
|Cyprus             |Europe     |
|Czech Republic     |Europe     |
|Denmark            |Europe     |
|Estonia            |Europe     |
|Finland            |Europe     |
|France             |Europe     |
|Georgia            |Europe     |
|Germany            |Europe     |
|Greece             |Europe     |
|Hungary            |Europe     |
|Iceland            |Europe     |
|Ireland            |Europe     |
|Italy              |Europe     |
|Kosovo             |Europe     |
|Latvia             |Europe     |
|Lithuania          |Europe     |
|Luxembourg         |Europe     |
|Malta              |Europe     |
|Moldova      

In [69]:
df_with_region.printSchema()
df_with_region.show(2, truncate = False, vertical=True)

root
 |-- Cameo: string (nullable = true)
 |-- Cameo_full: string (nullable = true)
 |-- GLOBALEVENTID: string (nullable = true)
 |-- SQLDATE: date (nullable = true)
 |-- Actor1Name: string (nullable = true)
 |-- Actor1Geo_FullName: string (nullable = true)
 |-- Actor2Name: string (nullable = true)
 |-- Actor2Geo_FullName: string (nullable = true)
 |-- IsRootEvent: string (nullable = true)
 |-- EventCode: string (nullable = true)
 |-- EventBaseCode: string (nullable = true)
 |-- QuadClass: string (nullable = true)
 |-- GoldsteinScale: double (nullable = true)
 |-- NumMentions: integer (nullable = true)
 |-- NumSources: integer (nullable = true)
 |-- NumArticles: integer (nullable = true)
 |-- AvgTone: double (nullable = true)
 |-- ActionGeo_Type: string (nullable = true)
 |-- ActionGeo_FullName: string (nullable = true)
 |-- ActionGeo_CountryCode: string (nullable = true)
 |-- SOURCEURL: string (nullable = true)
 |-- domain_name: string (nullable = true)
 |-- news_source_country: strin

## This is my final dataset. Now I will save it and use it for further analysis.

In [70]:
df_with_region.count()

3544505

In [71]:
output_path = "final_dataset_parquet/"

df_with_region.write \
    .mode("overwrite") \
    .parquet(output_path)
