# PySpark SQL Functions

## Aggregate Functions

In [0]:
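# The aggregate functions that we generally use are:
# sum
# sumDistinct (also available as sum_distinct in PySpark 3.2+)
# mean
# stddev
# variance
# min
# max
# avg
# count
# countDistinct (also available as count_distinct in PySpark 3.2+)
# approx_count_distinct
# distinct (strictly a DataFrame method, df.distinct(), rather than an aggregate function)
# first (new compared to standard SQL)
# last (new compared to standard SQL)
# collect_list (new compared to standard SQL)
# collect_set (new compared to standard SQL)
# skewness, kurtosis (new compared to standard SQL)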

In [0]:
# Typical usage: df.select(avg(df.colname).alias('avg_colname'))
# or with withColumn, where an aggregate needs a window: df.withColumn('colname', sum('num_col').over(window_spec))
# or grouped: df.groupBy('col1','col2').agg(sum('num_col'), mean('col3'))

In [0]:
from pyspark.sql import SparkSession

In [0]:
# Create (or reuse, if one is already running) the SparkSession for this notebook.
spark = (
    SparkSession.builder
    .appName('agg_window_datetime_functions')
    .getOrCreate()
)

In [0]:

# Sample employee rows as (employee_name, department, salary) tuples.
# ("James", "Sales", 3000) occurs twice -- presumably to show how the
# distinct-style aggregates treat duplicate rows.
simpleData = [
    ("James",   "Sales",     3000),
    ("Michael", "Sales",     4600),
    ("Robert",  "Sales",     4100),
    ("Maria",   "Finance",   3000),
    ("James",   "Sales",     3000),
    ("Scott",   "Finance",   3300),
    ("Jen",     "Finance",   3900),
    ("Jeff",    "Marketing", 3000),
    ("Kumar",   "Marketing", 2000),
    ("Saif",    "Sales",     4100),
]
schema = ["employee_name", "department", "salary"]

# Column types are inferred from the Python values (str -> string, int -> long).
df = spark.createDataFrame(simpleData, schema)
df.printSchema()
df.show()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



In [0]:
# NOTE: this import replaces the Python builtins min, max and sum with the PySpark
# column functions of the same name for the rest of the notebook (the window
# aggregation cell further down relies on that).
# sumDistinct / countDistinct are also exposed as sum_distinct / count_distinct in PySpark 3.2+.
from pyspark.sql.functions import approx_count_distinct, countDistinct, count, collect_list, collect_set, min, max, sum, mean, variance, stddev,first, last,sumDistinct

# Each select yields a one-row DataFrame; .first()[0] pulls out its single value.
# approx_count_distinct is an estimate and may differ from countDistinct on large data;
# the element order of collect_set is not guaranteed.
print('approx_count_distinct:', df.select(approx_count_distinct('salary')).first()[0])
print('countDistinct:', df.select(countDistinct('salary')).first()[0])
print('collect_set:', df.select(collect_set('salary')).first()[0])

approx_count_distinct: 6
approx_count_distinct: 6
approx_count_distinct: [4600, 3000, 3900, 4100, 3300, 2000]


## Window functions

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank,lag,lead
from pyspark.sql.functions import col,desc,asc 

# Number rows within each department from the lowest to the highest salary.
# The recorded outputs of this cell and of the rank/lag/lead and row == 1 cells
# below were all produced with this ascending order; switch to
# col('salary').desc() to number from the highest salary instead.
# Rows with equal salaries (e.g. the two identical James rows) get row numbers
# in an arbitrary order.
windowSpec = Window.partitionBy('department').orderBy(col('salary').asc())

df.withColumn("row_number", row_number().over(windowSpec)).show()



+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         2|
|       Robert|     Sales|  4100|         3|
|         Saif|     Sales|  4100|         4|
|      Michael|     Sales|  4600|         5|
+-------------+----------+------+----------+



In [0]:
# Add ranking and neighbour-value columns computed over the department window:
# rank leaves gaps after ties while dense_rank does not; lag/lead read the
# salary one row back/ahead and fall back to 0 at the partition edges.
df = (
    df.withColumn('rank', rank().over(windowSpec))
      .withColumn('dense_rank', dense_rank().over(windowSpec))
      .withColumn('lag', lag('salary', 1, 0).over(windowSpec))
      .withColumn('lead', lead('salary', 1, 0).over(windowSpec))
)
df.show()

+-------------+----------+------+----+----------+----+----+
|employee_name|department|salary|rank|dense_rank| lag|lead|
+-------------+----------+------+----+----------+----+----+
|        Maria|   Finance|  3000|   1|         1|   0|3300|
|        Scott|   Finance|  3300|   2|         2|3000|3900|
|          Jen|   Finance|  3900|   3|         3|3300|   0|
|        Kumar| Marketing|  2000|   1|         1|   0|3000|
|         Jeff| Marketing|  3000|   2|         2|2000|   0|
|        James|     Sales|  3000|   1|         1|   0|3000|
|        James|     Sales|  3000|   1|         1|3000|4100|
|       Robert|     Sales|  4100|   3|         2|3000|4100|
|         Saif|     Sales|  4100|   3|         2|4100|4600|
|      Michael|     Sales|  4600|   5|         3|4100|   0|
+-------------+----------+------+----+----------+----+----+



In [0]:
from pyspark.sql.functions import avg

# A window with no ordering spans the whole department partition, so each
# aggregate below is computed over every row of the department.
windowSpecAgg = Window.partitionBy('department')

# sum / min / max are the PySpark column functions imported in the aggregate cell
# (they shadow the Python builtins). Keeping only row 1 leaves one summary row
# per department.
(
    df.withColumn("row", row_number().over(windowSpec))
      .withColumn("avg", avg(col("salary")).over(windowSpecAgg))
      .withColumn("sum", sum(col("salary")).over(windowSpecAgg))
      .withColumn("min", min(col("salary")).over(windowSpecAgg))
      .withColumn("max", max(col("salary")).over(windowSpecAgg))
      .filter(col("row") == 1)
      .show()
)

+-------------+----------+------+----+----------+---+----+---+------+-----+----+----+
|employee_name|department|salary|rank|dense_rank|lag|lead|row|   avg|  sum| min| max|
+-------------+----------+------+----+----------+---+----+---+------+-----+----+----+
|        Maria|   Finance|  3000|   1|         1|  0|3300|  1|3400.0|10200|3000|3900|
|        Kumar| Marketing|  2000|   1|         1|  0|3000|  1|2500.0| 5000|2000|3000|
|        James|     Sales|  3000|   1|         1|  0|3000|  1|3760.0|18800|3000|4600|
+-------------+----------+------+----+----------+---+----+---+------+-----+----+----+



## Datetime functions
- We use these functions frequently to handle timestamped data; they make date arithmetic, extraction, and formatting much easier.

In [0]:
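# The ones I know from SQL Server are DATEADD, DATEDIFF, EXTRACT, YEAR, MONTH, DAY, GETDATE() (and now());
# PySpark counterparts include date_add/date_sub/add_months, datediff, year, month, dayofmonth and current_date().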

from pyspark.sql.functions import current_date, date_add, date_sub, add_months, months_between, next_day, last_day, date_format,dayofweek,dayofmonth,dayofyear,weekofyear

In [0]:
# (id, date) pairs; the dates are 'yyyy-MM-dd' strings that the date
# functions below cast implicitly.
data = [
    ["1", "2020-02-01"],
    ["2", "2019-03-01"],
    ["3", "2021-03-01"],
]

df = spark.createDataFrame(data, schema=['id', 'date'])
df.show()

+---+----------+
| id|      date|
+---+----------+
|  1|2020-02-01|
|  2|2019-03-01|
|  3|2021-03-01|
+---+----------+



In [0]:
# current_date() does not depend on the row, so every row shows today's date.
df.select(
    current_date().alias('current_date')
).show()

+------------+
|current_date|
+------------+
|  2023-12-11|
|  2023-12-11|
|  2023-12-11|
+------------+



In [0]:
# 'MMM-dd-yy' renders abbreviated month, day and two-digit year (e.g. Feb-01-20).
df.select(
    date_format(col('date'), 'MMM-dd-yy')
).show()

+----------------------------+
|date_format(date, MMM-dd-yy)|
+----------------------------+
|                   Feb-01-20|
|                   Mar-01-19|
|                   Mar-01-21|
+----------------------------+



In [0]:
# Week number of today's date; like current_date() it is identical on every row.
df.select(
    weekofyear(current_date())
).show()

+--------------------------+
|weekofyear(current_date())|
+--------------------------+
|                        50|
|                        50|
|                        50|
+--------------------------+



In [0]:
from pyspark.sql.functions import datediff

# datediff(end, start): number of days from each stored date up to today.
df.select(
    col("date"),
    datediff(current_date(), col("date")).alias("datediff"),
).show()


+----------+--------+
|      date|datediff|
+----------+--------+
|2020-02-01|    1409|
|2019-03-01|    1746|
|2021-03-01|    1015|
+----------+--------+



In [0]:
# Shift each date by whole months (a negative count moves backwards) and by days.
df.select(
    "date",
    add_months("date", 3).alias("add_months"),
    add_months("date", -3).alias("sub_months"),
    date_add("date", 4).alias("date_add"),
    date_sub("date", 4).alias("date_sub"),
).show()

+----------+----------+----------+----------+----------+
|      date|add_months|sub_months|  date_add|  date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2019-03-01|2019-06-01|2018-12-01|2019-03-05|2019-02-25|
|2021-03-01|2021-06-01|2020-12-01|2021-03-05|2021-02-25|
+----------+----------+----------+----------+----------+



In [0]:
# year/month/hour/minute/second are not imported by the datetime import cell.
from pyspark.sql.functions import year, month, hour, minute, second

# Break each date into calendar components. df only has the columns id and date,
# and the date values are 'yyyy-MM-dd' strings that Spark casts implicitly; with
# no time part, hour/minute/second are expected to come out as 0.
date_col = col("date")
df.select(date_col,
     year(date_col).alias("year"),
     month(date_col).alias("month"),
     next_day(date_col, "Sunday").alias("next_day"),
     weekofyear(date_col).alias("weekofyear"),
     dayofweek(date_col).alias("dayofweek"),
     dayofmonth(date_col).alias("dayofmonth"),
     dayofyear(date_col).alias("dayofyear"),
     hour(date_col).alias("hour"),
     minute(date_col).alias("minute"),
     second(date_col).alias("second")
  ).show()

# Converting JSON data
- to parse a JSON string column into structured columns we use `from_json`
- to convert a column into a JSON string we use `to_json`

# Reading files from different datasources

- we will look on how to read a file
- how to write to a file

## csv files

In [0]:
# Placeholder location -- replace with a real path before running this cell.
csv_path = 'path/to/file.csv'

spark.read.csv(path=csv_path)
#or
spark.read.format('csv').load(csv_path)

# to read multiple csv files, pass a list of paths
# (a single comma-separated string is not split into separate paths)
spark.read.csv(['path1.csv', 'path2.csv', 'path3.csv'])

# to read all files in a folder
spark.read.csv('folderpath')


# commonly used options for reading a csv file
spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv(csv_path)
# or, if you have your own schema, skip inference with
# spark.read.schema(your_schema).options(header='True').csv(csv_path)

In [0]:
# writing to csv
df.write.csv('filename')
# .save() without .format() writes with the default source
# (spark.sql.sources.default, parquet unless configured), so name csv explicitly
df.write.mode('overwrite').csv('filepath')

# we have four saving modes
# overwrite - replaces any existing data at the target
# append - adds the new rows to the existing data
# error / errorifexists (the default) - raises an error if data already exists,
#   so re-running df.write.csv('filename') above fails once 'filename' exists
# ignore - does nothing (the data is not saved) if data already exists

## Parquet files

In [0]:
# Placeholder location -- replace with a real path before running this cell
# (reading only works once something has been written there).
parquet_path = 'path/to/output.parquet'

spark.read.parquet(parquet_path)
df.write.parquet(parquet_path)

df.write.mode('overwrite').parquet(parquet_path)

# we can also create partitioned parquet output (one sub-directory per value of
# the partition column); replace 'partition_col' with real column name(s)
df.write.partitionBy('partition_col').mode('overwrite').parquet(parquet_path)

## JSON files

In [0]:
# Placeholder location -- replace with a real path before running this cell.
json_path = 'path/to/file.json'

spark.read.json(json_path)
# JSON records that span several lines need the multiLine option
# (option() takes a key and a value, not keyword arguments)
spark.read.option('multiLine', True).json(json_path)
# multiple files: pass a list of paths
spark.read.json(['path1.json', 'path2.json'])


# to write to a json file

df.write.json('path/to/output.json')

## Reading from and writing to Hive tables
- `enableHiveSupport()` on the session builder lets the SparkSession connect to Hive
- `df.write.saveAsTable()` writes a DataFrame as a table

In [0]:
from os.path import abspath
from pyspark.sql import SparkSession

#enableHiveSupport() -> enables sparkSession to connect with Hive
# warehouse_location is a local 'spark-warehouse' directory, handy for a local
# setup; the remote example below uses an explicit warehouse path instead.
warehouse_location = abspath('spark-warehouse')

# NOTE: getOrCreate() hands back the session already created earlier in this
# notebook if it is still running; settings such as spark.sql.warehouse.dir and
# Hive support are read when a session is first created, so stop the existing
# session (spark.stop()) or restart the kernel before relying on them.
spark = SparkSession \
    .builder \
    .appName("SparkByExamples.com") \
    .config("spark.sql.warehouse.dir", "/hive/warehouse/dir") \
    .config("hive.metastore.uris", "thrift://remote-host:9083") \
    .enableHiveSupport() \
    .getOrCreate()

# or Use the below approach
# Collect the settings in a SparkConf and hand it to the builder. PySpark has no
# spark.sparkContext().conf().set(...) (that is Scala/Java syntax), and these
# settings cannot be changed on an already-running session, so they must be
# supplied before the session is created:
#
# from pyspark import SparkConf
# hive_conf = SparkConf() \
#     .set("spark.sql.warehouse.dir", "/user/hive/warehouse") \
#     .set("hive.metastore.uris", "thrift://localhost:9083")
# spark = SparkSession.builder.config(conf=hive_conf).enableHiveSupport().getOrCreate()

## Reading from and writing to a SQL Server table

In [0]:
import os

# Connection settings come from environment variables so credentials are not
# hard-coded in the notebook; the fallbacks are placeholders. The variable
# names SQLSERVER_ADDR / SQLSERVER_USER / SQLSERVER_PASSWORD are a convention
# chosen here -- adjust to your environment.
SERVER_ADDR = os.environ.get("SQLSERVER_ADDR", "localhost")
jdbc_options = {
    "url": f"jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;",
    "user": os.environ.get("SQLSERVER_USER", "replace_user_name"),
    "password": os.environ.get("SQLSERVER_PASSWORD", "replace_password"),
}

# to read from table
df = spark.read \
  .format("com.microsoft.sqlserver.jdbc.spark") \
  .options(**jdbc_options) \
  .option("dbtable", "employee") \
  .load()

df.show()

# to write to table
# Target a different table than the one just read: df is evaluated lazily, so
# overwriting "employee" itself could clear the source before it is read.
df.write \
  .format("com.microsoft.sqlserver.jdbc.spark") \
  .mode("overwrite") \
  .options(**jdbc_options) \
  .option("dbtable", "employee_copy") \
  .save()

# PySpark built-in functions

## when()
- equivalent to SQL's CASE WHEN: `when(condition, value)`, chained with further `.when(condition, value)` calls and a final `.otherwise(default)`

In [0]:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [("James","M",60000), ("Michael","M",70000),
        ("Robert",None,400000), ("Maria","F",500000),
        ("Jen","",None)]

columns = ["name","gender","salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.show()

#Using When otherwise
from pyspark.sql.functions import when,col

# Map gender codes to labels: "M" -> "Male", "F" -> "Female", null -> "",
# anything else (e.g. Jen's "") is passed through unchanged. The Column
# expression is built once and reused by the withColumn and select forms.
gender_label = (when(df.gender == "M", "Male")
                .when(df.gender == "F", "Female")
                .when(df.gender.isNull(), "")
                .otherwise(df.gender))

df2 = df.withColumn("new_gender", gender_label)
df2.show()

df2 = df.select(col("*"), gender_label.alias("new_gender"))
df2.show()

# Using SQL Case When
from pyspark.sql.functions import expr

# The same mapping as a SQL expression, shared by expr() and spark.sql().
# Every fragment ends with a space so the concatenated keywords stay separated.
gender_case_sql = ("CASE WHEN gender = 'M' THEN 'Male' "
                   "WHEN gender = 'F' THEN 'Female' "
                   "WHEN gender IS NULL THEN '' "
                   "ELSE gender END")

df3 = df.withColumn("new_gender", expr(gender_case_sql))
df3.show()

df4 = df.select(col("*"), expr(gender_case_sql).alias("new_gender"))
df4.show()

df.createOrReplaceTempView("EMP")
spark.sql("select name, " + gender_case_sql + " as new_gender from EMP").show()

In [0]:
# another powerful function is regexp_replace(colname, pattern, replacement), which replaces every regex match in a string column