In [0]:
%pip install scrapy
# Databricks notebook source
from pyspark.sql import SparkSession
import builtins
from pyspark.sql.functions import current_timestamp, to_date, mean, abs, lit, datediff, col
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType, DoubleType
from datetime import datetime
import requests
from scrapy import Selector
import io
import zipfile
import tempfile
import os
# COMMAND ----------

# Configuration
# Bucket name; interpolated into the s3a:// listing path and the dbfs:/mnt/
# destination paths used by the functions in this notebook.
bucket_name = "datathonfactored2024"
# Folder names inside the bucket, one per file family.
events_folder = "GDELT Event Files"
gkg_folder = "GDELT GKG Files"
gkgcounts_folder = "GDELT GKG Files/gkgcounts"
# Names of the tables written with saveAsTable (bronze = appended raw rows,
# silver = per-(Day, SOURCEURL) averages, gold = silver plus Importance).
bronze_table_name = "bronze_layer"
silver_table_name = "silver_layer"
gold_table_name = "gold_layer"
# Explicit schema applied when reading the tab-separated event files.
# The third StructField argument is the nullable flag.
# NOTE(review): the files are read without a header, so columns are presumably
# matched by position and this order must mirror the GDELT event export
# layout; verify against the GDELT codebook before reordering anything.
gdelt_schema = StructType([
    # Event identifier and date attributes.
    StructField("GlobalEventID", IntegerType(), False),
    StructField("Day", IntegerType(), False),
    StructField("MonthYear", IntegerType(), False),
    StructField("Year", IntegerType(), False),
    StructField("FractionDate", DoubleType(), False),
    # Actor 1 attributes.
    StructField("Actor1Code", StringType(), True),
    StructField("Actor1Name", StringType(), True),
    StructField("Actor1CountryCode", StringType(), True),
    StructField("Actor1KnownGroupCode", StringType(), True),
    StructField("Actor1EthnicCode", StringType(), True),
    StructField("Actor1Religion1Code", StringType(), True),
    StructField("Actor1Religion2Code", StringType(), True),
    StructField("Actor1Type1Code", StringType(), True),
    StructField("Actor1Type2Code", StringType(), True),
    StructField("Actor1Type3Code", StringType(), True),
    # Actor 2 attributes (same layout as actor 1).
    StructField("Actor2Code", StringType(), True),
    StructField("Actor2Name", StringType(), True),
    StructField("Actor2CountryCode", StringType(), True),
    StructField("Actor2KnownGroupCode", StringType(), True),
    StructField("Actor2EthnicCode", StringType(), True),
    StructField("Actor2Religion1Code", StringType(), True),
    StructField("Actor2Religion2Code", StringType(), True),
    StructField("Actor2Type1Code", StringType(), True),
    StructField("Actor2Type2Code", StringType(), True),
    StructField("Actor2Type3Code", StringType(), True),
    # Event action attributes; NumSources, AvgTone, GoldsteinScale and
    # NumArticles are the metrics averaged into the silver table.
    StructField("IsRootEvent", IntegerType(), True),
    StructField("EventCode", StringType(), True),
    StructField("EventBaseCode", StringType(), True),
    StructField("EventRootCode", StringType(), True),
    StructField("QuadClass", IntegerType(), True),
    StructField("GoldsteinScale", DoubleType(), True),
    StructField("NumMentions", IntegerType(), True),
    StructField("NumSources", IntegerType(), True),
    StructField("NumArticles", IntegerType(), True),
    StructField("AvgTone", DoubleType(), True),
    # Geography of actor 1.
    StructField("Actor1Geo_Type", IntegerType(), True),
    StructField("Actor1Geo_Fullname", StringType(), True),
    StructField("Actor1Geo_CountryCode", StringType(), True),
    StructField("Actor1Geo_ADM1Code", StringType(), True),
    StructField("Actor1Geo_Lat", DoubleType(), True),
    StructField("Actor1Geo_Long", DoubleType(), True),
    StructField("Actor1Geo_FeatureID", StringType(), True),
    # Geography of actor 2.
    StructField("Actor2Geo_Type", IntegerType(), True),
    StructField("Actor2Geo_Fullname", StringType(), True),
    StructField("Actor2Geo_CountryCode", StringType(), True),
    StructField("Actor2Geo_ADM1Code", StringType(), True),
    StructField("Actor2Geo_Lat", DoubleType(), True),
    StructField("Actor2Geo_Long", DoubleType(), True),
    StructField("Actor2Geo_FeatureID", StringType(), True),
    # Geography of the action itself.
    StructField("ActionGeo_Type", IntegerType(), True),
    StructField("ActionGeo_Fullname", StringType(), True),
    StructField("ActionGeo_CountryCode", StringType(), True),
    StructField("ActionGeo_ADM1Code", StringType(), True),
    StructField("ActionGeo_Lat", DoubleType(), True),
    StructField("ActionGeo_Long", DoubleType(), True),
    StructField("ActionGeo_FeatureID", StringType(), True),
    # Data management fields; SOURCEURL is one of the silver grouping keys.
    StructField("DATEADDED", IntegerType(), False),
    StructField("SOURCEURL", StringType(), False)
])
# COMMAND ----------

def extract_date(filename):
    """Parse the leading ``YYYYMMDD`` stamp of a file name into a datetime.

    Args:
        filename: A bare file name or a ``/``-separated path, for example
            ``"20240115.export.csv"``.

    Returns:
        A ``datetime`` at midnight of the stamped day, or ``None`` when the
        part of the base name before its first ``.`` is not exactly eight
        digits forming a valid calendar date.
    """
    # Isolate the base name first so a dot in a parent folder (or bucket
    # name) cannot truncate the path before the file name is reached.
    base_name = filename.split('/')[-1]
    date_str = base_name.split('.')[0]

    # strptime("%Y%m%d") is lenient about field widths and would accept a
    # six-digit stamp such as "201312" as 2013-01-02, so demand all eight
    # digits before parsing.
    if len(date_str) != 8 or not date_str.isdigit():
        return None

    try:
        return datetime.strptime(date_str, "%Y%m%d")
    except ValueError:
        # Eight digits that do not form a real date (e.g. 20240230).
        return None

# COMMAND ----------

def get_most_recent_date(folder_name):
    """Return the newest date stamp found among the entries of a bucket folder.

    Lists ``s3a://{bucket_name}/{folder_name}`` with ``dbutils.fs.ls`` and
    parses every entry name with ``extract_date``.

    Args:
        folder_name: Folder inside ``bucket_name`` to inspect.

    Returns:
        The latest parsed ``datetime``, or ``datetime(1970, 1, 1)`` when no
        entry name yields a date (so every available archive counts as new).
    """
    entries = dbutils.fs.ls(f"s3a://{bucket_name}/{folder_name}")
    # Parse each entry name exactly once, then drop the names without a date.
    parsed = (extract_date(entry.name) for entry in entries)
    return max(
        (stamp for stamp in parsed if stamp is not None),
        default=datetime(1970, 1, 1),
    )

# COMMAND ----------

def calculate_importance(df):
    """Add an ``Importance`` score column to a dataframe of averaged metrics.

    The score of a row is::

        2 * AvgNumSources / max(AvgNumSources) + |AvgAvgTone| / max(|AvgAvgTone|)

    where both maxima are taken over all rows of ``df``.

    Args:
        df: Spark DataFrame with ``AvgNumSources`` and ``AvgAvgTone`` columns.

    Returns:
        ``df`` with one extra column named ``Importance``.
    """
    # A single aggregation yields both maxima, so the input is scanned once
    # rather than once per statistic. F.abs is spelled out so the code does
    # not depend on the builtin-shadowing `abs` import.
    max_num_sources, max_abs_tone = df.agg(
        F.max("AvgNumSources"),
        F.max(F.abs(F.col("AvgAvgTone"))),
    ).first()

    # NOTE(review): there is no guard for an empty input (both maxima None)
    # or a maximum of 0; the division is left to Spark's own null /
    # zero-division handling. Confirm that is the desired behaviour.
    sources_term = 2 * F.col("AvgNumSources") / F.lit(max_num_sources)
    tone_term = F.abs(F.col("AvgAvgTone")) / F.lit(max_abs_tone)
    return df.withColumn("Importance", sources_term + tone_term)
# COMMAND ----------

def fetch_new_files(start_date):
    """Collect download URLs of GDELT daily event archives newer than a date.

    Downloads the events index page, extracts every ``<a href>`` and keeps
    the ``.zip`` links whose name starts with an eight-digit ``YYYYMMDD``
    stamp strictly later than ``start_date``.

    Args:
        start_date: ``datetime``; only archives dated after it are returned.

    Returns:
        ``{'events': [url, ...]}`` with URLs in index-page order.

    Raises:
        requests.RequestException: If the index page cannot be downloaded
            (timeout, connection failure or HTTP error status).
    """
    events_url = "http://data.gdeltproject.org/events/index.html"
    base = "http://data.gdeltproject.org/events/"

    # Bound the wait and fail loudly on an HTTP error instead of silently
    # scraping an error page and reporting "no new files".
    response = requests.get(events_url, timeout=60)
    response.raise_for_status()
    links = Selector(text=response.text).xpath('//a/@href').extract()

    new_files = {'events': []}
    for link in links:
        if not link.endswith('.zip'):
            continue

        date_str = link.split('.')[0]
        # strptime("%Y%m%d") is lenient about field widths: a six-digit
        # stamp like "201312" would be read as 2013-01-02. Only accept a
        # full eight-digit daily stamp.
        if len(date_str) != 8 or not date_str.isdigit():
            continue

        try:
            file_date = datetime.strptime(date_str, "%Y%m%d")
        except ValueError:
            continue

        if file_date > start_date:
            new_files['events'].append(base + link)

    # NOTE: GKG archives (index at http://data.gdeltproject.org/gkg/index.html,
    # base URL http://data.gdeltproject.org/gkg/) are not fetched. To include
    # them, scrape that index the same way and file each link under a 'gkg'
    # key, or 'gkgcounts' when 'gkgcounts' appears in the link.

    return new_files

# COMMAND ----------

def ingest_to_bronze(file_path):
    """Append one tab-separated event file to the bronze Delta table.

    The file is read with ``gdelt_schema``, each row is stamped with an
    ``ingestion_timestamp`` column, and the result is appended to
    ``bronze_table_name``.

    Args:
        file_path: Location of the file to read, as accepted by Spark.
    """
    raw_events = (
        spark.read
        .schema(gdelt_schema)
        .option("sep", "\t")
        .csv(file_path)
    )
    stamped_events = raw_events.withColumn(
        "ingestion_timestamp", current_timestamp()
    )

    bronze_writer = stamped_events.write.format("delta").mode("append")
    bronze_writer.saveAsTable(bronze_table_name)
    print(f"Ingested {file_path} to bronze table: {bronze_table_name}")

def process_to_silver():
    """Rebuild the silver table from the full bronze table.

    Groups the bronze rows by ``Day`` and ``SOURCEURL``, averages the four
    numeric metrics, stamps the result with ``transform_timestamp`` and
    overwrites ``silver_table_name``.
    """
    grouping_keys = ["Day", "SOURCEURL"]
    # Source metric -> name of its averaged column in the silver table.
    averaged_metrics = {
        "NumSources": "AvgNumSources",
        "AvgTone": "AvgAvgTone",
        "GoldsteinScale": "AvgGoldsteinScale",
        "NumArticles": "AvgNumArticles",
    }

    projected = spark.table(bronze_table_name).select(
        *grouping_keys, *averaged_metrics
    )
    aggregates = [
        mean(source).alias(target)
        for source, target in averaged_metrics.items()
    ]
    summarized = (
        projected
        .groupBy(*grouping_keys)
        .agg(*aggregates)
        .withColumn("transform_timestamp", current_timestamp())
    )

    summarized.write.format("delta").mode("overwrite").saveAsTable(silver_table_name)
    print(f"Processed data to silver table: {silver_table_name}")

def process_to_gold():
    """Rebuild the gold table: every silver row plus its Importance score.

    Reads ``silver_table_name``, passes it through ``calculate_importance``
    and overwrites ``gold_table_name`` with the result.
    """
    scored = calculate_importance(spark.table(silver_table_name))

    gold_writer = scored.write.format("delta").mode("overwrite")
    gold_writer.saveAsTable(gold_table_name)

    print(f"All data transformed and saved to {gold_table_name}")

def process_new_files(new_files):
    """Download archives, land their members in the bucket mount and ingest them.

    For every URL, the zip archive is downloaded into memory; each member is
    streamed to a local temporary file, copied to
    ``dbfs:/mnt/{bucket_name}/{folder}/{date}.{suffix}.csv`` with
    ``dbutils.fs.cp`` and then passed to ``ingest_to_bronze``.

    Args:
        new_files: Mapping of file type (``'events'``, ``'gkg'`` or
            ``'gkgcounts'``) to a list of archive URLs.

    Raises:
        ValueError: If a file type with at least one URL is not supported.
        requests.RequestException: If an archive cannot be downloaded
            (timeout, connection failure or HTTP error status).
    """
    import shutil  # local import: only needed for the streamed copy below

    # File type -> (destination folder, middle part of the landed file name).
    destinations = {
        'events': (events_folder, "export"),
        'gkg': (gkg_folder, "gkg"),
        'gkgcounts': (gkgcounts_folder, "gkgcounts"),
    }

    for file_type, links in new_files.items():
        if not links:
            continue
        # Fail explicitly instead of hitting an unbound (or stale, from the
        # previous file type) destination folder further down.
        if file_type not in destinations:
            raise ValueError(f"Unsupported file type: {file_type!r}")
        folder_name, suffix = destinations[file_type]

        for link in links:
            # Bound the wait and reject error pages before they reach zipfile.
            response = requests.get(link, timeout=120)
            response.raise_for_status()

            with zipfile.ZipFile(io.BytesIO(response.content), 'r') as zip_ref:
                for file_info in zip_ref.infolist():
                    date = file_info.filename.split('.')[0]
                    new_file_name = f"{date}.{suffix}.csv"
                    target_path = f"dbfs:/mnt/{bucket_name}/{folder_name}/{new_file_name}"

                    temp_file_path = None
                    try:
                        with zip_ref.open(file_info) as extracted_file, \
                                tempfile.NamedTemporaryFile(delete=False) as temp_file:
                            temp_file_path = temp_file.name
                            # Stream in chunks rather than holding the whole
                            # decompressed member in memory.
                            shutil.copyfileobj(extracted_file, temp_file)

                        # The temp file is closed (flushed) here; copy it
                        # to the bucket mount with dbutils.
                        dbutils.fs.cp(f"file:{temp_file_path}", target_path)
                    finally:
                        # Remove the temp file even when the copy fails.
                        if temp_file_path is not None:
                            os.unlink(temp_file_path)

                    print(f"Uploaded {new_file_name} to S3 path: s3a://{bucket_name}/{folder_name}/{new_file_name}")
                    ingest_to_bronze(target_path)

# COMMAND ----------

# Main execution
# Find the newest date already present in the events folder, collect the
# archive URLs dated strictly after it, then download, land and append each
# of them to the bronze table.
start_date = get_most_recent_date(events_folder)
new_files = fetch_new_files(start_date)
process_new_files(new_files)

# COMMAND ----------
# Rebuild the downstream layers from scratch: silver is recomputed from the
# whole bronze table, and gold from the whole silver table (both overwrite).
# Silver must run before gold because gold reads the silver table.
process_to_silver()
process_to_gold()
print("Data ingestion complete.")