# Hands-on Course: Big Data Essentials

## Project Context
The H-1B is an employment-based, non-immigrant visa category for temporary foreign workers in the United States. For a foreign national to apply for an H-1B visa, a US employer must offer a job and file an H-1B petition with the US immigration department. It is the most common visa status applied for and held by international students once they complete college or higher education (Master's, PhD) and work in a full-time position.
The Office of Foreign Labor Certification (OFLC) generates program data that provides useful information about the immigration programs, including the H-1B visa. The disclosure data is updated annually and is available on the OFLC's official website.

### Data Set:
The dataset contains the following columns:

#### CASE_STATUS:
Status associated with the last significant event or decision. Valid values are "Certified", "Certified-Withdrawn", "Denied" and "Withdrawn".

1. **Certified**: Employer filed the LCA (labor condition application), which was approved by the DOL (Department of Labor).
2. **Certified-Withdrawn**: LCA was approved but later withdrawn by the employer.
3. **Withdrawn**: LCA was withdrawn by the employer before approval.
4. **Denied**: LCA was denied by the DOL.

#### EMPLOYER_NAME:
Name of the employer submitting the labor condition application.

#### SOC_NAME:
The occupational name associated with the SOC_CODE. SOC_CODE is the occupational code associated with the job being requested for temporary labor condition, as classified by the Standard Occupational Classification (SOC) System.

#### JOB_TITLE:
Title of the job.

#### FULL_TIME_POSITION:
Y = Full Time Position; N = Part Time Position

#### PREVAILING_WAGE: 
Prevailing Wage for the job being requested for temporary labor condition. The wage is listed at annual scale in USD. The prevailing wage for a job position is defined as the average wage paid to similarly employed workers in the requested occupation in the area of intended employment. The prevailing wage is based on the employer’s minimum requirements for the position. 

#### YEAR: 
Year in which the H-1B visa petition was filed.

#### WORKSITE:
City and state of the foreign worker’s intended area of employment.

#### lon:
Longitude of the worksite.

#### lat:
Latitude of the worksite.
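
As a quick illustration of the columns described above, the following sketch parses one record with Python's standard `csv` module and converts the fields to convenient types. The sample line is copied from the `data.head()` output further down in this notebook. The leading `ROW_ID` column name and the helper `parse_record` are assumptions made for this example (the notebook reads the file with `index_col=0`, which suggests an unnamed index column), so treat this as a sketch rather than a description of the raw file.

```python
import csv
import io

# Column order as described above, preceded by the row-index column.
# "ROW_ID" is a made-up name: the notebook reads the file with index_col=0,
# which suggests the first column is an unnamed index (assumption).
COLUMNS = ["ROW_ID", "CASE_STATUS", "EMPLOYER_NAME", "SOC_NAME", "JOB_TITLE",
           "FULL_TIME_POSITION", "PREVAILING_WAGE", "YEAR", "WORKSITE",
           "lon", "lat"]


def parse_record(line):
    """Parse one CSV line into a dict with typed fields (hypothetical helper)."""
    # csv.reader keeps quoted fields such as "PLANO, TEXAS" in one piece.
    fields = next(csv.reader(io.StringIO(line)))
    record = dict(zip(COLUMNS, fields))
    # YEAR is stored as a float string such as "2016.0".
    record["YEAR"] = int(float(record["YEAR"]))
    record["PREVAILING_WAGE"] = float(record["PREVAILING_WAGE"])
    record["FULL_TIME_POSITION"] = record["FULL_TIME_POSITION"] == "Y"
    record["lon"] = float(record["lon"])
    record["lat"] = float(record["lat"])
    return record


sample = ('2,CERTIFIED-WITHDRAWN,"GOODMAN NETWORKS, INC.",CHIEF EXECUTIVES,'
          'CHIEF OPERATING OFFICER,Y,242674.0,2016.0,"PLANO, TEXAS",'
          '-96.698886,33.019843')
record = parse_record(sample)
print(record["EMPLOYER_NAME"], record["YEAR"], record["WORKSITE"])
```

Rows with missing values would need extra handling before the numeric conversions; that is left out here to keep the example short.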

### Data Source:
| File Name | Format | Size | Location |
|---|---|---|---|
| h1b_data.csv | CSV | 470 | SharedLocation |

Note: Please don’t delete the CSV file once you download from the shared location.


### Big Data Technologies to be applied:
#### HDFS:
The input CSV file will be loaded into HDFS in the respective cloud lab. The output will be stored on HDFS in dedicated directories created for it.

#### YARN and MapReduce:
YARN is Hadoop's resource manager; it allocates cluster resources to jobs. MapReduce is a processing framework. A MapReduce job usually splits the input data set into independent chunks, which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file system. The framework takes care of scheduling tasks, monitoring them and re-executing failed tasks.
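
The map, sort and reduce flow described above can be imitated in a few lines of plain Python. This is only a single-process sketch of the idea, not the Hadoop API: the function names (`map_phase`, `shuffle_and_sort`, `reduce_phase`, `run_job`) are made up for this example, and each input line simply stands in for an input split. The input string is the same one used for the WordCount test later in this notebook.

```python
from collections import defaultdict


def map_phase(line):
    """Map: emit a (word, 1) pair for every token in the line."""
    return [(word, 1) for word in line.split()]


def shuffle_and_sort(pairs):
    """Shuffle/sort: group the values by key and order the keys."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted(grouped.items())


def reduce_phase(key, values):
    """Reduce: sum the counts collected for one key."""
    return key, sum(values)


def run_job(lines):
    # Each input line stands in for an independent input split.
    mapped = [pair for line in lines for pair in map_phase(line)]
    return [reduce_phase(key, values)
            for key, values in shuffle_and_sort(mapped)]


result = run_job(["Hello Hadoop Goodbye Hadoop"])
for word, count in result:
    print(f"{word}\t{count}")
```

On a real cluster the map calls run in parallel on different nodes and the grouping step moves data across the network; the sketch only shows how the three stages hand data to each other.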

#### Hive:
Hive is a processing tool. It provides a SQL-like query language (HiveQL) that is often used as the interface to an Apache Hadoop based data warehouse. Hive is considered friendlier and more familiar to users who are used to querying data with SQL.

#### Pig:
A scripting platform for processing and analyzing large data sets. Apache Pig allows Apache Hadoop users to write complex MapReduce transformations using a simple scripting language called Pig Latin.

#### HBase:
HBase is a non-relational (NoSQL) database that runs on top of HDFS. It is natively integrated with Hadoop and works seamlessly alongside other data access engines through YARN.

#### Spark:
Apache Spark is a fast, in-memory data processing engine with elegant and expressive development APIs to allow data workers to efficiently execute streaming, machine learning or SQL workloads that require fast iterative access to datasets.

### Requirements/Use cases/questions

1. Is the number of petitions with the Data Engineer job title increasing over time?
2. Find the top 5 job titles with the highest growth in applications.
3. Which part of the US has the most Hardware Engineer jobs for each year?
4. Find the top 5 locations in the US with the most certified visas for each year.
5. Which industry has the most Data Scientist positions?
6. Which 5 employers file the most petitions each year?
7. Find the 10 most popular job positions for H-1B visa applications for each year.
8. Find the count of each case status and its percentage of total applications for each year.
9. Find the average prevailing wage for each job for each year (treat part-time and full-time positions separately). Arrange the output in descending order.
10. Which employers have a success rate above 70% and more than 1000 petitions filed in total? List them along with their number of petitions.
11. Which job positions have a success rate above 70% and more than 1000 petitions filed in total? List them along with their number of petitions.
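
Questions 10 and 11 share the same shape: group the petitions by a column, compute a success rate per group, and keep the groups that pass both thresholds. The sketch below shows that logic in plain Python. The course material does not define "success", so treating `CERTIFIED` and `CERTIFIED-WITHDRAWN` as successful is an assumption, and `high_success` and the toy records are made up for illustration.

```python
from collections import Counter

# Assumption: a petition counts as successful when its status is one of these.
SUCCESS_STATUSES = {"CERTIFIED", "CERTIFIED-WITHDRAWN"}


def high_success(records, key, min_rate=0.70, min_total=1000):
    """Return (group, total petitions, success rate) for groups above both thresholds.

    records: iterable of dicts holding at least `key` and "CASE_STATUS".
    """
    total = Counter()
    success = Counter()
    for rec in records:
        total[rec[key]] += 1
        if rec["CASE_STATUS"] in SUCCESS_STATUSES:
            success[rec[key]] += 1
    rows = [(name, count, success[name] / count)
            for name, count in total.items()
            if count > min_total and success[name] / count > min_rate]
    # Largest filers first.
    return sorted(rows, key=lambda row: row[1], reverse=True)


# Tiny made-up records, with the petition threshold lowered to fit them.
toy = (
    [{"EMPLOYER_NAME": "ACME", "CASE_STATUS": "CERTIFIED"}] * 4
    + [{"EMPLOYER_NAME": "ACME", "CASE_STATUS": "DENIED"}]
    + [{"EMPLOYER_NAME": "GLOBEX", "CASE_STATUS": "CERTIFIED"}] * 2
    + [{"EMPLOYER_NAME": "GLOBEX", "CASE_STATUS": "WITHDRAWN"}] * 2
)
result = high_success(toy, "EMPLOYER_NAME", min_total=3)
print(result)
```

Passing `"JOB_TITLE"` as `key` gives the corresponding filter for question 11. The same grouping and filtering would carry over to whichever tool is used for the actual solution.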

### Solution expectation:
* Step 1: Load the dataset to HDFS
* Step 2: Write MapReduce programs for questions 1, 2 & 3
* Step 3: Write Hive queries for questions 4 & 5
* Step 4: Write Pig scripts for questions 6 & 7
* Step 5: Write HBase queries for questions 8 & 9
* Step 6: Write Spark queries for questions 10 & 11

### Procedure to submit the solution:
1. Submit a solution document for each question, along with a screen capture of the output from your screen.
2. Each solution document should contain the program, query or script for the corresponding question.
3. Submit your solution as per the guidelines shared by the program management team.

In [1]:
import pandas as pd

In [2]:
data = pd.read_csv("h-1b-visa/h1b_data.csv", index_col=0)


In [3]:
data.head()

Unnamed: 0,CASE_STATUS,EMPLOYER_NAME,SOC_NAME,JOB_TITLE,FULL_TIME_POSITION,PREVAILING_WAGE,YEAR,WORKSITE,lon,lat
1,CERTIFIED-WITHDRAWN,UNIVERSITY OF MICHIGAN,BIOCHEMISTS AND BIOPHYSICISTS,POSTDOCTORAL RESEARCH FELLOW,N,36067.0,2016.0,"ANN ARBOR, MICHIGAN",-83.743038,42.280826
2,CERTIFIED-WITHDRAWN,"GOODMAN NETWORKS, INC.",CHIEF EXECUTIVES,CHIEF OPERATING OFFICER,Y,242674.0,2016.0,"PLANO, TEXAS",-96.698886,33.019843
3,CERTIFIED-WITHDRAWN,"PORTS AMERICA GROUP, INC.",CHIEF EXECUTIVES,CHIEF PROCESS OFFICER,Y,193066.0,2016.0,"JERSEY CITY, NEW JERSEY",-74.077642,40.728158
4,CERTIFIED-WITHDRAWN,"GATES CORPORATION, A WHOLLY-OWNED SUBSIDIARY O...",CHIEF EXECUTIVES,"REGIONAL PRESIDEN, AMERICAS",Y,220314.0,2016.0,"DENVER, COLORADO",-104.990251,39.739236
5,WITHDRAWN,PEABODY INVESTMENTS CORP.,CHIEF EXECUTIVES,PRESIDENT MONGOLIA AND INDIA,Y,157518.4,2016.0,"ST. LOUIS, MISSOURI",-90.199404,38.627003


In [19]:
data.columns

Index(['CASE_STATUS', 'EMPLOYER_NAME', 'SOC_NAME', 'JOB_TITLE',
       'FULL_TIME_POSITION', 'PREVAILING_WAGE', 'YEAR', 'WORKSITE', 'lon',
       'lat'],
      dtype='object')

In [23]:
!cat HelloWorld.java

class HelloWorld 
{ 
    // Your program begins with a call to main(). 
    // Prints "Hello, World" to the terminal window. 
    public static void main(String args[]) 
    { 
        System.out.println("Hello, World"); 
    } 
} 

In [24]:
!javac HelloWorld.java
!java HelloWorld

Hello, World


Required configuration:
export HADOOP_CLASSPATH=/usr/lib/jvm/java-7-openjdk-amd64/lib/tools.jar


In [25]:
#%env HADOOP_CLASSPATH /usr/lib/jvm/java-7-openjdk-amd64/lib/tools.jar

In [26]:
# %env

In [27]:
!cat WordCount.java

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.wri

In [28]:
!hadoop com.sun.tools.javac.Main WordCount.java
!jar cf wc.jar WordCount*.class
!echo "Hello Hadoop Goodbye Hadoop" > wordcount_test.txt
#!hdfs dfs -copyFromLocal wordcount_test.txt /user/bdhfeb201/
!hdfs dfs -cat /user/bdhfeb201/wordcount_test.txt

Hello Hadoop Goodbye Hadoop


In [29]:
!hadoop jar wc.jar WordCount /user/bdhfeb201/wordcount_test.txt /user/bdhfeb201/wordcount/output

20/03/02 04:40:12 INFO client.RMProxy: Connecting to ResourceManager at ip-10-0-1-20.ec2.internal/10.0.1.20:8032
20/03/02 04:40:12 WARN security.UserGroupInformation: PriviledgedActionException as:bdhfeb201 (auth:SIMPLE) cause:org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://ip-10-0-1-20.ec2.internal:8020/user/bdhfeb201/wordcount/output already exists
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://ip-10-0-1-20.ec2.internal:8020/user/bdhfeb201/wordcount/output already exists
	at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
	at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:270)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:143)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1307)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1304)
	at java.security.AccessController.doPrivileged(Nat

In [30]:
!hdfs dfs -cat /user/bdhfeb201/wordcount/output/part*

Goodbye	1
Hadoop	2
Hello	1


In [7]:
d1 = data[['JOB_TITLE', 'YEAR']]

In [15]:
d2 = d1['JOB_TITLE'].value_counts()

In [29]:
i2 = d2.index.str.contains('data eng', case=False)

In [30]:
d2.loc[i2]

DATA ENGINEER                                  591
DATA ENGINEER II                               201
SENIOR DATA ENGINEER                           153
BIG DATA ENGINEER                              143
DATA ENGINEER I                                 89
                                              ... 
BIG DATA ENGINEER/SYSTEMS DEVELOPER              1
SENIOR SOFTWARE ENGINEER - DATA ENGINEERING      1
SOFTWARE ENGINER (DATA ENGINEER)                 1
LEAD DATA ENGINEER-ETL                           1
BACKEND DATA ENGINEER                            1
Name: JOB_TITLE, Length: 217, dtype: int64
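
The counts above cover all years together, while question 1 asks about the trend over time. A minimal sketch of the missing step follows: count the matching titles per year, then check whether the counts keep rising. The helpers `petitions_per_year` and `is_increasing` and the toy rows are made up for illustration; the rows are not taken from `h1b_data.csv`.

```python
from collections import Counter


def petitions_per_year(rows, needle="data eng"):
    """Count rows whose job title contains `needle` (case-insensitive), per year.

    rows: iterable of (job_title, year) pairs, like the JOB_TITLE and YEAR columns.
    """
    counts = Counter()
    for title, year in rows:
        if needle.lower() in title.lower():
            counts[int(year)] += 1
    return sorted(counts.items())


def is_increasing(series):
    """True when each year's count is larger than the previous year's."""
    values = [count for _, count in series]
    return all(a < b for a, b in zip(values, values[1:]))


# Made-up rows for illustration only.
toy_rows = [
    ("DATA ENGINEER", 2014.0),
    ("SENIOR DATA ENGINEER", 2015.0),
    ("Big Data Engineer", 2015.0),
    ("CHIEF OPERATING OFFICER", 2015.0),
    ("DATA ENGINEER II", 2016.0),
    ("DATA ENGINEER", 2016.0),
    ("LEAD DATA ENGINEER-ETL", 2016.0),
]
series = petitions_per_year(toy_rows)
print(series, is_increasing(series))
```

With the `d1` frame from the cells above, the rows could plausibly be supplied by `d1.dropna().itertuples(index=False, name=None)`, assuming missing titles and years should simply be skipped.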

In [None]:
!cat StringSplit.java

In [1]:
!javac StringSplit.java
!java StringSplit

engineer
manager
data scientist
analyste


In [4]:
#!hdfs dfs -copyFromLocal h-1b-visa-100/ /user/bdhfeb201/
!hdfs dfs -ls /user/bdhfeb201

Found 6 items
drwx------   - bdhfeb201 bdhfeb201          0 2020-03-02 05:00 /user/bdhfeb201/.Trash
drwx------   - bdhfeb201 bdhfeb201          0 2020-03-02 04:45 /user/bdhfeb201/.staging
drwxr-xr-x   - bdhfeb201 bdhfeb201          0 2020-02-26 06:21 /user/bdhfeb201/h-1b-visa
drwxr-xr-x   - bdhfeb201 bdhfeb201          0 2020-03-02 09:59 /user/bdhfeb201/h-1b-visa-100
drwxr-xr-x   - bdhfeb201 bdhfeb201          0 2020-03-02 04:45 /user/bdhfeb201/wordcount
-rw-r--r--   2 bdhfeb201 bdhfeb201         28 2020-02-27 10:47 /user/bdhfeb201/wordcount_test.txt


In [3]:
!cat Q1DataEngineerIncreasing.java

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Q1DataEngineerIncreasing {

  public static class DataEngineerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private String jobTitle;
    private final String titleOfInterest = "CHIEF EXECUTIVES";

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String[] arrOfStr = value.toString().split
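
The listing above is cut off at the `split` call, so the delimiter handling of the original mapper is not visible. As a hedged illustration of what a map step for question 1 needs to do, here is the same idea in plain Python. It is not a translation of the Java class: `map_line`, the field positions and the header line are assumptions based on the `data.head()` output, and the title is passed in as a parameter instead of being fixed in the class.

```python
import csv
import io

# Field positions assume the layout shown by data.head(): a leading row-index
# column, then CASE_STATUS, EMPLOYER_NAME, SOC_NAME, JOB_TITLE, ..., YEAR.
JOB_TITLE_IDX = 4
YEAR_IDX = 7


def map_line(line, title_of_interest):
    """Yield (year, 1) when the line's job title equals title_of_interest."""
    try:
        fields = next(csv.reader(io.StringIO(line)))
        title = fields[JOB_TITLE_IDX].strip().upper()
        year = str(int(float(fields[YEAR_IDX])))
    except (StopIteration, IndexError, ValueError):
        return  # header, blank or malformed line: emit nothing
    if title == title_of_interest.upper():
        yield year, 1


lines = [
    # Hypothetical header line; the real file's header may differ.
    ',CASE_STATUS,EMPLOYER_NAME,SOC_NAME,JOB_TITLE,FULL_TIME_POSITION,'
    'PREVAILING_WAGE,YEAR,WORKSITE,lon,lat',
    '2,CERTIFIED-WITHDRAWN,"GOODMAN NETWORKS, INC.",CHIEF EXECUTIVES,'
    'CHIEF OPERATING OFFICER,Y,242674.0,2016.0,"PLANO, TEXAS",'
    '-96.698886,33.019843',
    '3,CERTIFIED-WITHDRAWN,"PORTS AMERICA GROUP, INC.",CHIEF EXECUTIVES,'
    'CHIEF PROCESS OFFICER,Y,193066.0,2016.0,"JERSEY CITY, NEW JERSEY",'
    '-74.077642,40.728158',
]
emitted = [pair for line in lines
           for pair in map_line(line, "Chief Operating Officer")]
print(emitted)
```

The quote-aware parsing matters for this dataset: with a plain `split(",")`, the comma inside `"GOODMAN NETWORKS, INC."` shifts every later field by one, so index 4 would hold the SOC_NAME value `CHIEF EXECUTIVES` instead of the job title.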

In [1]:
!hadoop com.sun.tools.javac.Main Q1DataEngineerIncreasing.java
!jar cf Q1DataEngineerIncreasing.jar Q1DataEngineerIncreasing*.class
!hdfs dfs -rm -r /user/bdhfeb201/Q1DataEngineerIncreasing
!hadoop jar Q1DataEngineerIncreasing.jar Q1DataEngineerIncreasing /user/bdhfeb201/h-1b-visa-100 /user/bdhfeb201/Q1DataEngineerIncreasing

20/03/02 12:13:36 INFO fs.TrashPolicyDefault: Moved: 'hdfs://ip-10-0-1-20.ec2.internal:8020/user/bdhfeb201/Q1DataEngineerIncreasing' to trash at: hdfs://ip-10-0-1-20.ec2.internal:8020/user/bdhfeb201/.Trash/Current/user/bdhfeb201/Q1DataEngineerIncreasing
20/03/02 12:13:38 INFO client.RMProxy: Connecting to ResourceManager at ip-10-0-1-20.ec2.internal/10.0.1.20:8032
20/03/02 12:13:38 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/03/02 12:13:38 INFO input.FileInputFormat: Total input paths to process : 1
20/03/02 12:13:38 INFO mapreduce.JobSubmitter: number of splits:1
20/03/02 12:13:39 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1579257682689_8558
20/03/02 12:13:39 INFO impl.YarnClientImpl: Submitted application application_1579257682689_8558
20/03/02 12:13:39 INFO mapreduce.Job: The url to track the job: http://ip-10-0-1-20.ec2.internal:608

In [2]:
!hdfs dfs -cat /user/bdhfeb201/Q1DataEngineerIncreasing/part* | head