In [1]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Initialize SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ParquetReader") \
    .getOrCreate()

print("SparkSession created successfully!")

SparkSession created successfully!


In [3]:
# Define the path to the folder in your Google Drive
folder_path = '/content/drive/MyDrive/kcc_all_states_combined.parquet'

# Read the parquet files from the folder using Spark
try:
    spark_df = spark.read.parquet(folder_path)

    print(f"Successfully loaded data from {folder_path} into a Spark DataFrame.")
    print("\nInitial Data Inspection (Spark DataFrame):")
    spark_df.printSchema()
    spark_df.show(5)
    print(f"\nNumber of rows: {spark_df.count()}")

except Exception as e:
    print(f"An error occurred while reading the parquet files with Spark: {e}")

Successfully loaded data from /content/drive/MyDrive/kcc_all_states_combined.parquet into a Spark DataFrame.

Initial Data Inspection (Spark DataFrame):
root
 |-- StateName: string (nullable = true)
 |-- DistrictName: string (nullable = true)
 |-- BlockName: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- Sector: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Crop: string (nullable = true)
 |-- QueryType: string (nullable = true)
 |-- QueryText: string (nullable = true)
 |-- KccAns: string (nullable = true)
 |-- CreatedOn: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- source_file: string (nullable = true)

+--------------------+--------------------+--------------+------+------------+--------+-----------------+--------------------+--------------------+--------------------+--------------------+----+-----+--------------------+
|           StateName|        DistrictName|     BlockName|Season

In [4]:
# Define the path to the folder in your Google Drive
folder_path = 'D:\Scripts\Kissan Dataset\kcc_all_states_combined.parquet'

# Read the parquet files from the folder using Spark
try:
    spark_df = spark.read.parquet(folder_path)

    print(f"Successfully loaded data from {folder_path} into a Spark DataFrame.")
    print("\nInitial Data Inspection (Spark DataFrame):")
    spark_df.printSchema()
    spark_df.show(5)
    print(f"\nNumber of rows: {spark_df.count()}")


except Exception as e:
    print(f"An error occurred while reading the parquet files with Spark: {e}")

An error occurred while reading the parquet files with Spark: java.net.URISyntaxException: Relative path in absolute URI: D:%5CScripts%5CKissan%20Dataset%5Ckcc_all_states_combined.parquet


  folder_path = 'D:\Scripts\Kissan Dataset\kcc_all_states_combined.parquet'


In [5]:
from pyspark.sql.functions import col, sum

# Check for missing values (nulls) in each column
print("Checking for missing values (nulls) in all columns:")
spark_df.select([sum(col(c).isNull().cast("integer")).alias(c) for c in spark_df.columns]).show()

Checking for missing values (nulls) in all columns:
+---------+------------+---------+--------+--------+--------+--------+---------+---------+--------+---------+--------+--------+-----------+
|StateName|DistrictName|BlockName|  Season|  Sector|Category|    Crop|QueryType|QueryText|  KccAns|CreatedOn|    year|   month|source_file|
+---------+------------+---------+--------+--------+--------+--------+---------+---------+--------+---------+--------+--------+-----------+
|    14362|    24863620| 26347809|26880217|29885686|30747830|31102588| 31278797| 31383472|38250028| 45351733|45385186|45412518|          0|
+---------+------------+---------+--------+--------+--------+--------+---------+---------+--------+---------+--------+--------+-----------+



In [6]:
from pyspark.sql.functions import col
from functools import reduce

# Create a list of columns to check for emptiness, excluding 'source_files'
columns_to_check = [c for c in spark_df.columns if c != 'source_file']

# Build the condition for rows where all the columns in 'columns_to_check' are null or empty
condition = (col(c).isNull() | (col(c) == '') for c in columns_to_check)

# Combine all the conditions using the `&` (AND) operator
combined_condition = reduce(lambda x, y: x & y, condition)

# Filter the rows where at least one column in 'columns_to_check' is not null or empty
filtered_df = spark_df.filter(~combined_condition)

# Show the result
filtered_df.show()


+--------------------+--------------------+--------------+------+------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----+--------------------+
|           StateName|        DistrictName|     BlockName|Season|      Sector|  Category|                Crop|           QueryType|           QueryText|              KccAns|           CreatedOn|year|month|         source_file|
+--------------------+--------------------+--------------+------+------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----+--------------------+
|              ODISHA|            NAYAGARH|     DASAPALLA|    NA| AGRICULTURE|    Others|              Others|  Government Schemes|Mandi registratio...|Advised to contac...|2023-02-03T14:09:...|2023|    2|file:///D:/Script...|
|              ODISHA|             CUTTACK|TANGI CHOUDWAR|    NA|HORTICULTURE|   Flowers|   

In [7]:
from pyspark.sql.functions import col, sum

# Check for missing values (nulls) in each column
print("Checking for missing values (nulls) in all columns:")
filtered_df.select([sum(col(c).isNull().cast("integer")).alias(c) for c in filtered_df.columns]).show()

Checking for missing values (nulls) in all columns:
+---------+------------+---------+--------+--------+--------+--------+---------+---------+--------+---------+--------+--------+-----------+
|StateName|DistrictName|BlockName|  Season|  Sector|Category|    Crop|QueryType|QueryText|  KccAns|CreatedOn|    year|   month|source_file|
+---------+------------+---------+--------+--------+--------+--------+---------+---------+--------+---------+--------+--------+-----------+
|    12342|    24861600| 26345789|26878197|29883666|30745810|31100568| 31276777| 31381452|38248008| 45349713|45383166|45410498|          0|
+---------+------------+---------+--------+--------+--------+--------+---------+---------+--------+---------+--------+--------+-----------+



In [8]:
from pyspark.sql.functions import col

# Drop rows where QueryText is null, empty string, or "NULL" OR KccAns is null, empty string, or "NULL"
cleaned_spark_df = filtered_df.filter(
    ~((col("QueryText").isNull()) | (col("QueryText") == "") | (col("QueryText") == "NULL")) &
    ~((col("KccAns").isNull()) | (col("KccAns") == "") | (col("KccAns") == "NULL"))
)

print(f"Original number of rows: {filtered_df.count()}")
print(f"Number of rows after dropping missing QueryText/KccAns: {cleaned_spark_df.count()}")

Original number of rows: 79257331
Number of rows after dropping missing QueryText/KccAns: 41004159


In [9]:
from pyspark.sql.functions import col, sum

# Check for missing values (nulls) in each column
print("Checking for missing values (nulls) in all columns:")
cleaned_spark_df.select([sum(col(c).isNull().cast("integer")).alias(c) for c in cleaned_spark_df.columns]).show()

Checking for missing values (nulls) in all columns:
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+
|StateName|DistrictName|BlockName|Season|Sector|Category|Crop|QueryType|QueryText|KccAns|CreatedOn|   year|  month|source_file|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+
|      164|         355|      123|   633|   359|    2458| 818|     2505|        0|     0|  7267178|7300552|7327698|          0|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+



In [10]:
from pyspark.sql.functions import col, lower

# Define keywords to filter out (converted to lowercase)
print(f"Original number of rows: {cleaned_spark_df.count()}")
keywords_to_drop = ["wrong number", "test call"]

# Create a condition to filter OUT rows where KccAns_cleaned contains any of the keywords
# We'll convert KccAns_cleaned to lowercase for case-insensitive matching
filter_condition_to_drop = lower(col("KccAns")).contains(keywords_to_drop[0])
for keyword in keywords_to_drop[1:]:
    filter_condition_to_drop = filter_condition_to_drop | lower(col("KccAns")).contains(keyword)

# Filter the DataFrame to keep rows that DO NOT contain the keywords
cleaned_spark_df = cleaned_spark_df.filter(~filter_condition_to_drop)

# Print the row counts before and after the filtering

print(f"Number of rows after dropping 'test' or 'wrong number' in KccAns: {cleaned_spark_df.count()}")

# Optionally, you can save the updated DataFrame back to a file, if needed
# Example: Save it in Parquet format
# cleaned_spark_df.write.parquet("path_to_save/cleaned_spark_df.parquet")


Original number of rows: 41004159
Number of rows after dropping 'test' or 'wrong number' in KccAns: 40966773


In [11]:
from pyspark.sql.functions import col, sum

# Check for missing values (nulls) in each column
print("Checking for missing values (nulls) in all columns:")
cleaned_spark_df.select([sum(col(c).isNull().cast("integer")).alias(c) for c in cleaned_spark_df.columns]).show()

Checking for missing values (nulls) in all columns:
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+
|StateName|DistrictName|BlockName|Season|Sector|Category|Crop|QueryType|QueryText|KccAns|CreatedOn|   year|  month|source_file|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+
|      164|         355|      123|   633|   359|    2458| 818|     2505|        0|     0|  7267113|7300487|7327633|          0|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+



In [12]:
from pyspark.sql.functions import col, split, element_at, when

# Show a sample of the 'source_file' column to understand its structure
print("Sample values from 'source_file' column:")
cleaned_spark_df.select("source_file").show(20, truncate=False)

# Let's assume the state name is part of the path, just before the filename
# Extract the state name from the 'source_file' column
cleaned_spark_df = cleaned_spark_df.withColumn(
    "extracted_state",
    when(col("source_file").isNotNull(), element_at(split(col("source_file"), "/"), -2)).otherwise(None)
)

# Update 'StateName' only where it's null, empty, or 'NULL', using the extracted state name
cleaned_spark_df = cleaned_spark_df.withColumn(
    "StateName",
    when(
        (col("StateName").isNull()) | (col("StateName") == "") | (col("StateName") == "NULL"),
        col("extracted_state")
    ).otherwise(col("StateName"))
).drop("extracted_state")  # Drop the temporary 'extracted_state' column

# Show a sample of the updated 'StateName' and 'source_file'
print("\nSample values after attempting to populate StateName:")
cleaned_spark_df.select("StateName", "source_file").show(20, truncate=False)

# Print the number of rows after updating the 'StateName'
print(f"\nNumber of rows after attempting to populate StateName: {cleaned_spark_df.count()}")


Sample values from 'source_file' column:
+------------------------------------------------------------------+
|source_file                                                       |
+------------------------------------------------------------------+
|file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_ODISHA.csv|
|file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_ODISHA.csv|
|file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_ODISHA.csv|
|file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_ODISHA.csv|
|file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_ODISHA.csv|
|file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_ODISHA.csv|
|file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_ODISHA.csv|
|file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_ODISHA.csv|
|file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_ODISHA.csv|
|file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_ODISHA.csv|
|file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_ODISH

In [13]:
from pyspark.sql.functions import col, sum

# Check for missing values (nulls) in each column
print("Checking for missing values (nulls) in all columns:")
cleaned_spark_df.select([sum(col(c).isNull().cast("integer")).alias(c) for c in cleaned_spark_df.columns]).show()

Checking for missing values (nulls) in all columns:
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+
|StateName|DistrictName|BlockName|Season|Sector|Category|Crop|QueryType|QueryText|KccAns|CreatedOn|   year|  month|source_file|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+
|        0|         355|      123|   633|   359|    2458| 818|     2505|        0|     0|  7267113|7300487|7327633|          0|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+



In [14]:
from pyspark.sql.functions import col

# List of valid state names
valid_state_names = [
    "UTTAR PRADESH", "RAJASTHAN", "MAHARASHTRA", "MADHYA PRADESH",
    "HARYANA", "PUNJAB", "GUJARAT", "BIHAR", "TAMILNADU", "KARNATAKA",
    "ODISHA", "WEST BENGAL", "ANDHRA PRADESH", "TELANGANA",
    "HIMACHAL PRADESH", "CHHATTISGARH", "JAMMU AND KASHMIR",
    "JHARKAND", "UTTARAKHAND", "ASSAM", "KERALA", "DELHI", "TRIPURA",
    "PUDUCHERRY", "MANIPUR", "MIZORAM", "MEGHALAYA", "GOA", "SIKKIM",
    "ARUNACHAL PRADESH", "NAGALAND", "A AND N ISLANDS", "CHANDIGARH",
    "LAKSHADWEEP", "0", "DADRA AND NAGAR HAVELI", "DAMAN AND DIU"
]

# Filter the DataFrame to keep only rows where StateName is in the valid list
cleaned_spark_df = cleaned_spark_df.filter(col("StateName").isin(valid_state_names))

# Show the result
print(f"Number of rows after filtering valid StateNames: {cleaned_spark_df.count()}")
cleaned_spark_df.show(20, truncate=False)


Number of rows after filtering valid StateNames: 40757584
+---------+--------------+--------------+------+------------+----------+------------------------+-------------------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+----+-----+------------------------------------------------------------------+
|StateName|DistrictName  |BlockName     |Season|Sector      |Category  |Crop                    |QueryType                      |QueryText                                                   |KccAns                                                                                                                                                                         |CreatedOn              |year|month|source_file                                                       |
+---

In [15]:
from pyspark.sql.functions import col, sum

# Check for missing values (nulls) in each column
print("Checking for missing values (nulls) in all columns:")
cleaned_spark_df.select([sum(col(c).isNull().cast("integer")).alias(c) for c in cleaned_spark_df.columns]).show()

Checking for missing values (nulls) in all columns:
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+
|StateName|DistrictName|BlockName|Season|Sector|Category|Crop|QueryType|QueryText|KccAns|CreatedOn|   year|  month|source_file|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+
|        0|           0|        0|     0|     0|       0|   0|        3|        0|     0|  7209172|7210199|7210520|          0|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+



In [16]:
from pyspark.sql.functions import col

# Get the current total row count
total_row_count = cleaned_spark_df.count()

# Filter rows where 'QueryType' is null
df_query_type_null = cleaned_spark_df.filter(col("QueryType").isNull())

# Show the rows where 'QueryType' is null
df_query_type_null.show(20, truncate=False)

# Print the total row count and the number of rows with null 'QueryType'
print(f"Current total row count: {total_row_count}")
print(f"Number of rows with null 'QueryType': {df_query_type_null.count()}")


+-------------+--------------------------+------------+------+-----------+--------+------+---------+-----------------+-----------------+-----------------------+----+-----+-------------------------------------------------------------------------+
|StateName    |DistrictName              |BlockName   |Season|Sector     |Category|Crop  |QueryType|QueryText        |KccAns           |CreatedOn              |year|month|source_file                                                              |
+-------------+--------------------------+------------+------+-----------+--------+------+---------+-----------------+-----------------+-----------------------+----+-----+-------------------------------------------------------------------------+
|UTTAR PRADESH|FATEHPUR                  |DHATA       |NA    |AGRICULTURE|Cereals |Wheat |NULL     |OTHER STATE CALL |OTHER STATE CALL |2025-07-16T10:21:42.5  |2025|7    |file:///D:/Scripts/Kissan%20Dataset/Dataset/kcc_dataset_UTTAR_PRADESH.csv|
|UTTAR PRADESH|A

In [17]:
# Drop rows where 'QueryType' is null
cleaned_spark_df = cleaned_spark_df.filter(col("QueryType").isNotNull())

# Show the updated DataFrame (first 20 rows)
cleaned_spark_df.show(20, truncate=False)

# Print the new row count after dropping rows with null 'QueryType'
print(f"Number of rows after dropping 'QueryType' is null: {cleaned_spark_df.count()}")


+---------+--------------+--------------+------+------------+----------+------------------------+-------------------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+----+-----+------------------------------------------------------------------+
|StateName|DistrictName  |BlockName     |Season|Sector      |Category  |Crop                    |QueryType                      |QueryText                                                   |KccAns                                                                                                                                                                         |CreatedOn              |year|month|source_file                                                       |
+---------+--------------+--------------+------+------------+-

In [18]:
from pyspark.sql.functions import col, sum

# Check for missing values (nulls) in each column
print("Checking for missing values (nulls) in all columns:")
cleaned_spark_df.select([sum(col(c).isNull().cast("integer")).alias(c) for c in cleaned_spark_df.columns]).show()

Checking for missing values (nulls) in all columns:
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+
|StateName|DistrictName|BlockName|Season|Sector|Category|Crop|QueryType|QueryText|KccAns|CreatedOn|   year|  month|source_file|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+
|        0|           0|        0|     0|     0|       0|   0|        0|        0|     0|  7209172|7210199|7210520|          0|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+-----------+



In [19]:
# Drop the 'source_file' column
cleaned_spark_df = cleaned_spark_df.drop("source_file")

# Show the updated DataFrame (first 20 rows)
cleaned_spark_df.show(20, truncate=False)

# Print the schema to confirm that 'source_file' is dropped
cleaned_spark_df.printSchema()


+---------+--------------+--------------+------+------------+----------+------------------------+-------------------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+----+-----+
|StateName|DistrictName  |BlockName     |Season|Sector      |Category  |Crop                    |QueryType                      |QueryText                                                   |KccAns                                                                                                                                                                         |CreatedOn              |year|month|
+---------+--------------+--------------+------+------------+----------+------------------------+-------------------------------+------------------------------------------------------------+------

In [20]:
from pyspark.sql.functions import col, sum

# Check for missing values (nulls) in each column
print("Checking for missing values (nulls) in all columns:")
cleaned_spark_df.select([sum(col(c).isNull().cast("integer")).alias(c) for c in cleaned_spark_df.columns]).show()

Checking for missing values (nulls) in all columns:
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+
|StateName|DistrictName|BlockName|Season|Sector|Category|Crop|QueryType|QueryText|KccAns|CreatedOn|   year|  month|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+
|        0|           0|        0|     0|     0|       0|   0|        0|        0|     0|  7209172|7210199|7210520|
+---------+------------+---------+------+------+--------+----+---------+---------+------+---------+-------+-------+



In [21]:
# Define the path in your Google Drive where you want to save the cleaned data
output_drive_path = '/content/drive/My Drive/cleaned_kcc_dataset.parquet' # Replace with your desired path

# Save the cleaned_spark_df DataFrame to the specified path in Parquet format
try:
    cleaned_spark_df.write.parquet(output_drive_path)
    print(f"Successfully saved cleaned_spark_df to {output_drive_path}")
except Exception as e:
    print(f"An error occurred while saving the DataFrame: {e}")

Successfully saved cleaned_spark_df to /content/drive/My Drive/cleaned_kcc_dataset.parquet


In [22]:
# List of columns to drop
columns_to_drop = ["CreatedOn", "StateName", "DistrictName", "BlockName", "Sector"]

# Drop the specified columns from the DataFrame
cleaned_spark_df = cleaned_spark_df.drop(*columns_to_drop)

# Show the schema to confirm the columns have been dropped
cleaned_spark_df.printSchema()

# Show the first few rows of the updated DataFrame
cleaned_spark_df.show(5)

root
 |-- Season: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Crop: string (nullable = true)
 |-- QueryType: string (nullable = true)
 |-- QueryText: string (nullable = true)
 |-- KccAns: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)

+------+--------+-----------------+--------------------+--------------------+--------------------+----+-----+
|Season|Category|             Crop|           QueryType|           QueryText|              KccAns|year|month|
+------+--------+-----------------+--------------------+--------------------+--------------------+----+-----+
|    NA|  Others|           Others|  Government Schemes|Mandi registratio...|Advised to contac...|2023|    2|
|    NA| Flowers|Hibiscus (Gurhal)|\tPlant Protection\t|White mealy bug i...|Recommended to sp...|2023|    2|
|    NA|  Fruits|            Mango| Nutrient Management|About planofix ( ...|Planofix ( Alpha ...|2023|    2|
|    NA| Cereals|     Paddy