# Windowing Function with code snippets (ab sab clear hai)

### Initialize the spark

In [1]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark = SparkSession.builder.appName("metadata approach").getOrCreate()
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark import SparkContext
sc= SparkContext.getOrCreate()

### Reading file

In [2]:
read_file = spark.read.csv("C:\\Users\\ariha\\Desktop\\pyspark\\empsalary.csv",header=True)

### spark sql approach

### creating a temporary view just to use spark sql

In [3]:
read_file_save= read_file.createTempView("emp_table")
reading_table = spark.sql("select * from emp_table")


In [11]:
reading_table.show()

+-----+---------+----------+------+
|EmpID|     Name|Department|Salary|
+-----+---------+----------+------+
|  101|  Arihant|        A1|  1000|
|  102| Shashank|        A1|  1000|
|  103|     Ranu|        A1|   500|
|  104|  Sangram|        A1|  3000|
|  105|    Singh|        A1|  1000|
|  106|   prapti|        A2|    10|
|  107|    kamal|        A2|    10|
|  108|     Dulo|        A2|    12|
|  109|   Kamali|        A2|    12|
|  110|Prajapati|        A2|    13|
|  111|   Prpait|        A2|    12|
|  112|     Mumu|        A3|  1000|
|  113|    Shivu|        A3|  1000|
+-----+---------+----------+------+



### Rank definition

In [4]:
rank_definition = spark.sql("""select EmpID,
                            Name,
                            Department,
                            Salary, 
                            RANK() OVER(PARTITION BY Department ORDER BY Salary) AS RANK_SALARY from emp_table""")

In [5]:
rank_definition.show(100)

+-----+---------+----------+------+-----------+
|EmpID|     Name|Department|Salary|RANK_SALARY|
+-----+---------+----------+------+-----------+
|  106|   prapti|        A2|    10|          1|
|  107|    kamal|        A2|    10|          1|
|  108|     Dulo|        A2|    12|          3|
|  109|   Kamali|        A2|    12|          3|
|  111|   Prpait|        A2|    12|          3|
|  110|Prajapati|        A2|    13|          6|
|  112|     Mumu|        A3|  1000|          1|
|  113|    Shivu|        A3|  1000|          1|
|  101|  Arihant|        A1|  1000|          1|
|  102| Shashank|        A1|  1000|          1|
|  105|    Singh|        A1|  1000|          1|
|  104|  Sangram|        A1|  3000|          4|
|  103|     Ranu|        A1|   500|          5|
+-----+---------+----------+------+-----------+



### dense_rank definition

In [7]:
dens_rank_def= spark.sql("""select EmpID,
                            Name,
                            Department,
                            Salary,
                            DENSE_RANK() over(PARTITION BY Department order by Salary) as desnse_rank from emp_table""")

In [8]:
dens_rank_def.show(100)

+-----+---------+----------+------+-----------+
|EmpID|     Name|Department|Salary|desnse_rank|
+-----+---------+----------+------+-----------+
|  106|   prapti|        A2|    10|          1|
|  107|    kamal|        A2|    10|          1|
|  108|     Dulo|        A2|    12|          2|
|  109|   Kamali|        A2|    12|          2|
|  111|   Prpait|        A2|    12|          2|
|  110|Prajapati|        A2|    13|          3|
|  112|     Mumu|        A3|  1000|          1|
|  113|    Shivu|        A3|  1000|          1|
|  101|  Arihant|        A1|  1000|          1|
|  102| Shashank|        A1|  1000|          1|
|  105|    Singh|        A1|  1000|          1|
|  104|  Sangram|        A1|  3000|          2|
|  103|     Ranu|        A1|   500|          3|
+-----+---------+----------+------+-----------+



### Row_number defined

In [9]:
row_number= spark.sql("""select EmpID,
                            Name,
                            Department,
                            Salary,
                            row_number() over(partition by department order by salary) as row_number_col from 
                            emp_table""")

In [10]:
row_number.show(1909)

+-----+---------+----------+------+--------------+
|EmpID|     Name|Department|Salary|row_number_col|
+-----+---------+----------+------+--------------+
|  106|   prapti|        A2|    10|             1|
|  107|    kamal|        A2|    10|             2|
|  108|     Dulo|        A2|    12|             3|
|  109|   Kamali|        A2|    12|             4|
|  111|   Prpait|        A2|    12|             5|
|  110|Prajapati|        A2|    13|             6|
|  112|     Mumu|        A3|  1000|             1|
|  113|    Shivu|        A3|  1000|             2|
|  101|  Arihant|        A1|  1000|             1|
|  102| Shashank|        A1|  1000|             2|
|  105|    Singh|        A1|  1000|             3|
|  104|  Sangram|        A1|  3000|             4|
|  103|     Ranu|        A1|   500|             5|
+-----+---------+----------+------+--------------+



### Data frame approach

In [14]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [15]:
# define the common snippet windowspec
windowSpec = Window.partitionBy("Department").orderBy("Salary")

#### Rank

In [18]:
rank_wala = read_file.withColumn("rank_col", rank().over(windowSpec))
rank_wala.show(100)

+-----+---------+----------+------+--------+
|EmpID|     Name|Department|Salary|rank_col|
+-----+---------+----------+------+--------+
|  106|   prapti|        A2|    10|       1|
|  107|    kamal|        A2|    10|       1|
|  108|     Dulo|        A2|    12|       3|
|  109|   Kamali|        A2|    12|       3|
|  111|   Prpait|        A2|    12|       3|
|  110|Prajapati|        A2|    13|       6|
|  112|     Mumu|        A3|  1000|       1|
|  113|    Shivu|        A3|  1000|       1|
|  101|  Arihant|        A1|  1000|       1|
|  102| Shashank|        A1|  1000|       1|
|  105|    Singh|        A1|  1000|       1|
|  104|  Sangram|        A1|  3000|       4|
|  103|     Ranu|        A1|   500|       5|
+-----+---------+----------+------+--------+



#### dense_rank

In [20]:
dense_rank_wala = read_file.withColumn("dense_rank_col", dense_rank().over(windowSpec))
dense_rank_wala.show(100)

+-----+---------+----------+------+--------------+
|EmpID|     Name|Department|Salary|dense_rank_col|
+-----+---------+----------+------+--------------+
|  106|   prapti|        A2|    10|             1|
|  107|    kamal|        A2|    10|             1|
|  108|     Dulo|        A2|    12|             2|
|  109|   Kamali|        A2|    12|             2|
|  111|   Prpait|        A2|    12|             2|
|  110|Prajapati|        A2|    13|             3|
|  112|     Mumu|        A3|  1000|             1|
|  113|    Shivu|        A3|  1000|             1|
|  101|  Arihant|        A1|  1000|             1|
|  102| Shashank|        A1|  1000|             1|
|  105|    Singh|        A1|  1000|             1|
|  104|  Sangram|        A1|  3000|             2|
|  103|     Ranu|        A1|   500|             3|
+-----+---------+----------+------+--------------+



#### Row_number

In [22]:
row_number_wala = read_file.withColumn("row_col", row_number().over(windowSpec))
row_number_wala.show(100)

+-----+---------+----------+------+-------+
|EmpID|     Name|Department|Salary|row_col|
+-----+---------+----------+------+-------+
|  106|   prapti|        A2|    10|      1|
|  107|    kamal|        A2|    10|      2|
|  108|     Dulo|        A2|    12|      3|
|  109|   Kamali|        A2|    12|      4|
|  111|   Prpait|        A2|    12|      5|
|  110|Prajapati|        A2|    13|      6|
|  112|     Mumu|        A3|  1000|      1|
|  113|    Shivu|        A3|  1000|      2|
|  101|  Arihant|        A1|  1000|      1|
|  102| Shashank|        A1|  1000|      2|
|  105|    Singh|        A1|  1000|      3|
|  104|  Sangram|        A1|  3000|      4|
|  103|     Ranu|        A1|   500|      5|
+-----+---------+----------+------+-------+



#### Theory :

Dense_rank follows the sequence where as rank doesnt follow the sequeence it focuses on positioning after arrnagements

In [34]:
salary = spark.sql("""select max(salary) as salary from emp_table where 
salary < (select max(salary) from emp_table) and department="A1" """)

salary.show()

+------+
|salary|
+------+
|  3000|
+------+



In [25]:
salary.show(100)

+-----+--------+----------+------+
|EmpID|    Name|Department|salary|
+-----+--------+----------+------+
|  101| Arihant|        A1|  1000|
|  102|Shashank|        A1|  1000|
|  103|    Ranu|        A1|   500|
|  104| Sangram|        A1|  3000|
|  105|   Singh|        A1|  1000|
+-----+--------+----------+------+



In [32]:
salary = spark.sql("""select max(salary) as salary from emp_table where 
salary < (select max(salary) from emp_table""")

ParseException: 
missing ')' at '<EOF>'(line 2, pos 43)

== SQL ==
select max(salary) as salary from emp_table where 
salary < (select max(salary) from emp_table
-------------------------------------------^^^


In [2]:
from sklearn.feature_extraction.text import *