In [1]:
# Initialization: point this kernel at the local Spark installation before
# pyspark is imported anywhere in the notebook.
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In the two lines below, use /usr/bin/python2.7 if you want to use Python 2.
# NOTE(review): PYSPARK_PYTHON points at python3.6 while the driver uses
# python3 - confirm both resolve to the same Python version.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
# Put the py4j and pyspark archives shipped with Spark at the front of sys.path.
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: List every extra Spark package you need here (comma-separated).
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from functools import reduce

In [3]:
import pandas as pd
import glob


In [4]:
# Entry point (Spark 2.x): create, or reuse, a SparkSession with Hive support.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CSV Merger").enableHiveSupport().getOrCreate()



# To run on YARN instead, add .master("yarn") to the builder, e.g.:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()

# SparkContext that belongs to the session created above.
sc = spark.sparkContext

In [5]:
def create_dict_from_csv(file_path):
    """Build a ``state -> two-letter code`` lookup from a CSV file.

    The CSV must contain ``state`` and ``file_name`` columns. The code is the
    first two characters of ``file_name``; when a state appears on several
    rows, the value from the last row is kept.
    """
    stations = pd.read_csv(file_path)
    codes = stations['file_name'].str[:2]
    return dict(zip(stations['state'], codes))

# Build the state-name -> code lookup from the stations file; the later cells
# read their state codes from state_dict.values().
file_path = r'/home/talentum/myproject/dataSource/stations_info.csv'
state_dict = create_dict_from_csv(file_path)
print(state_dict)


{'Andhra Pradesh': 'AP', 'Arunachal Pradesh': 'AR', 'Assam': 'AS', 'Bihar': 'BR', 'Chhattisgarh': 'CG', 'Chandigarh': 'CH', 'Delhi': 'DL', 'Gujarat': 'GJ', 'Himachal Pradesh': 'HP', 'Haryana': 'HR', 'Jharkhand': 'JH', 'Jammu and Kashmir': 'JK', 'Karnataka': 'KA', 'Kerala': 'KL', 'Maharashtra': 'MH', 'Meghalaya': 'ML', 'Manipur': 'MN', 'Madhya Pradesh': 'MP', 'Mizoram': 'MZ', 'Nagaland': 'NL', 'Odisha': 'OR', 'Punjab': 'PB', 'Puducherry': 'PY', 'Rajasthan': 'RJ', 'Sikkim': 'SK', 'Telangana': 'TG', 'Tamil Nadu': 'TN', 'Tripura': 'TR', 'Uttarakhand': 'UK', 'Uttar Pradesh': 'UP', 'West Bengal': 'WB'}


In [6]:
print(list(state_dict.values()))

['AP', 'AR', 'AS', 'BR', 'CG', 'CH', 'DL', 'GJ', 'HP', 'HR', 'JH', 'JK', 'KA', 'KL', 'MH', 'ML', 'MN', 'MP', 'MZ', 'NL', 'OR', 'PB', 'PY', 'RJ', 'SK', 'TG', 'TN', 'TR', 'UK', 'UP', 'WB']


In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
import os

# Initialize Spark session
spark = SparkSession.builder.appName("CSVProcessor").getOrCreate()

# Define the directory paths as local-filesystem URIs.
# The form is 'file://' + absolute path, i.e. three slashes in total, matching
# the URIs used by the other cells of this notebook.
directory_path = 'file:///home/talentum/myproject/dataSource/archive'
savepath = 'file:///home/talentum/myproject/dataSource/output'

# One file-name prefix per state code taken from state_dict (built in an earlier cell).
prefixes = list(state_dict.values())  # Add other prefixes as needed

# Function to normalize column names
def normalize_column_names(df):
    """Return ``df`` with every column renamed to a sanitized form.

    Each name is stripped of surrounding whitespace, spaces and dots are
    turned into underscores, and parentheses are removed. Column positions
    are unchanged.
    """
    substitutions = ((' ', '_'), ('.', '_'), ('(', ''), (')', ''))
    cleaned = []
    for name in df.columns:
        name = name.strip()
        for old, new in substitutions:
            name = name.replace(old, new)
        cleaned.append(name)
    return df.toDF(*cleaned)

# Function to ensure correct data types
def ensure_data_types(df, columns):
    """Return ``df`` restricted to ``columns``, in that order.

    Any requested column that the DataFrame lacks is first added as a
    string-typed null column.
    """
    result = df
    for name in columns:
        if name in result.columns:
            continue
        result = result.withColumn(name, lit(None).cast(StringType()))
    return result.select(columns)

# Process each prefix
# Raised by the reader when the glob matches no files; imported here so the
# loop does not depend on an import made in another cell.
from pyspark.sql.utils import AnalysisException

# Abbreviation -> first state name carrying it, built once instead of
# rescanning state_dict for every prefix.
state_by_prefix = {}
for name, abbr in state_dict.items():
    state_by_prefix.setdefault(abbr, name)

try:
    for prefix in prefixes:
        # Define the search pattern
        search_pattern = os.path.join(directory_path, f'{prefix}*.csv')

        # Read all CSV files for this prefix in one go.
        # NOTE(review): every matching file is read under a single inferred
        # schema; if the per-station files differ in column set or order,
        # confirm the rows still line up with the header Spark picks.
        try:
            df = spark.read.option("header", "true").csv(search_pattern, inferSchema=True)
        except AnalysisException:
            # Spark raises instead of returning an empty DataFrame when the
            # pattern matches nothing, so "no files" has to be handled here.
            print(f"No valid files found for prefix '{prefix}'.")
            continue

        # head(1) only needs the first row; count() would scan every file.
        if not df.head(1):
            print(f"No valid files found for prefix '{prefix}'.")
            continue

        # Normalize column names
        df = normalize_column_names(df)

        # Sort columns so every state file is written with a consistent order
        all_columns = sorted(df.columns)
        df = ensure_data_types(df, all_columns)

        # The prefix is the state abbreviation; state_name is None if unknown
        state_name = state_by_prefix.get(prefix)

        # Add the state column
        df = df.withColumn('state', lit(state_name).cast(StringType()))

        # coalesce(1) makes Spark write a single part file into <savepath>/<prefix>
        output_path = os.path.join(savepath, f'{prefix}')
        df.coalesce(1).write.option("header", "true").csv(output_path, mode='overwrite')
        print(f"Merged CSV files for prefix '{prefix}' saved as part csv file inside {prefix} folder")
finally:
    # Stop the Spark session even if one of the prefixes failed
    spark.stop()


Merged CSV files for prefix 'AP' saved as 'merged_output_AP.csv'
Merged CSV files for prefix 'AR' saved as 'merged_output_AR.csv'
Merged CSV files for prefix 'AS' saved as 'merged_output_AS.csv'
Merged CSV files for prefix 'BR' saved as 'merged_output_BR.csv'
Merged CSV files for prefix 'CG' saved as 'merged_output_CG.csv'
Merged CSV files for prefix 'CH' saved as 'merged_output_CH.csv'
Merged CSV files for prefix 'DL' saved as 'merged_output_DL.csv'
Merged CSV files for prefix 'GJ' saved as 'merged_output_GJ.csv'
Merged CSV files for prefix 'HP' saved as 'merged_output_HP.csv'
Merged CSV files for prefix 'HR' saved as 'merged_output_HR.csv'
Merged CSV files for prefix 'JH' saved as 'merged_output_JH.csv'
Merged CSV files for prefix 'JK' saved as 'merged_output_JK.csv'
Merged CSV files for prefix 'KA' saved as 'merged_output_KA.csv'
Merged CSV files for prefix 'KL' saved as 'merged_output_KL.csv'
Merged CSV files for prefix 'MH' saved as 'merged_output_MH.csv'
Merged CSV files for pref

### Data cleaning (state-wise)

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, mean as _mean, to_date, count, isnull
from pyspark.sql.types import DoubleType
import os

# Initialize Spark session
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

def handle_outliers(df, column):
    """Replace IQR outliers in ``column`` with the column median.

    Values outside ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]`` are set to null, then every
    null in the column (the nulled outliers and any values that were already
    missing) is filled with the approximate median of the remaining values.
    Quantiles come from ``approxQuantile`` with a relative error of 0.01.

    Args:
        df: DataFrame to clean.
        column: Name of a numeric column of ``df``.

    Returns:
        The updated DataFrame, or ``df`` unchanged when ``approxQuantile``
        returns no quantiles (which it does for a column without data).
    """
    # Both quartiles in a single pass over the data.
    quartiles = df.approxQuantile(column, [0.25, 0.75], 0.01)
    if len(quartiles) < 2:
        # No quantiles available: nothing to bound and nothing to impute with.
        return df
    q1, q3 = quartiles
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    is_outlier = (col(column) < lower_bound) | (col(column) > upper_bound)
    df = df.withColumn(column, when(is_outlier, None).otherwise(col(column)))

    # Median of what is left after the outliers were nulled.
    median = df.approxQuantile(column, [0.5], 0.01)
    if median:
        df = df.fillna({column: median[0]})
    return df

def process_csv_file(file_path, output_dir, state_code):
    """Read, clean, and save the CSV data for one state.

    Cleaning steps, in order: drop rows with too many nulls, drop columns with
    too many nulls, replace outliers / impute nulls in double columns, drop
    duplicate rows, convert the date columns when present, and normalize the
    column names (lower-cased). The result is shown, then written as a single
    CSV part file.

    Args:
        file_path: Path or glob accepted by ``spark.read.csv``.
        output_dir: Local directory; output goes to ``<output_dir>/<state_code>``.
        state_code: Name of the output sub-directory.
    """
    # Read the CSV file into a DataFrame
    df = spark.read.option("header", "true").csv(file_path, inferSchema=True)

    # Step 1: drop rows in which more than 85% of the columns are null.
    total_columns = len(df.columns)
    missing_threshold_row = total_columns * 0.85
    # Built-in sum() is intended here: 0 + Column + Column ... is a Column.
    df = df.withColumn('missing_count', sum([isnull(col(c)).cast('int') for c in df.columns]))
    df = df.filter(col('missing_count') <= missing_threshold_row).drop('missing_count')

    # Step 2: drop columns with more than 60% missing values (based on the new row count)
    total_rows = df.count()
    missing_threshold_col = total_rows * 0.60
    # One aggregation for all columns instead of one filter().count() job per
    # column: count() only counts the rows where when() yields a non-null.
    null_count_row = df.select(
        [count(when(isnull(col(c)), lit(1))).alias(c) for c in df.columns]
    ).collect()[0]
    missing_value_counts = dict(zip(df.columns, null_count_row))
    cols_to_drop = [c for c in missing_value_counts if missing_value_counts[c] > missing_threshold_col]
    df = df.drop(*cols_to_drop)

    # Handle outliers and impute missing values. Only DoubleType columns are
    # treated; columns inferred as integers are left as they are.
    numeric_columns = [f.name for f in df.schema.fields if isinstance(f.dataType, DoubleType)]
    for column in numeric_columns:
        df = handle_outliers(df, column)

    # Remove duplicates
    df = df.dropDuplicates()

    # Convert date columns.
    # NOTE(review): this branch only fires for headers spelled exactly
    # 'From Date' / 'To Date'. Inputs whose headers were already normalized
    # (e.g. 'From_Date') skip it and keep their date columns as read. Confirm
    # which spelling the inputs use and that 'yyyy-MM-dd' matches their format.
    if 'From Date' in df.columns and 'To Date' in df.columns:
        df = df.withColumn('From_Date', to_date(col('From Date'), 'yyyy-MM-dd'))
        df = df.withColumn('To_Date', to_date(col('To Date'), 'yyyy-MM-dd'))
        df = df.drop('From Date', 'To Date')

    # Normalize column names (loop variable named so it does not shadow pyspark's col)
    normalized_columns = [
        name.strip().replace(' ', '_').replace('.', '_').replace('(', '').replace(')', '').lower()
        for name in df.columns
    ]
    df = df.toDF(*normalized_columns)

    # Show the DataFrame and print remaining columns
    df.show()
    print(f"Columns after cleaning: {df.columns}")

    # Print a red warning if the number of columns is less than 12
    if len(df.columns) < 12:
        print("\033[91mWarning: The number of columns after cleaning is less than 12.\033[0m")

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Save the cleaned DataFrame to a single CSV file
    output_file = os.path.join(output_dir, f'{state_code}')
    df.coalesce(1).write.option("header", "true").csv(f'file://{output_file}', mode='overwrite')
    print(f'Cleaned data saved to {output_file}')

def process_all_files(state_codes, base_path, output_dir):
    """Clean the CSV files of every listed state code.

    For each code, the glob ``<base_path>/<code>/*.csv`` is handed to
    ``process_csv_file`` together with ``output_dir`` and the code itself.
    """
    targets = ((code, os.path.join(base_path, code, '*.csv')) for code in state_codes)
    for code, pattern in targets:
        process_csv_file(pattern, output_dir, code)

# Define state codes, base path, and output directory
state_codes = list(state_dict.values())  # every code from the state lookup
# state_codes = ['JH']  # handy for re-running a single state
# base_path is a Spark URI; output_dir is a plain local path because
# process_csv_file creates it with os.makedirs and adds the file:// prefix itself.
base_path = 'file:///home/talentum/myproject/dataSource/output'
output_dir = '/home/talentum/myproject/dataSource/output/Cleaned'

# Process all files
process_all_files(state_codes, base_path, output_dir)

# Stop the Spark session
spark.stop()
# base_cleaned_file_path='hdfs:///user/talentum/output'

+-----------+-------+-------------+--------+----------------+---------+---------+--------+-------+-----------+----------+-----------+-----+-----+---------+--------+-------------+----------------+-------------+-------+------+------+------------+--------------+
|at_degree_c|bp_mmhg|benzene_ug/m3|co_mg/m3|       from_date|nh3_ug/m3|no2_ug/m3|no_ug/m3|nox_ppb|ozone_ug/m3|pm10_ug/m3|pm2_5_ug/m3|rf_mm| rh_%|so2_ug/m3|sr_w/mt2|temp_degree_c|         to_date|toluene_ug/m3|vws_m/s|wd_deg|ws_m/s|xylene_ug/m3|         state|
+-----------+-------+-------------+--------+----------------+---------+---------+--------+-------+-----------+----------+-----------+-----+-----+---------+--------+-------------+----------------+-------------+-------+------+------+------------+--------------+
|      21.47|  218.5|          0.4|    0.26|06-07-2016 22:00|     6.88|    25.78|     3.4|  15.08|      13.78|      12.5|       3.75|  0.0|78.75|     3.28|    10.0|         32.6|06-07-2016 23:00|          3.7|   -0.1| 29

In [None]:
# JH has fewer parameters (columns) than the other states.
# It presumably shows up in the cleaning output as the red warning.

### EDA

In [51]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean as _mean, isnull
from pyspark.sql.types import DoubleType, IntegerType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("CSVProcessing") \
    .getOrCreate()

def process_csv_files_from_path(path_pattern):
    """Print summary statistics for the CSV files matching ``path_pattern``.

    The report lists the row and column totals, the mean of every double or
    integer column, and the null count of every column. Any exception is
    caught and reported on stdout instead of being raised.
    """
    try:
        frame = spark.read.option("header", "true").csv(path_pattern, inferSchema=True)

        # Mean of each numeric column, computed in one select
        numeric_types = (DoubleType, IntegerType)
        numeric_columns = []
        for field in frame.schema.fields:
            if isinstance(field.dataType, numeric_types):
                numeric_columns.append(field.name)
        mean_exprs = [_mean(col(name)).alias(name) for name in numeric_columns]
        mean_values = frame.select(mean_exprs).collect()[0].asDict()

        # Null count per column
        null_counts = {}
        for name in frame.columns:
            null_counts[name] = frame.filter(isnull(col(name))).count()

        total_rows = frame.count()
        total_columns = len(frame.columns)

        # The trailing empty string yields the blank separator line
        print(
            f"Processing files matching: {path_pattern}",
            f"Total Rows: {total_rows}",
            f"Total Columns: {total_columns}",
            "Mean of each feature column:",
            mean_values,
            "Number of null values per column:",
            null_counts,
            "",
            sep="\n",
        )
    except Exception as e:
        print(f"An error occurred while processing files with pattern {path_pattern}: {e}")

def process_files_for_state_code(state_code, base_paths):
    """Run the CSV summary for one state code under each base path.

    For every base path a banner and a header line are printed, then
    ``<base_path>/<state_code>/*.csv`` is passed to
    ``process_csv_files_from_path``.
    """
    banner = "******************************************************************************************"
    for root in base_paths:
        print(banner)
        print(f"Processing base path: {root} for state code: {state_code}")
        # Wildcard matches every CSV file in the state's directory
        process_csv_files_from_path(os.path.join(root, state_code, '*.csv'))

# Define base paths for directories
base_paths = [
    'file:///home/talentum/myproject/dataSource/output',
    'file:///home/talentum/myproject/dataSource/output/Cleaned'
]

# Example state codes
state_codes = list(state_dict.values())

# Process all files for each state code across all base paths
for state_code in state_codes:
    process_files_for_state_code(state_code, base_paths)

# Stop the Spark session
spark.stop()


******************************************************************************************
Processing base path: file:///home/talentum/myproject/dataSource/output for state code: AP
Processing files matching: file:///home/talentum/myproject/dataSource/output/AP/*.csv
Total Rows: 272217
Total Columns: 24
Mean of each feature column:
{'AT_degree_C': 19.804636198831712, 'BP_mmHg': 329.26510965319864, 'Benzene_ug/m3': 1.3425025154651606, 'CO_mg/m3': 0.6732711137891778, 'NH3_ug/m3': 11.814804842056729, 'NO2_ug/m3': 26.583451043266106, 'NO_ug/m3': 8.807595412416097, 'NOx_ppb': 21.877951389045055, 'Ozone_ug/m3': 33.42829703974618, 'PM10_ug/m3': 75.24238959315365, 'PM2_5_ug/m3': 36.22938336606634, 'RF_mm': 0.39835663419207146, 'RH_%': 36.018222775945446, 'SO2_ug/m3': 9.714749217093633, 'SR_W/mt2': 46.66403755271794, 'Temp_degree_C': 21.804508160189453, 'Toluene_ug/m3': 4.2030759198924175, 'VWS_m/s': 75.46110700906699, 'WD_deg': 124.91501163174138, 'WS_m/s': 10.932443972660211, 'Xylene_ug/m3': 

In [84]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, IntegerType
import os
import glob

# Initialize Spark session
spark = SparkSession.builder \
    .appName("OutlierDetection") \
    .getOrCreate()

def count_outliers_in_csv(file_path, relative_error=0.05):
    """Count IQR outliers in every numeric column of a CSV file.

    A value is an outlier when it lies outside ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``,
    with the quartiles taken from ``approxQuantile``. One line per numeric
    (double or integer) column is printed.

    Args:
        file_path: Local filesystem path, or a URI that already has a scheme.
        relative_error: Relative error passed to ``approxQuantile``.

    Returns:
        dict mapping each numeric column name to its outlier count. A column
        for which no quartiles can be computed is reported with 0.
    """
    # Only bare filesystem paths get the local-file scheme; anything that
    # already carries a scheme (file:, hdfs://, ...) is passed through as is.
    if '://' not in file_path and not file_path.startswith('file:'):
        file_path = 'file://' + os.path.abspath(file_path)

    # Read the CSV file into a DataFrame
    df = spark.read.option("header", "true").csv(file_path, inferSchema=True)

    def count_outliers(df, column):
        """Return the number of IQR outliers in ``column``."""
        quantiles = df.approxQuantile(column, [0.25, 0.75], relative_error)
        if len(quantiles) < 2:
            # approxQuantile returned no quartiles (column without data).
            return 0
        q1, q3 = quantiles
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        return df.filter((col(column) < lower_bound) | (col(column) > upper_bound)).count()

    # Outlier count for each numeric column, in schema order
    outlier_counts = {
        field.name: count_outliers(df, field.name)
        for field in df.schema.fields
        if isinstance(field.dataType, (DoubleType, IntegerType))
    }

    # Print the results
    for column, n_outliers in outlier_counts.items():
        if n_outliers > 0:
            print(f"Column '{column}' has {n_outliers} outlier(s).")
        else:
            print(f"Column '{column}' has no outliers.")

    return outlier_counts

def process_csv_files(base_path, state_codes):
    """Report outlier counts for every CSV file under ``<base_path>/<state_code>/``.

    Args:
        base_path: Local directory (no URI scheme, because ``glob`` is used to
            list the files).
        state_codes: Iterable of sub-directory names to scan.

    Returns:
        dict ``{state_code: {file_path: outlier_counts}}`` covering the files
        that were processed successfully. States without CSV files and files
        that raised an error are left out.
    """
    results = {}
    for state_code in state_codes:
        # Construct the directory path
        print("*"*20+f'{state_code}'+"*"*20)
        directory_path = os.path.join(base_path, state_code)
        print(f"Looking for files in directory: {directory_path}")  # Debugging line

        # Construct the search pattern for CSV files
        search_pattern = os.path.join(directory_path, '*.csv')
        print(f"Search pattern: {search_pattern}")  # Debugging line

        # List all CSV files in the directory; sorted for a stable report order
        csv_files = sorted(glob.glob(search_pattern))

        if not csv_files:
            print(f"No CSV files found in directory: {directory_path}")
            continue

        # Process each CSV file; one failing file must not stop the others
        state_results = results.setdefault(state_code, {})
        for file_path in csv_files:
            print(f"Processing file: {file_path}")
            try:
                state_results[file_path] = count_outliers_in_csv(file_path)
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")
    return results

# Example usage
state_codes = list(state_dict.values()) # Replace with actual state codes
base_path = '/home/talentum/myproject/dataSource/output'  # Local base path

process_csv_files(base_path, state_codes)

# Stop the Spark session
spark.stop()


********************AP********************
Looking for files in directory: /home/talentum/myproject/dataSource/output/AP
Search pattern: /home/talentum/myproject/dataSource/output/AP/*.csv
Processing file: /home/talentum/myproject/dataSource/output/AP/part-00000-bea4240b-0224-4c5a-a59e-1cf2f7877304-c000.csv
Column 'AT_degree_C' has 127 outlier(s).
Column 'BP_mmHg' has no outliers.
Column 'Benzene_ug/m3' has 21443 outlier(s).
Column 'CO_mg/m3' has 8297 outlier(s).
Column 'NH3_ug/m3' has 5084 outlier(s).
Column 'NO2_ug/m3' has 16565 outlier(s).
Column 'NO_ug/m3' has 22226 outlier(s).
Column 'NOx_ppb' has 12556 outlier(s).
Column 'Ozone_ug/m3' has 9507 outlier(s).
Column 'PM10_ug/m3' has 7771 outlier(s).
Column 'PM2_5_ug/m3' has 8702 outlier(s).
Column 'RF_mm' has 67013 outlier(s).
Column 'RH_%' has 39 outlier(s).
Column 'SO2_ug/m3' has 7084 outlier(s).
Column 'SR_W/mt2' has 41066 outlier(s).
Column 'Temp_degree_C' has 65 outlier(s).
Column 'Toluene_ug/m3' has 21676 outlier(s).
Column 'VW

In [97]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, IntegerType
import os
import glob

# Initialize Spark session
spark = SparkSession.builder \
    .appName("OutlierDetection") \
    .getOrCreate()

def count_outliers_in_csv(file_path, relative_error=0.05):
    """Count IQR outliers in every numeric column of a CSV file.

    A value is an outlier when it lies outside ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``,
    with the quartiles taken from ``approxQuantile``. One line per numeric
    (double or integer) column is printed.

    Args:
        file_path: Local filesystem path, or a URI that already has a scheme.
        relative_error: Relative error passed to ``approxQuantile``.

    Returns:
        dict mapping each numeric column name to its outlier count. A column
        for which no quartiles can be computed is reported with 0.
    """
    # Only bare filesystem paths get the local-file scheme; anything that
    # already carries a scheme (file:, hdfs://, ...) is passed through as is.
    if '://' not in file_path and not file_path.startswith('file:'):
        file_path = 'file://' + os.path.abspath(file_path)

    # Read the CSV file into a DataFrame
    df = spark.read.option("header", "true").csv(file_path, inferSchema=True)

    def count_outliers(df, column):
        """Return the number of IQR outliers in ``column``."""
        quantiles = df.approxQuantile(column, [0.25, 0.75], relative_error)
        if len(quantiles) < 2:
            # approxQuantile returned no quartiles (column without data).
            return 0
        q1, q3 = quantiles
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        return df.filter((col(column) < lower_bound) | (col(column) > upper_bound)).count()

    # Outlier count for each numeric column, in schema order
    outlier_counts = {
        field.name: count_outliers(df, field.name)
        for field in df.schema.fields
        if isinstance(field.dataType, (DoubleType, IntegerType))
    }

    # Print the results
    for column, n_outliers in outlier_counts.items():
        if n_outliers > 0:
            print(f"Column '{column}' has {n_outliers} outlier(s).")
        else:
            print(f"Column '{column}' has no outliers.")

    return outlier_counts

def process_csv_files(base_path, state_codes):
    """Report outlier counts for every CSV file under ``<base_path>/<state_code>/``.

    Args:
        base_path: Local directory (no URI scheme, because ``glob`` is used to
            list the files).
        state_codes: Iterable of sub-directory names to scan.

    Returns:
        dict ``{state_code: {file_path: outlier_counts}}`` covering the files
        that were processed successfully. States without CSV files and files
        that raised an error are left out.
    """
    results = {}
    for state_code in state_codes:
        # Construct the directory path
        print("*"*20+f'{state_code}'+"*"*20)
        directory_path = os.path.join(base_path, state_code)
        print(f"Looking for files in directory: {directory_path}")  # Debugging line

        # Construct the search pattern for CSV files
        search_pattern = os.path.join(directory_path, '*.csv')
        print(f"Search pattern: {search_pattern}")  # Debugging line

        # List all CSV files in the directory; sorted for a stable report order
        csv_files = sorted(glob.glob(search_pattern))

        if not csv_files:
            print(f"No CSV files found in directory: {directory_path}")
            continue

        # Process each CSV file; one failing file must not stop the others
        state_results = results.setdefault(state_code, {})
        for file_path in csv_files:
            print(f"Processing file: {file_path}")
            try:
                state_results[file_path] = count_outliers_in_csv(file_path)
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")
    return results

# Example usage
state_codes = list(state_dict.values()) # Replace with actual state codes
base_path = '/home/talentum/myproject/dataSource/output/Cleaned'  # Local base path

process_csv_files(base_path, state_codes)

# Stop the Spark session
spark.stop()


********************AP********************
Looking for files in directory: /home/talentum/myproject/dataSource/output/Cleaned/AP
Search pattern: /home/talentum/myproject/dataSource/output/Cleaned/AP/*.csv
Processing file: /home/talentum/myproject/dataSource/output/Cleaned/AP/part-00000-6fb4c158-0a9c-44f2-b760-f08ac2a14898-c000.csv
Column 'at_degree_c' has 15 outlier(s).
Column 'bp_mmhg' has 44881 outlier(s).
Column 'benzene_ug/m3' has 16058 outlier(s).
Column 'co_mg/m3' has 3871 outlier(s).
Column 'nh3_ug/m3' has 2842 outlier(s).
Column 'no2_ug/m3' has 11369 outlier(s).
Column 'no_ug/m3' has 31573 outlier(s).
Column 'nox_ppb' has 14169 outlier(s).
Column 'ozone_ug/m3' has 10806 outlier(s).
Column 'pm10_ug/m3' has 4379 outlier(s).
Column 'pm2_5_ug/m3' has 2515 outlier(s).
Column 'rf_mm' has 25608 outlier(s).
Column 'rh_%' has no outliers.
Column 'so2_ug/m3' has 6297 outlier(s).
Column 'sr_w/mt2' has 9399 outlier(s).
Column 'temp_degree_c' has 69821 outlier(s).
Column 'toluene_ug/m3' has

In [92]:
***************************************2*******************************************

In [95]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# Initialize a Spark session
spark = SparkSession.builder.appName("CSV Merge").getOrCreate()

# List of state codes (assuming state_dict is defined elsewhere in your code)
state_codes = list(state_dict.values())

# Read the cleaned CSV files of every state into a list of DataFrames
# (one DataFrame per state code, in state_codes order).
dfs = []
print("Starting to read CSV files...")
for state_code in state_codes:
    file_path = f"file:///home/talentum/myproject/dataSource/output/Cleaned/{state_code}/*.csv"
    print(f"Reading files from: {file_path}")
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    # df.count() runs a Spark job over the state's files just to log the row total.
    print(f"Read {df.count()} rows for state code: {state_code}")
    dfs.append(df)

print("Finished reading CSV files.")
print("Starting to clean column names...")

# Function to clean column names by replacing special characters with underscores
def clean_column_names(df):
    """Return ``df`` with '-', '/' and ' ' in column names replaced by '_'.

    Columns are renamed one at a time with ``withColumnRenamed``, walking the
    names the DataFrame had on entry.
    """
    to_underscore = str.maketrans({'-': '_', '/': '_', ' ': '_'})
    renamed = df
    for original in df.columns:
        renamed = renamed.withColumnRenamed(original, original.translate(to_underscore))
    return renamed

# Clean column names for each DataFrame
dfs = [clean_column_names(df) for df in dfs]

print("Finished cleaning column names.")
print("Starting to ensure all DataFrames have the same columns...")

# Collect every column name across the state DataFrames together with the
# data type first seen for it, so a missing column can be added with a
# matching type.
column_types = {}
for df in dfs:
    for field in df.schema.fields:
        column_types.setdefault(field.name, field.dataType)

# Sorted list rather than a set: the iteration order of a set of strings can
# differ between Python processes, which would change the column order of the
# merged output from run to run.
all_columns = sorted(column_types)

# Ensure all DataFrames have all columns
def add_missing_columns(df, all_columns, column_types=None):
    """Return ``df`` with a null column added for each name it lacks.

    Args:
        df: DataFrame to complete.
        all_columns: Iterable of column names every DataFrame must have.
        column_types: Optional mapping of column name to the type for the
            added null column. Names not in the mapping, or all names when it
            is omitted, are added as ``"string"``.
    """
    column_types = column_types or {}
    present = set(df.columns)
    for column in all_columns:
        if column not in present:
            # A string-typed null would force the whole merged column to
            # string through the union's type coercion, so use the type the
            # other DataFrames have for this column.
            df = df.withColumn(column, lit(None).cast(column_types.get(column, "string")))
    return df

dfs = [add_missing_columns(df, all_columns, column_types) for df in dfs]

print("Finished ensuring all DataFrames have the same columns.")
print("Starting to select columns in the same order for consistency...")

# Select the columns in the same order for consistency
dfs = [df.select(*all_columns) for df in dfs]

print("Finished selecting columns in the same order.")
print("Starting to merge DataFrames...")

# Merge the DataFrames
merged_df = dfs[0]
for df in dfs[1:]:
    merged_df = merged_df.unionByName(df)

print("Finished merging DataFrames.")
print("Starting to save the merged DataFrame to a single CSV file...")

# Coalesce to a single partition and save the merged DataFrame to a single CSV file.
# mode="overwrite" matches the per-state writes earlier in this notebook and
# lets the cell be re-run; without it the write fails when the directory exists.
# Another cell writes Parquet to this same directory, so whichever of the two
# runs last replaces the other's output.
merged_df.coalesce(1).write.csv("file:///home/talentum/myproject/dataSource/output/merged_final", header=True, mode="overwrite")

print("Finished saving the merged DataFrame.")
print("Stopping the Spark session...")

# Stop the Spark session
spark.stop()

print("Spark session stopped.")


Starting to read CSV files...
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/AP/*.csv
Read 225716 rows for state code: AP
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/AR/*.csv
Read 8131 rows for state code: AR
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/AS/*.csv
Read 73172 rows for state code: AS
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/BR/*.csv
Read 636501 rows for state code: BR
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/CG/*.csv
Read 49048 rows for state code: CG
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/CH/*.csv
Read 54786 rows for state code: CH
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/DL/*.csv
Read 2118037 rows for state code: DL
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/GJ/*.csv
Read 346017 rows for state

In [96]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# Initialize a Spark session
spark = SparkSession.builder.appName("CSV Merge").getOrCreate()

# List of state codes (assuming state_dict is defined elsewhere in your code)
state_codes = list(state_dict.values())

# Read the cleaned CSV files of every state into a list of DataFrames
# (one DataFrame per state code, in state_codes order).
dfs = []
print("Starting to read CSV files...")
for state_code in state_codes:
    file_path = f"file:///home/talentum/myproject/dataSource/output/Cleaned/{state_code}/*.csv"
    print(f"Reading files from: {file_path}")
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    # df.count() runs a Spark job over the state's files just to log the row total.
    print(f"Read {df.count()} rows for state code: {state_code}")
    dfs.append(df)

print("Finished reading CSV files.")
print("Starting to clean column names...")

# Function to clean column names by replacing special characters with underscores
def clean_column_names(df):
    """Return ``df`` with '-', '/' and ' ' in column names replaced by '_'.

    Columns are renamed one at a time with ``withColumnRenamed``, walking the
    names the DataFrame had on entry.
    """
    to_underscore = str.maketrans({'-': '_', '/': '_', ' ': '_'})
    renamed = df
    for original in df.columns:
        renamed = renamed.withColumnRenamed(original, original.translate(to_underscore))
    return renamed

# Clean column names for each DataFrame
dfs = [clean_column_names(df) for df in dfs]

print("Finished cleaning column names.")
print("Starting to ensure all DataFrames have the same columns...")

# Collect every column name across the state DataFrames together with the
# data type first seen for it, so a missing column can be added with a
# matching type.
column_types = {}
for df in dfs:
    for field in df.schema.fields:
        column_types.setdefault(field.name, field.dataType)

# Sorted list rather than a set: the iteration order of a set of strings can
# differ between Python processes, which would change the column order of the
# merged output from run to run.
all_columns = sorted(column_types)

# Ensure all DataFrames have all columns
def add_missing_columns(df, all_columns, column_types=None):
    """Return ``df`` with a null column added for each name it lacks.

    Args:
        df: DataFrame to complete.
        all_columns: Iterable of column names every DataFrame must have.
        column_types: Optional mapping of column name to the type for the
            added null column. Names not in the mapping, or all names when it
            is omitted, are added as ``"string"``.
    """
    column_types = column_types or {}
    present = set(df.columns)
    for column in all_columns:
        if column not in present:
            # A string-typed null would force the whole merged column to
            # string through the union's type coercion (and into the Parquet
            # schema), so use the type the other DataFrames have for it.
            df = df.withColumn(column, lit(None).cast(column_types.get(column, "string")))
    return df

dfs = [add_missing_columns(df, all_columns, column_types) for df in dfs]

print("Finished ensuring all DataFrames have the same columns.")
print("Starting to select columns in the same order for consistency...")

# Select the columns in the same order for consistency
dfs = [df.select(*all_columns) for df in dfs]

print("Finished selecting columns in the same order.")
print("Starting to merge DataFrames...")

# Merge the DataFrames
merged_df = dfs[0]
for df in dfs[1:]:
    merged_df = merged_df.unionByName(df)

print("Finished merging DataFrames.")
print("Starting to save the merged DataFrame to a Parquet file...")

# Coalesce to a single partition and save the merged DataFrame to a Parquet file.
# mode="overwrite" matches the per-state writes earlier in this notebook and
# lets the cell be re-run; without it the write fails when the directory exists.
# Another cell writes CSV to this same directory, so whichever of the two
# runs last replaces the other's output.
merged_df.coalesce(1).write.parquet("file:///home/talentum/myproject/dataSource/output/merged_final", mode="overwrite")

print("Finished saving the merged DataFrame as Parquet.")
print("Stopping the Spark session...")

# Stop the Spark session
spark.stop()

print("Spark session stopped.")


Starting to read CSV files...
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/AP/*.csv
Read 225716 rows for state code: AP
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/AR/*.csv
Read 8131 rows for state code: AR
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/AS/*.csv
Read 73172 rows for state code: AS
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/BR/*.csv
Read 636501 rows for state code: BR
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/CG/*.csv
Read 49048 rows for state code: CG
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/CH/*.csv
Read 54786 rows for state code: CH
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/DL/*.csv
Read 2118037 rows for state code: DL
Reading files from: file:///home/talentum/myproject/dataSource/output/Cleaned/GJ/*.csv
Read 346017 rows for state