### Importing the libraries

In [14]:
import findspark
from elasticsearch import Elasticsearch
import requests
from pprint import pprint

### Finding locally installed Spark and importing PySpark libraries

In [15]:
findspark.init("/usr/local/spark/")
from pyspark.sql import SparkSession, functions as func

### Checking if Elasticsearch has successfully been connected

In [16]:
res = requests.get('http://localhost:9200')
pprint(res.content)

(b'{\n  "name" : "ip-172-31-14-175",\n  "cluster_name" : "elasticsearch",\n  "'
 b'cluster_uuid" : "ypf0RlCFQYytEO-JNW-MCg",\n  "version" : {\n    "number" :'
 b' "7.9.2",\n    "build_flavor" : "default",\n    "build_type" : "tar",\n    '
 b'"build_hash" : "d34da0ea4a966c4e49417f2da2f244e3e97b4e6e",\n    "build_da'
 b'te" : "2020-09-23T00:45:33.626720Z",\n    "build_snapshot" : false,\n    "'
 b'lucene_version" : "8.6.2",\n    "minimum_wire_compatibility_version" : "6'
 b'.8.0",\n    "minimum_index_compatibility_version" : "6.0.0-beta1"\n  },\n  '
 b'"tagline" : "You Know, for Search"\n}\n')


### Creating an Elasticsearch object

In [17]:
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

### Creating Spark Session

In [18]:
spark = SparkSession.builder.appName('task').getOrCreate()

### Reading CSV with Spark and creating Dataframe

In [19]:
df = spark.read.format("csv").option("header","true").load("/home/ubuntu/Hr5m.csv").fillna(0)["Emp ID", "First Name", "Last Name", "Gender", "E Mail", "Month Name of Joining", "SSN", "County", "State", "Region", "City", "Zip", "Salary", func.regexp_replace(func.col("Last % Hike"), "%", "").alias("Salary Hike in %")]
df.show(5)

+------+----------+---------+------+--------------------+---------------------+-----------+--------------------+-----+-------+----------+-----+------+----------------+
|Emp ID|First Name|Last Name|Gender|              E Mail|Month Name of Joining|        SSN|              County|State| Region|      City|  Zip|Salary|Salary Hike in %|
+------+----------+---------+------+--------------------+---------------------+-----------+--------------------+-----+-------+----------+-----+------+----------------+
|742048|    Lizeth|   Mccoll|     F|lizeth.mccoll@ibm...|              January|171-86-6830|               Stark|   OH|Midwest|  Alliance|44601|147446|              14|
|671135| Argentina|     Hern|     F|argentina.hern@nt...|                April|083-02-3078|District of Columbia|   DC|  South|Washington|20411|129174|               8|
|965851|    Damian|  Patillo|     M|damian.patillo@ou...|             December|326-11-9852|              Fresno|   CA|   West|    Burrel|93607|158746|          

### Checking the total records in the PySpark Dataframe

In [20]:
print(f"Total records present: {df.count()}")

Total records present: 5000000


### Checking the data types

In [21]:
df.dtypes

[('Emp ID', 'string'),
 ('First Name', 'string'),
 ('Last Name', 'string'),
 ('Gender', 'string'),
 ('E Mail', 'string'),
 ('Month Name of Joining', 'string'),
 ('SSN', 'string'),
 ('County', 'string'),
 ('State', 'string'),
 ('Region', 'string'),
 ('City', 'string'),
 ('Zip', 'string'),
 ('Salary', 'string'),
 ('Salary Hike in %', 'string')]

### Casting 'string' value to 'int'

In [22]:
df = df.withColumn("Emp ID", df["Emp ID"].cast("int"))
df = df.withColumn("Zip", df["Zip"].cast("int"))
df = df.withColumn("Salary", df["Salary"].cast("int"))
df = df.withColumn("Salary Hike in %", df["Salary Hike in %"].cast("int"))
df.dtypes

[('Emp ID', 'int'),
 ('First Name', 'string'),
 ('Last Name', 'string'),
 ('Gender', 'string'),
 ('E Mail', 'string'),
 ('Month Name of Joining', 'string'),
 ('SSN', 'string'),
 ('County', 'string'),
 ('State', 'string'),
 ('Region', 'string'),
 ('City', 'string'),
 ('Zip', 'int'),
 ('Salary', 'int'),
 ('Salary Hike in %', 'int')]

### Checking if the changes have been made successfully

In [23]:
df.show(5)

+------+----------+---------+------+--------------------+---------------------+-----------+--------------------+-----+-------+----------+-----+------+----------------+
|Emp ID|First Name|Last Name|Gender|              E Mail|Month Name of Joining|        SSN|              County|State| Region|      City|  Zip|Salary|Salary Hike in %|
+------+----------+---------+------+--------------------+---------------------+-----------+--------------------+-----+-------+----------+-----+------+----------------+
|742048|    Lizeth|   Mccoll|     F|lizeth.mccoll@ibm...|              January|171-86-6830|               Stark|   OH|Midwest|  Alliance|44601|147446|              14|
|671135| Argentina|     Hern|     F|argentina.hern@nt...|                April|083-02-3078|District of Columbia|   DC|  South|Washington|20411|129174|               8|
|965851|    Damian|  Patillo|     M|damian.patillo@ou...|             December|326-11-9852|              Fresno|   CA|   West|    Burrel|93607|158746|          

### Checking the Schema

In [24]:
df.printSchema()

root
 |-- Emp ID: integer (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- E Mail: string (nullable = true)
 |-- Month Name of Joining: string (nullable = true)
 |-- SSN: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zip: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Salary Hike in %: integer (nullable = true)



### Ingesting Pyspark Dataframe in Elasticsearch directly

In [25]:
df.write.format(
    "org.elasticsearch.spark.sql"
).option(
    "es.resource", '%s' % ('humans')
).option(
    "es.nodes", 'localhost'
).option(
    "es.port", '9200'
).save()
print("Successfully ingested data into Elasticsearch!")

Successfully ingested data into Elasticsearch!


### Setting the output limit

In [26]:
es.indices.put_settings(index="humans", body = {"index" : { "max_result_window" : 5000000 }})

{'acknowledged': True}

### Counting the records in index to see if PySpark Dataframe has been successfully ingested

In [27]:
es.count(index= "humans")

{'count': 5000000,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}

### Task 1: Count the number of employees in each County, Region and City

In [32]:
res = es.search(index="humans", size=0, body={"aggs": {"No. of Employees per County": {"terms": {"field": "County.keyword","size": 5, "order": {"_key": "asc"}}}}})
pprint(res["aggregations"]["No. of Employees per County"]["buckets"])

[{'doc_count': 90, 'key': 'Aaron'},
 {'doc_count': 509, 'key': 'Abbeville'},
 {'doc_count': 1373, 'key': 'Acadia'},
 {'doc_count': 4611, 'key': 'Accomack'},
 {'doc_count': 5104, 'key': 'Ada'}]


In [33]:
res = es.search(index="humans", size=0, body={"aggs":{"No. of Employees per Region" : {"terms":{"field": "Region.keyword", "order": {"_key": "asc"}}}}})
pprint(res["aggregations"]["No. of Employees per Region"]["buckets"])

[{'doc_count': 1372439, 'key': 'Midwest'},
 {'doc_count': 886984, 'key': 'Northeast'},
 {'doc_count': 1855656, 'key': 'South'},
 {'doc_count': 884921, 'key': 'West'}]


In [34]:
res = es.search(index="humans", size=0, body={"aggs":{"No. of Employees per City" : {"terms":{"field": "City.keyword", "size": 5, "order": {"_key": "asc"}}}}})
pprint(res["aggregations"]["No. of Employees per City"]["buckets"])

[{'doc_count': 90, 'key': 'Aaron'},
 {'doc_count': 96, 'key': 'Aaronsburg'},
 {'doc_count': 681, 'key': 'Abbeville'},
 {'doc_count': 139, 'key': 'Abbot'},
 {'doc_count': 118, 'key': 'Abbotsford'}]


### Task 2: Generate Employee Summary

In [35]:
res = es.search(index="humans", size=5, body={"query": {"match_all": {}}})
pprint(res["hits"]["hits"])

[{'_id': 'bAo_JnUB6zQKO643xqax',
  '_index': 'humans',
  '_score': 1.0,
  '_source': {'City': 'Charlotte',
              'County': 'Mecklenburg',
              'E Mail': 'walter.shelly@gmail.com',
              'Emp ID': 547291,
              'First Name': 'Walter',
              'Gender': 'M',
              'Last Name': 'Shelly',
              'Month Name of Joining': 'February',
              'Region': 'South',
              'SSN': '423-67-5915',
              'Salary': 117819,
              'Salary Hike in %': 21,
              'State': 'NC',
              'Zip': 28289},
  '_type': '_doc'},
 {'_id': 'bQo_JnUB6zQKO643xqax',
  '_index': 'humans',
  '_score': 1.0,
  '_source': {'City': 'Geraldine',
              'County': 'Chouteau',
              'E Mail': 'efren.samuel@yahoo.com',
              'Emp ID': 430700,
              'First Name': 'Efren',
              'Gender': 'M',
              'Last Name': 'Samuel',
              'Month Name of Joining': 'August',
              'Region'

### Task 3: Generate employee summary and ordering by Gender and Salary

In [36]:
res=es.search(index="humans", size=5, body={"sort":[{"Gender.keyword":{"order":"asc"}}, {"Salary":{"order":"asc"}}]})
pprint(res["hits"]["hits"])

[{'_id': 'Zgg_JnUB6zQKO643fQON',
  '_index': 'humans',
  '_score': None,
  '_source': {'City': 'Saint Landry',
              'County': 'Evangeline',
              'E Mail': 'ariana.cumberbatch@msn.com',
              'Emp ID': 506893,
              'First Name': 'Ariana',
              'Gender': 'F',
              'Last Name': 'Cumberbatch',
              'Month Name of Joining': 'June',
              'Region': 'South',
              'SSN': '057-02-5798',
              'Salary': 40000,
              'Salary Hike in %': 4,
              'State': 'LA',
              'Zip': 71367},
  '_type': '_doc',
  'sort': ['F', 40000]},
 {'_id': 'rk9FJnUB6zQKO643p0WP',
  '_index': 'humans',
  '_score': None,
  '_source': {'City': 'Dallas',
              'County': 'Dallas',
              'E Mail': 'amee.gagliano@yahoo.ca',
              'Emp ID': 616703,
              'First Name': 'Amee',
              'Gender': 'F',
              'Last Name': 'Gagliano',
              'Month Name of Joining': 'Augus

### Task 4: Summerize the number of employee joined and hikes granted based on month

In [37]:
res=es.search(index="humans", size=0, body={"aggs":{"No. of Employees joined in particular Month" : {"terms":{"field": "Month Name of Joining.keyword", "size": 12}}}})
pprint(res["aggregations"]["No. of Employees joined in particular Month"]["buckets"])

[{'doc_count': 468295, 'key': 'August'},
 {'doc_count': 450981, 'key': 'July'},
 {'doc_count': 434522, 'key': 'May'},
 {'doc_count': 427424, 'key': 'June'},
 {'doc_count': 422987, 'key': 'March'},
 {'doc_count': 414484, 'key': 'April'},
 {'doc_count': 413247, 'key': 'January'},
 {'doc_count': 408721, 'key': 'December'},
 {'doc_count': 400048, 'key': 'October'},
 {'doc_count': 392382, 'key': 'November'},
 {'doc_count': 384646, 'key': 'September'},
 {'doc_count': 382263, 'key': 'February'}]


In [39]:
res=es.search(index="humans", size=0, body={"aggs": {"Month": {"terms": {"field": "Month Name of Joining.keyword", "size":12},"aggs": {"Hikes granted in this Month": {"terms": {"field": "Salary Hike"}}}}}})
pprint(res["aggregations"]["Month"]["buckets"])

[{'Hikes granted in this Month': {'buckets': [],
                                  'doc_count_error_upper_bound': 0,
                                  'sum_other_doc_count': 0},
  'doc_count': 468295,
  'key': 'August'},
 {'Hikes granted in this Month': {'buckets': [],
                                  'doc_count_error_upper_bound': 0,
                                  'sum_other_doc_count': 0},
  'doc_count': 450981,
  'key': 'July'},
 {'Hikes granted in this Month': {'buckets': [],
                                  'doc_count_error_upper_bound': 0,
                                  'sum_other_doc_count': 0},
  'doc_count': 434522,
  'key': 'May'},
 {'Hikes granted in this Month': {'buckets': [],
                                  'doc_count_error_upper_bound': 0,
                                  'sum_other_doc_count': 0},
  'doc_count': 427424,
  'key': 'June'},
 {'Hikes granted in this Month': {'buckets': [],
                                  'doc_count_error_upper_bound': 0,
      

### Task 5: Generate employee summary and ordering by Salary

In [40]:
res=es.search(index="humans", size=5, body={"query":{"match_all":{}}, "sort":[{"Salary":{"order":"asc"}}]})
pprint(res["hits"]["hits"])

[{'_id': 'Bw1AJnUB6zQKO643Hnoz',
  '_index': 'humans',
  '_score': None,
  '_source': {'City': 'Louann',
              'County': 'Ouachita',
              'E Mail': 'kirk.deem@gmail.com',
              'Emp ID': 336973,
              'First Name': 'Kirk',
              'Gender': 'M',
              'Last Name': 'Deem',
              'Month Name of Joining': 'April',
              'Region': 'South',
              'SSN': '623-85-7102',
              'Salary': 40000,
              'Salary Hike in %': 28,
              'State': 'AR',
              'Zip': 71751},
  '_type': '_doc',
  'sort': [40000]},
 {'_id': 'Hgs_JnUB6zQKO643101x',
  '_index': 'humans',
  '_score': None,
  '_source': {'City': 'Tire Hill',
              'County': 'Somerset',
              'E Mail': 'rosario.hillyard@gmail.com',
              'Emp ID': 680938,
              'First Name': 'Rosario',
              'Gender': 'M',
              'Last Name': 'Hillyard',
              'Month Name of Joining': 'November',
         

### Deleting the index from elasticsearch

In [41]:
es.indices.delete(index='humans')

{'acknowledged': True}

### Stopping the SparkSession

In [42]:
spark.stop()
print("Successfully stopped SparkSession!")

Successfully stopped SparkSession!
