In [1]:
import pandas as pd
import numpy as np
import requests
import os

In [None]:
# Define paths
DATA_DIR = os.path.abspath("../data/parquet_files")
URLS_CSV = os.path.abspath("../data/ParquetFilesUrls.csv")

# Ensure directory exists
os.makedirs(DATA_DIR, exist_ok=True)

def download_files():
    """Download only missing files from CSV."""
    
    # Load URLs from CSV
    data_links = pd.read_csv(URLS_CSV)
    
    for url in data_links['ParquetFileUrl']:
        if pd.isna(url):
            continue
        
        filename = url.split('/')[-1]
        file_path = os.path.join(DATA_DIR, filename)
        
        if os.path.exists(file_path):
            print(f"File already exists: {filename}, skipping.")
            continue
        
        try:
            print(f"Downloading: {filename} ...")
            response = requests.get(url, stream=True)
            response.raise_for_status()
            
            with open(file_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=1024):
                    file.write(chunk)
                
            print(f"Downloaded: {filename}")
        
        except requests.exceptions.RequestException as e:
            print(f"Failed to download {url}: {e}")

# Run the function
download_files()
print("All files checked/downloaded!")


Files already downloaded


In [None]:
# merge all parquet files into one
def merge_files():
    files = os.listdir('../data/parquet_files')
    
    if len(files) == 0:
        print("No files to merge")
        return
    
    data = pd.DataFrame()
    
    for file in files:
        if file.endswith('.parquet'):
            df = pd.read_parquet(f'.data/parquet_files/{file}')
            data = pd.concat([data, df], ignore_index=True)
    
    data.to_parquet('../data/merged_air_quality.parquet')
    print("Files merged successfully")
    
if os.path.exists('../data/merged_air_quality.parquet'):
    print("Files already merged")
else:
    merge_files()

Files already merged


In [None]:
# inspect the merged data
air_quality_data = pd.read_parquet('../data/merged_air_quality.parquet')
display(air_quality_data.head())

Unnamed: 0,Samplingpoint,Pollutant,Start,End,Value,Unit,AggType,Validity,Verification,ResultTime,DataCapture,FkObservationLog
0,AT/SPO.03.1901.60140.5.1,5,2024-01-01 00:00:00,2024-01-01 01:00:00,28.0,ug.m-3,hour,1,2,2024-01-01 08:00:00,,580ed864-d846-4b79-bf9c-399a0c961a54
1,AT/SPO.03.1901.60140.5.1,5,2024-01-01 01:00:00,2024-01-01 02:00:00,16.5,ug.m-3,hour,1,2,2024-01-01 08:00:00,,580ed864-d846-4b79-bf9c-399a0c961a54
2,AT/SPO.03.1901.60140.5.1,5,2024-01-01 02:00:00,2024-01-01 03:00:00,12.5,ug.m-3,hour,1,2,2024-01-01 08:00:00,,580ed864-d846-4b79-bf9c-399a0c961a54
3,AT/SPO.03.1901.60140.5.1,5,2024-01-01 03:00:00,2024-01-01 04:00:00,9.5,ug.m-3,hour,1,2,2024-01-01 08:00:00,,580ed864-d846-4b79-bf9c-399a0c961a54
4,AT/SPO.03.1901.60140.5.1,5,2024-01-01 04:00:00,2024-01-01 05:00:00,8.0,ug.m-3,hour,1,2,2024-01-01 08:00:00,,580ed864-d846-4b79-bf9c-399a0c961a54


In [5]:
display(air_quality_data.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3951061 entries, 0 to 3951060
Data columns (total 12 columns):
 #   Column            Dtype         
---  ------            -----         
 0   Samplingpoint     object        
 1   Pollutant         int32         
 2   Start             datetime64[ns]
 3   End               datetime64[ns]
 4   Value             float64       
 5   Unit              object        
 6   AggType           object        
 7   Validity          int32         
 8   Verification      int32         
 9   ResultTime        datetime64[ns]
 10  DataCapture       float64       
 11  FkObservationLog  object        
dtypes: datetime64[ns](3), float64(2), int32(3), object(4)
memory usage: 316.5+ MB


None

In [None]:
# clean the air quality data
def clean_data(data):
    data["Samplingpoint"] = data["Samplingpoint"].str.split("/").str[-1]
    data.drop(columns=["FkObservationLog"], inplace=True)
    data.drop(columns=["End"], inplace=True)
    data.drop(columns=["AggType"], inplace=True)
    data.drop(columns=["ResultTime"], inplace=True)
    data.drop(columns=["DataCapture"], inplace=True)
    
if 'End' in air_quality_data.columns:
    clean_data(air_quality_data)
else:
    print("Data already cleaned")
    
display(air_quality_data.head())

# save the cleaned data
air_quality_data.to_parquet('../data/cleaned_air_quality.parquet')

Unnamed: 0,Samplingpoint,Pollutant,Start,Value,Unit,Validity,Verification
0,SPO.03.1901.60140.5.1,5,2024-01-01 00:00:00,28.0,ug.m-3,1,2
1,SPO.03.1901.60140.5.1,5,2024-01-01 01:00:00,16.5,ug.m-3,1,2
2,SPO.03.1901.60140.5.1,5,2024-01-01 02:00:00,12.5,ug.m-3,1,2
3,SPO.03.1901.60140.5.1,5,2024-01-01 03:00:00,9.5,ug.m-3,1,2
4,SPO.03.1901.60140.5.1,5,2024-01-01 04:00:00,8.0,ug.m-3,1,2


In [None]:
metadata = pd.read_csv("../data/metadata.csv")
display(metadata.head())

Unnamed: 0,Country,B-G Namespace,Year,Air Quality Network,Air Quality Network Name,Timezone,Air Quality Station EoI Code,Air Quality Station Nat Code,Air Quality Station Name,Sampling Point Id,...,Detection Limit,Detection Limit Unit,Documentation,QA Report,Duration,Duration Unit,Cadence,Cadence Unit,Source Data URL,Imported
0,Austria,AT.0008.20.AQ,2024,NET.01,Amt der Burgenländischen Landesregierung,UTC+01,AT10001,10001,Eisenstadt Laschoberstraße,SPO.01.0001.1556.38.1,...,0.1,ppbv,Documentation SPP,http://www.umweltbundesamt.at,1,hour,1,hour,http://cdr.eionet.europa.eu/at/eu/aqd/d/envz2g...,18/12/2024 13:00:08
1,Austria,AT.0008.20.AQ,2024,NET.01,Amt der Burgenländischen Landesregierung,UTC+01,AT10001,10001,Eisenstadt Laschoberstraße,SPO.01.0001.1556.38.1,...,0.9,ppbv,Documentation SPP,http://www.umweltbundesamt.at,1,hour,1,hour,http://cdr.eionet.europa.eu/at/eu/aqd/d/envz2g...,18/12/2024 13:00:08
2,Austria,AT.0008.20.AQ,2024,NET.01,Amt der Burgenländischen Landesregierung,UTC+01,AT10001,10001,Eisenstadt Laschoberstraße,SPO.01.0001.1556.38.1,...,1.5,ppbv,Documentation SPP,http://www.umweltbundesamt.at,1,hour,1,hour,http://cdr.eionet.europa.eu/at/eu/aqd/d/envz2g...,18/12/2024 13:00:08
3,Austria,AT.0008.20.AQ,2024,NET.01,Amt der Burgenländischen Landesregierung,UTC+01,AT10001,10001,Eisenstadt Laschoberstraße,SPO.01.0001.1556.38.1,...,0.1,ppbv,Documentation SPP,http://www.umweltbundesamt.at,1,hour,1,hour,http://cdr.eionet.europa.eu/at/eu/aqd/d/envz2g...,18/12/2024 13:00:08
4,Austria,AT.0008.20.AQ,2024,NET.01,Amt der Burgenländischen Landesregierung,UTC+01,AT10001,10001,Eisenstadt Laschoberstraße,SPO.01.0001.1557.8.1,...,0.1,ppbv,Documentation SPP,http://www.umweltbundesamt.at,1,hour,1,hour,http://cdr.eionet.europa.eu/at/eu/aqd/d/envz2g...,18/12/2024 13:00:08


In [None]:
metadata_filtered = metadata[
    ["Sampling Point Id", "Air Quality Station Name", "Longitude", "Latitude",
     "Altitude", "Air Quality Station Area", "Air Quality Station Type",
     "Operational Activity Begin", "Operational Activity End", "Main Emission Sources"]
]
# leave only the columns that are needed
def clean_metadata(data):
    data = data[
        ["Sampling Point Id", "Air Quality Station Name", "Longitude", "Latitude",
         "Altitude", "Air Quality Station Area", "Air Quality Station Type",
         "Operational Activity Begin", "Operational Activity End", "Main Emission Sources"]
    ]
    return data
    

if 'Country' in metadata.columns:
    metadata = clean_metadata(metadata)
else:
    print("Metadata already cleaned")

display(metadata.head())

metadata.to_parquet('../data/cleaned_metadata.parquet')


Unnamed: 0,Sampling Point Id,Air Quality Station Name,Longitude,Latitude,Altitude,Air Quality Station Area,Air Quality Station Type,Operational Activity Begin,Operational Activity End,Main Emission Sources
0,SPO.01.0001.1556.38.1,Eisenstadt Laschoberstraße,16.52617,47.84011,165.0,suburban,background,01/01/1995 00:00:00,,Transport
1,SPO.01.0001.1556.38.1,Eisenstadt Laschoberstraße,16.52617,47.84011,165.0,suburban,background,01/01/1995 00:00:00,,Transport
2,SPO.01.0001.1556.38.1,Eisenstadt Laschoberstraße,16.52617,47.84011,165.0,suburban,background,01/01/1995 00:00:00,,Transport
3,SPO.01.0001.1556.38.1,Eisenstadt Laschoberstraße,16.52617,47.84011,165.0,suburban,background,01/01/1995 00:00:00,,Transport
4,SPO.01.0001.1557.8.1,Eisenstadt Laschoberstraße,16.52617,47.84011,165.0,suburban,background,01/01/1995 00:00:00,,Transport


### Get Useful information from the datasets

In [None]:
clean_metadata = pd.read_parquet('../data/cleaned_metadata.parquet')
clean_air_quality = pd.read_parquet('../data/cleaned_air_quality.parquet')

##### Air Quality Data

In [10]:
display(clean_air_quality.head())

Unnamed: 0,Samplingpoint,Pollutant,Start,Value,Unit,Validity,Verification
0,SPO.03.1901.60140.5.1,5,2024-01-01 00:00:00,28.0,ug.m-3,1,2
1,SPO.03.1901.60140.5.1,5,2024-01-01 01:00:00,16.5,ug.m-3,1,2
2,SPO.03.1901.60140.5.1,5,2024-01-01 02:00:00,12.5,ug.m-3,1,2
3,SPO.03.1901.60140.5.1,5,2024-01-01 03:00:00,9.5,ug.m-3,1,2
4,SPO.03.1901.60140.5.1,5,2024-01-01 04:00:00,8.0,ug.m-3,1,2


In [11]:
display(clean_air_quality.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3951061 entries, 0 to 3951060
Data columns (total 7 columns):
 #   Column         Dtype         
---  ------         -----         
 0   Samplingpoint  object        
 1   Pollutant      int32         
 2   Start          datetime64[ns]
 3   Value          float64       
 4   Unit           object        
 5   Validity       int32         
 6   Verification   int32         
dtypes: datetime64[ns](1), float64(1), int32(3), object(2)
memory usage: 165.8+ MB


None

In [12]:
print(f"Polluant types: {clean_air_quality['Pollutant'].unique()}")
print(f"{{'PM2': 5, 'NO2': 7, 'SO2': 8, 'CO': 6001}}")
print(f"Units: {clean_air_quality['Unit'].unique()}")
print(f"Validity: {clean_air_quality['Validity'].unique()}")
print(f"Verification: {clean_air_quality['Verification'].unique()}")

Polluant types: [   5    7    8 6001]
{'PM2': 5, 'NO2': 7, 'SO2': 8, 'CO': 6001}
Units: ['ug.m-3']
Validity: [ 1 -1]
Verification: [2 1 3]


##### Meta Data

In [13]:
display(clean_metadata.head())

Unnamed: 0,Sampling Point Id,Air Quality Station Name,Longitude,Latitude,Altitude,Air Quality Station Area,Air Quality Station Type,Operational Activity Begin,Operational Activity End,Main Emission Sources
0,SPO.01.0001.1556.38.1,Eisenstadt Laschoberstraße,16.52617,47.84011,165.0,suburban,background,01/01/1995 00:00:00,,Transport
1,SPO.01.0001.1556.38.1,Eisenstadt Laschoberstraße,16.52617,47.84011,165.0,suburban,background,01/01/1995 00:00:00,,Transport
2,SPO.01.0001.1556.38.1,Eisenstadt Laschoberstraße,16.52617,47.84011,165.0,suburban,background,01/01/1995 00:00:00,,Transport
3,SPO.01.0001.1556.38.1,Eisenstadt Laschoberstraße,16.52617,47.84011,165.0,suburban,background,01/01/1995 00:00:00,,Transport
4,SPO.01.0001.1557.8.1,Eisenstadt Laschoberstraße,16.52617,47.84011,165.0,suburban,background,01/01/1995 00:00:00,,Transport


In [14]:
display(clean_metadata.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4761 entries, 0 to 4760
Data columns (total 10 columns):
 #   Column                      Non-Null Count  Dtype  
---  ------                      --------------  -----  
 0   Sampling Point Id           4761 non-null   object 
 1   Air Quality Station Name    4761 non-null   object 
 2   Longitude                   4761 non-null   float64
 3   Latitude                    4761 non-null   float64
 4   Altitude                    4761 non-null   float64
 5   Air Quality Station Area    4761 non-null   object 
 6   Air Quality Station Type    4761 non-null   object 
 7   Operational Activity Begin  4761 non-null   object 
 8   Operational Activity End    1009 non-null   object 
 9   Main Emission Sources       4742 non-null   object 
dtypes: float64(3), object(7)
memory usage: 372.1+ KB


None

In [16]:
print(f"All columns in the dataset: {metadata.columns}")
print(f"Main emission sources: {metadata['Main Emission Sources'].unique()}")
print(f"Air quality station type: {metadata['Air Quality Station Type'].unique()}")
print(f"Air quality station area: {metadata['Air Quality Station Area'].unique()}")

All columns in the dataset: Index(['Sampling Point Id', 'Air Quality Station Name', 'Longitude',
       'Latitude', 'Altitude', 'Air Quality Station Area',
       'Air Quality Station Type', 'Operational Activity Begin',
       'Operational Activity End', 'Main Emission Sources'],
      dtype='object')
Main emission sources: ['Transport' 'Long-range transboundary transport' 'Other sectors'
 'Secondary aerosols' 'Other' 'Energy Industries' 'Industrial Processes'
 nan 'Manufacturing Industries and Construction']
Air quality station type: ['background' 'industrial' 'traffic']
Air quality station area: ['suburban' 'rural' 'urban' 'rural-remote' 'rural-nearcity'
 'rural-regional']


# Merge air quality data with meta data

In [None]:
metadata.rename(columns={"Sampling Point Id": "Samplingpoint"}, inplace=True)

merged_df = clean_air_quality.merge(metadata, on="Samplingpoint", how="left")

# Save merged dataset
merged_df.to_parquet('../data/air_quality_complete.parquet', index=False)
print("Merged dataset saved as air_quality_complete.parquet!")

Merged dataset saved as air_quality_complete.parquet!


In [19]:
display(merged_df.head())
display(merged_df.info())

Unnamed: 0,Samplingpoint,Pollutant,Start,Value,Unit,Validity,Verification,Air Quality Station Name,Longitude,Latitude,Altitude,Air Quality Station Area,Air Quality Station Type,Operational Activity Begin,Operational Activity End,Main Emission Sources
0,SPO.03.1901.60140.5.1,5,2024-01-01,28.0,ug.m-3,1,2,Tulln Leopoldgasse,16.0625,48.32945,178.0,suburban,background,04/06/2007 00:00:00,,Long-range transboundary transport
1,SPO.03.1901.60140.5.1,5,2024-01-01,28.0,ug.m-3,1,2,Tulln Leopoldgasse,16.0625,48.32945,178.0,suburban,background,04/06/2007 00:00:00,,Long-range transboundary transport
2,SPO.03.1901.60140.5.1,5,2024-01-01,28.0,ug.m-3,1,2,Tulln Leopoldgasse,16.0625,48.32945,178.0,suburban,background,04/06/2007 00:00:00,,Long-range transboundary transport
3,SPO.03.1901.60140.5.1,5,2024-01-01,28.0,ug.m-3,1,2,Tulln Leopoldgasse,16.0625,48.32945,178.0,suburban,background,04/06/2007 00:00:00,,Long-range transboundary transport
4,SPO.03.1901.60140.5.1,5,2024-01-01,28.0,ug.m-3,1,2,Tulln Leopoldgasse,16.0625,48.32945,178.0,suburban,background,04/06/2007 00:00:00,,Long-range transboundary transport


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22772695 entries, 0 to 22772694
Data columns (total 16 columns):
 #   Column                      Dtype         
---  ------                      -----         
 0   Samplingpoint               object        
 1   Pollutant                   int32         
 2   Start                       datetime64[ns]
 3   Value                       float64       
 4   Unit                        object        
 5   Validity                    int32         
 6   Verification                int32         
 7   Air Quality Station Name    object        
 8   Longitude                   float64       
 9   Latitude                    float64       
 10  Altitude                    float64       
 11  Air Quality Station Area    object        
 12  Air Quality Station Type    object        
 13  Operational Activity Begin  object        
 14  Operational Activity End    object        
 15  Main Emission Sources       object        
dtypes: datetime64[ns

None

#### Optimize dataset

In [None]:
df = pd.read_parquet('../data/air_quality_complete.parquet')

for col in ["Air Quality Station Name", "Air Quality Station Area", "Air Quality Station Type", "Main Emission Sources"]:
    df[col] = df[col].astype("category")
    
# convert int32 to int16 
df["Pollutant"] = df["Pollutant"].astype("int16")
df["Validity"] = df["Validity"].astype("int8")
df["Verification"] = df["Verification"].astype("int8")

# convert float64 to float32 
df["Value"] = df["Value"].astype("float32")
df["Longitude"] = df["Longitude"].astype("float32")
df["Latitude"] = df["Latitude"].astype("float32")
df["Altitude"] = df["Altitude"].astype("float32")

# save optimized dataset with Parquet compression
df.to_parquet('../data/air_quality_optimized.parquet', index=False, compression='snappy')

# Check new memory usage
print(df.info(memory_usage='deep'))
print("Optimized dataset saved with reduced memory usage!")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22772695 entries, 0 to 22772694
Data columns (total 16 columns):
 #   Column                      Dtype         
---  ------                      -----         
 0   Samplingpoint               object        
 1   Pollutant                   int16         
 2   Start                       datetime64[ns]
 3   Value                       float32       
 4   Unit                        object        
 5   Validity                    int8          
 6   Verification                int8          
 7   Air Quality Station Name    category      
 8   Longitude                   float32       
 9   Latitude                    float32       
 10  Altitude                    float32       
 11  Air Quality Station Area    category      
 12  Air Quality Station Type    category      
 13  Operational Activity Begin  object        
 14  Operational Activity End    object        
 15  Main Emission Sources       category      
dtypes: category(4), 