In [3]:
!pip install pyspark 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:

# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("SparkByExamples.com") \
        .getOrCreate() 


In [13]:

from pyspark.sql import Row
row=Row("James",40)

print(row[0] +","+str(row[1]))


James,40


In [11]:

row=Row(name="Alice", age=11)
print(row.name, row.age) 


Alice 11


In [17]:

Person = Row("name", "age")
p1=Person("James", 40)
p2=Person("Alice", 35)
print(p1.name +","+p2.name)

James,Alice


In [18]:

from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.getOrCreate()

data = [Row(name="James,,Smith",lang=["Java","Scala","C++"],state="CA"), 
    Row(name="Michael,Rose,",lang=["Spark","Java","C++"],state="NJ"),
    Row(name="Robert,,Williams",lang=["CSharp","VB"],state="NV")]
rdd=spark.sparkContext.parallelize(data)
print(rdd.collect())


[Row(name='James,,Smith', lang=['Java', 'Scala', 'C++'], state='CA'), Row(name='Michael,Rose,', lang=['Spark', 'Java', 'C++'], state='NJ'), Row(name='Robert,,Williams', lang=['CSharp', 'VB'], state='NV')]


In [19]:

collData=rdd.collect()
for row in collData:
    print(row.name + "," +str(row.lang))


James,,Smith,['Java', 'Scala', 'C++']
Michael,Rose,,['Spark', 'Java', 'C++']
Robert,,Williams,['CSharp', 'VB']


In [21]:

Person=Row("name","lang","state")
data = [Person("James,,Smith",["Java","Scala","C++"],"CA"), 
    Person("Michael,Rose,",["Spark","Java","C++"],"NJ"),
    Person("Robert,,Williams",["CSharp","VB"],"NV")]


In [22]:

df=spark.createDataFrame(data)
df.printSchema()
df.show()


root
 |-- name: string (nullable = true)
 |-- lang: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)

+----------------+------------------+-----+
|            name|              lang|state|
+----------------+------------------+-----+
|    James,,Smith|[Java, Scala, C++]|   CA|
|   Michael,Rose,|[Spark, Java, C++]|   NJ|
|Robert,,Williams|      [CSharp, VB]|   NV|
+----------------+------------------+-----+



In [24]:
#changing columns names
columns = ["name","languagesAtSchool","currentState"]
df=spark.createDataFrame(data).toDF(*columns)
df.printSchema()
df.show()


root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)

+----------------+------------------+------------+
|            name| languagesAtSchool|currentState|
+----------------+------------------+------------+
|    James,,Smith|[Java, Scala, C++]|          CA|
|   Michael,Rose,|[Spark, Java, C++]|          NJ|
|Robert,,Williams|      [CSharp, VB]|          NV|
+----------------+------------------+------------+



In [27]:

#Create DataFrame with struct using Row class
from pyspark.sql import Row
data=[Row(name="James",prop=Row(hair="black",eye="blue")),
      Row(name="Ann",prop=Row(hair="grey",eye="black"))]
df=spark.createDataFrame(data)
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- prop: struct (nullable = true)
 |    |-- hair: string (nullable = true)
 |    |-- eye: string (nullable = true)

+-----+-------------+
| name|         prop|
+-----+-------------+
|James|{black, blue}|
|  Ann|{grey, black}|
+-----+-------------+



In [30]:
# lit is use to insert the values
from pyspark.sql.functions import lit
colObj = lit("sparkbyexamples")
colObj

Column<'sparkbyexamples'>

In [38]:

data=[("James",23),
      ("Ann",40),
      ("RoRi",43)
      ]
df=spark.createDataFrame(data).toDF("name.fname","age")
df.printSchema()
#root
# |-- name.fname: string (nullable = true)
# |-- gender: long (nullable = true)

# Using DataFrame object (df)
df.select(df.age).show()
df.select(df["age"]).show()
#Accessing column name with dot (with backticks)
df.select(df['`name.fname`']).show()




root
 |-- name.fname: string (nullable = true)
 |-- age: long (nullable = true)

+---+
|age|
+---+
| 23|
| 40|
| 43|
+---+

+---+
|age|
+---+
| 23|
| 40|
| 43|
+---+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
|      RoRi|
+----------+



In [40]:
#Using SQL col() function
from pyspark.sql.functions import col
df.select(col("age")).show()
#Accessing column name with dot (with backticks)
df.select(col("`name.fname`")).show()

+---+
|age|
+---+
| 23|
| 40|
| 43|
+---+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
|      RoRi|
+----------+



In [43]:

#Create DataFrame with struct using Row class
from pyspark.sql import Row
data=[Row(name="James",prop=Row(hair="black",eye="blue")),
      Row(name="Ann",prop=Row(hair="grey",eye="black"))]
df=spark.createDataFrame(data)
df.printSchema()
#root
# |-- name: string (nullable = true)
# |-- prop: struct (nullable = true)
# |    |-- hair: string (nullable = true)
# |    |-- eye: string (nullable = true)

#Access struct column
df.select(df.prop.hair).show()
df.select(df["prop.eye"]).show()
df.select(col("prop.hair")).show()




root
 |-- name: string (nullable = true)
 |-- prop: struct (nullable = true)
 |    |-- hair: string (nullable = true)
 |    |-- eye: string (nullable = true)

+---------+
|prop.hair|
+---------+
|    black|
|     grey|
+---------+

+-----+
|  eye|
+-----+
| blue|
|black|
+-----+

+-----+
| hair|
+-----+
|black|
| grey|
+-----+



In [46]:
#Access all columns from struct
df.select(col("prop.*")).show()
df.select(col("*")).show()

+-----+-----+
| hair|  eye|
+-----+-----+
|black| blue|
| grey|black|
+-----+-----+

+-----+-------------+
| name|         prop|
+-----+-------------+
|James|{black, blue}|
|  Ann|{grey, black}|
+-----+-------------+



In [47]:

data=[(100,2,1),(200,3,4),(300,4,4)]
df=spark.createDataFrame(data).toDF("col1","col2","col3")
df.show()


+----+----+----+
|col1|col2|col3|
+----+----+----+
| 100|   2|   1|
| 200|   3|   4|
| 300|   4|   4|
+----+----+----+



In [48]:
#Arthmetic operations
df.select(df.col1 + df.col2).show()


+-------------+
|(col1 + col2)|
+-------------+
|          102|
|          203|
|          304|
+-------------+



In [None]:
df.select(df.col1 - df.col2).show() 
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()


df.select(df.col2 > df.col3).show()
df.select(df.col2 < df.col3).show()
df.select(df.col2 == df.col3).show()


In [50]:
df.select(df.col2 > df.col3).show()

+-------------+
|(col2 > col3)|
+-------------+
|         true|
|        false|
|        false|
+-------------+



In [51]:
df.select(df.col2 < df.col3).show()

+-------------+
|(col2 < col3)|
+-------------+
|        false|
|         true|
|        false|
+-------------+



In [52]:
df.select(df.col1 % df.col2).show()

+-------------+
|(col1 % col2)|
+-------------+
|            0|
|            2|
|            0|
+-------------+



In [60]:

data=[("James","Bond","007",None),
      ("Ann","Varsa","200",'F'),
      ("Tom Cruise","","400",''),
      ("Tom Brand",None,"400",'M')] 
columns=["fname","lname","id","gender"]
df=spark.createDataFrame(data,columns)
df.show()


+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|     James| Bond|007|  null|
|       Ann|Varsa|200|     F|
|Tom Cruise|     |400|      |
| Tom Brand| null|400|     M|
+----------+-----+---+------+



In [62]:

#alias
from pyspark.sql.functions import expr
df.select(df.fname.alias("first_name"),
          df.lname.alias("last_name")).show()

#Another example
df.select(expr(" fname ||','|| lname").alias("fullName") ).show()


+----------+---------+
|first_name|last_name|
+----------+---------+
|     James|     Bond|
|       Ann|    Varsa|
|Tom Cruise|         |
| Tom Brand|     null|
+----------+---------+

+-----------+
|   fullName|
+-----------+
| James,Bond|
|  Ann,Varsa|
|Tom Cruise,|
|       null|
+-----------+



In [63]:

#asc, desc to sort ascending and descending order repsectively.
df.sort(df.fname.asc()).show()
print(df.sort(df.fname.desc()).show())


+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|       Ann|Varsa|200|     F|
|     James| Bond|007|  null|
| Tom Brand| null|400|     M|
|Tom Cruise|     |400|      |
+----------+-----+---+------+

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|Tom Cruise|     |400|      |
| Tom Brand| null|400|     M|
|     James| Bond|007|  null|
|       Ann|Varsa|200|     F|
+----------+-----+---+------+

None


In [69]:

#cast
df.select(df.fname,df.id.cast("int")).printSchema()
df.printSchema()



root
 |-- fname: string (nullable = true)
 |-- id: integer (nullable = true)

root
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)



In [70]:

#between
df.filter(df.id.between(100,300)).show()


+-----+-----+---+------+
|fname|lname| id|gender|
+-----+-----+---+------+
|  Ann|Varsa|200|     F|
+-----+-----+---+------+



In [71]:

#contains
df.filter(df.fname.contains("Cruise")).show()


+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|Tom Cruise|     |400|      |
+----------+-----+---+------+



In [72]:

#startswith, endswith()
df.filter(df.fname.startswith("T")).show()
print(df.filter(df.fname.endswith("Cruise")).show())


+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|Tom Cruise|     |400|      |
| Tom Brand| null|400|     M|
+----------+-----+---+------+

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|Tom Cruise|     |400|      |
+----------+-----+---+------+

None


In [73]:

#isNull & isNotNull
df.filter(df.lname.isNull()).show()
print(df.filter(df.lname.isNotNull()).show())


+---------+-----+---+------+
|    fname|lname| id|gender|
+---------+-----+---+------+
|Tom Brand| null|400|     M|
+---------+-----+---+------+

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|     James| Bond|007|  null|
|       Ann|Varsa|200|     F|
|Tom Cruise|     |400|      |
+----------+-----+---+------+

None


In [82]:

#like , rlike
df.select(df.fname,df.lname,df.id) \
  .filter(df.fname.like("%nn")).show()



+-----+-----+---+
|fname|lname| id|
+-----+-----+---+
|  Ann|Varsa|200|
+-----+-----+---+



In [84]:

df.select(df.fname.substr(1,3).alias("substr")).show()


+------+
|substr|
+------+
|   Jam|
|   Ann|
|   Tom|
|   Tom|
+------+



In [91]:

#when & otherwise
from pyspark.sql.functions import when
df.select(df.fname,df.lname,when(df.gender=="M","Male") \
              .when(df.gender=="F","Female") \
              .when(df.gender=="null" ,'null') \
              .otherwise(df.gender).alias("new_gender") \
    ).show()



+----------+-----+----------+
|     fname|lname|new_gender|
+----------+-----+----------+
|     James| Bond|      null|
|       Ann|Varsa|    Female|
|Tom Cruise|     |          |
| Tom Brand| null|      Male|
+----------+-----+----------+



In [93]:

#isin
li=["007","200"]
df.select(df.fname,df.lname,df.id) \
  .filter(df.id.isin(li)) \
  .show()


+-----+-----+---+
|fname|lname| id|
+-----+-----+---+
|James| Bond|007|
|  Ann|Varsa|200|
+-----+-----+---+

