Pivot Examples: a pivot operation turns the unique values of one column into multiple columns in the result set, essentially “rotating” your data from rows into columns.


In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Build the session for the pivot examples (getOrCreate reuses an active one if present).
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

In [0]:
# Sample sales records as (Product, Amount, Country) tuples.
# The ("Orange", 2000, "USA") record appears twice, so the pivot sums both.
data = [
    ("Banana", 1000, "USA"),
    ("Carrots", 1500, "USA"),
    ("Beans", 1600, "USA"),
    ("Orange", 2000, "USA"),
    ("Orange", 2000, "USA"),
    ("Banana", 400, "China"),
    ("Carrots", 1200, "China"),
    ("Beans", 1500, "China"),
    ("Orange", 4000, "China"),
    ("Banana", 2000, "Canada"),
    ("Carrots", 2000, "Canada"),
    ("Beans", 2000, "Mexico"),
]

In [0]:
# Column names, matched positionally to each tuple in `data`.
columns = ["Product", "Amount", "Country"]
df = spark.createDataFrame(schema=columns, data=data)
df.printSchema()
df.show(truncate=False)

In [0]:
# One row per Product and one column per distinct Country value. Each cell holds
# the summed Amount, or null where the product has no rows for that country.
# Because no value list is passed to pivot(), Spark first computes the distinct
# Country values itself, which takes an extra pass over the data.
pivotDF = (
    df.groupBy("Product")
      .pivot("Country")
      .sum("Amount")
)
pivotDF.printSchema()
pivotDF.show(truncate=False)

In [0]:
# Two-step pivot: first total Amount per (Product, Country), then pivot the
# pre-aggregated rows. The intermediate aggregate column is named "sum(Amount)".
# Column names are spelled with their exact case ("Country", "sum(Amount)")
# so the cell keeps working when spark.sql.caseSensitive is enabled.
pivotDF = df.groupBy("Product", "Country") \
            .sum("Amount") \
            .groupBy("Product") \
            .pivot("Country") \
            .sum("sum(Amount)")
pivotDF.printSchema()
pivotDF.show()

In [0]:
# Unpivot: stack(4, label1, col1, ..., label4, col4) emits 4 rows per input row,
# one (country, Total) pair per pivoted country column.
# Null Totals (product not sold in that country) are filtered out.
unpivotExpr = "stack(4, 'Canada', Canada,'China', China, 'Mexico',Mexico,'USA',USA) as (country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
    .where("Total is not null")
unPivotDF.show(truncate=False)

In [0]:
# Same unpivot as before but without the null filter. Every product gets one row
# per country, including rows whose Total is null (no sales in that country).
unpivotExpr = "stack(4, 'Canada', Canada,'China', China, 'Mexico',Mexico,'USA',USA) as (country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr))
unPivotDF.show(truncate=False)

Add Months: use `add_months` to shift a date by a given number of months.

In [0]:
from pyspark.sql import SparkSession

# Returns the already-running session when there is one.
spark = (SparkSession.builder
         .getOrCreate())

In [0]:
from pyspark.sql.functions import col, expr, to_date

# Each row pairs an ISO-formatted date string with a number of months to add.
data = [("2019-01-23", 1), ("2019-06-24", 2), ("2019-09-20", 3)]

# The SQL expression parses `date` with to_date and adds `increment` months.
# `increment` is cast to int because add_months takes an integer month count.
(
    spark.createDataFrame(data).toDF("date", "increment")
    .select(
        col("date"),
        col("increment"),
        expr("add_months(to_date(date,'yyyy-MM-dd'),cast(increment as int))").alias("inc_date"),
    )
    .show()
)

Aggregate Functions: In Spark, aggregate functions are operations that take a set of values and return a single summarizing value. They are most commonly used after a `groupBy` (in the DataFrame API) or in a SQL `SELECT` with `GROUP BY`. Under the hood, Spark can compute partial aggregates on the map side (similar to combiners) and merge them in reduce tasks for efficient distributed execution.



In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import approx_count_distinct,collect_list
from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
from pyspark.sql.functions import variance,var_samp,  var_pop


In [0]:

# Reuse the active Spark session, or create one if none exists.
spark = (SparkSession
         .builder
         .getOrCreate())

In [0]:
# Employee records as (employee_name, department, salary).
# The first James/Sales/3000 record is repeated, which makes distinct-based
# aggregates differ from their plain counterparts.
simpleData = [
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
]

In [0]:
# Column names, matched positionally to the tuples in simpleData.
schema = [
    "employee_name",
    "department",
    "salary",
]


In [0]:
# Build the employee DataFrame and show its inferred schema and contents.
df = spark.createDataFrame(schema=schema, data=simpleData)
df.printSchema()
df.show(truncate=False)

In [0]:
# Estimated (not exact) number of distinct salaries, unwrapped from the one-row result.
approx_distinct_salaries = df.select(approx_count_distinct("salary")).collect()[0][0]
print("approx_count_distinct: " + str(approx_distinct_salaries))

In [0]:
# Mean salary over all rows, unwrapped to a Python scalar.
avg_salary = df.select(avg("salary")).collect()[0][0]
print("avg: " + str(avg_salary))

In [0]:
# Every salary gathered into one array, duplicates kept.
(df.select(collect_list("salary"))
   .show(truncate=False))

In [0]:
# Distinct salaries gathered into one array; element order is not guaranteed.
(df.select(collect_set("salary"))
   .show(truncate=False))

In [0]:
# One-row DataFrame holding the number of distinct (department, salary) pairs.
df2 = df.select(
    countDistinct("department", "salary")
)
df2.show(truncate=False)

In [0]:
# df2 holds a single row whose only value is the distinct (department, salary) count.
# The label uses a plain "&": the original literal contained the HTML entity "&amp;"
# (an export artifact), which would have been printed verbatim.
distinct_pairs = df2.collect()[0][0]
print("Distinct Count of Department & Salary: " + str(distinct_pairs))

In [0]:
# Number of rows with a non-null salary, unwrapped to a Python scalar.
salary_count = df.select(count("salary")).collect()[0][0]
print("count: " + str(salary_count))

In [0]:
# First salary encountered. Without an explicit ordering, which row is "first"
# depends on row order and may vary across runs.
(df.select(first("salary"))
   .show(truncate=False))

In [0]:
# Last salary encountered; like first(), the result depends on row order.
(df.select(last("salary"))
   .show(truncate=False))

In [0]:
# Kurtosis of the salary distribution.
(df.select(kurtosis("salary"))
   .show(truncate=False))

In [0]:
# Highest salary. `max` here is pyspark.sql.functions.max (imported above), which
# shadows Python's builtin max for the rest of the notebook.
(df.select(max("salary"))
   .show(truncate=False))

In [0]:
# Lowest salary. `min` is the PySpark aggregate imported above, not the Python builtin.
(df.select(min("salary"))
   .show(truncate=False))

In [0]:
# Average salary; mean is an alias of avg.
(df.select(mean("salary"))
   .show(truncate=False))

In [0]:
# Skewness (asymmetry) of the salary distribution.
(df.select(skewness("salary"))
   .show(truncate=False))

In [0]:
# stddev is an alias of stddev_samp (sample standard deviation);
# stddev_pop is the population standard deviation.
(
    df.select(
        stddev("salary"),
        stddev_samp("salary"),
        stddev_pop("salary"),
    ).show(truncate=False)
)

In [0]:
# Total of all salaries. `sum` is the PySpark aggregate imported above, not the builtin.
(df.select(sum("salary"))
   .show(truncate=False))

In [0]:
# Sum of the distinct salary values (each value counted once).
# sumDistinct is deprecated since Spark 3.2 in favour of sum_distinct; use the
# new name and fall back to the old one on older Spark versions.
try:
    from pyspark.sql.functions import sum_distinct
except ImportError:
    from pyspark.sql.functions import sumDistinct as sum_distinct

df.select(sum_distinct("salary")).show(truncate=False)

In [0]:
# variance is an alias of var_samp (sample variance); var_pop is the population variance.
(
    df.select(
        variance("salary"),
        var_samp("salary"),
        var_pop("salary"),
    ).show(truncate=False)
)

Array to String: convert an array column into a single delimited string with `concat_ws`.



In [0]:
import pyspark
from pyspark.sql import SparkSession

# Request a local master with one worker thread; getOrCreate returns an
# existing session instead if one is already running.
spark = (
    SparkSession.builder
    .master("local[1]")
    .getOrCreate()
)

In [0]:
# Column names, matched positionally to the tuples in `data`.
columns = [
    "name",
    "languagesAtSchool",
    "currentState",
]

In [0]:
# (name, languagesAtSchool, currentState). Names are comma-separated strings
# with some empty parts, e.g. "James,,Smith".
data = [
    ("James,,Smith", ["Java", "Scala", "C++"], "CA"),
    ("Michael,Rose,", ["Spark", "Java", "C++"], "NJ"),
    ("Robert,,Williams", ["CSharp", "VB"], "NV"),
]

In [0]:
# languagesAtSchool is inferred as an array<string> column from the Python lists.
df = spark.createDataFrame(schema=columns, data=data)
df.printSchema()
df.show(truncate=False)

In [0]:
from pyspark.sql.functions import col, concat_ws

In [0]:
# Overwrite the array column with its elements joined by commas into one string.
df2 = df.withColumn(
    "languagesAtSchool",
    concat_ws(",", col("languagesAtSchool")),
)

In [0]:
# languagesAtSchool should now be reported as a string column.
df2.printSchema()
(df2
    .show(truncate=False))

In [0]:
# Make df queryable from SQL under the name ARRAY_STRING.
df.createOrReplaceTempView(
    "ARRAY_STRING"
)

In [0]:
# The same comma-join as the DataFrame version, expressed in SQL against the temp view.
(
    spark.sql("select name,concat_ws(',',languagesAtSchool) as  languagesAtSchool,currentState from ARRAY_STRING")
    .show(truncate=False)
)

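ArrayType: define array columns with an explicit schema, then build and query them with `explode`, `split`, `array`, and `array_contains`.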


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Reuse the active session, or create one.
spark = SparkSession.builder.getOrCreate()

In [0]:
# Example ArrayType: an array of strings whose elements may not be null.
arrayCol = ArrayType(elementType=StringType(), containsNull=False)

In [0]:
# (name, languagesAtSchool, languagesAtWork, currentState, previousState)
data = [
    ("James,,Smith", ["Java", "Scala", "C++"], ["Spark", "Java"], "OH", "CA"),
    ("Michael,Rose,", ["Spark", "Java", "C++"], ["Spark", "Java"], "NY", "NJ"),
    ("Robert,,Williams", ["CSharp", "VB"], ["Spark", "Python"], "UT", "NV"),
]

In [0]:
# Explicit schema: a name string, two array<string> language columns, and two
# state strings. Every field is nullable.
schema = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("languagesAtSchool", ArrayType(StringType()), nullable=True),
    StructField("languagesAtWork", ArrayType(StringType()), nullable=True),
    StructField("currentState", StringType(), nullable=True),
    StructField("previousState", StringType(), nullable=True),
])

In [0]:
# Build the DataFrame with the explicit schema rather than relying on inference.
df = spark.createDataFrame(schema=schema, data=data)
df.printSchema()
df.show()

In [0]:
from pyspark.sql.functions import explode

# One output row per element of languagesAtSchool, alongside the owner's name.
(df.select(df["name"], explode(df["languagesAtSchool"]))
   .show())

In [0]:
from pyspark.sql.functions import split

# Turn the comma-separated name string into an array of its parts.
(df.select(split(df["name"], ",").alias("nameAsArray"))
   .show())

In [0]:
from pyspark.sql.functions import array

# Combine the two state columns into a single two-element array column.
(df.select(df["name"], array(df["currentState"], df["previousState"]).alias("States"))
   .show())

In [0]:
from pyspark.sql.functions import array

# Both array constructions side by side: split name parts and the paired states.
(
    df.select(
        split(df["name"], ",").alias("nameAsArray"),
        array(df["currentState"], df["previousState"]).alias("States"),
    ).show()
)

In [0]:
from pyspark.sql.functions import array_contains

# Boolean flag per row: does languagesAtSchool include "Java"?
(
    df.select(
        df["name"],
        array_contains(df["languagesAtSchool"], "Java").alias("array_contains"),
    ).show()
)

Cast Column: change column data types with `Column.cast`, `selectExpr`, and SQL cast functions.

In [0]:
import pyspark
from pyspark.sql import SparkSession

In [0]:
# Reuse the active Spark session, or create one if none exists.
spark = (SparkSession
         .builder
         .getOrCreate())

In [0]:
# (firstname, age, jobStartDate, isGraduated, gender, salary); dates and booleans
# start out as strings so the cells below can demonstrate casting them.
# NOTE(review): Robert's jobStartDate "06-01-1992" is not in yyyy-MM-dd form, so
# casting it to DateType is expected to give null (or raise under ANSI mode) —
# confirm whether that is intended.
simpleData = [
    ("James", 34, "2006-01-01", "true", "M", 3000.60),
    ("Michael", 33, "1980-01-10", "true", "F", 3300.80),
    ("Robert", 37, "06-01-1992", "false", "M", 5000.50),
]

In [0]:
# Column names matched positionally to simpleData; schema types are inferred.
columns = [
    "firstname",
    "age",
    "jobStartDate",
    "isGraduated",
    "gender",
    "salary",
]
df = spark.createDataFrame(schema=columns, data=simpleData)
df.printSchema()
df.show(truncate=False)

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,BooleanType,DateType

In [0]:
# Re-type three columns in place: age -> string, isGraduated -> boolean,
# jobStartDate -> date. All other columns are carried through unchanged.
df2 = (
    df.withColumn("age", col("age").cast(StringType()))
      .withColumn("isGraduated", col("isGraduated").cast(BooleanType()))
      .withColumn("jobStartDate", col("jobStartDate").cast(DateType()))
)
df2.printSchema()


In [0]:

# Cast back with SQL expressions (age -> int, isGraduated and jobStartDate -> string).
# selectExpr keeps only these three columns.
df3 = df2.selectExpr(
    "cast(age as int) age",
    "cast(isGraduated as string) isGraduated",
    "cast(jobStartDate as string) jobStartDate",
)
df3.printSchema()
df3.show(truncate=False)

In [0]:
# Register df3 for SQL, then cast using the STRING/BOOLEAN/DATE type-name functions.
df3.createOrReplaceTempView("CastExample")
df4 = spark.sql(
    "SELECT STRING(age),BOOLEAN(isGraduated),DATE(jobStartDate) from CastExample"
)
df4.printSchema()
df4.show(truncate=False)