In [2]:
# imports
import wget
from zipfile import ZipFile
import os
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark import SparkFiles
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, MapType
from pyspark.sql.functions import udf
import pandas as pd

In [83]:
#!pip install wget

Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25ldone
[?25h  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9675 sha256=0eb774c262e23e9ab35aeefb118a66a4407436a9052ee1c92e0f55025c57c470
  Stored in directory: /Users/kovila/Library/Caches/pip/wheels/bd/a8/c3/3cf2c14a1837a4e04bd98631724e81f33f462d86a1d895fae0
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2
You should consider upgrading via the '/opt/anaconda3/bin/python -m pip install --upgrade pip' command.[0m[33m
[0m

TODO:
- delete csv option
- delete zips option
- create history files if they do not exist
- add columns to data
- single file option for tests

In [3]:
# global variables
MASTER_FILELIST_FILEPATH = 'masterfilelist-translation.txt'
DOWNLOAD_CSV_PATH = './gdelt_data/'
START_URL = 'http://data.gdeltproject.org/gdeltv2/'
HISTORY_EXTRACTED_FILEPATH = 'history_extracted'
HISTORY_LOADED_FILEPATH = 'history_loaded'
EVENTS_COLUMN_HEADERS = './gdelt_columns/events_column_headers'
YEAR = '2022'
MONTH = '06'

## get urls from master filelist

In [4]:
# WARNING WHY ARE SOME URLS MISSING???

def get_zip_urls_from_master_filelist(year_list, month_list, day_list, zip_type, master_filelist_path='masterfilelist-translation.txt', start_url='http://data.gdeltproject.org/gdeltv2/'):
    
    # zip_type: events | mentions | gkg
    zip_type_token = ''
    if zip_type == 'events':
        zip_type_token = '.export.'
    elif zip_type == 'mentions':
        zip_type_token = '.mentions.'
    elif zip_type == 'gkg':
        zip_type_token = '.gkg.'
    else:
        raise Exception('zip_type should be one of: events | mentions | gkg')
        
    # get masterfile list path
    with open(master_filelist_path) as f:
        raw_file_list = f.readlines()
    
    raw_file_list = [line.split() for line in raw_file_list]
    
    # extract zip urls from masterfile list path
    zip_urls = []
    for i in range(len(raw_file_list)):
        try:
            zip_urls.append(raw_file_list[i][2])
        except Exception:  
            pass
        
        
    # filter specified year, month and day
    filtered_zip_urls = []
    
    for year in year_list:
        for month in month_list:
            if day_list is None:
                filtered_zip_urls = filtered_zip_urls + [file for file in zip_urls if (start_url + year + month in file) and (zip_type_token in file)]
            else:
                for day in day_list:
                    filtered_zip_urls = filtered_zip_urls + [file for file in zip_urls if (start_url + year + month + day in file) and (zip_type_token in file)]

                
    return filtered_zip_urls


## download and extract urls

In [5]:
def download_and_extract(zip_urls, download_zip_path, start_url='http://data.gdeltproject.org/gdeltv2/'):
    
    extracted_filenames = []
    
    # create download path of it does not exist
    if not os.path.exists(download_zip_path):
        os.makedirs(download_zip_path)

    for zip_url in zip_urls:
        
        downloaded_filename = zip_url.replace(start_url,'')
        
        # do not downloaded if already exists
        already_downloaded = os.path.exists(download_zip_path+downloaded_filename)
    
        # download zip file
        if not already_downloaded:
            wget.download(zip_url, out=download_zip_path+downloaded_filename)
        extracted_filename = downloaded_filename.replace('.zip','')
    
        # do not unzip if already exists
        already_extracted = os.path.exists(download_zip_path+extracted_filename)
    
        # unzip file
        if not already_extracted:
            with ZipFile(download_zip_path+downloaded_filename, 'r') as zip_ref:
                zip_ref.extractall(download_zip_path)    
        
        # delete zip file
        os.remove(download_zip_path+downloaded_filename)
        
        extracted_filenames = extracted_filenames + [extracted_filename]
    
    return extracted_filenames

## data schema

In [6]:
# EVENTS SCHEMA
# StructField(field_name, field_type, nullable)
events_schema = StructType([
    
    StructField("events_GLOBALEVENTID", StringType(), True),
    StructField("SQLDATE", StringType(), True),
    StructField("MonthYear", IntegerType(), True),
    StructField("Year", IntegerType(), True),
    
    StructField("FractionDate", FloatType(), True),
    StructField("Actor1Code", StringType(), True),
    StructField("Actor1Name", StringType(), True),
    StructField("Actor1CountryCode", StringType(), True),
    StructField("Actor1KnownGroupCode", StringType(), True),
    
    StructField("Actor1EthnicCode", StringType(), True),
    StructField("Actor1Religion1Code", StringType(), True),
    StructField("Actor1Religion2Code", StringType(), True),
    StructField("Actor1Type1Code", StringType(), True),
    StructField("Actor1Type2Code", StringType(), True),
    
    StructField("Actor1Type3Code", StringType(), True),
    StructField("Actor2Code", StringType(), True),
    StructField("Actor2Name", StringType(), True),
    StructField("Actor2CountryCode", StringType(), True),
    StructField("Actor2KnownGroupCode", StringType(), True),
    
    StructField("Actor2EthnicCode", StringType(), True),
    StructField("Actor2Religion1Code", StringType(), True),
    StructField("Actor2Religion2Code", StringType(), True),
    StructField("Actor2Type1Code", StringType(), True),
    StructField("Actor2Type2Code", StringType(), True),
    
    StructField("Actor2Type3Code", StringType(), True),
    StructField("IsRootEvent", IntegerType(), True),
    StructField("EventCode", StringType(), True),
    StructField("EventBaseCode", StringType(), True),
    StructField("EventRootCode", StringType(), True),
    
    StructField("QuadClass", IntegerType(), True),
    StructField("GoldsteinScale", FloatType(), True),
    StructField("NumMentions", IntegerType(), True),
    StructField("NumSources", IntegerType(), True),
    StructField("NumArticles", IntegerType(), True),
    
    StructField("AvgTone", FloatType(), True),
    StructField("Actor1Geo_Type", IntegerType(), True),
    StructField("Actor1Geo_FullName", StringType(), True),
    StructField("Actor1Geo_CountryCode", StringType(), True),
    StructField("Actor1Geo_ADM1Code", StringType(), True),
    
    StructField("Actor1Geo_ADM2Code", StringType(), True),
    StructField("Actor1Geo_Lat", FloatType(), True),
    StructField("Actor1Geo_Long", FloatType(), True),
    StructField("Actor1Geo_FeatureID", StringType(), True),
    StructField("Actor2Geo_Type", IntegerType(), True),
    
    StructField("Actor2Geo_FullName", StringType(), True),
    StructField("Actor2Geo_CountryCode", StringType(), True),
    StructField("Actor2Geo_ADM1Code", StringType(), True),
    StructField("Actor2Geo_ADM2Code", StringType(), True),
    StructField("Actor2Geo_Lat", FloatType(), True),
    
    StructField("Actor2Geo_Long", FloatType(), True),
    StructField("Actor2Geo_FeatureID", StringType(), True),
    StructField("ActionGeo_Type", IntegerType(), True),
    StructField("ActionGeo_FullName", StringType(), True),
    StructField("ActionGeo_CountryCode", StringType(), True),
    
    StructField("ActionGeo_ADM1Code", StringType(), True),
    StructField("ActionGeo_ADM2Code", StringType(), True),
    StructField("ActionGeo_Lat", FloatType(), True),
    StructField("ActionGeo_Long", FloatType(), True),
    StructField("ActionGeo_FeatureID", StringType(), True),
    
    StructField("DATEADDED", IntegerType(), True),
    StructField("SOURCEURL", StringType(), True)
])

In [7]:
# MENTIONS SCHEMA
# StructField(field_name, field_type, nullable)
mentions_schema = StructType([
    
    StructField("mentions_GLOBALEVENTID", StringType(), True),
    StructField("EventTimeDate", IntegerType(), True),
    StructField("MentionTimeDate", IntegerType(), True),
    StructField("MentionType", IntegerType(), True),
    StructField("MentionSourceName", StringType(), True),
    
    StructField("MentionIdentifier", StringType(), True),
    StructField("SentenceID", IntegerType(), True),
    StructField("Actor1CharOffset", IntegerType(), True),
    StructField("Actor2CharOffset", IntegerType(), True),
    StructField("ActionCharOffset", IntegerType(), True),
    
    StructField("InRawText", IntegerType(), True),
    StructField("Confidence", IntegerType(), True),
    StructField("MentionDocLen", IntegerType(), True),
    StructField("MentionDocTone", FloatType(), True),
    StructField("MentionDocTranslationInfo", StringType(), True),
    
    StructField("Extras", StringType(), True)    
])

In [8]:
SCHEMA_DICTIONARY = {
    'events':events_schema,
    'mentions':mentions_schema,
    'gkg':None
}

## spark session

In [9]:
# create spark session
SPARK = SparkSession.builder.master('local') \
    .appName('SparkSession') \
    .config("spark.mongodb.read.connection.uri", "mongodb://tp-hadoop-50/") \
    .config("spark.mongodb.write.connection.uri", "mongodb://tp-hadoop-50/") \
    .getOrCreate()

## spark_read_csv

In [10]:
def spark_read_csv(spark_session, csv_filepath, csv_file_list, schema_dictionary, csv_type):

    df_read = None
    
    for file in csv_file_list:
        
        # file_type: events | mentions | gkg
        csv_type_token = ''
        schema = None
        
        if csv_type == 'events':
            csv_type_token = '.export.'
            schema = schema_dictionary['events']
        elif csv_type == 'mentions':
            csv_type_token = '.mentions.'
            schema = schema_dictionary['mentions']
        elif csv_type == 'gkg':
            csv_type_token = '.gkg.'
            schema = schema_dictionary['gkg']
        else:
            raise Exception('csv_type should be one of: events | mentions | gkg')
        
        # read csv
        df = spark_session.read.options(delimiter='\t').csv(csv_filepath+file, schema=schema)
        
        if df_read is None:
            df_read = df.select('*')
        else:
            df_read = df_read.unionByName(df)
        
    return df_read

## transform events data

In [11]:
def transform_events_data(events_df):
    
    events_columns = ['events_GLOBALEVENTID', 'SQLDATE', 'ActionGeo_CountryCode']
    transformed_df = events_df.select(events_columns)
    
    return transformed_df

## write spark dataframe to mongodb collection

In [12]:
def load_mongodb(spark_dataframe, mongodb_database, mongodb_collection):
    spark_dataframe.write.format('mongodb').option("database",mongodb_database).option("collection", mongodb_collection).mode("append").save()
    

## ETL EVENTS DATA

In [12]:
def etl_events(year_list, month_list, day_list, schema_dictionary, spark_session, mongodb_database, mongodb_collection, download_csv_path, start_url):
    
    # get events urls from master filelist
    zip_urls = get_zip_urls_from_master_filelist(year_list=year_list, month_list=month_list, day_list=day_list, zip_type='events')

    # download and extract csv
    extracted_csvs = download_and_extract(zip_urls, download_zip_path=download_csv_path, start_url=start_url)

    # read csv to spark dataframe
    events_df = spark_read_csv(spark_session=spark_session, csv_filepath=download_csv_path, csv_file_list=extracted_csvs, schema_dictionary=schema_dictionary, csv_type='events')

    # transform events data
    events_df = transform_events_data(events_df)
    
    # load events data to mongodb
    load_mongodb(events_df, mongodb_database=mongodb_database, mongodb_collection=mongodb_collection)
    
    # delete zip file
    os.remove(download_zip_path+extracted_csvs)
    

In [14]:
zip_urls = get_zip_urls_from_master_filelist(year_list=['2022'], month_list=['01'], day_list=['01','02','03'], zip_type='events')
extracted_csvs = download_and_extract(zip_urls, download_zip_path='./gdelt_data/', start_url='http://data.gdeltproject.org/gdeltv2/')
#events_df = transform_events_data(events_df)

NameError: name 'download_csv_path' is not defined

In [16]:
events_df = spark_read_csv(spark_session=SPARK, csv_filepath='./gdelt_data/', csv_file_list=extracted_csvs, schema_dictionary=SCHEMA_DICTIONARY, csv_type='events')


In [18]:
events_df.toPandas()

Unnamed: 0,events_GLOBALEVENTID,SQLDATE,MonthYear,Year,FractionDate,Actor1Code,Actor1Name,Actor1CountryCode,Actor1KnownGroupCode,Actor1EthnicCode,...,ActionGeo_Type,ActionGeo_FullName,ActionGeo_CountryCode,ActionGeo_ADM1Code,ActionGeo_ADM2Code,ActionGeo_Lat,ActionGeo_Long,ActionGeo_FeatureID,DATEADDED,SOURCEURL
0,1021421880,20211225,202112,2021,2021.972656,LTU,LITHUANIA,LTU,,,...,4,"Nemunas, Lithuania (general), Lithuania",LH,LH00,19135,55.301899,21.381701,-2617099,,http://www.pasienis.lt/lit/Ccfec91be70bad25853...
1,1021421881,20211231,202112,2021,2021.989014,CVL,VILLAGE,,,,...,4,"Friguia, Faranah, Guinea",GV,GV09,40719,10.033300,-10.950000,-1179975,,https://aminata.com/assistance-financiere-rusa...
2,1021421882,20220101,202201,2022,2022.002686,,,,,,...,1,South Africa,SF,SF,,-30.000000,26.000000,SF,,https://www.cadena3.com/noticia/sociedad/el-go...
3,1021421883,20220101,202201,2022,2022.002686,,,,,,...,4,"Sao Pedro, Minas Gerais, Brazil",BR,BR15,40197,-20.650000,-47.016701,-671921,,https://www.em.com.br/app/noticia/internaciona...
4,1021421884,20220101,202201,2022,2022.002686,,,,,,...,1,Greece,GR,GR,,39.000000,22.000000,GR,,https://www.em.com.br/app/noticia/internaciona...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99041,1021698394,20220103,202201,2022,2022.008179,VEN,VENEZUELAN,VEN,,,...,1,United States,US,US,,39.828175,-98.579498,US,,https://www.pagina12.com.ar/393222-al-menos-24...
99042,1021698395,20220103,202201,2022,2022.008179,VEN,VENEZUELA,VEN,,,...,4,"Arauca, Arauca, Colombia",CO,CO03,31882,7.084510,-70.755402,-577427,,https://www.pagina12.com.ar/393222-al-menos-24...
99043,1021698396,20220103,202201,2022,2022.008179,VEN,VENEZUELA,VEN,,,...,4,"Arauquita, Barinas, Venezuela",VE,VE05,32091,8.708330,-69.657204,-936118,,https://www.pagina12.com.ar/393222-al-menos-24...
99044,1021698397,20220103,202201,2022,2022.008179,VENREL,VENEZUELA,VEN,,,...,4,"Arauquita, Barinas, Venezuela",VE,VE05,32091,8.708330,-69.657204,-936118,,https://www.pagina12.com.ar/393222-al-menos-24...


In [22]:
df1 = events_df.toPandas()['events_GLOBALEVENTID']

AttributeError: 'Series' object has no attribute 'values_count'

In [23]:
df1 = events_df.toPandas()

In [27]:
df1['events_GLOBALEVENTID'].values_count()

AttributeError: 'Series' object has no attribute 'values_count'

In [13]:
etl_events(year_list=['2022'], month_list=['01'], day_list=['01','02','03'], schema_dictionary=SCHEMA_DICTIONARY, spark_session=SPARK, mongodb_database='test', mongodb_collection='events', download_csv_path='./gdelt_data/', start_url='http://data.gdeltproject.org/gdeltv2/')



Py4JJavaError: An error occurred while calling o3787.save.
: java.lang.ClassNotFoundException: 
Failed to find data source: mongodb. Please find packages at
https://spark.apache.org/third-party-projects.html
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:587)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: mongodb.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661)
	at org.apache.spark.sql.execution.datasources.DataSource$$$Lambda$1549/1171936102.apply(Unknown Source)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661)
	at org.apache.spark.sql.execution.datasources.DataSource$$$Lambda$1548/284708524.apply(Unknown Source)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661)
	... 16 more


In [30]:
def transform_mentions_data(spark_session, mentions_df, mongodb_events_collection, mongodb_database):
    
    # filter mentions columns
    mentions_columns = ['mentions_GLOBALEVENTID', 'MentionIdentifier', 'MentionDocTranslationInfo']
    mentions_columns = ['mentions_GLOBALEVENTID', 'MentionIdentifier']
    
    mentions_df = mentions_df.select(mentions_columns)
    
    # find event GLOBALEVENTID from mongodb events collection
    events_df = spark_session.read.format("mongodb").option("database",mongodb_database).option("collection", mongodb_events_collection).load()
    
    # join events and mentions
    mentions_events_df = mentions_df.join(events_df, events_df.events_GLOBALEVENTID == mentions_df.mentions_GLOBALEVENTID, 'left')
    
    # create nested mentions df
    build_nested_event_udf = udf(lambda SQLDATE, ActionGeo_CountryCode: {
        'SQLDATE': SQLDATE,
        'ActionGeo_CountryCode': ActionGeo_CountryCode
    }, MapType(StringType(), StringType()))

    mentions_events_df = (
        mentions_events_df
        .withColumn('event_fields', build_nested_event_udf(mentions_events_df['SQLDATE'], mentions_events_df['ActionGeo_CountryCode']))
        .drop('SQLDATE')
        .drop('ActionGeo_CountryCode')
        .drop('events_GLOBALEVENTID')
    )
    
    return mentions_events_df

In [None]:
def etl_mentions(year_list, month_list, day_list, schema_dictionary, spark_session, mongodb_database, mongodb_mentions_collection, mongodb_events_collection, download_csv_path, start_url):
    
    # get events urls from master filelist
    zip_urls = get_zip_urls_from_master_filelist(year_list=year_list, month_list=month_list, day_list=day_list, zip_type='mentions')

    # download and extract csv
    extracted_csvs = download_and_extract(zip_urls, download_zip_path=download_csv_path, start_url=start_url)

    # read csv to spark dataframe
    mentions_df = spark_read_csv(spark_session, csv_filepath=download_csv_path, csv_file_list=extracted_csvs, schema_dictionary=schema_dictionary, csv_type='mentions')

    # transform events data
    mentions_df = transform_mentions_data(spark_session=spark_session, mentions_df=mentions_df, mongodb_events_collection=mongodb_events_collection, mongodb_database=mongodb_database)
    mentions_df.show()
    # load events data to mongodb
    #load_mongodb(mentions_df, mongodb_database=mongodb_database, mongodb_collection=mongodb_mentions_collection)
    mentions_df.write.format('mongodb').option("database",mongodb_database).option("collection", mongodb_collection).mode("append").save()



In [None]:
#etl_mentions(year_list=['2022'], month_list=['01'], day_list=['01','02','03'], schema_dictionary=SCHEMA_DICTIONARY, spark_session=SPARK, mongodb_database='test', mongodb_mentions_collection='mentions', mongodb_events_collection='events', download_csv_path='./gdelt_data/', start_url='http://data.gdeltproject.org/gdeltv2/')



In [None]:
#zip_urls = get_zip_urls_from_master_filelist(year_list=['2022'], month_list=['01'], day_list=['01','02','03'], zip_type='mentions')

