SparkSession is the entry point to Spark SQL. It is one of the very first objects you create while developing a Spark SQL application.

Once created, a SparkSession lets you **create a DataFrame** (from an RDD, a local collection such as a Scala Seq or Python list, or a pandas DataFrame), **create a Dataset** (Scala/Java only), access the **Spark SQL services** (e.g. ExperimentalMethods, ExecutionListenerManager, UDFRegistration), **execute SQL queries**, load tables, and, last but not least, use the DataFrameReader interface (`spark.read`) to load data in any of the supported formats.

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

In [0]:
data = [("James","Smith","USA","CA"),("Michael","Rose","USA","NY"), \
    ("Robert","Williams","USA","CA"),("Maria","Jones","USA","FL") \
  ]
columns=["firstname","lastname","country","state"]

df=spark.createDataFrame(data=data,schema=columns)
df.show()

# collect() is an action that retrieves all elements of the RDD/DataFrame (from all nodes) to the driver node; the Scala/Java Dataset API also offers collectAsList()

print(df.collect())

In [0]:
states1=df.rdd.map(lambda x: x[3]).collect()
print(states1)

In [0]:
#An OrderedDict is a dictionary subclass that remembers the order that keys were first inserted

from collections import OrderedDict 
res = list(OrderedDict.fromkeys(states1)) 
print(res)
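The same order-preserving de-duplication can be checked in plain Python. This sketch uses a hypothetical list of state codes; it also shows that, since Python 3.7, a regular `dict` preserves insertion order, so `dict.fromkeys` gives the same result, while a `set` removes duplicates without any ordering guarantee.

```python
from collections import OrderedDict

# Hypothetical sample of state codes with a duplicate
states = ["CA", "NY", "CA", "FL"]

# OrderedDict.fromkeys keeps the first occurrence of each key, in insertion order
dedup_ordered = list(OrderedDict.fromkeys(states))

# Plain dicts preserve insertion order in Python 3.7+, so this is equivalent
dedup_dict = list(dict.fromkeys(states))

# A set also removes duplicates, but its iteration order is not guaranteed
dedup_set = set(states)

print(dedup_ordered)  # ['CA', 'NY', 'FL']
```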

Spark's map() takes one element as input, processes it according to custom code (specified by the developer), and returns exactly one element. map() therefore transforms an RDD of length N into another RDD of length N: the input and output RDDs always have the same number of records.

In [0]:
states2=df.rdd.map(lambda x: x.state).collect()  # The function is applied to each Row
print(states2)

In [0]:
states3=df.select(df.state).collect()  # Returns a list of Row objects, e.g. Row(state='CA')
print(states3)

A flatMap() function takes one element as input, processes it according to custom code (specified by the developer), and returns 0 or more elements. flatMap() therefore transforms an **RDD of length N into another RDD of length M.**

In [0]:
states4=df.select(df.state).rdd.flatMap(lambda x: x).collect()
print(states4)

In [0]:
states5=df.select(df.state).toPandas()['state']
states6=list(states5)
print(states6)

In [0]:
pandDF=df.select(df.state,df.firstname).toPandas()
print(list(pandDF['state']))
print(list(pandDF['firstname']))

# Converting Between pandas and Spark DataFrames

In [0]:
import pandas as pd    
data = [['Scott', 50], ['Jeff', 45], ['Thomas', 54],['Ann',34]] 
  
# Create the pandas DataFrame 
pandasDF = pd.DataFrame(data, columns = ['Name', 'Age']) 
  
# print dataframe. 
print(pandasDF)

In [0]:
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

In [0]:
sparkDF=spark.createDataFrame(pandasDF) 
sparkDF.printSchema()
sparkDF.show()

Spark SQL's StructType and StructField classes are used to programmatically specify the schema of a DataFrame and to create complex columns such as nested struct, array, and map columns. A StructType is a collection of StructField objects. Each StructField defines a column name, a column data type, whether the column is nullable (a boolean), and optional metadata.

In [0]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
mySchema = StructType([ StructField("First Name", StringType(), True)\
                       ,StructField("Age", IntegerType(), True)])

In [0]:
sparkDF2 = spark.createDataFrame(pandasDF,schema=mySchema)
sparkDF2.printSchema()
sparkDF2.show()

In [0]:
spark.conf.set("spark.sql.execution.arrow.enabled","true") # Use Apache Arrow to speed up conversions between Spark and pandas DataFrames (Spark 3.0+ name: spark.sql.execution.arrow.pyspark.enabled)
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true") # If the Arrow-based conversion fails (e.g. an unsupported type), fall back to the regular non-Arrow conversion

In [0]:
pandasDF2=sparkDF2.select("*").toPandas()
print(pandasDF2)

In [0]:
test=spark.conf.get("spark.sql.execution.arrow.enabled")
print(test)

In [0]:
test123=spark.conf.get("spark.sql.execution.arrow.pyspark.fallback.enabled")
print(test123)

# Doing Aggregations

In [0]:
# Note: importing sum, max and min this way shadows Python's built-ins of the same name
from pyspark.sql.functions import approx_count_distinct, collect_list
from pyspark.sql.functions import collect_set, sum, avg, max, countDistinct, count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct  # sumDistinct is deprecated in Spark 3.2+ in favor of sum_distinct
from pyspark.sql.functions import variance, var_samp, var_pop

In [0]:
simpleData = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]
schema = ["employee_name", "department", "salary"]

In [0]:
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

In [0]:
df.select(approx_count_distinct("salary")).collect()[0][0]

In [0]:
df.select(avg("salary")).collect()[0][0]

In [0]:
df.select(collect_list("salary")).show(truncate=False)

In [0]:
df.select(countDistinct("department", "salary")).show()

In [0]:
# show() prints the result itself and returns None, so it is not wrapped in print()
df.select(first("salary")).show(truncate=False)
df.select(last("salary")).show(truncate=False)
df.select(kurtosis("salary")).show(truncate=False)

In [0]:
df.select(max("salary")).show(truncate=False)
df.select(min("salary")).show(truncate=False)
df.select(mean("salary")).show(truncate=False)

In [0]:
df.select(skewness("salary")).show(truncate=False)
df.select(stddev("salary"), stddev_samp("salary"), \
    stddev_pop("salary")).show(truncate=False)
df.select(sum("salary")).show(truncate=False)

In [0]:
df.select(sumDistinct("salary")).show(truncate=False)
df.select(variance("salary"),var_samp("salary"),var_pop("salary")) \
  .show(truncate=False)
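To sanity-check these aggregates without a cluster, the salary column can be reproduced with Python's `statistics` module. This is a local cross-check, not how Spark computes them. In PySpark, `stddev` is an alias for `stddev_samp` and `variance` is an alias for `var_samp` (both divide by n - 1), while the `_pop` variants divide by n. `builtins.sum` is used because `sum` may be shadowed by the PySpark import in this notebook.

```python
import builtins
import statistics

# Local copy of the "salary" column from simpleData
salaries = [3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]

total = builtins.sum(salaries)                 # like sum("salary")
distinct_total = builtins.sum(set(salaries))   # like sumDistinct("salary")
mean = statistics.mean(salaries)               # like avg("salary") / mean("salary")

# Sample statistics (n - 1): variance/var_samp and stddev/stddev_samp
var_samp = statistics.variance(salaries)
std_samp = statistics.stdev(salaries)

# Population statistics (n): var_pop and stddev_pop
var_pop = statistics.pvariance(salaries)
std_pop = statistics.pstdev(salaries)

print(total, distinct_total, mean)  # 34000 20900 3400
print(var_samp, var_pop)
```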

# Spark RDD Actions

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data=[("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)]
inputRDD = spark.sparkContext.parallelize(data)

In [0]:
listRdd = spark.sparkContext.parallelize([1,2,3,4,5,3,2])

In [0]:
seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)
agg=listRdd.aggregate(0, seqOp, combOp)
print(agg) 

In [0]:
seqOp2 = (lambda x, y: (x[0] + y, x[1] + 1))
combOp2 = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
agg2=listRdd.aggregate((0, 0), seqOp2, combOp2)
print(agg2)
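The roles of the zero value, `seqOp` and `combOp` can be illustrated with a small pure-Python model. `aggregate_model` is a hypothetical helper, not Spark's implementation, and the two-way split of `listRdd` into partitions is an assumption. Because the zero value is used once per partition and again when merging partition results, it should be the identity for both operations (0 for addition, `(0, 0)` for the sum/count pair).

```python
from functools import reduce
from operator import add

def aggregate_model(partitions, zero, seq_op, comb_op):
    """Hypothetical model of RDD.aggregate: fold each partition with seq_op
    starting from zero, then merge the partition results with comb_op,
    again starting from zero."""
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, per_partition, zero)

# listRdd's values split into two hypothetical partitions
partitions = [[1, 2, 3], [4, 5, 3, 2]]

# (running sum, running count), as in seqOp2/combOp2 above
seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

total, count = aggregate_model(partitions, (0, 0), seq_op, comb_op)
print(total, count)   # 20 7
print(total / count)  # the mean of listRdd

# In this model a non-neutral zero is added once per partition plus once
# at the merge: 20 + 1 + 1 + 1 = 23
print(aggregate_model(partitions, 1, add, add))  # 23
```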

In [0]:
agg2=listRdd.treeAggregate(0,seqOp, combOp)
print(agg2) # output 20

In [0]:
from operator import add
foldRes=listRdd.fold(0, add)
print(foldRes) # output 20

In [0]:
redRes=listRdd.reduce(add)
print(redRes)

In [0]:
add = lambda x, y: x + y
redRes=listRdd.treeReduce(add)
print(redRes)

In [0]:
data = listRdd.collect()
print(data)

In [0]:
#count, countApprox, countApproxDistinct
print("Count : "+str(listRdd.count()))
#Output: Count : 7
print("countApprox : "+str(listRdd.countApprox(1200)))  # timeout in milliseconds
#Output: countApprox : 7
print("countApproxDistinct : "+str(listRdd.countApproxDistinct()))
#Output (approximate): countApproxDistinct : 5
print("countApproxDistinct : "+str(inputRDD.countApproxDistinct()))
#Output (approximate): countApproxDistinct : 5

In [0]:
#countByValue: returns a dict mapping each value to its number of occurrences
print("countByValue :  "+str(listRdd.countByValue()))

In [0]:
#first
print("first :  "+str(listRdd.first()))
#Output: first :  1
print("first :  "+str(inputRDD.first()))
#Output: first :  ('Z', 1)

In [0]:
#top
print("top : "+str(listRdd.top(2)))
#Output: top : [5, 4]
print("top : "+str(inputRDD.top(2)))
#Output: top : [('Z', 1), ('C', 40)]

In [0]:
#min
print("min :  "+str(listRdd.min()))
#Output: min :  1
print("min :  "+str(inputRDD.min()))
#Output: min :  ('A', 20)

In [0]:
#max
print("max :  "+str(listRdd.max()))
#Output: max :  5
print("max :  "+str(inputRDD.max()))
#Output: max :  ('Z', 1)

In [0]:
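#take, takeOrdered, takeSample
print("take : "+str(listRdd.take(2)))
#Output: take : [1, 2]
print("takeOrdered : "+ str(listRdd.takeOrdered(2)))
#Output: takeOrdered : [1, 2]

In [0]:
# takeSample(withReplacement, num) returns num random elements, so the output varies between runs
print("takeSample : "+str(listRdd.takeSample(True, 2)))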

# Understanding Broadcast

In [0]:
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

rdd = spark.sparkContext.parallelize(data)

In [0]:
def state_convert(code):
    return broadcastStates.value[code]

In [0]:
result = rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).collect()
print(result)

# Understanding Flat Map Operations

In [0]:
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)

In [0]:
for element in rdd.collect():
    print(element)

In [0]:
#Flatmap    
rdd2=rdd.flatMap(lambda x: x.split(" "))
for element in rdd2.collect():
    print(element)

# Different Map Operations

In [0]:
rdd2=rdd.map(lambda x: (x,1))
for element in rdd2.collect():
    print(element)

In [0]:
data = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()

In [0]:
rdd2=df.rdd.map(lambda x: (x[0]+","+x[1], x[2], x[3]*2))
df2=rdd2.toDF(["name","gender","new_salary"])
df2.show()

In [0]:
#Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x["firstname"]+","+x["lastname"],x["gender"],x["salary"]*2)
    ) 
df2=rdd2.toDF(["name","gender","new_salary"]   )
df2.show()

In [0]:
def func1(x):
    firstName=x.firstname
    lastName=x.lastname
    name=firstName+","+lastName
    gender=x.gender.lower()
    salary=x.salary*2
    return (name,gender,salary)

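# show() returns None, so the result is displayed rather than assigned
df.rdd.map(lambda x: func1(x)).toDF(["name","gender","new_salary"]).show()
# Passing func1 directly is equivalent to wrapping it in a lambda
df.rdd.map(func1).toDF(["name","gender","new_salary"]).show()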

# Reduce by Key Operation

In [0]:
data = [('Project', 1),
('Gutenberg’s', 1),
('Alice’s', 1),
('Adventures', 1),
('in', 1),
('Wonderland', 1),
('Project', 1),
('Gutenberg’s', 1),
('Adventures', 1),
('in', 1),
('Wonderland', 1),
('Project', 1),
('Gutenberg’s', 1)]

rdd=spark.sparkContext.parallelize(data)

In [0]:
rdd2=rdd.reduceByKey(lambda a,b: a+b)
for element in rdd2.collect():
    print(element)
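What reduceByKey computes can be modeled in plain Python as "merge every value that shares a key with the given function". `reduce_by_key_model` is a hypothetical helper for illustration only; Spark additionally merges values within each partition before shuffling, which is why the function passed to reduceByKey should be associative and commutative.

```python
def reduce_by_key_model(pairs, func):
    """Hypothetical model of RDD.reduceByKey: values sharing a key
    are merged pairwise with func (not Spark's implementation)."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

pairs = [("Project", 1), ("Gutenberg’s", 1), ("Alice’s", 1), ("Adventures", 1),
         ("in", 1), ("Wonderland", 1), ("Project", 1), ("Gutenberg’s", 1),
         ("Adventures", 1), ("in", 1), ("Wonderland", 1), ("Project", 1),
         ("Gutenberg’s", 1)]

counts = reduce_by_key_model(pairs, lambda a, b: a + b)
print(counts)
# {'Project': 3, 'Gutenberg’s': 3, 'Alice’s': 1, 'Adventures': 2, 'in': 2, 'Wonderland': 2}
```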

# Word Count Example

In [0]:
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)

In [0]:
#Flatmap    
rdd2=rdd.flatMap(lambda x: x.split(" "))
for element in rdd2.collect():
    print(element)

In [0]:
#map
rdd3=rdd2.map(lambda x: (x,1))
for element in rdd3.collect():
    print(element)
#reduceByKey
rdd4=rdd3.reduceByKey(lambda a,b: a+b)
for element in rdd4.collect():
    print(element)

In [0]:
#map: swap to (count, word) pairs, then sort by count
rdd5 = rdd4.map(lambda x: (x[1],x[0])).sortByKey()
for element in rdd5.collect():
    print(element)

In [0]:
#filter: keep only words containing a lowercase 'a'
rdd6 = rdd5.filter(lambda x : 'a' in x[1])
for element in rdd6.collect():
    print(element)
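To check the word-count pipeline's logic locally, the same steps can be sketched with the standard library. This is a single-machine model of the flatMap, map/reduceByKey, swap/sortByKey and filter stages, not how Spark executes them; it uses a stable sort on the count only, whereas Spark does not guarantee the order of words with equal counts.

```python
from collections import Counter

lines = ["Project Gutenberg’s",
         "Alice’s Adventures in Wonderland",
         "Project Gutenberg’s",
         "Adventures in Wonderland",
         "Project Gutenberg’s"]

words = [w for line in lines for w in line.split(" ")]          # flatMap
counts = Counter(words)                                          # map to (word, 1) + reduceByKey
by_count = sorted(((n, w) for w, n in counts.items()),
                  key=lambda pair: pair[0])                      # swap + sortByKey
with_a = [(n, w) for n, w in by_count if "a" in w]               # filter

print(with_a)  # [(2, 'Wonderland')]
```

Note that the filter is case-sensitive: "Alice’s" and "Adventures" contain only an uppercase "A", so they are excluded.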

In [0]:
from pyspark.sql.functions import col,expr
data=[("2019-01-23",1),("2019-06-24",2),("2019-09-20",3)]
spark.createDataFrame(data).toDF("date","increment") \
    .select(col("date"),col("increment"), \
      expr("add_months(to_date(date,'yyyy-MM-dd'),cast(increment as int))").alias("inc_date")) \
    .show()