In [1]:
import pandas as pd

In [5]:
# read the parquet file from artifact3/reqid_001 folder
df = pd.read_parquet('../data/artifacts3/reqid_001/1757527232_1757278522.parquet')

In [6]:
df.head()

Unnamed: 0,managedObjectId,measuringNodeId,start,end,indicator1_MIN,indicator1_MAX,indicator2_MIN,indicator2_MAX,indicator2_AVG,indicator3_MIN,indicator3_MAX,indicator3_AVG
0,10540337,10540337,1743532199,1743532199,65.83,65.27,10.0,98.0,55.59,12.0,99.0,54.11
1,10540337,10540337,1743618599,1743618599,76.09,54.77,10.0,98.0,57.7,10.0,99.0,51.68
2,10540337,10540337,1743704999,1743704999,67.66,66.12,10.0,99.0,50.29,10.0,98.0,53.25
3,10540337,10540337,1743791399,1743791399,83.22,80.61,10.0,99.0,58.97,11.0,98.0,49.92
4,10540337,10540337,1743877799,1743877799,78.29,66.14,10.0,99.0,59.6,10.0,99.0,61.02


In [7]:
df.columns

Index(['managedObjectId', 'measuringNodeId', 'start', 'end', 'indicator1_MIN',
       'indicator1_MAX', 'indicator2_MIN', 'indicator2_MAX', 'indicator2_AVG',
       'indicator3_MIN', 'indicator3_MAX', 'indicator3_AVG'],
      dtype='object')

In [None]:
# split the above parquet file for indicator1 and indicator2 into two different files
df_indicator1 = df[df['indicator'] == 'indicator1']
df_indicator2 = df[df['indicator'] == 'indicator2']

In [None]:
### split parquet file based on schema split

In [8]:
# Select sensor1 columns
sensor1_cols = [col for col in df.columns if col.startswith("indicator1_")]
sensor1_df = df[["managedObjectId", "measuringNodeId", "start", "end"] + sensor1_cols]

# Select remaining columns (excluding sensor1 columns)
remaining_cols = [col for col in df.columns if col not in sensor1_cols]
remaining_cols = [col for col in remaining_cols if col not in ["managedObjectId", "measuringNodeId", "start", "end"]]
remaining_df = df[["managedObjectId", "measuringNodeId", "start", "end"] + remaining_cols]

# Write to separate Parquet files
sensor1_df.to_parquet("../data/artifacts3/reqid_001/1757527256_1757278582_sensor1.parquet", index=False)
remaining_df.to_parquet("../data/artifacts3/reqid_001/1757527256_1757278582_remaining.parquet", index=False)

In [2]:
# Now, let's read and merge parquet files from multiple folders and 
# maintain the order of operations for schema consistency
# 
import pandas as pd
import os
from collections import defaultdict

folders = ["../data/artifacts3/reqid_001", "../data/artifacts3/reqid_002",
            "../data/artifacts3/reqid_003"]
dfs = []

for folder in folders:
    parquet_files = [f for f in os.listdir(folder) if f.endswith(".parquet")]

    # Separate part files and full files
    part_files = [f for f in parquet_files if "_part" in f]
    print(f"Part files in {folder}:", part_files)
    full_files = [f for f in parquet_files if "_part" not in f]
    print(f"Full files in {folder}:", full_files)

    # Handle full files
    for f in full_files:
        df = pd.read_parquet(os.path.join(folder, f))
        dfs.append(df)

    # Handle part files: group by prefix before '_part'
    part_groups = defaultdict(list)
    for f in part_files:
        print("f: ", f)
        print("inside for loop, folder: ", folder)
        prefix = f.split("_part")[0]
        part_groups[prefix].append(f)

    for prefix, files in part_groups.items():
        # Sort to ensure consistent order
        files = sorted(files)
        part_dfs = [pd.read_parquet(os.path.join(folder, f)) for f in files]
        # Merge column-wise (axis=1)
        merged_df = pd.concat(part_dfs, axis=1)
        # Remove duplicate columns if any (e.g., 'time', 'TO')
        merged_df = merged_df.loc[:, ~merged_df.columns.duplicated()]
        dfs.append(merged_df)

# Combine all data vertically (axis=0)
final_df = pd.concat(dfs, ignore_index=True)
print(final_df.shape)
print(final_df.head())

Part files in ../data/artifacts3/reqid_001: ['1757527256_1757278582_part_002.parquet', '1757527256_1757278582_part_001.parquet']
Full files in ../data/artifacts3/reqid_001: ['1757565217_1757565282.parquet', '1757565217_1757565295.parquet', '1757527232_1757278522.parquet']
f:  1757527256_1757278582_part_002.parquet
inside for loop, folder:  ../data/artifacts3/reqid_001
f:  1757527256_1757278582_part_001.parquet
inside for loop, folder:  ../data/artifacts3/reqid_001
Part files in ../data/artifacts3/reqid_002: []
Full files in ../data/artifacts3/reqid_002: ['1757565217_1757565295.parquet']
Part files in ../data/artifacts3/reqid_003: []
Full files in ../data/artifacts3/reqid_003: ['1757565217_1757565282.parquet', '1757527232_1757278522.parquet']
(574, 12)
   managedObjectId  measuringNodeId       start         end  indicator1_MIN  \
0         10540337         10540337  1743532199  1743532199           65.83   
1         10540337         10540337  1743618599  1743618599           76.09   
2

In [3]:
final_df.head()

Unnamed: 0,managedObjectId,measuringNodeId,start,end,indicator1_MIN,indicator1_MAX,indicator2_MIN,indicator2_MAX,indicator2_AVG,indicator3_MIN,indicator3_MAX,indicator3_AVG
0,10540337,10540337,1743532199,1743532199,65.83,65.27,10.0,98.0,55.59,12.0,99.0,54.11
1,10540337,10540337,1743618599,1743618599,76.09,54.77,10.0,98.0,57.7,10.0,99.0,51.68
2,10540337,10540337,1743704999,1743704999,67.66,66.12,10.0,99.0,50.29,10.0,98.0,53.25
3,10540337,10540337,1743791399,1743791399,83.22,80.61,10.0,99.0,58.97,11.0,98.0,49.92
4,10540337,10540337,1743877799,1743877799,78.29,66.14,10.0,99.0,59.6,10.0,99.0,61.02


In [4]:
# Now, let's read and merge parquet files from multiple zip files in a folder and 
# maintain the order of operations for schema consistency

import pandas as pd
import os
import zipfile
from io import BytesIO
from collections import defaultdict

artifacts_folder = "../data/artifacts3"
dfs = []

# Find all zip files in the artifacts folder
zip_files = [f for f in os.listdir(artifacts_folder) if f.endswith(".zip")]
print("Zip files found:", zip_files)

for zip_filename in zip_files:
    zip_path = os.path.join(artifacts_folder, zip_filename)
    print(f"Processing {zip_path}...")
    with zipfile.ZipFile(zip_path, "r") as z:
        print("Files in zip:", z.namelist())
        # Group part files by prefix, collect full files
        part_groups = defaultdict(list)
        full_files = []
        for file in z.namelist():
            if file.endswith(".parquet"):
                if "_part" in os.path.basename(file):
                    prefix = os.path.basename(file).split("_part")[0]
                    part_groups[prefix].append(file)
                else:
                    full_files.append(file)
        # Handle full files
        for file in full_files:
            with z.open(file) as f:
                df = pd.read_parquet(BytesIO(f.read()))
                dfs.append(df)
        # Handle part files
        for prefix, files in part_groups.items():
            files = sorted(files)
            print(f"Processing part group {prefix} with files: {files}")
            part_dfs = []
            for file in files:
                with z.open(file) as f:
                    part_dfs.append(pd.read_parquet(BytesIO(f.read())))
            merged_df = pd.concat(part_dfs, axis=1)
            merged_df = merged_df.loc[:, ~merged_df.columns.duplicated()]
            dfs.append(merged_df)

# Combine all data vertically (axis=0)
final_df = pd.concat(dfs, ignore_index=True)
print(final_df.shape)
print(final_df.head())

Zip files found: ['reqid_002.zip', 'reqid_003.zip', 'reqid_001.zip']
Processing ../data/artifacts3/reqid_002.zip...
Files in zip: ['reqid_002/', 'reqid_002/1757565217_1757565295.parquet']
Processing ../data/artifacts3/reqid_003.zip...
Files in zip: ['reqid_003/', 'reqid_003/1757565217_1757565282.parquet', 'reqid_003/1757527232_1757278522.parquet']
Processing ../data/artifacts3/reqid_001.zip...
Files in zip: ['reqid_001/', 'reqid_001/1757565217_1757565282.parquet', 'reqid_001/1757527256_1757278582_part_002.parquet', 'reqid_001/1757527256_1757278582_part_001.parquet', 'reqid_001/1757565217_1757565295.parquet', 'reqid_001/1757527232_1757278522.parquet']
Processing part group 1757527256_1757278582 with files: ['reqid_001/1757527256_1757278582_part_001.parquet', 'reqid_001/1757527256_1757278582_part_002.parquet']
(574, 12)
   managedObjectId  measuringNodeId       start         end  indicator1_MIN  \
0         10540337         10540337  1743532199  1743532199           65.83   
1         10

In [None]:
### what will be impact of zip file order not being maintained while merging


In [None]:
# this is code for merging parquet file from different zip files. inside zip file some of the parquet files are based on schema split. this case is also handeled. Now i want to translate this code to production standard code with small methods, definition. break the code in such a way so that i can take this to production

In [5]:
# ...existing code...
import pandas as pd
import os
import zipfile
from io import BytesIO
from collections import defaultdict
from typing import List

def find_zip_files(folder: str) -> List[str]:
    """Return list of zip files in the given folder."""
    return [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith(".zip")]

def group_parquet_files(zip_file: zipfile.ZipFile) -> (List[str], dict):
    """Group parquet files in zip into full files and part groups."""
    part_groups = defaultdict(list)
    full_files = []
    for file in zip_file.namelist():
        if file.endswith(".parquet"):
            if "_part" in os.path.basename(file):
                prefix = os.path.basename(file).split("_part")[0]
                part_groups[prefix].append(file)
            else:
                full_files.append(file)
    return full_files, part_groups

def read_parquet_from_zip(zip_file: zipfile.ZipFile, file: str) -> pd.DataFrame:
    """Read a parquet file from a zip archive."""
    with zip_file.open(file) as f:
        return pd.read_parquet(BytesIO(f.read()))

def merge_part_files(zip_file: zipfile.ZipFile, files: List[str]) -> pd.DataFrame:
    """Merge part parquet files column-wise, removing duplicate columns."""
    part_dfs = [read_parquet_from_zip(zip_file, file) for file in sorted(files)]
    merged_df = pd.concat(part_dfs, axis=1)
    merged_df = merged_df.loc[:, ~merged_df.columns.duplicated()]
    return merged_df

def load_all_parquet_from_zips(artifacts_folder: str) -> pd.DataFrame:
    """Load and merge all parquet files from all zip files in a folder."""
    dfs = []
    zip_files = find_zip_files(artifacts_folder)
    print("Zip files found:", zip_files)
    for zip_path in zip_files:
        print(f"Processing {zip_path}...")
        with zipfile.ZipFile(zip_path, "r") as z:
            print("Files in zip:", z.namelist())
            full_files, part_groups = group_parquet_files(z)
            # Handle full files
            for file in full_files:
                dfs.append(read_parquet_from_zip(z, file))
            # Handle part files
            for prefix, files in part_groups.items():
                print(f"Processing part group {prefix} with files: {files}")
                dfs.append(merge_part_files(z, files))
    if not dfs:
        raise ValueError("No parquet data found in any zip files.")
    final_df = pd.concat(dfs, ignore_index=True)
    return final_df

In [6]:
# Usage
artifacts_folder = "../data/artifacts3"
final_df = load_all_parquet_from_zips(artifacts_folder)
print(final_df.shape)
print(final_df.head())
# ...existing code...

Zip files found: ['../data/artifacts3/reqid_002.zip', '../data/artifacts3/reqid_003.zip', '../data/artifacts3/reqid_001.zip']
Processing ../data/artifacts3/reqid_002.zip...
Files in zip: ['reqid_002/', 'reqid_002/1757565217_1757565295.parquet']
Processing ../data/artifacts3/reqid_003.zip...
Files in zip: ['reqid_003/', 'reqid_003/1757565217_1757565282.parquet', 'reqid_003/1757527232_1757278522.parquet']
Processing ../data/artifacts3/reqid_001.zip...
Files in zip: ['reqid_001/', 'reqid_001/1757565217_1757565282.parquet', 'reqid_001/1757527256_1757278582_part_002.parquet', 'reqid_001/1757527256_1757278582_part_001.parquet', 'reqid_001/1757565217_1757565295.parquet', 'reqid_001/1757527232_1757278522.parquet']
Processing part group 1757527256_1757278582 with files: ['reqid_001/1757527256_1757278582_part_002.parquet', 'reqid_001/1757527256_1757278582_part_001.parquet']
(574, 12)
   managedObjectId  measuringNodeId       start         end  indicator1_MIN  \
0         10540337         1054033