In [84]:
import os
import requests
import zipfile
import subprocess
import pandas as pd
from multiprocessing import Pool
from datetime import datetime
import csv
from tqdm import tqdm

In [6]:

date_str = '2023-08-01'
# Define the URL of the ZIP file and the target directory
url = f"http://web.ais.dk/aisdata/aisdk-{date_str}.zip"
target_directory = "../data"

# Create the target directory if it doesn't exist
os.makedirs(target_directory, exist_ok=True)

# Define the path where the ZIP file will be saved
zip_file_path = os.path.join(target_directory, "aisdk-2023-08-01.zip")


In [4]:
# Download the ZIP file
response = requests.get(url)

# Check if the download was successful
if response.status_code == 200:
    with open(zip_file_path, "wb") as file:
        file.write(response.content)
    print(f"Downloaded successfully {url}.")
else:
    print(f"Failed to download {url}. Status code: {response.status_code}")
    exit()

# Unzip the downloaded file
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
    zip_ref.extractall(target_directory)
    print(f"Unzipped successfully {zip_file_path}.")

# Clean up: Remove the downloaded ZIP file
os.remove(zip_file_path)
print(f"Cleaned up the ZIP file {zip_file_path}.")


Downloaded successfully.
Unzipped successfully.
Cleaned up the ZIP file.


In [11]:
csv_file_path = os.path.join(target_directory, f"aisdk-{date_str}.csv")
# Use subprocess to run the 'wc -l' command and capture the output
wc_out = subprocess.run(["wc", "-l", csv_file_path], stdout=subprocess.PIPE, text=True, check=True)
csv_line_count = int(wc_out.stdout.split()[0])


In [12]:
csv_file_path

'../data/aisdk-2023-08-01.csv'

In [79]:
csv_line_count

11804940

In [70]:
df_columns = ['# Timestamp', 'Type of mobile', 'MMSI', 'Latitude', 'Longitude', \
       'Navigational status', 'ROT', 'SOG', 'COG', 'Heading', 'IMO', \
       'Callsign', 'Name', 'Ship type', 'Cargo type', 'Width', 'Length',\
       'Type of position fixing device', 'Draught', 'Destination', 'ETA',\
       'Data source type', 'A', 'B', 'C', 'D']

In [71]:
os.makedirs(f'{target_directory}/chunks{date_str}', exist_ok=True)

In [82]:
chunk_size = 100000
input_csv = csv_file_path
csv_reader = pd.read_csv(input_csv, chunksize=chunk_size, iterator=True)
for i, chunk in tqdm(enumerate(csv_reader)):
    chunk.to_csv(f'{target_directory}/chunks{date_str}/chunk_{i}.csv', index=False)


119it [02:17,  1.16s/it]


In [89]:
# get all files in the directory
directory_path = f'{target_directory}/chunks{date_str}'
chunk_file_names = os.listdir(directory_path)
chunk_files = [os.path.join(directory_path, file_name) for file_name in chunk_file_names]

In [61]:

chunk_size = 100000  # Number of lines to process at a time

# Define columns for which you want to calculate the mean
mean_columns = ['Latitude', 'Longitude', 'ROT', 'SOG', 'COG', 'Heading','Draught', 'Length', 'Width']

# Define columns for which you want the majority value
majority_columns = ['Type of mobile', 'Navigational status', 'IMO', 'Callsign', 'Name',
                    'Ship type', 'Cargo type', 'Type of position fixing device', 'Destination',
                    'ETA', 'Data source type', 'A', 'B', 'C', 'D']

# Define aggregation functions
agg_functions = {col: 'mean' for col in mean_columns}
# For majority_columns, return the mode if it exists, otherwise return None
for col in majority_columns:
    agg_functions[col] = lambda x: x.mode().iloc[0] if not x.empty and not x.mode().empty else None



In [86]:
df = pd.read_csv(f'{target_directory}/chunks{date_str}/chunk_0.csv')
df['# Timestamp'] = pd.to_datetime(df['# Timestamp'])
# Round the datetime values to the nearest minute
df['# Timestamp'] = df['# Timestamp'].dt.round('min')

# Group by MMSI and # Timestamp, and apply the aggregation functions
aggregated_chunk = df.groupby(['MMSI', '# Timestamp']).agg(agg_functions).reset_index()




In [90]:

# Define the function to aggregate
def aggregate_chunk(file_path):
    chunk = pd.read_csv(file_path)
    # Define your aggregation logic here
    # Example: Calculate mean for numeric columns and mode for categorical columns
    mean_columns = ['Latitude', 'Longitude', 'ROT', 'SOG', 'COG', 'Heading','Draught', 'Length', 'Width']
    majority_columns = ['Type of mobile', 'Navigational status', 'IMO', 'Callsign', 'Name',
                        'Ship type', 'Cargo type', 'Type of position fixing device', 'Destination',
                        'ETA', 'Data source type', 'A', 'B', 'C', 'D']

    # Apply aggregation functions
    agg_functions = {col: 'mean' for col in mean_columns}
    for col in majority_columns:
        agg_functions[col] = lambda x: x.mode().iloc[0] if not x.empty and not x.mode().empty else None

    # Apply the aggregation to the chunk
    aggregated_chunk = chunk.groupby(['MMSI', '# Timestamp']).agg(agg_functions).reset_index()

    return aggregated_chunk

if __name__ == "__main__":
    # List of CSV chunk file paths
    chunk_files = chunk_files

    # Create a pool of worker processes
    with Pool(processes=6) as pool:  # Adjust the number of processes as needed
        # Map the aggregate_chunk function to process each chunk in parallel
        result_dataframes = pool.map(aggregate_chunk, chunk_files)

    # Combine the results from all chunks
    final_result = pd.concat(result_dataframes)

    # Save the final aggregated DataFrame to a CSV file
    final_result.to_csv(f"{target_directory}/final_aggregated_data{date_str}.csv", index=False)


In [62]:
df = pd.read_csv('../data/aisdk-2023-08-01.csv', nrows=100000)
df['# Timestamp'] = pd.to_datetime(df['# Timestamp'])
        # Round the datetime values to the nearest minute
df['# Timestamp'] = df['# Timestamp'].dt.round('min')

In [63]:
agg_df = df.groupby(['MMSI', '# Timestamp']).agg(agg_functions).reset_index()

In [64]:
agg_df.to_csv('../data/aisdk-2023-08-01_agg.csv', index=False)

In [98]:
final_result.shape[0]- csv_line_count

-192340

In [93]:
type(result_dataframes)

list

In [96]:
result_dataframes[3].shape

(98452, 26)