In [37]:
import os
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

In [27]:
tasks = pd.read_parquet("./tasks.parquet")
tasks

Unnamed: 0,id,submission_time,duration,cpu_count,cpu_capacity,mem_capacity
0,354695,2023-06-12 00:44:44,43172000,48,100800.0,196000000000
1,5322138,2023-06-12 11:10:06,1050000,48,100800.0,196000000000
2,2979308,2023-06-12 12:31:46,628000,48,100800.0,196000000000
3,2872246,2023-06-12 13:05:31,1588000,48,100800.0,196000000000
4,821602,2023-06-12 13:09:26,68034000,48,100800.0,196000000000
...,...,...,...,...,...,...
8371,2088308,2023-06-15 07:43:11,357000,48,100800.0,196000000000
8372,1394750,2023-06-15 08:27:36,382000,48,100800.0,196000000000
8373,4906495,2023-06-15 16:40:51,7200000,48,100800.0,196000000000
8374,5667091,2023-06-15 08:35:40,828000,48,100800.0,196000000000


In [28]:
# read the types of the columns in the tasks dataframe
pd.read_parquet("./tasks.parquet").dtypes

id                         object
submission_time    datetime64[ms]
duration                    int64
cpu_count                   int32
cpu_capacity              float64
mem_capacity                int64
dtype: object

In [29]:
pd.read_parquet("./tasks-old.parquet").dtypes

id                         object
submission_time    datetime64[ms]
duration                    int64
cpu_count                   int32
cpu_capacity              float64
mem_capacity                int64
dtype: object

In [30]:
old_schema = pq.read_schema("./tasks-old.parquet")
new_schema = pq.read_schema("./tasks.parquet")
print("Old Schema:\n", old_schema)
print("New Schema:\n", new_schema)

Old Schema:
 id: string not null
submission_time: timestamp[ms] not null
duration: int64 not null
cpu_count: int32 not null
cpu_capacity: double not null
mem_capacity: int64 not null
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 792
New Schema:
 id: string
submission_time: timestamp[ms]
duration: int64
cpu_count: int32
cpu_capacity: double
mem_capacity: int64
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 979


In [7]:
# minimum timestamp
tasks["submission_time"].min()

Timestamp('2023-05-31 04:40:10')

In [9]:
# maximum timestamp
tasks["submission_time"].max()

Timestamp('2023-06-28 21:49:28')

In [10]:
# save the new tasks to tasks.parquet
tasks.to_parquet("./tasks.parquet")

In [45]:
schema = pa.schema([
    pa.field("id", pa.string(), nullable=False),
    pa.field("submission_time", pa.timestamp('ms'), nullable=False),
    pa.field("duration", pa.int64(), nullable=False),
    pa.field("cpu_count", pa.int32(), nullable=False),
    pa.field("cpu_capacity", pa.float64(), nullable=False),
    pa.field("mem_capacity", pa.int64(), nullable=False)
])

tasks = pd.read_parquet("./tasks.parquet")
pq.write_table(pa.Table.from_pandas(tasks, schema=schema), "./tasks.parquet")

In [58]:
goodSchema = pq.read_schema("../co2/good_schema.parquet")
ourschema = pq.read_schema("../co2/AT-2023-06.parquet")
print("Good Schema:\n", goodSchema)
print("\n\nOur Schema:\n", ourschema)

Good Schema:
 timestamp: timestamp[ms] not null
carbon_intensity: double not null
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 362


Our Schema:
 timestamp: timestamp[ns]
carbon_intensity: double
__index_level_0__: int64
-- schema metadata --
pandas: '{"index_columns": ["__index_level_0__"], "column_indexes": [{"na' + 589


In [72]:
for file in os.listdir("../co2/"):
    print(file)

    frame = pd.read_parquet("../co2/" + file)
    schema = pa.schema([
        pa.field("timestamp", pa.timestamp('ms'), nullable=False),
        pa.field("carbon_intensity", pa.float64(), nullable=False)
    ])

    frame = pa.Table.from_pandas(frame, schema=schema, preserve_index=False)
    pq.write_table(frame, "../co2/" + file)

NO-2023-06.parquet
SK-2023-06.parquet
BE-2023-06.parquet
PT-2023-06.parquet
EU-migration=4h-2023-06.parquet
LV-2023-06.parquet
MK-2023-06.parquet
ES-2023-06.parquet
AT-2023-06.parquet
EU-migration=1h-2023-06.parquet
SE-2023-06.parquet
LU-2023-06.parquet
CH-2023-06.parquet
RS-2023-06.parquet
HR-2023-06.parquet
BA-2023-06.parquet
IT-2023-06.parquet
FI-2023-06.parquet
NL-2023-06.parquet
GR-2023-06.parquet
CZ-2023-06.parquet
HU-2023-06.parquet
DE-2023-06.parquet
DK-2023-06.parquet
EU-migration=15min-2023-06.parquet
RO-2023-06.parquet
EU-migration=8h-2023-06.parquet
BG-2023-06.parquet
SI-2023-06.parquet
FR-2023-06.parquet
LT-2023-06.parquet
EU-migration=24h-2023-06.parquet
EE-2023-06.parquet
PL-2023-06.parquet


Unnamed: 0,timestamp,carbon_intensity
0,2023-05-31 00:00:00,15.461407
1,2023-05-31 00:15:00,12.412120
2,2023-05-31 00:30:00,10.518862
3,2023-05-31 00:45:00,8.452430
4,2023-05-31 01:00:00,8.571025
...,...,...
2965,2023-06-30 21:15:00,7.950465
2966,2023-06-30 21:30:00,8.186405
2967,2023-06-30 21:45:00,8.369432
2968,2023-06-30 22:00:00,8.298972


In [53]:
# make all the co2 traces match the schema for the timestamp
schema = pa.schema([
    pa.field("timestamp", pa.timestamp('ms'), nullable=False),
    pa.field("carbon_intensity", pa.float64(), nullable=False)
])

for file in os.listdir("../co2"):
    co2 = pd.read_parquet(f"../co2/{file}")

    # Print column names for debugging (you can remove this line once confirmed)
    print(co2.columns)

    # Drop the unwanted '__index_level_0__' column if it exists
    if '__index_level_0__' in co2.columns:
        co2.drop(columns=['__index_level_0__'], inplace=True)

    # Convert DataFrame to PyArrow Table with the specified schema
    table = pa.Table.from_pandas(co2, schema=schema, preserve_index=False)

    # Write the table to a Parquet file
    pq.write_table(table, f"../co2/{file}")

Index(['timestamp', 'carbon_intensity'], dtype='object')
Index(['timestamp', 'carbon_intensity'], dtype='object')
Index(['timestamp', 'carbon_intensity'], dtype='object')


ArrowInvalid: Could not open Parquet input source '<Buffer>': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.