# The Visa Report

Author: Arthur Dysart  
Created on Sat Dec  1 14:12:09 2018

Analyzes H1-B Visa data by most common "Occupation" and "State." Reports are sorted by decreasing "Certified Visas" count and alphabetical "Occupation" title.

### Setup

Set path to input data file in system directory.

In [None]:
import_path = r"../input/H1B_FY_2014.csv"
occu_export_path = r"../output/top_10_occupations.txt"
state_export_path = r"../output/top_10_states.txt"

Import the "SparkSession" class from the "Spark SQL" package. Create spark cluster connector object.

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession\
        .builder\
        .appName("TheVisaReport")\
        .getOrCreate()

Import the "Pandas" package for external data export.

In [None]:
import pandas as pd

### Extract data

Import CSV source file as Spark dataframe object "raw_df." Save imported data as SparkSQL table "raw_data."

In [None]:
raw_df = spark.read.csv(import_path,
                        sep = ";",
                        header = True,
                        ignoreLeadingWhiteSpace = True,
                        ignoreTrailingWhiteSpace = True)

In [None]:
raw_df.createOrReplaceTempView("raw_data")

Identify the following required columns in the input data file:
* "Status" — identifies whether visa application is certified.
* "Occupation" — identifies the occupation title for given visa application.
* "State" — identifies job location (as U.S. state) for given visa application.

In [None]:
all_columns = raw_df.columns

In [None]:
col_status = [col
              for col
              in all_columns
              if 'STATUS' in col.upper()][0]

Count total number of certified visas. Save count as SparkSQL table "total_cert."

In [None]:
query = """
        SELECT
            COUNT(status)
                AS TOTAL_CERTIFIED_APPLICATIONS
        FROM
            raw_data
        WHERE
            status = 'CERTIFIED'
        """

cert_df = spark.sql(query)

In [None]:
cert_df.createOrReplaceTempView("total_cert")

### Transform & load: top occupations

Identify column representing occupation name.

In [None]:
col_occupation = [col
                  for col
                  in all_columns
                  if ('SOC' in col.upper() and
                      'NAME' in col.upper())][0]

For each occupation name:
1. count visa applications with status "certified," and
2. calculate percentage of certified applications relative to total certified applications.

In [None]:
query = """
        SELECT
            UPPER({})
                AS TOP_OCCUPATIONS,
            COUNT(status)
                AS NUMBER_CERTIFIED_APPLICATIONS,
            CONCAT(ROUND(COUNT(status) * 100 /(SELECT
                                                   *
                                               FROM
                                                   total_cert), 1), "%")
                AS PERCENTAGE
        FROM
            raw_data
        WHERE
            status = 'CERTIFIED'
        GROUP BY
            {}
        ORDER BY
            NUMBER_CERTIFIED_APPLICATIONS
                DESC,
            TOP_OCCUPATIONS
                ASC
        LIMIT
            10
        """\
        .format(col_occupation,
                col_occupation)

occu_df = spark.sql(query)

Combine "occu_df" SparkSQL dataframe into one partition on master node.

In [None]:
occu_df = occu_df.coalesce(1)

Convert "occu_df" SparkSQL dataframe to "occu_pdf" Pandas dataframe. Save result as TXT output file.

In [None]:
occu_pdf = occu_df.toPandas()

In [None]:
occu_pdf.to_csv(occu_export_path,
                sep = ";",
                index = False,
                header = True)

Alternatively, export "occu_df" SparkSQL dataframe result in output directory as CSV.PART files.

In [None]:
occu_df.write.csv(occu_export_path,
                  sep = ";",
                  header = True,
                  mode = "errorifexists",
                  ignoreLeadingWhiteSpace = True,
                  ignoreTrailingWhiteSpace = True)

### Transform & load: top states

Identify column representing state name of work location.

In [None]:
col_state = [col
             for col
             in all_columns
             if ('WORK' in col.upper() and
                 'STATE' in col.upper())][0]

For each state:
1. count visa applications with status "certified," and
2. calculate percentage of certified applications relative to total certified applications.

In [None]:
query = """
        SELECT
            UPPER({})
                AS TOP_STATES,
            COUNT(status)
                AS NUMBER_CERTIFIED_APPLICATIONS,
            CONCAT(ROUND(COUNT(status) * 100 /(SELECT
                                                   *
                                               FROM
                                                   total_cert), 1), "%")
                AS PERCENTAGE
        FROM
            raw_data
        WHERE
            status = 'CERTIFIED'
        GROUP BY
            {}
        ORDER BY
            NUMBER_CERTIFIED_APPLICATIONS
                DESC,
            TOP_STATES
                ASC
        LIMIT
            10
        """\
        .format(col_state,
                col_state)

state_df = spark.sql(query)

Combine "state_df" SparkSQL dataframe into one partition on master node.

In [None]:
state_df = state_df.coalesce(1)

Convert "state_df" SparkSQL dataframe to "state_pdf" Pandas dataframe. Save result as TXT output file.

In [None]:
state_pdf = state_df.toPandas()

In [None]:
state_pdf.to_csv(state_export_path,
                 sep = ";",
                 index = False,
                 header = True)

Alternatively, export "state_df" SparkSQL dataframe result in output directory as CSV.PART files.

In [None]:
state_df.write.csv(state_export_path,
                   sep = ";",
                   header = True,
                   mode = "errorifexists",
                   ignoreLeadingWhiteSpace = True,
                   ignoreTrailingWhiteSpace = True)