<center>

# $\textbf{Migration}$

</center>

<center>

### $\textbf{Code}$

</center>

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, lit, array, struct
import time

In [2]:
# Record the wall-clock start time; the final cell subtracts it to report the total runtime
inicio = time.time()

In [3]:
# Start (or reuse) a local Spark session with Hive support for this notebook
spark = (
    SparkSession.builder
    .appName('Migration')
    .master("local")
    .enableHiveSupport()
    .getOrCreate()
)
spark

24/03/07 19:52:00 WARN Utils: Your hostname, Francisco-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.191.2.158 instead (on interface en0)
24/03/07 19:52:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/07 19:52:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the migration CSV into a DataFrame, using the header row and letting Spark infer column types
df = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("inferschema", "true")
    .option("header", "true")
    .load("Files/Migration2.csv")
)

In [5]:
# Discard the descriptive columns that are not needed for the analysis
unused_columns = ["Series Name", "Series Code", "Country Code"]
df = df.drop(*unused_columns)

In [6]:
# Shorten the headers: "Country Name" -> "country" and each "YYYY [YRYYYY]" -> "YYYY"
df = df.withColumnRenamed("Country Name", "country")
# Years 2005-2007 are deliberately not in the list, matching the columns handled so far
for year in [*range(2000, 2005), *range(2008, 2026)]:
    df = df.withColumnRenamed(f"{year} [YR{year}]", str(year))

In [7]:
# Unpivot the wide per-year columns into one (country, year, migration) row per country and year
year_structs = [
    struct(lit(y).alias("year"), col(str(y)).alias("migration"))
    for y in [*range(2000, 2005), *range(2008, 2025)]
]
df = (
    df.select("country", explode(array(year_structs)).alias("data"))
      .selectExpr("country", "data.year", "data.migration")
)

In [8]:
# Give each column its final type: country as string, year and migration as integers
df = (
    df.withColumn("country", col("country").cast("string"))
      .withColumn("year", col("year").cast("int"))
      .withColumn("migration", col("migration").cast("int"))
)

In [9]:
# Keep only the years strictly between 2010 and 2024 (i.e. 2011 through 2023)
df = df.filter((col("year") > 2010) & (col("year") < 2024))

In [10]:
# Sort rows alphabetically by country, then chronologically within each country
df = df.sort("country", "year")

### $\textbf{Pre-Processing}$

In [11]:
# Map World Bank country labels to the shorter names wanted in the output
replacements = {
    "Bahamas, The": "Bahamas",
    "Egypt, Arab Rep.": "Egypt",
    "Micronesia, Fed. Sts.": "Micronesia",
    "Sint Maarten (Dutch part)": "Sint Maarten",
    "St. Martin (French part)": "St. Martin",
    "Venezuela, RB": "Venezuela",
    "Yemen, Rep.": "Yemen",
    "Caribbean small states": "Caribbean",
    "Gambia, The": "Gambia",
    "Hong Kong SAR, China": "Hong Kong",
    "Iran, Islamic Rep.": "Iran",
    "Macao SAR, China": "Macao SAR",
    "Korea, Rep.": "South Korea",
    # NOTE(review): North Korea is mapped to plain "Korea" — confirm this label is intended
    "Korea, Dem. People's Rep.": "Korea",
    "Congo, Rep.": "Republic of the Congo",
    "Congo, Dem. Rep.": "Democratic Republic of the Congo",
    "Viet Nam": "Vietnam"
}

# The keys above are values of the "country" column, not column names, so
# withColumnRenamed matches nothing and silently leaves the data unchanged.
# Replace the cell values in the "country" column instead.
df = df.replace(replacements, subset=["country"])

In [12]:
# Aggregate / regional entries (not individual countries) whose rows must be removed.
# Each entry needs its own trailing comma: adjacent string literals are concatenated
# by Python, which would merge two names into one that matches no row.
countries_to_remove = ['Africa Eastern and Southern', 
                       'Africa Western and Central', 
                       'Central Europe and the Baltics', 
                       'Early-demographic dividend', 
                       'East Asia & Pacific', 
                       'East Asia & Pacific (IDA & IBRD countries)', 
                       'East Asia & Pacific (excluding high income)', 
                       'Euro area',
                       'Europe & Central Asia',
                       'Europe & Central Asia (IDA & IBRD countries)',
                       'Europe & Central Asia (excluding high income)',
                       'European Union',
                       'Fragile and conflict affected situations',
                       'Heavily indebted poor countries (HIPC)',
                       'High income',
                       'IBRD only',
                       'IDA & IBRD total',
                       'IDA blend',
                       'IDA only',
                       'IDA total',
                       'Latin America & the Caribbean (IDA & IBRD countries)',
                       'Latin America & Caribbean (excluding high income)',
                       'Least developed countries: UN classification',
                       'Low & middle income',
                       'Low income',
                       'Lower middle income',
                       'Middle East & North Africa',
                       'Middle East & North Africa (IDA & IBRD countries)',
                       'Middle East & North Africa (excluding high income)',
                       'Middle income',
                       'Not classified',
                       'OECD members',
                       'Other small states',
                       'Pacific island small states',
                       'Post-demographic dividend',
                       'Pre-demographic dividend',
                       'Small states',
                       'South Asia',
                       'South Asia (IDA & IBRD)',
                       'Sub-Saharan Africa (IDA & IBRD countries)',
                       'Sub-Saharan Africa (excluding high income)',
                       'Upper middle income',
                       'World'
                       ]

# Keep only the rows whose country is not one of the listed aggregate entries
is_aggregate = col('country').isin(countries_to_remove)
df = df.where(~is_aggregate)

In [13]:
# Discard any row that has no value in the 'country' column
df = df.na.drop(subset=['country'])

In [14]:
# Persist the result as Parquet (overwriting any previous run), preview it, then stop Spark
parquet_path = "FilesParquet/Migration.parquet"
df.write.parquet(parquet_path, mode="overwrite")
spark.read.parquet(parquet_path).show()
spark.stop()

                                                                                

+-----------+----+---------+
|    country|year|migration|
+-----------+----+---------+
|Afghanistan|2011|   418796|
|Afghanistan|2012|   105905|
|Afghanistan|2013|    48076|
|Afghanistan|2014|   255611|
|Afghanistan|2015|  -281739|
|Afghanistan|2016|   -90238|
|Afghanistan|2017|   -47090|
|Afghanistan|2018|   -47205|
|Afghanistan|2019|    -8082|
|Afghanistan|2020|   166821|
|Afghanistan|2021|  -183672|
|Afghanistan|2022|   -65846|
|Afghanistan|2023|   -65846|
|    Albania|2011|   -24465|
|    Albania|2012|   -19946|
|    Albania|2013|   -16845|
|    Albania|2014|   -14265|
|    Albania|2015|   -12240|
|    Albania|2016|   -10887|
|    Albania|2017|    -9768|
+-----------+----+---------+
only showing top 20 rows



In [15]:
# Report the elapsed wall-clock seconds since `inicio` was recorded near the top of the notebook
fim = time.time()
final = fim - inicio
print(final)

7.757224798202515
