In [1]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf().set("spark.executor.memory", "1g") \
    .set("spark.driver.memory", "4g") \
    .setMaster("local[*]").setAppName("sample app")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

In [14]:
spark.getActiveSession()

spark2 = spark.newSession()   # has separate SQLConf, registered temporary views and UDFs, but shared SparkContext and table cache.
spark2

#spark.stop()

In [3]:
from pyspark import SQLContext, HiveContext
sc = spark.sparkContext
sqlContext = SQLContext(sc)
#hive_context = HiveContext(sc)
#or
#sc = SparkContext(conf=conf)
#spark = SparkSession(sc)


In [None]:
#hive session
hive_session = SparkSession \
    .builder \
    .enableHiveSupport() \
    .getOrCreate()


In [4]:
df = spark.sql('''select 'spark' as hello''')
df.show()
df.collect()

+-----+
|hello|
+-----+
|spark|
+-----+



[Row(hello='spark')]

In [3]:
##Create Dataframe programatically
from datetime import datetime
from pyspark.sql.types import StructType,StructField 
from pyspark.sql.types import StringType, IntegerType, ArrayType, DateType, MapType
data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M", 20000, datetime(2019, 12, 1), {"Jan": 22}),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F", 15000, datetime(2019, 12, 14), {"Feb": 19}),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F", 10000, datetime(2019, 12, 16), {"Mar": 30}),
 ]    
schema = StructType([
     StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
        StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True),
     StructField('salary', IntegerType(), True),
     StructField('createdAt', DateType(), True),
     StructField('mapfield', MapType(StringType(), IntegerType()), True)
 ])
df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- createdAt: date (nullable = true)
 |-- mapfield: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)

+-------------------+------------------+-----+------+------+----------+-----------+
|name               |languages         |state|gender|salary|createdAt |mapfield   |
+-------------------+------------------+-----+------+------+----------+-----------+
|[James, , Smith]   |[Java, Scala, C++]|OH   |M     |20000 |2019-12-01|[Jan -> 22]|
|[Anna, Rose, ]     |[Spark, Java, C++]|NY   |F     |15000 |2019-12-14|[Feb -> 19]|
|[Julia, , Williams]|[CSharp, VB]      |OH   |F 

In [4]:
#insert into dataframe
data = [
    (("Edwin","N","Max"),["CSharp","Python","JS"],"CA","M", 20000, datetime(2022, 12, 2), {"April": 4}),
    (("Jordan", "E", "Raphael"), [], "NY", "M", 50000, datetime(2023,5,24), {"May": 14})
]
df2 = spark.createDataFrame(data, schema)
df2.show()
df2 = df.union(df2)
df2.show()

+--------------------+--------------------+-----+------+------+----------+------------+
|                name|           languages|state|gender|salary| createdAt|    mapfield|
+--------------------+--------------------+-----+------+------+----------+------------+
|     [Edwin, N, Max]|[CSharp, Python, JS]|   CA|     M| 20000|2022-12-02|[April -> 4]|
|[Jordan, E, Raphael]|                  []|   NY|     M| 50000|2023-05-24| [May -> 14]|
+--------------------+--------------------+-----+------+------+----------+------------+

+--------------------+--------------------+-----+------+------+----------+------------+
|                name|           languages|state|gender|salary| createdAt|    mapfield|
+--------------------+--------------------+-----+------+------+----------+------------+
|    [James, , Smith]|  [Java, Scala, C++]|   OH|     M| 20000|2019-12-01| [Jan -> 22]|
|      [Anna, Rose, ]|  [Spark, Java, C++]|   NY|     F| 15000|2019-12-14| [Feb -> 19]|
| [Julia, , Williams]|        [

In [7]:
# Registering tables and views
df.registerTempTable("employee")
#or
sqlContext.registerDataFrameAsTable(df, "employee")

spark.sql("select * from employee").head()


#Registering views
df.createOrReplaceGlobalTempView("employees_global") #share data among different sessions and keep alive until your application ends
df.createOrReplaceTempView("employees")


#List available table and views
sqlContext.tableNames()
#or
spark.catalog.listDatabases()
spark.catalog.listTables()


#Drop tables and views
sqlContext.dropTempTable("employee")
spark.catalog.dropTempView("employees")
spark.catalog.dropGlobalTempView("employees_global")

Row(name=Row(firstname='James', middlename='', lastname='Smith'), languages=['Java', 'Scala', 'C++'], state='OH', gender='M', salary=20000, createdAt=datetime.date(2019, 12, 1))

['employee', 'employees']

[Database(name='default', description='default database', locationUri='file:/C:/Users/EdwinVivekN/AppData/Local/Programs/Python/Python37-32/myscripts/Spark/spark-warehouse')]

[Table(name='employee', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='employees', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [8]:
#Accessing nested elements
df.createOrReplaceTempView("employee")
spark.sql('''select name.firstname, languages[2] from employee''').show()

spark.sql('''select employee.languages[2], sum(salary) from employee group by employee.languages[2]''').show()


+---------+------------+
|firstname|languages[2]|
+---------+------------+
|    James|         C++|
|     Anna|         C++|
|    Julia|        null|
+---------+------------+

+------------+-----------+
|languages[2]|sum(salary)|
+------------+-----------+
|        null|      10000|
|         C++|      35000|
+------------+-----------+



In [None]:
#Working with Row object
from pyspark.sql import Row
Person = Row("name", "age")
Person
rdd = sc.parallelize([("alice", 22)])
person = rdd.map(lambda r: Person(*r))
person.collect()

for p in person.collect():
    print(p.name + " " + str(p.age))

In [9]:
row = spark.sql("select * from employee").head()
row
row.languages
row["name"]
row.asDict(True) #recursive - true, turns the nested Row as dict



Row(name=Row(firstname='James', middlename='', lastname='Smith'), languages=['Java', 'Scala', 'C++'], state='OH', gender='M', salary=20000, createdAt=datetime.date(2019, 12, 1))

['Java', 'Scala', 'C++']

Row(firstname='James', middlename='', lastname='Smith')

{'name': {'firstname': 'James', 'middlename': '', 'lastname': 'Smith'},
 'languages': ['Java', 'Scala', 'C++'],
 'state': 'OH',
 'gender': 'M',
 'salary': 20000,
 'createdAt': datetime.date(2019, 12, 1)}

In [47]:
#Caching 
sqlContext.registerDataFrameAsTable(df, "employee")
sqlContext.cacheTable("employee")
spark.sql("select * from employee").head()
sqlContext.uncacheTable("employee")

sqlContext.clearCache() # clear entire cache

spark.catalog.refreshTable("employee") # refresh all the cached metadata

#persist


In [65]:
#Load data

#JSON
dataframe = spark.read.json('json_data.json')
#TXT FILES# 
dataframe_txt = spark.read.text('text_data.txt')
#PARQUET FILES# 
dataframe_parquet = spark.read.load('parquet_data.parquet')

In [5]:
#Inspect data
loan = spark.read.csv('..\data\\train_loanpred.csv', inferSchema=True, header=True)
loan.dtypes
loan.columns
loan.count()
loan.distinct().count()
loan.printSchema()
loan.describe().show()
df.describe('ApplicantIncome').show()
df.select('Gender').distinct().count()

[('Loan_ID', 'string'),
 ('Gender', 'string'),
 ('Married', 'string'),
 ('Dependents', 'string'),
 ('Education', 'string'),
 ('Self_Employed', 'string'),
 ('ApplicantIncome', 'int'),
 ('CoapplicantIncome', 'int'),
 ('LoanAmount', 'int'),
 ('Loan_Amount_Term', 'int'),
 ('Credit_History', 'int'),
 ('Property_Area', 'string'),
 ('Loan_Status', 'string')]

+-------+--------+------+-------+------------------+------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------+-----------+
|summary| Loan_ID|Gender|Married|        Dependents|   Education|Self_Employed|  ApplicantIncome|CoapplicantIncome|        LoanAmount| Loan_Amount_Term|    Credit_History|Property_Area|Loan_Status|
+-------+--------+------+-------+------------------+------------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------+-----------+
|  count|     614|   601|    611|               599|         614|          582|              614|              614|               592|              600|               564|          614|        614|
|   mean|    null|  null|   null|0.5547445255474452|        null|         null|5403.459283387622|1621.244299674267|146.41216216216216|            342.0|0.8421985815602837|         null|       null|
| stddev| 

['Loan_ID',
 'Gender',
 'Married',
 'Dependents',
 'Education',
 'Self_Employed',
 'ApplicantIncome',
 'CoapplicantIncome',
 'LoanAmount',
 'Loan_Amount_Term',
 'Credit_History',
 'Property_Area',
 'Loan_Status']

614

614

In [58]:
loan.explain(True)

== Parsed Logical Plan ==
Relation[Loan_ID#2416,Gender#2417,Married#2418,Dependents#2419,Education#2420,Self_Employed#2421,ApplicantIncome#2422,CoapplicantIncome#2423,LoanAmount#2424,Loan_Amount_Term#2425,Credit_History#2426,Property_Area#2427,Loan_Status#2428] csv

== Analyzed Logical Plan ==
Loan_ID: string, Gender: string, Married: string, Dependents: string, Education: string, Self_Employed: string, ApplicantIncome: int, CoapplicantIncome: int, LoanAmount: int, Loan_Amount_Term: int, Credit_History: int, Property_Area: string, Loan_Status: string
Relation[Loan_ID#2416,Gender#2417,Married#2418,Dependents#2419,Education#2420,Self_Employed#2421,ApplicantIncome#2422,CoapplicantIncome#2423,LoanAmount#2424,Loan_Amount_Term#2425,Credit_History#2426,Property_Area#2427,Loan_Status#2428] csv

== Optimized Logical Plan ==
Relation[Loan_ID#2416,Gender#2417,Married#2418,Dependents#2419,Education#2420,Self_Employed#2421,ApplicantIncome#2422,CoapplicantIncome#2423,LoanAmount#2424,Loan_Amount_Term

In [51]:
loan.show(5)
loan.take(5)
loan.head()
loan.first()

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|                0|      null|             360|             1|        Urban|          Y|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|             1508|       128|             360|             1|        Rural|          N|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|                0|        66|             360|             1|        Urban|          Y

[Row(Loan_ID='LP001002', Gender='Male', Married='No', Dependents='0', Education='Graduate', Self_Employed='No', ApplicantIncome=5849, CoapplicantIncome=0, LoanAmount=None, Loan_Amount_Term=360, Credit_History=1, Property_Area='Urban', Loan_Status='Y'),
 Row(Loan_ID='LP001003', Gender='Male', Married='Yes', Dependents='1', Education='Graduate', Self_Employed='No', ApplicantIncome=4583, CoapplicantIncome=1508, LoanAmount=128, Loan_Amount_Term=360, Credit_History=1, Property_Area='Rural', Loan_Status='N'),
 Row(Loan_ID='LP001005', Gender='Male', Married='Yes', Dependents='0', Education='Graduate', Self_Employed='Yes', ApplicantIncome=3000, CoapplicantIncome=0, LoanAmount=66, Loan_Amount_Term=360, Credit_History=1, Property_Area='Urban', Loan_Status='Y'),
 Row(Loan_ID='LP001006', Gender='Male', Married='Yes', Dependents='0', Education='Not Graduate', Self_Employed='No', ApplicantIncome=2583, CoapplicantIncome=2358, LoanAmount=120, Loan_Amount_Term=360, Credit_History=1, Property_Area='Urba

Row(Loan_ID='LP001002', Gender='Male', Married='No', Dependents='0', Education='Graduate', Self_Employed='No', ApplicantIncome=5849, CoapplicantIncome=0, LoanAmount=None, Loan_Amount_Term=360, Credit_History=1, Property_Area='Urban', Loan_Status='Y')

Row(Loan_ID='LP001002', Gender='Male', Married='No', Dependents='0', Education='Graduate', Self_Employed='No', ApplicantIncome=5849, CoapplicantIncome=0, LoanAmount=None, Loan_Amount_Term=360, Credit_History=1, Property_Area='Urban', Loan_Status='Y')

In [12]:
#Find Count of Null, None, NaN of All DataFrame Columns
from pyspark.sql.functions import col,isnan,when,count
loan.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in loan.columns]).show()

loan.select([count(when(loan[c].isNull() == True, 1)).alias(c) for c in loan.columns]).show()

# To list only columns with NANs
import pandas as pd
loan_collected = loan.collect()
nan_columns = [''.join(key) for _ in loan_collected  for (key,val) in _.asDict().items() if pd.isna(val)]
nan_columns = list(set(nan_columns)) 
nan_columns
loan.select([count(when((isnan(c)),c)).alias(c) for c in nan_columns]).show()


#drop duplicate
uLoan = loan.dropDuplicates(['Gender'])
uLoan.count()

#drop NA
nLoan = loan.dropna(how='any')
nLoan.count()

#fill NA
fLoan = loan.fillna({"Gender": "T"})
fLoan.select("Gender").distinct().show()

#replace data
rLoan = fLoan.na.replace("T", "Child", "Gender") #old, new, columnname
rLoan.select("Gender").distinct().show()

+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|      0|    13|      3|        15|        0|           32|              0|                0|        22|              14|            50|            0|          0|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+

+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|Loan_ID|Gender|Marri

['Dependents',
 'Loan_Amount_Term',
 'Self_Employed',
 'Credit_History',
 'LoanAmount',
 'Gender',
 'Married']

+----------+----------------+-------------+--------------+----------+------+-------+
|Dependents|Loan_Amount_Term|Self_Employed|Credit_History|LoanAmount|Gender|Married|
+----------+----------------+-------------+--------------+----------+------+-------+
|         0|               0|            0|             0|         0|     0|      0|
+----------+----------------+-------------+--------------+----------+------+-------+



3

480

+------+
|Gender|
+------+
|     T|
|Female|
|  Male|
+------+

+------+
|Gender|
+------+
|Female|
| Child|
|  Male|
+------+



In [70]:
#add/remove/update columns

#add
new_loan = loan.withColumn("ApplicantIncome_New", loan.ApplicantIncome + 10000)
new_loan.first()

#update
new_loan = new_loan.withColumnRenamed("ApplicantIncome_New", "NewApplicantIncome")
new_loan.head()

#remove
new_loan.drop("NewApplicantIncome").columns

Row(Loan_ID='LP001002', Gender='Male', Married='No', Dependents='0', Education='Graduate', Self_Employed='No', ApplicantIncome=5849, CoapplicantIncome=0, LoanAmount=None, Loan_Amount_Term=360, Credit_History=1, Property_Area='Urban', Loan_Status='Y', ApplicantIncome_New=15849)

Row(Loan_ID='LP001002', Gender='Male', Married='No', Dependents='0', Education='Graduate', Self_Employed='No', ApplicantIncome=5849, CoapplicantIncome=0, LoanAmount=None, Loan_Amount_Term=360, Credit_History=1, Property_Area='Urban', Loan_Status='Y', NewApplicantIncome=15849)

['Loan_ID',
 'Gender',
 'Married',
 'Dependents',
 'Education',
 'Self_Employed',
 'ApplicantIncome',
 'CoapplicantIncome',
 'LoanAmount',
 'Loan_Amount_Term',
 'Credit_History',
 'Property_Area',
 'Loan_Status']

In [None]:
#Repartition
# Dataframe with 10 partitions
loan.repartition(10).rdd.getNumPartitions()

# Dataframe with 1 partition
loan.coalesce(1).rdd.getNumPartitions()

In [None]:
# Converting dataframe into an RDD
df_to_rdd = df.rdd
df_to_rdd.map(lambda x: x.hello + "application").collect()

#Converting RDD into a dataframe
rdd_to_df = df_to_rdd.toDF()
rdd_to_df.show()

# Converting dataframe into a RDD of string 
df.toJSON().first()

# Obtaining contents of df as Pandas 
df.toPandas()

In [None]:
#Write and Save
df.select("firstName", "city").write.save("nameAndCity.parquet")

df.select("firstName", "age").write.save("namesAndAges.json",format="json")

<h1>Queries 

In [68]:
#Select
import pyspark.sql.functions as F
df.select("*").show()
df.select("name", "languages").show()

#flatten single column
df.select("name.firstname", "gender", F.explode("languages").alias("lang_flatten")).show()

#flatten multiple columns 
explodedDF = df.withColumn("lang_flat", F.explode("languages")).withColumn("lang_flat2", F.explode("languages"))
explodedDF.select("name.firstname", "lang_flat", "lang_flat2").show()

df.select("salary", df["salary"] > 15000).show()

df.select("salary", df["salary"] + 15000).show()

+-------------------+------------------+-----+------+------+----------+
|               name|         languages|state|gender|salary| createdAt|
+-------------------+------------------+-----+------+------+----------+
|   [James, , Smith]|[Java, Scala, C++]|   OH|     M| 20000|2019-12-01|
|     [Anna, Rose, ]|[Spark, Java, C++]|   NY|     F| 15000|2019-12-14|
|[Julia, , Williams]|      [CSharp, VB]|   OH|     F| 10000|2019-12-16|
+-------------------+------------------+-----+------+------+----------+

+-------------------+------------------+
|               name|         languages|
+-------------------+------------------+
|   [James, , Smith]|[Java, Scala, C++]|
|     [Anna, Rose, ]|[Spark, Java, C++]|
|[Julia, , Williams]|      [CSharp, VB]|
+-------------------+------------------+

+--------------+------+------------+
|name.firstname|gender|lang_flatten|
+--------------+------+------------+
|         James|     M|        Java|
|         James|     M|       Scala|
|         James|     M

In [29]:
#When
df.select("salary", F.when(df["salary"] > 15000, "true!").alias("salary condition")).alias("sal").show()

#otherwise
df.select(F.when(df["salary"] > 15000, "true!").otherwise("false!")).show()


NameError: name 'df' is not defined

In [29]:
#Like
df[df.name.firstname.like("Anna")].show()
df.select(df.name.firstname.like("Anna")).show()

#startswith
df.select(when(df.name.firstname.startswith("Anna"), df.name.firstname).otherwise("Not Anna")).show()

#substring
df.select(df.name.firstName.substr(1, 3).alias("name")).show()

#between
df.select(df.salary.between(15000, 20000)).show()

#In
df[df.name.firstname.isin("Anna")].show()

#count
df.select(F.count(df.name.firstname)).show()

#filter
df[df.salary >= 20000].show()
df.filter(df.salary >= 20000).show()

+--------------+------------------+-----+------+------+----------+
|          name|         languages|state|gender|salary| createdAt|
+--------------+------------------+-----+------+------+----------+
|[Anna, Rose, ]|[Spark, Java, C++]|   NY|     F| 15000|2019-12-14|
+--------------+------------------+-----+------+------+----------+

+-------------------------+
|name[firstname] LIKE Anna|
+-------------------------+
|                    false|
|                     true|
|                    false|
+-------------------------+

+----------------------------------------------------------------------------------+
|CASE WHEN startswith(name[firstname], Anna) THEN name[firstname] ELSE Not Anna END|
+----------------------------------------------------------------------------------+
|                                                                          Not Anna|
|                                                                              Anna|
|                                         

In [88]:
#Group By
spark.sql('''select count(name.firstname) from employee where name.firstname like 'J%' group by name.firstname''').show()

df.groupBy(df.name.firstname.like('J%')).count().show()

df.select(df.name.firstname, when(df.name.firstname.like("J%"), 1).otherwise(0)).groupBy().sum().show()


#multiple aggregates on group by
df.groupBy(df.state).agg(F.sum(df.salary).alias("sum_salary"), F.min(df.salary).alias("min_salary")).show()


##Having
df.groupBy(df.state).agg(F.sum(df.salary).alias("sum_salary")).where(col("sum_salary") > 20000).show()

+------------------------------------+
|count(name.firstname AS `firstname`)|
+------------------------------------+
|                                   1|
|                                   1|
+------------------------------------+

+-----------------------+-----+
|name[firstname] LIKE J%|count|
+-----------------------+-----+
|                   true|    2|
|                  false|    1|
+-----------------------+-----+

+--------------------------------------------------------+
|sum(CASE WHEN name[firstname] LIKE J% THEN 1 ELSE 0 END)|
+--------------------------------------------------------+
|                                                       2|
+--------------------------------------------------------+

+-----+----------+----------+
|state|sum_salary|min_salary|
+-----+----------+----------+
|   OH|     30000|     10000|
|   NY|     15000|     15000|
+-----+----------+----------+

+-----+----------+
|state|sum_salary|
+-----+----------+
|   OH|     30000|
+-----+----------+


In [89]:
#join

#window - rows, range




<h2>Window

In [9]:
df2.createOrReplaceTempView("employees")
spark.sql('''select * from employees''').show()

+-------------------+--------------------+-----+------+------+----------+
|               name|           languages|state|gender|salary| createdAt|
+-------------------+--------------------+-----+------+------+----------+
|   [James, , Smith]|  [Java, Scala, C++]|   OH|     M| 20000|2019-12-01|
|     [Anna, Rose, ]|  [Spark, Java, C++]|   NY|     F| 15000|2019-12-14|
|[Julia, , Williams]|        [CSharp, VB]|   OH|     F| 10000|2019-12-16|
|    [Edwin, N, Max]|[CSharp, Python, JS]|   CA|     M| 20000|2022-12-02|
+-------------------+--------------------+-----+------+------+----------+



In [21]:
spark.sql('''select \
Row_Number() over (partition by state order by salary DESC, state ASC) as rownum, \
Dense_Rank() over (partition by gender order by salary ASC) as dense_rank, \
Rank() over (partition by state order by salary DESC) as rank, \
* from employees''').show()

+------+----------+----+-------------------+--------------------+-----+------+------+----------+
|rownum|dense_rank|rank|               name|           languages|state|gender|salary| createdAt|
+------+----------+----+-------------------+--------------------+-----+------+------+----------+
|     1|         1|   1|    [Edwin, N, Max]|[CSharp, Python, JS]|   CA|     M| 20000|2022-12-02|
|     1|         1|   1|   [James, , Smith]|  [Java, Scala, C++]|   OH|     M| 20000|2019-12-01|
|     2|         1|   2|[Julia, , Williams]|        [CSharp, VB]|   OH|     F| 10000|2019-12-16|
|     1|         2|   1|     [Anna, Rose, ]|  [Spark, Java, C++]|   NY|     F| 15000|2019-12-14|
+------+----------+----+-------------------+--------------------+-----+------+------+----------+



In [27]:
spark.sql('''select \
Row_Number() over (order by salary DESC, state ASC) as rownum, \
Rank() over (partition by state order by salary ASC) as rank, \
Dense_Rank() over (partition by gender order by salary ASC) as dense_rank, \
Percent_Rank() over (partition by state order by salary ASC) as percentrank, \
Ntile() over (partition by state order by salary ASC) as ntile, \
Cume_dist() over (partition by state order by salary ASC) as cume_dist, \
* from employees''').show()

+------+----+----------+-----------+-----+---------+-------------------+--------------------+-----+------+------+----------+
|rownum|rank|dense_rank|percentrank|ntile|cume_dist|               name|           languages|state|gender|salary| createdAt|
+------+----+----------+-----------+-----+---------+-------------------+--------------------+-----+------+------+----------+
|     4|   1|         1|        0.0|    1|      0.5|[Julia, , Williams]|        [CSharp, VB]|   OH|     F| 10000|2019-12-16|
|     3|   1|         2|        0.0|    1|      1.0|     [Anna, Rose, ]|  [Spark, Java, C++]|   NY|     F| 15000|2019-12-14|
|     1|   1|         1|        0.0|    1|      1.0|    [Edwin, N, Max]|[CSharp, Python, JS]|   CA|     M| 20000|2022-12-02|
|     2|   2|         1|        1.0|    1|      1.0|   [James, , Smith]|  [Java, Scala, C++]|   OH|     M| 20000|2019-12-01|
+------+----+----------+-----------+-----+---------+-------------------+--------------------+-----+------+------+----------+


In [18]:
spark.sql('''select \
First_value(gender) over (partition by state order by gender DESC) as first_value, \
Last_value(gender) over (partition by state order by gender DESC) as last_RANGE, \
Last_value(gender) over (partition by state) as last_ROWS, \
Last_value(gender) over (partition by state order by gender DESC rows between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING) as last_rows, \
state,gender from employees''').show()

+-----------+----------+---------+---------+-----+------+
|first_value|last_RANGE|last_ROWS|last_rows|state|gender|
+-----------+----------+---------+---------+-----+------+
|          M|         M|        M|        M|   CA|     M|
|          M|         M|        F|        F|   OH|     M|
|          M|         F|        F|        F|   OH|     F|
|          F|         F|        F|        F|   NY|     F|
+-----------+----------+---------+---------+-----+------+



In [11]:
spark.sql('''select \
Avg(salary) over (partition by state order by gender ASC) as rolling_avg_salary, \
Sum(salary) over (partition by state order by gender ASC) as rolling_sum_salary, \
Count(gender) over (partition by state order by gender ASC) as count_gender, \
* from employees''').show()


+------------------+------------------+------------+-------------------+--------------------+-----+------+------+----------+
|rolling_avg_salary|rolling_sum_salary|count_gender|               name|           languages|state|gender|salary| createdAt|
+------------------+------------------+------------+-------------------+--------------------+-----+------+------+----------+
|           20000.0|             20000|           1|    [Edwin, N, Max]|[CSharp, Python, JS]|   CA|     M| 20000|2022-12-02|
|           10000.0|             10000|           1|[Julia, , Williams]|        [CSharp, VB]|   OH|     F| 10000|2019-12-16|
|           15000.0|             30000|           2|   [James, , Smith]|  [Java, Scala, C++]|   OH|     M| 20000|2019-12-01|
|           15000.0|             15000|           1|     [Anna, Rose, ]|  [Spark, Java, C++]|   NY|     F| 15000|2019-12-14|
+------------------+------------------+------------+-------------------+--------------------+-----+------+------+----------+


In [21]:
from pyspark.sql import Window
import pyspark.sql.functions as F
window = Window.partitionBy("state").orderBy("gender")
df2.select(F.avg("salary").over(window).alias("rolling_avg_salary"), \
           F.sum("salary").over(window).alias("rolling_sum_salary"), \
           "*").show()

+------------------+------------------+-------------------+--------------------+-----+------+------+----------+
|rolling_avg_salary|rolling_sum_salary|               name|           languages|state|gender|salary| createdAt|
+------------------+------------------+-------------------+--------------------+-----+------+------+----------+
|           20000.0|             20000|    [Edwin, N, Max]|[CSharp, Python, JS]|   CA|     M| 20000|2022-12-02|
|           10000.0|             10000|[Julia, , Williams]|        [CSharp, VB]|   OH|     F| 10000|2019-12-16|
|           15000.0|             30000|   [James, , Smith]|  [Java, Scala, C++]|   OH|     M| 20000|2019-12-01|
|           15000.0|             15000|     [Anna, Rose, ]|  [Spark, Java, C++]|   NY|     F| 15000|2019-12-14|
+------------------+------------------+-------------------+--------------------+-----+------+------+----------+



<h2>Functions

In [44]:
#Array Functions
df2.select(F.array("gender", "state")).show()

df2.select("languages", F.sort_array("languages", True)).show()

nestedArrayDF = df2.select(F.array("languages", "languages").alias("langoflang"))
nestedArrayDF.select(F.flatten("langoflang")).collect()

+--------------------+
|array(gender, state)|
+--------------------+
|             [M, OH]|
|             [F, NY]|
|             [F, OH]|
|             [M, CA]|
|             [M, NY]|
+--------------------+

+--------------------+---------------------------+
|           languages|sort_array(languages, true)|
+--------------------+---------------------------+
|  [Java, Scala, C++]|         [C++, Java, Scala]|
|  [Spark, Java, C++]|         [C++, Java, Spark]|
|        [CSharp, VB]|               [CSharp, VB]|
|[CSharp, Python, JS]|       [CSharp, JS, Python]|
|                  []|                         []|
+--------------------+---------------------------+



[Row(flatten(langoflang)=['Java', 'Scala', 'C++', 'Java', 'Scala', 'C++']),
 Row(flatten(langoflang)=['Spark', 'Java', 'C++', 'Spark', 'Java', 'C++']),
 Row(flatten(langoflang)=['CSharp', 'VB', 'CSharp', 'VB']),
 Row(flatten(langoflang)=['CSharp', 'Python', 'JS', 'CSharp', 'Python', 'JS']),
 Row(flatten(langoflang)=[])]

In [6]:
#Generartor Functions
import pyspark.sql.functions as F

#explode
explodedDF = df2.withColumn("lang_flat", F.explode("languages"))
explodedDF.select("name.firstname","languages", "lang_flat").show()

#explode outer  
explode_o = df2.withColumn("lang_flat_outer", F.explode_outer("languages"))  #-- includes null
explode_o.select("name.firstname", "languages", "lang_flat_outer").show()

#pos explode
df2.select("name.firstname", "languages", F.posexplode("languages")).show()

#pos explode outer
df2.select("name.firstname", "languages", F.posexplode_outer("languages")).show()



+---------+--------------------+---------+
|firstname|           languages|lang_flat|
+---------+--------------------+---------+
|    James|  [Java, Scala, C++]|     Java|
|    James|  [Java, Scala, C++]|    Scala|
|    James|  [Java, Scala, C++]|      C++|
|     Anna|  [Spark, Java, C++]|    Spark|
|     Anna|  [Spark, Java, C++]|     Java|
|     Anna|  [Spark, Java, C++]|      C++|
|    Julia|        [CSharp, VB]|   CSharp|
|    Julia|        [CSharp, VB]|       VB|
|    Edwin|[CSharp, Python, JS]|   CSharp|
|    Edwin|[CSharp, Python, JS]|   Python|
|    Edwin|[CSharp, Python, JS]|       JS|
+---------+--------------------+---------+

+---------+--------------------+---------------+
|firstname|           languages|lang_flat_outer|
+---------+--------------------+---------------+
|    James|  [Java, Scala, C++]|           Java|
|    James|  [Java, Scala, C++]|          Scala|
|    James|  [Java, Scala, C++]|            C++|
|     Anna|  [Spark, Java, C++]|          Spark|
|     Anna|

In [11]:
#Register functions - UDF
def sumfunc(x):
    return x + x
        
from pyspark.sql.types import IntegerType 
import pyspark.sql.functions as F

df2.createOrReplaceTempView("employee")

#Registering function as UDF
spark.udf.register('sumofsalary', sumfunc, IntegerType())
spark.sql('''select sumofsalary(salary) from employee''').show()


sumofsalaryUDF = F.udf(sumfunc, IntegerType())
df.select(sumofsalaryUDF("salary")).show()


#Creating UDF using Anotations
@udf(returnType=StringType()) 
def upperCaseUDF(str):
    return str.upper()

df.withColumn("Upper_case_name", upperCaseUDF(F.col("name.firstname"))).show()

<function __main__.sumfunc(x)>

+-------------------+
|sumofsalary(salary)|
+-------------------+
|              40000|
|              30000|
|              20000|
|              40000|
+-------------------+

+---------------+
|sumfunc(salary)|
+---------------+
|          40000|
|          30000|
|          20000|
+---------------+

+-------------------+------------------+-----+------+------+----------+---------------+
|               name|         languages|state|gender|salary| createdAt|Upper_case_name|
+-------------------+------------------+-----+------+------+----------+---------------+
|   [James, , Smith]|[Java, Scala, C++]|   OH|     M| 20000|2019-12-01|          JAMES|
|     [Anna, Rose, ]|[Spark, Java, C++]|   NY|     F| 15000|2019-12-14|           ANNA|
|[Julia, , Williams]|      [CSharp, VB]|   OH|     F| 10000|2019-12-16|          JULIA|
+-------------------+------------------+-----+------+------+----------+---------------+

