In [1]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

RAW_PATH = r"C:\Users\Deepak\OneDrive\Desktop\ObjectRegon\data\raw"
PROCESSED_PATH = r"C:\Users\Deepak\OneDrive\Desktop\ObjectRegon\data\processed"

os.makedirs(PROCESSED_PATH, exist_ok=True)

In [2]:
def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast numerics safely (without changing schema across chunks)."""
    for col in df.select_dtypes(include=["int", "float"]).columns:
        # always downcast to float32 to keep schema consistent
        df[col] = pd.to_numeric(df[col], downcast="float")
    return df


In [3]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

def process_csv(filename, chunksize=500_000):
    input_path = os.path.join(RAW_PATH, filename)
    output_path = os.path.join(PROCESSED_PATH, filename.replace(".csv", ".parquet"))

    # Infer dtypes from a small sample
    sample = pd.read_csv(input_path, nrows=500)
    sample_dtypes = sample.dtypes.to_dict()

    # üîπ Convert all int columns ‚Üí float to handle NaN
    for col, dtype in sample_dtypes.items():
        if pd.api.types.is_integer_dtype(dtype):
            sample_dtypes[col] = "float64"

    writer = None

    for chunk in pd.read_csv(input_path, chunksize=chunksize, dtype=sample_dtypes, low_memory=False):
        chunk = optimize_dtypes(chunk)
        table = pa.Table.from_pandas(chunk, preserve_index=False)

        if writer is None:
            writer = pq.ParquetWriter(output_path, table.schema, compression="snappy")

        writer.write_table(table)

    if writer:
        writer.close()

    print(f"‚úÖ Processed {filename} ‚Üí {output_path}")

In [None]:
import os
import pandas as pd

RAW_PATH = r"C:\Users\Deepak\OneDrive\Desktop\ObjectRegon\data\raw"
OUTPUT_PATH = r"C:\Users\Deepak\OneDrive\Desktop\ObjectRegon\data\processed"

def clean_and_save_csv(filename, usecols=None, chunksize=500_000, to_parquet=True):
    """
    Clean a large CSV in chunks and save optimized version.
    
    Args:
        filename (str): CSV file name inside RAW_PATH
        usecols (list): columns to keep (None = keep all)
        chunksize (int): number of rows per chunk
        to_parquet (bool): save as parquet (True) or CSV (False)
    """
    input_path = os.path.join(RAW_PATH, filename)
    output_path = os.path.join(
        OUTPUT_PATH, filename.replace(".csv", ".parquet" if to_parquet else "_clean.csv")
    )

    all_chunks = []
    for chunk in pd.read_csv(input_path, chunksize=chunksize, usecols=usecols, low_memory=False):
        # üîπ Convert ints ‚Üí float32 (handles NaN safely)
        for col in chunk.select_dtypes(include=["int", "float"]).columns:
            chunk[col] = chunk[col].astype("float32")

        # üîπ Drop fully empty columns
        chunk = chunk.dropna(axis=1, how="all")

        all_chunks.append(chunk)

    df = pd.concat(all_chunks, ignore_index=True)

    if to_parquet:
        df.to_parquet(output_path, engine="pyarrow", compression="snappy", index=False)
    else:
        df.to_csv(output_path, index=False)

    print(f"‚úÖ Cleaned {filename}, final shape: {df.shape} ‚Üí saved to {output_path}")
    return df

In [1]:
import os
import pandas as pd

RAW_PATH = "C:/Users/Deepak/OneDrive/Desktop/ObjectRegon/data/raw"
PROC_PATH = "C:/Users/Deepak/OneDrive/Desktop/ObjectRegon/data/processed"
os.makedirs(PROC_PATH, exist_ok=True)

def convert_to_parquet(filename, usecols=None, chunksize=200_000):
    input_path = os.path.join(RAW_PATH, filename)
    output_path = os.path.join(PROC_PATH, filename.replace(".csv", ".parquet"))

    all_chunks = []
    for chunk in pd.read_csv(input_path, chunksize=chunksize, usecols=usecols, low_memory=False):
        # Convert all numeric columns ‚Üí float32 (saves memory)
        for col in chunk.select_dtypes(include=["int", "float"]).columns:
            chunk[col] = chunk[col].astype("float32")
        all_chunks.append(chunk)

    df = pd.concat(all_chunks, ignore_index=True)
    df.to_parquet(output_path, index=False, engine="pyarrow", compression="snappy")

    print(f"‚úÖ Converted {filename} ‚Üí {output_path}, final shape: {df.shape}")
    return output_path


In [12]:
required_columns = {
    "epidemiology.csv": [
        "date", "location_key", "new_confirmed", "new_deceased", "cumulative_confirmed", "cumulative_deceased"
    ],
    "mobility.csv": [
        "date", "location_key", "mobility_retail_and_recreation", "mobility_workplaces"
    ],
    "weather.csv": [
        "date", "location_key", "average_temperature_celsius", "rainfall_mm", "relative_humidity"
    ],
    "hospitalizations.csv": [
        "date", "location_key", "new_hospitalized_patients", "cumulative_hospitalized_patients"
    ],
    "vaccinations.csv": [
        "date", "location_key", "new_persons_vaccinated", "cumulative_persons_vaccinated"
    ],
    "oxford-government-response.csv": [
        "date", "location_key", "stringency_index", "school_closing", "workplace_closing"
    ],
    "google-search-trends.csv": [
        "date", "location_key", "search_trends_covid", "search_trends_vaccine"
    ],
    "by-age.csv": [
        "date", "location_key", "age_bin_0", "age_bin_1", "age_bin_2", "age_bin_3", "age_bin_4", "age_bin_5",
        "age_bin_6", "age_bin_7", "age_bin_8", "age_bin_9"
    ],
    "by-sex.csv": [
        "date", "location_key", "new_confirmed_male", "new_confirmed_female", "cumulative_confirmed_male",
        "cumulative_confirmed_female"
    ],
    "index.csv": [
        "location_key", "country_name", "subregion1_name", "subregion2_name", "locality_name",
        "population", "gdp_per_capita", "human_development_index"
    ]
}


In [13]:
# Index ‚Äì keep the most relevant columns for joins
convert_to_parquet(
    "index.csv",
    usecols=[
        "location_key",
        "country_code",
        "country_name",
        "subregion1_name",
        "subregion2_name",
        "locality_name",
        "aggregation_level"
    ]
)


‚úÖ Converted index.csv ‚Üí C:/Users/Deepak/OneDrive/Desktop/ObjectRegon/data/processed\index.parquet, final shape: (22963, 7)


'C:/Users/Deepak/OneDrive/Desktop/ObjectRegon/data/processed\\index.parquet'

In [12]:
# By Age ‚Äì safer version
def convert_to_parquet_flexible(filename, try_usecols=None):
    input_path = os.path.join(RAW_PATH, filename)
    output_path = os.path.join(PROC_PATH, filename.replace(".csv", ".parquet"))

    # Peek at columns first
    cols = pd.read_csv(input_path, nrows=5).columns.tolist()
    print(f"üîç {filename} columns detected:", cols)

    # Intersect requested usecols with actual columns
    if try_usecols:
        usecols = [c for c in try_usecols if c in cols]
        if not usecols:
            print(f"‚ö†Ô∏è None of the requested columns found in {filename}, keeping all")
            usecols = None
    else:
        usecols = None

    all_chunks = []
    for chunk in pd.read_csv(input_path, chunksize=500_000, usecols=usecols, low_memory=False):
        for col in chunk.select_dtypes(include=["int", "float"]).columns:
            chunk[col] = chunk[col].astype("float32")
        all_chunks.append(chunk)

    df = pd.concat(all_chunks, ignore_index=True)
    df.to_parquet(output_path, index=False, compression="snappy")
    print(f"‚úÖ Saved {filename} ‚Üí {output_path}, shape: {df.shape}")
    return df


# Now run for by-age
by_age = convert_to_parquet_flexible(
    "index.csv",
    try_usecols=[
        "location_key",
        "country_code",
        "country_name",
        "subregion1_name",
        "population",
        "gdp_per_capita",
        "human_development_index"
    ])


üîç index.csv columns detected: ['location_key', 'place_id', 'wikidata_id', 'datacommons_id', 'country_code', 'country_name', 'subregion1_code', 'subregion1_name', 'subregion2_code', 'subregion2_name', 'locality_code', 'locality_name', 'iso_3166_1_alpha_2', 'iso_3166_1_alpha_3', 'aggregation_level']
‚úÖ Saved index.csv ‚Üí C:/Users/Deepak/OneDrive/Desktop/ObjectRegon/data/processed\index.parquet, shape: (22963, 4)


In [14]:
import pandas as pd
import os

# üìÇ Path where your parquet files are stored
DATA_PATH = "C:/Users/Deepak/OneDrive/Desktop/ObjectRegon/data/processed"  # change if needed

# üìå List of parquet files (preprocessed)
files = {
    "epidemiology": "epidemiology.parquet",
    "mobility": "mobility.parquet",
    "weather": "weather.parquet",
    "hospitalizations": "hospitalizations.parquet",
    "vaccinations": "vaccinations.parquet",
    "oxford": "oxford-government-response.parquet",
    "google": "google-search-trends.parquet",
    "by_age": "by-age.parquet",
    "by_sex": "by-sex.parquet",
    "index": "index.parquet"
}

# üìå Load all parquet files
datasets = {name: pd.read_parquet(os.path.join(DATA_PATH, fname)) for name, fname in files.items()}

# üìå Start merging
# Merge epidemiology first (it will be the base table)
merged = datasets["epidemiology"]

# Merge all other datasets on ['date', 'location_key'] if they have date
# Index dataset is static (location-level), so merge only on 'location_key'

for name, df in datasets.items():
    if name == "epidemiology":
        continue
    
    if "date" in df.columns:
        merged = pd.merge(merged, df, on=["date", "location_key"], how="left")
    else:
        merged = pd.merge(merged, df, on="location_key", how="left")

print("‚úÖ Final merged shape:", merged.shape)

# üìå Save final merged dataset
output_path = os.path.join(DATA_PATH, "merged_dataset.parquet")
merged.to_parquet(output_path, index=False)

print("üìÇ Merged dataset saved to:", output_path)


‚úÖ Final merged shape: (12525825, 43)
üìÇ Merged dataset saved to: C:/Users/Deepak/OneDrive/Desktop/ObjectRegon/data/processed\merged_dataset.parquet


In [18]:
import pandas as pd

df = pd.read_parquet("C:/Users/Deepak/OneDrive/Desktop/ObjectRegon/data/processed/merged_reduced.parquet")
print(df.head())
print(df.info())
print(df.isna().sum().sort_values(ascending=False).head(20))  # check missing values


         date location_key  new_confirmed  new_deceased  cumulative_confirmed  \
0  2020-01-01           AD            0.0           0.0                   0.0   
1  2020-01-02           AD            0.0           0.0                   0.0   
2  2020-01-03           AD            0.0           0.0                   0.0   
3  2020-01-04           AD            0.0           0.0                   0.0   
4  2020-01-05           AD            0.0           0.0                   0.0   

   mobility_retail_and_recreation  mobility_grocery_and_pharmacy  \
0                             NaN                            NaN   
1                             NaN                            NaN   
2                             NaN                            NaN   
3                             NaN                            NaN   
4                             NaN                            NaN   

   mobility_transit_stations  mobility_workplaces  mobility_residential  ...  \
0                       

In [16]:
# drop near-empty columns
threshold = 0.8  # keep only columns with <80% missing
df_reduced = df[df.columns[df.isna().mean() < threshold]]

print("Reduced shape:", df_reduced.shape)
df_reduced.head()


Reduced shape: (12525825, 22)


Unnamed: 0,date,location_key,new_confirmed,new_deceased,cumulative_confirmed,mobility_retail_and_recreation,mobility_grocery_and_pharmacy,mobility_transit_stations,mobility_workplaces,mobility_residential,...,relative_humidity,new_confirmed_male,new_confirmed_female,cumulative_confirmed_male,cumulative_confirmed_female,country_code,country_name,subregion1_name,subregion2_name,aggregation_level
0,2020-01-01,AD,0.0,0.0,0.0,,,,,,...,72.773048,,,,,AD,Andorra,,,0.0
1,2020-01-02,AD,0.0,0.0,0.0,,,,,,...,70.841316,,,,,AD,Andorra,,,0.0
2,2020-01-03,AD,0.0,0.0,0.0,,,,,,...,71.117249,,,,,AD,Andorra,,,0.0
3,2020-01-04,AD,0.0,0.0,0.0,,,,,,...,77.338638,,,,,AD,Andorra,,,0.0
4,2020-01-05,AD,0.0,0.0,0.0,,,,,,...,60.762379,,,,,AD,Andorra,,,0.0


In [17]:
df_reduced.to_parquet("C:/Users/Deepak/OneDrive/Desktop/ObjectRegon/data/processed/merged_reduced.parquet")
