<a href="https://colab.research.google.com/github/DiegoLeonardoPaez/big_data/blob/main/csv_to_parquet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
# connect to Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Installing Spark and Dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Covid19Data").getOrCreate()
spark

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [15]:
# Import required libraries
import os
from os import walk
from pathlib import Path
from pyspark.sql.functions import to_date, date_format

# Function to validate that input_dir exists and is a directory
def directory_exist(input_dir):
    """Return True if ``input_dir`` exists and is a directory.

    ``Path.is_dir()`` is used rather than ``Path.exists()`` so that a path to a
    regular file is rejected here. Otherwise it would pass validation and only
    fail later, when no CSV files are found while walking it.

    Args:
        input_dir: Path (str or os.PathLike) to check.

    Returns:
        bool: True when the path is an existing directory, False otherwise.
    """
    return Path(input_dir).is_dir()

# Function to recursively list CSV files under a directory
def list_csv_files(input_dir):
    """Recursively collect the paths of all ``.csv`` files under ``input_dir``.

    The extension match is case-insensitive (``.csv``, ``.CSV``, ...).
    Sub-directories and files are visited in sorted order, so the result is
    deterministic across runs. ``os.walk`` on its own yields entries in
    filesystem-dependent order.

    Args:
        input_dir: Root directory to search.

    Returns:
        list[str]: ``os.path.join(directory, file_name)`` for every CSV found.

    Raises:
        Exception: If no CSV file is found anywhere under ``input_dir``.
    """
    csv_files = []
    # A distinct loop name keeps the ``input_dir`` argument from being
    # shadowed by each sub-directory visited during the walk.
    for dir_path, dir_names, file_names in os.walk(input_dir):
        # Sorting dir_names in place makes the (top-down) walk descend in
        # sorted order.
        dir_names.sort()
        for file_name in sorted(file_names):
            if file_name.lower().endswith('.csv'):
                csv_files.append(os.path.join(dir_path, file_name))
    if not csv_files:
        raise Exception("No CSV files found in the directory")
    return csv_files

# Function to read CSV files (header row + inferred schema) into DataFrames
def read_csv_files(spark_session, files):
    """Read each CSV file into a Spark DataFrame keyed by ``<file stem>_df``.

    Every file is read with a comma delimiter, the first line as header and
    ``inferSchema`` enabled. For example, ``/x/day_wise.csv`` is stored under
    the key ``'day_wise_df'``.

    Args:
        spark_session: Active SparkSession used for reading.
        files: Iterable of CSV file paths.

    Returns:
        dict[str, DataFrame]: One DataFrame per input file.

    Raises:
        ValueError: If two files share the same base name (e.g. the same file
            name in two sub-directories). They would map to the same key and
            one DataFrame would silently replace the other.
    """
    keyed_files = [
        (f'{os.path.splitext(os.path.basename(file))[0]}_df', file)
        for file in files
    ]

    # Check for key collisions before any (schema-inferring) read is started.
    first_file_for_key = {}
    for key, file in keyed_files:
        if key in first_file_for_key:
            raise ValueError(
                f"Files {first_file_for_key[key]} and {file} both map to "
                f"DataFrame key '{key}'"
            )
        first_file_for_key[key] = file

    return {
        key: (
            spark_session.read
            .options(delimiter=",", header=True)
            .option("inferSchema", "true")
            .csv(file)
        )
        for key, file in keyed_files
    }

# Function to derive a year-month 'period_id' column from the 'Date' column;
# period_id is used later as the partition column for the Parquet output
def create_period_id_column(df):
    """Convert ``Date`` to a date and add a ``period_id`` column ('yyyy-MM').

    ``Date`` is replaced by ``to_date('Date', 'yyyy-MM-dd')``. ``period_id`` is
    then ``date_format`` of that converted column with pattern ``'yyyy-MM'``.
    NOTE(review): assumes the incoming ``Date`` values follow yyyy-MM-dd;
    confirm against the source CSV.

    Args:
        df: Spark DataFrame that contains a ``Date`` column.

    Returns:
        DataFrame: A new DataFrame with the converted ``Date`` and the added
        ``period_id`` column.
    """
    with_parsed_date = df.withColumn('Date', to_date('Date', 'yyyy-MM-dd'))
    return with_parsed_date.withColumn(
        'period_id',
        date_format('Date', 'yyyy-MM'),
    )

# Function to clean column names so they are valid Parquet field names
def clean_column_names(df):
    """Return ``df`` with every column name made safe for Parquet output.

    Character mapping applied to each name:
      - ``' '``, ``'.'``, ``'-'``, ``'/'``, ``','`` -> ``'_'``
      - ``'('``, ``')'`` -> removed
      - ``';'``, ``'{'``, ``'}'``, ``'='``, newline, tab -> ``'_'``

    Spark 3.1's Parquet writer rejects field names containing any of
    `` ,;{}()\\n\\t=``, so all of those characters are handled, not only the
    first two groups.

    Args:
        df: Spark DataFrame whose columns should be renamed.

    Returns:
        DataFrame: Same data, with the cleaned column names in the same order.
    """
    translation = str.maketrans({
        ' ': '_', '.': '_', '-': '_', '/': '_', ',': '_',
        ';': '_', '{': '_', '}': '_', '=': '_', '\n': '_', '\t': '_',
        '(': None, ')': None,
    })
    cleaned_names = [column.translate(translation) for column in df.columns]
    # A single positional rename replaces one withColumnRenamed call per column.
    return df.toDF(*cleaned_names)

# Function to write a DataFrame as partitioned Parquet
def write_to_parquet(df, output_dir, partition_column, key_df):
    """Write ``df`` to ``<output_dir>/<key_df>.parquet``, partitioned by one column.

    Existing data at the target path is replaced (``mode('overwrite')``).

    Args:
        df: Spark DataFrame to write.
        output_dir: Directory that will contain the ``.parquet`` output.
        partition_column: Column name passed to ``partitionBy``.
        key_df: Base name of the output directory, e.g. ``'day_wise_df'``.
    """
    file_name = key_df + '.parquet'
    target_path = os.path.join(output_dir, file_name)
    overwrite_writer = df.write.mode('overwrite')
    partitioned_writer = overwrite_writer.partitionBy(partition_column)
    partitioned_writer.parquet(target_path)

# Main function to execute the data processing and Parquet writing
def main(input_dir="/content/drive/MyDrive/covid",
         output_dir="/content/drive/MyDrive/covid/parquet_output",
         spark_session=None):
    """Convert every CSV under ``input_dir`` into partitioned Parquet datasets.

    Steps:
      1. validate ``input_dir``;
      2. recursively list and read the CSV files;
      3. add a ``period_id`` column to ``day_wise_df``;
      4. clean column names and write each DataFrame that has an entry in the
         partition map to ``<output_dir>/<key>.parquet``.

    DataFrames without a partition entry are reported and skipped.

    Args:
        input_dir: Directory searched (recursively) for CSV files.
        output_dir: Directory that receives the Parquet outputs. The default
            lies inside ``input_dir``, so later runs also walk it when
            listing CSV files.
        spark_session: SparkSession to use. Defaults to the notebook-global
            ``spark`` created in the setup cell.

    Raises:
        Exception: If ``input_dir`` is not an existing directory or contains
            no CSV files.
    """
    if spark_session is None:
        spark_session = spark  # session created in the setup cell

    if not directory_exist(input_dir):
        raise Exception(f"input_dir '{input_dir}' does not exist. Program terminated")

    print(f"Found directory {input_dir}")

    files = list_csv_files(input_dir)
    # list_csv_files already raises when nothing is found; this guard only
    # protects against that helper ever returning an empty list instead.
    if not files:
        raise Exception("No CSV files found in the directory. Program terminated")

    dfs = read_csv_files(spark_session, files)
    print(f"Found {len(dfs)} CSV file(s) in the directory.")

    # 'day_wise_df' is partitioned by period_id, which is derived from 'Date'.
    day_wise_df = dfs.get('day_wise_df')
    if day_wise_df is not None:
        dfs['day_wise_df'] = create_period_id_column(day_wise_df)
        print("Column 'period_id' created in the DataFrame 'day_wise_df'.")

    # Partition column per DataFrame key. The names refer to the *cleaned*
    # column names (e.g. a header 'Country/Region' becomes 'Country_Region').
    partition_column_dict = {
        'country_wise_latest_df': 'Country_Region',
        'full_grouped_df': 'Country_Region',
        'worldometer_data_df': 'Country_Region',
        'covid_19_clean_complete_df': 'Country_Region',
        'usa_county_wise_df': 'Province_State',
        'day_wise_df': 'period_id'
    }

    skipped = []
    for key_df, df in dfs.items():
        partition_column = partition_column_dict.get(key_df)
        if partition_column is None:
            skipped.append(key_df)
            continue
        write_to_parquet(clean_column_names(df), output_dir, partition_column, key_df)

    if skipped:
        # Don't claim success for DataFrames that were never written.
        print(f"Skipped DataFrame(s) with no partition column configured: {', '.join(skipped)}")
        print(f"{len(dfs) - len(skipped)} of {len(dfs)} DataFrames written to Parquet format.")
    else:
        print("All DataFrames successfully written to Parquet format.")

# Entry point. The notebook kernel executes cells as the `__main__` module, so
# this guard is true when the cell runs and main() is invoked.
if "__main__" == __name__:
    main()

Found /content/drive/MyDrive/covid directory.
Found 6 CSV file(s) in the directory.
Column 'period_id' created in the DataFrame 'day_wise_df'.
All DataFrames successfully written to Parquet format.
