In [None]:
import os
import requests
from datetime import datetime, timedelta
import paramiko

# Configuration
BASE_URL = "http://data.gdeltproject.org/gdeltv2"
START_DATE = datetime(2025, 1, 1)       # first 15-minute slot to fetch (1 January 2025)
END_DATE = datetime.now()               # last slot: "now" in the kernel's local time
# NOTE(review): GDELT file timestamps are presumably UTC — confirm whether END_DATE
# should use datetime.utcnow()/timezone-aware UTC instead of local time.

# SFTP configuration.
# Credentials are read from environment variables instead of being typed into the
# notebook, so they never end up in a saved or shared .ipynb. Export SFTP_USERNAME /
# SFTP_PASSWORD in the shell before starting Jupyter.
SFTP_HOST = "93.43.225.19"
SFTP_PORT = 20022
SFTP_USERNAME = os.environ.get("SFTP_USERNAME", "cherryuser")
SFTP_PASSWORD = os.environ.get("SFTP_PASSWORD", "")
SFTP_REMOTE_DIR = "/data/Gabriele"      # target directory on the remote server

def download_and_upload_file(date, timeout=60):
    """Download the GKG ZIP for one GDELT time slot and upload it via SFTP.

    The file name is built as ``YYYYMMDDHHMMSS.gkg.csv.zip`` from ``date`` and
    fetched from BASE_URL into the system temporary directory. It is then handed
    to ``upload_via_sftp``. The temporary copy is always deleted afterwards.

    Args:
        date: datetime of the slot to fetch.
        timeout: connect/read timeout in seconds for the HTTP request, so a
            stalled connection cannot hang the whole date loop.

    Errors are printed, never raised, so the caller's loop keeps going.
    """
    import tempfile

    filename = f"{date.strftime('%Y%m%d%H%M%S')}.gkg.csv.zip"
    url = f"{BASE_URL}/{filename}"
    # Portable temp location instead of a hardcoded /tmp.
    temp_path = os.path.join(tempfile.gettempdir(), filename)

    print(f"Scaricando {url}...")
    try:
        # The context manager releases the streamed connection even on early return.
        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"File non trovato: {url}")
                return
            with open(temp_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)
        print(f"File scaricato: {temp_path}")

        uploaded = upload_via_sftp(temp_path, filename)
        # Only an explicit False means the upload failed; a None result (no
        # status returned) is treated as success.
        if uploaded is not False:
            print(f"File caricato sul server: {filename}")
    except Exception as e:
        print(f"Errore durante il download: {e}")
    finally:
        # Never leave the temporary copy behind, whatever happened above.
        if os.path.exists(temp_path):
            os.remove(temp_path)

def upload_via_sftp(local_path, remote_filename):
    """Upload one local file to SFTP_REMOTE_DIR on the configured SFTP server.

    Opens a new SSH transport per call, authenticates with SFTP_USERNAME /
    SFTP_PASSWORD and writes the file to ``SFTP_REMOTE_DIR/remote_filename``.
    The transport and the SFTP session are always closed, even if the
    connection or the upload fails.

    Returns:
        True on success, False on failure (the error is printed, not raised).
    """
    transport = None
    try:
        # Connect to the SFTP server.
        transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
        # NOTE(review): no hostkey is passed, so the server's host key is not
        # verified; consider Transport.connect(hostkey=...) or SSHClient with
        # known_hosts.
        transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD)
        with paramiko.SFTPClient.from_transport(transport) as sftp:
            remote_path = f"{SFTP_REMOTE_DIR}/{remote_filename}"
            sftp.put(local_path, remote_path)
        return True
    except Exception as e:
        print(f"Errore durante il caricamento via SFTP: {e}")
        return False
    finally:
        # Close the socket even when connect() or put() raised.
        if transport is not None:
            transport.close()

# Visit every 15-minute GDELT slot from START_DATE up to END_DATE (both ends
# included) and mirror the corresponding GKG file to the SFTP server.
current_date = START_DATE
while not current_date > END_DATE:
    download_and_upload_file(current_date)
    current_date = current_date + timedelta(minutes=15)

print("Download e caricamento completati!")

Prova con il server dell'ufficio: si sono verificati problemi di accesso.

In [18]:
import os
import requests
import zipfile
import pandas as pd
import concurrent.futures
from pathlib import Path
from tqdm import tqdm  # Libreria per la barra di avanzamento

# --- Run configuration -------------------------------------------------------
# Folder that receives the downloaded ZIPs and the aggregated CSV.
# NOTE(review): the folder name says 2025 while START_DATE/END_DATE select 2024;
# confirm which year is intended before re-running.
DEST_DIR = "/Volumes/CHERRY_SSD/gdelt_gkg_2025"
MASTERFILE_URL = "http://data.gdeltproject.org/gdeltv2/masterfilelist.txt"
# Date window as YYYYMMDD strings.
START_DATE = "20240101"
END_DATE = "20241231"
# Keyword matched against the V2Locations column.
MYANMAR_KEYWORD = "Myanmar"

# Column names assigned to the GKG files (27 columns, in file order).
GKG_COLUMNS = [
    "GKGRECORDID",
    "DATE",
    "SourceCollectionIdentifier",
    "SourceCommonName",
    "DocumentIdentifier",
    "Counts",
    "V2Counts",
    "Themes",
    "V2Themes",
    "Locations",
    "V2Locations",
    "Persons",
    "V2Persons",
    "Organizations",
    "V2Organizations",
    "V2Tone",
    "Dates",
    "GCAM",
    "SharingImage",
    "RelatedImages",
    "SocialImageEmbeds",
    "SocialVideoEmbeds",
    "Quotations",
    "AllNames",
    "Amounts",
    "TranslationInfo",
    "ExtrasXML",
]


# Make sure the destination folder (and any missing parents) exists; no error if it already does.
os.makedirs(DEST_DIR, exist_ok=True)

def get_masterfile_list(timeout=120):
    """Download the GDELT masterfilelist and return its lines.

    Args:
        timeout: connect/read timeout in seconds; without one, a stalled server
            would block the notebook forever.

    Returns:
        The response text split on newlines (may include a trailing empty
        string), or an empty list after printing a message when the server
        does not answer 200. Network errors propagate to the caller.
    """
    response = requests.get(MASTERFILE_URL, timeout=timeout)
    if response.status_code != 200:
        print("Errore nel download della masterfilelist.")
        return []
    return response.text.split("\n")

def filter_gkg_files(file_list, start_date=None, end_date=None):
    """Select the GKG ZIP URLs whose date falls in [start_date, end_date].

    Args:
        file_list: masterfilelist lines; whitespace-separated, with the URL as
            the third field. Shorter lines (e.g. the trailing empty line) are skipped.
        start_date: inclusive lower bound "YYYYMMDD" (defaults to START_DATE).
        end_date: inclusive upper bound "YYYYMMDD" (defaults to END_DATE).

    Returns:
        Matching URLs, in input order.
    """
    if start_date is None:
        start_date = START_DATE
    if end_date is None:
        end_date = END_DATE

    gkg_files = []
    for line in file_list:
        parts = line.split()
        # The URL is parts[2], so at least three fields are required.
        if len(parts) < 3:
            continue
        url = parts[2]
        if "gkg" in url and url.endswith(".zip"):
            filename = os.path.basename(url)
            date_str = filename.split(".")[0][:8]  # YYYYMMDD prefix of the timestamp
            # Fixed-width digit strings compare chronologically as plain text.
            if start_date <= date_str <= end_date:
                gkg_files.append(url)
    return gkg_files

def download_file(url, dest_dir, timeout=60):
    """Download ``url`` into ``dest_dir``, keeping the URL's base name.

    Args:
        url: file to fetch.
        dest_dir: existing destination directory.
        timeout: connect/read timeout in seconds for the HTTP request.

    Returns:
        The local file path on HTTP 200, otherwise None.

    Raises:
        Any network/IO exception; in that case a partially written file is
        removed so it cannot be mistaken for a complete ZIP later.
    """
    file_path = os.path.join(dest_dir, os.path.basename(url))
    # The context manager releases the streamed connection on every path.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            return None
        try:
            with open(file_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        except BaseException:
            if os.path.exists(file_path):
                os.remove(file_path)
            raise
    return file_path

import threading

# Serializes appends to the shared output CSV. This function is run from a
# thread pool, and the "file exists?" header check plus the append must happen
# as one step, so the header is written once and rows are not interleaved.
_CSV_WRITE_LOCK = threading.Lock()


def _read_gkg_member(archive, member):
    """Parse one tab-separated member of an open ZipFile into an all-string DataFrame.

    UTF-8 is tried first. On a decode error the member is opened again (the first
    handle has already been partly consumed) and read as ISO-8859-1. That
    encoding maps every byte, so the fallback cannot fail to decode.
    """
    import csv

    read_options = dict(
        delimiter="\t",
        dtype=str,
        names=GKG_COLUMNS,
        # GKG is plain TSV without quoting: a stray '"' inside a text field must
        # not be treated as a quote delimiter (it would merge rows or fail the file).
        quoting=csv.QUOTE_NONE,
    )
    try:
        with archive.open(member) as handle:
            return pd.read_csv(handle, encoding="utf-8", **read_options)
    except UnicodeDecodeError:
        with archive.open(member) as handle:
            return pd.read_csv(handle, encoding="ISO-8859-1", **read_options)


def process_gkg_file(zip_path, output_dir):
    """Extract a GKG ZIP, keep Myanmar rows, append them to the aggregate CSV.

    Rows whose V2Locations contains MYANMAR_KEYWORD are appended to
    ``output_dir/gkg_myanmar_filtered.csv``. The header is written only when
    that file does not exist yet. The ZIP is deleted afterwards in every case.
    Errors are reported (not raised) so one bad archive does not stop the batch.
    """
    output_file = os.path.join(output_dir, "gkg_myanmar_filtered.csv")
    try:
        with zipfile.ZipFile(zip_path, "r") as archive:
            for member in archive.namelist():
                records = _read_gkg_member(archive, member)
                matches = records[records["V2Locations"].str.contains(MYANMAR_KEYWORD, na=False)]
                if matches.empty:
                    continue
                with _CSV_WRITE_LOCK:
                    matches.to_csv(output_file, mode="a", header=not os.path.exists(output_file), index=False)
    except Exception as exc:
        tqdm.write(f"Errore nel processamento di {zip_path}: {exc}")
    finally:
        # Free disk space whether processing succeeded or not.
        if os.path.exists(zip_path):
            os.remove(zip_path)

def download_and_process_parallel(max_workers=4):
    """Two-phase pipeline: download every selected GKG ZIP, then process them.

    Phase 1 downloads all selected ZIPs into DEST_DIR in parallel. Phase 2 runs
    ``process_gkg_file`` on each downloaded ZIP in parallel. All ZIPs of the
    range are therefore on disk at the same time between the two phases.

    Args:
        max_workers: number of threads used in each phase (default 4).
    """
    masterfile_list = get_masterfile_list()
    gkg_files = filter_gkg_files(masterfile_list)

    print(f"Trovati {len(gkg_files)} file da processare.")

    # Phase 1: download. Paths are taken from the futures' results, so only
    # files fetched successfully in this run are processed.
    downloaded_files = []
    failed_downloads = 0
    with tqdm(total=len(gkg_files), desc="Download in corso", unit="file") as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(download_file, url, DEST_DIR): url for url in gkg_files}

            for future in concurrent.futures.as_completed(futures):
                try:
                    zip_path = future.result()
                except Exception as exc:
                    # A single network error must not abort the whole batch.
                    tqdm.write(f"Errore nel download di {futures[future]}: {exc}")
                    zip_path = None
                if zip_path:
                    downloaded_files.append(zip_path)
                else:
                    failed_downloads += 1
                    pbar.set_postfix(falliti=failed_downloads)
                # Advance for every finished download, failed or not, so the
                # bar really reaches 100%.
                pbar.update(1)

    # Phase 2: filter each downloaded ZIP (process_gkg_file also deletes it).
    with tqdm(total=len(downloaded_files), desc="Processamento in corso", unit="file") as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(process_gkg_file, zip_path, DEST_DIR): zip_path for zip_path in downloaded_files}

            for future in concurrent.futures.as_completed(futures):
                try:
                    future.result()
                except Exception as exc:
                    tqdm.write(f"Errore nel processamento di {futures[future]}: {exc}")
                pbar.update(1)


if __name__ == "__main__":
    download_and_process_parallel()


Trovati 6875 file da processare.


Download in corso: 100%|██████████| 6875/6875 [31:32<00:00,  3.63file/s]
Processamento in corso: 100%|██████████| 6875/6875 [14:30<00:00,  7.90file/s]   


Questa versione scarica prima tutti i file e solo dopo li processa → problema di memoria.
Velocità: circa 2 mesi e mezzo di dati in circa 45 minuti.

In [19]:
import os
import requests
import zipfile
import pandas as pd
import concurrent.futures
from pathlib import Path
from tqdm import tqdm  # Barra di avanzamento

# --- Run configuration -------------------------------------------------------
# Folder used for the temporary ZIPs and the aggregated CSV.
DEST_DIR = "/Volumes/CHERRY_SSD/gdelt_gkg_2023"
MASTERFILE_URL = "http://data.gdeltproject.org/gdeltv2/masterfilelist.txt"
# Inclusive date window as YYYYMMDD strings.
START_DATE = "20230101"
END_DATE = "20231231"
# Keyword matched against the V2Locations column.
MYANMAR_KEYWORD = "Myanmar"

# Column names assigned to the GKG files (27 columns, in file order).
GKG_COLUMNS = [
    "GKGRECORDID",
    "DATE",
    "SourceCollectionIdentifier",
    "SourceCommonName",
    "DocumentIdentifier",
    "Counts",
    "V2Counts",
    "Themes",
    "V2Themes",
    "Locations",
    "V2Locations",
    "Persons",
    "V2Persons",
    "Organizations",
    "V2Organizations",
    "V2Tone",
    "Dates",
    "GCAM",
    "SharingImage",
    "RelatedImages",
    "SocialImageEmbeds",
    "SocialVideoEmbeds",
    "Quotations",
    "AllNames",
    "Amounts",
    "TranslationInfo",
    "ExtrasXML",
]

# Make sure the destination folder (and any missing parents) exists; no error if it already does.
os.makedirs(DEST_DIR, exist_ok=True)

def get_masterfile_list(timeout=120):
    """Download the GDELT masterfilelist and return its lines.

    Args:
        timeout: connect/read timeout in seconds; without one, a stalled server
            would block the notebook forever.

    Returns:
        The response text split on newlines (may include a trailing empty
        string), or an empty list after printing a message when the server
        does not answer 200. Network errors propagate to the caller.
    """
    response = requests.get(MASTERFILE_URL, timeout=timeout)
    if response.status_code != 200:
        print("Errore nel download della masterfilelist.")
        return []
    return response.text.split("\n")

def filter_gkg_files(file_list, start_date=None, end_date=None):
    """Select the GKG ZIP URLs whose date falls in [start_date, end_date].

    Args:
        file_list: masterfilelist lines; whitespace-separated, with the URL as
            the third field. Shorter lines (e.g. the trailing empty line) are skipped.
        start_date: inclusive lower bound "YYYYMMDD" (defaults to START_DATE).
        end_date: inclusive upper bound "YYYYMMDD" (defaults to END_DATE).

    Returns:
        Matching URLs, in input order.
    """
    if start_date is None:
        start_date = START_DATE
    if end_date is None:
        end_date = END_DATE

    gkg_files = []
    for line in file_list:
        parts = line.split()
        # The URL is parts[2], so at least three fields are required.
        if len(parts) < 3:
            continue
        url = parts[2]
        if "gkg" in url and url.endswith(".zip"):
            filename = os.path.basename(url)
            date_str = filename.split(".")[0][:8]  # YYYYMMDD prefix of the timestamp
            # Fixed-width digit strings compare chronologically as plain text.
            if start_date <= date_str <= end_date:
                gkg_files.append(url)
    return gkg_files

import threading

# Serializes appends to the shared output CSV. This function is run from a
# thread pool, and the "file exists?" header check plus the append must happen
# as one step, so the header is written once and rows are not interleaved.
_CSV_WRITE_LOCK = threading.Lock()


def _read_gkg_member(archive, member):
    """Parse one tab-separated member of an open ZipFile into an all-string DataFrame.

    UTF-8 is tried first. On a decode error the member is opened again (the first
    handle has already been partly consumed) and read as ISO-8859-1. That
    encoding maps every byte, so the fallback cannot fail to decode.
    """
    import csv

    read_options = dict(
        delimiter="\t",
        dtype=str,
        names=GKG_COLUMNS,
        # GKG is plain TSV without quoting: a stray '"' inside a text field must
        # not be treated as a quote delimiter (it would merge rows or fail the file).
        quoting=csv.QUOTE_NONE,
    )
    try:
        with archive.open(member) as handle:
            return pd.read_csv(handle, encoding="utf-8", **read_options)
    except UnicodeDecodeError:
        with archive.open(member) as handle:
            return pd.read_csv(handle, encoding="ISO-8859-1", **read_options)


def download_and_process_file(url, timeout=60):
    """Download one GKG ZIP, append its Myanmar rows to the aggregate CSV, delete the ZIP.

    Rows whose V2Locations contains MYANMAR_KEYWORD are appended to
    ``DEST_DIR/gkg_myanmar_filtered.csv``. The header is written only when that
    file does not exist yet.

    Args:
        url: GKG ZIP to fetch.
        timeout: connect/read timeout in seconds for the HTTP request.

    Returns:
        True on success. False on an HTTP error or any exception, which is
        reported via tqdm.write. The ZIP is removed in every case.
    """
    # Computed before the try block so the cleanup below can always use it.
    zip_path = os.path.join(DEST_DIR, os.path.basename(url))
    output_file = os.path.join(DEST_DIR, "gkg_myanmar_filtered.csv")
    try:
        # Download the ZIP; the context manager releases the streamed connection.
        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                return False  # HTTP error: nothing to process
            with open(zip_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        # Filter each member for Myanmar and append the matches.
        with zipfile.ZipFile(zip_path, "r") as archive:
            for member in archive.namelist():
                records = _read_gkg_member(archive, member)
                matches = records[records["V2Locations"].str.contains(MYANMAR_KEYWORD, na=False)]
                if not matches.empty:
                    with _CSV_WRITE_LOCK:
                        matches.to_csv(output_file, mode="a", header=not os.path.exists(output_file), index=False)
        return True

    except Exception as exc:
        tqdm.write(f"Errore con {url}: {exc}")
        return False
    finally:
        # Free disk space whether processing succeeded or not (also removes a
        # partially downloaded file).
        if os.path.exists(zip_path):
            os.remove(zip_path)

def download_and_process_parallel(max_workers=4):
    """Streaming pipeline: each worker downloads, filters and deletes one ZIP at a time.

    Up to ``max_workers`` files are handled concurrently. At most that many
    ZIPs are on disk at once, unlike the download-everything-first variant.
    Failed files are counted and shown on the progress bar.

    Args:
        max_workers: number of worker threads (default 4). Raising it may help
            when the run is network-bound.
    """
    masterfile_list = get_masterfile_list()
    gkg_files = filter_gkg_files(masterfile_list)

    print(f"Trovati {len(gkg_files)} file da processare.")

    failures = 0
    with tqdm(total=len(gkg_files), desc="Download e processamento", unit="file") as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(download_and_process_file, url): url for url in gkg_files}

            for future in concurrent.futures.as_completed(futures):
                try:
                    succeeded = future.result()
                except Exception as exc:
                    # Surface unexpected worker errors instead of dropping them silently.
                    tqdm.write(f"Errore con {futures[future]}: {exc}")
                    succeeded = False
                if not succeeded:
                    failures += 1
                    pbar.set_postfix(falliti=failures)
                pbar.update(1)  # advance for every finished file

    if failures:
        print(f"{failures} file non processati.")


if __name__ == "__main__":
    download_and_process_parallel()


Trovati 35134 file da processare.


Download e processamento:   0%|          | 31/35134 [00:10<2:59:11,  3.26file/s]

Questa versione scarica ogni file, lo processa subito, lo elimina e poi passa al successivo → forse troppo lenta (stima iniziale di circa 3 ore per 35134 file).