In [1]:
# Initialise findspark before anything imports pyspark (the pyspark imports live in the next cell).
# NOTE(review): presumably this puts the local Spark installation on sys.path — confirm against the findspark docs.
import findspark
findspark.init()

In [3]:
import traceback

import pandas as pd
import plotly.express as px
import pycountry
import pycountry_convert as pcc
import pyspark
from fuzzywuzzy import process
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Start (or reuse) the Spark session used by the cells below.
spark = (
    SparkSession.builder
    .appName('End to end processing')
    .getOrCreate()
)

# Load the raw visa CSV: the first row is the header and column types are inferred.
# NOTE(review): this is an absolute, machine-specific path; the notebook will not run elsewhere as-is.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('D:/OneDrive-QuangNgai/Python_Programming/data-engineering/input/visa_number_in_japan.csv')
)

In [4]:
# Inspect the inferred column names and types of the freshly loaded frame.
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Regional code: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Number of issued: integer (nullable = true)
 |-- Number of issued_numerical: integer (nullable = true)
 |-- Travel certificate: integer (nullable = true)
 |-- Diplomacy: integer (nullable = true)
 |-- Public use: integer (nullable = true)
 |-- Passing: integer (nullable = true)
 |-- Short -term stay: integer (nullable = true)
 |-- Staying in medical care: integer (nullable = true)
 |-- Advanced profession: integer (nullable = true)
 |-- Employment: integer (nullable = true)
 |-- Employment_Professor: integer (nullable = true)
 |-- Employment_Art: integer (nullable = true)
 |-- Employment_religion: integer (nullable = true)
 |-- Employment_Report: integer (nullable = true)
 |-- Employment_Management / Management: integer (nullable = true)
 |-- Employment_Law accounting: integer (nullable = true)
 |-- Employment_Medical care: integer (nullable = true)
 |-- E

In [5]:
# Translation table: a space becomes an underscore; '/', '.' and ',' are dropped.
_COLUMN_NAME_TABLE = str.maketrans(' ', '_', '/.,')

# Sanitised counterpart of every column name, in the original column order.
new_column_names = [raw_name.translate(_COLUMN_NAME_TABLE) for raw_name in df.columns]

In [6]:
# Apply the sanitised names positionally (one name per existing column), then re-check the schema.
df = df.toDF(*new_column_names)
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Regional_code: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Number_of_issued: integer (nullable = true)
 |-- Number_of_issued_numerical: integer (nullable = true)
 |-- Travel_certificate: integer (nullable = true)
 |-- Diplomacy: integer (nullable = true)
 |-- Public_use: integer (nullable = true)
 |-- Passing: integer (nullable = true)
 |-- Short_-term_stay: integer (nullable = true)
 |-- Staying_in_medical_care: integer (nullable = true)
 |-- Advanced_profession: integer (nullable = true)
 |-- Employment: integer (nullable = true)
 |-- Employment_Professor: integer (nullable = true)
 |-- Employment_Art: integer (nullable = true)
 |-- Employment_religion: integer (nullable = true)
 |-- Employment_Report: integer (nullable = true)
 |-- Employment_Management__Management: integer (nullable = true)
 |-- Employment_Law_accounting: integer (nullable = true)
 |-- Employment_Medical_care: integer (nullable = true)
 |-- Em

In [7]:
# Discard rows in which every column is null, then preview the first ten rows.
df = df.na.drop(how='all')
df.show(n=10)

+----+-------------+---------------+----------------+--------------------------+------------------+---------+----------+-------+----------------+-----------------------+-------------------+----------+--------------------+--------------+-------------------+-----------------+---------------------------------+-------------------------+-----------------------+-------------------+--------------------+------------------+---------------------+-----------------------------------+-------------------------------+---------------+-----------------+----------------------+-----------------------+--------+-------------------------+-----------------------+---------------+----------------+-------------------+--------------------+---------------+--------------+---------------------------+--------------------+------------------------------------+------------------------+------------------+----------------------------+----------------------------------------+--------------------+---------------+
|Year|Reg

In [8]:
# Keep only the three columns the rest of the notebook works with.
df1 = df.select(['year', 'country', 'number_of_issued_numerical'])
df1.show(n=20)

+----+-----------------+--------------------------+
|year|          country|number_of_issued_numerical|
+----+-----------------+--------------------------+
|2017|            total|                    741415|
|2017|      Afghanistan|                        46|
|2017|          Albania|                         7|
|2017|          Algeria|                        19|
|2017|            Andra|                         0|
|2017|           Angola|                        12|
|2017|  Antigua Berbuda|                         8|
|2017|       Azerbaijan|                         8|
|2017|        Argentina|                         0|
|2017|        Australia|                       281|
|2017|          Austria|                         0|
|2017|          Bahamas|                         0|
|2017|          Barrane|                        59|
|2017|       Bangladesh|                       159|
|2017|          Armenia|                         3|
|2017|         Barbados|                         0|
|2017|      

In [9]:
# Official country names, filled on first use so the pycountry database is
# walked once instead of once per row.
_COUNTRY_NAMES = []


def _country_names():
    """Return the cached list of pycountry country names, building it on first call."""
    if not _COUNTRY_NAMES:
        _COUNTRY_NAMES.extend(country.name for country in pycountry.countries)
    return _COUNTRY_NAMES


def correct_country_name(name, threshold=10):
    """Map a possibly misspelled country name to its closest pycountry name.

    Args:
        name: Raw country string. None and blank strings are returned
            unchanged without consulting the fuzzy matcher.
        threshold: Minimum fuzzywuzzy match score (0-100 scale) required to
            accept the best candidate.

    Returns:
        The best-matching pycountry name when its score reaches ``threshold``,
        otherwise ``name`` as given.

    NOTE(review): the default threshold of 10 accepts almost any best match;
    the preview output in this notebook shows 'total' rewritten to 'Portugal'.
    The default is kept for compatibility — pass a stricter value for real
    cleaning.
    """
    # Nothing meaningful to match against: leave null/blank values alone.
    if name is None or not name.strip():
        return name

    corrected_name, score = process.extractOne(name, _country_names())
    return corrected_name if score >= threshold else name


# Expose the fuzzy matcher to Spark as a string-returning UDF.
correct_country_name_udf = udf(correct_country_name, StringType())

# Add the fuzzy-matched name next to the raw one. The column keeps its
# existing spelling ('correctted_country') so the resulting schema is unchanged.
df1 = df1.withColumn('correctted_country', correct_country_name_udf(col('country')))


In [10]:
# Preview ten rows with full-width (untruncated) cell values.
df1.show(n=10, truncate=False)

+----+---------------+--------------------------+-------------------+
|year|country        |number_of_issued_numerical|correctted_country |
+----+---------------+--------------------------+-------------------+
|2017|total          |741415                    |Portugal           |
|2017|Afghanistan    |46                        |Afghanistan        |
|2017|Albania        |7                         |Albania            |
|2017|Algeria        |19                        |Algeria            |
|2017|Andra          |0                         |Andorra            |
|2017|Angola         |12                        |Angola             |
|2017|Antigua Berbuda|8                         |Antigua and Barbuda|
|2017|Azerbaijan     |8                         |Azerbaijan         |
|2017|Argentina      |0                         |Argentina          |
|2017|Australia      |281                       |Australia          |
+----+---------------+--------------------------+-------------------+
only showing top 10 

If you encounter an error when calling the <b>show()</b> method, change the permissions on the <b>anaconda3</b> folder.

In [11]:
# Print the schema; if Spark raises, show the full traceback rather than
# aborting the cell.
try:
    df1.printSchema()
except Exception:
    # print_exc() writes the active exception's traceback to stderr.
    traceback.print_exc()

root
 |-- year: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- number_of_issued_numerical: integer (nullable = true)
 |-- correctted_country: string (nullable = true)



In [12]:
# Manual fixes for country names that arrive misspelled in the input data.
# Keys are the raw values found in the 'country' column; values are the
# names they should be replaced with.
country_corrections = {
    # 'Andra' is Andorra (the fuzzy matcher earlier in this notebook also resolves it that way).
    'Andra': 'Andorra',
    # The data spells this 'Antigua Berbuda'; the older 'Berduda' key is kept
    # as well in case that spelling occurs in other rows.
    'Antigua Berbuda': 'Antigua and Barbuda',
    'Antigua Berduda': 'Antigua and Barbuda',
    'Barrane': 'Bahrain',
    'Brush': 'Bhutan',
    'Komoro': 'Comoros',
    'Benan': 'Benin',
    'Kiribass': 'Kiribati',
    'Gaiana': 'Guyana',
    'Court Jiboire': "Côte d'Ivoire",
    'Lesot': 'Lesotho',
    'Macau travel certificate': 'Macao',
    'Moldoba': 'Moldova',
    'Naure': 'Nauru',
    'Nigail': 'Niger',
    'Palao': 'Palau',
    'St. Christopher Navis': 'Saint Kitts and Nevis',
    'Santa Principa': 'Sao Tome and Principe',
    'Saechel': 'Seychelles',
    # NOTE(review): 'Slinum' looks like a romanisation of Suriname rather than
    # Saint Helena — confirm against the source data before changing.
    'Slinum': 'Saint Helena',
    'Swaji Land': 'Eswatini',
    'Torque menistan': 'Turkmenistan',
    # NOTE(review): 'Tsubaru' looks like a romanisation of Tuvalu rather than
    # Zimbabwe — confirm against the source data before changing.
    'Tsubaru': 'Zimbabwe',
    'Kosovo': 'Kosovo'
}

# Only the raw 'country' column is rewritten; the other columns are left as-is.
df2 = df1.replace(country_corrections, subset='country')

In [13]:
def get_continent_name(country_name):
    """Return the continent name for a country name, or None if it cannot be resolved.

    Args:
        country_name: Country name to look up; None or an empty string yields None.

    Returns:
        The continent name produced by pycountry_convert, or None when any
        step of the lookup fails.
    """
    # Null/empty cells have no continent; skip the lookup entirely.
    if not country_name:
        return None

    try:
        country_code = pcc.country_name_to_country_alpha2(country_name, cn_name_format='default')
        continent_code = pcc.country_alpha2_to_continent_code(country_code)
        return pcc.convert_continent_code_to_continent_name(continent_code)
    except Exception:
        # Best-effort lookup: an unresolvable name maps to None. Catching
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
        return None

# Wrap the continent lookup as a string-returning Spark UDF (the name is a
# UDF, not a DataFrame) and derive a 'continent' column from 'country'.
continent_df = udf(get_continent_name, returnType=StringType())
df2 = df2.withColumn('continent', continent_df(col('country')))


In [14]:
# Register (or refresh, when the cell is re-run) the global temp view that the
# SQL queries read as global_temp.japan_visa.
df2.createOrReplaceGlobalTempView('japan_visa')

In [15]:
# Visualisation: yearly visa totals, coloured by continent.
# Rows whose continent is NULL are excluded by the query.
df_cont = spark.sql("""
    SELECT year, continent, sum(number_of_issued_numerical) visa_issued
    FROM global_temp.japan_visa
    WHERE continent IS NOT NULL
    GROUP BY year, continent
""")
df_cont_pd = df_cont.toPandas()

fig = px.bar(data_frame=df_cont_pd, x='year', y='visa_issued', color='continent')
fig.update_layout(
    title_text="Number of visa issued in Japan between 2006 and 2017",
    xaxis_title='Year',
    yaxis_title='Number of visa issued',
    legend_title='Continent',
)

# Save the interactive chart as an HTML file in the working directory.
fig.write_html('visa_number_in_japan_continent_2006_2017.html')


In [16]:
# Ten countries with the highest 2017 visa counts; the 'total' and 'others'
# rows are filtered out by the query.
df_country = spark.sql("""
    SELECT country, sum(number_of_issued_numerical) visa_issued
    FROM global_temp.japan_visa
    WHERE country NOT IN ('total', 'others')
    AND country IS NOT NULL
    AND year = 2017
    GROUP BY country
    order by visa_issued DESC
    LIMIT 10
""")
df_country_pd = df_country.toPandas()

fig = px.bar(data_frame=df_country_pd, x='country', y='visa_issued', color='country')
fig.update_layout(
    title_text="Top 10 countries with most issued visa in 2017",
    xaxis_title='Country',
    yaxis_title='Number of visa issued',
    legend_title='Country',
)

# Save the interactive chart as an HTML file in the working directory.
fig.write_html('visa_number_in_japan_by_country_2017.html')