<a href="https://colab.research.google.com/github/Gilade98/Machine_Learning_Projects/blob/main/Data_Mining_BGU/GDELT_data_query.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import requests
import zipfile
import os
import pandas as pd
from io import BytesIO
from bs4 import BeautifulSoup
import gc

In [None]:
!pip install fastparquet pyarrow tqdm gdelt



In [None]:
import gdelt

# GDELT GKG Index Page
GDELT_INDEX_URL = "http://data.gdeltproject.org/gkg/index.html"

# Define themes to filter
TARGET_THEMES = {
    "ECON_HOUSING_PRICES",
    "GENTRIFICATION",
    "NEW_CONSTRUCTION",
    "PROPERTY_RIGHTS",
    "POPULATION_DENSITY",
    "URBAN",
    "URBAN_SPRAWL",
    "POVERTY"
}

OUTPUT_FOLDER = "/content/drive/MyDrive/data_toolbox/datasets"
START_YEAR = 2015
TIME_DELTA=13
gd1 = gdelt.gdelt(version=1)


here


In [None]:
def get_gdelt_data(start_date, end_date):
    """Retrieve GKG data for a date range using gdelt package."""
    date_list = [(start_date + timedelta(days=i)).strftime('%Y %m %d')
                 for i in range((end_date - start_date).days + 1)]

    # Fetch GKG data
    results = gd1.Search(date=date_list, table='gkg', coverage=True, output='df')
    return results


def get_last_processed_date():
    """Find the earliest processed date from the last row of the first yearly Parquet file."""

    if not os.path.exists(OUTPUT_FOLDER):
        return None  # No files exist yet

    # Get the earliest file by sorting file names (assuming 'gdelt_YYYY.parquet' format)
    parquet_files = sorted(f for f in os.listdir(OUTPUT_FOLDER) if f.endswith(".parquet"))

    if not parquet_files:
        return None  # No Parquet files found

    earliest_file = os.path.join(OUTPUT_FOLDER, parquet_files[1])  # Pick the first (earliest year) skipping the large file

    # Read only the last row to get the earliest date (efficient)
    df = pd.read_parquet(earliest_file, columns=["DATE"]).tail(1)  # Read last row

    if df.empty:
        return None  # No data in the file

    # Convert DATE column and return the earliest date
    earliest_date = pd.to_datetime(df["DATE"].iloc[0], format="%d-%m-%Y").strftime("%Y-%m-%d")

    print(f"Resuming from last processed date: {earliest_date}")
    return earliest_date


def process_gkg_data(df):
  """Filter the DataFrame to retain only relevant themes and source URLs."""
  if df.empty:
      return df

  df = df.loc[:, ["DATE", "THEMES", "SOURCEURLS"]]
  df = df.dropna(subset=["THEMES", "SOURCEURLS"])

  # Filter rows with target themes
  df = df[df['THEMES'].apply(lambda x: any(theme in str(x).split(";") for theme in TARGET_THEMES))]

  # Convert DATE column format
  df["DATE"] = pd.to_datetime(df["DATE"], format="%Y%m%d").dt.strftime("%d-%m-%Y")

  return df.loc[:, ["DATE", "SOURCEURLS"]]

def load_year_data(year):
    year_file = f"{OUTPUT_FOLDER}/gdelt_{year}.parquet"
    if os.path.exists(year_file):
        return pd.read_parquet(year_file, columns=["DATE", "SOURCEURLS"])
    else:
        return pd.DataFrame(columns=["DATE", "SOURCEURLS"])  # Empty DataFrame


In [None]:
last_date = get_last_processed_date()
last_date

Resuming from last processed date: 2015-03-23


'2015-03-23'

In [None]:
from tqdm import tqdm
from datetime import date, timedelta

# Filter out files that were already processed
if last_date:
    start_date = date.fromisoformat(last_date)
else:
    start_date = date.today()

current_year = start_date.year
year_file = f"{OUTPUT_FOLDER}/gdelt_{current_year}.parquet"
year_data = load_year_data(current_year)

total_iterations = (start_date - date(START_YEAR, 1, 1)).days // (TIME_DELTA+1)  # Total number of 2-week periods

with tqdm(total=total_iterations, desc="Processing Batches", unit="batch") as pbar:
  while start_date >= date(START_YEAR, 1, 1):  # Set a lower bound to avoid indefinite looping
    end_date = start_date
    start_date = start_date - timedelta(days=TIME_DELTA)  # 2-week window

    print(f"\nFetching data from {start_date} to {end_date}...")
    raw_data = get_gdelt_data(start_date, end_date)
    filtered_data = process_gkg_data(raw_data)

    if not filtered_data.empty:
        # Convert DATE column to datetime
        filtered_data["DATE"] = pd.to_datetime(filtered_data["DATE"], format="%d-%m-%Y")

        # Get unique years in the new data
        for year, batch_year_data in filtered_data.groupby(filtered_data["DATE"].dt.year):
          if year != current_year:
                    # Save the previous year's data before switching
                    year_data.to_parquet(year_file, index=False, compression="snappy", engine="fastparquet")
                    print(f"Saved {len(year_data)} rows to {year_file}")

                    # Free memory of the old year
                    del year_data
                    gc.collect()

                    # Load new year data (or create an empty one)
                    current_year = year
                    year_file = f"{OUTPUT_FOLDER}/gdelt_{current_year}.parquet"
                    year_data = load_year_data(current_year)
          # Append batch data for the current year
          year_data = pd.concat([year_data, batch_year_data], ignore_index=True)
    # Save after each 2-week batch
    year_data.to_parquet(year_file, index=False, compression="snappy", engine="fastparquet")
    print(f"\nSaved batch {len(year_data)} rows to {year_file}")
    del filtered_data
    del raw_data
    gc.collect()

    pbar.update(1)

# Final save before exit
year_data.to_parquet(year_file, index=False, compression="snappy", engine="fastparquet")
print(f"\nFinal save for {current_year}: {len(year_data)} rows")







Processing Batches:   0%|          | 0/5 [00:00<?, ?batch/s]


Fetching data from 2015-03-10 to 2015-03-23...


Processing Batches:  20%|██        | 1/5 [01:51<07:26, 111.56s/batch]


Saved batch 3137756 rows to /content/drive/MyDrive/data_toolbox/gdelt_2015.parquet

Fetching data from 2015-02-25 to 2015-03-10...


Processing Batches:  40%|████      | 2/5 [03:20<04:55, 98.41s/batch] 


Saved batch 3259320 rows to /content/drive/MyDrive/data_toolbox/gdelt_2015.parquet

Fetching data from 2015-02-12 to 2015-02-25...


Processing Batches:  60%|██████    | 3/5 [04:27<02:48, 84.13s/batch]


Saved batch 3352752 rows to /content/drive/MyDrive/data_toolbox/gdelt_2015.parquet

Fetching data from 2015-01-30 to 2015-02-12...


Processing Batches:  80%|████████  | 4/5 [05:18<01:10, 70.80s/batch]


Saved batch 3416238 rows to /content/drive/MyDrive/data_toolbox/gdelt_2015.parquet

Fetching data from 2015-01-17 to 2015-01-30...


Processing Batches: 100%|██████████| 5/5 [06:10<00:00, 64.01s/batch]


Saved batch 3485743 rows to /content/drive/MyDrive/data_toolbox/gdelt_2015.parquet

Fetching data from 2015-01-04 to 2015-01-17...


Processing Batches: 6batch [06:59, 59.14s/batch]                    


Saved batch 3548136 rows to /content/drive/MyDrive/data_toolbox/gdelt_2015.parquet

Fetching data from 2014-12-22 to 2015-01-04...
Saved 3548136 rows to /content/drive/MyDrive/data_toolbox/gdelt_2015.parquet


  year_data = pd.concat([year_data, batch_year_data], ignore_index=True)


Saved 36940 rows to /content/drive/MyDrive/data_toolbox/gdelt_2014.parquet


Processing Batches: 7batch [07:53, 67.58s/batch]


Saved batch 3559861 rows to /content/drive/MyDrive/data_toolbox/gdelt_2015.parquet






Final save for 2015: 3559861 rows


In [None]:
def remove_duplicates_from_parquet():
    """Remove duplicates from all yearly Parquet files and overwrite them."""

    if not os.path.exists(OUTPUT_FOLDER):
        print("No Parquet folder found.")
        return

    # Select only files that match the 'gdelt_YYYY.parquet' format
    parquet_files = sorted(f for f in os.listdir(OUTPUT_FOLDER) if f.endswith(".parquet"))

    for file in parquet_files:
        file_path = os.path.join(OUTPUT_FOLDER, file)

        # Load Parquet file
        df = pd.read_parquet(file_path)

        # Drop duplicates based on DATE and SOURCEURLS
        df = df.drop_duplicates(subset=["DATE", "SOURCEURLS"])

        # Save back to the same Parquet file (overwrite)
        df.to_parquet(file_path, index=False, compression="snappy", engine="fastparquet")
        print(f"Updated {file}: Removed duplicates.")

        # Free memory
        del df
        gc.collect()

# Run the function to clean all Parquet files
remove_duplicates_from_parquet()

Updated filtered_gdelt_data.parquet: Removed duplicates.
Updated gdelt_2014.parquet: Removed duplicates.
Updated gdelt_2015.parquet: Removed duplicates.
Updated gdelt_2016.parquet: Removed duplicates.
Updated gdelt_2017.parquet: Removed duplicates.
Updated gdelt_2018.parquet: Removed duplicates.
Updated gdelt_2019.parquet: Removed duplicates.
Updated gdelt_2020.parquet: Removed duplicates.
Updated gdelt_2021.parquet: Removed duplicates.
Updated gdelt_2022.parquet: Removed duplicates.
Updated gdelt_2023.parquet: Removed duplicates.
Updated gdelt_2024.parquet: Removed duplicates.
Updated gdelt_2025.parquet: Removed duplicates.
