In [1]:
# imports
import findspark
findspark.init()
findspark.find()

# Normal Imports
import pyspark
from pyspark.sql.window import Window
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import row_number
from pyspark.sql.functions import col,struct,when, lit
from pyspark.sql import Row

print("imported")

imported


For references use - https://sparkbyexamples.com/pyspark-tutorial/

In [2]:
# spark context
conf = SparkConf().setAppName("RowSpark").setMaster("local[*]")\
            .set("spark.driver.allowMultipleContexts","true")
sc= SparkContext.getOrCreate(conf = conf)
sc.setLogLevel("ERROR")

# spark session initialization
spark = SparkSession.builder.master("local[*]")\
            .appName("Scenarios").getOrCreate()

##### Mother and Father Scenario

In [3]:
p = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/p.csv")
p.show()

+---+------+------+
| id|  name|gender|
+---+------+------+
|101|  riya|     F|
|566|  Aman|     M|
|202|rakesh|     F|
|875| lucky|     M|
|202| reena|     M|
|322| raina|     M|
|345| Rohit|     F|
|322| Mohit|     F|
|345| Meena|     M|
+---+------+------+



In [4]:
r = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/r.csv")
r.show()

+---+---+
|cid|pid|
+---+---+
|101|202|
|566|322|
|875|345|
+---+---+



In [5]:
leftjoin = r.join(p, r.cid == p.id, "left").select("cid", "pid", "name")
leftjoin.show()

+---+---+-----+
|cid|pid| name|
+---+---+-----+
|101|202| riya|
|566|322| Aman|
|875|345|lucky|
+---+---+-----+



In [6]:
leftantijoin = p.join(r, r.cid == p.id, "left_anti")
leftantijoin.show()

+---+------+------+
| id|  name|gender|
+---+------+------+
|202|rakesh|     F|
|202| reena|     M|
|322| raina|     M|
|345| Rohit|     F|
|322| Mohit|     F|
|345| Meena|     M|
+---+------+------+



In [7]:
mothers = leftantijoin.filter(leftantijoin.gender == "M").withColumnRenamed("name", "mothers")
mothers.show()

+---+-------+------+
| id|mothers|gender|
+---+-------+------+
|202|  reena|     M|
|322|  raina|     M|
|345|  Meena|     M|
+---+-------+------+



In [8]:
fathers = leftantijoin.filter(leftantijoin.gender == "F").withColumnRenamed("name", "fathers")
fathers.show()

+---+-------+------+
| id|fathers|gender|
+---+-------+------+
|202| rakesh|     F|
|345|  Rohit|     F|
|322|  Mohit|     F|
+---+-------+------+



In [9]:
leftmothers = leftjoin.join(mothers, leftjoin.pid == mothers.id, "left")
leftfathers = leftmothers.join(fathers, leftmothers.pid == fathers.id, "left")

leftfathers.show()

+---+---+-----+---+-------+------+---+-------+------+
|cid|pid| name| id|mothers|gender| id|fathers|gender|
+---+---+-----+---+-------+------+---+-------+------+
|101|202| riya|202|  reena|     M|202| rakesh|     F|
|566|322| Aman|322|  raina|     M|322|  Mohit|     F|
|875|345|lucky|345|  Meena|     M|345|  Rohit|     F|
+---+---+-----+---+-------+------+---+-------+------+



In [10]:
finaldf = leftfathers.select("name", "mothers", "fathers")
finaldf.show()

+-----+-------+-------+
| name|mothers|fathers|
+-----+-------+-------+
| riya|  reena| rakesh|
| Aman|  raina|  Mohit|
|lucky|  Meena|  Rohit|
+-----+-------+-------+



# Batch 34 Qs

## sort after _

In [11]:
sortdata = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/sortusing.csv")
sortdata.show()

+-----------+---+
|       name|sal|
+-----------+---+
|sree_ramesh|100|
| chiran_tan|200|
|  ram_krish|300|
|  john_stan|400|
|   mar_jany|200|
+-----------+---+



In [23]:
# splitdata = sortdata.withColumn("lastname", expr("split(name, '_')[1]")).sort(col("lastname").asc())
# splitdata = sortdata.withColumn("lastname", expr("split(name, '_')[1]")).sort(col("lastname").asc()).drop(col("lastname"))
splitdata = sortdata.withColumn("lastname", expr("concat('_', split(name, '_')[1])")).sort(col("lastname").asc())

splitdata.show() 

+-----------+---+--------+
|       name|sal|lastname|
+-----------+---+--------+
|   mar_jany|200|   _jany|
|  ram_krish|300|  _krish|
|sree_ramesh|100| _ramesh|
|  john_stan|400|   _stan|
| chiran_tan|200|    _tan|
+-----------+---+--------+



### olderemployeeselect

In [24]:
sortdata = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/olderemployeeselect.csv")
sortdata.show()

+------+-------+---+---------+
|emp_id|dept_id|sal|hire_date|
+------+-------+---+---------+
|    10|     10|600|10-Jan-19|
|    20|     10|200|10-Jun-19|
|    30|     20|300|20-Jan-20|
|    40|     30|400|30-Jun-20|
+------+-------+---+---------+



In [36]:
windowfunc = Window.partitionBy(col("dept_id")).orderBy(col("hire_date").desc())

hiredate = sortdata.withColumn("row_number", row_number().over(windowfunc))
hiredate.show()

+------+-------+---+---------+----------+
|emp_id|dept_id|sal|hire_date|row_number|
+------+-------+---+---------+----------+
|    20|     10|200|10-Jun-19|         1|
|    10|     10|600|10-Jan-19|         2|
|    30|     20|300|20-Jan-20|         1|
|    40|     30|400|30-Jun-20|         1|
+------+-------+---+---------+----------+



In [37]:
hirefinal = hiredate.filter(hiredate.row_number == 1).select("emp_id", "dept_id", "sal", "hire_date")
hirefinal.show()

+------+-------+---+---------+
|emp_id|dept_id|sal|hire_date|
+------+-------+---+---------+
|    20|     10|200|10-Jun-19|
|    30|     20|300|20-Jan-20|
|    40|     30|400|30-Jun-20|
+------+-------+---+---------+



## reporting manager

In [38]:
manager_reporting = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/manager_reporting.csv")
manager_reporting.show()

+---+-----+----+-------+
| id| Name|Dept|Manager|
+---+-----+----+-------+
|101| John|   A|   null|
|102|  Dan|   A|    101|
|103|James|   A|    101|
|104|  Amy|   A|    101|
|105| Anne|   A|    101|
|106|  Ron|   B|    101|
+---+-----+----+-------+



In [48]:
# sql
dataset = manager_reporting.createOrReplaceTempView("managertable")
managerdf = spark.sql("""select Name 
                        from managertable 
                        where id in (select Manager 
                        from managertable 
                        group by Manager 
                        having count(*)>=5)
                        """)
managerdf.show()



+----+
|Name|
+----+
|John|
+----+



In [57]:
# DSL

dsl_data = manager_reporting.groupBy(col("Manager").alias("mgid")).count()
dsl_data.show()

+----+-----+
|mgid|count|
+----+-----+
| 101|    5|
|null|    1|
+----+-----+



In [63]:
filtereddata = dsl_data.filter(col("count") >= 5)
filtereddata.join(manager_reporting, col("mgid") == col("id"), "left").select("name").show()

+----+
|name|
+----+
|John|
+----+



## manager_details

In [65]:
manager_details = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/manager_details.csv")
manager_details.show()

+-----+------+-----+
|EMPID|EMNAME|MNGID|
+-----+------+-----+
|  101|  Mary|  102|
|  102|  Ravi| NULL|
|  103|   Raj|  102|
|  104|  Pete|  103|
|  105|Prasad|  103|
|  106|   Ben|  103|
+-----+------+-----+



In [78]:
mngid = manager_details.filter(col("mngid") != "NULL").select("mngid").withColumnRenamed("mngid", "id").distinct()
mngid.show()

+---+
| id|
+---+
|102|
|103|
+---+



In [82]:
final = mngid.join(manager_details, col("empid") == col("id"), "left").select("EMPID", "EMNAME")
final.show()

+-----+------+
|EMPID|EMNAME|
+-----+------+
|  102|  Ravi|
|  103|   Raj|
+-----+------+



### replace_underscore

In [92]:
replace_underscore = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/replace_underscore.csv")
replace_underscore.show()

+------+-----------+---------+
|Emp Id|   Emp Name|Dept Name|
+------+-----------+---------+
|   101|Alice james|    Sales|
|   102|   Bob jack|    Sales|
+------+-----------+---------+



In [95]:
# replace the column data, space with underscore
replace_underscore.withColumn("Emp Name 1", regexp_replace("Emp Name", " ", "_")).show()

+------+-----------+---------+-----------+
|Emp Id|   Emp Name|Dept Name| Emp Name 1|
+------+-----------+---------+-----------+
|   101|Alice james|    Sales|Alice_james|
|   102|   Bob jack|    Sales|   Bob_jack|
+------+-----------+---------+-----------+



In [97]:
# replace the column header space with underscore
replace_underscore1 = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/replace_underscore.csv")
replace_underscore1.show()

replace_underscore1.withColumnRenamed("Emp Id", "Emp_Id")\
                    .withColumnRenamed("Emp Name", "Emp_Name")\
                    .withColumnRenamed("Dept Name", "Dept_Name").show()

+------+-----------+---------+
|Emp Id|   Emp Name|Dept Name|
+------+-----------+---------+
|   101|Alice james|    Sales|
|   102|   Bob jack|    Sales|
+------+-----------+---------+

+------+-----------+---------+
|Emp_Id|   Emp_Name|Dept_Name|
+------+-----------+---------+
|   101|Alice james|    Sales|
|   102|   Bob jack|    Sales|
+------+-----------+---------+



### addscore

In [104]:
addscore = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/addscore.csv")
addscore.show()

+-----+------+
|Range|Number|
+-----+------+
|   90|     2|
|   60|     3|
|   70|     5|
|   80|     1|
+-----+------+



In [117]:
windowspec = Window.orderBy(addscore.Range.desc())
leadcol = addscore.withColumn("lead", lag(col("Number"),1).over(windowspec))
leadcol.show()

leadcol.withColumn("Number", expr("case when lead is null then number else number+lead end").cast("Integer")).show()
addscore.withColumn("Number", expr("sum(Number) over (order by Range desc rows unbounded preceding)").cast("Integer")).show()



+-----+------+----+
|Range|Number|lead|
+-----+------+----+
|   90|     2|null|
|   80|     1|   2|
|   70|     5|   1|
|   60|     3|   5|
+-----+------+----+

+-----+------+----+
|Range|Number|lead|
+-----+------+----+
|   90|     2|null|
|   80|     3|   2|
|   70|     6|   1|
|   60|     8|   5|
+-----+------+----+

+-----+------+
|Range|Number|
+-----+------+
|   90|     2|
|   80|     3|
|   70|     8|
|   60|    11|
+-----+------+



### list split
Input -> my_list = ['abc', 'for', 'abc', 'like','geek1','nerdy', 'xyz', 'love','questions','words', 'life']
 
Output -> [['abc', 'for', 'abc', 'like', 'geek1'], ['nerdy', 'xyz', 'love', 'questions', 'words'], ['life']]

In [133]:
my_list = ['abc', 'for', 'abc', 'like','geek1','nerdy', 'xyz', 'love','questions','words', 'life']

# using python
output = []
length = 0
for i in range((len(my_list)//5)+1):
    temp = []
#     print(i, end=" ")s
    if not length >len(my_list):
        output.append(my_list[length : length+5])
        length = length+5
        
print(output)

[['abc', 'for', 'abc', 'like', 'geek1'], ['nerdy', 'xyz', 'love', 'questions', 'words'], ['life']]


### Second Highest salary in department

In [146]:
df = ["department", "assetValue"]
dept = [("DEPT1", 1000), ("DEPT1", 500), ("DEPT1", 700), ("DEPT2", 400), ("DEPT2", 200),  ("DEPT3", 500), ("DEPT3", 200)]
# rdd = spark.sparkContext.parallelize(dept)
# rddf = rdd.toDF(df).show()

rdd = spark.createDataFrame(dept, schema=df)
rdd.show()


+----------+----------+
|department|assetValue|
+----------+----------+
|     DEPT1|      1000|
|     DEPT1|       500|
|     DEPT1|       700|
|     DEPT2|       400|
|     DEPT2|       200|
|     DEPT3|       500|
|     DEPT3|       200|
+----------+----------+



In [153]:
win = Window.partitionBy(col("department")).orderBy(col("assetValue").desc())
windowfunc = rdd.withColumn("col3", dense_rank().over(win))
windowfunc.show()



+----------+----------+----+
|department|assetValue|col3|
+----------+----------+----+
|     DEPT1|      1000|   1|
|     DEPT1|       700|   2|
|     DEPT1|       500|   3|
|     DEPT2|       400|   1|
|     DEPT2|       200|   2|
|     DEPT3|       500|   1|
|     DEPT3|       200|   2|
+----------+----------+----+



In [154]:
windowfunc.filter(col("col3") == 2).show()

+----------+----------+----+
|department|assetValue|col3|
+----------+----------+----+
|     DEPT1|       700|   2|
|     DEPT2|       200|   2|
|     DEPT3|       200|   2|
+----------+----------+----+



### List
A1 = [1,2,3]

A2 = [2,3,4]

output = [1, 2, 3, 4]

In [163]:
A1 = [1,2,3]
A2 = [2,3,4]

output = []

# using list extend method
A1.extend(A2)
for i in A1:
    if i not in output:
        output.append(i)
        
print(output)

# using set method

output = list(set(A1))
print(output)


# using zip() function
# both list should be same size, when we are using zip function
for i, j in zip(A1, A2):
#     print(i, j)
    if i not in output:
        output.append(i)
        
    if j not in output:
        output.append(j)
        
print(output)



[1, 2, 3, 4]
[1, 2, 3, 4]
[1, 2, 3, 4]


### name table, and Salary 

In [165]:
nametable = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/nametable.csv")
nametable.show()

salarytable = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/salarytable.csv").selectExpr("id as sid", "salary")
salarytable.show()

+---+-----+
| id| name|
+---+-----+
|  1|Henry|
|  2|Smith|
|  3| Hall|
+---+-----+

+---+------+
|sid|salary|
+---+------+
|  1|   100|
|  2|   500|
|  4|  1000|
+---+------+



In [167]:
leftjoin = nametable.join(salarytable, col("id") == col("sid"), "left").drop(col("sid"))
leftjoin.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|Henry|   100|
|  2|Smith|   500|
|  3| Hall|  null|
+---+-----+------+



In [168]:
finaltable = leftjoin.withColumn("salary", expr("case when salary is not null then salary else 0 End"))
finaltable.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|Henry|   100|
|  2|Smith|   500|
|  3| Hall|     0|
+---+-----+------+



### array_merge

In [169]:
array_merge = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/array_merge.csv")
array_merge.show()

+---+-------+
| id|subject|
+---+-------+
|  1|  Spark|
|  1|  Scala|
|  1|   Hive|
|  2|  Scala|
|  3|  Spark|
|  3|  Scala|
+---+-------+



In [198]:
# use collect_list to make array along with group_by

# array_merge.groupby("id").agg(collect_list("subject")).sort(col("id").desc()).show()
# array_merge.groupby("id").agg(collect_list("subject")).show()
madelist = array_merge.groupby("id").agg(collect_list("subject")).sort(col("id"))
madelist.show()


+---+---------------------+
| id|collect_list(subject)|
+---+---------------------+
|  1| [Spark, Scala, Hive]|
|  2|              [Scala]|
|  3|       [Spark, Scala]|
+---+---------------------+



### pivot marks table

In [205]:
pivot_table = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/pivot_table.csv")
pivot_table.show()

+---+-------+-----+
| Id|subject|marks|
+---+-------+-----+
|101|    Eng|   90|
|101|    Sci|   80|
|101|    Mat|   95|
|102|    Eng|   75|
|102|    Sci|   85|
|102|    Mat|   90|
+---+-------+-----+



In [207]:
pivot_table = pivot_table.withColumn("marks", col("marks").cast("Integer"))
pivot_sub = pivot_table.groupBy("id").pivot("subject").sum("marks")
pivot_sub.show()

+---+---+---+---+
| id|Eng|Mat|Sci|
+---+---+---+---+
|101| 90| 95| 80|
|102| 75| 90| 85|
+---+---+---+---+



## Take number only to list Scala code

val mylist = List(10,5,24,'Hi',90,12,'Hello')

mylist.collect{

    case num : Int => println(num)
    
}

In [209]:
# Take number only to list python code

aplhalist = [10,5,24,'Hi',90,12,'Hello']

for i in aplhalist:
    if isinstance(i, int):
        print(i, end=" ")

10 5 24 90 12 

### email_split

In [210]:
email_split = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/email_split.csv")
email_split.show()

+---+------+--------------------+
| id|  name|               email|
+---+------+--------------------+
|  1| Henry|   henry12@gmail.com|
|  2| Smith|     smith@yahoo.com|
|  3|Martin|martin221@hotmail...|
+---+------+--------------------+



In [213]:
email_df = email_split.withColumn("domain", split("email", "@")[1])
email_df.select("id", "name", "domain").show()

+---+------+-----------+
| id|  name|     domain|
+---+------+-----------+
|  1| Henry|  gmail.com|
|  2| Smith|  yahoo.com|
|  3|Martin|hotmail.com|
+---+------+-----------+



### student table

In [214]:
student_table = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/student table.csv")
student_table.show()

+------------+-------+-----+
|student_name|subject|marks|
+------------+-------+-----+
|           a|english|   99|
|           b|english|   99|
|           b|  maths|   45|
|           c|science|   35|
|           c|english|   98|
+------------+-------+-----+



In [251]:
# using collect method
averagemarks = student_table.selectExpr("avg(marks)").collect()[0][0]
print(averagemarks)
student_table.filter(student_table.marks > averagemarks).show()

# using the sql
student_table.createOrReplaceTempView("stu")
sp = spark.sql("select * from stu where marks > (select avg(marks) from stu)")
sp.show()

75.2
+------------+-------+-----+
|student_name|subject|marks|
+------------+-------+-----+
|           a|english|   99|
|           b|english|   99|
|           c|english|   98|
+------------+-------+-----+

+------------+-------+-----+
|student_name|subject|marks|
+------------+-------+-----+
|           a|english|   99|
|           b|english|   99|
|           c|english|   98|
+------------+-------+-----+



### EmployeeDetails, EmployeeSalary

In [256]:
EmployeeDetails = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/EmployeeDetails.csv")
EmployeeDetails.show()
# EmployeeDetails.printSchema()

EmployeeSalary = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/EmployeeSalary.csv")
EmployeeSalary.show()
# EmployeeSalary.printSchema()

+-----+------------+---------+-------------+----------+
|EmpId|    FullName|ManagerId|DateOfJoining|      City|
+-----+------------+---------+-------------+----------+
|  121|   John Snow|      321|   01/31/2014|   Toronto|
|  321|Walter White|      986|   01/30/2015|California|
|  421|Kuldeep Rana|      876|   27/11/2016| New Delhi|
+-----+------------+---------+-------------+----------+

root
 |-- EmpId: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- ManagerId: string (nullable = true)
 |-- DateOfJoining: string (nullable = true)
 |-- City: string (nullable = true)

+-----+-------+------+--------+
|EmpId|Project|Salary|Variable|
+-----+-------+------+--------+
|  121|     P1|  8000|     500|
|  321|     P2| 10000|    1000|
|  421|     P1| 12000|       0|
+-----+-------+------+--------+

root
 |-- EmpId: string (nullable = true)
 |-- Project: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Variable: string (nullable = true)



In [268]:
# update the salary of employee ID 421 to 8000

updateslary = EmployeeSalary.withColumn("Salary", expr("case when Empid='421' then '8000' else Salary end"))

# updateslary = EmployeeSalary.withColumn("Salary", when(EmployeeSalary.EmpId == "421", "8000").otherwise(EmployeeSalary.Salary))
updateslary.show()

+-----+-------+------+--------+
|EmpId|Project|Salary|Variable|
+-----+-------+------+--------+
|  121|     P1|  8000|     500|
|  321|     P2| 10000|    1000|
|  421|     P1|  8000|       0|
+-----+-------+------+--------+



In [277]:
# create a final dataframe with two columns, where finaldf = employe names and finalsalary(finalsalary = variable + salary)

finalSalary = updateslary.withColumn("finalsalary", updateslary.Salary + updateslary.Variable).selectExpr("EmpId as id", "finalsalary")
finalSalary.show()

finalSelect = finalSalary.join(EmployeeDetails, col("id") == col("EmpId"), "left").select("FullName", "finalsalary")
finalSelect.show()

+---+-----------+
| id|finalsalary|
+---+-----------+
|121|     8500.0|
|321|    11000.0|
|421|     8000.0|
+---+-----------+

+------------+-----------+
|    FullName|finalsalary|
+------------+-----------+
|   John Snow|     8500.0|
|Walter White|    11000.0|
|Kuldeep Rana|     8000.0|
+------------+-----------+



### create Dataframe and explode the array 

In [282]:
# create a sc.dataframe using array

# 1, (IT, HR)
# 2 , (MR, Sales, Finance)

df = ["id", "dept"]
dept = [(1, ["IT", "HR"]), (2 , ["MR", "Sales", "Finance"])]
# rdd = spark.sparkContext.parallelize(dept)
# rddf = rdd.toDF(df).show()

rdd = spark.createDataFrame(dept, schema=df)
rdd.show()
rdd.printSchema()

+---+--------------------+
| id|                dept|
+---+--------------------+
|  1|            [IT, HR]|
|  2|[MR, Sales, Finance]|
+---+--------------------+

root
 |-- id: long (nullable = true)
 |-- dept: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [284]:
explodeddf = rdd.withColumn("dept", explode(col("dept")))
explodeddf.show()

+---+-------+
| id|   dept|
+---+-------+
|  1|     IT|
|  1|     HR|
|  2|     MR|
|  2|  Sales|
|  2|Finance|
+---+-------+



### voting_data

In [289]:
# 1.Read the csv and create a dataframe
voting_data = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/voting_data.csv")
voting_data.show()

+----+---+------+------------+---------+
|Name|Age|Gender|Constituency|Voting_id|
+----+---+------+------------+---------+
| aaa| 22|     M|          TN|   RAZ000|
| bbb| 27|     M|          MH|   RAZ009|
| ccc| 35|     F|          KA|   RAZ007|
| ddd| 16|     F|          TN|   RAZ004|
| eee| 46|     M|          AP|   RAZ002|
+----+---+------+------------+---------+



In [290]:
# 2.Add a new column age_Check to df with condition age > 18 then true else false 
ageRest = voting_data.withColumn("age_check", expr("case when age > 18 then true else false end"))
ageRest.show()

+----+---+------+------------+---------+---------+
|Name|Age|Gender|Constituency|Voting_id|age_check|
+----+---+------+------------+---------+---------+
| aaa| 22|     M|          TN|   RAZ000|     true|
| bbb| 27|     M|          MH|   RAZ009|     true|
| ccc| 35|     F|          KA|   RAZ007|     true|
| ddd| 16|     F|          TN|   RAZ004|    false|
| eee| 46|     M|          AP|   RAZ002|     true|
+----+---+------+------------+---------+---------+



In [291]:
# 3.Create a table from dataframe and retrieve contents from it where gender is male

maletable = ageRest.filter(col("Gender") == "M")
maletable.show()

+----+---+------+------------+---------+---------+
|Name|Age|Gender|Constituency|Voting_id|age_check|
+----+---+------+------------+---------+---------+
| aaa| 22|     M|          TN|   RAZ000|     true|
| bbb| 27|     M|          MH|   RAZ009|     true|
| eee| 46|     M|          AP|   RAZ002|     true|
+----+---+------+------------+---------+---------+



### Patient Plan Data

In [293]:
Patientdata = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/Patientdata.csv")
Patientdata.show()

Plan = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/Plan.csv")
Plan.show()

+-----------+------+---+
|PatientName|PlanId|Amt|
+-----------+------+---+
|          X|     1|100|
|          X|     1|200|
|          Y|     2|200|
|          Y|     2|300|
|          Z|     3|400|
|          B|     3|400|
+-----------+------+---+

+------+--------+-----+
|PlanID|PlanName|Limit|
+------+--------+-----+
|     1|  Plan A|  150|
|     2|  Plan B|  250|
|     3|  Plan C|  350|
+------+--------+-----+



In [301]:
dispatient = Patientdata.dropDuplicates(["PatientName"]).withColumnRenamed("PlanId", "pid")
dispatient.show()

+-----------+---+---+
|PatientName|pid|Amt|
+-----------+---+---+
|          B|  3|400|
|          X|  1|100|
|          Y|  2|200|
|          Z|  3|400|
+-----------+---+---+



In [305]:
dis_plan = dispatient.join(Plan, col("pid") == col("PlanID"), "left").select("PatientName", "PlanName")
# dis_plan.sort(col("PlanName")).show()
dis_plan.show()

+-----------+--------+
|PatientName|PlanName|
+-----------+--------+
|          B|  Plan C|
|          X|  Plan A|
|          Y|  Plan B|
|          Z|  Plan C|
+-----------+--------+



# Batch 35 Qs

In [311]:
schema = ["store","entries"]
data =  [(1,["p1","p2","p3","p4"]),
(2,["p1"]),
(3,["p1","p2"]),
(4,['p1',"p2","p3",'p4',"p5","p6","p7"])]

df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)

+-----+----------------------------+
|store|entries                     |
+-----+----------------------------+
|1    |[p1, p2, p3, p4]            |
|2    |[p1]                        |
|3    |[p1, p2]                    |
|4    |[p1, p2, p3, p4, p5, p6, p7]|
+-----+----------------------------+



In [312]:
store_ = df.select("store").distinct().count()
print(store_)

4


In [318]:
enteries_ = df.withColumn("entries", explode(col("entries")))
enteries_.show()
enteries_.printSchema()

+-----+-------+
|store|entries|
+-----+-------+
|    1|     p1|
|    1|     p2|
|    1|     p3|
|    1|     p4|
|    2|     p1|
|    3|     p1|
|    3|     p2|
|    4|     p1|
|    4|     p2|
|    4|     p3|
|    4|     p4|
|    4|     p5|
|    4|     p6|
|    4|     p7|
+-----+-------+

root
 |-- store: long (nullable = true)
 |-- entries: string (nullable = true)



In [320]:
count_ent = enteries_.select("entries").distinct().count()
print(count_ent)

7


In [322]:
spark.createDataFrame([(store_, count_ent)], schema=["store" , "entries"]).show()

+-----+-------+
|store|entries|
+-----+-------+
|    4|      7|
+-----+-------+



### stu_table

In [323]:
stu_table = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/stu_table.csv")
stu_table.show()

+------+------+-----+---+----+---+------+
|RollNo|  name|tamil|eng|math|sci|social|
+------+------+-----+---+----+---+------+
|203040|Rajesh|   10| 20|  30| 40|    50|
+------+------+-----+---+----+---+------+



In [326]:
stu_table.withColumn("total" , (col("tamil")+col("eng")+col("math")+col("sci")+col("social")).cast("Integer")).show()

+------+------+-----+---+----+---+------+-----+
|RollNo|  name|tamil|eng|math|sci|social|total|
+------+------+-----+---+----+---+------+-----+
|203040|Rajesh|   10| 20|  30| 40|    50|  150|
+------+------+-----+---+----+---+------+-----+



### email_phone

In [327]:
email_phone = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/email_phone.csv")
email_phone.show()

+--------------------+----------+
|                Mail|       mob|
+--------------------+----------+
|Renuka1992@gmail.com|9856765434|
|anbu.arasu@gmail.com|9844567788|
+--------------------+----------+



In [331]:
regex_replce_data = email_phone.withColumn("Mail", regexp_replace(col("Mail"),"(?<!^).(?=.+@)","*")) \
                                .withColumn("mob", regexp_replace(col("mob"),"(?<!^).(?!$)","*"))
    
regex_replce_data.show()

+--------------------+----------+
|                Mail|       mob|
+--------------------+----------+
|R********2@gmail.com|9********4|
|a********u@gmail.com|9********8|
+--------------------+----------+



### stock_data

In [336]:
col = ["StockId","PredictedPrice"]
data = [("RIL", [1000,1005,1090,1200,1000,900,890]),
("HDFC", [890,940,810,730,735,960,980]),
("INFY", [1001,902,1000,990,1230,1100,1200])]

stock_data = spark.createDataFrame(data, schema=col)
stock_data.show(truncate=False)
stock_data.printSchema()

+-------+----------------------------------------+
|StockId|PredictedPrice                          |
+-------+----------------------------------------+
|RIL    |[1000, 1005, 1090, 1200, 1000, 900, 890]|
|HDFC   |[890, 940, 810, 730, 735, 960, 980]     |
|INFY   |[1001, 902, 1000, 990, 1230, 1100, 1200]|
+-------+----------------------------------------+

root
 |-- StockId: string (nullable = true)
 |-- PredictedPrice: array (nullable = true)
 |    |-- element: long (containsNull = true)



In [343]:
explode_stock_data = stock_data.withColumn("PredictedPrice", explode(stock_data.PredictedPrice))
explode_stock_data.show()

+-------+--------------+
|StockId|PredictedPrice|
+-------+--------------+
|    RIL|          1000|
|    RIL|          1005|
|    RIL|          1090|
|    RIL|          1200|
|    RIL|          1000|
|    RIL|           900|
|    RIL|           890|
|   HDFC|           890|
|   HDFC|           940|
|   HDFC|           810|
|   HDFC|           730|
|   HDFC|           735|
|   HDFC|           960|
|   HDFC|           980|
|   INFY|          1001|
|   INFY|           902|
|   INFY|          1000|
|   INFY|           990|
|   INFY|          1230|
|   INFY|          1100|
+-------+--------------+
only showing top 20 rows



In [346]:
max_stock_data = explode_stock_data.groupBy("StockId").max("PredictedPrice").withColumnRenamed("max(PredictedPrice)", "SellPrice")
max_stock_data.show()

+-------+---------+
|StockId|SellPrice|
+-------+---------+
|    RIL|     1200|
|   HDFC|      980|
|   INFY|     1230|
+-------+---------+



In [348]:
min_stock_data = explode_stock_data.groupBy("StockId").min("PredictedPrice").withColumnRenamed("StockId","StockId1").withColumnRenamed("min(PredictedPrice)", "BuyPrice")
min_stock_data.show()

+--------+--------+
|StockId1|BuyPrice|
+--------+--------+
|     RIL|     890|
|    HDFC|     730|
|    INFY|     902|
+--------+--------+



In [351]:
final_stock1 = max_stock_data.join(min_stock_data, max_stock_data.StockId == min_stock_data.StockId1, "left")
final_stock1.show()

+-------+---------+--------+--------+
|StockId|SellPrice|StockId1|BuyPrice|
+-------+---------+--------+--------+
|    RIL|     1200|     RIL|     890|
|   HDFC|      980|    HDFC|     730|
|   INFY|     1230|    INFY|     902|
+-------+---------+--------+--------+



In [353]:
final_stock = final_stock1.withColumn("Profit", final_stock1.SellPrice - final_stock1.BuyPrice) \
                            .select("StockId", "SellPrice", "BuyPrice", "Profit")
final_stock.show()

+-------+---------+--------+------+
|StockId|SellPrice|BuyPrice|Profit|
+-------+---------+--------+------+
|    RIL|     1200|     890|   310|
|   HDFC|      980|     730|   250|
|   INFY|     1230|     902|   328|
+-------+---------+--------+------+



### travel_data

In [356]:
schema = ['name','travel_location','age']
data = [('ravi',['pune','delhi','chennai','noida'],32),
('gautham',['delhi','chennai'],30),
('mary',['noida','pune'],35),
('thomas',['delhi','pune'],31),
('shankar',['chennai','noida'],30)]
 
df = spark.createDataFrame(data, schema=schema)
df.show()

+-------+--------------------+---+
|   name|     travel_location|age|
+-------+--------------------+---+
|   ravi|[pune, delhi, che...| 32|
|gautham|    [delhi, chennai]| 30|
|   mary|       [noida, pune]| 35|
| thomas|       [delhi, pune]| 31|
|shankar|    [chennai, noida]| 30|
+-------+--------------------+---+



In [357]:
explode_df = df.withColumn("travel_location", explode(df.travel_location))
explode_df.show()

# explode_df.groupBy("name").agg(count(col("travel_location"))).show()

+-------+---------------+---+
|   name|travel_location|age|
+-------+---------------+---+
|   ravi|           pune| 32|
|   ravi|          delhi| 32|
|   ravi|        chennai| 32|
|   ravi|          noida| 32|
|gautham|          delhi| 30|
|gautham|        chennai| 30|
|   mary|          noida| 35|
|   mary|           pune| 35|
| thomas|          delhi| 31|
| thomas|           pune| 31|
|shankar|        chennai| 30|
|shankar|          noida| 30|
+-------+---------------+---+



In [361]:
# total_travel = explode_df.groupBy("name").agg(count(explode_df.travel_location)).show()
total_travel = explode_df.groupBy("name").agg(count("travel_location")).withColumnRenamed("count(travel_location)", "total")
total_travel.show()

+-------+-----+
|   name|total|
+-------+-----+
|   ravi|    4|
|gautham|    2|
|   mary|    2|
| thomas|    2|
|shankar|    2|
+-------+-----+



In [367]:
total_travel.orderBy(desc("total")).limit(1).show()

+----+-----+
|name|total|
+----+-----+
|ravi|    4|
+----+-----+



#### I have three columns deptno,deptname and place using scala read a text file and display only deptno and deptname. no rdds and dataframes used.

In [379]:
split_dipt = sc.textFile("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/split_dipt.txt")
split_dipt.foreach(print)

splitted_dipt = split_dipt.map(lambda x : x.split(","))
splitted_dipt.foreach(print)

splitted_dipt.map(lambda x : (x[0], x[1])).foreach(print)





## second_highest_salary

In [380]:
second_highest_salary = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/second_highest_salary.csv")
second_highest_salary.show()

+-----+-------+----+----+
|EmpId|EmpName|Dept| Sal|
+-----+-------+----+----+
|    1|Charles|   A|1000|
|    2|Richard|   A|2000|
|    3|   John|   A|2000|
|    4| Alisha|   B| 400|
|    5|  Robin|   B| 500|
|    6|   Kara|   C| 700|
|    7|Natalie|   D| 900|
|    8|  Harry|   C| 600|
|    9|Charles|   D| 500|
|   10|   Kate|   A|1000|
+-----+-------+----+----+



In [405]:
# get 2nd highest salary using window functions
casted_data = second_highest_salary.withColumn("sal", second_highest_salary.Sal.cast("Integer"))
windowfunc = Window.orderBy(desc("Sal"))

final_sal = casted_data.withColumn("rank", dense_rank().over(windowfunc))
final_sal.show()

final1 = final_sal.filter(final_sal.rank == 2).limit(1)
final1.select("sal").show()

+-----+-------+----+----+----+
|EmpId|EmpName|Dept| sal|rank|
+-----+-------+----+----+----+
|    2|Richard|   A|2000|   1|
|    3|   John|   A|2000|   1|
|    1|Charles|   A|1000|   2|
|   10|   Kate|   A|1000|   2|
|    7|Natalie|   D| 900|   3|
|    6|   Kara|   C| 700|   4|
|    8|  Harry|   C| 600|   5|
|    5|  Robin|   B| 500|   6|
|    9|Charles|   D| 500|   6|
|    4| Alisha|   B| 400|   7|
+-----+-------+----+----+----+

+----+
| sal|
+----+
|1000|
+----+



### makeToList

In [406]:
makeToList = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/makeToList.csv")
makeToList.show()

+------+-----------+-------+
|CustId|   CustName|Address|
+------+-----------+-------+
|     1|   Mark Ray|     AB|
|     2|Peter Smith|     CD|
|     1|   Mark Ray|     EF|
|     2|Peter Smith|     GH|
|     2|Peter Smith|     CD|
|     3|       Kate|     IJ|
+------+-----------+-------+



In [410]:
makeToList.groupBy("CustName").agg(collect_list("Address")).show()

+-----------+---------------------+
|   CustName|collect_list(Address)|
+-----------+---------------------+
|       Kate|                 [IJ]|
|Peter Smith|         [CD, GH, CD]|
|   Mark Ray|             [AB, EF]|
+-----------+---------------------+



### custTable, pincodeTable

In [419]:
custTable = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/custTable.csv")
custTable.show()

pincodeTable = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/pincodeTable.csv").withColumnRenamed("PinCode", "PinCode1")
pincodeTable.show()

+------+--------+-------+-------+
|CustId|CustName|Address|PinCode|
+------+--------+-------+-------+
|     1|    Mark|     AB|    123|
|     2|   Peter|     CD|    456|
|     3|    Kate|     EF|    789|
+------+--------+-------+-------+

+--------+-----+
|PinCode1|State|
+--------+-----+
|     123|   MH|
|     456|   WB|
+--------+-----+



In [422]:
finalCust = custTable.join(pincodeTable, custTable.PinCode == pincodeTable.PinCode1, "inner")
finalCust.select("CustId","CustName","Address","PinCode", "State").show()

+------+--------+-------+-------+-----+
|CustId|CustName|Address|PinCode|State|
+------+--------+-------+-------+-----+
|     1|    Mark|     AB|    123|   MH|
|     2|   Peter|     CD|    456|   WB|
+------+--------+-------+-------+-----+



### explode_json_data

In [436]:
explode_json_data = spark.read.format("json").option("multiline", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/explode_json_data.json")
explode_json_data.show()
explode_json_data.printSchema()

+--------------------+
|                 emp|
+--------------------+
|[{[{hyderabad, in...|
+--------------------+

root
 |-- emp: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- line1: string (nullable = true)
 |    |    |    |    |-- line2: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)



In [438]:
df1 = explode_json_data.withColumn("emp", explode("emp"))
df1.show()
df1.printSchema()

+--------------------+
|                 emp|
+--------------------+
|{[{hyderabad, ind...|
|{[{bng, ind}, {bn...|
|{[{bng, ind}, {bn...|
+--------------------+

root
 |-- emp: struct (nullable = true)
 |    |-- address: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- line1: string (nullable = true)
 |    |    |    |-- line2: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)



In [441]:
df2 = df1.select("emp.id" , "emp.name", "emp.address")
df2.show()
df2.printSchema()

+---+----+--------------------+
| id|name|             address|
+---+----+--------------------+
|123| xyz|[{hyderabad, ind}...|
|124| abc|[{bng, ind}, {bng...|
|125|jhon|[{bng, ind}, {bng...|
+---+----+--------------------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- line1: string (nullable = true)
 |    |    |-- line2: string (nullable = true)



In [442]:
df3 =df2.withColumn("address", explode("address"))
df3.show()
df3.printSchema()

+---+----+-----------------+
| id|name|          address|
+---+----+-----------------+
|123| xyz| {hyderabad, ind}|
|123| xyz|{hyderabad, null}|
|123| xyz|      {null, ind}|
|124| abc|       {bng, ind}|
|124| abc|      {bng, null}|
|124| abc|     {null, null}|
|125|jhon|       {bng, ind}|
|125|jhon|      {bng, null}|
|125|jhon|     {null, null}|
+---+----+-----------------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- line1: string (nullable = true)
 |    |-- line2: string (nullable = true)



In [443]:
df4 = df3.select("id", "name", "address.*")
df4.show()
df4.printSchema()

+---+----+---------+-----+
| id|name|    line1|line2|
+---+----+---------+-----+
|123| xyz|hyderabad|  ind|
|123| xyz|hyderabad| null|
|123| xyz|     null|  ind|
|124| abc|      bng|  ind|
|124| abc|      bng| null|
|124| abc|     null| null|
|125|jhon|      bng|  ind|
|125|jhon|      bng| null|
|125|jhon|     null| null|
+---+----+---------+-----+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- line1: string (nullable = true)
 |-- line2: string (nullable = true)



### Populate_Manager

In [445]:
Populate_Manager = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/Populate_Manager.csv")
Populate_Manager.show()

+-----+-------+------+---------+
|Empid|empname|salary|managerid|
+-----+-------+------+---------+
|    1|    xyz| 10000|     NULL|
|    2|    abc| 20000|        1|
|    3|   fgkb| 30000|        2|
|    4|   gfkj| 50000|        2|
+-----+-------+------+---------+



In [447]:
# using sql 
Populate_Manager.createOrReplaceTempView("managertable")
df2 = spark.sql("select Empid, empname, salary, managerid from managertable where empid in (select distinct managerid from managertable)")
df2.show()

+-----+-------+------+---------+
|Empid|empname|salary|managerid|
+-----+-------+------+---------+
|    1|    xyz| 10000|     NULL|
|    2|    abc| 20000|        1|
+-----+-------+------+---------+



In [451]:
# using dsl
mgid = Populate_Manager.select("managerid").distinct().withColumnRenamed("managerid", "mgid")
mgid.show()

+----+
|mgid|
+----+
|   1|
|   2|
|NULL|
+----+



In [459]:
mgid.join(Populate_Manager, mgid.mgid == Populate_Manager.Empid, "inner").drop("mgid").show()

+-----+-------+------+---------+
|Empid|empname|salary|managerid|
+-----+-------+------+---------+
|    1|    xyz| 10000|     NULL|
|    2|    abc| 20000|        1|
+-----+-------+------+---------+



### parent_child_grand

In [464]:
parent_child_grand = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/parent_child_grand.csv")
parent_child_grand.show()

parent_child_grand1 = parent_child_grand.withColumnRenamed("child", "child1").withColumnRenamed("parent","parent1")
parent_child_grand1.show()

+-----+------+
|child|parent|
+-----+------+
|    A|    AA|
|    B|    BB|
|    C|    CC|
|   AA|   AAA|
|   BB|   BBB|
|   CC|   CCC|
+-----+------+

+------+-------+
|child1|parent1|
+------+-------+
|     A|     AA|
|     B|     BB|
|     C|     CC|
|    AA|    AAA|
|    BB|    BBB|
|    CC|    CCC|
+------+-------+



In [468]:
grandParent = parent_child_grand.join(parent_child_grand1, parent_child_grand1.child1 == parent_child_grand.parent , "inner")
grandParent.show()

grandParent.withColumnRenamed("parent1", "Grandparent").drop("child1").show()

+-----+------+------+-------+
|child|parent|child1|parent1|
+-----+------+------+-------+
|    A|    AA|    AA|    AAA|
|    B|    BB|    BB|    BBB|
|    C|    CC|    CC|    CCC|
+-----+------+------+-------+

+-----+------+-----------+
|child|parent|Grandparent|
+-----+------+-----------+
|    A|    AA|        AAA|
|    B|    BB|        BBB|
|    C|    CC|        CCC|
+-----+------+-----------+



### Employee_manager

In [469]:
Employee_manager = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/Employee_manager.csv")
Employee_manager.show()

+---+-------------+------+----------+
| id|         name|salary|manager_id|
+---+-------------+------+----------+
|  1|   John Smith| 10000|         3|
|  2|Jane Anderson| 12000|         3|
|  3|    Tom Lanon| 15000|         4|
|  4|  Anne Connor| 20000|      NULL|
|  5|  Jeremy York|  9000|         1|
+---+-------------+------+----------+



In [472]:
manager_ids = Employee_manager.selectExpr("manager_id as mgid").distinct()
manager_ids.show()

+----+
|mgid|
+----+
|   3|
|   1|
|   4|
|NULL|
+----+



In [474]:
Employee_manager.join(manager_ids, Employee_manager.id == manager_ids.mgid, "inner").drop("mgid").show()

+---+-----------+------+----------+
| id|       name|salary|manager_id|
+---+-----------+------+----------+
|  1| John Smith| 10000|         3|
|  3|  Tom Lanon| 15000|         4|
|  4|Anne Connor| 20000|      NULL|
+---+-----------+------+----------+



### salary_inc

In [481]:
salary_inc = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/salary_inc.csv")
salary_inc.show()

+-----------+------+----+
|employee_id|salary|year|
+-----------+------+----+
|          1| 80000|2020|
|          1| 70000|2019|
|          1| 60000|2018|
|          2| 65000|2020|
|          2| 65000|2019|
|          2| 60000|2018|
|          3| 65000|2019|
|          3| 60000|2018|
+-----------+------+----+



In [483]:
windowFunc = Window.partitionBy("employee_id").orderBy("year")

nextsal = salary_inc.withColumn("nextSal", lead("salary", 1).over(windowFunc))
nextsal.show()

+-----------+------+----+-------+
|employee_id|salary|year|nextSal|
+-----------+------+----+-------+
|          1| 60000|2018|  70000|
|          1| 70000|2019|  80000|
|          1| 80000|2020|   null|
|          2| 60000|2018|  65000|
|          2| 65000|2019|  65000|
|          2| 65000|2020|   null|
|          3| 60000|2018|  65000|
|          3| 65000|2019|   null|
+-----------+------+----+-------+



In [487]:
# nextsal.withColumn("diff", expr("case when nextsal is null then 0 else abs(nextSal-salary) end").cast("Integer")).show()
nextsal.withColumn("diff", expr("abs(nextSal-salary)").cast("Integer")).show()

+-----------+------+----+-------+-----+
|employee_id|salary|year|nextSal| diff|
+-----------+------+----+-------+-----+
|          1| 60000|2018|  70000|10000|
|          1| 70000|2019|  80000|10000|
|          1| 80000|2020|   null| null|
|          2| 60000|2018|  65000| 5000|
|          2| 65000|2019|  65000|    0|
|          2| 65000|2020|   null| null|
|          3| 60000|2018|  65000| 5000|
|          3| 65000|2019|   null| null|
+-----------+------+----+-------+-----+



### Customers_split.txt

In [488]:
data = sc.textFile("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/Customers_split.txt")
data.foreach(print)
# see output in console

In [489]:
data1 = data.map(lambda x : x.replace("|^|", ","))
data1.foreach(print)

In [500]:
data2 = data1.map(lambda x : tuple(x.split(",")))
data2.foreach(print)

### values_lead

In [501]:
values_lead = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/values_lead.csv")
values_lead.show()

+--------+----------+------+
|sensorid| timestamp|values|
+--------+----------+------+
|   11111|2021-01-15|    10|
|   11111|2021-01-16|    15|
|   11111|2021-01-17|    30|
|   11112|2021-01-15|    10|
|   11112|2021-01-16|    20|
|   11112|2021-01-17|    30|
+--------+----------+------+



In [503]:
windowfunc = Window.partitionBy("sensorid").orderBy("timestamp")

value_lead_data = values_lead.withColumn("values1", lead("values", 1).over(windowfunc))
value_lead_data.show()

+--------+----------+------+-------+
|sensorid| timestamp|values|values1|
+--------+----------+------+-------+
|   11111|2021-01-15|    10|     15|
|   11111|2021-01-16|    15|     30|
|   11111|2021-01-17|    30|   null|
|   11112|2021-01-15|    10|     20|
|   11112|2021-01-16|    20|     30|
|   11112|2021-01-17|    30|   null|
+--------+----------+------+-------+



In [505]:
data1 = value_lead_data.withColumn("values", expr("abs(values1-values)").cast("Int")).drop("values1")
data1.show()

+--------+----------+------+
|sensorid| timestamp|values|
+--------+----------+------+
|   11111|2021-01-15|     5|
|   11111|2021-01-16|    15|
|   11111|2021-01-17|  null|
|   11112|2021-01-15|    10|
|   11112|2021-01-16|    10|
|   11112|2021-01-17|  null|
+--------+----------+------+



In [506]:
findata = data1.filter("values is not null")
findata.show()

+--------+----------+------+
|sensorid| timestamp|values|
+--------+----------+------+
|   11111|2021-01-15|     5|
|   11111|2021-01-16|    15|
|   11112|2021-01-15|    10|
|   11112|2021-01-16|    10|
+--------+----------+------+



### Average_Salary_Under_Manager 

In [508]:
Average_Salary_Under_Manager = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/Average_Salary_Under_Manager.csv")
Average_Salary_Under_Manager.show()

+------+--------+------+----------+
|Emp_Id|Emp_name|Salary|Manager_Id|
+------+--------+------+----------+
|    10|    Anil| 50000|        18|
|    11|   Vikas| 75000|        16|
|    12|   Nisha| 40000|        18|
|    13|   Nidhi| 60000|        17|
|    14|   Priya| 80000|        18|
|    15|   Mohit| 45000|        18|
|    16|  Rajesh| 90000|        16|
|    17|   Raman| 55000|        16|
|    18| Santosh| 65000|        17|
+------+--------+------+----------+



In [512]:
dismanger = Average_Salary_Under_Manager.select("Manager_Id").withColumnRenamed("Manager_Id", "mid").distinct()
dismanger.show()
Average_Salary_Under_Manager.join(dismanger, Average_Salary_Under_Manager.Emp_Id == dismanger.mid, "inner").show()

+---+
|mid|
+---+
| 16|
| 18|
| 17|
+---+

+------+--------+------+----------+---+
|Emp_Id|Emp_name|Salary|Manager_Id|mid|
+------+--------+------+----------+---+
|    16|  Rajesh| 90000|        16| 16|
|    17|   Raman| 55000|        16| 17|
|    18| Santosh| 65000|        17| 18|
+------+--------+------+----------+---+



In [520]:
intdata = Average_Salary_Under_Manager.withColumn("Salary", Average_Salary_Under_Manager.Salary.cast("Integer"))
grouped_data = intdata.groupBy("Manager_Id").agg(avg("Salary").cast("Integer").alias("Average_Salary_Under_Manager")).withColumnRenamed("Manager_Id","mid")
grouped_data.show()

+---+----------------------------+
|mid|Average_Salary_Under_Manager|
+---+----------------------------+
| 16|                       73333|
| 18|                       53750|
| 17|                       62500|
+---+----------------------------+



In [522]:
final1 = Average_Salary_Under_Manager.join(grouped_data, Average_Salary_Under_Manager.Emp_Id == grouped_data.mid, "inner")
final1.show()

+------+--------+------+----------+---+----------------------------+
|Emp_Id|Emp_name|Salary|Manager_Id|mid|Average_Salary_Under_Manager|
+------+--------+------+----------+---+----------------------------+
|    16|  Rajesh| 90000|        16| 16|                       73333|
|    17|   Raman| 55000|        16| 17|                       62500|
|    18| Santosh| 65000|        17| 18|                       53750|
+------+--------+------+----------+---+----------------------------+



In [523]:
final = final1.selectExpr("Emp_Id as Manager_Id", "Emp_name as Manager", "Average_Salary_Under_Manager")
final.show()

+----------+-------+----------------------------+
|Manager_Id|Manager|Average_Salary_Under_Manager|
+----------+-------+----------------------------+
|        16| Rajesh|                       73333|
|        18|Santosh|                       53750|
|        17|  Raman|                       62500|
+----------+-------+----------------------------+



### equal_salary

In [524]:
equal_salary = spark.read.format("csv").option("header", "true")\
    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/equal_salary.csv")
equal_salary.show()

+---------+----------+---------+------+-------------------+----------+
|WORKER_ID|FIRST_NAME|LAST_NAME|SALARY|       JOINING_DATE|DEPARTMENT|
+---------+----------+---------+------+-------------------+----------+
|      001|    Monika|    Arora|100000|2014-02-20 09:00:00|        HR|
|      002|  Niharika|    Verma|300000|2014-06-11 09:00:00|     Admin|
|      003|    Vishal|  Singhal|300000|2014-02-20 09:00:00|        HR|
|      004|   Amitabh|    Singh|500000|2014-02-20 09:00:00|     Admin|
|      005|     Vivek|    Bhati|500000|2014-06-11 09:00:00|     Admin|
+---------+----------+---------+------+-------------------+----------+



### python Rjust, ljust

In [527]:
# 1
# 01
# 011
# 0111
# 01111

list = ["1", "01", "011", "0111", "01111"]
for i in list:
    print(i.rjust(5, "0"))
    

00001
00001
00011
00111
01111


### python rpad, lpad

In [533]:
# 1
# 01
# 011
# 0111
# 01111

data=spark.read.format("csv").option("header","true")\
                    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/pattern.txt")
data.show()
data.withColumn("input", lpad(data.input, 5, "0")).show()

+-----+
|input|
+-----+
|    1|
|   01|
|  011|
| 0111|
|01111|
+-----+

+-----+
|input|
+-----+
|00001|
|00001|
|00011|
|00111|
|01111|
+-----+



### equal_salary

In [535]:
equal_salary=spark.read.format("csv").option("header","true")\
                    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/equal_salary.csv")
equal_salary.show()

+---------+----------+---------+------+-------------------+----------+
|WORKER_ID|FIRST_NAME|LAST_NAME|SALARY|       JOINING_DATE|DEPARTMENT|
+---------+----------+---------+------+-------------------+----------+
|      001|    Monika|    Arora|100000|2014-02-20 09:00:00|        HR|
|      002|  Niharika|    Verma|300000|2014-06-11 09:00:00|     Admin|
|      003|    Vishal|  Singhal|300000|2014-02-20 09:00:00|        HR|
|      004|   Amitabh|    Singh|500000|2014-02-20 09:00:00|     Admin|
|      005|     Vivek|    Bhati|500000|2014-06-11 09:00:00|     Admin|
+---------+----------+---------+------+-------------------+----------+



In [539]:
equal_salary.groupBy("SALARY").agg(collect_list("FIRST_NAME").alias("SameSalary")).show()

+------+------------------+
|SALARY|        SameSalary|
+------+------------------+
|300000|[Niharika, Vishal]|
|100000|          [Monika]|
|500000|  [Amitabh, Vivek]|
+------+------------------+



### Ordered_dispatch

In [540]:
Ordered_dispatch=spark.read.format("csv").option("header","true")\
                    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/Ordered_dispatch.csv")
Ordered_dispatch.show()

+--------+-----------+----------+
|Order_id|status_date|    status|
+--------+-----------+----------+
|       1|      1-Jan|   Ordered|
|       1|      2-Jan|dispatched|
|       1|      3-Jan|dispatched|
|       1|      4-Jan|   Shipped|
|       1|      5-Jan|   Shipped|
|       1|      6-Jan| Delivered|
|       2|      1-Jan|   Ordered|
|       2|      2-Jan|dispatched|
|       2|      3-Jan|   shipped|
+--------+-----------+----------+



In [541]:
Ordered_dispatch.filter(Ordered_dispatch.status == "dispatched").show()

+--------+-----------+----------+
|Order_id|status_date|    status|
+--------+-----------+----------+
|       1|      2-Jan|dispatched|
|       1|      3-Jan|dispatched|
|       2|      2-Jan|dispatched|
+--------+-----------+----------+



### input: aabbccabca
### output: a2b2c2a1b1c1a1

In [561]:
# this code is not entirely correct
# input = "aabbccabca"
# output = a2b2c2a1b1c1a1

input = "aabbccabcaa"
num = 1
output = ""

for i in range(1, len(input)):
    if input[i-1] == input[i]:
        num = num+1
    else:
#         print(output)
        output = output + input[i-1] + str(num)
        num = 1
        
print(output)
    

a2b2c2a1b1c1


In [2]:
# this code is correct
input1 = "aaabbccaaabbba"
output = ""
i = 0

while i < len(input1):
    sum = 1
    while i + 1 < len(input1) and input1[i] == input1[i+1] :
        sum = sum + 1
        i += 1

    output = output + input1[i] + str(sum)
    i += 1
     
print(output)

a3b2c2a3b3a1


In [None]:
# scala code

# def func(s: String): String = {
#   var result = ""
#   var count = 1

#   for (i <- 1 until s.length) {
#     if (s(i) == s(i - 1)) {
#       count += 1
#     } else {
#       result += s(i - 1) + count.toString
#       count = 1
#     }
#   }

#   result
# }

# val input = "aabbccabca"
# val result = func(input)
# println(result)

In [None]:
### teamName_match 

In [562]:
teamName_match =spark.read.format("csv").option("header","true")\
                    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/teamName_match.csv")
teamName_match.show()

+---+-----------+
| Id|   TeamName|
+---+-----------+
|  1|      India|
|  2|  Australia|
|  3|    England|
|  4|New Zealand|
+---+-----------+



In [564]:
teamName_match1 = teamName_match.selectExpr("id as id1", "TeamName as tm1")
teamName_match1.show()

+---+-----------+
|id1|        tm1|
+---+-----------+
|  1|      India|
|  2|  Australia|
|  3|    England|
|  4|New Zealand|
+---+-----------+



In [573]:
# using sql
teamName_match.createOrReplaceTempView("df")
spark.sql("select concat(t1.TeamName,' vs ',t2.Teamname) as output from df t1 join df t2 on t1.TeamName < t2.TeamName").show()


+--------------------+
|              output|
+--------------------+
|India vs New Zealand|
|  Australia vs India|
|Australia vs England|
|Australia vs New ...|
|    England vs India|
|England vs New Ze...|
+--------------------+



In [577]:
# # using DSL
joinedtable = teamName_match.join(teamName_match1, teamName_match.TeamName < teamName_match1.tm1, "inner")
joinedtable.show()

+---+---------+---+-----------+
| Id| TeamName|id1|        tm1|
+---+---------+---+-----------+
|  1|    India|  4|New Zealand|
|  2|Australia|  1|      India|
|  2|Australia|  3|    England|
|  2|Australia|  4|New Zealand|
|  3|  England|  1|      India|
|  3|  England|  4|New Zealand|
+---+---------+---+-----------+



In [578]:
joinedtable.selectExpr("concat(TeamName, ' vs ' , tm1) as output").show()

+--------------------+
|              output|
+--------------------+
|India vs New Zealand|
|  Australia vs India|
|Australia vs England|
|Australia vs New ...|
|    England vs India|
|England vs New Ze...|
+--------------------+



### in python
### list1= ["India", "Australia", "England", "New Zealand"]
#### output should be same as above

In [580]:
list1= ["India", "Australia", "England", "New Zealand"]

for i in range(len(list1)):
    for j in range(len(list1)):
        if i < j:
            print(list1[i] + " vs " + list1[j])
    

India vs Australia
India vs England
India vs New Zealand
Australia vs England
Australia vs New Zealand
England vs New Zealand


### flight source and destination

In [605]:
### self join example

# empDF.as("emp1").join(empDF.as("emp2"), col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
# .select(col("emp1.emp_id"),col("emp1.name"),col("emp2.emp_id").as("superior_emp_id"),col("emp2.name").as("superior_emp_name"))
# .show(false)


In [600]:
schema = ["id", "airway", "src", "dest"]
Values = [(1, 'Indigo', 'India', 'Bhutan'),
(2, 'Air Asia', 'Aus', 'India'),
(3, 'Indigo', 'Nepal', 'SriLanka'),
(4, 'spice jet', 'SriLanka', 'Bhutan'),
(5, 'Indigo', 'Bhutan', 'SriLanka'),
(6, 'Air Asia', 'India', 'Japan'),
(7, 'spice jet', 'Bhutan', 'Nepal')]

dataframe = spark.createDataFrame(Values, schema=schema)
dataframe.show()

+---+---------+--------+--------+
| id|   airway|     src|    dest|
+---+---------+--------+--------+
|  1|   Indigo|   India|  Bhutan|
|  2| Air Asia|     Aus|   India|
|  3|   Indigo|   Nepal|SriLanka|
|  4|spice jet|SriLanka|  Bhutan|
|  5|   Indigo|  Bhutan|SriLanka|
|  6| Air Asia|   India|   Japan|
|  7|spice jet|  Bhutan|   Nepal|
+---+---------+--------+--------+



In [601]:
dataframe1 = dataframe.selectExpr("airway as airway1", "src as src1", "dest as dest1")
dataframe1.show()

+---+---------+--------+--------+
|id1|  airway1|    src1|   dest1|
+---+---------+--------+--------+
|  1|   Indigo|   India|  Bhutan|
|  2| Air Asia|     Aus|   India|
|  3|   Indigo|   Nepal|SriLanka|
|  4|spice jet|SriLanka|  Bhutan|
|  5|   Indigo|  Bhutan|SriLanka|
|  6| Air Asia|   India|   Japan|
|  7|spice jet|  Bhutan|   Nepal|
+---+---------+--------+--------+



In [602]:
joindf = dataframe.join(dataframe1, dataframe.dest == dataframe1.src1, "left").filter(dataframe.airway == dataframe1.airway1)
joindf.show()

+---+---------+--------+------+---+---------+------+--------+
| id|   airway|     src|  dest|id1|  airway1|  src1|   dest1|
+---+---------+--------+------+---+---------+------+--------+
|  2| Air Asia|     Aus| India|  6| Air Asia| India|   Japan|
|  1|   Indigo|   India|Bhutan|  5|   Indigo|Bhutan|SriLanka|
|  4|spice jet|SriLanka|Bhutan|  7|spice jet|Bhutan|   Nepal|
+---+---------+--------+------+---+---------+------+--------+



In [604]:
joindf.selectExpr("airway as Flight", "src as Source", " dest1 as Destination").show()

+---------+--------+-----------+
|   Flight|  Source|Destination|
+---------+--------+-----------+
| Air Asia|     Aus|      Japan|
|   Indigo|   India|   SriLanka|
|spice jet|SriLanka|      Nepal|
+---------+--------+-----------+



## list with wrong index

In [608]:
list = ['a', 'b', 'c', 'd', 'e']
print(list[9:]) 
print(list[-9:]) 

[]
['a', 'b', 'c', 'd', 'e']


## Reverse a string

In [610]:
# INPUT  ==> ("Hello World")
# OUTPUT ==> ("olleH dlroW")

input = "Hello World"
output = ""

for i in input:
    output = i + output
    
print(output)

dlroW olleH


In [None]:
# scala code

# val input="Hello World"
# val split_word= input.split(" ")
# val reverse=split_word.map(x=>x.reverse) 
# val result=reverse.reduce((a,b)=>a+" "+b)
# print(result)

### token_data

In [611]:
token_date =spark.read.format("csv").option("header","true")\
                    .load("file:///D:/Data Analytics applications/Eclipse Projects/scenarios_data/token_date.csv")
token_date.show()

+-----+----------+----------+
|Token|      Date|      Time|
+-----+----------+----------+
|12345|2023-05-17|12:29:47AM|
|12345|2023-05-17|12:29:49AM|
|12345|2023-05-17|12:29:54AM|
|12346|2023-05-17|12:29:46AM|
|12346|2023-05-17|12:29:48AM|
|12346|2023-05-17|12:29:50AM|
|12346|2023-05-17|12:29:51AM|
+-----+----------+----------+



In [612]:
windowfunc = Window.partitionBy("Token").orderBy("Time")

ordered_date = token_date.withColumn("visits", dense_rank().over(windowfunc))
ordered_date.show()

+-----+----------+----------+------+
|Token|      Date|      Time|visits|
+-----+----------+----------+------+
|12345|2023-05-17|12:29:47AM|     1|
|12345|2023-05-17|12:29:49AM|     2|
|12345|2023-05-17|12:29:54AM|     3|
|12346|2023-05-17|12:29:46AM|     1|
|12346|2023-05-17|12:29:48AM|     2|
|12346|2023-05-17|12:29:50AM|     3|
|12346|2023-05-17|12:29:51AM|     4|
+-----+----------+----------+------+



In [613]:
ordered_date.withColumn("visits", expr("case when visits = 1 then 1 else 0 end")).show()

+-----+----------+----------+------+
|Token|      Date|      Time|visits|
+-----+----------+----------+------+
|12345|2023-05-17|12:29:47AM|     1|
|12345|2023-05-17|12:29:49AM|     0|
|12345|2023-05-17|12:29:54AM|     0|
|12346|2023-05-17|12:29:46AM|     1|
|12346|2023-05-17|12:29:48AM|     0|
|12346|2023-05-17|12:29:50AM|     0|
|12346|2023-05-17|12:29:51AM|     0|
+-----+----------+----------+------+



### Write a Scala program to compute the sum of the two given integer values. 
### If the two values are the same, then return triples their sum.

In [618]:
a = 1
b = 1
sum = 0

if a == b:
    sum = (a + b) * 3
    
else:
    sum = a + b
    
print(sum)

6
