<a href="https://colab.research.google.com/github/Adlinnithisha/big-data-by-pyspark/blob/main/BDA_MIGRATION_PROJECT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Introduction

Train classification models (logistic regression, decision tree, random forest) with PySpark to predict whether a migration estimate's status is Final or Provisional.

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.0 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=6f0f0cf1c87e21271ff55f7b98a3ef43da2d79605d06c79f90ac493e5e17a896
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession
# Only the functions used below; importing round from pyspark would shadow Python's built-in round()
from pyspark.sql.functions import col, unix_timestamp

# Start (or reuse) the Spark session for this notebook
spark = SparkSession.builder.appName("International migration").getOrCreate()

In [None]:
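# The CSV lives on Google Drive, so Drive must be mounted first
# (in Colab: from google.colab import drive; drive.mount('/content/drive'))
df = spark.read.csv("/content/drive/MyDrive/international-migration-December-2023-citizenship-by-visa-by-country-of-last-permanent-residence.csv", header=True, inferSchema=True)
df.printSchema()
df.show(truncate=False)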

root
 |-- year_month: timestamp (nullable = true)
 |-- month_of_release: timestamp (nullable = true)
 |-- passenger_type: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- citizenship: string (nullable = true)
 |-- visa: string (nullable = true)
 |-- country_of_residence: string (nullable = true)
 |-- estimate: integer (nullable = true)
 |-- standard_error: integer (nullable = true)
 |-- status: string (nullable = true)

+-------------------+-------------------+-----------------+---------+-----------+--------------------------+--------------------+--------+--------------+------+
|year_month         |month_of_release   |passenger_type   |direction|citizenship|visa                      |country_of_residence|estimate|standard_error|status|
+-------------------+-------------------+-----------------+---------+-----------+--------------------------+--------------------+--------+--------------+------+
|2001-09-01 00:00:00|2020-09-01 00:00:00|Long-term migrant|Arrivals |NZ

Spark actions: count() runs a job and returns the number of rows.

In [None]:
df.count()

329132
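Transformations such as withColumn or groupBy are lazy: Spark only records them in a plan, and an action such as count() or show() triggers the actual computation. The toy class below (LazyFrame, a hypothetical name, not part of PySpark) mimics that behaviour in plain Python as a sketch of the idea, not of Spark's implementation.

```python
from typing import Callable, Iterable, Iterator, List, Optional

Record = dict
Step = Callable[[Iterator[Record]], Iterator[Record]]


class LazyFrame:
    """Toy model of lazy evaluation (hypothetical class, not the PySpark API)."""

    def __init__(self, rows: Iterable[Record], steps: Optional[List[Step]] = None):
        self._rows = list(rows)
        self._steps = steps or []
        self.executions = 0  # how many times an action actually ran the plan

    def filter(self, predicate: Callable[[Record], bool]) -> "LazyFrame":
        """Transformation: record a step and return at once without touching data."""
        step: Step = lambda rows: (r for r in rows if predicate(r))
        return LazyFrame(self._rows, self._steps + [step])

    def count(self) -> int:
        """Action: run every recorded step, then count the surviving rows."""
        self.executions += 1
        rows: Iterator[Record] = iter(self._rows)
        for step in self._steps:
            rows = step(rows)
        return sum(1 for _ in rows)


data = [{"status": "Final"}, {"status": "Provisional"}, {"status": "Final"}]
final_only = LazyFrame(data).filter(lambda r: r["status"] == "Final")
print(final_only.executions)  # 0: the filter has only been recorded
print(final_only.count())     # 2: the action executes the plan
print(final_only.executions)  # 1
```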

In [None]:
df.describe().show()

+-------+-----------------+---------+-----------+--------------------+--------------------+-----------------+-----------------+-----------+
|summary|   passenger_type|direction|citizenship|                visa|country_of_residence|         estimate|   standard_error|     status|
+-------+-----------------+---------+-----------+--------------------+--------------------+-----------------+-----------------+-----------+
|  count|           329132|   329132|     329132|              329132|              329132|           329132|           329132|     329132|
|   mean|             NULL|     NULL|       NULL|                NULL|                NULL|99.21397190185093|0.295097407726991|       NULL|
| stddev|             NULL|     NULL|       NULL|                NULL|                NULL|535.8028341130496|4.442546069239412|       NULL|
|    min|Long-term migrant| Arrivals|         NZ|NZ and Australian...|         Afghanistan|                0|                0|      Final|
|    max|Long-term m
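describe() reports count, mean, standard deviation, min and max per column. Mean and stddev are NULL for string columns, the stddev is the sample standard deviation (n - 1 denominator), and min/max of strings are lexicographic, which is why min(status) is Final. A plain-Python sketch of those summary rows for a single column (describe is a hypothetical helper here, not Spark's function):

```python
import statistics
from typing import Dict, List, Union

Value = Union[int, float, str]


def describe(values: List[Value]) -> Dict[str, object]:
    """Approximate the summary rows of DataFrame.describe() for one column."""
    numeric = all(isinstance(v, (int, float)) for v in values)
    return {
        "count": len(values),
        # Spark leaves mean/stddev as NULL (None here) for string columns.
        "mean": statistics.mean(values) if numeric else None,
        # Sample standard deviation, as in Spark's stddev.
        "stddev": statistics.stdev(values) if numeric and len(values) > 1 else None,
        # Strings compare lexicographically, numbers numerically.
        "min": min(values),
        "max": max(values),
    }


print(describe([1, 3, 0, 12]))
print(describe(["Final", "Provisional", "Final"]))
```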

Spark transformations: groupBy(...).count() lazily builds a new DataFrame; show() is the action that executes it.

In [None]:
x=df.groupBy('country_of_residence').count()
x.show()

+--------------------+-----+
|country_of_residence|count|
+--------------------+-----+
|                Chad|  214|
|            Anguilla|  154|
|            Paraguay|  402|
|              Russia| 3477|
|               Yemen|  390|
|British Indian Oc...|   12|
|Congo, the Democr...|  153|
|             Senegal|  194|
|          Cabo Verde|   34|
|              Sweden| 2883|
|             Tokelau| 1121|
|            Kiribati| 2297|
|     Samoa, American| 1856|
|              Guyana|  295|
|             Eritrea|  249|
|         Philippines| 3893|
|  St Kitts and Nevis|  131|
|            Djibouti|   94|
|      Norfolk Island|  806|
|               Tonga| 3812|
+--------------------+-----+
only showing top 20 rows
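Note that groupBy('country_of_residence').count() counts rows, where each row is one combination of month, citizenship, visa and so on, not migrants. Assuming the estimate column holds the number of migrants for each row, the per-country total would come from groupBy(...).sum('estimate') instead. A plain-Python sketch of both aggregations on hypothetical rows (Spark returns groups in no particular order, so use orderBy if order matters):

```python
from collections import Counter, defaultdict

# Hypothetical rows: (country_of_residence, estimate)
rows = [("Tonga", 5), ("Russia", 2), ("Tonga", 0), ("Chad", 1), ("Russia", 4), ("Tonga", 3)]

# Like groupBy('country_of_residence').count(): number of ROWS per country
row_counts = Counter(country for country, _ in rows)

# Like groupBy('country_of_residence').sum('estimate'): total estimate per country
estimate_totals = defaultdict(int)
for country, estimate in rows:
    estimate_totals[country] += estimate

print(sorted(row_counts.items()))       # [('Chad', 1), ('Russia', 2), ('Tonga', 3)]
print(sorted(estimate_totals.items()))  # [('Chad', 1), ('Russia', 6), ('Tonga', 8)]
```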



In [None]:
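# unix_timestamp() converts each timestamp to whole seconds since 1970-01-01 00:00:00 (session time zone)
df = (df.withColumn("year_month_sec", unix_timestamp("year_month"))
        .withColumn("month_of_release_sec", unix_timestamp("month_of_release")))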

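time_gap_seconds is year_month_sec minus month_of_release_sec, i.e. the gap in seconds between the reference month and the release month. It is negative because the release month comes after the reference month.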


In [None]:
df = df.withColumn("time_gap_seconds", col("year_month_sec") - col("month_of_release_sec"))
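The arithmetic for the first row can be checked in plain Python. This sketch assumes the Spark session time zone is UTC, which matches the epoch seconds printed in the outputs below. In a time zone with daylight saving, a seconds-based gap divided by 86400 may not be a whole number; Spark's datediff(end, start) counts calendar days directly and is an alternative.

```python
from datetime import datetime, timezone

# First row of the dataset, interpreted as UTC (assumption).
year_month = datetime(2001, 9, 1, tzinfo=timezone.utc)
month_of_release = datetime(2020, 9, 1, tzinfo=timezone.utc)

# unix_timestamp() returns whole seconds since the Unix epoch.
year_month_sec = int(year_month.timestamp())
month_of_release_sec = int(month_of_release.timestamp())

time_gap_seconds = year_month_sec - month_of_release_sec
time_gap_days = abs(time_gap_seconds / 86400)  # same formula as the Spark SQL cell

print(year_month_sec, month_of_release_sec, time_gap_seconds, time_gap_days)
# 999302400 1598918400 -599616000 6940.0
```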

In [None]:
df.show()

+-------------------+-------------------+-----------------+---------+-----------+--------------------+--------------------+--------+--------------+------+--------------+--------------------+----------------+
|         year_month|   month_of_release|   passenger_type|direction|citizenship|                visa|country_of_residence|estimate|standard_error|status|year_month_sec|month_of_release_sec|time_gap_seconds|
+-------------------+-------------------+-----------------+---------+-----------+--------------------+--------------------+--------+--------------+------+--------------+--------------------+----------------+
|2001-09-01 00:00:00|2020-09-01 00:00:00|Long-term migrant| Arrivals|         NZ|NZ and Australian...|         Afghanistan|       1|             0| Final|     999302400|          1598918400|      -599616000|
|2002-03-01 00:00:00|2020-09-01 00:00:00|Long-term migrant| Arrivals|         NZ|NZ and Australian...|         Afghanistan|       3|             0| Final|    1014940800

Spark SQL

In [None]:
df.createOrReplaceTempView("timestamps")

In [None]:
# 86400 seconds per day; abs() turns the negative gap into a positive number of days
df = spark.sql("SELECT *, abs(time_gap_seconds / 86400) AS time_gap_days FROM timestamps")
df.show()

+-------------------+-------------------+-----------------+---------+-----------+--------------------+--------------------+--------+--------------+------+--------------+--------------------+----------------+-------------+
|         year_month|   month_of_release|   passenger_type|direction|citizenship|                visa|country_of_residence|estimate|standard_error|status|year_month_sec|month_of_release_sec|time_gap_seconds|time_gap_days|
+-------------------+-------------------+-----------------+---------+-----------+--------------------+--------------------+--------+--------------+------+--------------+--------------------+----------------+-------------+
|2001-09-01 00:00:00|2020-09-01 00:00:00|Long-term migrant| Arrivals|         NZ|NZ and Australian...|         Afghanistan|       1|             0| Final|     999302400|          1598918400|      -599616000|       6940.0|
|2002-03-01 00:00:00|2020-09-01 00:00:00|Long-term migrant| Arrivals|         NZ|NZ and Australian...|         A

In [None]:
df.columns


['year_month',
 'month_of_release',
 'passenger_type',
 'direction',
 'citizenship',
 'visa',
 'country_of_residence',
 'estimate',
 'standard_error',
 'status',
 'year_month_sec',
 'month_of_release_sec',
 'time_gap_seconds',
 'time_gap_days']

Spark ML

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [None]:
# Candidate columns (not used below). 'status' is the label, so it must not be used as a feature.
inputcol=['passenger_type','direction','citizenship','visa','country_of_residence','estimate','standard_error','status','time_gap_days']

In [None]:
# Encode each string column as a numeric index (by default the most frequent value gets 0.0)
indexer=StringIndexer(
    inputCols=['passenger_type','direction','citizenship','visa','country_of_residence','status'],
    outputCols=['passenger_index','direction_index','citizenship_index','visa_index','country_residence_index','status_index'])
indexed=indexer.fit(df).transform(df)
indexed.show()

+-------------------+-------------------+-----------------+---------+-----------+--------------------+--------------------+--------+--------------+------+--------------+--------------------+----------------+-------------+---------------+---------------+-----------------+----------+-----------------------+------------+
|         year_month|   month_of_release|   passenger_type|direction|citizenship|                visa|country_of_residence|estimate|standard_error|status|year_month_sec|month_of_release_sec|time_gap_seconds|time_gap_days|passenger_index|direction_index|citizenship_index|visa_index|country_residence_index|status_index|
+-------------------+-------------------+-----------------+---------+-----------+--------------------+--------------------+--------+--------------+------+--------------+--------------------+----------------+-------------+---------------+---------------+-----------------+----------+-----------------------+------------+
|2001-09-01 00:00:00|2020-09-01 00:00:00
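With its default stringOrderType='frequencyDesc', StringIndexer gives index 0.0 to the most frequent value and breaks ties alphabetically. That is why status_index 0.0 means Final here: the test-set mean of status_index is about 0.097, so roughly 10% of rows are Provisional (1.0). A plain-Python sketch of that ordering (string_index is a hypothetical helper, not Spark's API):

```python
from collections import Counter
from typing import Dict, List


def string_index(values: List[str]) -> Dict[str, float]:
    """Mimic StringIndexer's frequencyDesc ordering: most frequent label -> 0.0,
    ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


status = ["Final"] * 9 + ["Provisional"]  # hypothetical 90/10 mix
mapping = string_index(status)
print(mapping)                                       # {'Final': 0.0, 'Provisional': 1.0}
print([mapping[s] for s in ["Provisional", "Final"]])  # [1.0, 0.0]
```

One consequence: a fitted StringIndexerModel raises an error on labels it did not see during fit unless handleInvalid is set to 'skip' or 'keep'.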

In [None]:
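# Combine the numeric feature columns into one 'features' vector; status_index is left out because it is the label
assembler=VectorAssembler(
    inputCols=["time_gap_days","passenger_index","direction_index","citizenship_index","visa_index","country_residence_index"],
    outputCol='features')
output=assembler.transform(indexed)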
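The assembled vector feeds raw category indices (for example country_residence_index) to the model as if they were ordered numbers, so a linear model such as logistic regression treats index 2 as "twice" index 1. A common alternative for linear models is pyspark.ml.feature.OneHotEncoder, which by default drops the last category (dropLast=True). Tree models can instead use the categorical metadata StringIndexer attaches. A plain-Python sketch of one-hot encoding an index (one_hot is a hypothetical helper):

```python
from typing import List


def one_hot(index: int, num_categories: int, drop_last: bool = True) -> List[float]:
    """One-hot encode a category index; with drop_last the last category
    becomes the all-zeros vector, mirroring OneHotEncoder's dropLast=True."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


# Each category gets its own column, so a linear model learns one weight per category.
print(one_hot(0, 3))                   # [1.0, 0.0]
print(one_hot(2, 3))                   # [0.0, 0.0]
print(one_hot(2, 3, drop_last=False))  # [0.0, 0.0, 1.0]
```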

In [None]:
output.printSchema()

root
 |-- year_month: timestamp (nullable = true)
 |-- month_of_release: timestamp (nullable = true)
 |-- passenger_type: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- citizenship: string (nullable = true)
 |-- visa: string (nullable = true)
 |-- country_of_residence: string (nullable = true)
 |-- estimate: integer (nullable = true)
 |-- standard_error: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- year_month_sec: long (nullable = true)
 |-- month_of_release_sec: long (nullable = true)
 |-- time_gap_seconds: long (nullable = true)
 |-- time_gap_days: double (nullable = true)
 |-- passenger_index: double (nullable = false)
 |-- direction_index: double (nullable = false)
 |-- citizenship_index: double (nullable = false)
 |-- visa_index: double (nullable = false)
 |-- country_residence_index: double (nullable = false)
 |-- status_index: double (nullable = false)
 |-- features: vector (nullable = true)



In [None]:
final_df=output.select('features','status_index')
final_df.show()

+--------------------+------------+
|            features|status_index|
+--------------------+------------+
|[6940.0,0.0,0.0,2...|         0.0|
|[6759.0,0.0,0.0,2...|         0.0|
|[5722.0,0.0,0.0,2...|         0.0|
|[5632.0,0.0,0.0,2...|         0.0|
|[5541.0,0.0,0.0,2...|         0.0|
|[5510.0,0.0,0.0,2...|         0.0|
|[5449.0,0.0,0.0,2...|         0.0|
|[5326.0,0.0,0.0,2...|         0.0|
|[5298.0,0.0,0.0,2...|         0.0|
|[5237.0,0.0,0.0,2...|         0.0|
|[5176.0,0.0,0.0,2...|         0.0|
|[5145.0,0.0,0.0,2...|         0.0|
|[5114.0,0.0,0.0,2...|         0.0|
|[5053.0,0.0,0.0,2...|         0.0|
|[5023.0,0.0,0.0,2...|         0.0|
|[4992.0,0.0,0.0,2...|         0.0|
|[4811.0,0.0,0.0,2...|         0.0|
|[4780.0,0.0,0.0,2...|         0.0|
|[4719.0,0.0,0.0,2...|         0.0|
|[4658.0,0.0,0.0,2...|         0.0|
+--------------------+------------+
only showing top 20 rows
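Assembled feature rows can be stored as dense or sparse vectors; the prediction outputs later show sparse ones such as (6,[0],[153.0]), meaning size 6 with a single non-zero value 153.0 at index 0 (time_gap_days), all indices being 0.0. A plain-Python sketch of the (size, indices, values) encoding and its inverse (to_sparse and to_dense are hypothetical helpers):

```python
from typing import List, Tuple

Sparse = Tuple[int, List[int], List[float]]


def to_sparse(dense: List[float]) -> Sparse:
    """Encode a dense vector as (size, indices, values), keeping non-zeros only."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    return len(dense), indices, [dense[i] for i in indices]


def to_dense(sparse: Sparse) -> List[float]:
    """Expand (size, indices, values) back into a full list."""
    size, indices, values = sparse
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


print(to_dense((6, [0], [153.0])))                  # [153.0, 0.0, 0.0, 0.0, 0.0, 0.0]
print(to_sparse([153.0, 0.0, 0.0, 0.0, 0.0, 0.0]))  # (6, [0], [153.0])
```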



In [None]:
train_df,test_df=final_df.randomSplit([0.7,0.3])
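randomSplit([0.7, 0.3]) assigns rows by random sampling, so the split sizes are only approximately 70/30 (230349 and 98783 rows below) and change on every run unless a seed is given, e.g. final_df.randomSplit([0.7, 0.3], seed=42). The sketch below shows the idea in plain Python: each row draws a uniform number and lands in the split whose cumulative weight interval contains it. It is a conceptual model, not Spark's per-partition implementation.

```python
import random
from typing import List, Sequence


def random_split(rows: Sequence, weights: Sequence[float], seed: int) -> List[list]:
    """Assign each row to one split by a uniform draw against cumulative weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bounds, e.g. [0.7, 1.0]
    rng = random.Random(seed)
    splits: List[list] = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the bounds.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 7000 and 3000, but not exact
```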

In [None]:
train_df.describe().show(2)

+-------+-------------------+
|summary|       status_index|
+-------+-------------------+
|  count|             230349|
|   mean|0.09733925478295977|
+-------+-------------------+
only showing top 2 rows



In [None]:
test_df.describe().show()

+-------+-------------------+
|summary|       status_index|
+-------+-------------------+
|  count|              98783|
|   mean|0.09732443841551684|
| stddev|0.29640054227160456|
|    min|                0.0|
|    max|                1.0|
+-------+-------------------+



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator

In [None]:
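# Note: rawPredictionCol='prediction' scores the hard 0/1 predictions, so areaUnderROC (the default metric)
# reflects a single decision threshold; the default 'rawPrediction' column gives the usual threshold-free AUC.
binaryEvaluator=BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='status_index')
# precisionByLabel reports the precision of metricLabel (default 0.0, i.e. Final), not overall accuracy
multiclassEvaluator=MulticlassClassificationEvaluator(labelCol='status_index',metricName="precisionByLabel")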
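The two evaluators measure different things. precisionByLabel is the precision of a single class (metricLabel, 0.0 by default), not accuracy. Area under ROC computed from hard 0/1 predictions, as rawPredictionCol='prediction' does here, collapses to (TPR + TNR) / 2 of one threshold, which is usually lower than the AUC from real scores. A plain-Python sketch on hypothetical toy data (the helper functions are illustrative, not Spark's code):

```python
from typing import Sequence


def precision_by_label(labels: Sequence[float], preds: Sequence[float], label: float = 0.0) -> float:
    """Precision for one class: of the rows predicted as `label`, the share that truly are."""
    predicted = [y for y, p in zip(labels, preds) if p == label]
    return sum(1 for y in predicted if y == label) / len(predicted)


def accuracy(labels: Sequence[float], preds: Sequence[float]) -> float:
    return sum(1 for y, p in zip(labels, preds) if y == p) / len(labels)


def roc_auc(labels: Sequence[float], scores: Sequence[float]) -> float:
    """Probability that a random positive (1.0) outscores a random negative; ties count 1/2."""
    pos = [s for y, s in zip(labels, scores) if y == 1.0]
    neg = [s for y, s in zip(labels, scores) if y == 0.0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


# Hypothetical labels, scores (P(label = 1.0)) and hard predictions at threshold 0.5.
labels = [1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]
scores = [0.9, 0.4, 0.35, 0.3, 0.2, 0.45, 0.1, 0.05]
preds = [1.0 if s >= 0.5 else 0.0 for s in scores]

print(precision_by_label(labels, preds))  # precision of class 0.0 only
print(accuracy(labels, preds))            # share of all rows predicted correctly
print(roc_auc(labels, scores))            # threshold-free AUC from scores
print(roc_auc(labels, preds))             # AUC from hard predictions = (TPR + TNR) / 2
```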

LOGISTIC REGRESSION

In [None]:
classifier=LogisticRegression(maxIter=100,regParam=0.001,elasticNetParam=0.3,labelCol='status_index',featuresCol='features')
model=classifier.fit(train_df)
pred_df=model.transform(test_df)
pred_df.show()

+----------------+------------+--------------------+--------------------+----------+
|        features|status_index|       rawPrediction|         probability|prediction|
+----------------+------------+--------------------+--------------------+----------+
| (6,[0],[153.0])|         1.0|[0.36690760205257...|[0.59071153526349...|       0.0|
| (6,[0],[214.0])|         1.0|[0.60510632681249...|[0.64682368253258...|       0.0|
| (6,[0],[244.0])|         1.0|[0.72225324062884...|[0.67310300290484...|       0.0|
| (6,[0],[275.0])|         1.0|[0.84330505157241...|[0.69916084114357...|       0.0|
| (6,[0],[395.0])|         1.0|[1.31189270683782...|[0.78782970186750...|       0.0|
| (6,[0],[426.0])|         1.0|[1.43294451778138...|[0.80735969118742...|       0.0|
| (6,[0],[485.0])|         0.0|[1.66333344828688...|[0.84068497147488...|       0.0|
| (6,[0],[486.0])|         0.0|[1.66723834541409...|[0.84120727348850...|       0.0|
| (6,[0],[486.0])|         0.0|[1.66723834541409...|[0.8412072734
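For binary logistic regression, Spark's rawPrediction is [-m, m], where m is the linear margin for class 1.0, and probability applies the logistic (sigmoid) function to each entry; with the default threshold of 0.5 the first row above is therefore predicted as 0.0. The sketch below checks this against the printed first row (the printed values are truncated, so the comparison uses a tolerance).

```python
import math


def sigmoid(z: float) -> float:
    """Logistic function 1 / (1 + e^-z)."""
    return 1.0 / (1.0 + math.exp(-z))


# rawPrediction[0] of the first row above, as printed (truncated there).
raw0 = 0.36690760205257
margin = -raw0  # rawPrediction = [-m, m]

probability = [sigmoid(-margin), sigmoid(margin)]
prediction = 1.0 if probability[1] > 0.5 else 0.0  # default threshold 0.5

print(probability[0])  # approximately 0.5907, matching the printed probability
print(prediction)      # 0.0
```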

In [None]:
print(binaryEvaluator.evaluate(pred_df))

0.8069871625600259


In [None]:
print(multiclassEvaluator.evaluate(pred_df))

0.9601523143803329


In [None]:
schema=["time_gap_days","passenger_index","direction_index","citizenship_index","visa_index","country_residence_index"]
input_df=[(5643,0.0,1.0,2.0,1.0,82.0),(4587,0.0,0.0,2.0,1.0,82.0)]
output_list=spark.createDataFrame(data=input_df,schema=schema)
output_list.show()

+-------------+---------------+---------------+-----------------+----------+-----------------------+
|time_gap_days|passenger_index|direction_index|citizenship_index|visa_index|country_residence_index|
+-------------+---------------+---------------+-----------------+----------+-----------------------+
|         5643|            0.0|            1.0|              2.0|       1.0|                   82.0|
|         4587|            0.0|            0.0|              2.0|       1.0|                   82.0|
+-------------+---------------+---------------+-----------------+----------+-----------------------+



In [None]:
output1=assembler.transform(output_list)
final_df1=output1.select("features")
final_df1.show()

+--------------------+
|            features|
+--------------------+
|[5643.0,0.0,1.0,2...|
|[4587.0,0.0,0.0,2...|
+--------------------+



The model predicts class 0.0 for both input rows, which corresponds to the Final status (StringIndexer assigned index 0.0 to Final, the most frequent label).

In [None]:
predictions=model.transform(final_df1)
predictions.show()

+--------------------+--------------+-----------+----------+
|            features| rawPrediction|probability|prediction|
+--------------------+--------------+-----------+----------+
|[5643.0,0.0,1.0,2...|[207927.0,0.0]|  [1.0,0.0]|       0.0|
|[4587.0,0.0,0.0,2...|[207927.0,0.0]|  [1.0,0.0]|       0.0|
+--------------------+--------------+-----------+----------+



In [None]:
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier

DECISION TREE CLASSIFIER

In [None]:
classifier=DecisionTreeClassifier(labelCol='status_index',featuresCol='features',maxBins=250)
model1=classifier.fit(train_df)
pred_df1=model1.transform(test_df)  # evaluate the decision tree (model1), not the logistic regression model
pred_df1.show()

+----------------+------------+--------------+-----------+----------+
|        features|status_index| rawPrediction|probability|prediction|
+----------------+------------+--------------+-----------+----------+
| (6,[0],[153.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[214.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[244.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[275.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[395.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[426.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[485.0])|         0.0|[207927.0,0.0]|  [1.0,0.0]|       0.0|
| (6,[0],[486.0])|         0.0|[207927.0,0.0]|  [1.0,0.0]|       0.0|
| (6,[0],[486.0])|         0.0|[207927.0,0.0]|  [1.0,0.0]|       0.0|
| (6,[0],[487.0])|         0.0|[207927.0,0.0]|  [1.0,0.0]|       0.0|
| (6,[0],[488.0])|         0.0|[207927.0,0.0]|  [1.0,0.0]|       0.0|
| (6,[0],[550.0])|  
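The decision tree's rawPrediction holds the training-row class counts of the leaf a row falls into, and 22422 + 207927 = 230349 is exactly the training-set size, which is consistent with a tree that splits the training rows into two pure groups. In the rows shown, Provisional rows have time_gap_days up to 426 and Final rows from 485, so status appears to be close to a deterministic function of the time gap; that would explain the perfect scores. The sketch below finds the best Gini threshold on those visible rows by trying midpoints between adjacent values. This is a simplification: Spark evaluates candidate thresholds from binned feature values, bounded by maxBins, which must also be at least the number of categories of any categorical feature (presumably why maxBins=250 is set).

```python
from typing import List, Tuple


def gini(labels: List[float]) -> float:
    """Gini impurity 1 - sum_k p_k^2 of a list of class labels."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((labels.count(c) / n) ** 2 for c in set(labels))


def best_split(xs: List[float], ys: List[float]) -> Tuple[float, float]:
    """Try the midpoint between each pair of adjacent distinct values and
    return (threshold, weighted Gini) with the lowest impurity."""
    values = sorted(set(xs))
    best = (float("nan"), float("inf"))
    for lo, hi in zip(values, values[1:]):
        t = (lo + hi) / 2
        left = [y for x, y in zip(xs, ys) if x <= t]
        right = [y for x, y in zip(xs, ys) if x > t]
        impurity = (len(left) * gini(left) + len(right) * gini(right)) / len(ys)
        if impurity < best[1]:
            best = (t, impurity)
    return best


# time_gap_days and status_index of the rows visible in the output above.
days = [153, 214, 244, 275, 395, 426, 485, 486, 486, 487, 488]
status = [1.0] * 6 + [0.0] * 5
print(best_split(days, status))  # (455.5, 0.0): one threshold separates the classes
```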

In [None]:
print(binaryEvaluator.evaluate(pred_df1))

1.0


In [None]:
print(multiclassEvaluator.evaluate(pred_df1))

1.0


Predict the status for new input rows supplied as a DataFrame


In [None]:
schema=["time_gap_days","passenger_index","direction_index","citizenship_index","visa_index","country_residence_index"]
input_df=[(5643,0.0,1.0,2.0,1.0,82.0),(4587,0.0,0.0,2.0,1.0,82.0)]
output_list=spark.createDataFrame(data=input_df,schema=schema)
output_list.show()

+-------------+---------------+---------------+-----------------+----------+-----------------------+
|time_gap_days|passenger_index|direction_index|citizenship_index|visa_index|country_residence_index|
+-------------+---------------+---------------+-----------------+----------+-----------------------+
|         5643|            0.0|            1.0|              2.0|       1.0|                   82.0|
|         4587|            0.0|            0.0|              2.0|       1.0|                   82.0|
+-------------+---------------+---------------+-----------------+----------+-----------------------+



In [None]:
output1=assembler.transform(output_list)
final_df1=output1.select("features")
final_df1.show()

+--------------------+
|            features|
+--------------------+
|[5643.0,0.0,1.0,2...|
|[4587.0,0.0,0.0,2...|
+--------------------+



The decision tree also predicts class 0.0 (Final) for both input rows.

In [None]:
predictions=model1.transform(final_df1)
predictions.show()

+--------------------+--------------+-----------+----------+
|            features| rawPrediction|probability|prediction|
+--------------------+--------------+-----------+----------+
|[5643.0,0.0,1.0,2...|[207927.0,0.0]|  [1.0,0.0]|       0.0|
|[4587.0,0.0,0.0,2...|[207927.0,0.0]|  [1.0,0.0]|       0.0|
+--------------------+--------------+-----------+----------+



RANDOM FOREST CLASSIFIER

In [None]:
classifier=RandomForestClassifier(numTrees=10,labelCol='status_index',featuresCol='features',maxBins=250)
model2=classifier.fit(train_df)
pred_df2=model2.transform(test_df)  # evaluate the random forest (model2), not the logistic regression model
pred_df2.show()

+----------------+------------+--------------+-----------+----------+
|        features|status_index| rawPrediction|probability|prediction|
+----------------+------------+--------------+-----------+----------+
| (6,[0],[153.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[214.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[244.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[275.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[395.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[426.0])|         1.0| [0.0,22422.0]|  [0.0,1.0]|       1.0|
| (6,[0],[485.0])|         0.0|[207927.0,0.0]|  [1.0,0.0]|       0.0|
| (6,[0],[486.0])|         0.0|[207927.0,0.0]|  [1.0,0.0]|       0.0|
| (6,[0],[486.0])|         0.0|[207927.0,0.0]|  [1.0,0.0]|       0.0|
| (6,[0],[487.0])|         0.0|[207927.0,0.0]|  [1.0,0.0]|       0.0|
| (6,[0],[488.0])|         0.0|[207927.0,0.0]|  [1.0,0.0]|       0.0|
| (6,[0],[550.0])|  

In [None]:
print(binaryEvaluator.evaluate(pred_df2))

1.0


In [None]:
print(multiclassEvaluator.evaluate(pred_df2))

1.0


Predict the status for new input rows supplied as a DataFrame

In [None]:
schema=["time_gap_days","passenger_index","direction_index","citizenship_index","visa_index","country_residence_index"]
input_df=[(5643,0.0,1.0,2.0,1.0,82.0),(4587,0.0,0.0,2.0,1.0,82.0)]
output_list=spark.createDataFrame(data=input_df,schema=schema)
output_list.show()

+-------------+---------------+---------------+-----------------+----------+-----------------------+
|time_gap_days|passenger_index|direction_index|citizenship_index|visa_index|country_residence_index|
+-------------+---------------+---------------+-----------------+----------+-----------------------+
|         5643|            0.0|            1.0|              2.0|       1.0|                   82.0|
|         4587|            0.0|            0.0|              2.0|       1.0|                   82.0|
+-------------+---------------+---------------+-----------------+----------+-----------------------+



In [None]:
output1=assembler.transform(output_list)
final_df1=output1.select("features")
final_df1.show()

+--------------------+
|            features|
+--------------------+
|[5643.0,0.0,1.0,2...|
|[4587.0,0.0,0.0,2...|
+--------------------+



The random forest also predicts class 0.0 (Final) for both input rows.



In [None]:
predictions=model2.transform(final_df1)
predictions.show()

+--------------------+-------------+-----------+----------+
|            features|rawPrediction|probability|prediction|
+--------------------+-------------+-----------+----------+
|[5643.0,0.0,1.0,2...|   [10.0,0.0]|  [1.0,0.0]|       0.0|
|[4587.0,0.0,0.0,2...|   [10.0,0.0]|  [1.0,0.0]|       0.0|
+--------------------+-------------+-----------+----------+
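The rawPrediction [10.0,0.0] above follows from numTrees=10: Spark's random forest classifier sums each tree's class-probability vector for the leaf a row lands in, then normalises that sum to obtain probability, and predicts the class with the largest entry. A plain-Python sketch (forest_predict is a hypothetical helper):

```python
from typing import Dict, List


def forest_predict(tree_probs: List[List[float]]) -> Dict[str, object]:
    """Combine per-tree class-probability vectors: rawPrediction is their sum,
    probability is the sum normalised to 1, prediction is the argmax."""
    raw = [sum(col) for col in zip(*tree_probs)]
    total = sum(raw)
    probability = [r / total for r in raw]
    prediction = float(max(range(len(raw)), key=lambda i: raw[i]))
    return {"rawPrediction": raw, "probability": probability, "prediction": prediction}


# numTrees=10 and every tree certain of class 0.0, as in the output above.
print(forest_predict([[1.0, 0.0]] * 10))
# A hypothetical split vote: 7 trees say class 0.0, 3 say class 1.0.
print(forest_predict([[1.0, 0.0]] * 7 + [[0.0, 1.0]] * 3))
```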



Register a temporary view and query the rows whose status is Final

In [None]:
df.createOrReplaceTempView("TAB")
spark.sql("select * from TAB where status like '%Final%'").show()

+-------------------+-------------------+-----------------+---------+-----------+--------------------+--------------------+--------+--------------+------+--------------+--------------------+----------------+-------------+
|         year_month|   month_of_release|   passenger_type|direction|citizenship|                visa|country_of_residence|estimate|standard_error|status|year_month_sec|month_of_release_sec|time_gap_seconds|time_gap_days|
+-------------------+-------------------+-----------------+---------+-----------+--------------------+--------------------+--------+--------------+------+--------------+--------------------+----------------+-------------+
|2001-09-01 00:00:00|2020-09-01 00:00:00|Long-term migrant| Arrivals|         NZ|NZ and Australian...|         Afghanistan|       1|             0| Final|     999302400|          1598918400|      -599616000|       6940.0|
|2002-03-01 00:00:00|2020-09-01 00:00:00|Long-term migrant| Arrivals|         NZ|NZ and Australian...|         A

In [None]:
from tabulate import tabulate

In [None]:
models = ['Logistic Regression', 'Random forest classification', 'Decision Tree classification']
# Area under ROC from binaryEvaluator (computed on hard predictions), not accuracy
auc = [0.8, 1.0, 1.0]

table_data = list(zip(models, auc))  # zip pairs each model name with its score

headers = ['Model', 'Area under ROC']

table = tabulate(table_data, headers=headers, tablefmt='grid')

print(table)

+------------------------------+------------------+
| Model                        |   Area under ROC |
+==============================+==================+
| Logistic Regression          |              0.8 |
+------------------------------+------------------+
| Random forest classification |              1   |
+------------------------------+------------------+
| Decision Tree classification |              1   |
+------------------------------+------------------+
