In [1]:
import os
import sys

os.environ["HADOOP_HOME"] = "C:\\hadoop"
os.environ["PATH"] += os.pathsep + "C:\\hadoop\\bin"

# Set python executable path
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [2]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, when
from pyspark.sql.types import StructType , StructField , StringType , IntegerType , FloatType
from datetime import datetime


In [3]:
spark = SparkSession.builder.appName("Migration-project").getOrCreate()

In [4]:
spark

In [5]:
Migration_data = ("indiaSummary.csv")

In [6]:
Migration_df = spark.read.csv(Migration_data,header = True,inferSchema = True)
Migration_df.show(5)
Migration_df.printSchema()

+----------+-----+--------+---------+--------------------+---------------+--------------------+----------------------+--------------------+----------------------+--------------------------+------------------------+--------------------------+----------------------------+--------------------------+----------------------------+---------------------------+--------------------------+----------------------------+------------------------------+----------------------------+------------------------------+---------------------------+--------------------+----------------------------+-----------------------------+---------------------------+-----------------------------+
|Table name|State|District|Area Name|     Last residence4|Last residence5|Place of enumeration|Total migrants-Persons|Total migrants-Males|Total migrants-Females|Duration (<1 year)-Persons|Duration (<1 year)-Males|Duration (<1 year)-Females|Duration (1-4 years)-Persons|Duration (1-4 years)-Males|Duration (1-4 years)-Females|Durati

In [7]:
duration_columns_to_remove = [
    "Duration (<1 year)-Persons", 
    "Duration (<1 year)-Males",
    "Duration (<1 year)-Females",
    "Duration (1-4 years)-Persons",
    "Duration (1-4 years)-Males",
    "Duration (1-4 years)-Females",
    "Duration(5-9 years)-Persons",
    "Duration (5-9 years)-Males",
    "Duration (5-9 years)-Females",
    "Duration (10-19 years)-Persons",
    "Duration (10-19 years)-Males",
    "Duration (10-19 years)-Females",
    "Duration(20+ years)-Persons",
    "Duration (20+) Males",
    "Duration (20+ years)-Females",
    "Duration (not stated)-Persons",
    "Duration (not stated)-Males",
    "Table name",
    "State",
    "District",
    "Duration (not stated)-Females"
]
data_df = Migration_df.drop(*duration_columns_to_remove)
data_df.show(10)
data_df.printSchema()

+---------+--------------------+---------------+--------------------+----------------------+--------------------+----------------------+
|Area Name|     Last residence4|Last residence5|Place of enumeration|Total migrants-Persons|Total migrants-Males|Total migrants-Females|
+---------+--------------------+---------------+--------------------+----------------------+--------------------+----------------------+
|    INDIA|               Total|          Total|               Total|             455787621|           146145967|             309641654|
|    INDIA|               Total|          Total|               Rural|             278203361|            64703974|             213499387|
|    INDIA|               Total|          Total|               Urban|             177584260|            81441993|              96142267|
|    INDIA|Last residence wi...|          Total|               Total|             449917418|           143467652|             306449766|
|    INDIA|Last residence wi...|         

In [8]:
data_df = data_df.withColumnRenamed("Last residence4", "Last_Place_of_residence") \
       .withColumnRenamed("Last residence5", "Number_of_Last_residence").withColumnRenamed("Area Name" , "Area_Name")
data_df.show(10)
data_df.printSchema()

+---------+-----------------------+------------------------+--------------------+----------------------+--------------------+----------------------+
|Area_Name|Last_Place_of_residence|Number_of_Last_residence|Place of enumeration|Total migrants-Persons|Total migrants-Males|Total migrants-Females|
+---------+-----------------------+------------------------+--------------------+----------------------+--------------------+----------------------+
|    INDIA|                  Total|                   Total|               Total|             455787621|           146145967|             309641654|
|    INDIA|                  Total|                   Total|               Rural|             278203361|            64703974|             213499387|
|    INDIA|                  Total|                   Total|               Urban|             177584260|            81441993|              96142267|
|    INDIA|   Last residence wi...|                   Total|               Total|             449917418|  

In [9]:
non_india_areas = data_df.filter(data_df.Area_Name != "India").select("Area_Name").distinct()

# Show the results
non_india_areas.show()

+---------+
|Area_Name|
+---------+
|    INDIA|
+---------+



That implies we don't need Area_Name too 

In [10]:
data_df = data_df.drop("Area_Name")
data_df.show()

+-----------------------+------------------------+--------------------+----------------------+--------------------+----------------------+
|Last_Place_of_residence|Number_of_Last_residence|Place of enumeration|Total migrants-Persons|Total migrants-Males|Total migrants-Females|
+-----------------------+------------------------+--------------------+----------------------+--------------------+----------------------+
|                  Total|                   Total|               Total|             455787621|           146145967|             309641654|
|                  Total|                   Total|               Rural|             278203361|            64703974|             213499387|
|                  Total|                   Total|               Urban|             177584260|            81441993|              96142267|
|   Last residence wi...|                   Total|               Total|             449917418|           143467652|             306449766|
|   Last residence wi...|  

In [12]:
gender_migration_summary = data_df.groupBy("Last_Place_of_residence","Place of enumeration") \
                                  .sum("Total migrants-Persons", "Total migrants-Males", "Total migrants-Females") \
                                  .orderBy("Last_Place_of_residence", "Place of enumeration")
gender_migration_summary.show()

+-----------------------+--------------------+---------------------------+-------------------------+---------------------------+
|Last_Place_of_residence|Place of enumeration|sum(Total migrants-Persons)|sum(Total migrants-Males)|sum(Total migrants-Females)|
+-----------------------+--------------------+---------------------------+-------------------------+---------------------------+
|   Andaman & Nicobar...|               Rural|                      13425|                     6053|                       7372|
|   Andaman & Nicobar...|               Total|                      38700|                    17540|                      21160|
|   Andaman & Nicobar...|               Urban|                      25275|                    11487|                      13788|
|         Andhra Pradesh|               Rural|                    1128980|                   302998|                     825982|
|         Andhra Pradesh|               Total|                    3919506|                  16109

In [26]:
# filter for rural areas
rural_migration = data_df.filter(data_df["Place of enumeration"].contains("Rural")) \
                        .groupBy("Last_Place_of_residence","Place of enumeration") \
                        .sum("Total migrants-Persons", "Total migrants-Males", "Total migrants-Females") \
                        .orderBy("Last_Place_of_residence", "Place of enumeration")

# filter for urban areas
urban_migration = data_df.filter(data_df["Place of enumeration"].contains("Urban")) \
                        .groupBy("Last_Place_of_residence","Place of enumeration") \
                        .sum("Total migrants-Persons", "Total migrants-Males", "Total migrants-Females") \
                        .orderBy("Last_Place_of_residence", "Place of enumeration")

# For the total (all combined)
total_migration = data_df.filter(data_df["Place of enumeration"].contains("Total")).groupBy("Last_Place_of_residence") \
                        .sum("Total migrants-Persons", "Total migrants-Males", "Total migrants-Females") \
                        .orderBy("Last_Place_of_residence")

# Display results
print("Rural Migration:")
rural_migration.show()

print("Urban Migration:")
urban_migration.show()

print("Total Migration by Last Place of Residence:")
total_migration.show()

Rural Migration:
+-----------------------+--------------------+---------------------------+-------------------------+---------------------------+
|Last_Place_of_residence|Place of enumeration|sum(Total migrants-Persons)|sum(Total migrants-Males)|sum(Total migrants-Females)|
+-----------------------+--------------------+---------------------------+-------------------------+---------------------------+
|   Andaman & Nicobar...|               Rural|                      13425|                     6053|                       7372|
|         Andhra Pradesh|               Rural|                    1128980|                   302998|                     825982|
|      Arunachal Pradesh|               Rural|                      39632|                    16916|                      22716|
|                  Assam|               Rural|                     543917|                   238605|                     305312|
|                  Bihar|               Rural|                    3381337|      

In [29]:
Male_migration = data_df.filter(data_df["Place of enumeration"].contains("Total")).groupBy("Last_Place_of_residence") \
                        .sum("Total migrants-Males") \
                        .orderBy("Last_Place_of_residence")

Male_migration.show()

+-----------------------+-------------------------+
|Last_Place_of_residence|sum(Total migrants-Males)|
+-----------------------+-------------------------+
|   Andaman & Nicobar...|                    17540|
|         Andhra Pradesh|                  1610978|
|      Arunachal Pradesh|                    33846|
|                  Assam|                   622176|
|                  Bihar|                  7500141|
|             Chandigarh|                   248525|
|           Chhattisgarh|                   486340|
|    Countries in Africa|                   250152|
|   Countries in Asia...|                  4300870|
|    Countries in Europe|                    78792|
|   Countries in Oceania|                    67486|
|   Countries in the ...|                   330030|
|   Dadra & Nagar Haveli|                     8342|
|            Daman & Diu|                    13234|
|   Elsewhere in the ...|                143999845|
|                    Goa|                    85223|
|           

In [30]:
Female_migration = data_df.filter(data_df["Place of enumeration"].contains("Total")).groupBy("Last_Place_of_residence") \
                        .sum("Total migrants-Females") \
                        .orderBy("Last_Place_of_residence")

Female_migration.show()

+-----------------------+---------------------------+
|Last_Place_of_residence|sum(Total migrants-Females)|
+-----------------------+---------------------------+
|   Andaman & Nicobar...|                      21160|
|         Andhra Pradesh|                    2308528|
|      Arunachal Pradesh|                      38337|
|                  Assam|                     658902|
|                  Bihar|                    7002410|
|             Chandigarh|                     263708|
|           Chhattisgarh|                     864454|
|    Countries in Africa|                     512572|
|   Countries in Asia...|                    4717150|
|    Countries in Europe|                     134368|
|   Countries in Oceania|                     119962|
|   Countries in the ...|                     471006|
|   Dadra & Nagar Haveli|                      23943|
|            Daman & Diu|                      23621|
|   Elsewhere in the ...|                  368430176|
|                    Goa|   

In [37]:
# Convert to Pandas for visualization
rural_pd = rural_migration.toPandas()
urban_pd = urban_migration.toPandas()
total_pd = total_migration.toPandas()
female_pd = Female_migration.toPandas()
male_pd = Male_migration.toPandas()
filtered_geo_pd = filtered_geo_df.toPandas()

In [32]:
# Save to CSV
rural_pd.to_csv("rural_migration.csv", index=False)
urban_pd.to_csv("urban_migration.csv", index=False)
total_pd.to_csv("total_migration.csv", index=False)
female_pd.to_csv("female_migration.csv", index=False)
male_pd.to_csv("male_migration.csv", index=False)


In [36]:
valid_states = [
    "Andhra Pradesh", "Arunachal Pradesh", "Assam", "Bihar", "Chhattisgarh",
    "Goa", "Gujarat", "Haryana", "Himachal Pradesh", "Jammu & Kashmir",
    "Jharkhand", "Karnataka", "Kerala", "Madhya Pradesh", "Maharashtra",
    "Manipur", "Meghalaya", "Mizoram", "Nagaland", "Odisha", "Punjab",
    "Rajasthan", "Sikkim", "Tamil Nadu", "Telangana", "Tripura",
    "Uttar Pradesh", "Uttarakhand", "West Bengal", "Delhi"
]

# Filter valid states only
filtered_geo_df = total_migration.filter(total_migration["Last_Place_of_residence"].isin(valid_states))

filtered_geo_df.show()

+-----------------------+---------------------------+-------------------------+---------------------------+
|Last_Place_of_residence|sum(Total migrants-Persons)|sum(Total migrants-Males)|sum(Total migrants-Females)|
+-----------------------+---------------------------+-------------------------+---------------------------+
|         Andhra Pradesh|                    3919506|                  1610978|                    2308528|
|      Arunachal Pradesh|                      72183|                    33846|                      38337|
|                  Assam|                    1281078|                   622176|                     658902|
|                  Bihar|                   14502551|                  7500141|                    7002410|
|           Chhattisgarh|                    1350794|                   486340|                     864454|
|                    Goa|                     203788|                    85223|                     118565|
|                Gujarat|   

In [38]:
filtered_geo_pd.to_csv("filtered_geo.csv", index=False)