**Usecase-1**  

Reading: Load the employees table from the MySQL database into Spark.

Requirement: The report must include the following information:   
1. Calculate each employee's current age.
2. Calculate the total number of years each employee has worked.
3. Find each employee's age when they were hired by the company.
4. Show each employee's birth year.
5. Create an abbreviated employee name consisting of the first 2 characters of the last name followed by all characters of the first name, in lower case.
6. Reverse the employee number.

Ordering: Sort the data by abbreviated employee name in ascending order.

The ordinality (column order), attribute names, and types are defined below; each number refers to the requirement that produces the attribute:
6. id (type: integer)
5. user (type: string)
1. age (type: integer)
4. birth_year (type: integer)
3. start_age (type: integer)
2. year_worked (type: integer)

Output 1: The output file must be written in each file type shown below. The directory structure is defined below:   
BASE_DIR = `/opt/spark_processing/data/employee`   
FILE_TYPE = { `csv | parquet | orc | avro | json` }   
DATA = { `employee` }   
CURRENT_DATE = `now()`   
FILE_NAME = `spark_$DATA_$CURRENT_DATE`   
LOCATION = `$BASE_DIR/$FILE_NAME/$DATA/$FILE_NAME`     
 
Output 2: MySQL table (database name: analytics_tensor, table name: spark_employees_current_date)

***Reading: Load employees table from MySQL database into Spark.*** 

In [3]:
import findspark
findspark.init()
findspark.find()

'C:\\Users\\dipti\\spark-3.0.0-preview2-bin-hadoop3.2\\spark-3.0.0-preview2-bin-hadoop3.2'

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.master('local').appName('UseCase_1').getOrCreate()

In [11]:
import configparser

# A forward slash avoids the invalid '\d' escape sequence in the original path.
config_filename = './db_properties.ini'
db_properties = {}
config = configparser.ConfigParser()
config.read(config_filename)
db_prop = config['mysql']
# In db_properties.ini, remove 'employees' from 'url = jdbc:mysql://localhost:3306/employees'.
db_url = db_prop['url']
db_properties['driver'] = db_prop['driver']
db_properties['user'] = db_prop['user']
db_properties['password'] = db_prop['password']
db_properties['timezone'] = db_prop['timezone']
# Also remove 'database = employees' from db_properties.ini.
# db_properties['database'] = db_prop['database']
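
The cell above expects a `db_properties.ini` file with a `[mysql]` section. The following is a minimal sketch of that layout, parsed from an in-memory string so that it runs without the file. Only the key names and the URL host and port come from the notebook; the driver class, user, password, and timezone values are placeholder assumptions, and the `sample_*` names are hypothetical.

```python
import configparser

# Hypothetical contents of db_properties.ini; all values are placeholders.
SAMPLE_INI = """
[mysql]
url = jdbc:mysql://localhost:3306
driver = com.mysql.cj.jdbc.Driver
user = example_user
password = example_password
timezone = UTC
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_INI)

mysql_section = sample_config["mysql"]
sample_url = mysql_section["url"]
# Everything except the URL is passed to Spark as JDBC connection properties.
sample_properties = {
    key: mysql_section[key] for key in ("driver", "user", "password", "timezone")
}

print(sample_url)
print(sorted(sample_properties))
```

Keeping the URL free of a database name means that table names passed to Spark need to be qualified with their database.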


In [12]:
employees_df = spark.read.jdbc(url=db_url, table='employees',
                               properties=db_properties)

In [13]:
employees_df.printSchema()

root
 |-- emp_no: integer (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: date (nullable = true)



In [14]:
# Calculate the employee's current age.
from pyspark.sql.functions import col, current_date, datediff, floor

# Approximate age in years: days since birth divided by 365 (leap days are ignored).
curr_age = datediff(current_date(), col("birth_date")) / 365
# Cast first, then alias, so the alias is applied to the final column.
age = employees_df.select('*', floor(curr_age).cast("integer").alias("age"))
age.show(5)
age.printSchema()

+------+----------+----------+---------+------+----------+---+
|emp_no|birth_date|first_name|last_name|gender| hire_date|age|
+------+----------+----------+---------+------+----------+---+
| 10001|1953-09-02|    Georgi|  Facello|     M|1986-06-26| 66|
| 10002|1964-06-02|   Bezalel|   Simmel|     F|1985-11-21| 56|
| 10003|1959-12-03|     Parto|  Bamford|     M|1986-08-28| 60|
| 10004|1954-05-01| Chirstian|  Koblick|     M|1986-12-01| 66|
| 10005|1955-01-21|   Kyoichi| Maliniak|     M|1989-09-12| 65|
+------+----------+----------+---------+------+----------+---+
only showing top 5 rows

root
 |-- emp_no: integer (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- age: integer (nullable = true)
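
Dividing a day count by 365 ignores leap days, so the result can be one year too high shortly before a birthday. The plain-Python sketch below compares the two approaches. The function names and the sample dates are illustrative assumptions, not part of the notebook.

```python
from datetime import date


def approx_age(birth: date, today: date) -> int:
    """Age as in the cell above: whole days divided by 365, floored."""
    return (today - birth).days // 365


def exact_age(birth: date, today: date) -> int:
    """Calendar age: subtract one if this year's birthday has not occurred yet."""
    had_birthday = (today.month, today.day) >= (birth.month, birth.day)
    return today.year - birth.year - (0 if had_birthday else 1)


birth = date(1960, 3, 1)
today = date(2020, 2, 20)  # ten days before the 60th birthday
print(approx_age(birth, today), exact_age(birth, today))  # 60 59
```

In Spark, flooring `months_between(current_date(), col("birth_date")) / 12` is one calendar-based option that avoids this drift.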



In [15]:
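# Calculate the total number of years worked by each employee.
from pyspark.sql.functions import round  # note: shadows Python's built-in round() in this notebook

# Years are rounded to the nearest whole year here, whereas age and start_age are floored.
years = datediff(current_date(), "hire_date") / 365
year_worked = age.select('*', round(years, 0).cast('integer').alias("year_worked"))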
year_worked.show(5)
year_worked.printSchema()

+------+----------+----------+---------+------+----------+---+-----------+
|emp_no|birth_date|first_name|last_name|gender| hire_date|age|year_worked|
+------+----------+----------+---------+------+----------+---+-----------+
| 10001|1953-09-02|    Georgi|  Facello|     M|1986-06-26| 66|         34|
| 10002|1964-06-02|   Bezalel|   Simmel|     F|1985-11-21| 56|         35|
| 10003|1959-12-03|     Parto|  Bamford|     M|1986-08-28| 60|         34|
| 10004|1954-05-01| Chirstian|  Koblick|     M|1986-12-01| 66|         34|
| 10005|1955-01-21|   Kyoichi| Maliniak|     M|1989-09-12| 65|         31|
+------+----------+----------+---------+------+----------+---+-----------+
only showing top 5 rows

root
 |-- emp_no: integer (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- year_worked: in

In [16]:
# Find the employee's age when they were hired by the company.
age_when_hired = datediff("hire_date", "birth_date")/365
start_age = year_worked.select('*', floor(age_when_hired).cast('Integer').alias("start_age"))
start_age.show(5)
start_age.printSchema()

+------+----------+----------+---------+------+----------+---+-----------+---------+
|emp_no|birth_date|first_name|last_name|gender| hire_date|age|year_worked|start_age|
+------+----------+----------+---------+------+----------+---+-----------+---------+
| 10001|1953-09-02|    Georgi|  Facello|     M|1986-06-26| 66|         34|       32|
| 10002|1964-06-02|   Bezalel|   Simmel|     F|1985-11-21| 56|         35|       21|
| 10003|1959-12-03|     Parto|  Bamford|     M|1986-08-28| 60|         34|       26|
| 10004|1954-05-01| Chirstian|  Koblick|     M|1986-12-01| 66|         34|       32|
| 10005|1955-01-21|   Kyoichi| Maliniak|     M|1989-09-12| 65|         31|       34|
+------+----------+----------+---------+------+----------+---+-----------+---------+
only showing top 5 rows

root
 |-- emp_no: integer (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- 

In [17]:
# Show the employee's birth year.
from pyspark.sql.functions import year

# year() extracts the year directly from the date column; no string regex is needed.
birth_year = start_age.select('*', year("birth_date").alias("birth_year"))
birth_year.show(5)
birth_year.printSchema()

+------+----------+----------+---------+------+----------+---+-----------+---------+----------+
|emp_no|birth_date|first_name|last_name|gender| hire_date|age|year_worked|start_age|birth_year|
+------+----------+----------+---------+------+----------+---+-----------+---------+----------+
| 10001|1953-09-02|    Georgi|  Facello|     M|1986-06-26| 66|         34|       32|      1953|
| 10002|1964-06-02|   Bezalel|   Simmel|     F|1985-11-21| 56|         35|       21|      1964|
| 10003|1959-12-03|     Parto|  Bamford|     M|1986-08-28| 60|         34|       26|      1959|
| 10004|1954-05-01| Chirstian|  Koblick|     M|1986-12-01| 66|         34|       32|      1954|
| 10005|1955-01-21|   Kyoichi| Maliniak|     M|1989-09-12| 65|         31|       34|      1955|
+------+----------+----------+---------+------+----------+---+-----------+---------+----------+
only showing top 5 rows

root
 |-- emp_no: integer (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- first_name: string (nu

In [18]:
# Create an abbreviated employee name: the first 2 characters of the last name
# followed by the full first name, all in lower case.
from pyspark.sql.functions import concat, substring, lower, col

# substring() positions are 1-based, so start at 1 for the first character.
l_name_sub = substring("last_name", 1, 2)
conct = lower(concat(l_name_sub, col("first_name")))
user = birth_year.select('*', conct.alias("user"))
user.show(5)
user.printSchema()

+------+----------+----------+---------+------+----------+---+-----------+---------+----------+-----------+
|emp_no|birth_date|first_name|last_name|gender| hire_date|age|year_worked|start_age|birth_year|       user|
+------+----------+----------+---------+------+----------+---+-----------+---------+----------+-----------+
| 10001|1953-09-02|    Georgi|  Facello|     M|1986-06-26| 66|         34|       32|      1953|   fageorgi|
| 10002|1964-06-02|   Bezalel|   Simmel|     F|1985-11-21| 56|         35|       21|      1964|  sibezalel|
| 10003|1959-12-03|     Parto|  Bamford|     M|1986-08-28| 60|         34|       26|      1959|    baparto|
| 10004|1954-05-01| Chirstian|  Koblick|     M|1986-12-01| 66|         34|       32|      1954|kochirstian|
| 10005|1955-01-21|   Kyoichi| Maliniak|     M|1989-09-12| 65|         31|       34|      1955|  makyoichi|
+------+----------+----------+---------+------+----------+---+-----------+---------+----------+-----------+
only showing top 5 rows

roo
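
The abbreviation rule can be checked against the rows shown above with a plain-Python reference function. This is a sketch for sanity-checking the Spark expression; `abbreviated_name` is a hypothetical helper and is not part of the notebook.

```python
def abbreviated_name(first_name: str, last_name: str) -> str:
    """First 2 characters of the last name plus the full first name, lower-cased."""
    return (last_name[:2] + first_name).lower()


print(abbreviated_name("Georgi", "Facello"))     # fageorgi
print(abbreviated_name("Chirstian", "Koblick"))  # kochirstian
```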

In [19]:
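# Reverse the employee number.
from pyspark.sql.functions import reverse

# reverse() operates on strings, so cast emp_no explicitly before reversing.
# Casting the result back to integer drops leading zeros (for example, 10010 becomes 1001).
# Note: the name `id` shadows Python's built-in id() in this notebook.
id = user.select('*', reverse(col("emp_no").cast("string")).cast('integer').alias("id"))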
id.show(5)
id.printSchema()


+------+----------+----------+---------+------+----------+---+-----------+---------+----------+-----------+-----+
|emp_no|birth_date|first_name|last_name|gender| hire_date|age|year_worked|start_age|birth_year|       user|   id|
+------+----------+----------+---------+------+----------+---+-----------+---------+----------+-----------+-----+
| 10001|1953-09-02|    Georgi|  Facello|     M|1986-06-26| 66|         34|       32|      1953|   fageorgi|10001|
| 10002|1964-06-02|   Bezalel|   Simmel|     F|1985-11-21| 56|         35|       21|      1964|  sibezalel|20001|
| 10003|1959-12-03|     Parto|  Bamford|     M|1986-08-28| 60|         34|       26|      1959|    baparto|30001|
| 10004|1954-05-01| Chirstian|  Koblick|     M|1986-12-01| 66|         34|       32|      1954|kochirstian|40001|
| 10005|1955-01-21|   Kyoichi| Maliniak|     M|1989-09-12| 65|         31|       34|      1955|  makyoichi|50001|
+------+----------+----------+---------+------+----------+---+-----------+---------+----
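
Because the reversed digits are cast back to an integer, any employee number ending in zeros loses digits. The plain-Python sketch below shows the effect. `reverse_number` is a hypothetical helper that assumes a non-negative employee number.

```python
def reverse_number(emp_no: int) -> int:
    """Reverse the decimal digits; int() drops any leading zeros in the result."""
    return int(str(emp_no)[::-1])


print(reverse_number(10002))  # 20001
print(reverse_number(10010))  # 1001 (the reversed string is "01001")
```

If the leading zeros matter, keeping `id` as a string column would preserve them, though the requirement specifies an integer type.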

In [50]:
# Keep only the report columns, in the required order, sorted by abbreviated name.
employee = id.select("id", "user", "age", "birth_year", "start_age", "year_worked")
employee = employee.sort("user")
employee.show(20)
employee.printSchema()

+------+-------------+---+----------+---------+-----------+
|    id|         user|age|birth_year|start_age|year_worked|
+------+-------------+---+----------+---------+-----------+
|146852| aaabdelkader| 59|      1961|       33|         26|
|500852|    aaadhemar| 67|      1953|       37|         29|
|377554|   aaaemilian| 60|      1960|       27|         32|
| 65634|      aaalagu| 61|      1959|       32|         29|
|156662| aaaleksander| 61|      1959|       29|         31|
|895784|    aaalexius| 58|      1962|       32|         25|
|369612|      aaalois| 59|      1960|       35|         25|
| 72451|     aaaluzio| 61|      1959|       26|         35|
| 68001|    aaamabile| 55|      1964|       28|         27|
| 70701|    aaanestis| 66|      1954|       36|         30|
|507832|     aaanoosh| 62|      1957|       29|         33|
|488522|      aaanwar| 64|      1956|       32|         32|
|833602|    aaarlette| 67|      1952|       37|         30|
| 28192|   aaarumugam| 67|      1952|   

Output 1: The output file must be written in each file type shown below. The directory structure is defined below:   
BASE_DIR = `/opt/spark_processing/data/employee`   
FILE_TYPE = { `csv | parquet | orc | avro | json` }   
DATA = { `employee` }   
CURRENT_DATE = `now()`   
FILE_NAME = `spark_$DATA_$CURRENT_DATE`   
LOCATION = `$BASE_DIR/$FILE_NAME/$DATA/$FILE_NAME`


In [51]:
from datetime import datetime
curr_date = datetime.now().strftime('%Y-%m-%d')
data = "employee"
file_name = "spark_"+data+"_"+curr_date
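
The five output paths differ only in the file type, so they can be derived in one place. The sketch below is a minimal illustration: it follows the `<base>/<file_type>/<file_name>` layout used by the write cells in this notebook rather than the `LOCATION` template. The function name and the fixed date are hypothetical.

```python
from datetime import datetime
from posixpath import join

FILE_TYPES = ("csv", "parquet", "orc", "avro", "json")


def output_paths(base_dir: str, data: str, when: datetime) -> dict:
    """Map each file type to <base_dir>/<file_type>/spark_<data>_<YYYY-MM-DD>."""
    file_name = f"spark_{data}_{when:%Y-%m-%d}"
    return {fmt: join(base_dir, fmt, file_name) for fmt in FILE_TYPES}


paths = output_paths(
    "/opt/spark_processing/data/employee", "employee", datetime(2020, 6, 26)
)
print(paths["csv"])
# /opt/spark_processing/data/employee/csv/spark_employee_2020-06-26
```

Each entry could then be passed to a write call such as `employee.write.format(fmt).mode("overwrite").save(path)`.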


In [30]:
path_csv = 'C:/Users/dipti/PysparkLessons/spark_file_from_lab1usecase1/spark_employee/csv/'+file_name
employee.write.format("csv").mode("overwrite").option("path", path_csv).save()

In [None]:
path_parquet = 'C:/Users/dipti/PysparkLessons/spark_file_from_lab1usecase1/spark_employee/parquet/'+file_name
employee.write.format("parquet").mode("overwrite").option("path", path_parquet).save()

In [None]:
path_orc = 'C:/Users/dipti/PysparkLessons/spark_file_from_lab1usecase1/spark_employee/orc/'+file_name
employee.write.format("orc").mode("overwrite").option("path", path_orc).save()

In [None]:
path_avro = 'C:/Users/dipti/PysparkLessons/spark_file_from_lab1usecase1/spark_employee/avro/'+file_name
# The "avro" format is provided by the external spark-avro module, which must be on the classpath.
employee.write.format("avro").mode("overwrite").option("path", path_avro).save()

In [None]:
path_json = 'C:/Users/dipti/PysparkLessons/spark_file_from_lab1usecase1/spark_employee/json/'+file_name
employee.write.format("json").mode("overwrite").option("path", path_json).save()

Output 2: MySQL table (database name: analytics_tensor, table name: spark_employees_current_date)

In [55]:

# The target table is qualified with its database because db_url no longer names one.
database = "analytics_tensor"
table_name = database + ".spark_employees"

# DataFrameWriter.jdbc() performs the write immediately and returns None, so the save
# mode must be set before (or passed to) jdbc(). The original call chained
# .mode('append').save() after jdbc(): the write ran with the default mode
# (ErrorIfExists) and, once the table existed, failed with
# "AnalysisException: Table or view 'analytics_tensor.spark_employees' already exists. SaveMode: ErrorIfExists.;"
employee.write.mode("append").jdbc(url=db_url, table=table_name, properties=db_properties)
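
Output 2 asks for a table name that ends with the current date. A date formatted with hyphens would need backtick quoting as a MySQL identifier, so the sketch below uses underscores instead. `dated_table_name` and the sample date are illustrative assumptions, not part of the notebook.

```python
from datetime import date


def dated_table_name(database: str, prefix: str, day: date) -> str:
    """Build <database>.<prefix>_<YYYY_MM_DD> using only unquoted-identifier characters."""
    return f"{database}.{prefix}_{day:%Y_%m_%d}"


print(dated_table_name("analytics_tensor", "spark_employees", date(2020, 6, 26)))
# analytics_tensor.spark_employees_2020_06_26
```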