In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import lit,col,explode
spark=SparkSession.builder.appName('sp').getOrCreate()

In [0]:
df=spark.read.format("csv")\
        .option("header","true")\
        .option("inferSchema","true")\
        .load("/FileStore/tables/2010_12_01-1.csv")

df.printSchema()
df.createOrReplaceTempView("dfTable")

root
 |-- nvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
df.show(3)

+--------+---------+--------------------+--------+-------------+---------+----------+--------------+
|nvoiceNo|StockCode|         Description|Quantity|  InvoiceDate|UnitPrice|CustomerID|       Country|
+--------+---------+--------------------+--------+-------------+---------+----------+--------------+
|  536365|   85123A|WHITE HANGING HEA...|       6|01/12/10 8:26|     2.55|     17850|United Kingdom|
|  536365|    71053| WHITE METAL LANTERN|       6|01/12/10 8:26|     3.39|     17850|United Kingdom|
|  536365|   84406B|CREAM CUPID HEART...|       8|01/12/10 8:26|     2.75|     17850|United Kingdom|
+--------+---------+--------------------+--------+-------------+---------+----------+--------------+
only showing top 3 rows



In [0]:
from pyspark.sql.functions import lit

df.select(lit(5),lit("five"),lit(5.0))

Out[141]: DataFrame[5: int, five: string, 5.0: double]

In [0]:
df.where(col("nvoiceNo")==536365)\
  .select("nvoiceNo","Description")\
  .show(5)

+--------+--------------------+
|nvoiceNo|         Description|
+--------+--------------------+
|  536365|WHITE HANGING HEA...|
|  536365| WHITE METAL LANTERN|
|  536365|CREAM CUPID HEART...|
|  536365|KNITTED UNION FLA...|
|  536365|RED WOOLLY HOTTIE...|
+--------+--------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import instr

In [0]:
DOTCodeFilter=col("StockCode")=="DOT"
priceFilter=col("UnitPrice")>600
descripFilter=instr(col("Description"),"POSTAGE")>=1
df.withColumn("isExpensive",DOTCodeFilter & (priceFilter|descripFilter))\
  .where("isExpensive")\
  .select("unitPrice","isExpensive").show(5)

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
+---------+-----------+



In [0]:
from pyspark.sql.functions import expr,pow

In [0]:
fabricatedQuantity=pow(col("Quantity") * col("UnitPrice"),2)+5
df.select(expr("CustomerId"),fabricatedQuantity.alias("realQuantity")).show()

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|     17850|239.08999999999997|
|     17850|          418.7156|
|     17850|             489.0|
|     17850|          418.7156|
|     17850|          418.7156|
|     17850|239.09000000000003|
|     17850|            655.25|
|     17850|128.21000000000004|
|     17850|128.21000000000004|
|     13047|2929.6463999999996|
|     13047|163.76000000000005|
|     13047|163.76000000000005|
|     13047|             905.0|
|     13047|103.00999999999998|
|     13047|            655.25|
|     13047|225.52250000000004|
|     13047|401.00999999999993|
|     13047|323.62250000000006|
|     13047|323.62250000000006|
|     13047|           1016.24|
+----------+------------------+
only showing top 20 rows



In [0]:
df.selectExpr("CustomerId",("POWER((Quantity*UnitPrice),2.0)+5 ) as realQuantity").show()

[0;36m  File [0;32m"<command-3395353137604542>"[0;36m, line [0;32m1[0m
[0;31m    df.selectExpr("CustomerId",("POWER((Quantity*UnitPrice),2.0)+5 ) as realQuantity").show()[0m
[0m                                                                                             ^[0m
[0;31mSyntaxError[0m[0;31m:[0m unexpected EOF while parsing


In [0]:
from pyspark.sql.functions import lit,round,bround

df.select(round(lit("2.5")),bround(lit("2.5"))).show(2)



In [0]:
from pyspark.sql.functions import corr
df.stat.corr("Quantity","UnitPrice")
df.select(corr("Quantity","UnitPrice")).show()



In [0]:
df.describe().show()



In [0]:
from pyspark.sql.functions import monotonically_increasing_id
df.select(monotonically_increasing_id()).show(2)



In [0]:
from pyspark.sql.functions import initcap
df.select(initcap(col("Description"))).show(4)




In [0]:
from pyspark.sql.functions import lower,upper
df.select(col("Description"),
lower(col("Description")),
upper(lower(col("Description")))).show(2)



In [0]:
from pyspark.sql.functions import rpad,lpad,rtrim,ltrim,lit,trim
df.select(
    ltrim(lit("    HELLO   ")).alias("ltrim"),
    rtrim(lit("    HELLO   ")).alias("rtrim"),
    trim(lit("   HELLO   ")).alias("trim"),
    lpad(lit("HELLO"),7,"o").alias("lpad"),
    rpad(lit("HELLO"),6,"o").alias("rpad")).show(2)



In [0]:
from pyspark.sql.functions import regexp_replace

regexp_string="(BLACK|WHITE|GREEN|BLUE|RED)"

df.select(regexp_replace(col("Description"),regexp_string,"COLOR").alias("color_clean"),col("Description")).show(3)



In [0]:
from pyspark.sql.functions import translate
df.select(translate(col("Description"),"WHIHA","13242"),col("Description")).show(2) #character level replacement



In [0]:
from pyspark.sql.functions import regexp_extract

df.select(regexp_extract(col("Description"),regexp_string,1),col("Description")).show(20)



In [0]:
containsBlack=col("Description").contains("BLACK")



In [0]:
df.filter(containsBlack).select(col("Description")).show(10)



In [0]:
containsBlack=instr(col("Description"),"BLACK")>=1
containsWhite=instr(col("Description"),"WHITE")>=1
df.withColumn("hasSimpleColor",containsBlack|containsWhite)\
  .where("hasSimpleColor")\
  .select("Description").show(4)



In [0]:
from pyspark.sql.functions import expr,locate

simpleColors=["black","white","red","blue","green"]
def color_locator(column,color_string):
    return locate(color_string.upper(),column).cast("boolean").alias("is_" + c)
    selectedColumns = [color_locator(df.Description,c)for c in simpleColors]
    selectedColumns.append(expr("*"))

df.select(*selectedColumns).where(expr("is_white OR is_red"))\
  .select("Description").show(3)



In [0]:
from pyspark.sql.functions import current_date,current_timestamp
dateDF=spark.range(10)\
    .withColumn("today",current_date())\
    .withColumn("now",current_timestamp())
dateDF.createOrReplaceTempView("dateTable")

dateDF.printSchema()



In [0]:
from pyspark.sql.functions import date_add,date_sub
dateDF.select(date_sub(col("today"),5),date_add(col("today"),5)).show()



In [0]:
from pyspark.sql.functions import datediff,months_between,to_date
dateDF.withColumn("week_ago",date_sub(col("today"),7))\
      .select(datediff(col("today"),col("week_ago"))).show(2)



In [0]:
dateDF.select(to_date(lit("2022-01-17")).alias("start"),
             to_date(lit("2023-01-20")).alias("end"))\
            .select(months_between(col("start"),col("end"))).show(4)



In [0]:
from pyspark.sql.functions import to_date,lit
spark.range(5).withColumn("date",lit("2019-05-29"))\
    .select(to_date(col("date"))).show(1)



In [0]:
dateDF.select(to_date(lit("2012-23-12")),to_date(lit("2020-09-11"))).show(1)



In [0]:
from pyspark.sql.functions import to_date
dateFormat="yyyy-dd-MM"
cleanDateDf=spark.range(1).select(
            to_date(lit("2017-12-11"),dateFormat).alias("date"),
            to_date(lit("2012-23-12"),dateFormat).alias("date1"))
cleanDateDf.show(1)



In [0]:
from pyspark.sql.functions import to_timestamp
cleanDateDf.select(to_timestamp(col("date"),dateFormat)).show()



In [0]:
cleanDateDf.filter(col("date1")>lit("2017-12-12")).show()



In [0]:
from pyspark.sql.functions import coalesce
df.select(coalesce(col("Description"),col("CustomerId"))).show(3) #Returns first non-null value



In [0]:
#nullif-returns null if two values are equal or else returns the second if they are not
#ifnull-returns second value if first is null
#nvl- returns second value if first is null, defaults to first.
#nvl2-returns second value if first is not null,if first value is null then return last value as show below

spark.sql("select ifnull(null,'return_value'),nullif('value7','value7'),nvl(null,'return_value'),nvl2(null,'noynull','else') from dfTable limit 1").show()



In [0]:
df.na.drop("any").show(4)



In [0]:
df.na.drop("all").show(3)



In [0]:
df.na.drop("all",subset=["StockCode","nvoiceNo"]).show()




In [0]:
df.na.fill("All values become this string")



In [0]:
df.na.fill(5:Integer)



In [0]:
fill_cols_vals={"StockCode":5,"Description":"No Value"}



In [0]:
df.na.replace([""],["UNKNOWN"],"DESCRIPTION")



In [0]:
df.selectExpr("(Description,nvoiceNo) as complex","*")



In [0]:
df.selectExpr("struct(Description,nvoiceNo) as complex").show()



In [0]:
df.selectExpr("struct(Description,nvoiceNo) as complex")



In [0]:
from pyspark.sql.functions import struct
complexDf=df.select(struct("Description","nvoiceNo").alias("complex"))



In [0]:
complexDf.show(2)



In [0]:
complexDf.select("complex.Description").show(2)



In [0]:
df.select(struct("Description","nvoiceNo"))



In [0]:
from pyspark.sql.functions import split
df1=df.select(split(col("Description")," ").alias("array_col"))
df1.selectExpr("array_col[0]").show(3)



In [0]:
df.select(split(col("Description")," ").alias('array_col')).selectExpr("array_col[0]").show(3)



In [0]:
from pyspark.sql.functions import size
df.select(size(split(col("Description")," "))).show(2)



In [0]:
from pyspark.sql.functions import array_contains
df.select(array_contains(split(col("Description")," "),"WHITE")).show(2)



In [0]:
from pyspark.sql.functions import column
df.withColumn("splitted",split(col("Description")," ")).withColumn("exploded",explode(col("splitted"))).select(col("splitted") ,column("exploded") ,"Description").show(10)



In [0]:
#Maps

from pyspark.sql.functions import create_map
df.select(create_map(col("Description"),col("nvoiceNo")).alias("complex_map")).show(5,False)



In [0]:
spark.sql("select map(Description,nvoiceNo) as complex_map from dfTable where Description is not NULL").show(5,False)



In [0]:
df.select(create_map(col("Description"),col("nvoiceNo")).alias("complex_map"))\
.selectExpr("complex_map['WHITE METAL LANTERN']").show(2)



In [0]:
df.select(create_map(col("Description"),col("nvoiceNo")).alias("complex_map"))\
.select(explode(col("complex_map"))).show(2)



In [0]:
jsonDF = spark.range(1).selectExpr("""'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")

from pyspark.sql.functions import get_json_object, json_tuple
jsonDF.select(
get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column", json_tuple(col("jsonString"), "myJSONKey")).show(2)




In [0]:
from pyspark.sql.functions import from_json from pyspark.sql.types import *
parseSchema = StructType((
    StructField("InvoiceNo",StringType(),True),
StructField("Description",StringType(),True))) df.selectExpr("(InvoiceNo, Description) as myStruct")\
.select(to_json(col("myStruct")).alias("newJSON"))\ .select(from_json(col("newJSON"),parseSchema), col("newJSON")).show(2)




In [0]:
udfExampleDf=spark.range(5).toDF("num")
udfExampleDf.show()



In [0]:
def power3(double_value):
    return double_value**3
power3(2.0)



In [0]:
from pyspark.sql.functions import udf,col
power3udf=udf(power3)

udfExampleDf.select(power3udf(col("num"))).show(3)



In [0]:
from pyspark.sql.functions import IntegerType,DoubleType




In [0]:

spark.udf.register("power3udf",power3,IntegerType())

udfExampleDf.selectExpr("power3udf(num)").show()




