In [None]:
import pandas as pd
import numpy as np
import os
import glob
import logging
import time
import re
import threading
import psutil
from concurrent.futures import ThreadPoolExecutor
import threading
import time
from sqlalchemy import create_engine, inspect, text, MetaData
from sqlalchemy.exc import SQLAlchemyError
import psycopg2.errors
import psycopg2
import csv
from datetime import datetime, timedelta
from io import StringIO
from urllib.parse import urlparse

# clean local csv

In [None]:
# Maps the column headers found in the raw CSV files (keys) to the
# 'gkg_2.1_English_*' names used for the cleaned CSVs and the database tables
# (values). Applied with DataFrame.rename in modify_and_save_csv.
new_column_names = {
    'GKGRECORDID': 'gkg_2.1_English_GKGRecordID',
    'DATE': 'gkg_2.1_English_V2.1Date',
    'SourceCollectionIdentifier': 'gkg_2.1_English_V2SourceCollectionIdentifier',
    'SourceCommonName': 'gkg_2.1_English_V2SourceCommonName',
    'DocumentIdentifier': 'gkg_2.1_English_V2DocumentIdentifier',
    'Counts': 'gkg_2.1_English_V1Counts',
    'V2Counts': 'gkg_2.1_English_V2.1Counts',
    'Themes': 'gkg_2.1_English_V1Themes',
    'V2Themes': 'gkg_2.1_English_V2EnhancedThemes',
    'Locations': 'gkg_2.1_English_V1Locations',
    'V2Locations': 'gkg_2.1_English_V2EnhancedLocations',
    'Persons': 'gkg_2.1_English_V1Persons',
    'V2Persons': 'gkg_2.1_English_V2EnhancedPersons',
    'Organizations': 'gkg_2.1_English_V1Organizations',
    'V2Organizations': 'gkg_2.1_English_V2EnhancedOrganizations',
    'V2Tone': 'gkg_2.1_English_V2.1Tone',
    'Dates': 'gkg_2.1_English_V2.1EnhancedDates',
    'GCAM': 'gkg_2.1_English_V2GCAM',
    'SharingImage': 'gkg_2.1_English_V2.1SharingImage',
    'RelatedImages': 'gkg_2.1_English_V2.1RelatedImages',
    'SocialImageEmbeds': 'gkg_2.1_English_V2.1SocialImageEmbeds',
    'SocialVideoEmbeds': 'gkg_2.1_English_V2.1SocialVideoEmbeds',
    'Quotations': 'gkg_2.1_English_V2.1Quotations',
    'AllNames': 'gkg_2.1_English_V2.1AllNames',
    'Amounts': 'gkg_2.1_English_V2.1Amounts',
    'TranslationInfo': 'gkg_2.1_English_V2.1TranslationInfo',
    'Extras': 'gkg_2.1_English_V2ExtrasXML',
    'FinalThemes': 'gkg_2.1_English_V2FinalThemes'
}

In [None]:
# V1 & V2: Define the columns and their respective subcolumns.
#
# Each key is a (renamed) composite column; each value is a tuple
#   (subcolumn names, block delimiter, field delimiter)
# consumed by parse_columns_to_subcolumns: the cell is split into blocks on the
# block delimiter and each block into fields on the field delimiter.
#
# NOTE(review): parse_columns_to_subcolumns drops the first field of every
# block before assigning fields to subcolumns, i.e. it assumes every block
# starts with the field delimiter. Confirm that the input files really have
# that leading delimiter for all of the columns listed here.
#
# NOTE: the subcolumn names for the two Counts columns put the dot after the
# version ('...V1.CountsCountType'), unlike the other groups
# ('...V1Locations.LocationType'). The same spellings are used as keys in
# column_types, so they must stay in sync.
column_mapping = {
    'gkg_2.1_English_V1Counts': ([
        'gkg_2.1_English_V1.CountsCountType', 'gkg_2.1_English_V1.CountsCount', 'gkg_2.1_English_V1.CountsObjectType', 
        'gkg_2.1_English_V1.CountsLocationType', 'gkg_2.1_English_V1.CountsLocationFullName', 
        'gkg_2.1_English_V1.CountsLocationCountryCode', 'gkg_2.1_English_V1.CountsLocationADM1Code', 
        'gkg_2.1_English_V1.CountsLocationLatitude', 
        'gkg_2.1_English_V1.CountsLocationLongitude', 'gkg_2.1_English_V1.CountsLocationFeatureID'
    ], ';', '#'),
    'gkg_2.1_English_V1Locations': ([
        'gkg_2.1_English_V1Locations.LocationType', 'gkg_2.1_English_V1Locations.LocationFullName', 
        'gkg_2.1_English_V1Locations.LocationCountryCode', 
        'gkg_2.1_English_V1Locations.LocationADM1Code', 'gkg_2.1_English_V1Locations.LocationLatitude', 
        'gkg_2.1_English_V1Locations.LocationLongitude', 'gkg_2.1_English_V1Locations.LocationFeatureID'
    ], ';', '#'),
    'gkg_2.1_English_V2EnhancedLocations': ([
        'gkg_2.1_English_V2EnhancedLocations.LocationType', 'gkg_2.1_English_V2EnhancedLocations.LocationFullName', 
        'gkg_2.1_English_V2EnhancedLocations.LocationCountryCode', 
        'gkg_2.1_English_V2EnhancedLocations.LocationADM1Code', 
        'gkg_2.1_English_V2EnhancedLocations.LocationLatitude', 
        'gkg_2.1_English_V2EnhancedLocations.LocationLongitude', 
        'gkg_2.1_English_V2EnhancedLocations.LocationFeatureID',
        'gkg_2.1_English_V2EnhancedLocations.LocationADM2Code'
    ], ';', '#'),
    'gkg_2.1_English_V2.1EnhancedDates': ([
        'gkg_2.1_English_V2.1EnhancedDates.Resolution', 'gkg_2.1_English_V2.1EnhancedDates.Month', 
        'gkg_2.1_English_V2.1EnhancedDates.Day', 'gkg_2.1_English_V2.1EnhancedDates.Year', 
        'gkg_2.1_English_V2.1EnhancedDates.Offset'
    ], ';', '#'),
    'gkg_2.1_English_V2.1Quotations': ([
        'gkg_2.1_English_V2.1Quotations.Offset', 'gkg_2.1_English_V2.1Quotations.Length',
        'gkg_2.1_English_V2.1Quotations.Verb', 'gkg_2.1_English_V2.1Quotations.Quote'
    ], '#', '|'),
    'gkg_2.1_English_V2.1Amounts': ([
        'gkg_2.1_English_V2.1Amounts.Amount', 'gkg_2.1_English_V2.1Amounts.Object', 
        'gkg_2.1_English_V2.1Amounts.Offset'
    ], ';', ','),
    'gkg_2.1_English_V2.1TranslationInfo': ([
        'gkg_2.1_English_V2.1TranslationInfo.SRCLC', 'gkg_2.1_English_V2.1TranslationInfo.ENG'
    ], ';', ':'),
    'gkg_2.1_English_V2ExtrasXML': ([
        'gkg_2.1_English_V2ExtrasXML.Authors', 'gkg_2.1_English_V2ExtrasXML.Title', 
        'gkg_2.1_English_V2ExtrasXML.BookTitle', 'gkg_2.1_English_V2ExtrasXML.Date', 
        'gkg_2.1_English_V2ExtrasXML.Journal', 
        'gkg_2.1_English_V2ExtrasXML.Volume', 'gkg_2.1_English_V2ExtrasXML.Issue', 
        'gkg_2.1_English_V2ExtrasXML.Pages', 'gkg_2.1_English_V2ExtrasXML.Institution', 
        'gkg_2.1_English_V2ExtrasXML.Publisher', 
        'gkg_2.1_English_V2ExtrasXML.Location', 'gkg_2.1_English_V2ExtrasXML.Marker'
    ], ';', ','),
    'gkg_2.1_English_V2.1Counts': ([
        'gkg_2.1_English_V2.1.CountsCountType', 'gkg_2.1_English_V2.1.CountsCount', 
        'gkg_2.1_English_V2.1.CountsObjectType', 
        'gkg_2.1_English_V2.1.CountsLocationType', 'gkg_2.1_English_V2.1.CountsLocationFullName', 
        'gkg_2.1_English_V2.1.CountsLocationCountryCode', 'gkg_2.1_English_V2.1.CountsLocationADM1Code', 
        'gkg_2.1_English_V2.1.CountsLocationLatitude', 
        'gkg_2.1_English_V2.1.CountsLocationLongitude', 'gkg_2.1_English_V2.1.CountsLocationFeatureID'
    ], ';', '#')
}

In [None]:
# parse all designated columns
def _split_cell_into_subvalues(cell, n_subcolumns, delimiter_block, delimiter_field):
    """Split one composite cell into `n_subcolumns` ';'-joined strings (one per subcolumn)."""
    if pd.isna(cell) or cell == '':
        return [''] * n_subcolumns

    collected = [[] for _ in range(n_subcolumns)]
    # str() keeps non-string cells (e.g. a column pandas inferred as numeric)
    # from crashing on .split().
    for block in str(cell).split(delimiter_block):
        if not block:
            continue
        # The first split element is discarded.
        # NOTE(review): this assumes every block starts with the field
        # delimiter (so the first element is empty); if it does not, the first
        # real field is dropped. Kept as in the original -- confirm against data.
        fields = block.split(delimiter_field)[1:]
        # Pad / truncate so every block contributes exactly one value per subcolumn.
        fields = (fields + [''] * n_subcolumns)[:n_subcolumns]
        for bucket, value in zip(collected, fields):
            bucket.append(value)

    # A subcolumn whose values are all empty collapses to '' instead of ';;;'.
    return ['' if all(value == '' for value in bucket) else ';'.join(bucket)
            for bucket in collected]


def parse_columns_to_subcolumns(df, column_mapping):
    """
    Parse specified columns into subcolumns based on provided mapping.

    For every mapped column, each cell is split into blocks and each block into
    fields; the i-th field of every block is joined with ';' and stored in the
    i-th subcolumn. Missing/empty cells yield '' in every subcolumn.

    Values are assigned by row position rather than by index label, so the
    result is correct even when the DataFrame index contains duplicate labels
    (label-based `df.at` writes would overwrite every row sharing a label).

    Parameters:
    - df: DataFrame containing the data. It is modified in place.
    - column_mapping: Dictionary where keys are the names of columns to parse,
                      and values are tuples of (subcolumns, delimiter_block, delimiter_field).

    Returns:
    - df: The same DataFrame with the parsed subcolumns added.

    Raises:
    - KeyError: if a mapped column is missing from a non-empty DataFrame.
    """
    for column_name, (subcolumns, delimiter_block, delimiter_field) in column_mapping.items():
        # Create the subcolumns up front so they exist even for an empty frame.
        for subcolumn in subcolumns:
            df[subcolumn] = ''

        if len(df) == 0:
            continue

        parsed = {subcolumn: [] for subcolumn in subcolumns}
        for cell in df[column_name].tolist():
            row_values = _split_cell_into_subvalues(
                cell, len(subcolumns), delimiter_block, delimiter_field
            )
            for subcolumn, value in zip(subcolumns, row_values):
                parsed[subcolumn].append(value)

        # Whole-column, positional assignment (one write per subcolumn).
        for subcolumn in subcolumns:
            df[subcolumn] = parsed[subcolumn]
    return df

In [None]:
# Before running: update the log file name and the two directory paths (input and output) below if needed

# Set up logging
# Root-logger configuration: INFO and above go to parse_log_2020.txt.
# (logging.basicConfig does nothing if the root logger already has handlers,
# e.g. when another cell configured logging earlier in the same kernel.)
logging.basicConfig(filename='parse_log_2020.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
# Held by modify_and_save_csv while appending to error_parse.txt.
error_file_lock = threading.Lock()
# Held by modify_and_save_csv while updating processed_counter.
counter_lock = threading.Lock()
processed_counter = 0 # Counter for processed files

# Function to modify a CSV file and save it to a new file
def modify_and_save_csv(input_file, output_file):
    """Clean one raw CSV file and write the result to `output_file`.

    Steps: read the CSV (first column as index), rename the columns using
    `new_column_names`, add the parsed subcolumns described by
    `column_mapping`, replace missing and empty cells with the string 'None',
    and write the frame back out with index and header.

    Any exception is logged and the input file's base name is appended to
    'error_parse.txt'; the exception is not re-raised. The elapsed time is
    always logged and the global `processed_counter` is always incremented,
    whether or not the file succeeded.
    """
    global processed_counter
    began_at = time.time()
    try:
        frame = pd.read_csv(input_file, index_col=0)

        # Rename the original columns, then append the parsed subcolumns.
        frame = frame.rename(columns=new_column_names)
        frame = parse_columns_to_subcolumns(frame, column_mapping)

        # Blank cells (NaN or empty string) are stored as the literal 'None'.
        frame = frame.fillna('None')
        frame = frame.replace('', 'None')
        frame.to_csv(output_file, index=True, header=True)

        logger.info(f"Successfully modified CSV file: {input_file}")

    except Exception as e:
        logger.error(f"Error modifying CSV file {input_file}: {str(e)}")

        # Record the failed file so it can be retried later.
        failed_name = os.path.basename(input_file)
        with error_file_lock, open('error_parse.txt', 'a') as error_log:
            error_log.write(failed_name + '\n')

    finally:
        duration = time.time() - began_at
        logger.info(f"Modification operation took {duration:.2f} seconds for file: {input_file}")

        with counter_lock:
            processed_counter += 1
            # Progress message every 100 files.
            if processed_counter % 100 == 0:
                print(f"Processed {processed_counter} files.")
    
input_directory = '/Volumes/T5EVO/gdelt_download/2020'
output_directory = '/Volumes/T5EVO/gdelt_download/2020_clean'

# Ensure output directory exists
os.makedirs(output_directory, exist_ok=True)

# Collect the CSV files in the input directory (non-recursive)
csv_files = glob.glob(os.path.join(input_directory, '*.csv'))

# psutil.cpu_count(logical=False) may return None when the physical core count
# cannot be determined, so fall back to os.cpu_count() and finally to 1.
max_cores = psutil.cpu_count(logical=False) or os.cpu_count() or 1
print(f"Using {max_cores} CPU cores for processing.")

with ThreadPoolExecutor(max_workers=max_cores) as executor:
    futures = []
    for csv_file in csv_files:
        file_name = os.path.basename(csv_file)
        output_file = os.path.join(output_directory, file_name)
        # Hand the work to the pool. The previous version called
        # modify_and_save_csv directly here, which ran every file sequentially
        # on the main thread and left the executor unused.
        futures.append(executor.submit(modify_and_save_csv, csv_file, output_file))

    # modify_and_save_csv handles and logs its own exceptions; calling
    # result() still surfaces anything unexpected instead of dropping it.
    for future in futures:
        future.result()

# Insert Modified tables into db

In [None]:
# PostgreSQL column type for every column of a cleaned CSV: the renamed
# original columns first, followed by the parsed subcolumns.
# copy_to_database builds its CREATE TABLE statement by iterating this dict,
# so the table's column order is the order of the entries below;
# process_csv compares the key set against the columns of each CSV.
#
# NOTE: the subcolumn groups are listed here in a different order than in
# column_mapping (for example V2EnhancedLocations.* comes before
# V1Locations.* here, and V2.1.Counts* is last), so the table's column order
# is not the column order of a cleaned CSV.
column_types = {
    'gkg_2.1_English_GKGRecordID': 'TEXT',
    'gkg_2.1_English_V2.1Date': 'BIGINT',
    'gkg_2.1_English_V2SourceCollectionIdentifier': 'INT',
    'gkg_2.1_English_V2SourceCommonName': 'TEXT',
    'gkg_2.1_English_V2DocumentIdentifier': 'TEXT',
    'gkg_2.1_English_V1Counts':'TEXT',
    'gkg_2.1_English_V2.1Counts':'TEXT',
    'gkg_2.1_English_V1Themes':'TEXT',
    'gkg_2.1_English_V2EnhancedThemes':'TEXT',
    'gkg_2.1_English_V1Locations':'TEXT',
    'gkg_2.1_English_V2EnhancedLocations':'TEXT',
    'gkg_2.1_English_V1Persons':'TEXT',
    'gkg_2.1_English_V2EnhancedPersons':'TEXT',
    'gkg_2.1_English_V1Organizations':'TEXT',
    'gkg_2.1_English_V2EnhancedOrganizations':'TEXT',
    'gkg_2.1_English_V2.1Tone': 'FLOAT8',
    'gkg_2.1_English_V2.1EnhancedDates':'TEXT',
    'gkg_2.1_English_V2GCAM':'TEXT',
    'gkg_2.1_English_V2.1SharingImage':'TEXT',
    'gkg_2.1_English_V2.1RelatedImages':'TEXT',
    'gkg_2.1_English_V2.1SocialImageEmbeds':'TEXT',
    'gkg_2.1_English_V2.1SocialVideoEmbeds':'TEXT',
    'gkg_2.1_English_V2.1Quotations': 'TEXT',
    'gkg_2.1_English_V2.1AllNames':'TEXT',
    'gkg_2.1_English_V2.1Amounts': 'TEXT',
    'gkg_2.1_English_V2.1TranslationInfo':'TEXT',
    'gkg_2.1_English_V2ExtrasXML': 'TEXT',
    'gkg_2.1_English_V2FinalThemes':'TEXT',
    'gkg_2.1_English_V1.CountsCountType': 'TEXT',
    'gkg_2.1_English_V1.CountsCount': 'TEXT',
    'gkg_2.1_English_V1.CountsObjectType': 'TEXT',
    'gkg_2.1_English_V1.CountsLocationType': 'TEXT',
    'gkg_2.1_English_V1.CountsLocationFullName': 'TEXT',
    'gkg_2.1_English_V1.CountsLocationCountryCode': 'TEXT',
    'gkg_2.1_English_V1.CountsLocationADM1Code': 'TEXT',
    'gkg_2.1_English_V1.CountsLocationLatitude': 'TEXT',
    'gkg_2.1_English_V1.CountsLocationLongitude': 'TEXT',
    'gkg_2.1_English_V1.CountsLocationFeatureID': 'TEXT',
    'gkg_2.1_English_V2EnhancedLocations.LocationType': 'TEXT',
    'gkg_2.1_English_V2EnhancedLocations.LocationFullName': 'TEXT',
    'gkg_2.1_English_V2EnhancedLocations.LocationCountryCode': 'TEXT',
    'gkg_2.1_English_V2EnhancedLocations.LocationADM1Code': 'TEXT',
    'gkg_2.1_English_V2EnhancedLocations.LocationLatitude': 'TEXT',
    'gkg_2.1_English_V2EnhancedLocations.LocationLongitude': 'TEXT',
    'gkg_2.1_English_V2EnhancedLocations.LocationFeatureID': 'TEXT',
    'gkg_2.1_English_V2EnhancedLocations.LocationADM2Code': 'TEXT',
    'gkg_2.1_English_V1Locations.LocationType': 'TEXT',
    'gkg_2.1_English_V1Locations.LocationFullName': 'TEXT',
    'gkg_2.1_English_V1Locations.LocationCountryCode': 'TEXT',
    'gkg_2.1_English_V1Locations.LocationADM1Code': 'TEXT',
    'gkg_2.1_English_V1Locations.LocationLatitude': 'TEXT',
    'gkg_2.1_English_V1Locations.LocationLongitude': 'TEXT',
    'gkg_2.1_English_V1Locations.LocationFeatureID': 'TEXT',
    'gkg_2.1_English_V2.1EnhancedDates.Resolution': 'TEXT',
    'gkg_2.1_English_V2.1EnhancedDates.Month': 'TEXT',
    'gkg_2.1_English_V2.1EnhancedDates.Day': 'TEXT',
    'gkg_2.1_English_V2.1EnhancedDates.Year': 'TEXT',
    'gkg_2.1_English_V2.1EnhancedDates.Offset': 'TEXT',
    'gkg_2.1_English_V2.1Quotations.Offset': 'TEXT',
    'gkg_2.1_English_V2.1Quotations.Length': 'TEXT',
    'gkg_2.1_English_V2.1Quotations.Verb': 'TEXT',
    'gkg_2.1_English_V2.1Quotations.Quote': 'TEXT',
    'gkg_2.1_English_V2.1Amounts.Amount': 'TEXT',
    'gkg_2.1_English_V2.1Amounts.Object': 'TEXT',
    'gkg_2.1_English_V2.1Amounts.Offset': 'TEXT',
    'gkg_2.1_English_V2.1TranslationInfo.SRCLC': 'TEXT',
    'gkg_2.1_English_V2.1TranslationInfo.ENG': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.Authors': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.Title': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.BookTitle': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.Date': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.Journal': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.Volume': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.Issue': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.Pages': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.Institution': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.Publisher': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.Location': 'TEXT',
    'gkg_2.1_English_V2ExtrasXML.Marker': 'TEXT',    
    'gkg_2.1_English_V2.1.CountsCountType': 'TEXT',
    'gkg_2.1_English_V2.1.CountsCount': 'TEXT',
    'gkg_2.1_English_V2.1.CountsObjectType': 'TEXT',
    'gkg_2.1_English_V2.1.CountsLocationType': 'TEXT',
    'gkg_2.1_English_V2.1.CountsLocationFullName': 'TEXT',
    'gkg_2.1_English_V2.1.CountsLocationCountryCode': 'TEXT',
    'gkg_2.1_English_V2.1.CountsLocationADM1Code': 'TEXT',
    'gkg_2.1_English_V2.1.CountsLocationLatitude': 'TEXT',
    'gkg_2.1_English_V2.1.CountsLocationLongitude': 'TEXT',
    'gkg_2.1_English_V2.1.CountsLocationFeatureID': 'TEXT'
}

In [None]:
def copy_to_database(df, database_url, schema_name, table_name):
    """Create `schema_name`.`table_name` if needed and bulk-load `df` into it with COPY.

    The table is created from the module-level `column_types` mapping and the
    DataFrame is streamed through an in-memory, tab-separated CSV buffer using
    psycopg2's `copy_expert`. Both statements run in one transaction on the
    module-level `engine`. The schema itself must already exist.

    Parameters:
    - df: DataFrame whose columns are a subset of the `column_types` keys.
    - database_url: unused here (the global `engine` is used); kept so existing
      callers keep working.
    - schema_name: target schema.
    - table_name: target table (unqualified).

    Raises:
    - Exception: any error is logged, appended to 'error_parse_insertion.txt'
      and re-raised.
    """
    qualified_name = f'"{schema_name}"."{table_name}"'
    try:
        start_time = time.time()

        # Generate the CREATE TABLE query based on column names and types
        column_defs = ",\n".join(
            f'"{col}" {col_type}' for col, col_type in column_types.items()
        )
        create_table_sql = f"CREATE TABLE IF NOT EXISTS {qualified_name} (\n{column_defs}\n)"

        # COPY maps input fields to table columns BY POSITION and the HEADER
        # line is only skipped, not matched. The DataFrame's column order is
        # not the table's column order, so the target columns are named
        # explicitly, in the order the buffer is written.
        column_list = ", ".join(
            '"' + str(col).replace('"', '""') + '"' for col in df.columns
        )
        copy_sql = f"""
            COPY {qualified_name} ({column_list})
            FROM STDIN
            WITH CSV HEADER DELIMITER AS '\t'
        """

        # Write DataFrame to a buffer
        buffer = StringIO()
        df.to_csv(buffer, sep='\t', header=True, index=False, quoting=csv.QUOTE_NONNUMERIC)
        buffer.seek(0)

        with engine.connect() as conn:
            with conn.begin():
                # text() is required for string SQL on SQLAlchemy 2.x and also
                # works on 1.x.
                conn.execute(text(create_table_sql))

                # Copy data from buffer to database; the cursor is closed on exit.
                with conn.connection.cursor() as cursor:
                    cursor.copy_expert(copy_sql, buffer)

        end_time = time.time()
        insertion_time = end_time - start_time
        logging.info(f"Insertion time for {qualified_name}: {insertion_time} seconds")

    except Exception as e:
        error_message = f"Error copying data to database for table {qualified_name}: {e}"
        logging.error(error_message)
        print("copy_to_database error: ", error_message)
        # The source file path is not available in this function (the previous
        # code referenced an undefined `file_path` here, which raised NameError
        # and hid the real error), so the table name identifies the failure.
        with open('error_parse_insertion.txt', 'a') as error_file:
            error_file.write(f"{qualified_name} copy_to_database error: {error_message}\n")
        raise

# process_csv is run from a thread pool (see job); this lock serializes the
# updates to the shared `processed_files` counter and the appends to the
# shared error file.
_insertion_state_lock = threading.Lock()


def process_csv(file_path, schema_name):
    """Load one cleaned CSV file into its own table in `schema_name`.

    The table name is the file name up to the first '.'. Files whose table
    already exists are skipped. Files whose columns do not match the keys of
    `column_types` are logged, recorded in 'error_parse_insertion.txt' and
    skipped. Otherwise the data is inserted via copy_to_database and the
    global `processed_files` counter is incremented.

    Parameters:
    - file_path: path of the CSV file to load.
    - schema_name: database schema that receives the table.

    Raises:
    - Exception: any error is logged, recorded in the error file and re-raised.
    """
    global processed_files
    try:
        name = os.path.basename(file_path)
        table_name = name.split(".")[0]

        inspector = inspect(engine)
        if inspector.has_table(table_name, schema=schema_name):
            logging.info(f"Table {table_name} skipped because it exists.")
            return

        # The CSVs were written with their index, which is read back as an
        # 'Unnamed: 0' column; drop it.
        df = pd.read_csv(file_path).drop(["Unnamed: 0"], axis=1)

        # Check if DataFrame columns match the expected column types
        if set(df.columns) != set(column_types.keys()):
            error_message = f"Skipping file {file_path}: columns do not match expected schema."
            logging.error(error_message)
            print(error_message)
            with _insertion_state_lock:
                with open('error_parse_insertion.txt', 'a') as error_file:
                    error_file.write(f"{file_path}\n")
            return

        copy_to_database(df, database_url, schema_name, table_name)

        log_message = f"Successfully wrote table from file: {file_path} at {datetime.now()}"
        logging.info(log_message)

        # `+=` on a global is a read-modify-write; without the lock concurrent
        # workers can lose increments.
        with _insertion_state_lock:
            processed_files += 1
            processed_so_far = processed_files
        if processed_so_far % 100 == 0:
            print(f"Processed {processed_so_far} files so far.")

    except Exception as e:
        error_message = f"Error processing file: {file_path} - {e}"
        logging.error(error_message)
        print("process_csv error: ", error_message)
        with _insertion_state_lock:
            with open('error_parse_insertion.txt', 'a') as error_file:
                error_file.write(f"{file_path} process_csv error: {error_message}\n")
        raise

In [None]:
# change 'root_dir', 2 log names, and 1 'schema_name'

def job(root_dir, schema_name):
    """Insert every '.csv' file found under `root_dir` (recursively) into `schema_name`.

    Each file is handed to process_csv on a thread pool sized by the number of
    physical CPU cores reported by psutil. When the pool has finished, the
    global `processed_files` count is written out with
    save_processed_files_count.

    Note: the iterator returned by `executor.map` is never consumed, so
    exceptions re-raised by process_csv are not propagated to the caller here.
    """
    global processed_files

    # Recursively gather the CSV paths.
    all_csv_files = [
        os.path.join(subdir, file)
        for subdir, _, files in os.walk(root_dir)
        for file in files
        if file.endswith('.csv')
    ]

    max_cores = psutil.cpu_count(logical=False)
    print(f"Using {max_cores} CPU cores for processing.")

    def _load_one(file_path):
        return process_csv(file_path, schema_name)

    with ThreadPoolExecutor(max_workers=max_cores) as executor:
        executor.map(_load_one, all_csv_files)

    save_processed_files_count(processed_files)

def save_processed_files_count(count):
    """Overwrite 'processed_files.txt' (in the working directory) with the given count."""
    summary_line = f"Processed files: {count}\n"
    with open('processed_files.txt', 'w') as summary:
        summary.writelines([summary_line])

# Entry point for the insertion step. Defines the module-level names that
# copy_to_database / process_csv / job read: `database_url`, `engine` and
# `processed_files`.
if __name__ == "__main__":
    database_url = "postgresql://postgres@localhost:5432/gdelt_english"
    try:
        engine = create_engine(database_url)
        # Open (and immediately close) one connection as a connectivity check.
        with engine.connect() as connection:
            print("Connection successful.")
    except Exception as e:
        # NOTE: a failed connection is only reported; execution continues below.
        print("Connection failed:", e)
    
    # Global counter incremented by process_csv.
    processed_files = 0
    
    logging.basicConfig(filename='parse_insertion_log_2017.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Drop every handler currently attached to the root logger (including any
    # installed by basicConfig above or by earlier cells) ...
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)

    # ... and attach a single file handler, so all records go only to
    # parse_insertion_log_2017.txt.
    file_handler = logging.FileHandler('parse_insertion_log_2017.txt')
    file_handler.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    
    #root_dir = '/Volumes/T5EVO/gdelt_download/2017_clean'
    root_dir = '/Volumes/T5EVO/gdelt_download/missing_zip_empty_csv/empty_2017'
    schema_name = "gdelt_english"
    job(root_dir, schema_name)

# create empty csv from missing timestamps

In [None]:
# Header for the placeholder (empty) CSV files created for missing timestamps:
# the renamed original columns followed by the parsed subcolumns.
# The names are the same set as the keys of column_types, but the subcolumn
# groups are listed in a different order than there.
columns = [
    'gkg_2.1_English_GKGRecordID',
    'gkg_2.1_English_V2.1Date',
    'gkg_2.1_English_V2SourceCollectionIdentifier',
    'gkg_2.1_English_V2SourceCommonName',
    'gkg_2.1_English_V2DocumentIdentifier',
    'gkg_2.1_English_V1Counts',
    'gkg_2.1_English_V2.1Counts',
    'gkg_2.1_English_V1Themes',
    'gkg_2.1_English_V2EnhancedThemes',
    'gkg_2.1_English_V1Locations',
    'gkg_2.1_English_V2EnhancedLocations',
    'gkg_2.1_English_V1Persons',
    'gkg_2.1_English_V2EnhancedPersons',
    'gkg_2.1_English_V1Organizations',
    'gkg_2.1_English_V2EnhancedOrganizations',
    'gkg_2.1_English_V2.1Tone',
    'gkg_2.1_English_V2.1EnhancedDates',
    'gkg_2.1_English_V2GCAM',
    'gkg_2.1_English_V2.1SharingImage',
    'gkg_2.1_English_V2.1RelatedImages',
    'gkg_2.1_English_V2.1SocialImageEmbeds',
    'gkg_2.1_English_V2.1SocialVideoEmbeds',
    'gkg_2.1_English_V2.1Quotations',
    'gkg_2.1_English_V2.1AllNames',
    'gkg_2.1_English_V2.1Amounts',
    'gkg_2.1_English_V2.1TranslationInfo',
    'gkg_2.1_English_V2ExtrasXML',
    'gkg_2.1_English_V2FinalThemes',
    'gkg_2.1_English_V1.CountsCountType',
    'gkg_2.1_English_V1.CountsCount',
    'gkg_2.1_English_V1.CountsObjectType',
    'gkg_2.1_English_V1.CountsLocationType',
    'gkg_2.1_English_V1.CountsLocationFullName',
    'gkg_2.1_English_V1.CountsLocationCountryCode',
    'gkg_2.1_English_V1.CountsLocationADM1Code',
    'gkg_2.1_English_V1.CountsLocationLatitude',
    'gkg_2.1_English_V1.CountsLocationLongitude',
    'gkg_2.1_English_V1.CountsLocationFeatureID',
    'gkg_2.1_English_V2.1.CountsCountType',
    'gkg_2.1_English_V2.1.CountsCount',
    'gkg_2.1_English_V2.1.CountsObjectType',
    'gkg_2.1_English_V2.1.CountsLocationType',
    'gkg_2.1_English_V2.1.CountsLocationFullName',
    'gkg_2.1_English_V2.1.CountsLocationCountryCode',
    'gkg_2.1_English_V2.1.CountsLocationADM1Code',
    'gkg_2.1_English_V2.1.CountsLocationLatitude',
    'gkg_2.1_English_V2.1.CountsLocationLongitude',
    'gkg_2.1_English_V2.1.CountsLocationFeatureID',
    'gkg_2.1_English_V1Locations.LocationType',
    'gkg_2.1_English_V1Locations.LocationFullName',
    'gkg_2.1_English_V1Locations.LocationCountryCode',
    'gkg_2.1_English_V1Locations.LocationADM1Code',
    'gkg_2.1_English_V1Locations.LocationLatitude',
    'gkg_2.1_English_V1Locations.LocationLongitude',
    'gkg_2.1_English_V1Locations.LocationFeatureID',
    'gkg_2.1_English_V2EnhancedLocations.LocationType',
    'gkg_2.1_English_V2EnhancedLocations.LocationFullName',
    'gkg_2.1_English_V2EnhancedLocations.LocationCountryCode',
    'gkg_2.1_English_V2EnhancedLocations.LocationADM1Code',
    'gkg_2.1_English_V2EnhancedLocations.LocationLatitude',
    'gkg_2.1_English_V2EnhancedLocations.LocationLongitude',
    'gkg_2.1_English_V2EnhancedLocations.LocationFeatureID',
    'gkg_2.1_English_V2EnhancedLocations.LocationADM2Code',
    'gkg_2.1_English_V2.1EnhancedDates.Resolution',
    'gkg_2.1_English_V2.1EnhancedDates.Month',
    'gkg_2.1_English_V2.1EnhancedDates.Day',
    'gkg_2.1_English_V2.1EnhancedDates.Year',
    'gkg_2.1_English_V2.1EnhancedDates.Offset',
    'gkg_2.1_English_V2.1Quotations.Offset',
    'gkg_2.1_English_V2.1Quotations.Length',
    'gkg_2.1_English_V2.1Quotations.Verb',
    'gkg_2.1_English_V2.1Quotations.Quote',
    'gkg_2.1_English_V2.1Amounts.Amount',
    'gkg_2.1_English_V2.1Amounts.Object',
    'gkg_2.1_English_V2.1Amounts.Offset',
    'gkg_2.1_English_V2.1TranslationInfo.SRCLC',
    'gkg_2.1_English_V2.1TranslationInfo.ENG',
    'gkg_2.1_English_V2ExtrasXML.Authors',
    'gkg_2.1_English_V2ExtrasXML.Title',
    'gkg_2.1_English_V2ExtrasXML.BookTitle',
    'gkg_2.1_English_V2ExtrasXML.Date',
    'gkg_2.1_English_V2ExtrasXML.Journal',
    'gkg_2.1_English_V2ExtrasXML.Volume',
    'gkg_2.1_English_V2ExtrasXML.Issue',
    'gkg_2.1_English_V2ExtrasXML.Pages',
    'gkg_2.1_English_V2ExtrasXML.Institution',
    'gkg_2.1_English_V2ExtrasXML.Publisher',
    'gkg_2.1_English_V2ExtrasXML.Location',
    'gkg_2.1_English_V2ExtrasXML.Marker'
]

In [None]:
# change 1 'log_file_path' and 1 'csv_file_path'

# Header-only frame: every placeholder CSV written below has the expected
# columns and no rows.
df = pd.DataFrame(columns=columns)
log_file_path = 'create_missing_csv_log.txt'
txt_file_path = '/Users/macglobalai/Desktop/Gdelt Database/log_record/missing_timestamp_log/error_urls_2017.txt'
with open(txt_file_path, 'r') as file:
    lines = file.readlines()

# A timestamp is the first run of 14 consecutive digits on a line.
timestamp_pattern = re.compile(r'(\d{14})')

# For each input line: write '<timestamp>.csv' when a timestamp is found,
# otherwise record the line as missing. Timings go to the log file.
with open(log_file_path, 'w') as log_file:
    for line in lines:
        start_time = time.time()
        match = timestamp_pattern.search(line)
        if not match:
            log_file.write(f"Missing {line.strip()}\n")
            continue

        timestamp = match.group(1)
        csv_file_path = f'/Volumes/T5EVO/gdelt_download/missing_zip_empty_csv/empty_2017/{timestamp}.csv'
        # index=True keeps the leading index column that the cleaned CSVs also have.
        df.to_csv(csv_file_path, index=True)
        end_time = time.time()
        elapsed_time = end_time - start_time
        log_file.write(f"{timestamp} takes {elapsed_time:.4f} seconds\n")



# Compare timestamp counts across sources

In [None]:
def generate_gdelt_urls(start_year, start_month, start_day, end_year, end_month, end_day):
    """Return the 15-minute timestamps from the start date up to (not including) the end date.

    Both dates are taken at midnight. Each entry is a 'YYYYMMDDHHMMSS' string;
    despite the function name, bare timestamps (not full URLs) are returned.
    An end date on or before the start date yields an empty list.
    """
    first_slot = datetime(start_year, start_month, start_day)
    end_slot = datetime(end_year, end_month, end_day)

    # Number of whole 15-minute (900 s) intervals in the range.
    slot_count = int((end_slot - first_slot).total_seconds() / 900)

    return [
        (first_slot + timedelta(minutes=15 * k)).strftime('%Y%m%d%H%M%S')
        for k in range(slot_count)
    ]

# Expected 15-minute timestamps from 2017-01-01 up to (not including) 2018-01-01.
# `urls` is read by the comparison cells further down.
urls=[]
urls=generate_gdelt_urls(2017,1,1, 2018,1,1) # author's note: starts at 20150218230000 (presumably the earliest available timestamp -- confirm)
# Cell output: how many timestamps were generated, and the first one.
len(urls),urls[0]

In [None]:
# Get local numbers and url numbers
# Directories whose file counts are compared against the number of generated timestamps.
dir_clean = '/Volumes/T5EVO/gdelt_download/2017_clean'
dir_missing_zip_empty_csv = '/Volumes/T5EVO/gdelt_download/missing_zip_empty_csv/empty_2017'

def count_files(directory):
    """Return how many regular files sit directly inside `directory` (subdirectories are not counted)."""
    return sum(
        1
        for entry in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, entry))
    )

def compare_counts(dir1, dir2, urls):
    """Print the file counts of two directories and report whether their sum equals len(urls)."""
    count_dir1, count_dir2 = count_files(dir1), count_files(dir2)
    urls_count = len(urls)

    print(f"Number of files in '{dir1}': {count_dir1}")
    print(f"Number of files in '{dir2}': {count_dir2}")
    print(f"Number of URLs: {urls_count}")

    if (count_dir1+count_dir2) == urls_count:
        print(f"The number of files in total matches the number of URLs.")
        return
    print(f"The number of files in total {count_dir1+count_dir2} does not match the number of URLs {urls_count}.")

# Run the count comparison using the directories above and the `urls` list
# built in the earlier cell.
if __name__ == "__main__":
    compare_counts(dir_clean, dir_missing_zip_empty_csv, urls)


In [None]:
# Find out the missing timestamp between local and generated urls

# NOTE: this rebinds `dir_clean` to the '2017' directory, whereas the previous
# cell set it to '2017_clean'.
dir_clean = '/Volumes/T5EVO/gdelt_download/2017'
dir_missing_zip_empty_csv = '/Volumes/T5EVO/gdelt_download/missing_zip_empty_csv/empty_2017'

# Function to extract file timestamps from a directory
def get_file_timestamps(directory):
    """Return the set of file names in `directory` with their last extension removed.

    Only regular files directly inside the directory are considered.
    """
    timestamps = set()
    for entry in os.listdir(directory):
        if not os.path.isfile(os.path.join(directory, entry)):
            continue
        stem, _extension = os.path.splitext(entry)
        timestamps.add(stem)
    return timestamps

# Function to extract timestamps from URLs
def get_url_timestamps(urls):
    """Return, for each URL, the last path component with its last extension removed, as a set."""
    timestamps = set()
    for url in urls:
        file_name = os.path.basename(urlparse(url).path)
        stem, _extension = os.path.splitext(file_name)
        timestamps.add(stem)
    return timestamps

# Function to find missing timestamps
def find_missing_timestamps(dir1, dir2, urls):
    """Print and return the URL timestamps that have no matching file in either directory."""
    # Everything present locally, across both directories.
    all_local_timestamps = get_file_timestamps(dir1) | get_file_timestamps(dir2)

    missing_timestamps = get_url_timestamps(urls) - all_local_timestamps

    print(f"{len(missing_timestamps)} Missing timestamps: {sorted(missing_timestamps)}")
    return missing_timestamps

# Compare local files against the `urls` list built in the earlier cell.
# Uses the three-argument find_missing_timestamps defined in this cell; a later
# cell redefines that name with a different signature.
if __name__ == "__main__":
    missing_timestamps = find_missing_timestamps(dir_clean, dir_missing_zip_empty_csv, urls)


In [None]:
# Find out the missing timestamps between db and generated urls
import os
from sqlalchemy import create_engine, inspect
from urllib.parse import urlparse

# Database connection URL (local PostgreSQL, database 'gdelt_english', user 'postgres')
database_url = "postgresql://postgres@localhost:5432/gdelt_english"

# Function to extract timestamps from URLs
def get_url_timestamps(urls):
    """Return, for each URL, the last path component with its last extension removed, as a set.

    This redefines the identically named (and identically behaving) function
    from the earlier local-files cell.
    """
    def _stem(url):
        return os.path.splitext(os.path.basename(urlparse(url).path))[0]

    return set(map(_stem, urls))

# Function to retrieve table names from the database schema
def get_db_table_names(engine, schema_name):
    """Return the names of all tables in `schema_name` as a set."""
    return set(inspect(engine).get_table_names(schema=schema_name))

# Function to find missing timestamps
def find_missing_timestamps(db_table_names, urls):
    """Print and return the URL timestamps that have no table of the same name.

    Note: this two-argument version replaces the three-argument
    find_missing_timestamps(dir1, dir2, urls) defined in the earlier cell.
    """
    missing_timestamps = get_url_timestamps(urls) - db_table_names

    print(f"{len(missing_timestamps)} Missing timestamps: {sorted(missing_timestamps)}")
    return missing_timestamps

# Compare the tables of one schema against the `urls` list built in the
# earlier cell.
if __name__ == "__main__":
    # Create a database engine
    engine = create_engine(database_url)
    
    # Define the schema name
    # NOTE(review): `urls` above is generated for 2017 while this schema is
    # "2016" -- confirm both refer to the same year before running.
    schema_name = "2016"
    
    # Retrieve table names from the database
    db_table_names = get_db_table_names(engine, schema_name)
    
    # Find missing timestamps
    missing_timestamps = find_missing_timestamps(db_table_names, urls)

In [None]:
# # Find out the missing timestamp between local and db

# Settings for the local-vs-database comparison: the schema to inspect and the
# directory whose CSV files should each have a table in that schema.
database_url = "postgresql://postgres@localhost:5432/gdelt_english"
schema_name = "2015"
csv_directory = '/Volumes/T5EVO/gdelt_download/aa'

# Rebinds the module-level `engine` used by the functions below.
engine = create_engine(database_url)

def get_tables_in_schema(engine, schema):
    """Return the list of table names in `schema`.

    The inspector is bound to the connection opened here, so the lookup runs
    on that connection and the connection is released when the block exits
    (previously a connection was opened but the inspector was created from
    the engine, leaving the opened connection unused).
    """
    with engine.connect() as connection:
        inspector = inspect(connection)
        # Get the list of tables in the schema
        return inspector.get_table_names(schema=schema)

def get_csv_files(directory):
    """Return the names (without the '.csv' extension) of the entries in `directory` ending in '.csv'."""
    csv_files = []
    for entry in os.listdir(directory):
        if entry.endswith('.csv'):
            stem, _extension = os.path.splitext(entry)
            csv_files.append(stem)
    return csv_files

def find_missing_tables(tables, csv_files):
    """Return the names in `csv_files` that do not appear in `tables`, as a list (order unspecified)."""
    existing_tables = set(tables)
    csv_names = set(csv_files)
    return list(csv_names - existing_tables)

# Report CSV files in `csv_directory` that have no table in `schema_name`.
if __name__ == "__main__":
    # Get list of tables in the schema
    tables = get_tables_in_schema(engine, schema_name)
    
    # Get list of CSV files in the directory
    csv_files = get_csv_files(csv_directory)
    
    # Find missing tables
    missing_tables = find_missing_tables(tables, csv_files)
    
    # The original statement was cut off mid-string (unterminated f-string and
    # unclosed call), which made the whole cell a SyntaxError. It is completed
    # here in the same "<count> ...: <sorted names>" form the other comparison
    # cells print.
    print(f"{len(missing_tables)} Missing tables (present as CSV files but not in schema '{schema_name}'): {sorted(missing_tables)}")

# transfer tables between schemas

In [None]:
# Settings for moving every table from `source_schema` into `target_schema`.
database_url = "postgresql://postgres@localhost:5432/gdelt_english"
source_schema = "2017"
target_schema = "gdelt_english"

engine = create_engine(database_url)

# Log to move_log_2017.txt. (logging.basicConfig does nothing if the root
# logger already has handlers, e.g. after an earlier cell configured logging
# in the same kernel; in that case records go to the existing handlers.)
logging.basicConfig(filename='move_log_2017.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def move_tables(engine, source_schema, target_schema):
    """Move every table of ``source_schema`` into ``target_schema``.

    Each table is moved with ``ALTER TABLE ... SET SCHEMA`` inside its own
    transaction: a successful move is committed immediately and a failed one
    is rolled back without affecting the remaining tables. Successes and
    failures are written to the module-level ``logger``; failures are also
    printed, and a progress line is printed after every 1000 successful moves.

    Args:
        engine: SQLAlchemy engine connected to the database.
        source_schema: Name of the schema the tables currently live in.
        target_schema: Name of the schema the tables are moved to.

    Returns:
        None. Errors on individual tables are logged and printed, not raised.
    """
    # The table list is read once, before any table is moved.
    tables = inspect(engine).get_table_names(schema=source_schema)

    moved_count = 0
    for table in tables:
        start_time = time.time()
        try:
            # Identifiers cannot be sent as bound parameters, so the names are
            # quoted and interpolated; they must come from a trusted source.
            move_table_sql = text(f"""
                ALTER TABLE "{source_schema}"."{table}" SET SCHEMA "{target_schema}";
                """)

            # engine.begin() commits on success and rolls back on error. A
            # plain engine.connect() never commits under SQLAlchemy 2.0, and
            # one failed statement would leave a shared PostgreSQL transaction
            # aborted for every table that follows.
            with engine.begin() as connection:
                connection.execute(move_table_sql)
        except Exception as e:
            error_message = f"Error moving table {table}: {e}"
            logger.error(error_message)
            print(error_message)
        else:
            duration = time.time() - start_time
            log_message = f"Moved table {table} from schema {source_schema} to {target_schema} in {duration:.2f} seconds."
            logger.info(log_message)

            # Count only tables that were really moved, so the progress line
            # stays truthful when some moves fail.
            moved_count += 1
            if moved_count % 1000 == 0:
                print(f"Moved {moved_count} tables so far.")

if __name__ == "__main__":
    # Run the move with the engine and schema names assigned at the top of this cell.
    move_tables(engine, source_schema, target_schema)


# Extra SQL operations

In [None]:
# Change schema name

database_url = "postgresql://postgres@localhost:5432/gdelt_english"
engine = create_engine(database_url)
sql_command = 'ALTER SCHEMA "2017" RENAME TO "2018";'

# engine.begin() commits the rename on success, rolls it back on error and
# always releases the connection, even when execute() raises. The statement is
# wrapped in text() because SQLAlchemy 2.0 no longer accepts a plain string in
# Connection.execute().
with engine.begin() as conn:
    conn.execute(text(sql_command))

In [None]:
# Extract the date column from one example table

def select_column(database_url, schema_name, table_name, column_name):
    """Print every value of one column of a table, one row per line.

    Args:
        database_url: SQLAlchemy connection URL of the database.
        schema_name: Schema that contains the table.
        table_name: Name of the table to read.
        column_name: Name of the column to select.

    Returns:
        None. Any error is caught and reported with a printed message instead
        of being raised.
    """
    engine = None
    try:
        # Create a database engine
        engine = create_engine(database_url)

        # Identifiers cannot be sent as bound parameters, so the names are
        # quoted and interpolated; they must come from a trusted source.
        full_table_name = f'"{schema_name}"."{table_name}"'
        query = text(f'SELECT "{column_name}" FROM {full_table_name}')

        # Execute the query and print each retrieved row
        with engine.connect() as connection:
            rows = connection.execute(query).fetchall()
            for row in rows:
                print(row)

    except Exception as e:
        print(f"Failed to select column {column_name} from table {schema_name}.{table_name}: {e}")

    finally:
        # Every call builds its own engine and connection pool; dispose of it
        # so repeated calls do not accumulate open pools.
        if engine is not None:
            engine.dispose()

if __name__ == "__main__":
    # Example arguments: one table of schema "2015" and its date column.
    database_url = "postgresql://postgres@localhost:5432/gdelt_english"
    schema_name = "2015"
    table_name = "20150218230000"
    column_name = "gkg_2.1_English_V2.1Date"

    # Prints every value of that column (or a failure message).
    select_column(database_url, schema_name, table_name, column_name)

In [None]:
# delete specific table
def delete_table(database_url, schema_name, table_name):
    """Drop one table if it exists and report the outcome on stdout.

    Args:
        database_url: SQLAlchemy connection URL of the database.
        schema_name: Schema that contains the table.
        table_name: Name of the table to drop.

    Returns:
        None. Any error is caught and reported with a printed message instead
        of being raised.
    """
    engine = None
    # Identifiers cannot be sent as bound parameters, so the names are quoted
    # and interpolated; they must come from a trusted source.
    full_table_name = f'"{schema_name}"."{table_name}"'
    try:
        engine = create_engine(database_url)

        # SQL command to drop the table; wrapped in text() because
        # SQLAlchemy 2.0 rejects plain strings in Connection.execute().
        drop_table_sql = text(f"DROP TABLE IF EXISTS {full_table_name}")

        # engine.begin() commits the DROP on success and rolls it back on
        # error; with engine.connect() alone the statement is never committed
        # under SQLAlchemy 2.0.
        with engine.begin() as connection:
            connection.execute(drop_table_sql)

        # Report success only after the transaction has been committed.
        print(f"Table {full_table_name} has been successfully deleted.")

    except Exception as e:
        print(f"Failed to delete table {schema_name}.{table_name}: {e}")

    finally:
        # Every call builds its own engine and connection pool; release it.
        if engine is not None:
            engine.dispose()

if __name__ == "__main__":
    database_url = "postgresql://postgres@localhost:5432/gdelt_english"
    # NOTE(review): schema_name is an empty string, so delete_table builds the
    # name ""."20150218234500" — presumably a placeholder to fill in before
    # running this cell; confirm.
    schema_name = ""
    table_name = "20150218234500"  
    
    # Drops the table named above, if it exists, and prints the outcome.
    delete_table(database_url, schema_name, table_name)
