In [4]:
# Connecting to the container sql server
from pyspark.sql import SparkSession
import time


spark = SparkSession.builder \
    .appName("SparkSQLServer") \
    .master("spark://spark-master:7077") \
    .getOrCreate()


## RDD Creation 

In [None]:
#sparkContext is used to create the RDD

In [46]:
# Using sparkContext.parallelize()
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd1 = spark.sparkContext.parallelize(data)
rdd1.count()
rdd1.getNumPartitions()
print(rdd1.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]


In [53]:
# Create RDD from external Data source
rdd2 = spark.sparkContext.textFile(
    "/opt/spark-data/input/data.csv"
)
rdd2.collect()

['name,age,salary',
 'manish,28,55000',
 'ramesh,32,48000',
 'rohan,26,61000',
 'sapna,30,72000',
 'shivani,27,68000']

In [54]:

# Create an empty RDD with no partition    
rdd3 = spark.sparkContext.emptyRDD 
rdd3
# Output:
# rddString = spark.sparkContext.emptyRDD[String]

<bound method SparkContext.emptyRDD of <SparkContext master=spark://spark-master:7077 appName=SparkSQLServer>>

In [52]:
# Create RDD from external Data source
rdd2 = spark.sparkContext.textFile(
    "/opt/spark-data/input/data.csv"
)
rdd2.collect()

['name,age,salary',
 'manish,28,55000',
 'ramesh,32,48000',
 'rohan,26,61000',
 'sapna,30,72000',
 'shivani,27,68000']

## Create Empty DataFrame

In [55]:
# 1) create empty RDD
emptyrdd= spark.sparkContext.emptyRDD()


In [65]:
# 2. Create Empty DataFrame with Schema (StructType)
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
           StructField('firstname', StringType(), True),
           StructField('middlename', StringType(), True),
           StructField('lastname', StringType(), True)
           ])
df1 = spark.createDataFrame(emptyrdd, schema)
df1.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)



In [67]:
# 3. Convert Empty RDD to DataFrame
df2 = emptyrdd.toDF(StructType([]))
df2

DataFrame[]

In [68]:
# 4. Create Empty DataFrame with Schema.
df3 = spark.createDataFrame([],schema)

In [73]:
# 5. Create Empty DataFrame without Schema (no columns)
df4 = spark.createDataFrame([],StructType([]))
df4.printSchema()

root



## Convert PySpark Dataframe to Pandas DataFrame

In [74]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("James","","Smith","36636","M",60000),
        ("Michael","Rose","","40288","M",70000),
        ("Robert","","Williams","42114","",400000),
        ("Maria","Anne","Jones","39192","F",500000),
        ("Jen","Mary","Brown","","F",0)]

columns = ["first_name","middle_name","last_name","dob","gender","salary"]
pysparkDF = spark.createDataFrame(data = data, schema = columns)
pysparkDF.printSchema()
pysparkDF.show(truncate=False)

25/12/26 16:49:29 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|dob  |gender|salary|
+----------+-----------+---------+-----+------+------+
|James     |           |Smith    |36636|M     |60000 |
|Michael   |Rose       |         |40288|M     |70000 |
|Robert    |           |Williams |42114|      |400000|
|Maria     |Anne       |Jones    |39192|F     |500000|
|Jen       |Mary       |Brown    |     |F     |0     |
+----------+-----------+---------+-----+------+------+



In [78]:
pandasDF = pysparkDF.toPandas()
pandasDF

                                                                                

Unnamed: 0,first_name,middle_name,last_name,dob,gender,salary
0,James,,Smith,36636.0,M,60000
1,Michael,Rose,,40288.0,M,70000
2,Robert,,Williams,42114.0,,400000
3,Maria,Anne,Jones,39192.0,F,500000
4,Jen,Mary,Brown,,F,0


## StructType & StructField with DataFrame

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/30 11:08:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/30 11:08:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



                                                                                

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



###  PySpark Column Operators

In [19]:
data=[(100,2,1),(200,3,4),(300,4,4)]
cols = ["col1","col2","col3"]
df=spark.createDataFrame(data,cols)

#Arthmetic operations
df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show() 
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()

df.select(df.col2 > df.col3).show()
df.select(df.col2 < df.col3).show()
df.select(df.col2 == df.col3).show()

+-------------+
|(col1 + col2)|
+-------------+
|          102|
|          203|
|          304|
+-------------+

+-------------+
|(col1 - col2)|
+-------------+
|           98|
|          197|
|          296|
+-------------+

+-------------+
|(col1 * col2)|
+-------------+
|          200|
|          600|
|         1200|
+-------------+

+-----------------+
|    (col1 / col2)|
+-----------------+
|             50.0|
|66.66666666666667|
|             75.0|
+-----------------+

+-------------+
|(col1 % col2)|
+-------------+
|            0|
|            2|
|            0|
+-------------+

+-------------+
|(col2 > col3)|
+-------------+
|         true|
|        false|
|        false|
+-------------+

+-------------+
|(col2 < col3)|
+-------------+
|        false|
|         true|
|        false|
+-------------+

+-------------+
|(col2 = col3)|
+-------------+
|        false|
|        false|
|         true|
+-------------+



### PySpark Column Functions

In [21]:

data=[("James","Bond","100",None),
      ("Ann","Varsa","200",'F'),
      ("Tom Cruise","XXX","400",''),
      ("Tom Brand",None,"400",'M')] 
columns=["fname","lname","id","gender"]
df=spark.createDataFrame(data,columns)

In [22]:
# 4.1 alias() – Set’s name to Column
from pyspark.sql.functions import expr
df.select(df.fname.alias("first_name"), \
          df.lname.alias("last_name")
   ).show()

#Another example
df.select(expr(" fname ||','|| lname").alias("fullName") \
   ).show()


+----------+---------+
|first_name|last_name|
+----------+---------+
|     James|     Bond|
|       Ann|    Varsa|
|Tom Cruise|      XXX|
| Tom Brand|     NULL|
+----------+---------+

+--------------+
|      fullName|
+--------------+
|    James,Bond|
|     Ann,Varsa|
|Tom Cruise,XXX|
|          NULL|
+--------------+



In [23]:
# 4.2 asc() & desc() – Sort the DataFrame columns by Ascending or Descending order.
df.sort(df.fname.asc()).show()
df.sort(df.fname.desc()).show()

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|       Ann|Varsa|200|     F|
|     James| Bond|100|  NULL|
| Tom Brand| NULL|400|     M|
|Tom Cruise|  XXX|400|      |
+----------+-----+---+------+

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|Tom Cruise|  XXX|400|      |
| Tom Brand| NULL|400|     M|
|     James| Bond|100|  NULL|
|       Ann|Varsa|200|     F|
+----------+-----+---+------+



In [24]:
# 4.3 cast() & astype() – Used to convert the data Type.
df.select(df.fname,df.id.cast("int")).printSchema()

root
 |-- fname: string (nullable = true)
 |-- id: integer (nullable = true)



In [25]:
# 4.4 between() – Returns a Boolean expression when a column values in between lower and upper bound.
df.filter(df.id.between(100,300)).show()

+-----+-----+---+------+
|fname|lname| id|gender|
+-----+-----+---+------+
|James| Bond|100|  NULL|
|  Ann|Varsa|200|     F|
+-----+-----+---+------+



In [26]:
# 4.5 contains() 
df.filter(df.fname.contains("Cruise")).show()

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|Tom Cruise|  XXX|400|      |
+----------+-----+---+------+



In [27]:
# 4.8 isNull & isNotNull() – Checks if the DataFrame column has NULL or non NULL values.
df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()

+---------+-----+---+------+
|    fname|lname| id|gender|
+---------+-----+---+------+
|Tom Brand| NULL|400|     M|
+---------+-----+---+------+

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|     James| Bond|100|  NULL|
|       Ann|Varsa|200|     F|
|Tom Cruise|  XXX|400|      |
+----------+-----+---+------+



In [28]:
# 4.9 like() & rlike() – Similar to SQL LIKE expression

#like , rlike
df.select(df.fname,df.lname,df.id) \
  .filter(df.fname.like("%om")) 

DataFrame[fname: string, lname: string, id: string]

In [29]:
# 4.10 substr() – Returns a Column after getting sub string from the Column
df.select(df.fname.substr(1,2).alias("substr")).show()

+------+
|substr|
+------+
|    Ja|
|    An|
|    To|
|    To|
+------+



In [30]:
# 4.11 when() & otherwise() – It is similar to SQL Case When, executes sequence of expressions until it matches the condition and returns a value when match.
from pyspark.sql.functions import when
df.select(df.fname,df.lname,when(df.gender=="M","Male") \
              .when(df.gender=="F","Female") \
              .when(df.gender==None ,"") \
              .otherwise(df.gender).alias("new_gender") \
    ).show()

+----------+-----+----------+
|     fname|lname|new_gender|
+----------+-----+----------+
|     James| Bond|      NULL|
|       Ann|Varsa|    Female|
|Tom Cruise|  XXX|          |
| Tom Brand| NULL|      Male|
+----------+-----+----------+



In [35]:
df.select(df.fname,expr("case when gender='M'then'Male' " " when gender='F' then 'Female' " " else 'na' end")).show()


+----------+--------------------------------------------------------------------------+
|     fname|CASE WHEN (gender = M) THEN Male WHEN (gender = F) THEN Female ELSE na END|
+----------+--------------------------------------------------------------------------+
|     James|                                                                        na|
|       Ann|                                                                    Female|
|Tom Cruise|                                                                        na|
| Tom Brand|                                                                      Male|
+----------+--------------------------------------------------------------------------+

