## Installing Necessary Packages 

Before we can get the data, we need to install some packages to handle the download process. In particular, we are going to install one main package:

*   ir_datasets (https://github.com/allenai/ir_datasets): A Python package that provides a common interface to many IR ad-hoc ranking benchmarks, training datasets, etc. We use it to download the raw event streams and the information needs for each event.

In [1]:
!pip install --upgrade git+https://github.com/allenai/ir_datasets.git@crisisfacts # install ir_datasets (crisisfacts branch)

Collecting git+https://github.com/allenai/ir_datasets.git@crisisfacts
  Cloning https://github.com/allenai/ir_datasets.git (to revision crisisfacts) to /private/var/folders/kd/mjydk2vd0pg_0q8h24fjc1l40000gn/T/pip-req-build-3tsj0ktn
  Running command git clone --filter=blob:none --quiet https://github.com/allenai/ir_datasets.git /private/var/folders/kd/mjydk2vd0pg_0q8h24fjc1l40000gn/T/pip-req-build-3tsj0ktn
  Running command git checkout -b crisisfacts --track origin/crisisfacts
  Switched to a new branch 'crisisfacts'
  branch 'crisisfacts' set up to track 'origin/crisisfacts'.
  Resolved https://github.com/allenai/ir_datasets.git to commit 3d657a7b9f159382776763ee093fc6ef2d7c6581
  Preparing metadata (setup.py) ... done


## Structure of the CrisisFACTs Dataset

The CrisisFACTs dataset is divided into events, each representing a real-world crisis. Each event is given an identifier, e.g. 'CrisisFACTS-001' is the Lilac Wildfire from 2017. We sometimes refer to the event number or 'eventNo', which is the last three digits of the event identifier, e.g. '001'. There were 8 events for CrisisFACTs 2022; the list below contains 18 event numbers, '001' to '018':

In [2]:
# Event numbers as a list
eventNoList = [
    "001", # Lilac Wildfire 2017
    "002", # Cranston Wildfire 2018
    "003", # Holy Wildfire 2018
    "004", # Hurricane Florence 2018
    "005", # 2018 Maryland Flood
    "006", # Saddleridge Wildfire 2019
    "007", # Hurricane Laura 2020
    "008", # Hurricane Sally 2020
    "009", # Beirut Explosion, 2020
    "010", # Houston Explosion, 2020
    "011", # Rutherford TN Floods, 2020
    "012", # TN Derecho, 2020
    "013", # Edenville Dam Fail, 2020
    "014", # Hurricane Dorian, 2019
    "015", # Kincade Wildfire, 2019
    "016", # Easter Tornado Outbreak, 2020
    "017", # Tornado Outbreak, 2020 Apr
    "018", # Tornado Outbreak, 2020 March
]
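
The relationship between an event number and its full identifier can be sketched with two small helpers. The names `event_id_from_no` and `event_no_from_id` are illustrative assumptions and are not part of ir_datasets or the CrisisFACTs tooling.

```python
# Hypothetical helpers (not part of ir_datasets or the CrisisFACTs tooling)
# for converting between an event number and its full event identifier.

EVENT_PREFIX = "CrisisFACTS-"

def event_id_from_no(event_no: str) -> str:
    """Build the full identifier, e.g. '001' -> 'CrisisFACTS-001'."""
    return f"{EVENT_PREFIX}{event_no}"

def event_no_from_id(event_id: str) -> str:
    """Return the last three characters of the identifier, e.g. '001'."""
    return event_id[-3:]

print(event_id_from_no("001"))              # CrisisFACTS-001
print(event_no_from_id("CrisisFACTS-010"))  # 010
```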

Each event has a duration, i.e. it lasts for a number of days. In the CrisisFACTs track, you need to produce a timeline summary for each day for a set of events. You can get the list of days for an event as shown below:

In [3]:
import requests

# Gets the list of days for a specified event number, e.g. '001'
def getDaysForEventNo(eventNo):

    # We will download a file containing the day list for an event
    url = "http://trecis.org/CrisisFACTs/CrisisFACTS-"+eventNo+".requests.json"

    # Download the list and parse as JSON
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # fail early on HTTP errors
    dayList = response.json()

    # Each day object contains the following fields
    #   {
    #      "eventID" : "CrisisFACTS-001",
    #      "requestID" : "CrisisFACTS-001-r3",
    #      "dateString" : "2017-12-07",
    #      "startUnixTimestamp" : 1512604800,
    #      "endUnixTimestamp" : 1512691199
    #   }

    return dayList

for day in getDaysForEventNo(eventNoList[9]):
    print(day["dateString"])

2020-01-23
2020-01-24
2020-01-25
2020-01-26
2020-01-27
2020-01-28
2020-01-29
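
Each day object also carries a start and an end Unix timestamp. As a minimal sketch, the snippet below converts those bounds to UTC datetimes and checks whether a timestamp falls inside the day. The example object is copied from the field listing in `getDaysForEventNo`. The helper names `day_bounds_utc` and `in_day` are hypothetical, and treating both bounds as inclusive is an assumption.

```python
from datetime import datetime, timezone

# Example day object, copied from the field listing in getDaysForEventNo
day = {
    "eventID": "CrisisFACTS-001",
    "requestID": "CrisisFACTS-001-r3",
    "dateString": "2017-12-07",
    "startUnixTimestamp": 1512604800,
    "endUnixTimestamp": 1512691199,
}

def day_bounds_utc(day):
    """Convert the start/end Unix timestamps of a day object to UTC datetimes."""
    start = datetime.fromtimestamp(day["startUnixTimestamp"], tz=timezone.utc)
    end = datetime.fromtimestamp(day["endUnixTimestamp"], tz=timezone.utc)
    return start, end

def in_day(unix_timestamp, day):
    """Check whether a timestamp (in seconds) lies inside the day's window.

    Assumption: both bounds are treated as inclusive.
    """
    return day["startUnixTimestamp"] <= unix_timestamp <= day["endUnixTimestamp"]

start, end = day_bounds_utc(day)
print(start.isoformat())  # 2017-12-07T00:00:00+00:00
print(end.isoformat())    # 2017-12-07T23:59:59+00:00
```

The window spans exactly one UTC calendar day, which matches the `dateString` of the example object.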


For each day, we collected content related to the event from the following sources:


*   **Twitter**: We are re-using tweets collected as part of the TREC Incident Streams track (http://trecis.org). These tweets were crawled by keyword, so most are likely to be relevant to the event, but they are not necessarily good candidates for inclusion in a summary of what is happening.
*   **Reddit**: Discussions about what happens during events also occur on the forum platform Reddit. We collected Reddit threads relevant to each event, including both the original submission and the subsequent comments within those threads.
*   **News**: Traditional news agencies are often a good source of information during an emergency, so we have also included a small number of news articles collected during each event.

Because these sources have different formatting and characteristics, we reformatted this data into a list of standardized 'stream items', where a stream item contains:


*   **event**: The identifier of the event, e.g. 'CrisisFACTS-001'
*   **streamID**: A unique identifier for the stream item. This will generally be of the form 'CrisisFACTS-\<eventNo\>-\<source\>-\<postID\>-\<sentenceID\>', e.g. CrisisFACTS-001-Twitter-15712-0.
*   **unixTimestamp**: This is the time that the content was originally posted, expressed as a unix timestamp in seconds (UTC timezone).
*   **text**: The text of the stream item. The maximum length of a stream item is 200 characters. 
*   **sourceType**: A string denoting the source, i.e. one of Twitter, Reddit, News or Facebook.
*   **source**: This is the original post content formatted as JSON (ir_datasets ignores this field).

Some types of content are longer than others (compare a news article with a tweet, for instance), so for long-form content we perform sentence segmentation. One input post may therefore produce multiple stream items. In these cases, the 'sentenceID' component of the streamID denotes the number of the sentence in the source content.
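
As an illustration of the streamID layout, the sketch below splits an identifier into its components. The `StreamID` tuple and the `parse_stream_id` function are hypothetical names. The parser assumes the general form given above and rejects identifiers that do not follow it.

```python
from typing import NamedTuple

class StreamID(NamedTuple):
    event_no: str
    source: str
    post_id: str
    sentence_id: int

def parse_stream_id(stream_id: str) -> StreamID:
    """Split 'CrisisFACTS-<eventNo>-<source>-<postID>-<sentenceID>' into parts.

    Assumes the general form described in the text; identifiers that
    deviate from it raise ValueError.
    """
    parts = stream_id.split("-")
    if len(parts) < 5 or parts[0] != "CrisisFACTS":
        raise ValueError(f"Unexpected streamID format: {stream_id!r}")
    event_no, source = parts[1], parts[2]
    # Assumption: a post ID could itself contain '-', so rejoin the middle parts
    post_id = "-".join(parts[3:-1])
    sentence_id = int(parts[-1])  # raises ValueError if not numeric
    return StreamID(event_no, source, post_id, sentence_id)

parsed = parse_stream_id("CrisisFACTS-001-Twitter-15712-0")
print(parsed)
# StreamID(event_no='001', source='Twitter', post_id='15712', sentence_id=0)
```

Grouping stream items by `(event_no, source, post_id)` would then recover all sentences that came from the same original post.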


The dataset is structured by day and event. To access the stream items for a particular \<event,day\> pair we generate a request string specifying the day and event we want, of the form:

*   '**crisisfacts/\<eventNo\>/\<day\>**'
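
A minimal sketch of building these request strings is shown here. The helper name `build_request_string` is an assumption for illustration. The event number and dates are taken from the day list printed earlier for event '010'.

```python
def build_request_string(event_no: str, date_string: str) -> str:
    """Build the request string for one <event, day> pair."""
    return f"crisisfacts/{event_no}/{date_string}"

# Dates taken from the day list printed earlier for event '010'
date_strings = ["2020-01-23", "2020-01-24"]

request_strings = [build_request_string("010", d) for d in date_strings]
print(request_strings)
# ['crisisfacts/010/2020-01-23', 'crisisfacts/010/2020-01-24']
```

Each resulting string can be passed to `ir_datasets.load(...)` to obtain the stream items for that day.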

## Importing data

In [4]:
import ir_datasets
import pandas as pd

def fetch_crisisfacts_data(event_id, dates):
    """
    Fetches data for a specified CrisisFACTS event and a list of dates.
    
    Parameters:
        event_id (str): The CrisisFACTS event number (e.g., '001').
        dates (list): A list of dates in 'YYYY-MM-DD' format for which to fetch the data.
    
    Returns:
        pd.DataFrame: A DataFrame containing all the data fetched for the specified dates.
    """
    all_data = []  # Initialize a list to store data for all dates

    for date in dates:
        try:
            # Load the dataset for the given event and date
            dataset = ir_datasets.load(f'crisisfacts/{event_id}/{date}')
            
            # Collect all documents from the iterator
            data = list(dataset.docs_iter())
            all_data.extend(data)  # Append data for this date to the main list
            
            print(f"Successfully fetched data for date: {date} ({len(data)} items)")
        
        except Exception as e:
            print(f"Error fetching data for date {date}: {e}")
    
    # Convert the collected data to a Pandas DataFrame
    df = pd.DataFrame(all_data)
    return df

# List of dates for which to fetch data
dates = [
    "2020-01-23",
    "2020-01-24",
    "2020-01-25",
    "2020-01-26",
    "2020-01-27",
    "2020-01-28",
    "2020-01-29"
]

# Event identifier
event_id = "010"

# Fetch the data
data_df = fetch_crisisfacts_data(event_id, dates)

# Show a preview of the DataFrame
print(data_df.head())

Successfully fetched data for date: 2020-01-23 (11207 items)
Successfully fetched data for date: 2020-01-24 (23981 items)
Successfully fetched data for date: 2020-01-25 (5000 items)
Successfully fetched data for date: 2020-01-26 (10698 items)
Successfully fetched data for date: 2020-01-27 (10574 items)
Successfully fetched data for date: 2020-01-28 (2931 items)
Successfully fetched data for date: 2020-01-29 (709 items)
                        doc_id            event  \
0  CrisisFACTS-010-Twitter-0-0  CrisisFACTS-010   
1  CrisisFACTS-010-Twitter-0-1  CrisisFACTS-010   
2  CrisisFACTS-010-Twitter-1-0  CrisisFACTS-010   
3  CrisisFACTS-010-Twitter-2-0  CrisisFACTS-010   
4  CrisisFACTS-010-Twitter-3-0  CrisisFACTS-010   

                                           text  \
0                                  @TheNerdyEsq   
1                            All the shout outs   
2  Typo- was supposed to be good sportsmanship!   
3                                           ???   
4              

## Splitting the DataFrame by Source Type

In [5]:
twitter_data = data_df[data_df['source_type'] == "Twitter"]
print(twitter_data.head())

                        doc_id            event  \
0  CrisisFACTS-010-Twitter-0-0  CrisisFACTS-010   
1  CrisisFACTS-010-Twitter-0-1  CrisisFACTS-010   
2  CrisisFACTS-010-Twitter-1-0  CrisisFACTS-010   
3  CrisisFACTS-010-Twitter-2-0  CrisisFACTS-010   
4  CrisisFACTS-010-Twitter-3-0  CrisisFACTS-010   

                                           text  \
0                                  @TheNerdyEsq   
1                            All the shout outs   
2  Typo- was supposed to be good sportsmanship!   
3                                           ???   
4                       CEO of a market economy   

                                              source source_type  \
0  {"created_at":"Thu Jan 23 00:00:03 +0000 2020"...     Twitter   
1  {"created_at":"Thu Jan 23 00:00:03 +0000 2020"...     Twitter   
2  {"created_at":"Thu Jan 23 00:00:06 +0000 2020"...     Twitter   
3  {"created_at":"Thu Jan 23 00:00:20 +0000 2020"...     Twitter   
4  {"created_at":"Thu Jan 23 00:00:36 +0000 20

In [6]:
reddit_data = data_df[data_df['source_type'] == "Reddit"]
print(reddit_data.head())

                          doc_id            event  \
1753  CrisisFACTS-010-Reddit-0-0  CrisisFACTS-010   
1754  CrisisFACTS-010-Reddit-0-1  CrisisFACTS-010   
1755  CrisisFACTS-010-Reddit-0-2  CrisisFACTS-010   
1756  CrisisFACTS-010-Reddit-0-3  CrisisFACTS-010   
1757  CrisisFACTS-010-Reddit-0-4  CrisisFACTS-010   

                                                   text  \
1753  The interesting bits from "Siege of Terra : So...   
1754                    This is not a detailed summary.   
1755  It's a collection of interesting stuff and plo...   
1756  With that said, lets go.\n\n&#x200B;\n\n&#x200...   
1757  The new master of Terra doesn't have the wisdo...   

                                                 source source_type  \
1753  {"subreddit_display": "Fun is over, get back t...      Reddit   
1754  {"subreddit_display": "Fun is over, get back t...      Reddit   
1755  {"subreddit_display": "Fun is over, get back t...      Reddit   
1756  {"subreddit_display": "Fun is over, 

In [7]:
news_data = data_df[data_df['source_type'] == "News"]
print(news_data.head())

                       doc_id            event  \
314  CrisisFACTS-010-News-0-0  CrisisFACTS-010   
315  CrisisFACTS-010-News-0-1  CrisisFACTS-010   
316  CrisisFACTS-010-News-0-2  CrisisFACTS-010   
317  CrisisFACTS-010-News-0-3  CrisisFACTS-010   
318  CrisisFACTS-010-News-0-4  CrisisFACTS-010   

                                                  text  \
314  The Global Trust Crisis: As the world’s politi...   
315  In recent months, protests have multiplied acr...   
316  Unlike previous waves of protests such as the ...   
317  Unless the leaders convening at Davos can form...   
318           Why are so many people so disillusioned?   

                                                source source_type  \
314  {"id": "foreignpolicy--2020-01-22--The Global ...        News   
315  {"id": "foreignpolicy--2020-01-22--The Global ...        News   
316  {"id": "foreignpolicy--2020-01-22--The Global ...        News   
317  {"id": "foreignpolicy--2020-01-22--The Global ...        News   
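
The three filters above can also be expressed as a single `groupby`. The sketch below uses a small toy DataFrame as a stand-in for `data_df`, with the `doc_id` and `source_type` column names taken from the output shown above. The variable names `toy_df`, `counts` and `by_source` are illustrative.

```python
import pandas as pd

# Toy stand-in for data_df, using column names from the output above
toy_df = pd.DataFrame(
    {
        "doc_id": [
            "CrisisFACTS-010-Twitter-0-0",
            "CrisisFACTS-010-Twitter-0-1",
            "CrisisFACTS-010-Reddit-0-0",
            "CrisisFACTS-010-News-0-0",
        ],
        "source_type": ["Twitter", "Twitter", "Reddit", "News"],
    }
)

# Number of stream items per source
counts = toy_df["source_type"].value_counts()

# One DataFrame per source type, keyed by the source name
by_source = {name: group for name, group in toy_df.groupby("source_type")}

print(counts.to_dict())
print(sorted(by_source))
```

With the real data, `by_source["Twitter"]` would hold the same rows as `twitter_data`, and any additional source type present in the data would get its own entry without an extra filter.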


## Data Preprocessing and Cleaning

We use Spark so that the cleaning step scales to larger collections:

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length, lower, regexp_replace

def preprocess_data(dataframes, spark):
    """
    Preprocess multiple datasets by cleaning text data.
    
    Parameters:
        dataframes (list): List of Spark DataFrames to be cleaned.
        spark (SparkSession): An active Spark session.
    
    Returns:
        DataFrame: A single cleaned DataFrame combining all datasets.
    """
    combined_df = None  # Initialize variable to store combined DataFrame

    for df in dataframes:
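        # Clean the text data: strip non-alphanumeric characters, lowercase,
        # and keep only rows whose cleaned text is longer than 20 characters
        cleaned = df.withColumn("text", regexp_replace(col("text"), r"[^a-zA-Z0-9\s]", "")) \
                    .withColumn("text", lower(col("text"))) \
                    .filter(length(col("text")) > 20)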
        
        # Combine with the main DataFrame
        if combined_df is None:
            combined_df = cleaned
        else:
            combined_df = combined_df.union(cleaned)
    
    return combined_df

# Initialize Spark session
spark = SparkSession.builder.appName("CRISISFacts Preprocessing").getOrCreate()

# Create Spark DataFrames for Twitter, Reddit, and News data
twitter_df = spark.createDataFrame(twitter_data)  
reddit_df = spark.createDataFrame(reddit_data) 
news_df = spark.createDataFrame(news_data) 

# Combine all datasets into a list
all_dataframes = [twitter_df, reddit_df, news_df]

# Preprocess all datasets
cleaned_df = preprocess_data(all_dataframes, spark)

# Show the first few rows of the cleaned data
cleaned_df.show()

# Save the cleaned DataFrame for further processing
cleaned_df.write.mode("overwrite").parquet("cleaned_crisisfacts_data.parquet")

24/12/08 10:43:32 WARN Utils: Your hostname, Pranavs-MacBook-Air-2.local resolves to a loopback address: 127.0.0.1; using 10.0.0.14 instead (on interface en0)
24/12/08 10:43:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/08 10:43:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/08 10:43:39 WARN TaskSetManager: Stage 0 contains a task of very large size (27478 KiB). The maximum recommended task size is 1000 KiB.
24/12/08 10:43:44 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
                                                                                

+--------------------+---------------+--------------------+--------------------+-----------+--------------+
|              doc_id|          event|                text|              source|source_type|unix_timestamp|
+--------------------+---------------+--------------------+--------------------+-----------+--------------+
|CrisisFACTS-010-T...|CrisisFACTS-010|typo was supposed...|{"created_at":"Th...|    Twitter|    1579737606|
|CrisisFACTS-010-T...|CrisisFACTS-010|ceo of a market e...|{"created_at":"Th...|    Twitter|    1579737636|
|CrisisFACTS-010-T...|CrisisFACTS-010|stopping the cloc...|{"created_at":"Th...|    Twitter|    1579737656|
|CrisisFACTS-010-T...|CrisisFACTS-010|do you know someo...|{"created_at":"Th...|    Twitter|    1579737671|
|CrisisFACTS-010-T...|CrisisFACTS-010|realestate httpst...|{"created_at":"Th...|    Twitter|    1579737671|
|CrisisFACTS-010-T...|CrisisFACTS-010|do you know someo...|{"created_at":"Th...|    Twitter|    1579737675|
|CrisisFACTS-010-T...|Crisis

24/12/08 10:43:45 WARN TaskSetManager: Stage 1 contains a task of very large size (27478 KiB). The maximum recommended task size is 1000 KiB.
24/12/08 10:43:46 WARN MemoryManager: Total allocation exceeds 95.00% (926,914,958 bytes) of heap memory
Scaling row group sizes to 98.66% for 7 writers
24/12/08 10:43:46 WARN MemoryManager: Total allocation exceeds 95.00% (926,914,958 bytes) of heap memory
Scaling row group sizes to 86.33% for 8 writers
24/12/08 10:43:55 WARN MemoryManager: Total allocation exceeds 95.00% (926,914,958 bytes) of heap memory
Scaling row group sizes to 98.66% for 7 writers
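
To see what the cleaning rules in `preprocess_data` do to individual stream items, the sketch below applies the same three steps to single strings in plain Python, without Spark. The names `clean_text` and `keep` are hypothetical. Using `re.ASCII` is an assumption, made so that `\s` stays close to the Java regex engine that Spark uses.

```python
import re

# Same character class as the Spark regexp_replace call. re.ASCII limits \s
# to ASCII whitespace (an assumption made to mirror Java's default \s).
NON_ALNUM = re.compile(r"[^a-zA-Z0-9\s]", re.ASCII)
MIN_LENGTH = 20  # rows are kept only if the cleaned text is longer than this

def clean_text(text: str) -> str:
    """Remove non-alphanumeric, non-whitespace characters, then lowercase."""
    return NON_ALNUM.sub("", text).lower()

def keep(text: str) -> bool:
    """Mirror the Spark filter: keep items whose cleaned text exceeds 20 chars."""
    return len(clean_text(text)) > MIN_LENGTH

# Sample texts taken from the DataFrame preview earlier in the notebook
samples = [
    "@TheNerdyEsq",
    "Typo- was supposed to be good sportsmanship!",
    "CEO of a market economy",
]
for s in samples:
    print(repr(clean_text(s)), keep(s))
```

Short items such as `@TheNerdyEsq` are dropped by the length filter. Punctuation inside URLs is stripped as well, which is why tokens such as `httpst...` appear in the Spark output above.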
                                                                                

In [9]:
# Save the cleaned data as CSV for compatibility with other tools
cleaned_df.write.mode("overwrite").csv("cleaned_crisisfacts_data.csv", header=True)

24/12/08 10:46:04 WARN TaskSetManager: Stage 2 contains a task of very large size (27478 KiB). The maximum recommended task size is 1000 KiB.
                                                                                